Optimisation de la base INSEE pour accélérer les recherches.
This commit is contained in:
parent
35d82173c6
commit
19126e57aa
|
@ -8,50 +8,76 @@ Classe d'accès aux données INSEE dans la base SQLite
|
|||
|
||||
import os
|
||||
import peewee
|
||||
import requests
|
||||
import datetime
|
||||
|
||||
|
||||
# Objet de connexion à la base
|
||||
database = peewee.SqliteDatabase(None)
|
||||
|
||||
|
||||
# Définition du modèle de données
|
||||
class BaseModel(peewee.Model):
    """Base model that attaches its subclasses to the module-level database."""

    class Meta:
        # Bind to the module-level ``database`` object defined above.
        database = database
|
||||
|
||||
|
||||
class LastName(BaseModel):
    """Model holding all family names."""

    # unique=True: a given family name is stored at most once.
    last_name = peewee.CharField(unique=True)
|
||||
|
||||
|
||||
class FirstName(BaseModel):
    """Model holding all first names."""

    # unique=True: a given first name is stored at most once.
    first_name = peewee.CharField(unique=True)
|
||||
|
||||
|
||||
class Person(BaseModel):
    """Model representing one person in the database."""

    # Names are stored once in their own tables and referenced by foreign key.
    first_name = peewee.ForeignKeyField(FirstName)
    last_name = peewee.ForeignKeyField(LastName)
    is_woman = peewee.BooleanField()
    # Birth date is the only indexed column of this model.
    date_naissance = peewee.DateField(index=True)
    code_lieu_naissance = peewee.CharField()
    commune_naissance = peewee.CharField()
    pays_naissance = peewee.CharField()
    date_deces = peewee.DateField()
    code_lieu_deces = peewee.CharField()
    numero_act_deces = peewee.CharField()
|
||||
|
||||
|
||||
class ImportedDataset(BaseModel):
    """Model recording the name of a dataset.

    The import code in this file looks a dataset name up in this table
    before importing and inserts it once the import is done.
    """

    # unique=True: a dataset name can be recorded only once.
    dataset = peewee.CharField(unique=True)
|
||||
|
||||
# Gestion de l'accès aux données
|
||||
class BddInsee:
|
||||
"""Classe encapsulant les accès aux données.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, chemin_base_donnees):
|
||||
def __init__(self, chemin_base_donnees=None):
|
||||
"""Initialisation
|
||||
|
||||
:param chemin_base_donnees: chemin vers le fichier SQLite
|
||||
|
||||
"""
|
||||
if not chemin_base_donnees:
|
||||
chemin_base_donnees = os.path.join(os.path.dirname(os.path.abspath(__file__)), "bdd_insee.sqlite")
|
||||
new = not os.path.isfile(chemin_base_donnees)
|
||||
database.init(chemin_base_donnees)
|
||||
if new:
|
||||
database.create_tables([self.Person])
|
||||
|
||||
class Person(peewee.Model):
|
||||
"""Classe représentant une personne dans la base
|
||||
|
||||
"""
|
||||
last_name = peewee.CharField()
|
||||
first_name = peewee.CharField()
|
||||
is_woman = peewee.BooleanField()
|
||||
annee_naissance = peewee.IntegerField()
|
||||
mois_naissance = peewee.IntegerField()
|
||||
jour_naissance = peewee.IntegerField()
|
||||
code_lieu_naissance = peewee.CharField()
|
||||
commune_naissance = peewee.CharField()
|
||||
pays_naissance = peewee.CharField()
|
||||
annee_deces = peewee.IntegerField()
|
||||
mois_deces = peewee.IntegerField()
|
||||
jour_deces = peewee.IntegerField()
|
||||
code_lieu_deces = peewee.CharField()
|
||||
numero_act_deces = peewee.CharField()
|
||||
|
||||
class Meta:
|
||||
database = database
|
||||
database.create_tables([LastName, FirstName, Person, ImportedDataset])
|
||||
|
||||
# Fonctions d'accès aux données
|
||||
def find_person(self, first_name, last_name, maiden_name, annee_naissance, mois_naissance, jour_naissance):
|
||||
"""Rechercher une personne dans la base
|
||||
|
||||
|
@ -65,26 +91,165 @@ class BddInsee:
|
|||
|
||||
"""
|
||||
if maiden_name:
|
||||
return self.Person.select().where((self.Person.annee_naissance == int(annee_naissance))
|
||||
& (self.Person.mois_naissance == int(mois_naissance))
|
||||
& (self.Person.jour_naissance == int(jour_naissance))
|
||||
& (self.Person.first_name.contains(first_name.upper()))
|
||||
& ((self.Person.last_name.contains(last_name.upper()))
|
||||
| self.Person.last_name.contains(maiden_name.upper())))
|
||||
# return Person.select().where((Person.annee_naissance == int(annee_naissance))
|
||||
# & (Person.mois_naissance == int(mois_naissance))
|
||||
# & (Person.jour_naissance == int(jour_naissance))
|
||||
# & (Person.first_name.contains(first_name.upper()))
|
||||
# & ((Person.last_name.contains(last_name.upper()))
|
||||
# | Person.last_name.contains(maiden_name.upper())))
|
||||
query = Person.select().join(FirstName).switch(Person).join(LastName)\
|
||||
.where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance)))
|
||||
& (FirstName.first_name.contains(first_name.upper()))
|
||||
& ((LastName.last_name.contains(last_name.upper()))
|
||||
| LastName.last_name.contains(maiden_name.upper())))
|
||||
else:
|
||||
return self.Person.select().where((self.Person.annee_naissance == int(annee_naissance))
|
||||
& (self.Person.mois_naissance == int(mois_naissance))
|
||||
& (self.Person.jour_naissance == int(jour_naissance))
|
||||
& (self.Person.first_name.contains(first_name.upper()))
|
||||
& (self.Person.last_name.contains(last_name.upper())))
|
||||
# return Person.select().where((Person.annee_naissance == int(annee_naissance))
|
||||
# & (Person.mois_naissance == int(mois_naissance))
|
||||
# & (Person.jour_naissance == int(jour_naissance))
|
||||
# & (Person.first_name.contains(first_name.upper()))
|
||||
# & (Person.last_name.contains(last_name.upper())))
|
||||
query = Person.select().join(FirstName).switch(Person).join(LastName)\
|
||||
.where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance)))
|
||||
& (FirstName.first_name.contains(first_name.upper()))
|
||||
& (LastName.last_name.contains(last_name.upper())))
|
||||
result = list()
|
||||
if query:
|
||||
for row in query:
|
||||
result.append({
|
||||
'first_name': row.first_name.first_name,
|
||||
'last_name': row.last_name.last_name,
|
||||
'date_naissance': row.date_naissance,
|
||||
'code_lieu_naissance': row.code_lieu_naissance,
|
||||
'date_deces': row.date_deces,
|
||||
'code_lieu_deces': row.code_lieu_deces
|
||||
})
|
||||
return result
|
||||
|
||||
def import_data_list(self, data_list):
|
||||
# Fonctions d'import des données
|
||||
def parse_insee_data(self, data_text):
    """Parse the text of an INSEE file.

    The text is read as fixed-width records, one per line. Empty lines and
    lines whose first 80 characters contain no ``*`` separator are skipped.

    :param data_text: text contained in an INSEE file
    :returns: list of dictionaries holding the information to insert
    :raises ValueError: when a line that has a ``*`` separator cannot be
        parsed; the line number and the line are printed before re-raising
    """
    records = []
    for line_number, line in enumerate(data_text.split('\n')):
        if not line:
            continue
        name_field = line[0:80]
        try:
            # Without a '*' the name field can never split into two parts:
            # such lines are silently ignored.
            if '*' not in name_field:
                continue
            # The last character of the stripped name field is dropped
            # before splitting into last name and first name.
            last_name, first_name = name_field.strip()[:-1].split("*")
            birth_date = self.parse_date(int(line[81:85]), int(line[85:87]), int(line[87:89]))
            death_date = self.parse_date(int(line[154:158]), int(line[158:160]), int(line[160:162]))
            record = {
                'last_name': last_name.upper(),
                'first_name': first_name.strip('/').upper(),
                'is_woman': (line[80] == "2"),
                'date_naissance': birth_date,
                'code_lieu_naissance': line[89:94].upper(),
                'commune_naissance': line[94:124].strip().upper(),
                'pays_naissance': line[124:154].strip().upper(),
                'date_deces': death_date,
                'code_lieu_deces': line[162:167].upper(),
                'numero_act_deces': line[167:].strip().upper(),
            }
            records.append(record)
        except ValueError:
            print(f"Erreur pour parser ligne numéro {line_number}.")
            print(line)
            raise
    return records
|
||||
|
||||
def parse_date(self, year, month, day):
    """Build a date from possibly incomplete or out-of-range components.

    A year of 0 yields ``datetime.date.min``. A month or day of 0 becomes 1.
    A month above 12 becomes 12, and a day beyond the end of the month is
    clamped to the last day of that month (28 or 29 for February depending
    on the year).

    :param year: year as an integer, 0 meaning unknown
    :param month: month as an integer, 0 meaning unknown
    :param day: day as an integer, 0 meaning unknown
    :returns: the corresponding ``datetime.date``
    :raises ValueError: if the components are still invalid after clamping
        (for example negative values)
    """
    if year == 0:
        return datetime.date.min

    if month == 0:
        month = 1
    elif month > 12:
        month = 12

    if day == 0:
        day = 1
    elif day > 30 and month in (4, 6, 9, 11):
        day = 30
    elif day > 31 and month in (1, 3, 5, 7, 8, 10, 12):
        day = 31
    elif day > 29 and month == 2:
        day = 29

    try:
        return datetime.date(year, month, day)
    except ValueError:
        # For a valid year, 29 February is rejected only when the year is
        # not a leap year, so no hard-coded list of leap years is needed.
        if month == 2 and day == 29:
            return datetime.date(year, 2, 28)
        raise
|
||||
|
||||
def import_url_list(self, file_path):
    """Import data from a file listing dataset URLs.

    Each line of the file holds a dataset name and a URL separated by
    whitespace. Every URL is downloaded, parsed with ``parse_insee_data``
    and inserted with ``import_data_list``. Blank lines are ignored.
    A malformed line, a dataset already recorded in ``ImportedDataset`` or
    a failed download is reported on stdout and skipped.

    :param file_path: path to the URL file
    """
    if not os.path.isfile(file_path):
        print(f"Le fichier {file_path} est introuvable.")
        return

    with open(file_path, 'r') as file_content:
        for line in file_content:
            if not line.strip():
                continue
            try:
                # Exactly two whitespace-separated fields are expected.
                dataset, url = line.split()
            except ValueError:
                print(f"Le fichier {file_path} contient une ligne non conforme : {line}.")
                continue

            already_imported = ImportedDataset.select().where(ImportedDataset.dataset == dataset).exists()
            if already_imported:
                print(f"Le dataset {dataset} ne sera pas traité, car il a déjà été importé.")
                continue

            print(f"Importation du dataset {dataset}")
            print(f"Téléchargement de : '{url}'.")
            try:
                # Connection failures are handled like HTTP error statuses:
                # report the problem and move on to the next dataset.
                result = requests.get(url)
                result.raise_for_status()
            except requests.RequestException:
                print(f"Erreur lors du téléchargement du fichier {url}.")
                continue

            print("Parse des données téléchargées")
            data_list = self.parse_insee_data(result.text)
            self.import_data_list(data_list, dataset)
|
||||
|
||||
def import_data_list(self, data_list, dataset_name):
|
||||
"""Insérer des données dans la base
|
||||
|
||||
:param data_list: liste de dictionnaires contenant les
|
||||
informations à insérer
|
||||
|
||||
:param data_list: liste de dictionnaires contenant les informations à insérer
|
||||
:param dataset_name: nom du jeu de données
|
||||
"""
|
||||
dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset_name)
|
||||
if not len(dataset_id) == 0:
|
||||
print(f"Le dataset {dataset_name} a déjà été importé.")
|
||||
return
|
||||
print(f"Import en base des {len(data_list)} personnes.")
|
||||
with database.atomic():
|
||||
for batch in peewee.chunked(data_list, 70):
|
||||
self.Person.insert_many(batch).execute()
|
||||
for data in data_list:
|
||||
# Gestion de la clé étrangère prénom
|
||||
first_name = data.pop('first_name')
|
||||
try:
|
||||
first_name_id = FirstName.get(FirstName.first_name == first_name)
|
||||
except peewee.DoesNotExist:
|
||||
first_name_id = FirstName.create(first_name=first_name)
|
||||
data['first_name'] = first_name_id
|
||||
# Gestion de la clé étrangère nom de famille
|
||||
last_name = data.pop('last_name')
|
||||
try:
|
||||
last_name_id = LastName.get(LastName.last_name == last_name)
|
||||
except peewee.DoesNotExist:
|
||||
last_name_id = LastName.create(last_name=last_name)
|
||||
data['last_name'] = last_name_id
|
||||
Person.insert(**data).execute()
|
||||
ImportedDataset.insert(dataset=dataset_name).execute()
|
||||
print("Import terminé")
|
||||
|
|
|
@ -45,16 +45,16 @@ class MembreBase(ABC):
|
|||
return f"{self.i_last_name}, {self.i_first_name}"
|
||||
|
||||
def set_insee(self, insee):
|
||||
self.i_first_name = insee.first_name
|
||||
self.i_last_name = insee.last_name
|
||||
self.i_annee_naissance = insee.annee_naissance
|
||||
self.i_mois_naissance = insee.mois_naissance
|
||||
self.i_jour_naissance = insee.jour_naissance
|
||||
self.i_ville_naissance = insee.code_lieu_naissance
|
||||
self.i_annee_deces = insee.annee_deces
|
||||
self.i_mois_deces = insee.mois_deces
|
||||
self.i_jour_deces = insee.jour_deces
|
||||
self.i_ville_deces = insee.code_lieu_deces
|
||||
self.i_first_name = insee['first_name']
|
||||
self.i_last_name = insee['last_name']
|
||||
self.i_annee_naissance = insee['date_naissance'].year
|
||||
self.i_mois_naissance = insee['date_naissance'].month
|
||||
self.i_jour_naissance = insee['date_naissance'].day
|
||||
self.i_ville_naissance = insee['code_lieu_naissance']
|
||||
self.i_annee_deces = insee['date_deces'].year
|
||||
self.i_mois_deces = insee['date_deces'].month
|
||||
self.i_jour_deces = insee['date_deces'].day
|
||||
self.i_ville_deces = insee['code_lieu_deces']
|
||||
|
||||
def convertir_villes_insee(self):
|
||||
old_value = self.i_ville_naissance
|
||||
|
|
Loading…
Reference in New Issue