import re from bs4 import BeautifulSoup from sqlalchemy import Engine, select from sqlalchemy.orm import Session from tqdm import tqdm from nupes.cache import get_file from nupes.models.europeennes2024 import ResultatsFrance, ResultatsRegion, ResultatsDepartement, ResultatsCommune, \ VoixListeFrance, VoixListeRegion, VoixListeDepartement, VoixListeCommune, Liste from nupes.models.geographie import Region, Departement, Commune BASE_URL = "https://www.resultats-elections.interieur.gouv.fr/europeennes2024/ensemble_geographique" def importer_resultats_france(engine: Engine) -> None: file = get_file(f"{BASE_URL}/index.html", "resultats2024/resultats.html") with file.open() as f: resultats = analyser_resultats(f) if not resultats: return with Session(engine) as session: if resultats_france := session.execute(select(ResultatsFrance)).scalar_one_or_none(): resultats_france.inscrits = resultats["inscrits"] resultats_france.abstentions = resultats["abstentions"] resultats_france.votants = resultats["votants"] resultats_france.blancs = resultats["blancs"] resultats_france.nuls = resultats["nuls"] resultats_france.exprimes = resultats["exprimes"] else: resultats_france = ResultatsFrance( inscrits=resultats["inscrits"], abstentions=resultats["abstentions"], votants=resultats["votants"], blancs=resultats["blancs"], nuls=resultats["nuls"], exprimes=resultats["exprimes"], ) session.add(resultats_france) for voix_liste in resultats["resultats_par_liste"]: if voix_liste_france := session.execute( select(VoixListeFrance).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper()) ).scalar_one_or_none(): voix_liste_france.voix = voix_liste["voix"] else: liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())).scalar_one() voix_liste_france = VoixListeFrance(liste_id=liste.id, voix=voix_liste["voix"]) session.add(voix_liste_france) session.commit() def importer_resultats_regions(engine: Engine) -> None: with Session(engine) as session: for region in tqdm(session.execute(select(Region)).scalars().all(), desc="Régions"): reg_code = region.code_insee if reg_code == "984" or reg_code == "989": # TAAF + Île de Clipperton, pas d'élection continue elif reg_code == "977" or reg_code == "978": # Saint-Barthélemy et Saint-Martin, pas mis dans une région mais dans un unique département continue file = get_file(f"{BASE_URL}/{reg_code}/index.html", f"resultats2024/region_{reg_code}/resultats.html") with file.open() as f: resultats = analyser_resultats(f) if not resultats: continue if resultats_region := session.execute(select(ResultatsRegion).filter_by(region_id=reg_code)) \ .scalar_one_or_none(): resultats_region.inscrits = resultats["inscrits"] resultats_region.abstentions = resultats["abstentions"] resultats_region.votants = resultats["votants"] resultats_region.blancs = resultats["blancs"] resultats_region.nuls = resultats["nuls"] resultats_region.exprimes = resultats["exprimes"] else: resultats_region = ResultatsRegion( region_id=reg_code, inscrits=resultats["inscrits"], abstentions=resultats["abstentions"], votants=resultats["votants"], blancs=resultats["blancs"], nuls=resultats["nuls"], exprimes=resultats["exprimes"], ) session.add(resultats_region) for voix_liste in resultats["resultats_par_liste"]: if voix_liste_region := session.execute( select(VoixListeRegion).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper()) ).scalar_one_or_none(): voix_liste_region.voix = voix_liste["voix"] else: liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())) \ .scalar_one() voix_liste_region = VoixListeRegion(liste_id=liste.id, voix=voix_liste["voix"]) session.add(voix_liste_region) session.commit() def importer_resultats_departements(engine: Engine) -> None: with Session(engine) as session: for dpt in tqdm(session.execute(select(Departement)).scalars().all(), desc="Départements"): reg_code = dpt.region_code reg_path = f"{reg_code}/" dpt_code = dpt.code_insee if dpt_code == "984" or dpt_code == "989": # TAAF + Île de Clipperton, pas d'élection continue elif dpt_code == "977" or dpt_code == "978": # Pour une raison obscure, Saint-Barthélemy et Saint-Martin sont dans le département ZX reg_path = "" dpt_code = "ZX" if dpt_code in ["975", "977", "978", "986", "987", "988"]: # Pour ces collectivités d'outre-mer, qui ne sont pas dans des régions, # on ne les regroupe pas dans des régions reg_path = "" file = get_file(f"{BASE_URL}/{reg_path}{dpt_code}/index.html", f"resultats2024/region_{reg_code}/dpt_{dpt_code}/resultats.html") with file.open() as f: resultats = analyser_resultats(f) if not resultats: continue if resultats_departement := session.execute( select(ResultatsDepartement).filter_by(dpt_id=dpt_code)).scalar_one_or_none(): resultats_departement.inscrits = resultats["inscrits"] resultats_departement.abstentions = resultats["abstentions"] resultats_departement.votants = resultats["votants"] resultats_departement.blancs = resultats["blancs"] resultats_departement.nuls = resultats["nuls"] resultats_departement.exprimes = resultats["exprimes"] else: resultats_departement = ResultatsDepartement( dpt_id=dpt_code, inscrits=resultats["inscrits"], abstentions=resultats["abstentions"], votants=resultats["votants"], blancs=resultats["blancs"], nuls=resultats["nuls"], exprimes=resultats["exprimes"], ) session.add(resultats_departement) for voix_liste in resultats["resultats_par_liste"]: if voix_liste_departement := session.execute( select(VoixListeDepartement).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper()) ).scalar_one_or_none(): voix_liste_departement.voix = voix_liste["voix"] else: liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())) \ .scalar_one() voix_liste_departement = VoixListeDepartement(liste_id=liste.id, voix=voix_liste["voix"]) session.add(voix_liste_departement) session.commit() def importer_resultats_communes(engine: Engine) -> None: with Session(engine) as session: # FIXME On n'importe que les communes du Bas-Rhin pour des raisons de gain de temps for commune in tqdm(session.execute(select(Commune).filter_by(departement_code=67)).scalars().all(), desc="Communes"): reg_code = commune.departement.region_code reg_path = f"{reg_code}/" dpt_code = commune.departement_code com_code = commune.code_insee if dpt_code == "984" or dpt_code == "989": # TAAF + Île de Clipperton, pas d'élection continue elif dpt_code == "977" or dpt_code == "978": # Pour une raison obscure, Saint-Barthélemy et Saint-Martin sont dans le département ZX reg_path = "" dpt_code = "ZX" com_code = f"ZX{com_code[2:]}" if dpt_code in ["975", "977", "978", "986", "987", "988"]: # Pour ces collectivités d'outre-mer, qui ne sont pas dans des régions, # on ne les regroupe pas dans des régions reg_path = "" if com_code in ["08294", "16355", "18131", "25282", "25549", "35112", "49321", "55039", "55050", "55139", "55189", "55239", "55307", "64541", "69152", "75056", "85041", "85271", "86231", "95282", "98611", "98612", "98613"]: # Aucun⋅e habitant⋅e donc pas de bureau de vote # Ou alors la commune a récemment fusionné continue file = get_file(f"{BASE_URL}/{reg_path}{dpt_code}/{com_code}/index.html", f"resultats2024/region_{reg_code}/dpt_{dpt_code}" f"/commune_{com_code}/resultats.html") if not file: continue with file.open() as f: resultats = analyser_resultats(f) if not resultats: continue if resultats_commune := session.execute( select(ResultatsCommune).filter_by(commune_id=com_code)).scalar_one_or_none(): resultats_commune.inscrits = resultats["inscrits"] resultats_commune.abstentions = resultats["abstentions"] resultats_commune.votants = resultats["votants"] resultats_commune.blancs = resultats["blancs"] resultats_commune.nuls = resultats["nuls"] resultats_commune.exprimes = resultats["exprimes"] else: resultats_commune = ResultatsCommune( commune_id=com_code, inscrits=resultats["inscrits"], abstentions=resultats["abstentions"], votants=resultats["votants"], blancs=resultats["blancs"], nuls=resultats["nuls"], exprimes=resultats["exprimes"], ) session.add(resultats_commune) for voix_liste in resultats["resultats_par_liste"]: if voix_liste_commune := session.execute( select(VoixListeCommune).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper()) ).scalar_one_or_none(): voix_liste_commune.voix = voix_liste["voix"] else: liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())) \ .scalar_one() voix_liste_commune = VoixListeCommune(liste_id=liste.id, voix=voix_liste["voix"]) session.add(voix_liste_commune) session.commit() def analyser_resultats(file) -> dict: parsed_data = {} soup = BeautifulSoup(file, 'html.parser') tables = soup.find_all('table') for table in tables: if table.find('th', text=re.compile("Voix")): resultats_par_liste = [] parsed_data['resultats_par_liste'] = resultats_par_liste tbody = table.tbody for line in tbody.find_all('tr'): cells = line.find_all('td') if len(cells) == 0: continue liste = {} resultats_par_liste.append(liste) liste['nom'] = cells[0].string liste['voix'] = int(cells[1].string.replace(" ", "")) elif table.find('td', text=re.compile("Inscrits")): tbody = table.tbody for line in tbody.find_all('tr'): cells = line.find_all('td') if len(cells) == 0: continue nom = cells[0].string number = int(cells[1].string.replace(" ", "")) match nom: case "Inscrits": parsed_data["inscrits"] = number case "Abstentions": parsed_data["abstentions"] = number case "Votants": parsed_data["votants"] = number case "Blancs": parsed_data["blancs"] = number case "Nuls": parsed_data["nuls"] = number case "Exprimés": parsed_data["exprimes"] = number case _: print(f"Unknown cell: {nom}") return parsed_data def run(engine: Engine) -> None: importer_resultats_france(engine) importer_resultats_regions(engine) importer_resultats_departements(engine) importer_resultats_communes(engine)