""" Copyright (c) 2020 Sdj Geek Voir le fichier LICENSE Classe d'accès aux données INSEE dans la base SQLite """ import os import peewee import requests import datetime # Objet de connexion à la base database = peewee.SqliteDatabase(None) # Définition du modèle de donnée class BaseModel(peewee.Model): class Meta: database = database class LastName(BaseModel): """Classe contenant tous les noms de famille """ last_name = peewee.CharField(unique=True) class FirstName(BaseModel): """Classe contenant tous les prénoms """ first_name = peewee.CharField(unique=True) class Person(BaseModel): """Classe représentant une personne dans la base """ first_name = peewee.ForeignKeyField(FirstName) last_name = peewee.ForeignKeyField(LastName) is_woman = peewee.BooleanField() date_naissance = peewee.DateField(index=True) code_lieu_naissance = peewee.CharField() commune_naissance = peewee.CharField() pays_naissance = peewee.CharField() date_deces = peewee.DateField() code_lieu_deces = peewee.CharField() numero_act_deces = peewee.CharField() class ImportedDataset(BaseModel): """ """ dataset = peewee.CharField(unique=True) # Gestion de l'accès aux données class BddInsee: """Classe encapsulant les accès aux données. """ def __init__(self, chemin_base_donnees=None): """Initialisation :param chemin_base_donnees: chemin vers le fichier SQLite """ if not chemin_base_donnees: chemin_base_donnees = os.path.join(os.path.dirname(os.path.abspath(__file__)), "bdd_insee.sqlite") new = not os.path.isfile(chemin_base_donnees) database.init(chemin_base_donnees) if new: database.create_tables([LastName, FirstName, Person, ImportedDataset]) # Fonctions d'accès aux données def find_person(self, first_name, last_name, maiden_name, annee_naissance, mois_naissance, jour_naissance): """Rechercher une personne dans la base :param first_name: prénom de la personne à rechercher :param last_name: nom de famille de la personne à rechercher :param maiden_name: nom de jeune fille de la personne à rechercher (None si non défini) :param annee_naissance: année de naissance de la personne à rechercher :param mois_naissance: mois de naissance de la personne à rechercher :param jour_naissance: jour de naissance de la personne à rechercher :returns: liste contenant un dictionnaire par personne trouvée """ if maiden_name: # return Person.select().where((Person.annee_naissance == int(annee_naissance)) # & (Person.mois_naissance == int(mois_naissance)) # & (Person.jour_naissance == int(jour_naissance)) # & (Person.first_name.contains(first_name.upper())) # & ((Person.last_name.contains(last_name.upper())) # | Person.last_name.contains(maiden_name.upper()))) query = Person.select().join(FirstName).switch(Person).join(LastName)\ .where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance))) & (FirstName.first_name.contains(first_name.upper())) & ((LastName.last_name.contains(last_name.upper())) | LastName.last_name.contains(maiden_name.upper()))) else: # return Person.select().where((Person.annee_naissance == int(annee_naissance)) # & (Person.mois_naissance == int(mois_naissance)) # & (Person.jour_naissance == int(jour_naissance)) # & (Person.first_name.contains(first_name.upper())) # & (Person.last_name.contains(last_name.upper()))) query = Person.select().join(FirstName).switch(Person).join(LastName)\ .where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance))) & (FirstName.first_name.contains(first_name.upper())) & (LastName.last_name.contains(last_name.upper()))) result = list() if query: for row in query: result.append({ 'first_name': row.first_name.first_name, 'last_name': row.last_name.last_name, 'date_naissance': row.date_naissance, 'code_lieu_naissance': row.code_lieu_naissance, 'date_deces': row.date_deces, 'code_lieu_deces': row.code_lieu_deces }) return result # Fonctions d'import des données def parse_insee_data(self, data_text): """Parse le texte d'un fichier de l'INSEE :param data_text: texte contenu dans un fichier de l'INSEE :returns: liste de dictionnaires contenant les informations à insérer """ data_list = [] for line_number, line in enumerate(data_text.split('\n')): if not line: continue try: try: last_name, first_name = line[0:80].strip()[:-1].split("*") except ValueError: if '*' not in line[0:80]: continue raise date_naissance = self.parse_date(int(line[81:85]), int(line[85:87]), int(line[87:89])) date_deces = self.parse_date(int(line[154:158]), int(line[158:160]), int(line[160:162])) data_list.append({'last_name': last_name.upper(), 'first_name': first_name.strip('/').upper(), 'is_woman': (line[80] == "2"), 'date_naissance': date_naissance, 'code_lieu_naissance': line[89:94].upper(), 'commune_naissance': line[94: 124].strip().upper(), 'pays_naissance': line[124: 154].strip().upper(), 'date_deces': date_deces, 'code_lieu_deces': line[162:167].upper(), 'numero_act_deces': line[167:].strip().upper()}) except ValueError as e: print(f"Erreur pour parser ligne numéro {line_number}.") print(line) raise return data_list def parse_date(self, year, month, day): if year == 0: return datetime.date.min if month == 0: month = 1 elif month > 12: month = 12 if day == 0: day = 1 elif day > 30 and month in [4, 6, 9, 11]: day = 30 elif day > 31 and month in [1, 3, 5, 7, 8, 10, 12]: day = 31 elif day > 29 and month == 2: day = 29 try: date = datetime.date(year, month, day) except ValueError: if month == 2 and day == 29 and year not in [1844, 1848, 1852, 1856, 1860, 1864, 1868, 1872, 1876, 1880, 1884, 1888, 1892, 1896, 1904, 1908, 1912, 1916, 1920, 1924, 1928, 1932, 1936, 1940, 1944, 1948, 1952, 1956, 1960, 1964, 1968, 1972, 1976, 1980, 1984, 1988, 1992, 1996, 2000, 2004, 2008, 2012, 2016, 2020]: day = 28 date = datetime.date(year, month, day) return date def import_url_list(self, file_path): """Importer les données à partir d'un fichier contenant les URL Un fichier d'URL contient un nom de dataset et une URL par ligne, sapéras par une espace. Chaque URL conduit vers un fichier de l'insée qui doit être importé. :param file_path: chemin vers le fichier d'URL """ if not os.path.isfile(file_path): print(f"Le fichier {file_path} est introuvable.") return with open(file_path, 'r') as file_content: for line in file_content.readlines(): try: dataset, url = line.split(' ') url = url.strip('\n') except: print(f"Le fichier {file_path} contient une ligne non conforme : {line}.") continue dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset) if not len(dataset_id) == 0: print(f"Le dataset {dataset} ne sera pas traité, car il a déjà été importé.") continue print(f"Importation du dataset {dataset}") print(f"Téléchargement de : '{url}'.") result = requests.get(url) try: result.raise_for_status() except: print(f"Erreur lors du téléchargement du fichier {url}.") continue print("Parse des données téléchargées") data_list = self.parse_insee_data(result.text) self.import_data_list(data_list, dataset) def import_data_list(self, data_list, dataset_name): """Insérer des données dans la base :param data_list: liste de dictionnaires contenant les informations à insérer :param dataset_name: nom du jeu de données """ dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset_name) if not len(dataset_id) == 0: print(f"Le dataset {dataset_name} a déjà été importé.") return print(f"Import en base des {len(data_list)} personnes.") with database.atomic(): for data in data_list: # Gestion de la clé étrangère prénom first_name = data.pop('first_name') try: first_name_id = FirstName.get(FirstName.first_name == first_name) except peewee.DoesNotExist: first_name_id = FirstName.create(first_name=first_name) data['first_name'] = first_name_id # Gestion de la clé étrangère nom de famille last_name = data.pop('last_name') try: last_name_id = LastName.get(LastName.last_name == last_name) except peewee.DoesNotExist: last_name_id = LastName.create(last_name=last_name) data['last_name'] = last_name_id Person.insert(**data).execute() ImportedDataset.insert(dataset=dataset_name).execute() print("Import terminé")