nupes-elections/nupes/scripts/import_resultats_2024.py

389 lines
22 KiB
Python

from csv import DictReader
from sqlalchemy import Engine, select
from sqlalchemy.orm import Session
from tqdm import tqdm
from nupes.cache import get_file
from nupes.models import BureauVote, Departement
from nupes.models.europeennes2024 import ResultatsBureauVote, Liste, ResultatsCommune, \
ResultatsDepartement, ResultatsRegion, ResultatsFrance, \
VoixListeBureauVote, VoixListeCommune, VoixListeDepartement, VoixListeRegion, VoixListeFrance, \
ResultatsCirconscription, VoixListeCirconscription
def importer_resultats_bv(engine: Engine, verbose: bool = False) -> None:
file = get_file("https://www.data.gouv.fr/fr/datasets/r/cc1883d9-1265-4365-b754-fb6aef22d82e",
"resultats-europeennes-par-bureau-de-vote.csv")
with file.open('r') as f:
reader = DictReader(f, delimiter=';')
with Session(engine) as session:
for row in tqdm(reader, desc="Bureau de vote", disable=not verbose):
com_code = str(row['Code commune']).zfill(5)
bv_code = row['Code BV']
if bv := session.execute(select(BureauVote).filter_by(commune_code=com_code, code_bureau=bv_code)) \
.scalar_one_or_none():
bv_id = bv.id
else:
print(f"Bureau de vote {com_code}_{bv_code} non trouvé")
continue
resultats_bv = session.execute(select(ResultatsBureauVote).filter_by(bv_id=bv_id)).scalar_one_or_none()
if not resultats_bv:
resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \
.scalar_one_or_none()
if not resultats_commune:
resultats_dpt = session.execute(select(ResultatsDepartement)
.filter_by(dpt_id=row['Code département'])) \
.scalar_one_or_none()
if not resultats_dpt:
dpt = session.execute(select(Departement)
.filter_by(code_insee=f"{row['Code département'].zfill(2)}")) \
.scalar_one()
resultats_reg = session.execute(select(ResultatsRegion)
.filter_by(region_id=dpt.region_code)) \
.scalar_one_or_none()
if not resultats_reg:
resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none()
if not resultats_france:
session.add(ResultatsFrance())
resultats_france = session.execute(select(ResultatsFrance)).scalar_one()
resultats_reg = ResultatsRegion(region_id=str(dpt.region_code),
resultats_france_id=resultats_france.id)
session.add(resultats_reg)
resultats_reg = session.execute(select(ResultatsRegion)
.filter_by(region_id=dpt.region_code)).scalar_one()
resultats_dpt = ResultatsDepartement(dpt_id=row['Code département'],
resultats_region_id=resultats_reg.id)
session.add(resultats_dpt)
resultats_dpt = session.execute(select(ResultatsDepartement)
.filter_by(dpt_id=row['Code département'])).scalar_one()
resultats_commune = ResultatsCommune(commune_id=com_code, resultats_dpt_id=resultats_dpt.id)
session.add(resultats_commune)
resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \
.scalar_one()
resultats_bv = ResultatsBureauVote(bv_id=bv_id, resultats_commune_id=resultats_commune.id)
session.add(resultats_bv)
resultats_bv = session.execute(select(ResultatsBureauVote).filter_by(bv_id=bv_id)).scalar_one()
resultats_bv.inscrits = int(row['Inscrits'])
resultats_bv.votants = int(row['Votants'])
resultats_bv.abstentions = int(row['Abstentions'])
resultats_bv.exprimes = int(row['Exprimés'])
resultats_bv.blancs = int(row['Blancs'])
resultats_bv.nuls = int(row['Nuls'])
for liste in session.execute(select(Liste)).scalars().all():
voix_liste_bv = session.execute(select(VoixListeBureauVote)
.filter_by(resultats_bureau_vote_id=resultats_bv.id,
liste_id=liste.id)) \
.scalar_one_or_none()
if not voix_liste_bv:
voix_liste_bv = VoixListeBureauVote(resultats_bureau_vote_id=resultats_bv.id, liste_id=liste.id)
session.add(voix_liste_bv)
voix_liste_bv.voix = int(row[f"Voix {liste.id}"])
session.commit()
def importer_resultats_commune(engine: Engine, verbose: bool = False) -> None:
file = get_file("https://www.data.gouv.fr/fr/datasets/r/6a782ef9-8ad6-4e66-832d-338b1041a42d",
"resultats-europeennes-par-commune.csv")
with file.open('r') as f:
reader = DictReader(f, delimiter=';')
with Session(engine) as session:
for row in tqdm(reader, desc="Commune", disable=not verbose):
com_code = str(row['Code commune']).zfill(5)
resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \
.scalar_one_or_none()
if not resultats_commune:
resultats_dpt = session.execute(select(ResultatsDepartement)
.filter_by(dpt_id=row['Code département'])) \
.scalar_one_or_none()
if not resultats_dpt:
dpt = session.execute(select(Departement)
.filter_by(code_insee=f"{row['Code département'].zfill(2)}")) \
.scalar_one()
resultats_reg = session.execute(select(ResultatsRegion)
.filter_by(region_id=dpt.region_code)) \
.scalar_one_or_none()
if not resultats_reg:
resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none()
if not resultats_france:
session.add(ResultatsFrance())
resultats_france = session.execute(select(ResultatsFrance)).scalar_one()
resultats_reg = ResultatsRegion(region_id=str(dpt.region_code),
resultats_france_id=resultats_france.id)
session.add(resultats_reg)
resultats_reg = session.execute(select(ResultatsRegion)
.filter_by(region_id=dpt.region_code)).scalar_one()
resultats_dpt = ResultatsDepartement(dpt_id=row['Code département'],
resultats_region_id=resultats_reg.id)
session.add(resultats_dpt)
resultats_dpt = session.execute(select(ResultatsDepartement)
.filter_by(dpt_id=row['Code département'])).scalar_one()
resultats_commune = ResultatsCommune(commune_id=com_code, resultats_dpt_id=resultats_dpt.id)
session.add(resultats_commune)
resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \
.scalar_one()
resultats_commune.inscrits = int(row['Inscrits'])
resultats_commune.votants = int(row['Votants'])
resultats_commune.abstentions = int(row['Abstentions'])
resultats_commune.exprimes = int(row['Exprimés'])
resultats_commune.blancs = int(row['Blancs'])
resultats_commune.nuls = int(row['Nuls'])
for liste in session.execute(select(Liste)).scalars().all():
voix_liste_com = session.execute(select(VoixListeCommune)
.filter_by(resultats_commune_id=resultats_commune.id,
liste_id=liste.id)) \
.scalar_one_or_none()
if not voix_liste_com:
voix_liste_com = VoixListeCommune(resultats_commune_id=resultats_commune.id, liste_id=liste.id)
session.add(voix_liste_com)
voix_liste_com.voix = int(row[f"Voix {liste.id}"])
session.commit()
def importer_resultats_circo(engine: Engine, verbose: bool = False) -> None:
file = get_file("https://www.data.gouv.fr/fr/datasets/r/ee37cbef-3d2a-4efe-a395-530b85a63028",
"resultats-europeennes-par-circonscription.csv")
with file.open('r') as f:
reader = DictReader(f, delimiter=';')
with Session(engine) as session:
for row in tqdm(reader, desc="Circonscription", disable=not verbose):
circo_code = row['Code circonscription législative']
dpt_code, circo_code = circo_code[:-2], circo_code[-2:]
if dpt_code == "ZX":
dpt_code = "977"
circo_id = f"{dpt_code.zfill(2)}-{circo_code.zfill(2)}"
resultats_circo = session.execute(select(ResultatsCirconscription).filter_by(circo_id=circo_id)) \
.scalar_one_or_none()
if not resultats_circo:
resultats_dpt = session.execute(select(ResultatsDepartement).filter_by(dpt_id=dpt_code)) \
.scalar_one_or_none()
if not resultats_dpt:
dpt = session.execute(select(Departement).filter_by(code_insee=f"{dpt_code.zfill(2)}")) \
.scalar_one()
resultats_reg = session.execute(select(ResultatsRegion)
.filter_by(region_id=dpt.region_code)) \
.scalar_one_or_none()
if not resultats_reg:
resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none()
if not resultats_france:
session.add(ResultatsFrance())
resultats_france = session.execute(select(ResultatsFrance)).scalar_one()
resultats_reg = ResultatsRegion(region_id=str(dpt.region_code),
resultats_france_id=resultats_france.id)
session.add(resultats_reg)
resultats_reg = session.execute(select(ResultatsRegion)
.filter_by(region_id=dpt.region_code)).scalar_one()
resultats_dpt = ResultatsDepartement(dpt_id=dpt_code,
resultats_region_id=resultats_reg.id)
session.add(resultats_dpt)
resultats_dpt = session.execute(select(ResultatsDepartement)
.filter_by(dpt_id=dpt_code)).scalar_one()
resultats_circo = ResultatsCirconscription(circo_id=circo_id,
resultats_departement_id=resultats_dpt.id)
session.add(resultats_circo)
resultats_circo = session.execute(select(ResultatsCirconscription).filter_by(circo_id=circo_id)) \
.scalar_one()
resultats_circo.inscrits = int(row['Inscrits'])
resultats_circo.votants = int(row['Votants'])
resultats_circo.abstentions = int(row['Abstentions'])
resultats_circo.exprimes = int(row['Exprimés'])
resultats_circo.blancs = int(row['Blancs'])
resultats_circo.nuls = int(row['Nuls'])
for liste in session.execute(select(Liste)).scalars().all():
voix_liste_circo = session.execute(select(VoixListeCirconscription)
.filter_by(resultats_circonscription_id=resultats_circo.id,
liste_id=liste.id)) \
.scalar_one_or_none()
if not voix_liste_circo:
voix_liste_circo = VoixListeCirconscription(resultats_circonscription_id=resultats_circo.id,
liste_id=liste.id)
session.add(voix_liste_circo)
voix_liste_circo.voix = int(row[f"Voix {liste.id}"])
session.commit()
def importer_resultats_departement(engine: Engine, verbose: bool = False) -> None:
file = get_file("https://www.data.gouv.fr/fr/datasets/r/b77cc4da-644f-4323-b6f7-ae6fe9b33f86",
"resultats-europeennes-par-departement.csv")
with file.open('r') as f:
reader = DictReader(f, delimiter=';')
with Session(engine) as session:
for row in tqdm(reader, desc="Département", disable=not verbose):
dpt_code = row['Code département']
if dpt_code == "ZX":
dpt_code = "977"
resultats_dpt = session.execute(select(ResultatsDepartement).filter_by(dpt_id=dpt_code)) \
.scalar_one_or_none()
if not resultats_dpt:
print(f"Département {dpt_code} non trouvé")
dpt = session.execute(select(Departement)
.filter_by(code_insee=f"{dpt_code.zfill(2)}")) \
.scalar_one()
resultats_reg = session.execute(select(ResultatsRegion)
.filter_by(region_id=dpt.region_code)) \
.scalar_one_or_none()
if not resultats_reg:
resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none()
if not resultats_france:
session.add(ResultatsFrance())
resultats_france = session.execute(select(ResultatsFrance)).scalar_one()
resultats_reg = ResultatsRegion(region_id=str(dpt.region_code),
resultats_france_id=resultats_france.id)
session.add(resultats_reg)
resultats_reg = session.execute(select(ResultatsRegion)
.filter_by(region_id=dpt.region_code)).scalar_one()
resultats_dpt = ResultatsDepartement(dpt_id=dpt_code,
resultats_region_id=resultats_reg.id)
session.add(resultats_dpt)
resultats_dpt = session.execute(select(ResultatsDepartement).filter_by(dpt_id=dpt_code)) \
.scalar_one()
resultats_dpt.inscrits = int(row['Inscrits'])
resultats_dpt.votants = int(row['Votants'])
resultats_dpt.abstentions = int(row['Abstentions'])
resultats_dpt.exprimes = int(row['Exprimés'])
resultats_dpt.blancs = int(row['Blancs'])
resultats_dpt.nuls = int(row['Nuls'])
for liste in session.execute(select(Liste)).scalars().all():
voix_liste_dpt = session.execute(select(VoixListeDepartement)
.filter_by(resultats_departement_id=resultats_dpt.id,
liste_id=liste.id)) \
.scalar_one_or_none()
if not voix_liste_dpt:
voix_liste_dpt = VoixListeDepartement(resultats_departement_id=resultats_dpt.id,
liste_id=liste.id)
session.add(voix_liste_dpt)
voix_liste_dpt.voix = int(row[f"Voix {liste.id}"])
session.commit()
def importer_resultats_region(engine: Engine, verbose: bool = False) -> None:
file = get_file("https://www.data.gouv.fr/fr/datasets/r/7c3a854b-7344-4c68-b1f9-9d651b4ca823",
"resultats-europeennes-par-region.csv")
with file.open('r') as f:
reader = DictReader(f, delimiter=';')
with Session(engine) as session:
for row in tqdm(reader, desc="Région", disable=not verbose):
reg_code = row['Code région']
resultats_reg = session.execute(select(ResultatsRegion).filter_by(region_id=reg_code)) \
.scalar_one_or_none()
if not resultats_reg:
resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none()
if not resultats_france:
session.add(ResultatsFrance())
resultats_france = session.execute(select(ResultatsFrance)).scalar_one()
resultats_reg = ResultatsRegion(region_id=reg_code,
resultats_france_id=resultats_france.id)
session.add(resultats_reg)
session.commit()
resultats_reg = session.execute(select(ResultatsRegion).filter_by(region_id=reg_code)) \
.scalar_one()
resultats_reg.inscrits = int(row['Inscrits'])
resultats_reg.votants = int(row['Votants'])
resultats_reg.abstentions = int(row['Abstentions'])
resultats_reg.exprimes = int(row['Exprimés'])
resultats_reg.blancs = int(row['Blancs'])
resultats_reg.nuls = int(row['Nuls'])
for liste in session.execute(select(Liste)).scalars().all():
voix_liste_reg = session.execute(select(VoixListeRegion)
.filter_by(resultats_region_id=resultats_reg.id,
liste_id=liste.id)) \
.scalar_one_or_none()
if not voix_liste_reg:
voix_liste_reg = VoixListeRegion(resultats_region_id=resultats_reg.id, liste_id=liste.id)
session.add(voix_liste_reg)
voix_liste_reg.voix = int(row[f"Voix {liste.id}"])
session.commit()
def calculer_resultats_france(engine: Engine, verbose: bool = False) -> None:
with Session(engine) as session:
resultats_france = session.execute(select(ResultatsFrance)).scalar_one()
resultats_france.inscrits = 0
resultats_france.votants = 0
resultats_france.abstentions = 0
resultats_france.exprimes = 0
resultats_france.blancs = 0
resultats_france.nuls = 0
for voix_liste_france in session.execute(
select(VoixListeFrance).filter_by(resultats_france_id=resultats_france.id)).scalars().all():
voix_liste_france.voix = 0 # Réinitialisation des voix
for resultats_dpt in session.execute(select(ResultatsDepartement)).scalars().all():
resultats_france.inscrits += resultats_dpt.inscrits
resultats_france.votants += resultats_dpt.votants
resultats_france.abstentions += resultats_dpt.abstentions
resultats_france.exprimes += resultats_dpt.exprimes
resultats_france.blancs += resultats_dpt.blancs
resultats_france.nuls += resultats_dpt.nuls
for voix_liste_dpt in session.execute(select(VoixListeDepartement)
.filter_by(resultats_departement_id=resultats_dpt.id)) \
.scalars().all():
if voix_liste_france := session.execute(
select(VoixListeFrance).filter_by(resultats_france_id=resultats_france.id,
liste_id=voix_liste_dpt.liste_id)).scalar_one_or_none():
voix_liste_france.voix += voix_liste_dpt.voix
else:
session.add(VoixListeFrance(resultats_france_id=resultats_france.id,
liste_id=voix_liste_dpt.liste_id, voix=voix_liste_dpt.voix))
session.commit()
def run(engine: Engine, verbose: bool = False) -> None:
importer_resultats_region(engine, verbose)
importer_resultats_departement(engine, verbose)
calculer_resultats_france(engine, verbose)
importer_resultats_circo(engine, verbose)
importer_resultats_commune(engine, verbose)
importer_resultats_bv(engine, verbose)