from csv import DictReader from sqlalchemy import Engine, select from sqlalchemy.orm import Session from tqdm import tqdm from nupes.cache import get_file from nupes.models import BureauVote, Departement from nupes.models.europeennes2024 import ResultatsBureauVote, Liste, ResultatsCommune, \ ResultatsDepartement, ResultatsRegion, ResultatsFrance, \ VoixListeBureauVote, VoixListeCommune, VoixListeDepartement, VoixListeRegion, VoixListeFrance, \ ResultatsCirconscription, VoixListeCirconscription def importer_resultats_bv(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/cc1883d9-1265-4365-b754-fb6aef22d82e", "resultats-europeennes-par-bureau-de-vote.csv") with file.open('r') as f: reader = DictReader(f, delimiter=';') with Session(engine) as session: for row in tqdm(reader, desc="Bureau de vote", disable=not verbose): com_code = str(row['Code commune']).zfill(5) bv_code = row['Code BV'] if com_code == "60694": com_code = "60054" # Les Hauts-Talicans, données pas à jour elif com_code in ["85165", "85212"]: # Communes nouvelles ayant défusionné en 2024 # TODO Gérer continue elif com_code == "98601": # 3 royaumes à Wallis-et-Futuna, mais un seul résultat de commune com_code = "98611" if bv := session.execute(select(BureauVote).filter_by(commune_code=com_code, code_bureau=bv_code)) \ .scalar_one_or_none(): bv_id = bv.id else: print(f"Bureau de vote {com_code}_{bv_code} non trouvé") continue resultats_bv = session.execute(select(ResultatsBureauVote).filter_by(bv_id=bv_id)).scalar_one_or_none() if not resultats_bv: resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \ .scalar_one_or_none() if not resultats_commune: resultats_dpt = session.execute(select(ResultatsDepartement) .filter_by(dpt_id=row['Code département'])) \ .scalar_one_or_none() if not resultats_dpt: dpt = session.execute(select(Departement) .filter_by(code_insee=f"{row['Code département'].zfill(2)}")) \ .scalar_one() resultats_reg = session.execute(select(ResultatsRegion) .filter_by(region_id=dpt.region_code)) \ .scalar_one_or_none() if not resultats_reg: resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none() if not resultats_france: session.add(ResultatsFrance()) resultats_france = session.execute(select(ResultatsFrance)).scalar_one() resultats_reg = ResultatsRegion(region_id=str(dpt.region_code), resultats_france_id=resultats_france.id) session.add(resultats_reg) resultats_reg = session.execute(select(ResultatsRegion) .filter_by(region_id=dpt.region_code)).scalar_one() resultats_dpt = ResultatsDepartement(dpt_id=row['Code département'], resultats_region_id=resultats_reg.id) session.add(resultats_dpt) resultats_dpt = session.execute(select(ResultatsDepartement) .filter_by(dpt_id=row['Code département'])).scalar_one() resultats_commune = ResultatsCommune(commune_id=com_code, resultats_dpt_id=resultats_dpt.id) session.add(resultats_commune) resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \ .scalar_one() resultats_bv = ResultatsBureauVote(bv_id=bv_id, resultats_commune_id=resultats_commune.id) session.add(resultats_bv) resultats_bv = session.execute(select(ResultatsBureauVote).filter_by(bv_id=bv_id)).scalar_one() resultats_bv.inscrits = int(row['Inscrits']) resultats_bv.votants = int(row['Votants']) resultats_bv.abstentions = int(row['Abstentions']) resultats_bv.exprimes = int(row['Exprimés']) resultats_bv.blancs = int(row['Blancs']) resultats_bv.nuls = int(row['Nuls']) for liste in session.execute(select(Liste)).scalars().all(): voix_liste_bv = session.execute(select(VoixListeBureauVote) .filter_by(resultats_bureau_vote_id=resultats_bv.id, liste_id=liste.id)) \ .scalar_one_or_none() if not voix_liste_bv: voix_liste_bv = VoixListeBureauVote(resultats_bureau_vote_id=resultats_bv.id, liste_id=liste.id) session.add(voix_liste_bv) voix_liste_bv.voix = int(row[f"Voix {liste.id}"]) session.commit() def importer_resultats_commune(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/6a782ef9-8ad6-4e66-832d-338b1041a42d", "resultats-europeennes-par-commune.csv") with file.open('r') as f: reader = DictReader(f, delimiter=';') with Session(engine) as session: for row in tqdm(reader, desc="Commune", disable=not verbose): com_code = str(row['Code commune']).zfill(5) dpt_code = row['Code département'] if dpt_code == "ZX": dpt_code = "977" if com_code == "60694": com_code = "60054" # Les Hauts-Talicans, données pas à jour elif com_code in ["85165", "85212"]: # Communes nouvelles ayant défusionné en 2024 # TODO Gérer continue elif com_code == "98601": # 3 royaumes à Wallis-et-Futuna, mais un seul résultat de commune com_code = "98611" resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \ .scalar_one_or_none() if not resultats_commune: resultats_dpt = session.execute(select(ResultatsDepartement) .filter_by(dpt_id=dpt_code)) \ .scalar_one_or_none() if not resultats_dpt: dpt = session.execute(select(Departement) .filter_by(code_insee=f"{dpt_code.zfill(2)}")) \ .scalar_one() resultats_reg = session.execute(select(ResultatsRegion) .filter_by(region_id=dpt.region_code)) \ .scalar_one_or_none() if not resultats_reg: resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none() if not resultats_france: session.add(ResultatsFrance()) resultats_france = session.execute(select(ResultatsFrance)).scalar_one() resultats_reg = ResultatsRegion(region_id=str(dpt.region_code), resultats_france_id=resultats_france.id) session.add(resultats_reg) resultats_reg = session.execute(select(ResultatsRegion) .filter_by(region_id=dpt.region_code)).scalar_one() resultats_dpt = ResultatsDepartement(dpt_id=dpt_code, resultats_region_id=resultats_reg.id) session.add(resultats_dpt) resultats_dpt = session.execute(select(ResultatsDepartement) .filter_by(dpt_id=dpt_code)).scalar_one() resultats_commune = ResultatsCommune(commune_id=com_code, resultats_dpt_id=resultats_dpt.id) session.add(resultats_commune) resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \ .scalar_one() resultats_commune.inscrits = int(row['Inscrits']) resultats_commune.votants = int(row['Votants']) resultats_commune.abstentions = int(row['Abstentions']) resultats_commune.exprimes = int(row['Exprimés']) resultats_commune.blancs = int(row['Blancs']) resultats_commune.nuls = int(row['Nuls']) for liste in session.execute(select(Liste)).scalars().all(): voix_liste_com = session.execute(select(VoixListeCommune) .filter_by(resultats_commune_id=resultats_commune.id, liste_id=liste.id)) \ .scalar_one_or_none() if not voix_liste_com: voix_liste_com = VoixListeCommune(resultats_commune_id=resultats_commune.id, liste_id=liste.id) session.add(voix_liste_com) voix_liste_com.voix = int(row[f"Voix {liste.id}"]) session.commit() def importer_resultats_circo(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/ee37cbef-3d2a-4efe-a395-530b85a63028", "resultats-europeennes-par-circonscription.csv") with file.open('r') as f: reader = DictReader(f, delimiter=';') with Session(engine) as session: for row in tqdm(reader, desc="Circonscription", disable=not verbose): circo_code = row['Code circonscription législative'] dpt_code, circo_code = circo_code[:-2], circo_code[-2:] if dpt_code == "ZX": dpt_code = "977" circo_id = f"{dpt_code.zfill(2)}-{circo_code.zfill(2)}" resultats_circo = session.execute(select(ResultatsCirconscription).filter_by(circo_id=circo_id)) \ .scalar_one_or_none() if not resultats_circo: resultats_dpt = session.execute(select(ResultatsDepartement).filter_by(dpt_id=dpt_code)) \ .scalar_one_or_none() if not resultats_dpt: dpt = session.execute(select(Departement).filter_by(code_insee=f"{dpt_code.zfill(2)}")) \ .scalar_one() resultats_reg = session.execute(select(ResultatsRegion) .filter_by(region_id=dpt.region_code)) \ .scalar_one_or_none() if not resultats_reg: resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none() if not resultats_france: session.add(ResultatsFrance()) resultats_france = session.execute(select(ResultatsFrance)).scalar_one() resultats_reg = ResultatsRegion(region_id=str(dpt.region_code), resultats_france_id=resultats_france.id) session.add(resultats_reg) resultats_reg = session.execute(select(ResultatsRegion) .filter_by(region_id=dpt.region_code)).scalar_one() resultats_dpt = ResultatsDepartement(dpt_id=dpt_code, resultats_region_id=resultats_reg.id) session.add(resultats_dpt) resultats_dpt = session.execute(select(ResultatsDepartement) .filter_by(dpt_id=dpt_code)).scalar_one() resultats_circo = ResultatsCirconscription(circo_id=circo_id, resultats_departement_id=resultats_dpt.id) session.add(resultats_circo) resultats_circo = session.execute(select(ResultatsCirconscription).filter_by(circo_id=circo_id)) \ .scalar_one() resultats_circo.inscrits = int(row['Inscrits']) resultats_circo.votants = int(row['Votants']) resultats_circo.abstentions = int(row['Abstentions']) resultats_circo.exprimes = int(row['Exprimés']) resultats_circo.blancs = int(row['Blancs']) resultats_circo.nuls = int(row['Nuls']) for liste in session.execute(select(Liste)).scalars().all(): voix_liste_circo = session.execute(select(VoixListeCirconscription) .filter_by(resultats_circonscription_id=resultats_circo.id, liste_id=liste.id)) \ .scalar_one_or_none() if not voix_liste_circo: voix_liste_circo = VoixListeCirconscription(resultats_circonscription_id=resultats_circo.id, liste_id=liste.id) session.add(voix_liste_circo) voix_liste_circo.voix = int(row[f"Voix {liste.id}"]) session.commit() def importer_resultats_departement(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/b77cc4da-644f-4323-b6f7-ae6fe9b33f86", "resultats-europeennes-par-departement.csv") with file.open('r') as f: reader = DictReader(f, delimiter=';') with Session(engine) as session: for row in tqdm(reader, desc="Département", disable=not verbose): dpt_code = row['Code département'] if dpt_code == "ZX": dpt_code = "977" resultats_dpt = session.execute(select(ResultatsDepartement).filter_by(dpt_id=dpt_code)) \ .scalar_one_or_none() if not resultats_dpt: print(f"Département {dpt_code} non trouvé") dpt = session.execute(select(Departement) .filter_by(code_insee=f"{dpt_code.zfill(2)}")) \ .scalar_one() resultats_reg = session.execute(select(ResultatsRegion) .filter_by(region_id=dpt.region_code)) \ .scalar_one_or_none() if not resultats_reg: resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none() if not resultats_france: session.add(ResultatsFrance()) resultats_france = session.execute(select(ResultatsFrance)).scalar_one() resultats_reg = ResultatsRegion(region_id=str(dpt.region_code), resultats_france_id=resultats_france.id) session.add(resultats_reg) resultats_reg = session.execute(select(ResultatsRegion) .filter_by(region_id=dpt.region_code)).scalar_one() resultats_dpt = ResultatsDepartement(dpt_id=dpt_code, resultats_region_id=resultats_reg.id) session.add(resultats_dpt) resultats_dpt = session.execute(select(ResultatsDepartement).filter_by(dpt_id=dpt_code)) \ .scalar_one() resultats_dpt.inscrits = int(row['Inscrits']) resultats_dpt.votants = int(row['Votants']) resultats_dpt.abstentions = int(row['Abstentions']) resultats_dpt.exprimes = int(row['Exprimés']) resultats_dpt.blancs = int(row['Blancs']) resultats_dpt.nuls = int(row['Nuls']) for liste in session.execute(select(Liste)).scalars().all(): voix_liste_dpt = session.execute(select(VoixListeDepartement) .filter_by(resultats_departement_id=resultats_dpt.id, liste_id=liste.id)) \ .scalar_one_or_none() if not voix_liste_dpt: voix_liste_dpt = VoixListeDepartement(resultats_departement_id=resultats_dpt.id, liste_id=liste.id) session.add(voix_liste_dpt) voix_liste_dpt.voix = int(row[f"Voix {liste.id}"]) session.commit() def importer_resultats_region(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/7c3a854b-7344-4c68-b1f9-9d651b4ca823", "resultats-europeennes-par-region.csv") with file.open('r') as f: reader = DictReader(f, delimiter=';') with Session(engine) as session: for row in tqdm(reader, desc="Région", disable=not verbose): reg_code = row['Code région'] resultats_reg = session.execute(select(ResultatsRegion).filter_by(region_id=reg_code)) \ .scalar_one_or_none() if not resultats_reg: resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none() if not resultats_france: session.add(ResultatsFrance()) resultats_france = session.execute(select(ResultatsFrance)).scalar_one() resultats_reg = ResultatsRegion(region_id=reg_code, resultats_france_id=resultats_france.id) session.add(resultats_reg) session.commit() resultats_reg = session.execute(select(ResultatsRegion).filter_by(region_id=reg_code)) \ .scalar_one() resultats_reg.inscrits = int(row['Inscrits']) resultats_reg.votants = int(row['Votants']) resultats_reg.abstentions = int(row['Abstentions']) resultats_reg.exprimes = int(row['Exprimés']) resultats_reg.blancs = int(row['Blancs']) resultats_reg.nuls = int(row['Nuls']) for liste in session.execute(select(Liste)).scalars().all(): voix_liste_reg = session.execute(select(VoixListeRegion) .filter_by(resultats_region_id=resultats_reg.id, liste_id=liste.id)) \ .scalar_one_or_none() if not voix_liste_reg: voix_liste_reg = VoixListeRegion(resultats_region_id=resultats_reg.id, liste_id=liste.id) session.add(voix_liste_reg) voix_liste_reg.voix = int(row[f"Voix {liste.id}"]) session.commit() def calculer_resultats_france(engine: Engine, verbose: bool = False) -> None: with Session(engine) as session: resultats_france = session.execute(select(ResultatsFrance)).scalar_one() resultats_france.inscrits = 0 resultats_france.votants = 0 resultats_france.abstentions = 0 resultats_france.exprimes = 0 resultats_france.blancs = 0 resultats_france.nuls = 0 for voix_liste_france in session.execute( select(VoixListeFrance).filter_by(resultats_france_id=resultats_france.id)).scalars().all(): voix_liste_france.voix = 0 # Réinitialisation des voix for resultats_dpt in session.execute(select(ResultatsDepartement)).scalars().all(): resultats_france.inscrits += resultats_dpt.inscrits resultats_france.votants += resultats_dpt.votants resultats_france.abstentions += resultats_dpt.abstentions resultats_france.exprimes += resultats_dpt.exprimes resultats_france.blancs += resultats_dpt.blancs resultats_france.nuls += resultats_dpt.nuls for voix_liste_dpt in session.execute(select(VoixListeDepartement) .filter_by(resultats_departement_id=resultats_dpt.id)) \ .scalars().all(): if voix_liste_france := session.execute( select(VoixListeFrance).filter_by(resultats_france_id=resultats_france.id, liste_id=voix_liste_dpt.liste_id)).scalar_one_or_none(): voix_liste_france.voix += voix_liste_dpt.voix else: session.add(VoixListeFrance(resultats_france_id=resultats_france.id, liste_id=voix_liste_dpt.liste_id, voix=voix_liste_dpt.voix)) session.commit() def run(engine: Engine, verbose: bool = False) -> None: importer_resultats_region(engine, verbose) importer_resultats_departement(engine, verbose) calculer_resultats_france(engine, verbose) importer_resultats_circo(engine, verbose) importer_resultats_commune(engine, verbose) importer_resultats_bv(engine, verbose)