Merge branch 'optimise_bdd_insee' into 'master'

Optimisation de l abase INSEE pour accélérer les recherches.

Closes #14

See merge request sdjgeek/purge-registres-deces-insee!6
This commit is contained in:
Sdj Geek 2020-09-30 13:36:45 +02:00
commit 2d803d8d59
2 changed files with 216 additions and 51 deletions

View File

@ -8,50 +8,76 @@ Classe d'accès aux données INSEE dans la base SQLite
import os import os
import peewee import peewee
import requests
import datetime
# Objet de connexion à la base # Objet de connexion à la base
database = peewee.SqliteDatabase(None) database = peewee.SqliteDatabase(None)
# Définition du modèle de donnée
class BaseModel(peewee.Model):
class Meta:
database = database
class LastName(BaseModel):
"""Classe contenant tous les noms de famille
"""
last_name = peewee.CharField(unique=True)
class FirstName(BaseModel):
"""Classe contenant tous les prénoms
"""
first_name = peewee.CharField(unique=True)
class Person(BaseModel):
"""Classe représentant une personne dans la base
"""
first_name = peewee.ForeignKeyField(FirstName)
last_name = peewee.ForeignKeyField(LastName)
is_woman = peewee.BooleanField()
date_naissance = peewee.DateField(index=True)
code_lieu_naissance = peewee.CharField()
commune_naissance = peewee.CharField()
pays_naissance = peewee.CharField()
date_deces = peewee.DateField()
code_lieu_deces = peewee.CharField()
numero_act_deces = peewee.CharField()
class ImportedDataset(BaseModel):
"""
"""
dataset = peewee.CharField(unique=True)
# Gestion de l'accès aux données
class BddInsee: class BddInsee:
"""Classe encapsulant les accès aux données. """Classe encapsulant les accès aux données.
""" """
def __init__(self, chemin_base_donnees): def __init__(self, chemin_base_donnees=None):
"""Initialisation """Initialisation
:param chemin_base_donnees: chemin vers le fichier SQLite :param chemin_base_donnees: chemin vers le fichier SQLite
""" """
if not chemin_base_donnees:
chemin_base_donnees = os.path.join(os.path.dirname(os.path.abspath(__file__)), "bdd_insee.sqlite")
new = not os.path.isfile(chemin_base_donnees) new = not os.path.isfile(chemin_base_donnees)
database.init(chemin_base_donnees) database.init(chemin_base_donnees)
if new: if new:
database.create_tables([self.Person]) database.create_tables([LastName, FirstName, Person, ImportedDataset])
class Person(peewee.Model):
"""Classe représentant une personne dans la base
"""
last_name = peewee.CharField()
first_name = peewee.CharField()
is_woman = peewee.BooleanField()
annee_naissance = peewee.IntegerField()
mois_naissance = peewee.IntegerField()
jour_naissance = peewee.IntegerField()
code_lieu_naissance = peewee.CharField()
commune_naissance = peewee.CharField()
pays_naissance = peewee.CharField()
annee_deces = peewee.IntegerField()
mois_deces = peewee.IntegerField()
jour_deces = peewee.IntegerField()
code_lieu_deces = peewee.CharField()
numero_act_deces = peewee.CharField()
class Meta:
database = database
# Fonctions d'accès aux données
def find_person(self, first_name, last_name, maiden_name, annee_naissance, mois_naissance, jour_naissance): def find_person(self, first_name, last_name, maiden_name, annee_naissance, mois_naissance, jour_naissance):
"""Rechercher une personne dans la base """Rechercher une personne dans la base
@ -65,26 +91,165 @@ class BddInsee:
""" """
if maiden_name: if maiden_name:
return self.Person.select().where((self.Person.annee_naissance == int(annee_naissance)) # return Person.select().where((Person.annee_naissance == int(annee_naissance))
& (self.Person.mois_naissance == int(mois_naissance)) # & (Person.mois_naissance == int(mois_naissance))
& (self.Person.jour_naissance == int(jour_naissance)) # & (Person.jour_naissance == int(jour_naissance))
& (self.Person.first_name.contains(first_name.upper())) # & (Person.first_name.contains(first_name.upper()))
& ((self.Person.last_name.contains(last_name.upper())) # & ((Person.last_name.contains(last_name.upper()))
| self.Person.last_name.contains(maiden_name.upper()))) # | Person.last_name.contains(maiden_name.upper())))
query = Person.select().join(FirstName).switch(Person).join(LastName)\
.where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance)))
& (FirstName.first_name.contains(first_name.upper()))
& ((LastName.last_name.contains(last_name.upper()))
| LastName.last_name.contains(maiden_name.upper())))
else: else:
return self.Person.select().where((self.Person.annee_naissance == int(annee_naissance)) # return Person.select().where((Person.annee_naissance == int(annee_naissance))
& (self.Person.mois_naissance == int(mois_naissance)) # & (Person.mois_naissance == int(mois_naissance))
& (self.Person.jour_naissance == int(jour_naissance)) # & (Person.jour_naissance == int(jour_naissance))
& (self.Person.first_name.contains(first_name.upper())) # & (Person.first_name.contains(first_name.upper()))
& (self.Person.last_name.contains(last_name.upper()))) # & (Person.last_name.contains(last_name.upper())))
query = Person.select().join(FirstName).switch(Person).join(LastName)\
.where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance)))
& (FirstName.first_name.contains(first_name.upper()))
& (LastName.last_name.contains(last_name.upper())))
result = list()
if query:
for row in query:
result.append({
'first_name': row.first_name.first_name,
'last_name': row.last_name.last_name,
'date_naissance': row.date_naissance,
'code_lieu_naissance': row.code_lieu_naissance,
'date_deces': row.date_deces,
'code_lieu_deces': row.code_lieu_deces
})
return result
def import_data_list(self, data_list): # Fonctions d'import des données
def parse_insee_data(self, data_text):
"""Parse le texte d'un fichier de l'INSEE
:param data_text: texte contenu dans un fichier de l'INSEE
:returns: liste de dictionnaires contenant les informations à insérer
"""
data_list = []
for line_number, line in enumerate(data_text.split('\n')):
if not line:
continue
try:
try:
last_name, first_name = line[0:80].strip()[:-1].split("*")
except ValueError:
if '*' not in line[0:80]:
continue
raise
date_naissance = self.parse_date(int(line[81:85]), int(line[85:87]), int(line[87:89]))
date_deces = self.parse_date(int(line[154:158]), int(line[158:160]), int(line[160:162]))
data_list.append({'last_name': last_name.upper(),
'first_name': first_name.strip('/').upper(),
'is_woman': (line[80] == "2"),
'date_naissance': date_naissance,
'code_lieu_naissance': line[89:94].upper(),
'commune_naissance': line[94: 124].strip().upper(),
'pays_naissance': line[124: 154].strip().upper(),
'date_deces': date_deces,
'code_lieu_deces': line[162:167].upper(),
'numero_act_deces': line[167:].strip().upper()})
except ValueError as e:
print(f"Erreur pour parser ligne numéro {line_number}.")
print(line)
raise
return data_list
def parse_date(self, year, month, day):
if year == 0:
return datetime.date.min
if month == 0:
month = 1
elif month > 12:
month = 12
if day == 0:
day = 1
elif day > 30 and month in [4, 6, 9, 11]:
day = 30
elif day > 31 and month in [1, 3, 5, 7, 8, 10, 12]:
day = 31
elif day > 29 and month == 2:
day = 29
try:
date = datetime.date(year, month, day)
except ValueError:
if month == 2 and day == 29 and year not in [1844, 1848, 1852, 1856, 1860, 1864, 1868, 1872, 1876, 1880,
1884, 1888, 1892, 1896, 1904, 1908, 1912, 1916, 1920,
1924, 1928, 1932, 1936, 1940, 1944, 1948, 1952, 1956, 1960,
1964, 1968, 1972, 1976, 1980, 1984, 1988, 1992, 1996, 2000,
2004, 2008, 2012, 2016, 2020]:
day = 28
date = datetime.date(year, month, day)
return date
def import_url_list(self, file_path):
"""Importer les données à partir d'un fichier contenant les URL
Un fichier d'URL contient un nom de dataset et une URL par ligne, sapéras par une espace. Chaque URL conduit
vers un fichier de l'insée qui doit être importé.
:param file_path: chemin vers le fichier d'URL
"""
if not os.path.isfile(file_path):
print(f"Le fichier {file_path} est introuvable.")
return
with open(file_path, 'r') as file_content:
for line in file_content.readlines():
try:
dataset, url = line.split(' ')
url = url.strip('\n')
except:
print(f"Le fichier {file_path} contient une ligne non conforme : {line}.")
continue
dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset)
if not len(dataset_id) == 0:
print(f"Le dataset {dataset} ne sera pas traité, car il a déjà été importé.")
continue
print(f"Importation du dataset {dataset}")
print(f"Téléchargement de : '{url}'.")
result = requests.get(url)
try:
result.raise_for_status()
except:
print(f"Erreur lors du téléchargement du fichier {url}.")
continue
print("Parse des données téléchargées")
data_list = self.parse_insee_data(result.text)
self.import_data_list(data_list, dataset)
def import_data_list(self, data_list, dataset_name):
"""Insérer des données dans la base """Insérer des données dans la base
:param data_list: liste de dictionnaires contenant les :param data_list: liste de dictionnaires contenant les informations à insérer
informations à insérer :param dataset_name: nom du jeu de données
""" """
dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset_name)
if not len(dataset_id) == 0:
print(f"Le dataset {dataset_name} a déjà été importé.")
return
print(f"Import en base des {len(data_list)} personnes.")
with database.atomic(): with database.atomic():
for batch in peewee.chunked(data_list, 70): for data in data_list:
self.Person.insert_many(batch).execute() # Gestion de la clé étrangère prénom
first_name = data.pop('first_name')
try:
first_name_id = FirstName.get(FirstName.first_name == first_name)
except peewee.DoesNotExist:
first_name_id = FirstName.create(first_name=first_name)
data['first_name'] = first_name_id
# Gestion de la clé étrangère nom de famille
last_name = data.pop('last_name')
try:
last_name_id = LastName.get(LastName.last_name == last_name)
except peewee.DoesNotExist:
last_name_id = LastName.create(last_name=last_name)
data['last_name'] = last_name_id
Person.insert(**data).execute()
ImportedDataset.insert(dataset=dataset_name).execute()
print("Import terminé")

View File

@ -45,16 +45,16 @@ class MembreBase(ABC):
return f"{self.i_last_name}, {self.i_first_name}" return f"{self.i_last_name}, {self.i_first_name}"
def set_insee(self, insee): def set_insee(self, insee):
self.i_first_name = insee.first_name self.i_first_name = insee['first_name']
self.i_last_name = insee.last_name self.i_last_name = insee['last_name']
self.i_annee_naissance = insee.annee_naissance self.i_annee_naissance = insee['date_naissance'].year
self.i_mois_naissance = insee.mois_naissance self.i_mois_naissance = insee['date_naissance'].month
self.i_jour_naissance = insee.jour_naissance self.i_jour_naissance = insee['date_naissance'].day
self.i_ville_naissance = insee.code_lieu_naissance self.i_ville_naissance = insee['code_lieu_naissance']
self.i_annee_deces = insee.annee_deces self.i_annee_deces = insee['date_deces'].year
self.i_mois_deces = insee.mois_deces self.i_mois_deces = insee['date_deces'].month
self.i_jour_deces = insee.jour_deces self.i_jour_deces = insee['date_deces'].day
self.i_ville_deces = insee.code_lieu_deces self.i_ville_deces = insee['code_lieu_deces']
def convertir_villes_insee(self): def convertir_villes_insee(self):
old_value = self.i_ville_naissance old_value = self.i_ville_naissance