from csv import DictReader from sqlalchemy import Engine, select from sqlalchemy.orm import Session from tqdm import tqdm from nupes.cache import get_file from nupes.models import BureauVote, Departement from nupes.models.europeennes2024 import ResultatsBureauVote, Liste, ResultatsCommune, \ ResultatsDepartement, ResultatsRegion, ResultatsFrance, \ VoixListeBureauVote, VoixListeCommune, VoixListeDepartement, VoixListeRegion, VoixListeFrance def importer_resultats_bv(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/937bb638-a487-40cd-9a0b-610d539a4207", "resultats-temporaires-par-bureau-de-vote.csv") with file.open('r') as f: next(f) # On saute la première ligne reader = DictReader(f) with Session(engine) as session: for row in tqdm(reader, desc="Bureau de vote", disable=not verbose): com_code = str(row['Code commune']).zfill(5) bv_code = row['Code BV'] bv_id = f"{com_code}_{bv_code}" if not session.execute(select(BureauVote).filter_by(id=bv_id)).scalar_one_or_none(): print(f"Bureau de vote {bv_id} non trouvé") continue resultats_bv = session.execute(select(ResultatsBureauVote).filter_by(bv_id=bv_id)).scalar_one_or_none() if not resultats_bv: resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \ .scalar_one_or_none() if not resultats_commune: resultats_dpt = session.execute(select(ResultatsDepartement) .filter_by(dpt_id=row['Code département'])) \ .scalar_one_or_none() if not resultats_dpt: dpt = session.execute(select(Departement) .filter_by(code_insee=f"{row['Code département'].zfill(2)}")) \ .scalar_one() resultats_reg = session.execute(select(ResultatsRegion) .filter_by(region_id=dpt.region_code)) \ .scalar_one_or_none() if not resultats_reg: resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none() if not resultats_france: session.add(ResultatsFrance()) resultats_france = session.execute(select(ResultatsFrance)).scalar_one() resultats_reg = ResultatsRegion(region_id=str(dpt.region_code), resultats_france_id=resultats_france.id) session.add(resultats_reg) resultats_reg = session.execute(select(ResultatsRegion) .filter_by(region_id=dpt.region_code)).scalar_one() resultats_dpt = ResultatsDepartement(dpt_id=row['Code département'], resultats_region_id=resultats_reg.id) session.add(resultats_dpt) resultats_dpt = session.execute(select(ResultatsDepartement) .filter_by(dpt_id=row['Code département'])).scalar_one() resultats_commune = ResultatsCommune(commune_id=com_code, resultats_dpt_id=resultats_dpt.id) session.add(resultats_commune) resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \ .scalar_one() resultats_bv = ResultatsBureauVote(bv_id=bv_id, resultats_commune_id=resultats_commune.id) session.add(resultats_bv) resultats_bv = session.execute(select(ResultatsBureauVote).filter_by(bv_id=bv_id)).scalar_one() resultats_bv.inscrits = int(row['Inscrits']) resultats_bv.votants = int(row['Votants']) resultats_bv.abstentions = int(row['Abstentions']) resultats_bv.exprimes = int(row['Exprimés']) resultats_bv.blancs = int(row['Blancs']) resultats_bv.nuls = int(row['Nuls']) for liste in session.execute(select(Liste)).scalars().all(): voix_liste_bv = session.execute(select(VoixListeBureauVote) .filter_by(resultats_bureau_vote_id=resultats_bv.id, liste_id=liste.id)) \ .scalar_one_or_none() if not voix_liste_bv: voix_liste_bv = VoixListeBureauVote(resultats_bureau_vote_id=resultats_bv.id, liste_id=liste.id) session.add(voix_liste_bv) voix_liste_bv.voix = int(row[f"Voix {liste.id}"]) session.commit() def calculer_resultats_commune(engine: Engine, verbose: bool = False) -> None: with Session(engine) as session: for resultat_commune in tqdm(session.execute(select(ResultatsCommune)).scalars(), desc="Communes", disable=not verbose): resultats_bv = session.execute(select(ResultatsBureauVote) .filter_by(resultats_commune_id=resultat_commune.id)).scalars().all() resultat_commune.inscrits = sum(r.inscrits for r in resultats_bv) resultat_commune.votants = sum(r.votants for r in resultats_bv) resultat_commune.abstentions = sum(r.abstentions for r in resultats_bv) resultat_commune.exprimes = sum(r.exprimes for r in resultats_bv) resultat_commune.blancs = sum(r.blancs for r in resultats_bv) resultat_commune.nuls = sum(r.nuls for r in resultats_bv) for liste in session.execute(select(Liste)).scalars().all(): voix_liste = session.execute(select(VoixListeBureauVote).filter_by(liste_id=liste.id) .join(ResultatsBureauVote) .filter_by(resultats_commune_id=resultat_commune.id)) \ .scalars().all() voix = sum(v.voix for v in voix_liste) if voix_liste_commune := session.execute( select(VoixListeCommune).filter_by(resultats_commune_id=resultat_commune.id, liste_id=liste.id)).scalar_one_or_none(): voix_liste_commune.voix = voix else: voix_liste_commune = VoixListeCommune(resultats_commune_id=resultat_commune.id, liste_id=liste.id, voix=voix) session.add(voix_liste_commune) session.commit() def calculer_resultats_circo(engine: Engine, verbose: bool = False) -> None: pass def calculer_resultats_departement(engine: Engine, verbose: bool = False) -> None: with Session(engine) as session: for resultat_dpt in tqdm(session.execute(select(ResultatsDepartement)).scalars(), desc="Départements", disable=not verbose): resultats_commune = session.execute(select(ResultatsCommune) .filter_by(resultats_dpt_id=resultat_dpt.id)).scalars().all() resultat_dpt.inscrits = sum(r.inscrits for r in resultats_commune) resultat_dpt.votants = sum(r.votants for r in resultats_commune) resultat_dpt.abstentions = sum(r.abstentions for r in resultats_commune) resultat_dpt.exprimes = sum(r.exprimes for r in resultats_commune) resultat_dpt.blancs = sum(r.blancs for r in resultats_commune) resultat_dpt.nuls = sum(r.nuls for r in resultats_commune) for liste in session.execute(select(Liste)).scalars().all(): voix_liste = session.execute(select(VoixListeCommune).filter_by(liste_id=liste.id) .join(ResultatsCommune) .filter_by(resultats_dpt_id=resultat_dpt.id)) \ .scalars().all() voix = sum(v.voix for v in voix_liste) if voix_liste_dpt := session.execute(select(VoixListeDepartement) .filter_by(resultats_departement_id=resultat_dpt.id, liste_id=liste.id)) \ .scalar_one_or_none(): voix_liste_dpt.voix = voix else: voix_liste_dpt = VoixListeDepartement(resultats_departement_id=resultat_dpt.id, liste_id=liste.id, voix=voix) session.add(voix_liste_dpt) session.commit() def calculer_resultats_region(engine: Engine, verbose: bool = False) -> None: with Session(engine) as session: for resultat_reg in tqdm(session.execute(select(ResultatsRegion)).scalars(), desc="Régions", disable=not verbose): resultats_dpt = session.execute(select(ResultatsDepartement) .filter_by(resultats_region_id=resultat_reg.id)).scalars().all() resultat_reg.inscrits = sum(r.inscrits for r in resultats_dpt) resultat_reg.votants = sum(r.votants for r in resultats_dpt) resultat_reg.abstentions = sum(r.abstentions for r in resultats_dpt) resultat_reg.exprimes = sum(r.exprimes for r in resultats_dpt) resultat_reg.blancs = sum(r.blancs for r in resultats_dpt) resultat_reg.nuls = sum(r.nuls for r in resultats_dpt) for liste in session.execute(select(Liste)).scalars().all(): voix_liste = session.execute(select(VoixListeDepartement).filter_by(liste_id=liste.id) .join(ResultatsDepartement) .filter_by(resultats_region_id=resultat_reg.id)) \ .scalars().all() voix = sum(v.voix for v in voix_liste) if voix_liste_reg := session.execute(select(VoixListeRegion) .filter_by(resultats_region_id=resultat_reg.id, liste_id=liste.id)).scalar_one_or_none(): voix_liste_reg.voix = voix else: voix_liste_reg = VoixListeRegion(resultats_region_id=resultat_reg.id, liste_id=liste.id, voix=voix) session.add(voix_liste_reg) session.commit() def calculer_resultats_national(engine: Engine, verbose: bool = False) -> None: with Session(engine) as session: resultat_france = session.execute(select(ResultatsFrance)).scalar_one() resultats_reg = session.execute(select(ResultatsRegion)).scalars().all() resultat_france.inscrits = sum(r.inscrits for r in resultats_reg) resultat_france.votants = sum(r.votants for r in resultats_reg) resultat_france.abstentions = sum(r.abstentions for r in resultats_reg) resultat_france.exprimes = sum(r.exprimes for r in resultats_reg) resultat_france.blancs = sum(r.blancs for r in resultats_reg) resultat_france.nuls = sum(r.nuls for r in resultats_reg) for liste in session.execute(select(Liste)).scalars().all(): voix_liste = session.execute(select(VoixListeRegion).filter_by(liste_id=liste.id).join(ResultatsRegion) .filter_by(resultats_france_id=resultat_france.id)) \ .scalars().all() voix = sum(v.voix for v in voix_liste) if voix_liste_france := session.execute(select(VoixListeFrance) .filter_by(resultats_france_id=resultat_france.id, liste_id=liste.id)).scalar_one_or_none(): voix_liste_france.voix = voix else: voix_liste_france = VoixListeFrance(resultats_france_id=resultat_france.id, liste_id=liste.id, voix=voix) session.add(voix_liste_france) session.commit() def run(engine: Engine, verbose: bool = False) -> None: importer_resultats_bv(engine, verbose) calculer_resultats_commune(engine, verbose) # calculer_resultats_circo(engine, verbose) calculer_resultats_departement(engine, verbose) calculer_resultats_region(engine, verbose) calculer_resultats_national(engine, verbose)