From 19126e57aab3972986604556f11664a81f0a1d61 Mon Sep 17 00:00:00 2001 From: Sdj GeeK Date: Wed, 30 Sep 2020 13:33:59 +0200 Subject: [PATCH] =?UTF-8?q?Optimisation=20de=20l=20abase=20INSEE=20pour=20?= =?UTF-8?q?acc=C3=A9l=C3=A9rer=20les=20recherches.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- purge-registres-deces-insee/bdd_insee.py | 247 +++++++++++++++++---- purge-registres-deces-insee/membre_base.py | 20 +- 2 files changed, 216 insertions(+), 51 deletions(-) diff --git a/purge-registres-deces-insee/bdd_insee.py b/purge-registres-deces-insee/bdd_insee.py index 9d2ba68..de68689 100755 --- a/purge-registres-deces-insee/bdd_insee.py +++ b/purge-registres-deces-insee/bdd_insee.py @@ -8,50 +8,76 @@ Classe d'accès aux données INSEE dans la base SQLite import os import peewee +import requests +import datetime # Objet de connexion à la base database = peewee.SqliteDatabase(None) +# Définition du modèle de donnée +class BaseModel(peewee.Model): + class Meta: + database = database + + +class LastName(BaseModel): + """Classe contenant tous les noms de famille + + """ + last_name = peewee.CharField(unique=True) + + +class FirstName(BaseModel): + """Classe contenant tous les prénoms + + """ + first_name = peewee.CharField(unique=True) + + +class Person(BaseModel): + """Classe représentant une personne dans la base + + """ + first_name = peewee.ForeignKeyField(FirstName) + last_name = peewee.ForeignKeyField(LastName) + is_woman = peewee.BooleanField() + date_naissance = peewee.DateField(index=True) + code_lieu_naissance = peewee.CharField() + commune_naissance = peewee.CharField() + pays_naissance = peewee.CharField() + date_deces = peewee.DateField() + code_lieu_deces = peewee.CharField() + numero_act_deces = peewee.CharField() + + +class ImportedDataset(BaseModel): + """ + + """ + dataset = peewee.CharField(unique=True) + +# Gestion de l'accès aux données class BddInsee: """Classe encapsulant les accès aux données. """ - def __init__(self, chemin_base_donnees): + def __init__(self, chemin_base_donnees=None): """Initialisation :param chemin_base_donnees: chemin vers le fichier SQLite """ + if not chemin_base_donnees: + chemin_base_donnees = os.path.join(os.path.dirname(os.path.abspath(__file__)), "bdd_insee.sqlite") new = not os.path.isfile(chemin_base_donnees) database.init(chemin_base_donnees) if new: - database.create_tables([self.Person]) - - class Person(peewee.Model): - """Classe représentant une personne dans la base - - """ - last_name = peewee.CharField() - first_name = peewee.CharField() - is_woman = peewee.BooleanField() - annee_naissance = peewee.IntegerField() - mois_naissance = peewee.IntegerField() - jour_naissance = peewee.IntegerField() - code_lieu_naissance = peewee.CharField() - commune_naissance = peewee.CharField() - pays_naissance = peewee.CharField() - annee_deces = peewee.IntegerField() - mois_deces = peewee.IntegerField() - jour_deces = peewee.IntegerField() - code_lieu_deces = peewee.CharField() - numero_act_deces = peewee.CharField() - - class Meta: - database = database + database.create_tables([LastName, FirstName, Person, ImportedDataset]) + # Fonctions d'accès aux données def find_person(self, first_name, last_name, maiden_name, annee_naissance, mois_naissance, jour_naissance): """Rechercher une personne dans la base @@ -65,26 +91,165 @@ class BddInsee: """ if maiden_name: - return self.Person.select().where((self.Person.annee_naissance == int(annee_naissance)) - & (self.Person.mois_naissance == int(mois_naissance)) - & (self.Person.jour_naissance == int(jour_naissance)) - & (self.Person.first_name.contains(first_name.upper())) - & ((self.Person.last_name.contains(last_name.upper())) - | self.Person.last_name.contains(maiden_name.upper()))) + # return Person.select().where((Person.annee_naissance == int(annee_naissance)) + # & (Person.mois_naissance == int(mois_naissance)) + # & (Person.jour_naissance == int(jour_naissance)) + # & (Person.first_name.contains(first_name.upper())) + # & ((Person.last_name.contains(last_name.upper())) + # | Person.last_name.contains(maiden_name.upper()))) + query = Person.select().join(FirstName).switch(Person).join(LastName)\ + .where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance))) + & (FirstName.first_name.contains(first_name.upper())) + & ((LastName.last_name.contains(last_name.upper())) + | LastName.last_name.contains(maiden_name.upper()))) else: - return self.Person.select().where((self.Person.annee_naissance == int(annee_naissance)) - & (self.Person.mois_naissance == int(mois_naissance)) - & (self.Person.jour_naissance == int(jour_naissance)) - & (self.Person.first_name.contains(first_name.upper())) - & (self.Person.last_name.contains(last_name.upper()))) + # return Person.select().where((Person.annee_naissance == int(annee_naissance)) + # & (Person.mois_naissance == int(mois_naissance)) + # & (Person.jour_naissance == int(jour_naissance)) + # & (Person.first_name.contains(first_name.upper())) + # & (Person.last_name.contains(last_name.upper()))) + query = Person.select().join(FirstName).switch(Person).join(LastName)\ + .where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance))) + & (FirstName.first_name.contains(first_name.upper())) + & (LastName.last_name.contains(last_name.upper()))) + result = list() + if query: + for row in query: + result.append({ + 'first_name': row.first_name.first_name, + 'last_name': row.last_name.last_name, + 'date_naissance': row.date_naissance, + 'code_lieu_naissance': row.code_lieu_naissance, + 'date_deces': row.date_deces, + 'code_lieu_deces': row.code_lieu_deces + }) + return result - def import_data_list(self, data_list): + # Fonctions d'import des données + def parse_insee_data(self, data_text): + """Parse le texte d'un fichier de l'INSEE + + :param data_text: texte contenu dans un fichier de l'INSEE + :returns: liste de dictionnaires contenant les informations à insérer + """ + data_list = [] + for line_number, line in enumerate(data_text.split('\n')): + if not line: + continue + try: + try: + last_name, first_name = line[0:80].strip()[:-1].split("*") + except ValueError: + if '*' not in line[0:80]: + continue + raise + date_naissance = self.parse_date(int(line[81:85]), int(line[85:87]), int(line[87:89])) + date_deces = self.parse_date(int(line[154:158]), int(line[158:160]), int(line[160:162])) + data_list.append({'last_name': last_name.upper(), + 'first_name': first_name.strip('/').upper(), + 'is_woman': (line[80] == "2"), + 'date_naissance': date_naissance, + 'code_lieu_naissance': line[89:94].upper(), + 'commune_naissance': line[94: 124].strip().upper(), + 'pays_naissance': line[124: 154].strip().upper(), + 'date_deces': date_deces, + 'code_lieu_deces': line[162:167].upper(), + 'numero_act_deces': line[167:].strip().upper()}) + except ValueError as e: + print(f"Erreur pour parser ligne numéro {line_number}.") + print(line) + raise + return data_list + + def parse_date(self, year, month, day): + if year == 0: + return datetime.date.min + if month == 0: + month = 1 + elif month > 12: + month = 12 + if day == 0: + day = 1 + elif day > 30 and month in [4, 6, 9, 11]: + day = 30 + elif day > 31 and month in [1, 3, 5, 7, 8, 10, 12]: + day = 31 + elif day > 29 and month == 2: + day = 29 + try: + date = datetime.date(year, month, day) + except ValueError: + if month == 2 and day == 29 and year not in [1844, 1848, 1852, 1856, 1860, 1864, 1868, 1872, 1876, 1880, + 1884, 1888, 1892, 1896, 1904, 1908, 1912, 1916, 1920, + 1924, 1928, 1932, 1936, 1940, 1944, 1948, 1952, 1956, 1960, + 1964, 1968, 1972, 1976, 1980, 1984, 1988, 1992, 1996, 2000, + 2004, 2008, 2012, 2016, 2020]: + day = 28 + date = datetime.date(year, month, day) + return date + + def import_url_list(self, file_path): + """Importer les données à partir d'un fichier contenant les URL + + Un fichier d'URL contient un nom de dataset et une URL par ligne, sapéras par une espace. Chaque URL conduit + vers un fichier de l'insée qui doit être importé. + + :param file_path: chemin vers le fichier d'URL + """ + if not os.path.isfile(file_path): + print(f"Le fichier {file_path} est introuvable.") + return + with open(file_path, 'r') as file_content: + for line in file_content.readlines(): + try: + dataset, url = line.split(' ') + url = url.strip('\n') + except: + print(f"Le fichier {file_path} contient une ligne non conforme : {line}.") + continue + dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset) + if not len(dataset_id) == 0: + print(f"Le dataset {dataset} ne sera pas traité, car il a déjà été importé.") + continue + print(f"Importation du dataset {dataset}") + print(f"Téléchargement de : '{url}'.") + result = requests.get(url) + try: + result.raise_for_status() + except: + print(f"Erreur lors du téléchargement du fichier {url}.") + continue + print("Parse des données téléchargées") + data_list = self.parse_insee_data(result.text) + self.import_data_list(data_list, dataset) + + def import_data_list(self, data_list, dataset_name): """Insérer des données dans la base - :param data_list: liste de dictionnaires contenant les - informations à insérer - + :param data_list: liste de dictionnaires contenant les informations à insérer + :param dataset_name: nom du jeu de données """ + dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset_name) + if not len(dataset_id) == 0: + print(f"Le dataset {dataset_name} a déjà été importé.") + return + print(f"Import en base des {len(data_list)} personnes.") with database.atomic(): - for batch in peewee.chunked(data_list, 70): - self.Person.insert_many(batch).execute() + for data in data_list: + # Gestion de la clé étrangère prénom + first_name = data.pop('first_name') + try: + first_name_id = FirstName.get(FirstName.first_name == first_name) + except peewee.DoesNotExist: + first_name_id = FirstName.create(first_name=first_name) + data['first_name'] = first_name_id + # Gestion de la clé étrangère nom de famille + last_name = data.pop('last_name') + try: + last_name_id = LastName.get(LastName.last_name == last_name) + except peewee.DoesNotExist: + last_name_id = LastName.create(last_name=last_name) + data['last_name'] = last_name_id + Person.insert(**data).execute() + ImportedDataset.insert(dataset=dataset_name).execute() + print("Import terminé") diff --git a/purge-registres-deces-insee/membre_base.py b/purge-registres-deces-insee/membre_base.py index d742c79..1a66717 100644 --- a/purge-registres-deces-insee/membre_base.py +++ b/purge-registres-deces-insee/membre_base.py @@ -45,16 +45,16 @@ class MembreBase(ABC): return f"{self.i_last_name}, {self.i_first_name}" def set_insee(self, insee): - self.i_first_name = insee.first_name - self.i_last_name = insee.last_name - self.i_annee_naissance = insee.annee_naissance - self.i_mois_naissance = insee.mois_naissance - self.i_jour_naissance = insee.jour_naissance - self.i_ville_naissance = insee.code_lieu_naissance - self.i_annee_deces = insee.annee_deces - self.i_mois_deces = insee.mois_deces - self.i_jour_deces = insee.jour_deces - self.i_ville_deces = insee.code_lieu_deces + self.i_first_name = insee['first_name'] + self.i_last_name = insee['last_name'] + self.i_annee_naissance = insee['date_naissance'].year + self.i_mois_naissance = insee['date_naissance'].month + self.i_jour_naissance = insee['date_naissance'].day + self.i_ville_naissance = insee['code_lieu_naissance'] + self.i_annee_deces = insee['date_deces'].year + self.i_mois_deces = insee['date_deces'].month + self.i_jour_deces = insee['date_deces'].day + self.i_ville_deces = insee['code_lieu_deces'] def convertir_villes_insee(self): old_value = self.i_ville_naissance