diff --git a/alembic/versions/42943ee95bdb_ajout_modèles_législatives.py b/alembic/versions/99948a33112b_ajout_modèles_législatives.py similarity index 96% rename from alembic/versions/42943ee95bdb_ajout_modèles_législatives.py rename to alembic/versions/99948a33112b_ajout_modèles_législatives.py index dbbb790..83eddcf 100644 --- a/alembic/versions/42943ee95bdb_ajout_modèles_législatives.py +++ b/alembic/versions/99948a33112b_ajout_modèles_législatives.py @@ -1,8 +1,8 @@ """ajout modèles législatives -Revision ID: 42943ee95bdb +Revision ID: 99948a33112b Revises: c0443e979a90 -Create Date: 2024-06-19 22:03:16.934581 +Create Date: 2024-06-20 12:47:52.732224 """ from typing import Sequence, Union @@ -12,7 +12,7 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision: str = '42943ee95bdb' +revision: str = '99948a33112b' down_revision: Union[str, None] = 'c0443e979a90' branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None @@ -100,7 +100,7 @@ def upgrade() -> None: sa.Column('voix_t2', sa.Integer(), nullable=False), sa.ForeignKeyConstraint(['nuance_id'], ['legislatives_2022_nuance.code'], ), sa.ForeignKeyConstraint(['resultats_france_id'], ['legislatives_2022_resultats_france.id'], ), - sa.PrimaryKeyConstraint('id', 'resultats_france_id') + sa.PrimaryKeyConstraint('id') ) op.create_table('legislatives_2022_voix_region', sa.Column('id', sa.Integer(), nullable=False), @@ -110,13 +110,13 @@ def upgrade() -> None: sa.Column('voix_t2', sa.Integer(), nullable=False), sa.ForeignKeyConstraint(['nuance_id'], ['legislatives_2022_nuance.code'], ), sa.ForeignKeyConstraint(['resultats_region_id'], ['legislatives_2022_resultats_region.id'], ), - sa.PrimaryKeyConstraint('id', 'resultats_region_id') + sa.PrimaryKeyConstraint('id') ) op.create_table('legislatives_2022_candidat', sa.Column('id', sa.Integer(), nullable=False), sa.Column('circonscription_id', sa.String(length=6), nullable=False), sa.Column('numero', sa.Integer(), nullable=False), - sa.Column('nuance_id', sa.String(length=8), nullable=True), + sa.Column('nuance_id', sa.String(length=8), nullable=False), sa.Column('bloc_id', sa.Integer(), nullable=False), sa.Column('nom', sa.String(length=256), nullable=False), sa.Column('prenom', sa.String(length=256), nullable=False), @@ -182,7 +182,7 @@ def upgrade() -> None: sa.Column('voix_t2', sa.Integer(), nullable=False), sa.ForeignKeyConstraint(['nuance_id'], ['legislatives_2022_nuance.code'], ), sa.ForeignKeyConstraint(['resultats_departement_id'], ['legislatives_2022_resultats_departement.id'], ), - sa.PrimaryKeyConstraint('id', 'resultats_departement_id') + sa.PrimaryKeyConstraint('id') ) op.create_table('legislatives_2022_resultats_bureau_vote', sa.Column('id', sa.Integer(), nullable=False), @@ -214,7 +214,7 @@ def upgrade() -> None: sa.Column('voix_t2', sa.Integer(), nullable=False), sa.ForeignKeyConstraint(['candidat_id'], ['legislatives_2022_candidat.id'], ), sa.ForeignKeyConstraint(['resultats_circonscription_id'], ['legislatives_2022_resultats_circonscription.id'], ), - sa.PrimaryKeyConstraint('id', 'resultats_circonscription_id') + sa.PrimaryKeyConstraint('id') ) op.create_table('legislatives_2022_voix_commune', sa.Column('id', sa.Integer(), nullable=False), @@ -224,7 +224,7 @@ def upgrade() -> None: sa.Column('voix_t2', sa.Integer(), nullable=False), sa.ForeignKeyConstraint(['nuance_id'], ['legislatives_2022_nuance.code'], ), sa.ForeignKeyConstraint(['resultats_commune_id'], ['legislatives_2022_resultats_commune.id'], ), - sa.PrimaryKeyConstraint('id', 'resultats_commune_id') + sa.PrimaryKeyConstraint('id') ) op.create_table('legislatives_2022_voix_bureau_vote', sa.Column('id', sa.Integer(), nullable=False), @@ -234,7 +234,7 @@ def upgrade() -> None: sa.Column('voix_t2', sa.Integer(), nullable=False), sa.ForeignKeyConstraint(['candidat_id'], ['legislatives_2022_candidat.id'], ), sa.ForeignKeyConstraint(['resultats_bureau_vote_id'], ['legislatives_2022_resultats_bureau_vote.id'], ), - sa.PrimaryKeyConstraint('id', 'resultats_bureau_vote_id') + sa.PrimaryKeyConstraint('id') ) # ### end Alembic commands ### diff --git a/nupes/models/legislatives2022.py b/nupes/models/legislatives2022.py index d108a42..3db0192 100644 --- a/nupes/models/legislatives2022.py +++ b/nupes/models/legislatives2022.py @@ -55,7 +55,7 @@ class CandidatLegislatives2022(Base): id: Mapped[int] = mapped_column(primary_key=True) circonscription_id: Mapped[str] = mapped_column(ForeignKey("circonscription.id")) numero: Mapped[int] = mapped_column(Integer()) - nuance_id = mapped_column(ForeignKey("legislatives_2022_nuance.code")) + nuance_id: Mapped[str] = mapped_column(ForeignKey("legislatives_2022_nuance.code")) bloc_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_bloc.id")) nom: Mapped[str] = mapped_column(String(256)) prenom: Mapped[str] = mapped_column(String(256)) @@ -246,9 +246,8 @@ class VoixFranceLegislatives2022(Base): __tablename__ = "legislatives_2022_voix_france" id: Mapped[int] = mapped_column(primary_key=True) - nuance_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_nuance.code")) - resultats_france_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_resultats_france.id"), - primary_key=True) + nuance_id: Mapped[str] = mapped_column(ForeignKey("legislatives_2022_nuance.code")) + resultats_france_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_resultats_france.id")) voix_t1: Mapped[int] = mapped_column(Integer(), default=0) voix_t2: Mapped[int] = mapped_column(Integer(), default=0) @@ -262,9 +261,8 @@ class VoixRegionLegislatives2022(Base): __tablename__ = "legislatives_2022_voix_region" id: Mapped[int] = mapped_column(primary_key=True) - nuance_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_nuance.code")) - resultats_region_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_resultats_region.id"), - primary_key=True) + nuance_id: Mapped[str] = mapped_column(ForeignKey("legislatives_2022_nuance.code")) + resultats_region_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_resultats_region.id")) voix_t1: Mapped[int] = mapped_column(Integer(), default=0) voix_t2: Mapped[int] = mapped_column(Integer(), default=0) @@ -278,9 +276,8 @@ class VoixDepartementLegislatives2022(Base): __tablename__ = "legislatives_2022_voix_departement" id: Mapped[int] = mapped_column(primary_key=True) - nuance_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_nuance.code")) - resultats_departement_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_resultats_departement.id"), - primary_key=True) + nuance_id: Mapped[str] = mapped_column(ForeignKey("legislatives_2022_nuance.code")) + resultats_departement_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_resultats_departement.id")) voix_t1: Mapped[int] = mapped_column(Integer(), default=0) voix_t2: Mapped[int] = mapped_column(Integer(), default=0) @@ -296,7 +293,7 @@ class VoixCirconscriptionLegislatives2022(Base): id: Mapped[int] = mapped_column(primary_key=True) candidat_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_candidat.id")) resultats_circonscription_id: Mapped[int] = mapped_column( - ForeignKey("legislatives_2022_resultats_circonscription.id"), primary_key=True) + ForeignKey("legislatives_2022_resultats_circonscription.id")) voix_t1: Mapped[int] = mapped_column(Integer(), default=0) voix_t2: Mapped[int] = mapped_column(Integer(), default=0) @@ -311,8 +308,7 @@ class VoixCommuneLegislatives2022(Base): id: Mapped[int] = mapped_column(primary_key=True) nuance_id: Mapped[str] = mapped_column(ForeignKey("legislatives_2022_nuance.code")) - resultats_commune_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_resultats_commune.id"), - primary_key=True) + resultats_commune_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_resultats_commune.id")) voix_t1: Mapped[int] = mapped_column(Integer(), default=0) voix_t2: Mapped[int] = mapped_column(Integer(), default=0) @@ -327,8 +323,7 @@ class VoixBureauVoteLegislatives2022(Base): id: Mapped[int] = mapped_column(primary_key=True) candidat_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_candidat.id")) - resultats_bureau_vote_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_resultats_bureau_vote.id"), - primary_key=True) + resultats_bureau_vote_id: Mapped[int] = mapped_column(ForeignKey("legislatives_2022_resultats_bureau_vote.id")) voix_t1: Mapped[int] = mapped_column(Integer(), default=0) voix_t2: Mapped[int] = mapped_column(Integer(), default=0) diff --git a/nupes/scripts/europeennes2024/import_resultats.py b/nupes/scripts/europeennes2024/import_resultats.py index 9021064..fec5046 100644 --- a/nupes/scripts/europeennes2024/import_resultats.py +++ b/nupes/scripts/europeennes2024/import_resultats.py @@ -11,7 +11,7 @@ from nupes.models.europeennes2024 import * def importer_resultats_bv(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/cc1883d9-1265-4365-b754-fb6aef22d82e", - "resultats-europeennes-par-bureau-de-vote.csv") + "resultats-europeennes-2024-par-bureau-de-vote.csv") with file.open('r') as f: reader = DictReader(f, delimiter=';') @@ -20,7 +20,8 @@ def importer_resultats_bv(engine: Engine, verbose: bool = False) -> None: com_code = str(row['Code commune']).zfill(5) bv_code = row['Code BV'] - if com_code.startswith("987"): # Les communes de Polynésie française ne sont pas importées + if com_code.startswith("987") or com_code.startswith("988"): + # Les communes de Polynésie française ne sont pas importées continue if com_code == "60694": @@ -111,7 +112,7 @@ def importer_resultats_bv(engine: Engine, verbose: bool = False) -> None: def importer_resultats_commune(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/6a782ef9-8ad6-4e66-832d-338b1041a42d", - "resultats-europeennes-par-commune.csv") + "resultats-europeennes-2024-par-commune.csv") with file.open('r') as f: reader = DictReader(f, delimiter=';') @@ -123,7 +124,8 @@ def importer_resultats_commune(engine: Engine, verbose: bool = False) -> None: if dpt_code == "ZX": # Saint-Martin/Saint-Barthélémy dpt_code = "977" - if dpt_code == "987": # Les communes de Polynésie française ne sont pas importées + if dpt_code == "977" or dpt_code == "987" or dpt_code == "988": + # Les communes de Polynésie française ne sont pas importées continue if com_code == "60694": @@ -195,7 +197,7 @@ def importer_resultats_commune(engine: Engine, verbose: bool = False) -> None: def importer_resultats_circo(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/ee37cbef-3d2a-4efe-a395-530b85a63028", - "resultats-europeennes-par-circonscription.csv") + "resultats-europeennes-2024-par-circonscription.csv") with file.open('r') as f: reader = DictReader(f, delimiter=';') @@ -268,7 +270,7 @@ def importer_resultats_circo(engine: Engine, verbose: bool = False) -> None: def importer_resultats_departement(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/b77cc4da-644f-4323-b6f7-ae6fe9b33f86", - "resultats-europeennes-par-departement.csv") + "resultats-europeennes-2024-par-departement.csv") with file.open('r') as f: reader = DictReader(f, delimiter=';') @@ -330,7 +332,7 @@ def importer_resultats_departement(engine: Engine, verbose: bool = False) -> Non def importer_resultats_region(engine: Engine, verbose: bool = False) -> None: file = get_file("https://www.data.gouv.fr/fr/datasets/r/7c3a854b-7344-4c68-b1f9-9d651b4ca823", - "resultats-europeennes-par-region.csv") + "resultats-europeennes-2024-par-region.csv") with file.open('r') as f: reader = DictReader(f, delimiter=';') diff --git a/nupes/scripts/legislatives2022/__init__.py b/nupes/scripts/legislatives2022/__init__.py index 9bc38a4..bbac4a1 100644 --- a/nupes/scripts/legislatives2022/__init__.py +++ b/nupes/scripts/legislatives2022/__init__.py @@ -1 +1 @@ -from . import import_candidats +from . import import_candidats, import_resultats diff --git a/nupes/scripts/legislatives2022/import_resultats.py b/nupes/scripts/legislatives2022/import_resultats.py new file mode 100644 index 0000000..fbd4efc --- /dev/null +++ b/nupes/scripts/legislatives2022/import_resultats.py @@ -0,0 +1,665 @@ +import csv + +from sqlalchemy import Engine, select +from sqlalchemy.orm import Session +from tqdm import tqdm + +from nupes.cache import get_file +from nupes.models import BureauVote +from nupes.models.legislatives2022 import * + + +def importer_resultats_bv(engine: Engine, verbose: bool = False) -> None: + tours = [(1, "https://www.data.gouv.fr/fr/datasets/r/a1f73b85-8194-44f4-a2b7-c343edb47d32"), + (2, "https://www.data.gouv.fr/fr/datasets/r/96ffddda-59b4-41b8-a6a3-dfe1adb7fa36")] + + for tour, file_url in tours: + file = get_file("https://www.data.gouv.fr/fr/datasets/r/a1f73b85-8194-44f4-a2b7-c343edb47d32", + "resultats-legislatives-2022-t1-par-bureau-de-vote.csv") + + with file.open('r', encoding="ISO-8859-1") as f: + reader = csv.reader(f, delimiter=';') + next(reader) + with Session(engine) as session: + for row in tqdm(reader, desc=f"Bureau de vote tour {tour}", disable=not verbose): + dpt_code = row[0] + + match dpt_code: + case "ZA": + dpt_code = "971" + case "ZB": + dpt_code = "972" + case "ZC": + dpt_code = "973" + case "ZD": + dpt_code = "974" + case "ZS": + dpt_code = "975" + case "ZM": + dpt_code = "976" + case "ZX": + dpt_code = "977" + case "ZW": + dpt_code = "986" + case "ZP": + dpt_code = "987" + case "ZN": + dpt_code = "988" + + if dpt_code == "987" or dpt_code == "988": + # Les communes de Polynésie française ne sont pas importées + continue + + com_code = str(dpt_code + row[4][(1 if len(dpt_code) == 3 else 0):]).zfill(5) + + if com_code == "98601": + # 3 royaumes à Wallis-et-Futuna, mais un seul résultat de commune + com_code = "98611" + + if com_code.startswith("987"): # Les communes de Polynésie française ne sont pas importées + continue + + bv_code = row[6] + + if bv := session.execute(select(BureauVote).filter_by(commune_code=com_code, code_bureau=bv_code)) \ + .scalar_one_or_none(): + bv_id = bv.id + else: + print(f"Bureau de vote {com_code}_{bv_code} non trouvé") + continue + + circo_code = row[2] + circo_id = f"{dpt_code.zfill(2)}-{circo_code.zfill(2)}" + + resultats_bv = session.execute(select(ResultatsBureauVoteLegislatives2022) + .filter_by(bv_id=bv_id)).scalar_one_or_none() + if not resultats_bv: + resultats_commune = session.execute(select(ResultatsCommuneLegislatives2022) + .filter_by(commune_id=com_code)).scalar_one_or_none() + if not resultats_commune: + resultats_dpt = session.execute(select(ResultatsDepartementLegislatives2022) + .filter_by(dpt_id=dpt_code)) \ + .scalar_one_or_none() + if not resultats_dpt: + dpt = session.execute(select(Departement).filter_by(code_insee=dpt_code)) \ + .scalar_one() + resultats_reg = session.execute( + select(ResultatsRegionLegislatives2022).filter_by(region_id=dpt.region_code)) \ + .scalar_one_or_none() + if not resultats_reg: + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)) \ + .scalar_one_or_none() + if not resultats_france: + session.add(ResultatsFranceLegislatives2022()) + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)) \ + .scalar_one() + + resultats_reg = ResultatsRegionLegislatives2022( + region_id=str(dpt.region_code), resultats_france_id=resultats_france.id) + session.add(resultats_reg) + resultats_reg = session.execute(select(ResultatsRegionLegislatives2022) + .filter_by(region_id=dpt.region_code)).scalar_one() + + resultats_dpt = ResultatsDepartementLegislatives2022( + dpt_id=dpt_code, resultats_region_id=resultats_reg.id) + session.add(resultats_dpt) + resultats_dpt = session.execute(select(ResultatsDepartementLegislatives2022) + .filter_by(dpt_id=dpt_code)).scalar_one() + + resultats_commune = ResultatsCommuneLegislatives2022(commune_id=com_code, + resultats_dpt_id=resultats_dpt.id) + session.add(resultats_commune) + resultats_commune = session.execute(select(ResultatsCommuneLegislatives2022) + .filter_by(commune_id=com_code)).scalar_one() + + resultats_circo = session.execute(select(ResultatsCirconscriptionLegislatives2022) + .filter_by(circo_id=circo_id)).scalar_one_or_none() + + resultats_bv = ResultatsBureauVoteLegislatives2022(bv_id=bv_id, + resultats_commune_id=resultats_commune.id, + resultats_circo_id=resultats_circo.id) + session.add(resultats_bv) + resultats_bv = session.execute(select(ResultatsBureauVoteLegislatives2022) + .filter_by(bv_id=bv_id)).scalar_one() + + if tour == 1: + resultats_bv.inscrits_t1 = int(row[5]) + resultats_bv.votants_t1 = int(row[8]) + resultats_bv.abstentions_t1 = int(row[6]) + resultats_bv.exprimes_t1 = int(row[16]) + resultats_bv.blancs_t1 = int(row[10]) + resultats_bv.nuls_t1 = int(row[13]) + elif tour == 2: + resultats_bv.inscrits_t2 = int(row[5]) + resultats_bv.votants_t2 = int(row[8]) + resultats_bv.abstentions_t2 = int(row[6]) + resultats_bv.exprimes_t2 = int(row[16]) + resultats_bv.blancs_t2 = int(row[10]) + resultats_bv.nuls_t2 = int(row[13]) + + row_voix = row[19:] + + for i in range(len(row_voix) // 9): + bloc_voix = row_voix[i * 9:(i + 1) * 9] + numero = bloc_voix[0] + voix = int(bloc_voix[5]) + + candidat = session.execute(select(CandidatLegislatives2022) + .filter_by(circonscription_id=circo_id, numero=numero)) \ + .scalar_one_or_none() + + voix_candidat_bv = session.execute(select(VoixBureauVoteLegislatives2022) + .filter_by(resultats_bureau_vote_id=resultats_bv.id, + candidat_id=candidat.id)) \ + .scalar_one_or_none() + if not voix_candidat_bv: + voix_candidat_bv = VoixBureauVoteLegislatives2022( + resultats_bureau_vote_id=resultats_bv.id, candidat_id=candidat.id) + session.add(voix_candidat_bv) + + if tour == 1: + voix_candidat_bv.voix_t1 = voix + elif tour == 2: + voix_candidat_bv.voix_t2 = voix + + session.commit() + + +def importer_resultats_commune(engine: Engine, verbose: bool = False) -> None: + tours = [(1, "https://www.data.gouv.fr/fr/datasets/r/a9a82bcc-304e-491f-a4f0-c06575113745"), + (2, "https://www.data.gouv.fr/fr/datasets/r/e79b2e51-0841-4266-b626-57cb271c7a35")] + + with Session(engine) as session: + for resultats_commune in session.execute(select(ResultatsCommuneLegislatives2022)).scalars().all(): + resultats_commune.inscrits_t1 = 0 + resultats_commune.votants_t1 = 0 + resultats_commune.abstentions_t1 = 0 + resultats_commune.exprimes_t1 = 0 + resultats_commune.blancs_t1 = 0 + resultats_commune.nuls_t1 = 0 + resultats_commune.inscrits_t2 = 0 + resultats_commune.votants_t2 = 0 + resultats_commune.abstentions_t2 = 0 + resultats_commune.exprimes_t2 = 0 + resultats_commune.blancs_t2 = 0 + resultats_commune.nuls_t2 = 0 + + for voix_nuance in resultats_commune.voix: + voix_nuance.voix_t1 = 0 + voix_nuance.voix_t2 = 0 + + session.commit() + + for tour, file_url in tours: + file = get_file(file_url, + f"resultats-legislatives-2022-t{tour}-par-commune.csv") + + with file.open('r', encoding="ISO-8859-1") as f: + reader = csv.reader(f, delimiter=';') + next(reader) + with Session(engine) as session: + for row in tqdm(reader, desc=f"Commune tour {tour}", disable=not verbose): + dpt_code = row[0] + + match dpt_code: + case "ZA": + dpt_code = "971" + case "ZB": + dpt_code = "972" + case "ZC": + dpt_code = "973" + case "ZD": + dpt_code = "974" + case "ZS": + dpt_code = "975" + case "ZM": + dpt_code = "976" + case "ZX": + dpt_code = "977" + case "ZW": + dpt_code = "986" + case "ZP": + dpt_code = "987" + case "ZN": + dpt_code = "988" + + if dpt_code == "987" or dpt_code == "988": + # Les communes de Polynésie française ne sont pas importées + continue + + com_code = str(dpt_code + row[4][(1 if len(dpt_code) == 3 else 0):]).zfill(5) + + if com_code == "98601": + # 3 royaumes à Wallis-et-Futuna, mais un seul résultat de commune + com_code = "98611" + + resultats_commune = session.execute(select(ResultatsCommuneLegislatives2022) + .filter_by(commune_id=com_code)).scalar_one_or_none() + if not resultats_commune: + resultats_dpt = session.execute(select(ResultatsDepartementLegislatives2022) + .filter_by(dpt_id=dpt_code)).scalar_one_or_none() + if not resultats_dpt: + dpt = session.execute(select(Departement) + .filter_by(code_insee=f"{dpt_code.zfill(2)}")).scalar_one() + resultats_reg = session.execute(select(ResultatsRegionLegislatives2022) + .filter_by(region_id=dpt.region_code)).scalar_one_or_none() + if not resultats_reg: + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)) \ + .scalar_one_or_none() + if not resultats_france: + session.add(ResultatsFranceLegislatives2022()) + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)) \ + .scalar_one() + + resultats_reg = ResultatsRegionLegislatives2022(region_id=str(dpt.region_code), + resultats_france_id=resultats_france.id) + session.add(resultats_reg) + resultats_reg = session.execute(select(ResultatsRegionLegislatives2022) + .filter_by(region_id=dpt.region_code)).scalar_one() + + resultats_dpt = ResultatsDepartementLegislatives2022(dpt_id=dpt_code, + resultats_region_id=resultats_reg.id) + session.add(resultats_dpt) + resultats_dpt = session.execute(select(ResultatsDepartementLegislatives2022) + .filter_by(dpt_id=dpt_code)).scalar_one() + + resultats_commune = ResultatsCommuneLegislatives2022(commune_id=com_code, + resultats_dpt_id=resultats_dpt.id) + session.add(resultats_commune) + resultats_commune = session.execute(select(ResultatsCommuneLegislatives2022) + .filter_by(commune_id=com_code)).scalar_one() + + if tour == 1: + resultats_commune.inscrits_t1 = int(row[7]) + resultats_commune.votants_t1 = int(row[10]) + resultats_commune.abstentions_t1 = int(row[8]) + resultats_commune.exprimes_t1 = int(row[18]) + resultats_commune.blancs_t1 = int(row[12]) + resultats_commune.nuls_t1 = int(row[15]) + elif tour == 2: + resultats_commune.inscrits_t2 = int(row[7]) + resultats_commune.votants_t2 = int(row[10]) + resultats_commune.abstentions_t2 = int(row[8]) + resultats_commune.exprimes_t2 = int(row[18]) + resultats_commune.blancs_t2 = int(row[12]) + resultats_commune.nuls_t2 = int(row[15]) + + row_voix = row[21:] + + for i in range(len(row_voix) // 8): + bloc_voix = row_voix[i * 8:(i + 1) * 8] + nuance_code = bloc_voix[4] + voix = int(bloc_voix[5]) + + voix_nuance_commune = session.execute(select(VoixCommuneLegislatives2022) + .filter_by(resultats_commune_id=resultats_commune.id, + nuance_id=nuance_code)) \ + .scalar_one_or_none() + if not voix_nuance_commune: + voix_nuance_commune = VoixCommuneLegislatives2022(resultats_commune_id=resultats_commune.id, + nuance_id=nuance_code) + session.add(voix_nuance_commune) + + if tour == 1: + voix_nuance_commune.voix_t1 += voix + elif tour == 2: + voix_nuance_commune.voix_t2 += voix + + session.commit() + + +def importer_resultats_circo(engine: Engine, verbose: bool = False) -> None: + tours = [(1, "https://www.data.gouv.fr/fr/datasets/r/114b13e8-7ff9-437f-9ec8-7a29258a80e3"), + (2, "https://www.data.gouv.fr/fr/datasets/r/436b9eaa-64e1-46bd-ad1d-9638ebbd6caf")] + + for tour, file_url in tours: + file = get_file(file_url, f"resultats-legislatives-2022-t{tour}-par-circonscription.csv") + + with file.open('r', encoding="ISO-8859-1") as f: + reader = csv.reader(f, delimiter=';') + next(reader) + with Session(engine) as session: + for row in tqdm(reader, desc=f"Circonscription tour {tour}", disable=not verbose): + dpt_code = row[0] + circo_code = row[2] + + match dpt_code: + case "ZA": + dpt_code = "971" + case "ZB": + dpt_code = "972" + case "ZC": + dpt_code = "973" + case "ZD": + dpt_code = "974" + case "ZS": + dpt_code = "975" + case "ZM": + dpt_code = "976" + case "ZX": + dpt_code = "977" + case "ZW": + dpt_code = "986" + case "ZP": + dpt_code = "987" + case "ZN": + dpt_code = "988" + + circo_id = f"{dpt_code.zfill(2)}-{circo_code.zfill(2)}" + + resultats_circo = session.execute(select(ResultatsCirconscriptionLegislatives2022) + .filter_by(circo_id=circo_id)).scalar_one_or_none() + if not resultats_circo: + resultats_dpt = session.execute(select(ResultatsDepartementLegislatives2022) + .filter_by(dpt_id=dpt_code)).scalar_one_or_none() + if not resultats_dpt: + dpt = session.execute(select(Departement).filter_by(code_insee=f"{dpt_code.zfill(2)}")) \ + .scalar_one() + resultats_reg = session.execute(select(ResultatsRegionLegislatives2022) + .filter_by(region_id=dpt.region_code)) \ + .scalar_one_or_none() + if not resultats_reg: + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)) \ + .scalar_one_or_none() + if not resultats_france: + session.add(ResultatsFranceLegislatives2022()) + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)).scalar_one() + + resultats_reg = ResultatsRegionLegislatives2022(region_id=str(dpt.region_code), + resultats_france_id=resultats_france.id) + session.add(resultats_reg) + resultats_reg = session.execute(select(ResultatsRegionLegislatives2022) + .filter_by(region_id=dpt.region_code)).scalar_one() + + resultats_dpt = ResultatsDepartementLegislatives2022(dpt_id=dpt_code, + resultats_region_id=resultats_reg.id) + session.add(resultats_dpt) + resultats_dpt = session.execute(select(ResultatsDepartementLegislatives2022) + .filter_by(dpt_id=dpt_code)).scalar_one() + + resultats_circo = ResultatsCirconscriptionLegislatives2022( + circo_id=circo_id, resultats_departement_id=resultats_dpt.id) + session.add(resultats_circo) + resultats_circo = session.execute(select(ResultatsCirconscriptionLegislatives2022) + .filter_by(circo_id=circo_id)).scalar_one() + + if tour == 1: + resultats_circo.inscrits_t1 = int(row[5]) + resultats_circo.votants_t1 = int(row[8]) + resultats_circo.abstentions_t1 = int(row[6]) + resultats_circo.exprimes_t1 = int(row[16]) + resultats_circo.blancs_t1 = int(row[10]) + resultats_circo.nuls_t1 = int(row[13]) + elif tour == 2: + resultats_circo.inscrits_t2 = int(row[5]) + resultats_circo.votants_t2 = int(row[8]) + resultats_circo.abstentions_t2 = int(row[6]) + resultats_circo.exprimes_t2 = int(row[16]) + resultats_circo.blancs_t2 = int(row[10]) + resultats_circo.nuls_t2 = int(row[13]) + + row_voix = row[19:] + + for i in range(len(row_voix) // 9): + bloc_voix = row_voix[i * 9:(i + 1) * 9] + numero = bloc_voix[0] + voix = int(bloc_voix[5]) + + candidat = session.execute(select(CandidatLegislatives2022) + .filter_by(circonscription_id=circo_id, numero=numero)) \ + .scalar_one_or_none() + + voix_candidat_circo = session.execute(select(VoixCirconscriptionLegislatives2022) + .filter_by(resultats_circonscription_id=resultats_circo.id, + candidat_id=candidat.id)) \ + .scalar_one_or_none() + if not voix_candidat_circo: + voix_candidat_circo = VoixCirconscriptionLegislatives2022( + resultats_circonscription_id=resultats_circo.id, candidat_id=candidat.id) + session.add(voix_candidat_circo) + + if tour == 1: + voix_candidat_circo.voix_t1 = voix + elif tour == 2: + voix_candidat_circo.voix_t2 = voix + + session.commit() + + +def importer_resultats_departement(engine: Engine, verbose: bool = False) -> None: + tours = [(1, "https://www.data.gouv.fr/fr/datasets/r/4acba699-e512-4dc5-87c7-ba74db773633"), + (2, "https://www.data.gouv.fr/fr/datasets/r/bc76bd72-cf65-45b2-b651-7716dc3c5520")] + + for tour, file_url in tours: + file = get_file(file_url, f"resultats-legislatives-2022-t{tour}-par-departement.csv") + + with file.open('r', encoding="ISO-8859-1") as f: + reader = csv.reader(f, delimiter=';') + next(reader) + with Session(engine) as session: + for row in tqdm(reader, desc=f"Département tour {tour}", disable=not verbose): + dpt_code = row[0] + + match dpt_code: + case "ZA": + dpt_code = "971" + case "ZB": + dpt_code = "972" + case "ZC": + dpt_code = "973" + case "ZD": + dpt_code = "974" + case "ZS": + dpt_code = "975" + case "ZM": + dpt_code = "976" + case "ZX": + dpt_code = "977" + case "ZW": + dpt_code = "986" + case "ZP": + dpt_code = "987" + case "ZN": + dpt_code = "988" + + resultats_dpt = session.execute(select(ResultatsDepartementLegislatives2022) + .filter_by(dpt_id=dpt_code)).scalar_one_or_none() + if not resultats_dpt: + dpt = session.execute(select(Departement) + .filter_by(code_insee=f"{dpt_code.zfill(2)}")) \ + .scalar_one() + resultats_reg = session.execute(select(ResultatsRegionLegislatives2022) + .filter_by(region_id=dpt.region_code)) \ + .scalar_one_or_none() + if not resultats_reg: + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)) \ + .scalar_one_or_none() + if not resultats_france: + session.add(ResultatsFranceLegislatives2022()) + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)).scalar_one() + + resultats_reg = ResultatsRegionLegislatives2022(region_id=str(dpt.region_code), + resultats_france_id=resultats_france.id) + session.add(resultats_reg) + resultats_reg = session.execute(select(ResultatsRegionLegislatives2022) + .filter_by(region_id=dpt.region_code)).scalar_one() + + resultats_dpt = ResultatsDepartementLegislatives2022(dpt_id=dpt_code, + resultats_region_id=resultats_reg.id) + session.add(resultats_dpt) + resultats_dpt = session.execute(select(ResultatsDepartementLegislatives2022) + .filter_by(dpt_id=dpt_code)).scalar_one() + + if tour == 1: + resultats_dpt.inscrits_t1 = int(row[3]) + resultats_dpt.votants_t1 = int(row[6]) + resultats_dpt.abstentions_t1 = int(row[4]) + resultats_dpt.exprimes_t1 = int(row[14]) + resultats_dpt.blancs_t1 = int(row[8]) + resultats_dpt.nuls_t1 = int(row[11]) + elif tour == 2: + resultats_dpt.inscrits_t2 = int(row[3]) + resultats_dpt.votants_t2 = int(row[6]) + resultats_dpt.abstentions_t2 = int(row[4]) + resultats_dpt.exprimes_t2 = int(row[14]) + resultats_dpt.blancs_t2 = int(row[8]) + resultats_dpt.nuls_t2 = int(row[11]) + + row_voix = row[17:] + + for i in range(len(row_voix) // 5): + bloc_voix = row_voix[i * 5:(i + 1) * 5] + nuance_code = bloc_voix[0] + voix = int(bloc_voix[1]) + + voix_nuance_dpt = session.execute(select(VoixDepartementLegislatives2022) + .filter_by(resultats_departement_id=resultats_dpt.id, + nuance_id=nuance_code)) \ + .scalar_one_or_none() + if not voix_nuance_dpt: + voix_nuance_dpt = VoixDepartementLegislatives2022(resultats_departement_id=resultats_dpt.id, + nuance_id=nuance_code) + session.add(voix_nuance_dpt) + + if tour == 1: + voix_nuance_dpt.voix_t1 = voix + elif tour == 2: + voix_nuance_dpt.voix_t2 = voix + + session.commit() + + +def importer_resultats_region(engine: Engine, verbose: bool = False) -> None: + tours = [(1, "https://www.data.gouv.fr/fr/datasets/r/f79576c9-9ddc-43f9-8d99-f0856550054e"), + (2, "https://www.data.gouv.fr/fr/datasets/r/fdba421a-d4e9-4f38-9753-982a7c065911")] + + for tour, file_url in tours: + file = get_file(file_url, f"resultats-legislatives-2022-t{tour}-par-region.csv") + + with file.open('r', encoding="ISO-8859-1") as f: + reader = csv.reader(f, delimiter=';') + next(reader) + + with Session(engine) as session: + for row in tqdm(reader, desc=f"Région tour {tour}", disable=not verbose): + reg_code = row[0] + + resultats_reg = session.execute(select(ResultatsRegionLegislatives2022) + .filter_by(region_id=reg_code)).scalar_one_or_none() + if not resultats_reg: + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)).scalar_one_or_none() + if not resultats_france: + session.add(ResultatsFranceLegislatives2022()) + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)).scalar_one() + + resultats_reg = ResultatsRegionLegislatives2022(region_id=reg_code, + resultats_france_id=resultats_france.id) + session.add(resultats_reg) + session.commit() + resultats_reg = session.execute(select(ResultatsRegionLegislatives2022) + .filter_by(region_id=reg_code)).scalar_one() + + if tour == 1: + resultats_reg.inscrits_t1 = int(row[3]) + resultats_reg.votants_t1 = int(row[6]) + resultats_reg.abstentions_t1 = int(row[4]) + resultats_reg.exprimes_t1 = int(row[14]) + resultats_reg.blancs_t1 = int(row[8]) + resultats_reg.nuls_t1 = int(row[11]) + elif tour == 2: + resultats_reg.inscrits_t2 = int(row[3]) + resultats_reg.votants_t2 = int(row[6]) + resultats_reg.abstentions_t2 = int(row[4]) + resultats_reg.exprimes_t2 = int(row[14]) + resultats_reg.blancs_t2 = int(row[8]) + resultats_reg.nuls_t2 = int(row[11]) + + row_voix = row[17:] + + for i in range(len(row_voix) // 5): + bloc_voix = row_voix[i * 5:(i + 1) * 5] + nuance_code = bloc_voix[0] + voix = int(bloc_voix[1]) + + voix_nuance_reg = session.execute(select(VoixRegionLegislatives2022) + .filter_by(resultats_region_id=resultats_reg.id, + nuance_id=nuance_code)) \ + .scalar_one_or_none() + if not voix_nuance_reg: + voix_nuance_reg = VoixRegionLegislatives2022(resultats_region_id=resultats_reg.id, + nuance_id=nuance_code) + session.add(voix_nuance_reg) + + if tour == 1: + voix_nuance_reg.voix_t1 = voix + elif tour == 2: + voix_nuance_reg.voix_t2 = voix + + session.commit() + + +def importer_resultats_france(engine: Engine, verbose: bool = False) -> None: + tours = [(1, "https://www.data.gouv.fr/fr/datasets/r/cbe72f2c-2c56-4251-89cb-213de8bfbea0"), + (2, "https://www.data.gouv.fr/fr/datasets/r/6639bb39-13ec-49dd-ab4d-1f8227b86c01")] + for tour, file_url in tours: + file = get_file(file_url, f"resultats-legislatives-2022-t{tour}-france-entiere.csv") + + with file.open('r', encoding="ISO-8859-1") as f: + reader = csv.reader(f, delimiter=';') + next(reader) + + with Session(engine) as session: + for row in tqdm(reader, desc=f"France Entière tour {tour}", disable=not verbose): + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)).scalar_one_or_none() + + if not resultats_france: + session.add(ResultatsFranceLegislatives2022()) + session.commit() + resultats_france = session.execute(select(ResultatsFranceLegislatives2022)).scalar_one() + + if tour == 1: + resultats_france.inscrits_t1 = int(row[3]) + resultats_france.votants_t1 = int(row[6]) + resultats_france.abstentions_t1 = int(row[4]) + resultats_france.exprimes_t1 = int(row[14]) + resultats_france.blancs_t1 = int(row[8]) + resultats_france.nuls_t1 = int(row[11]) + elif tour == 2: + resultats_france.inscrits_t2 = int(row[3]) + resultats_france.votants_t2 = int(row[6]) + resultats_france.abstentions_t2 = int(row[4]) + resultats_france.exprimes_t2 = int(row[14]) + resultats_france.blancs_t2 = int(row[8]) + resultats_france.nuls_t2 = int(row[11]) + + row_voix = row[17:] + + for i in range(len(row_voix) // 5): + bloc_voix = row_voix[i * 5:(i + 1) * 5] + nuance_code = bloc_voix[0] + voix = int(bloc_voix[1]) + + voix_nuance_france = session.execute(select(VoixFranceLegislatives2022) + .filter_by(resultats_france_id=resultats_france.id, + nuance_id=nuance_code)) \ + .scalar_one_or_none() + if not voix_nuance_france: + voix_nuance_france = VoixFranceLegislatives2022(resultats_france_id=resultats_france.id, + nuance_id=nuance_code) + session.add(voix_nuance_france) + + if tour == 1: + voix_nuance_france.voix_t1 = voix + elif tour == 2: + voix_nuance_france.voix_t2 = voix + + session.commit() + + +def run(engine: Engine, verbose: bool = False) -> None: + importer_resultats_france(engine, verbose) + importer_resultats_region(engine, verbose) + importer_resultats_departement(engine, verbose) + importer_resultats_circo(engine, verbose) + importer_resultats_commune(engine, verbose) + importer_resultats_bv(engine, verbose)