import re from bs4 import BeautifulSoup from sqlalchemy import Engine, select from sqlalchemy.orm import Session from tqdm import tqdm from nupes.cache import get_file from nupes.models.europeennes2024 import ResultatsFrance, ResultatsRegion, ResultatsDepartement, ResultatsCommune, \ VoixListeFrance, VoixListeRegion, VoixListeDepartement, VoixListeCommune, Liste from nupes.models.geographie import Region, Departement, Commune BASE_URL = "https://www.resultats-elections.interieur.gouv.fr/europeennes2024/ensemble_geographique" def importer_resultats_france(engine: Engine, debug: bool = False) -> None: file = get_file(f"{BASE_URL}/index.html", "resultats2024/resultats.html") with file.open() as f: resultats = analyser_resultats(f) if not resultats: return with Session(engine) as session: if resultats_france := session.execute(select(ResultatsFrance)).scalar_one_or_none(): resultats_france.inscrits = resultats["inscrits"] resultats_france.abstentions = resultats["abstentions"] resultats_france.votants = resultats["votants"] resultats_france.blancs = resultats["blancs"] resultats_france.nuls = resultats["nuls"] resultats_france.exprimes = resultats["exprimes"] else: resultats_france = ResultatsFrance( inscrits=resultats["inscrits"], abstentions=resultats["abstentions"], votants=resultats["votants"], blancs=resultats["blancs"], nuls=resultats["nuls"], exprimes=resultats["exprimes"], ) session.add(resultats_france) for voix_liste in resultats["resultats_par_liste"]: if voix_liste_france := session.execute( select(VoixListeFrance).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper()) ).scalar_one_or_none(): voix_liste_france.voix = voix_liste["voix"] else: liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())).scalar_one() voix_liste_france = VoixListeFrance(liste_id=liste.id, resultats_france_id=resultats_france.id, voix=voix_liste["voix"]) session.add(voix_liste_france) session.commit() def importer_resultats_regions(engine: Engine, debug: bool = False) -> None: with Session(engine) as session: regions = session.execute(select(Region)).scalars().all() regions_iterator = tqdm(regions, desc="Régions") if debug else regions for region in regions_iterator: reg_code = region.code_insee if reg_code == "984" or reg_code == "989": # TAAF + Île de Clipperton, pas d'élection continue elif reg_code == "977" or reg_code == "978": # Saint-Barthélemy et Saint-Martin, pas mis dans une région mais dans un unique département continue file = get_file(f"{BASE_URL}/{reg_code}/index.html", f"resultats2024/region_{reg_code}/resultats.html") with file.open() as f: resultats = analyser_resultats(f) if not resultats: continue if resultats_region := session.execute(select(ResultatsRegion).filter_by(region_id=reg_code)) \ .scalar_one_or_none(): resultats_region.inscrits = resultats["inscrits"] resultats_region.abstentions = resultats["abstentions"] resultats_region.votants = resultats["votants"] resultats_region.blancs = resultats["blancs"] resultats_region.nuls = resultats["nuls"] resultats_region.exprimes = resultats["exprimes"] else: resultats_france = session.execute(select(ResultatsFrance)).scalar_one() resultats_region = ResultatsRegion( region_id=reg_code, resultats_france_id=resultats_france.id, inscrits=resultats["inscrits"], abstentions=resultats["abstentions"], votants=resultats["votants"], blancs=resultats["blancs"], nuls=resultats["nuls"], exprimes=resultats["exprimes"], ) session.add(resultats_region) for voix_liste in resultats["resultats_par_liste"]: if voix_liste_region := session.execute( select(VoixListeRegion).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper()) ).scalar_one_or_none(): voix_liste_region.voix = voix_liste["voix"] else: liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())) \ .scalar_one() voix_liste_region = VoixListeRegion(liste_id=liste.id, resultats_region_id=resultats_region.id, voix=voix_liste["voix"]) session.add(voix_liste_region) session.commit() def importer_resultats_departements(engine: Engine, debug: bool = False) -> None: with Session(engine) as session: departements = session.execute(select(Departement)).scalars().all() iterator = tqdm(departements, desc="Départements") if debug else departements for dpt in iterator: reg_code = dpt.region_code reg_path = f"{reg_code}/" dpt_code = dpt.code_insee if dpt_code == "984" or dpt_code == "989": # TAAF + Île de Clipperton, pas d'élection continue elif dpt_code == "977" or dpt_code == "978": # Pour une raison obscure, Saint-Barthélemy et Saint-Martin sont dans le département ZX reg_path = "" dpt_code = "ZX" if dpt_code in ["975", "977", "978", "986", "987", "988"]: # Pour ces collectivités d'outre-mer, qui ne sont pas dans des régions, # on ne les regroupe pas dans des régions reg_path = "" file = get_file(f"{BASE_URL}/{reg_path}{dpt_code}/index.html", f"resultats2024/region_{reg_code}/dpt_{dpt_code}/resultats.html") with file.open() as f: resultats = analyser_resultats(f) if not resultats: continue if resultats_departement := session.execute( select(ResultatsDepartement).filter_by(dpt_id=dpt.code_insee)).scalar_one_or_none(): resultats_departement.inscrits = resultats["inscrits"] resultats_departement.abstentions = resultats["abstentions"] resultats_departement.votants = resultats["votants"] resultats_departement.blancs = resultats["blancs"] resultats_departement.nuls = resultats["nuls"] resultats_departement.exprimes = resultats["exprimes"] else: resultats_region = session.execute(select(ResultatsRegion).filter_by(region_id=dpt.region_code)) \ .scalar_one_or_none() resultats_departement = ResultatsDepartement( dpt_id=dpt.code_insee, resultats_region_id=resultats_region.id if resultats_region else None, inscrits=resultats["inscrits"], abstentions=resultats["abstentions"], votants=resultats["votants"], blancs=resultats["blancs"], nuls=resultats["nuls"], exprimes=resultats["exprimes"], ) session.add(resultats_departement) for voix_liste in resultats["resultats_par_liste"]: if voix_liste_departement := session.execute( select(VoixListeDepartement).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper()) ).scalar_one_or_none(): voix_liste_departement.voix = voix_liste["voix"] else: liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())) \ .scalar_one() voix_liste_departement = VoixListeDepartement(liste_id=liste.id, resultats_departement_id=resultats_departement.id, voix=voix_liste["voix"]) session.add(voix_liste_departement) session.commit() def importer_resultats_communes(engine: Engine, debug: bool = False) -> None: with Session(engine) as session: communes = session.execute(select(Commune)).scalars().all() iterator = tqdm(communes, desc="Communes") if debug else communes for commune in iterator: reg_code = commune.departement.region_code reg_path = f"{reg_code}/" dpt_code = commune.departement_code com_code = commune.code_insee if dpt_code == "984" or dpt_code == "989": # TAAF + Île de Clipperton, pas d'élection continue elif dpt_code == "977" or dpt_code == "978": # Pour une raison obscure, Saint-Barthélemy et Saint-Martin sont dans le département ZX reg_path = "" dpt_code = "ZX" com_code = f"ZX{com_code[2:]}" if dpt_code in ["975", "977", "978", "986", "987", "988"]: # Pour ces collectivités d'outre-mer, qui ne sont pas dans des régions, # on ne les regroupe pas dans des régions reg_path = "" if com_code in ["08294", "16355", "18131", "25282", "25549", "35112", "49321", "55039", "55050", "55139", "55189", "55239", "55307", "64541", "69152", "75056", "85041", "85271", "86231", "95282", "98611", "98612", "98613"]: # Aucun⋅e habitant⋅e donc pas de bureau de vote # Ou alors la commune a récemment fusionné continue file = get_file(f"{BASE_URL}/{reg_path}{dpt_code}/{com_code}/index.html", f"resultats2024/region_{reg_code}/dpt_{dpt_code}" f"/commune_{com_code}/resultats.html") if not file: continue with file.open() as f: resultats = analyser_resultats(f) if not resultats: continue if resultats_commune := session.execute( select(ResultatsCommune).filter_by(commune_id=com_code)).scalar_one_or_none(): resultats_commune.inscrits = resultats["inscrits"] resultats_commune.abstentions = resultats["abstentions"] resultats_commune.votants = resultats["votants"] resultats_commune.blancs = resultats["blancs"] resultats_commune.nuls = resultats["nuls"] resultats_commune.exprimes = resultats["exprimes"] else: resultats_departement = session.execute(select(ResultatsDepartement).filter_by(dpt_id=dpt_code)) \ .scalar_one_or_none() resultats_commune = ResultatsCommune( commune_id=com_code, resultats_dpt_id=resultats_departement.id if resultats_departement else None, inscrits=resultats["inscrits"], abstentions=resultats["abstentions"], votants=resultats["votants"], blancs=resultats["blancs"], nuls=resultats["nuls"], exprimes=resultats["exprimes"], ) session.add(resultats_commune) for voix_liste in resultats["resultats_par_liste"]: if voix_liste_commune := session.execute( select(VoixListeCommune).join(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper()) ).scalar_one_or_none(): voix_liste_commune.voix = voix_liste["voix"] else: liste = session.execute(select(Liste).filter_by(nom_majuscules=voix_liste["nom"].upper())) \ .scalar_one() voix_liste_commune = VoixListeCommune(liste_id=liste.id, resultats_commune_id=resultats_commune.id, voix=voix_liste["voix"]) session.add(voix_liste_commune) session.commit() def analyser_resultats(file) -> dict: parsed_data = {} soup = BeautifulSoup(file, 'html.parser') tables = soup.find_all('table') for table in tables: if table.find('th', text=re.compile("Voix")): resultats_par_liste = [] parsed_data['resultats_par_liste'] = resultats_par_liste tbody = table.tbody for line in tbody.find_all('tr'): cells = line.find_all('td') if len(cells) == 0: continue liste = {} resultats_par_liste.append(liste) liste['nom'] = cells[0].text.replace("\n", "").replace("\u202f", " ").strip() liste['voix'] = int(cells[2].string.replace("\n", "").replace(" ", "").replace("\u202f", "").strip()) elif table.find('td', text=re.compile("Inscrits")): tbody = table.tbody for line in tbody.find_all('tr'): cells = line.find_all('td') if len(cells) == 0: continue nom = cells[0].string.replace("\n", "").replace(" ", "").replace("\u202f", "").strip() number = int(cells[1].string.replace("\n", "").replace(" ", "").replace("\u202f", "").strip()) match nom: case "Inscrits": parsed_data["inscrits"] = number case "Abstentions": parsed_data["abstentions"] = number case "Votants": parsed_data["votants"] = number case "Blancs": parsed_data["blancs"] = number case "Nuls": parsed_data["nuls"] = number case "Exprimés": parsed_data["exprimes"] = number case _: print(f"Unknown cell: {nom}") return parsed_data def run(engine: Engine, debug: bool = False) -> None: importer_resultats_france(engine, debug) importer_resultats_regions(engine, debug) importer_resultats_departements(engine, debug) # FIXME Les communes prennent trop de temps # importer_resultats_communes(engine, debug)