305 lines
14 KiB
Python
305 lines
14 KiB
Python
import re
|
|
|
|
from bs4 import BeautifulSoup
|
|
from sqlalchemy import Engine, select
|
|
from sqlalchemy.orm import Session
|
|
from tqdm import tqdm
|
|
|
|
from nupes.cache import get_file
|
|
from nupes.models.europeennes2024 import ResultatsFrance, ResultatsRegion, ResultatsDepartement, ResultatsCommune, \
|
|
VoixListeFrance, VoixListeRegion, VoixListeDepartement, VoixListeCommune, Liste
|
|
from nupes.models.geographie import Region, Departement, Commune
|
|
|
|
BASE_URL = "https://www.resultats-elections.interieur.gouv.fr/europeennes2024/ensemble_geographique"
|
|
|
|
|
|
def importer_resultats_france(engine: Engine, debug: bool = False) -> None:
|
|
file = get_file(f"{BASE_URL}/index.html", "resultats2024/resultats.html")
|
|
|
|
with file.open() as f:
|
|
resultats = analyser_resultats(f)
|
|
|
|
if not resultats:
|
|
return
|
|
|
|
with Session(engine) as session:
|
|
if resultats_france := session.execute(select(ResultatsFrance)).scalar_one_or_none():
|
|
resultats_france.inscrits = resultats["inscrits"]
|
|
resultats_france.abstentions = resultats["abstentions"]
|
|
resultats_france.votants = resultats["votants"]
|
|
resultats_france.blancs = resultats["blancs"]
|
|
resultats_france.nuls = resultats["nuls"]
|
|
resultats_france.exprimes = resultats["exprimes"]
|
|
else:
|
|
resultats_france = ResultatsFrance(
|
|
inscrits=resultats["inscrits"],
|
|
abstentions=resultats["abstentions"],
|
|
votants=resultats["votants"],
|
|
blancs=resultats["blancs"],
|
|
nuls=resultats["nuls"],
|
|
exprimes=resultats["exprimes"],
|
|
)
|
|
session.add(resultats_france)
|
|
|
|
for voix_liste in resultats["resultats_par_liste"]:
|
|
if voix_liste_france := session.execute(
|
|
select(VoixListeFrance).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())
|
|
).scalar_one_or_none():
|
|
voix_liste_france.voix = voix_liste["voix"]
|
|
else:
|
|
liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())).scalar_one()
|
|
voix_liste_france = VoixListeFrance(liste_id=liste.id, voix=voix_liste["voix"])
|
|
session.add(voix_liste_france)
|
|
|
|
session.commit()
|
|
|
|
def importer_resultats_regions(engine: Engine, debug: bool = False) -> None:
|
|
with Session(engine) as session:
|
|
regions = session.execute(select(Region)).scalars().all()
|
|
regions_iterator = tqdm(regions, desc="Régions") if debug else regions
|
|
for region in regions_iterator:
|
|
reg_code = region.code_insee
|
|
if reg_code == "984" or reg_code == "989":
|
|
# TAAF + Île de Clipperton, pas d'élection
|
|
continue
|
|
elif reg_code == "977" or reg_code == "978":
|
|
# Saint-Barthélemy et Saint-Martin, pas mis dans une région mais dans un unique département
|
|
continue
|
|
|
|
file = get_file(f"{BASE_URL}/{reg_code}/index.html",
|
|
f"resultats2024/region_{reg_code}/resultats.html")
|
|
|
|
with file.open() as f:
|
|
resultats = analyser_resultats(f)
|
|
|
|
if not resultats:
|
|
continue
|
|
|
|
if resultats_region := session.execute(select(ResultatsRegion).filter_by(region_id=reg_code)) \
|
|
.scalar_one_or_none():
|
|
resultats_region.inscrits = resultats["inscrits"]
|
|
resultats_region.abstentions = resultats["abstentions"]
|
|
resultats_region.votants = resultats["votants"]
|
|
resultats_region.blancs = resultats["blancs"]
|
|
resultats_region.nuls = resultats["nuls"]
|
|
resultats_region.exprimes = resultats["exprimes"]
|
|
else:
|
|
resultats_region = ResultatsRegion(
|
|
region_id=reg_code,
|
|
inscrits=resultats["inscrits"],
|
|
abstentions=resultats["abstentions"],
|
|
votants=resultats["votants"],
|
|
blancs=resultats["blancs"],
|
|
nuls=resultats["nuls"],
|
|
exprimes=resultats["exprimes"],
|
|
)
|
|
session.add(resultats_region)
|
|
|
|
for voix_liste in resultats["resultats_par_liste"]:
|
|
if voix_liste_region := session.execute(
|
|
select(VoixListeRegion).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())
|
|
).scalar_one_or_none():
|
|
voix_liste_region.voix = voix_liste["voix"]
|
|
else:
|
|
liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())) \
|
|
.scalar_one()
|
|
voix_liste_region = VoixListeRegion(liste_id=liste.id, voix=voix_liste["voix"])
|
|
session.add(voix_liste_region)
|
|
|
|
session.commit()
|
|
|
|
|
|
def importer_resultats_departements(engine: Engine, debug: bool = False) -> None:
|
|
with Session(engine) as session:
|
|
departements = session.execute(select(Departement)).scalars().all()
|
|
iterator = tqdm(departements, desc="Départements") if debug else departements
|
|
for dpt in iterator:
|
|
reg_code = dpt.region_code
|
|
reg_path = f"{reg_code}/"
|
|
dpt_code = dpt.code_insee
|
|
if dpt_code == "984" or dpt_code == "989":
|
|
# TAAF + Île de Clipperton, pas d'élection
|
|
continue
|
|
elif dpt_code == "977" or dpt_code == "978":
|
|
# Pour une raison obscure, Saint-Barthélemy et Saint-Martin sont dans le département ZX
|
|
reg_path = ""
|
|
dpt_code = "ZX"
|
|
if dpt_code in ["975", "977", "978", "986", "987", "988"]:
|
|
# Pour ces collectivités d'outre-mer, qui ne sont pas dans des régions,
|
|
# on ne les regroupe pas dans des régions
|
|
reg_path = ""
|
|
|
|
file = get_file(f"{BASE_URL}/{reg_path}{dpt_code}/index.html",
|
|
f"resultats2024/region_{reg_code}/dpt_{dpt_code}/resultats.html")
|
|
|
|
with file.open() as f:
|
|
resultats = analyser_resultats(f)
|
|
|
|
if not resultats:
|
|
continue
|
|
|
|
if resultats_departement := session.execute(
|
|
select(ResultatsDepartement).filter_by(dpt_id=dpt_code)).scalar_one_or_none():
|
|
resultats_departement.inscrits = resultats["inscrits"]
|
|
resultats_departement.abstentions = resultats["abstentions"]
|
|
resultats_departement.votants = resultats["votants"]
|
|
resultats_departement.blancs = resultats["blancs"]
|
|
resultats_departement.nuls = resultats["nuls"]
|
|
resultats_departement.exprimes = resultats["exprimes"]
|
|
else:
|
|
resultats_departement = ResultatsDepartement(
|
|
dpt_id=dpt_code,
|
|
inscrits=resultats["inscrits"],
|
|
abstentions=resultats["abstentions"],
|
|
votants=resultats["votants"],
|
|
blancs=resultats["blancs"],
|
|
nuls=resultats["nuls"],
|
|
exprimes=resultats["exprimes"],
|
|
)
|
|
session.add(resultats_departement)
|
|
|
|
for voix_liste in resultats["resultats_par_liste"]:
|
|
if voix_liste_departement := session.execute(
|
|
select(VoixListeDepartement).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())
|
|
).scalar_one_or_none():
|
|
voix_liste_departement.voix = voix_liste["voix"]
|
|
else:
|
|
liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())) \
|
|
.scalar_one()
|
|
voix_liste_departement = VoixListeDepartement(liste_id=liste.id, voix=voix_liste["voix"])
|
|
session.add(voix_liste_departement)
|
|
|
|
session.commit()
|
|
|
|
|
|
def importer_resultats_communes(engine: Engine, debug: bool = False) -> None:
|
|
with Session(engine) as session:
|
|
communes = session.execute(select(Commune)).scalars().all()
|
|
iterator = tqdm(communes, desc="Communes") if debug else communes
|
|
for commune in iterator:
|
|
reg_code = commune.departement.region_code
|
|
reg_path = f"{reg_code}/"
|
|
dpt_code = commune.departement_code
|
|
com_code = commune.code_insee
|
|
if dpt_code == "984" or dpt_code == "989":
|
|
# TAAF + Île de Clipperton, pas d'élection
|
|
continue
|
|
elif dpt_code == "977" or dpt_code == "978":
|
|
# Pour une raison obscure, Saint-Barthélemy et Saint-Martin sont dans le département ZX
|
|
reg_path = ""
|
|
dpt_code = "ZX"
|
|
com_code = f"ZX{com_code[2:]}"
|
|
if dpt_code in ["975", "977", "978", "986", "987", "988"]:
|
|
# Pour ces collectivités d'outre-mer, qui ne sont pas dans des régions,
|
|
# on ne les regroupe pas dans des régions
|
|
reg_path = ""
|
|
if com_code in ["08294", "16355", "18131", "25282", "25549", "35112", "49321", "55039", "55050", "55139",
|
|
"55189", "55239", "55307", "64541", "69152", "75056", "85041", "85271", "86231", "95282",
|
|
"98611", "98612", "98613"]:
|
|
# Aucun⋅e habitant⋅e donc pas de bureau de vote
|
|
# Ou alors la commune a récemment fusionné
|
|
continue
|
|
|
|
file = get_file(f"{BASE_URL}/{reg_path}{dpt_code}/{com_code}/index.html",
|
|
f"resultats2024/region_{reg_code}/dpt_{dpt_code}"
|
|
f"/commune_{com_code}/resultats.html")
|
|
if not file:
|
|
continue
|
|
|
|
with file.open() as f:
|
|
resultats = analyser_resultats(f)
|
|
|
|
if not resultats:
|
|
continue
|
|
|
|
if resultats_commune := session.execute(
|
|
select(ResultatsCommune).filter_by(commune_id=com_code)).scalar_one_or_none():
|
|
resultats_commune.inscrits = resultats["inscrits"]
|
|
resultats_commune.abstentions = resultats["abstentions"]
|
|
resultats_commune.votants = resultats["votants"]
|
|
resultats_commune.blancs = resultats["blancs"]
|
|
resultats_commune.nuls = resultats["nuls"]
|
|
resultats_commune.exprimes = resultats["exprimes"]
|
|
else:
|
|
resultats_commune = ResultatsCommune(
|
|
commune_id=com_code,
|
|
inscrits=resultats["inscrits"],
|
|
abstentions=resultats["abstentions"],
|
|
votants=resultats["votants"],
|
|
blancs=resultats["blancs"],
|
|
nuls=resultats["nuls"],
|
|
exprimes=resultats["exprimes"],
|
|
)
|
|
session.add(resultats_commune)
|
|
|
|
for voix_liste in resultats["resultats_par_liste"]:
|
|
if voix_liste_commune := session.execute(
|
|
select(VoixListeCommune).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())
|
|
).scalar_one_or_none():
|
|
voix_liste_commune.voix = voix_liste["voix"]
|
|
else:
|
|
liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())) \
|
|
.scalar_one()
|
|
voix_liste_commune = VoixListeCommune(liste_id=liste.id, voix=voix_liste["voix"])
|
|
session.add(voix_liste_commune)
|
|
|
|
session.commit()
|
|
|
|
|
|
def analyser_resultats(file) -> dict:
|
|
parsed_data = {}
|
|
|
|
soup = BeautifulSoup(file, 'html.parser')
|
|
tables = soup.find_all('table')
|
|
|
|
for table in tables:
|
|
if table.find('th', text=re.compile("Voix")):
|
|
resultats_par_liste = []
|
|
parsed_data['resultats_par_liste'] = resultats_par_liste
|
|
|
|
tbody = table.tbody
|
|
for line in tbody.find_all('tr'):
|
|
cells = line.find_all('td')
|
|
if len(cells) == 0:
|
|
continue
|
|
|
|
liste = {}
|
|
resultats_par_liste.append(liste)
|
|
|
|
liste['nom'] = cells[0].string
|
|
liste['voix'] = int(cells[1].string.replace(" ", ""))
|
|
elif table.find('td', text=re.compile("Inscrits")):
|
|
tbody = table.tbody
|
|
for line in tbody.find_all('tr'):
|
|
cells = line.find_all('td')
|
|
if len(cells) == 0:
|
|
continue
|
|
|
|
nom = cells[0].string
|
|
number = int(cells[1].string.replace(" ", ""))
|
|
match nom:
|
|
case "Inscrits":
|
|
parsed_data["inscrits"] = number
|
|
case "Abstentions":
|
|
parsed_data["abstentions"] = number
|
|
case "Votants":
|
|
parsed_data["votants"] = number
|
|
case "Blancs":
|
|
parsed_data["blancs"] = number
|
|
case "Nuls":
|
|
parsed_data["nuls"] = number
|
|
case "Exprimés":
|
|
parsed_data["exprimes"] = number
|
|
case _:
|
|
print(f"Unknown cell: {nom}")
|
|
|
|
return parsed_data
|
|
|
|
|
|
def run(engine: Engine, debug: bool = False) -> None:
|
|
importer_resultats_france(engine, debug)
|
|
importer_resultats_regions(engine, debug)
|
|
importer_resultats_departements(engine, debug)
|
|
# FIXME Les communes prennent trop de temps
|
|
# importer_resultats_communes(engine, debug)
|