diff --git a/alembic/versions/a2c743418d84_initial.py b/alembic/versions/5bf5e6526891_initial.py similarity index 87% rename from alembic/versions/a2c743418d84_initial.py rename to alembic/versions/5bf5e6526891_initial.py index 0c4ce8b..b6ed75f 100644 --- a/alembic/versions/a2c743418d84_initial.py +++ b/alembic/versions/5bf5e6526891_initial.py @@ -1,8 +1,8 @@ """initial -Revision ID: a2c743418d84 +Revision ID: 5bf5e6526891 Revises: -Create Date: 2024-06-09 23:51:19.572272 +Create Date: 2024-06-14 23:04:55.436991 """ from typing import Sequence, Union @@ -12,7 +12,7 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision: str = 'a2c743418d84' +revision: str = '5bf5e6526891' down_revision: Union[str, None] = None branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None @@ -159,6 +159,20 @@ def upgrade() -> None: sa.ForeignKeyConstraint(['commune_code'], ['commune.code_insee'], ), sa.PrimaryKeyConstraint('id') ) + op.create_table('resultats2024_circonscription', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('circo_id', sa.String(length=6), nullable=False), + sa.Column('resultats_departement_id', sa.Integer(), nullable=False), + sa.Column('inscrits', sa.Integer(), nullable=False), + sa.Column('votants', sa.Integer(), nullable=False), + sa.Column('abstentions', sa.Integer(), nullable=False), + sa.Column('exprimes', sa.Integer(), nullable=False), + sa.Column('blancs', sa.Integer(), nullable=False), + sa.Column('nuls', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['circo_id'], ['circonscription.id'], ), + sa.ForeignKeyConstraint(['resultats_departement_id'], ['resultats2024_departement.id'], ), + sa.PrimaryKeyConstraint('id') + ) op.create_table('resultats2024_commune', sa.Column('id', sa.Integer(), nullable=False), sa.Column('commune_id', sa.String(length=5), nullable=False), @@ -185,6 +199,7 @@ def upgrade() -> None: sa.Column('id', sa.Integer(), nullable=False), sa.Column('bv_id', sa.String(length=16), nullable=False), sa.Column('resultats_commune_id', sa.Integer(), nullable=False), + sa.Column('resultats_circo_id', sa.Integer(), nullable=True), sa.Column('inscrits', sa.Integer(), nullable=False), sa.Column('votants', sa.Integer(), nullable=False), sa.Column('abstentions', sa.Integer(), nullable=False), @@ -192,9 +207,18 @@ def upgrade() -> None: sa.Column('blancs', sa.Integer(), nullable=False), sa.Column('nuls', sa.Integer(), nullable=False), sa.ForeignKeyConstraint(['bv_id'], ['bureau_vote.id'], ), + sa.ForeignKeyConstraint(['resultats_circo_id'], ['resultats2024_circonscription.id'], ), sa.ForeignKeyConstraint(['resultats_commune_id'], ['resultats2024_commune.id'], ), sa.PrimaryKeyConstraint('id') ) + op.create_table('voix2024_circonscription', + sa.Column('liste_id', sa.Integer(), nullable=False), + sa.Column('resultats_circonscription_id', sa.Integer(), nullable=False), + sa.Column('voix', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['liste_id'], ['liste2024.id'], ), + sa.ForeignKeyConstraint(['resultats_circonscription_id'], ['resultats2024_circonscription.id'], ), + sa.PrimaryKeyConstraint('liste_id', 'resultats_circonscription_id') + ) op.create_table('voix2024_commune', sa.Column('liste_id', sa.Integer(), nullable=False), sa.Column('resultats_commune_id', sa.Integer(), nullable=False), @@ -218,9 +242,11 @@ def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### op.drop_table('voix2024_bureau_vote') op.drop_table('voix2024_commune') + op.drop_table('voix2024_circonscription') op.drop_table('resultats2024_bureau_vote') op.drop_table('voix2024_departement') op.drop_table('resultats2024_commune') + op.drop_table('resultats2024_circonscription') op.drop_table('bureau_vote') op.drop_table('voix2024_region') op.drop_table('voix2024_france') diff --git a/nupes/models/__init__.py b/nupes/models/__init__.py index 862fb36..23f996d 100644 --- a/nupes/models/__init__.py +++ b/nupes/models/__init__.py @@ -1,3 +1,7 @@ from .base import Base from .geographie import Region, Departement, Commune, Circonscription, BureauVote -from .europeennes2024 import Bloc as Bloc2024, Nuance as Nuance2024, Liste as Liste2024, Candidat as Candidat2024 +from .europeennes2024 import Bloc as Bloc2024, Nuance as Nuance2024, Liste as Liste2024, Candidat as Candidat2024, \ + ResultatsFrance as ResultatsFrance2024, ResultatsRegion as ResultatsRegion2024, ResultatsDepartement as ResultatsDepartement2024, \ + ResultatsCommune as ResultatsCommune2024, ResultatsBureauVote as ResultatsBureauVote2024, \ + VoixListeFrance as VoixListeFrance2024, VoixListeRegion as VoixListeRegion2024, VoixListeDepartement as VoixListeDepartement2024, \ + VoixListeCommune as VoixListeCommune2024, VoixListeBureauVote as VoixListeBureauVote2024 diff --git a/nupes/models/europeennes2024.py b/nupes/models/europeennes2024.py index 9369106..bd0dd9d 100644 --- a/nupes/models/europeennes2024.py +++ b/nupes/models/europeennes2024.py @@ -45,10 +45,12 @@ class Liste(Base): resultats_nationaux: Mapped[List["VoixListeFrance"]] = relationship("VoixListeFrance", back_populates="liste") resultats_par_region: Mapped[List["VoixListeRegion"]] = relationship("VoixListeRegion", back_populates="liste") resultats_par_departement: Mapped[List["VoixListeDepartement"]] = relationship("VoixListeDepartement", - back_populates="liste") + back_populates="liste") + resultats_par_circonscription: Mapped[List["VoixListeCirconscription"]] = relationship( + "VoixListeCirconscription", back_populates="liste") resultats_par_commune: Mapped[List["VoixListeCommune"]] = relationship("VoixListeCommune", back_populates="liste") resultats_par_bureau_vote: Mapped[List["VoixListeBureauVote"]] = relationship("VoixListeBureauVote", - back_populates="liste") + back_populates="liste") class Candidat(Base): @@ -135,10 +137,33 @@ class ResultatsDepartement(Base): resultats_region = relationship(ResultatsRegion, back_populates="resultats_departements") resultats_communes: Mapped[List["ResultatsCommune"]] = relationship("ResultatsCommune", back_populates="resultats_departement") + resultats_circonscriptions: Mapped[List["ResultatsCirconscription"]] = relationship( + "ResultatsCirconscription", back_populates="resultats_departement") voix_listes: Mapped[List["VoixListeDepartement"]] = relationship("VoixListeDepartement", back_populates="resultats_departement") +class ResultatsCirconscription(Base): + __tablename__ = "resultats2024_circonscription" + + id: Mapped[int] = mapped_column(primary_key=True) + circo_id: Mapped[str] = mapped_column(ForeignKey("circonscription.id")) + resultats_departement_id: Mapped[int] = mapped_column(ForeignKey("resultats2024_departement.id")) + inscrits: Mapped[int] = mapped_column(Integer(), default=0) + votants: Mapped[int] = mapped_column(Integer(), default=0) + abstentions: Mapped[int] = mapped_column(Integer(), default=0) + exprimes: Mapped[int] = mapped_column(Integer(), default=0) + blancs: Mapped[int] = mapped_column(Integer(), default=0) + nuls: Mapped[int] = mapped_column(Integer(), default=0) + + circonscription = relationship("Circonscription", back_populates="resultats2024") + resultats_departement = relationship(ResultatsDepartement, back_populates="resultats_circonscriptions") + resultats_bureaux_vote: Mapped[List["ResultatsBureauVote"]] = relationship( + "ResultatsBureauVote", back_populates="resultats_circonscription") + voix_listes: Mapped[List["VoixListeCirconscription"]] = relationship("VoixListeCirconscription", + back_populates="resultats_circonscription") + + class ResultatsCommune(Base): __tablename__ = "resultats2024_commune" @@ -165,6 +190,7 @@ class ResultatsBureauVote(Base): id: Mapped[int] = mapped_column(primary_key=True) bv_id: Mapped[str] = mapped_column(ForeignKey("bureau_vote.id")) resultats_commune_id: Mapped[int] = mapped_column(ForeignKey("resultats2024_commune.id")) + resultats_circo_id: Mapped[int] = mapped_column(ForeignKey("resultats2024_circonscription.id"), nullable=True) inscrits: Mapped[int] = mapped_column(Integer(), default=0) votants: Mapped[int] = mapped_column(Integer(), default=0) abstentions: Mapped[int] = mapped_column(Integer(), default=0) @@ -174,6 +200,7 @@ class ResultatsBureauVote(Base): bureau_vote = relationship("BureauVote", back_populates="resultats2024") resultats_commune = relationship(ResultatsCommune, back_populates="resultats_bureaux_vote") + resultats_circonscription = relationship(ResultatsCirconscription, back_populates="resultats_bureaux_vote") voix_listes: Mapped[List["VoixListeBureauVote"]] = relationship("VoixListeBureauVote", back_populates="resultats_bureau_vote") @@ -212,6 +239,19 @@ class VoixListeDepartement(Base): back_populates="voix_listes") +class VoixListeCirconscription(Base): + __tablename__ = "voix2024_circonscription" + + liste_id: Mapped[int] = mapped_column(ForeignKey("liste2024.id"), primary_key=True) + resultats_circonscription_id: Mapped[int] = mapped_column(ForeignKey("resultats2024_circonscription.id"), + primary_key=True) + voix: Mapped[int] = mapped_column(Integer(), default=0) + + liste: Mapped[Liste] = relationship(Liste, back_populates="resultats_par_circonscription") + resultats_circonscription: Mapped[ResultatsCirconscription] = relationship(ResultatsCirconscription, + back_populates="voix_listes") + + class VoixListeCommune(Base): __tablename__ = "voix2024_commune" diff --git a/nupes/models/geographie.py b/nupes/models/geographie.py index 4db4000..8b92fd7 100644 --- a/nupes/models/geographie.py +++ b/nupes/models/geographie.py @@ -56,6 +56,8 @@ class Circonscription(Base): departement: Mapped[Departement] = relationship(Departement) bureaux_vote: Mapped[List["BureauVote"]] = relationship("BureauVote", back_populates="circonscription") + resultats2024 = relationship("ResultatsCirconscription", back_populates="circonscription") + class BureauVote(Base): __tablename__ = "bureau_vote" diff --git a/nupes/scripts/import_geographie.py b/nupes/scripts/import_geographie.py index 7665220..32399b9 100644 --- a/nupes/scripts/import_geographie.py +++ b/nupes/scripts/import_geographie.py @@ -4,6 +4,7 @@ from datetime import datetime import requests from sqlalchemy import Engine, select from sqlalchemy.orm import Session +from tqdm import tqdm from nupes.cache import get_file from nupes.models.geographie import BureauVote, Circonscription, Commune, Departement, Region @@ -21,7 +22,7 @@ def importer_regions(engine: Engine, verbose: bool = False) -> None: features = json.load(f)['features'] with Session(engine) as session: - for feature in features: + for feature in tqdm(features, desc="Régions", disable=not verbose): region_dict = feature['properties'] code_region = region_dict['reg_code'][0] nom_region = region_dict['reg_name'][0] @@ -49,7 +50,7 @@ def importer_departements(engine: Engine, verbose: bool = False) -> None: features = json.load(f)['features'] with Session(engine) as session: - for feature in features: + for feature in tqdm(features, desc="Départements", disable=not verbose): dpt_dict = feature['properties'] code_dpt = dpt_dict['dep_code'][0] nom_dpt = dpt_dict['dep_name'][0] @@ -78,7 +79,7 @@ def importer_communes(engine: Engine, verbose: bool = False) -> None: features = json.load(f)['features'] with Session(engine) as session: - for feature in features: + for feature in tqdm(features, desc="Communes", disable=not verbose): commune_dict = feature['properties'] code_commune = commune_dict['com_arm_code'][0] nom_commune = commune_dict['com_name'][0] @@ -96,40 +97,25 @@ def importer_communes(engine: Engine, verbose: bool = False) -> None: def importer_bureaux_vote(engine: Engine, verbose: bool = False) -> None: - etag = requests.get( - "https://public.opendatasoft.com/api/explore/v2.1/catalog/datasets" - "/elections-france-bureau-vote-2022?select=data_processed").json()['data_processed'] - file = get_file("https://public.opendatasoft.com/api/explore/v2.1/catalog/datasets" - "/elections-france-bureau-vote-2022/exports/geojson?lang=fr&timezone=Europe%2FParis", - "elections-france-bureau-vote-2022.geojson", etag) + file = get_file("https://files.data.gouv.fr/reu/contours-france-entiere-latest-v2.geojson", + "contours-france-entiere-latest-v2.geojson") with file.open('r') as f: features = json.load(f)['features'] with Session(engine) as session: - for feature in features: + for feature in tqdm(features, desc="Bureaux de vote", disable=not verbose): bv_dict = feature['properties'] - code_commune = bv_dict['com_code'] - if not code_commune: - print(feature) - continue - code_commune = code_commune.split('/')[0] - code_bv = bv_dict['code'] or "0" - code_circo = bv_dict['circonscription_code'] - bv_id = f"{code_commune}_{code_bv}" - bv_libelle = bv_dict['libelle'] or "Bureau unique" - dpt_code, numero_circo = code_circo.split('-') - numero_circo = int(numero_circo) - - if dpt_code == "987" or dpt_code == "988": - # Les communes de la Polynésie française et de Nouvelle-Calédonie ne sont pas disponibles, - # on les crée à la volée - if not session.execute(select(Commune).filter_by(code_insee=code_commune)).scalar_one_or_none(): - session.add(Commune(code_insee=code_commune, libelle=bv_dict['com_name'], departement_code=dpt_code, - geometry={})) + code_commune = bv_dict['id_bv'].split('_')[0] + code_bv = bv_dict['numeroBureauVote'] + dpt_code = bv_dict['codeDepartement'] + numero_circo = int(bv_dict['codeCirconscription'][len(dpt_code):]) + code_circo = f"{dpt_code}-{numero_circo:02d}" + bv_id = bv_dict['id_bv'].split()[0] + bv_libelle = f"Bureau {code_bv}" if not session.execute(select(Commune).filter_by(code_insee=code_commune)).scalar_one_or_none(): - print("Commune non trouvée avec le code", code_commune, "et le nom", bv_dict['com_name']) + print("Commune non trouvée avec le code", code_commune, "et le nom", bv_dict['nomCommune']) continue if not session.execute(select(Circonscription).filter_by(id=code_circo)).scalar_one_or_none(): @@ -140,43 +126,11 @@ def importer_bureaux_vote(engine: Engine, verbose: bool = False) -> None: bv.code_bureau = code_bv bv.circo_code = code_circo bv.libelle = bv_libelle - bv.adresse = bv_dict['adresse'] - else: - bv = BureauVote(id=bv_id, commune_code=code_commune, code_bureau=code_bv, circo_code=code_circo, - libelle=bv_libelle, adresse=bv_dict['adresse'], - geometry={}) - session.add(bv) - - session.commit() - - -def importer_contours_bureaux_vote(engine: Engine, verbose: bool = False) -> None: - file = get_file("https://www.data.gouv.fr/fr/datasets/r/f98165a7-7c37-4705-a181-bcfc943edc73", - "contours-bureaux-vote.geojson") - - with file.open('r') as f: - features = json.load(f)['features'] - - with Session(engine) as session: - for feature in features: - bv_id: str = feature['properties']['id_bv'] - com_code, bv_code = bv_id.split('_') - bv_code = bv_code.replace("-", " ").replace(".", " ").strip() - while len(bv_code) >= 2 and bv_code[0] == '0': - bv_code = bv_code[1:] - while " " in bv_code: - bv_code = bv_code.replace(" ", " ") - bv_id = f"{com_code}_{bv_code}" - - if bv := session.execute(select(BureauVote).filter_by(id=bv_id)).scalar_one_or_none(): bv.geometry = feature['geometry'] else: - results = session.execute(select(BureauVote).filter_by(commune_code=com_code)).scalars().all() - if len(results) == 1: - bv = results[0] - bv.geometry = feature['geometry'] - else: - print(f"Bureau de vote {bv_id} non trouvé") + bv = BureauVote(id=bv_id, commune_code=code_commune, code_bureau=code_bv, circo_code=code_circo, + libelle=bv_libelle, adresse="", geometry=feature['geometry']) + session.add(bv) session.commit() @@ -186,4 +140,3 @@ def run(engine: Engine, verbose: bool = False) -> None: importer_departements(engine, verbose) importer_communes(engine, verbose) importer_bureaux_vote(engine, verbose) - importer_contours_bureaux_vote(engine, verbose) diff --git a/nupes/scripts/import_resultats_2024.py b/nupes/scripts/import_resultats_2024.py index 0a24685..12ed7e3 100644 --- a/nupes/scripts/import_resultats_2024.py +++ b/nupes/scripts/import_resultats_2024.py @@ -8,24 +8,26 @@ from nupes.cache import get_file from nupes.models import BureauVote, Departement from nupes.models.europeennes2024 import ResultatsBureauVote, Liste, ResultatsCommune, \ ResultatsDepartement, ResultatsRegion, ResultatsFrance, \ - VoixListeBureauVote, VoixListeCommune, VoixListeDepartement, VoixListeRegion, VoixListeFrance + VoixListeBureauVote, VoixListeCommune, VoixListeDepartement, VoixListeRegion, VoixListeFrance, \ + ResultatsCirconscription, VoixListeCirconscription def importer_resultats_bv(engine: Engine, verbose: bool = False) -> None: - file = get_file("https://www.data.gouv.fr/fr/datasets/r/937bb638-a487-40cd-9a0b-610d539a4207", - "resultats-temporaires-par-bureau-de-vote.csv") + file = get_file("https://www.data.gouv.fr/fr/datasets/r/cc1883d9-1265-4365-b754-fb6aef22d82e", + "resultats-europeennes-par-bureau-de-vote.csv") with file.open('r') as f: - next(f) # On saute la première ligne - reader = DictReader(f) + reader = DictReader(f, delimiter=';') with Session(engine) as session: for row in tqdm(reader, desc="Bureau de vote", disable=not verbose): com_code = str(row['Code commune']).zfill(5) bv_code = row['Code BV'] - bv_id = f"{com_code}_{bv_code}" - if not session.execute(select(BureauVote).filter_by(id=bv_id)).scalar_one_or_none(): - print(f"Bureau de vote {bv_id} non trouvé") + if bv := session.execute(select(BureauVote).filter_by(commune_code=com_code, code_bureau=bv_code)) \ + .scalar_one_or_none(): + bv_id = bv.id + else: + print(f"Bureau de vote {com_code}_{bv_code} non trouvé") continue resultats_bv = session.execute(select(ResultatsBureauVote).filter_by(bv_id=bv_id)).scalar_one_or_none() @@ -91,140 +93,293 @@ def importer_resultats_bv(engine: Engine, verbose: bool = False) -> None: session.commit() -def calculer_resultats_commune(engine: Engine, verbose: bool = False) -> None: - with Session(engine) as session: - for resultat_commune in tqdm(session.execute(select(ResultatsCommune)).scalars(), desc="Communes", - disable=not verbose): - resultats_bv = session.execute(select(ResultatsBureauVote) - .filter_by(resultats_commune_id=resultat_commune.id)).scalars().all() - resultat_commune.inscrits = sum(r.inscrits for r in resultats_bv) - resultat_commune.votants = sum(r.votants for r in resultats_bv) - resultat_commune.abstentions = sum(r.abstentions for r in resultats_bv) - resultat_commune.exprimes = sum(r.exprimes for r in resultats_bv) - resultat_commune.blancs = sum(r.blancs for r in resultats_bv) - resultat_commune.nuls = sum(r.nuls for r in resultats_bv) +def importer_resultats_commune(engine: Engine, verbose: bool = False) -> None: + file = get_file("https://www.data.gouv.fr/fr/datasets/r/6a782ef9-8ad6-4e66-832d-338b1041a42d", + "resultats-europeennes-par-commune.csv") - for liste in session.execute(select(Liste)).scalars().all(): - voix_liste = session.execute(select(VoixListeBureauVote).filter_by(liste_id=liste.id) - .join(ResultatsBureauVote) - .filter_by(resultats_commune_id=resultat_commune.id)) \ - .scalars().all() - voix = sum(v.voix for v in voix_liste) - if voix_liste_commune := session.execute( - select(VoixListeCommune).filter_by(resultats_commune_id=resultat_commune.id, - liste_id=liste.id)).scalar_one_or_none(): - voix_liste_commune.voix = voix - else: - voix_liste_commune = VoixListeCommune(resultats_commune_id=resultat_commune.id, - liste_id=liste.id, - voix=voix) - session.add(voix_liste_commune) + with file.open('r') as f: + reader = DictReader(f, delimiter=';') + with Session(engine) as session: + for row in tqdm(reader, desc="Commune", disable=not verbose): + com_code = str(row['Code commune']).zfill(5) - session.commit() + resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \ + .scalar_one_or_none() + if not resultats_commune: + resultats_dpt = session.execute(select(ResultatsDepartement) + .filter_by(dpt_id=row['Code département'])) \ + .scalar_one_or_none() + if not resultats_dpt: + dpt = session.execute(select(Departement) + .filter_by(code_insee=f"{row['Code département'].zfill(2)}")) \ + .scalar_one() + resultats_reg = session.execute(select(ResultatsRegion) + .filter_by(region_id=dpt.region_code)) \ + .scalar_one_or_none() + if not resultats_reg: + resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none() + if not resultats_france: + session.add(ResultatsFrance()) + resultats_france = session.execute(select(ResultatsFrance)).scalar_one() + + resultats_reg = ResultatsRegion(region_id=str(dpt.region_code), + resultats_france_id=resultats_france.id) + session.add(resultats_reg) + resultats_reg = session.execute(select(ResultatsRegion) + .filter_by(region_id=dpt.region_code)).scalar_one() + + resultats_dpt = ResultatsDepartement(dpt_id=row['Code département'], + resultats_region_id=resultats_reg.id) + session.add(resultats_dpt) + resultats_dpt = session.execute(select(ResultatsDepartement) + .filter_by(dpt_id=row['Code département'])).scalar_one() + + resultats_commune = ResultatsCommune(commune_id=com_code, resultats_dpt_id=resultats_dpt.id) + session.add(resultats_commune) + resultats_commune = session.execute(select(ResultatsCommune).filter_by(commune_id=com_code)) \ + .scalar_one() + + resultats_commune.inscrits = int(row['Inscrits']) + resultats_commune.votants = int(row['Votants']) + resultats_commune.abstentions = int(row['Abstentions']) + resultats_commune.exprimes = int(row['Exprimés']) + resultats_commune.blancs = int(row['Blancs']) + resultats_commune.nuls = int(row['Nuls']) + + for liste in session.execute(select(Liste)).scalars().all(): + voix_liste_com = session.execute(select(VoixListeCommune) + .filter_by(resultats_commune_id=resultats_commune.id, + liste_id=liste.id)) \ + .scalar_one_or_none() + if not voix_liste_com: + voix_liste_com = VoixListeCommune(resultats_commune_id=resultats_commune.id, liste_id=liste.id) + session.add(voix_liste_com) + + voix_liste_com.voix = int(row[f"Voix {liste.id}"]) + + session.commit() -def calculer_resultats_circo(engine: Engine, verbose: bool = False) -> None: - pass +def importer_resultats_circo(engine: Engine, verbose: bool = False) -> None: + file = get_file("https://www.data.gouv.fr/fr/datasets/r/ee37cbef-3d2a-4efe-a395-530b85a63028", + "resultats-europeennes-par-circonscription.csv") + + with file.open('r') as f: + reader = DictReader(f, delimiter=';') + with Session(engine) as session: + for row in tqdm(reader, desc="Circonscription", disable=not verbose): + circo_code = row['Code circonscription législative'] + dpt_code, circo_code = circo_code[:] + + resultats_circo = session.execute(select(ResultatsCirconscription).filter_by(circo_id=circo_code)) \ + .scalar_one_or_none() + if not resultats_circo: + resultats_dpt = session.execute(select(ResultatsDepartement) + .filter_by(dpt_id=row['Code département'])) \ + .scalar_one_or_none() + if not resultats_dpt: + dpt = session.execute(select(Departement) + .filter_by(code_insee=f"{row['Code département'].zfill(2)}")) \ + .scalar_one() + resultats_reg = session.execute(select(ResultatsRegion) + .filter_by(region_id=dpt.region_code)) \ + .scalar_one_or_none() + if not resultats_reg: + resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none() + if not resultats_france: + session.add(ResultatsFrance()) + resultats_france = session.execute(select(ResultatsFrance)).scalar_one() + + resultats_reg = ResultatsRegion(region_id=str(dpt.region_code), + resultats_france_id=resultats_france.id) + session.add(resultats_reg) + resultats_reg = session.execute(select(ResultatsRegion) + .filter_by(region_id=dpt.region_code)).scalar_one() + + resultats_dpt = ResultatsDepartement(dpt_id=row['Code département'], + resultats_region_id=resultats_reg.id) + session.add(resultats_dpt) + resultats_dpt = session.execute(select(ResultatsDepartement) + .filter_by(dpt_id=row['Code département'])).scalar_one() + + resultats_circo = ResultatsCirconscription(circo_id=circo_code, + resultats_departement_id=resultats_dpt.id) + session.add(resultats_circo) + resultats_circo = session.execute(select(ResultatsCirconscription).filter_by(circo_id=circo_code)) \ + .scalar_one() + + resultats_circo.inscrits = int(row['Inscrits']) + resultats_circo.votants = int(row['Votants']) + resultats_circo.abstentions = int(row['Abstentions']) + resultats_circo.exprimes = int(row['Exprimés']) + resultats_circo.blancs = int(row['Blancs']) + resultats_circo.nuls = int(row['Nuls']) + + for liste in session.execute(select(Liste)).scalars().all(): + voix_liste_circo = session.execute(select(VoixListeCirconscription) + .filter_by(resultats_circonscription_id=resultats_circo.id, + liste_id=liste.id)) \ + .scalar_one_or_none() + if not voix_liste_circo: + voix_liste_circo = VoixListeCirconscription(resultats_circonscription_id=resultats_circo.id, + liste_id=liste.id) + session.add(voix_liste_circo) + + voix_liste_circo.voix = int(row[f"Voix {liste.id}"]) + + session.commit() -def calculer_resultats_departement(engine: Engine, verbose: bool = False) -> None: - with Session(engine) as session: - for resultat_dpt in tqdm(session.execute(select(ResultatsDepartement)).scalars(), desc="Départements", - disable=not verbose): - resultats_commune = session.execute(select(ResultatsCommune) - .filter_by(resultats_dpt_id=resultat_dpt.id)).scalars().all() - resultat_dpt.inscrits = sum(r.inscrits for r in resultats_commune) - resultat_dpt.votants = sum(r.votants for r in resultats_commune) - resultat_dpt.abstentions = sum(r.abstentions for r in resultats_commune) - resultat_dpt.exprimes = sum(r.exprimes for r in resultats_commune) - resultat_dpt.blancs = sum(r.blancs for r in resultats_commune) - resultat_dpt.nuls = sum(r.nuls for r in resultats_commune) +def importer_resultats_departement(engine: Engine, verbose: bool = False) -> None: + file = get_file("https://www.data.gouv.fr/fr/datasets/r/b77cc4da-644f-4323-b6f7-ae6fe9b33f86", + "resultats-europeennes-par-circonscription.csv") - for liste in session.execute(select(Liste)).scalars().all(): - voix_liste = session.execute(select(VoixListeCommune).filter_by(liste_id=liste.id) - .join(ResultatsCommune) - .filter_by(resultats_dpt_id=resultat_dpt.id)) \ - .scalars().all() - voix = sum(v.voix for v in voix_liste) - if voix_liste_dpt := session.execute(select(VoixListeDepartement) - .filter_by(resultats_departement_id=resultat_dpt.id, + with file.open('r') as f: + reader = DictReader(f, delimiter=';') + with Session(engine) as session: + for row in tqdm(reader, desc="Département", disable=not verbose): + dpt_code = row['Code département'] + + resultats_dpt = session.execute(select(ResultatsDepartement).filter_by(dpt_id=dpt_code)) \ + .scalar_one_or_none() + if not resultats_dpt: + dpt = session.execute(select(Departement) + .filter_by(code_insee=f"{row['Code département'].zfill(2)}")) \ + .scalar_one() + resultats_reg = session.execute(select(ResultatsRegion) + .filter_by(region_id=dpt.region_code)) \ + .scalar_one_or_none() + if not resultats_reg: + resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none() + if not resultats_france: + session.add(ResultatsFrance()) + resultats_france = session.execute(select(ResultatsFrance)).scalar_one() + + resultats_reg = ResultatsRegion(region_id=str(dpt.region_code), + resultats_france_id=resultats_france.id) + session.add(resultats_reg) + resultats_reg = session.execute(select(ResultatsRegion) + .filter_by(region_id=dpt.region_code)).scalar_one() + + resultats_dpt = ResultatsDepartement(dpt_id=dpt_code, + resultats_region_id=resultats_reg.id) + session.add(resultats_dpt) + resultats_dpt = session.execute(select(ResultatsDepartement).filter_by(dpt_id=dpt_code)) \ + .scalar_one() + + resultats_dpt.inscrits = int(row['Inscrits']) + resultats_dpt.votants = int(row['Votants']) + resultats_dpt.abstentions = int(row['Abstentions']) + resultats_dpt.exprimes = int(row['Exprimés']) + resultats_dpt.blancs = int(row['Blancs']) + resultats_dpt.nuls = int(row['Nuls']) + + for liste in session.execute(select(Liste)).scalars().all(): + voix_liste_dpt = session.execute(select(VoixListeDepartement) + .filter_by(resultats_departement_id=resultats_dpt.id, + liste_id=liste.id)) \ + .scalar_one_or_none() + if not voix_liste_dpt: + voix_liste_dpt = VoixListeDepartement(resultats_departement_id=resultats_dpt.id, + liste_id=liste.id) + session.add(voix_liste_dpt) + + voix_liste_dpt.voix = int(row[f"Voix {liste.id}"]) + + session.commit() + + +def importer_resultats_region(engine: Engine, verbose: bool = False) -> None: + file = get_file("https://www.data.gouv.fr/fr/datasets/r/6a782ef9-8ad6-4e66-832d-338b1041a42d", + "resultats-europeennes-par-commune.csv") + + with file.open('r') as f: + reader = DictReader(f, delimiter=';') + with Session(engine) as session: + for row in tqdm(reader, desc="Région", disable=not verbose): + dpt_code = row['Code département'] + + resultats_reg = session.execute(select(ResultatsRegion).filter_by(region_id=dpt_code)) \ + .scalar_one_or_none() + if not resultats_reg: + dpt = session.execute(select(Departement) + .filter_by(code_insee=f"{row['Code département'].zfill(2)}")) \ + .scalar_one() + resultats_france = session.execute(select(ResultatsFrance)).scalar_one_or_none() + if not resultats_france: + session.add(ResultatsFrance()) + resultats_france = session.execute(select(ResultatsFrance)).scalar_one() + + resultats_reg = ResultatsRegion(region_id=str(dpt.region_code), + resultats_france_id=resultats_france.id) + session.add(resultats_reg) + resultats_reg = session.execute(select(ResultatsRegion).filter_by(region_id=dpt_code)) \ + .scalar_one() + + resultats_reg.inscrits = int(row['Inscrits']) + resultats_reg.votants = int(row['Votants']) + resultats_reg.abstentions = int(row['Abstentions']) + resultats_reg.exprimes = int(row['Exprimés']) + resultats_reg.blancs = int(row['Blancs']) + resultats_reg.nuls = int(row['Nuls']) + + for liste in session.execute(select(Liste)).scalars().all(): + voix_liste_reg = session.execute(select(VoixListeRegion) + .filter_by(resultats_region_id=resultats_reg.id, liste_id=liste.id)) \ - .scalar_one_or_none(): - voix_liste_dpt.voix = voix - else: - voix_liste_dpt = VoixListeDepartement(resultats_departement_id=resultat_dpt.id, - liste_id=liste.id, - voix=voix) - session.add(voix_liste_dpt) + .scalar_one_or_none() + if not voix_liste_reg: + voix_liste_reg = VoixListeRegion(resultats_region_id=resultats_reg.id, liste_id=liste.id) + session.add(voix_liste_reg) - session.commit() + voix_liste_reg.voix = int(row[f"Voix {liste.id}"]) + + session.commit() -def calculer_resultats_region(engine: Engine, verbose: bool = False) -> None: +def calculer_resultats_france(engine: Engine, verbose: bool = False) -> None: with Session(engine) as session: - for resultat_reg in tqdm(session.execute(select(ResultatsRegion)).scalars(), desc="Régions", - disable=not verbose): - resultats_dpt = session.execute(select(ResultatsDepartement) - .filter_by(resultats_region_id=resultat_reg.id)).scalars().all() - resultat_reg.inscrits = sum(r.inscrits for r in resultats_dpt) - resultat_reg.votants = sum(r.votants for r in resultats_dpt) - resultat_reg.abstentions = sum(r.abstentions for r in resultats_dpt) - resultat_reg.exprimes = sum(r.exprimes for r in resultats_dpt) - resultat_reg.blancs = sum(r.blancs for r in resultats_dpt) - resultat_reg.nuls = sum(r.nuls for r in resultats_dpt) + resultats_france = session.execute(select(ResultatsFrance)).scalar_one() - for liste in session.execute(select(Liste)).scalars().all(): - voix_liste = session.execute(select(VoixListeDepartement).filter_by(liste_id=liste.id) - .join(ResultatsDepartement) - .filter_by(resultats_region_id=resultat_reg.id)) \ - .scalars().all() - voix = sum(v.voix for v in voix_liste) - if voix_liste_reg := session.execute(select(VoixListeRegion) - .filter_by(resultats_region_id=resultat_reg.id, - liste_id=liste.id)).scalar_one_or_none(): - voix_liste_reg.voix = voix - else: - voix_liste_reg = VoixListeRegion(resultats_region_id=resultat_reg.id, - liste_id=liste.id, - voix=voix) - session.add(voix_liste_reg) + resultats_france.inscrits = 0 + resultats_france.votants = 0 + resultats_france.abstentions = 0 + resultats_france.exprimes = 0 + resultats_france.blancs = 0 + resultats_france.nuls = 0 - session.commit() + for voix_liste_france in session.execute( + select(VoixListeFrance).filter_by(resultats_france_id=resultats_france.id)).scalars().all(): + voix_liste_france.voix = 0 # Réinitialisation des voix + for resultats_reg in session.execute(select(ResultatsRegion)).scalars().all(): + resultats_france.inscrits += resultats_reg.inscrits + resultats_france.votants += resultats_reg.votants + resultats_france.abstentions += resultats_reg.abstentions + resultats_france.exprimes += resultats_reg.exprimes + resultats_france.blancs += resultats_reg.blancs + resultats_france.nuls += resultats_reg.nuls -def calculer_resultats_national(engine: Engine, verbose: bool = False) -> None: - with Session(engine) as session: - resultat_france = session.execute(select(ResultatsFrance)).scalar_one() - resultats_reg = session.execute(select(ResultatsRegion)).scalars().all() - resultat_france.inscrits = sum(r.inscrits for r in resultats_reg) - resultat_france.votants = sum(r.votants for r in resultats_reg) - resultat_france.abstentions = sum(r.abstentions for r in resultats_reg) - resultat_france.exprimes = sum(r.exprimes for r in resultats_reg) - resultat_france.blancs = sum(r.blancs for r in resultats_reg) - resultat_france.nuls = sum(r.nuls for r in resultats_reg) + for voix_liste_reg in session.execute(select(VoixListeRegion) + .filter_by(resultats_region_id=resultats_reg.id)).scalars().all(): + voix_liste_france = session.execute(select(VoixListeFrance) + .filter_by(resultats_france_id=resultats_france.id, + liste_id=voix_liste_reg.liste_id)) \ + .scalar_one_or_none() + if not voix_liste_france: + voix_liste_france = VoixListeFrance(resultats_france_id=resultats_france.id, + liste_id=voix_liste_reg.liste_id) + session.add(voix_liste_france) - for liste in session.execute(select(Liste)).scalars().all(): - voix_liste = session.execute(select(VoixListeRegion).filter_by(liste_id=liste.id).join(ResultatsRegion) - .filter_by(resultats_france_id=resultat_france.id)) \ - .scalars().all() - voix = sum(v.voix for v in voix_liste) - if voix_liste_france := session.execute(select(VoixListeFrance) - .filter_by(resultats_france_id=resultat_france.id, - liste_id=liste.id)).scalar_one_or_none(): - voix_liste_france.voix = voix - else: - voix_liste_france = VoixListeFrance(resultats_france_id=resultat_france.id, - liste_id=liste.id, - voix=voix) - session.add(voix_liste_france) + voix_liste_france.voix += voix_liste_reg.voix session.commit() def run(engine: Engine, verbose: bool = False) -> None: importer_resultats_bv(engine, verbose) - calculer_resultats_commune(engine, verbose) - # calculer_resultats_circo(engine, verbose) - calculer_resultats_departement(engine, verbose) - calculer_resultats_region(engine, verbose) - calculer_resultats_national(engine, verbose) + importer_resultats_commune(engine, verbose) + importer_resultats_circo(engine, verbose) + importer_resultats_departement(engine, verbose) + importer_resultats_region(engine, verbose) + calculer_resultats_france(engine, verbose)