256 lines
11 KiB
Python
Executable File
256 lines
11 KiB
Python
Executable File
"""
|
|
Copyright (c) 2020 Sdj Geek
|
|
Voir le fichier LICENSE
|
|
|
|
Classe d'accès aux données INSEE dans la base SQLite
|
|
|
|
"""
|
|
|
|
import os
|
|
import peewee
|
|
import requests
|
|
import datetime
|
|
|
|
|
|
# Objet de connexion à la base
|
|
database = peewee.SqliteDatabase(None)
|
|
|
|
|
|
# Définition du modèle de donnée
|
|
class BaseModel(peewee.Model):
|
|
class Meta:
|
|
database = database
|
|
|
|
|
|
class LastName(BaseModel):
|
|
"""Classe contenant tous les noms de famille
|
|
|
|
"""
|
|
last_name = peewee.CharField(unique=True)
|
|
|
|
|
|
class FirstName(BaseModel):
|
|
"""Classe contenant tous les prénoms
|
|
|
|
"""
|
|
first_name = peewee.CharField(unique=True)
|
|
|
|
|
|
class Person(BaseModel):
|
|
"""Classe représentant une personne dans la base
|
|
|
|
"""
|
|
first_name = peewee.ForeignKeyField(FirstName)
|
|
last_name = peewee.ForeignKeyField(LastName)
|
|
is_woman = peewee.BooleanField()
|
|
date_naissance = peewee.DateField(index=True)
|
|
code_lieu_naissance = peewee.CharField()
|
|
commune_naissance = peewee.CharField()
|
|
pays_naissance = peewee.CharField()
|
|
date_deces = peewee.DateField()
|
|
code_lieu_deces = peewee.CharField()
|
|
numero_act_deces = peewee.CharField()
|
|
|
|
|
|
class ImportedDataset(BaseModel):
|
|
"""
|
|
|
|
"""
|
|
dataset = peewee.CharField(unique=True)
|
|
|
|
# Gestion de l'accès aux données
|
|
class BddInsee:
|
|
"""Classe encapsulant les accès aux données.
|
|
|
|
"""
|
|
|
|
def __init__(self, chemin_base_donnees=None):
|
|
"""Initialisation
|
|
|
|
:param chemin_base_donnees: chemin vers le fichier SQLite
|
|
|
|
"""
|
|
if not chemin_base_donnees:
|
|
chemin_base_donnees = os.path.join(os.path.dirname(os.path.abspath(__file__)), "bdd_insee.sqlite")
|
|
new = not os.path.isfile(chemin_base_donnees)
|
|
database.init(chemin_base_donnees)
|
|
if new:
|
|
database.create_tables([LastName, FirstName, Person, ImportedDataset])
|
|
|
|
# Fonctions d'accès aux données
|
|
def find_person(self, first_name, last_name, maiden_name, annee_naissance, mois_naissance, jour_naissance):
|
|
"""Rechercher une personne dans la base
|
|
|
|
:param first_name: prénom de la personne à rechercher
|
|
:param last_name: nom de famille de la personne à rechercher
|
|
:param maiden_name: nom de jeune fille de la personne à rechercher (None si non défini)
|
|
:param annee_naissance: année de naissance de la personne à rechercher
|
|
:param mois_naissance: mois de naissance de la personne à rechercher
|
|
:param jour_naissance: jour de naissance de la personne à rechercher
|
|
:returns: liste contenant un dictionnaire par personne trouvée
|
|
|
|
"""
|
|
if maiden_name:
|
|
# return Person.select().where((Person.annee_naissance == int(annee_naissance))
|
|
# & (Person.mois_naissance == int(mois_naissance))
|
|
# & (Person.jour_naissance == int(jour_naissance))
|
|
# & (Person.first_name.contains(first_name.upper()))
|
|
# & ((Person.last_name.contains(last_name.upper()))
|
|
# | Person.last_name.contains(maiden_name.upper())))
|
|
query = Person.select().join(FirstName).switch(Person).join(LastName)\
|
|
.where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance)))
|
|
& (FirstName.first_name.contains(first_name.upper()))
|
|
& ((LastName.last_name.contains(last_name.upper()))
|
|
| LastName.last_name.contains(maiden_name.upper())))
|
|
else:
|
|
# return Person.select().where((Person.annee_naissance == int(annee_naissance))
|
|
# & (Person.mois_naissance == int(mois_naissance))
|
|
# & (Person.jour_naissance == int(jour_naissance))
|
|
# & (Person.first_name.contains(first_name.upper()))
|
|
# & (Person.last_name.contains(last_name.upper())))
|
|
query = Person.select().join(FirstName).switch(Person).join(LastName)\
|
|
.where((Person.date_naissance == datetime.date(int(annee_naissance), int(mois_naissance), int(jour_naissance)))
|
|
& (FirstName.first_name.contains(first_name.upper()))
|
|
& (LastName.last_name.contains(last_name.upper())))
|
|
result = list()
|
|
if query:
|
|
for row in query:
|
|
result.append({
|
|
'first_name': row.first_name.first_name,
|
|
'last_name': row.last_name.last_name,
|
|
'date_naissance': row.date_naissance,
|
|
'code_lieu_naissance': row.code_lieu_naissance,
|
|
'date_deces': row.date_deces,
|
|
'code_lieu_deces': row.code_lieu_deces
|
|
})
|
|
return result
|
|
|
|
# Fonctions d'import des données
|
|
def parse_insee_data(self, data_text):
|
|
"""Parse le texte d'un fichier de l'INSEE
|
|
|
|
:param data_text: texte contenu dans un fichier de l'INSEE
|
|
:returns: liste de dictionnaires contenant les informations à insérer
|
|
"""
|
|
data_list = []
|
|
for line_number, line in enumerate(data_text.split('\n')):
|
|
if not line:
|
|
continue
|
|
try:
|
|
try:
|
|
last_name, first_name = line[0:80].strip()[:-1].split("*")
|
|
except ValueError:
|
|
if '*' not in line[0:80]:
|
|
continue
|
|
raise
|
|
date_naissance = self.parse_date(int(line[81:85]), int(line[85:87]), int(line[87:89]))
|
|
date_deces = self.parse_date(int(line[154:158]), int(line[158:160]), int(line[160:162]))
|
|
data_list.append({'last_name': last_name.upper(),
|
|
'first_name': first_name.strip('/').upper(),
|
|
'is_woman': (line[80] == "2"),
|
|
'date_naissance': date_naissance,
|
|
'code_lieu_naissance': line[89:94].upper(),
|
|
'commune_naissance': line[94: 124].strip().upper(),
|
|
'pays_naissance': line[124: 154].strip().upper(),
|
|
'date_deces': date_deces,
|
|
'code_lieu_deces': line[162:167].upper(),
|
|
'numero_act_deces': line[167:].strip().upper()})
|
|
except ValueError as e:
|
|
print(f"Erreur pour parser ligne numéro {line_number}.")
|
|
print(line)
|
|
raise
|
|
return data_list
|
|
|
|
def parse_date(self, year, month, day):
|
|
if year == 0:
|
|
return datetime.date.min
|
|
if month == 0:
|
|
month = 1
|
|
elif month > 12:
|
|
month = 12
|
|
if day == 0:
|
|
day = 1
|
|
elif day > 30 and month in [4, 6, 9, 11]:
|
|
day = 30
|
|
elif day > 31 and month in [1, 3, 5, 7, 8, 10, 12]:
|
|
day = 31
|
|
elif day > 29 and month == 2:
|
|
day = 29
|
|
try:
|
|
date = datetime.date(year, month, day)
|
|
except ValueError:
|
|
if month == 2 and day == 29 and year not in [1844, 1848, 1852, 1856, 1860, 1864, 1868, 1872, 1876, 1880,
|
|
1884, 1888, 1892, 1896, 1904, 1908, 1912, 1916, 1920,
|
|
1924, 1928, 1932, 1936, 1940, 1944, 1948, 1952, 1956, 1960,
|
|
1964, 1968, 1972, 1976, 1980, 1984, 1988, 1992, 1996, 2000,
|
|
2004, 2008, 2012, 2016, 2020]:
|
|
day = 28
|
|
date = datetime.date(year, month, day)
|
|
return date
|
|
|
|
def import_url_list(self, file_path):
|
|
"""Importer les données à partir d'un fichier contenant les URL
|
|
|
|
Un fichier d'URL contient un nom de dataset et une URL par ligne, sapéras par une espace. Chaque URL conduit
|
|
vers un fichier de l'insée qui doit être importé.
|
|
|
|
:param file_path: chemin vers le fichier d'URL
|
|
"""
|
|
if not os.path.isfile(file_path):
|
|
print(f"Le fichier {file_path} est introuvable.")
|
|
return
|
|
with open(file_path, 'r') as file_content:
|
|
for line in file_content.readlines():
|
|
try:
|
|
dataset, url = line.split(' ')
|
|
url = url.strip('\n')
|
|
except:
|
|
print(f"Le fichier {file_path} contient une ligne non conforme : {line}.")
|
|
continue
|
|
dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset)
|
|
if not len(dataset_id) == 0:
|
|
print(f"Le dataset {dataset} ne sera pas traité, car il a déjà été importé.")
|
|
continue
|
|
print(f"Importation du dataset {dataset}")
|
|
print(f"Téléchargement de : '{url}'.")
|
|
result = requests.get(url)
|
|
try:
|
|
result.raise_for_status()
|
|
except:
|
|
print(f"Erreur lors du téléchargement du fichier {url}.")
|
|
continue
|
|
print("Parse des données téléchargées")
|
|
data_list = self.parse_insee_data(result.text)
|
|
self.import_data_list(data_list, dataset)
|
|
|
|
def import_data_list(self, data_list, dataset_name):
|
|
"""Insérer des données dans la base
|
|
|
|
:param data_list: liste de dictionnaires contenant les informations à insérer
|
|
:param dataset_name: nom du jeu de données
|
|
"""
|
|
dataset_id = ImportedDataset.select().where(ImportedDataset.dataset == dataset_name)
|
|
if not len(dataset_id) == 0:
|
|
print(f"Le dataset {dataset_name} a déjà été importé.")
|
|
return
|
|
print(f"Import en base des {len(data_list)} personnes.")
|
|
with database.atomic():
|
|
for data in data_list:
|
|
# Gestion de la clé étrangère prénom
|
|
first_name = data.pop('first_name')
|
|
try:
|
|
first_name_id = FirstName.get(FirstName.first_name == first_name)
|
|
except peewee.DoesNotExist:
|
|
first_name_id = FirstName.create(first_name=first_name)
|
|
data['first_name'] = first_name_id
|
|
# Gestion de la clé étrangère nom de famille
|
|
last_name = data.pop('last_name')
|
|
try:
|
|
last_name_id = LastName.get(LastName.last_name == last_name)
|
|
except peewee.DoesNotExist:
|
|
last_name_id = LastName.create(last_name=last_name)
|
|
data['last_name'] = last_name_id
|
|
Person.insert(**data).execute()
|
|
ImportedDataset.insert(dataset=dataset_name).execute()
|
|
print("Import terminé")
|