Purge_Registres_D-c-s_INSEE/purge-registres-deces-insee/bdd_insee.py

256 lines
11 KiB
Python
Raw Permalink Normal View History

2020-07-17 21:49:45 +02:00
"""
Copyright (c) 2020 Sdj Geek
Voir le fichier LICENSE
Classe d'accès aux données INSEE dans la base SQLite
2020-06-09 00:16:00 +02:00
"""
import os
2020-06-07 00:52:15 +02:00
import peewee
import requests
import datetime
2020-06-07 00:52:15 +02:00
2020-06-09 00:16:00 +02:00
# Objet de connexion à la base
2020-06-07 00:52:15 +02:00
database = peewee.SqliteDatabase(None)
# Définition du modèle de donnée
class BaseModel(peewee.Model):
class Meta:
database = database
class LastName(BaseModel):
"""Classe contenant tous les noms de famille
"""
last_name = peewee.CharField(unique=True)
class FirstName(BaseModel):
"""Classe contenant tous les prénoms
"""
first_name = peewee.CharField(unique=True)
class Person(BaseModel):
"""Classe représentant une personne dans la base
"""
first_name = peewee.ForeignKeyField(FirstName)
last_name = peewee.ForeignKeyField(LastName)
is_woman = peewee.BooleanField()
date_naissance = peewee.DateField(index=True)
code_lieu_naissance = peewee.CharField()
commune_naissance = peewee.CharField()
pays_naissance = peewee.CharField()
date_deces = peewee.DateField()
code_lieu_deces = peewee.CharField()
numero_act_deces = peewee.CharField()
class ImportedDataset(BaseModel):
"""
"""
dataset = peewee.CharField(unique=True)
# Gestion de l'accès aux données
2020-06-07 00:52:15 +02:00
class BddInsee:
2020-06-09 00:16:00 +02:00
"""Classe encapsulant les accès aux données.
"""
2020-06-07 00:52:15 +02:00
def __init__(self, chemin_base_donnees=None):
2020-06-09 00:16:00 +02:00
"""Initialisation
:param chemin_base_donnees: chemin vers le fichier SQLite
"""
if not chemin_base_donnees:
chemin_base_donnees = os.path.join(os.path.dirname(os.path.abspath(__file__)), "bdd_insee.sqlite")
new = not os.path.isfile(chemin_base_donnees)
2020-06-07 00:52:15 +02:00
database.init(chemin_base_donnees)
if new:
database.create_tables([LastName, FirstName, Person, ImportedDataset])
2020-06-07 00:52:15 +02:00
# Fonctions d'accès aux données
2020-06-07 00:52:15 +02:00
def find_person(self, first_name, last_name, maiden_name, annee_naissance, mois_naissance, jour_naissance):
2020-06-09 00:16:00 +02:00
"""Rechercher une personne dans la base
:param first_name: prénom de la personne à rechercher
:param last_name: nom de famille de la personne à rechercher
:param maiden_name: nom de jeune fille de la personne à rechercher (None si non défini)
:param annee_naissance: année de naissance de la personne à rechercher
:param mois_naissance: mois de naissance de la personne à rechercher
:param jour_naissance: jour de naissance de la personne à rechercher
:returns: liste contenant un dictionnaire par personne trouvée
"""
2020-06-07 00:52:15 +02:00
if maiden_name:
# return Person.select().where((Person.annee_naissance == int(annee_naissance))
# & (Person.mois_naissance == int(mois_naissance))
# & (Person.jour_naissance == int(jour_naissance))
# & (Person.first_name.contains(first_name.upper()))
# & ((Person.last_name.contains(last_name.upper()))
# | Person.last_name.contains(maiden_name.upper())))
query = Person.select().join(FirstName).switch(Person).join(LastName)\
.where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance)))
& (FirstName.first_name.contains(first_name.upper()))
& ((LastName.last_name.contains(last_name.upper()))
| LastName.last_name.contains(maiden_name.upper())))
2020-06-07 00:52:15 +02:00
else:
# return Person.select().where((Person.annee_naissance == int(annee_naissance))
# & (Person.mois_naissance == int(mois_naissance))
# & (Person.jour_naissance == int(jour_naissance))
# & (Person.first_name.contains(first_name.upper()))
# & (Person.last_name.contains(last_name.upper())))
query = Person.select().join(FirstName).switch(Person).join(LastName)\
.where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance)))
& (FirstName.first_name.contains(first_name.upper()))
& (LastName.last_name.contains(last_name.upper())))
result = list()
if query:
for row in query:
result.append({
'first_name': row.first_name.first_name,
'last_name': row.last_name.last_name,
'date_naissance': row.date_naissance,
'code_lieu_naissance': row.code_lieu_naissance,
'date_deces': row.date_deces,
'code_lieu_deces': row.code_lieu_deces
})
return result
# Fonctions d'import des données
def parse_insee_data(self, data_text):
"""Parse le texte d'un fichier de l'INSEE
:param data_text: texte contenu dans un fichier de l'INSEE
:returns: liste de dictionnaires contenant les informations à insérer
"""
data_list = []
for line_number, line in enumerate(data_text.split('\n')):
if not line:
continue
try:
try:
last_name, first_name = line[0:80].strip()[:-1].split("*")
except ValueError:
if '*' not in line[0:80]:
continue
raise
date_naissance = self.parse_date(int(line[81:85]), int(line[85:87]), int(line[87:89]))
date_deces = self.parse_date(int(line[154:158]), int(line[158:160]), int(line[160:162]))
data_list.append({'last_name': last_name.upper(),
'first_name': first_name.strip('/').upper(),
'is_woman': (line[80] == "2"),
'date_naissance': date_naissance,
'code_lieu_naissance': line[89:94].upper(),
'commune_naissance': line[94: 124].strip().upper(),
'pays_naissance': line[124: 154].strip().upper(),
'date_deces': date_deces,
'code_lieu_deces': line[162:167].upper(),
'numero_act_deces': line[167:].strip().upper()})
except ValueError as e:
print(f"Erreur pour parser ligne numéro {line_number}.")
print(line)
raise
return data_list
def parse_date(self, year, month, day):
if year == 0:
return datetime.date.min
if month == 0:
month = 1
elif month > 12:
month = 12
if day == 0:
day = 1
elif day > 30 and month in [4, 6, 9, 11]:
day = 30
elif day > 31 and month in [1, 3, 5, 7, 8, 10, 12]:
day = 31
elif day > 29 and month == 2:
day = 29
try:
date = datetime.date(year, month, day)
except ValueError:
if month == 2 and day == 29 and year not in [1844, 1848, 1852, 1856, 1860, 1864, 1868, 1872, 1876, 1880,
1884, 1888, 1892, 1896, 1904, 1908, 1912, 1916, 1920,
1924, 1928, 1932, 1936, 1940, 1944, 1948, 1952, 1956, 1960,
1964, 1968, 1972, 1976, 1980, 1984, 1988, 1992, 1996, 2000,
2004, 2008, 2012, 2016, 2020]:
day = 28
date = datetime.date(year, month, day)
return date
def import_url_list(self, file_path):
"""Importer les données à partir d'un fichier contenant les URL
Un fichier d'URL contient un nom de dataset et une URL par ligne, sapéras par une espace. Chaque URL conduit
vers un fichier de l'insée qui doit être importé.
:param file_path: chemin vers le fichier d'URL
"""
if not os.path.isfile(file_path):
print(f"Le fichier {file_path} est introuvable.")
return
with open(file_path, 'r') as file_content:
for line in file_content.readlines():
try:
dataset, url = line.split(' ')
url = url.strip('\n')
except:
print(f"Le fichier {file_path} contient une ligne non conforme : {line}.")
continue
dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset)
if not len(dataset_id) == 0:
print(f"Le dataset {dataset} ne sera pas traité, car il a déjà été importé.")
continue
print(f"Importation du dataset {dataset}")
print(f"Téléchargement de : '{url}'.")
result = requests.get(url)
try:
result.raise_for_status()
except:
print(f"Erreur lors du téléchargement du fichier {url}.")
continue
print("Parse des données téléchargées")
data_list = self.parse_insee_data(result.text)
self.import_data_list(data_list, dataset)
def import_data_list(self, data_list, dataset_name):
2020-06-09 00:16:00 +02:00
"""Insérer des données dans la base
:param data_list: liste de dictionnaires contenant les informations à insérer
:param dataset_name: nom du jeu de données
2020-06-09 00:16:00 +02:00
"""
dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset_name)
if not len(dataset_id) == 0:
print(f"Le dataset {dataset_name} a déjà été importé.")
return
print(f"Import en base des {len(data_list)} personnes.")
with database.atomic():
for data in data_list:
# Gestion de la clé étrangère prénom
first_name = data.pop('first_name')
try:
first_name_id = FirstName.get(FirstName.first_name == first_name)
except peewee.DoesNotExist:
first_name_id = FirstName.create(first_name=first_name)
data['first_name'] = first_name_id
# Gestion de la clé étrangère nom de famille
last_name = data.pop('last_name')
try:
last_name_id = LastName.get(LastName.last_name == last_name)
except peewee.DoesNotExist:
last_name_id = LastName.create(last_name=last_name)
data['last_name'] = last_name_id
Person.insert(**data).execute()
ImportedDataset.insert(dataset=dataset_name).execute()
print("Import terminé")