#!/usr/bin/env python3 from collections import OrderedDict import csv from datetime import datetime from math import log, pi, tan import enum import json from flask import Flask, Response, abort, redirect from flask_migrate import Migrate from flask_sqlalchemy import SQLAlchemy import requests from sqlalchemy import Boolean, Column, Date, Enum, Float, ForeignKey, Integer, JSON, String, desc, func from sqlalchemy.orm import relationship from tqdm import tqdm app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://nupes:nupes@psql.adm.ynerant.fr:5432/nupes' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True app.config['SECRET_KEY'] = "random string" # app.config['SQLALCHEMY_ECHO'] = True db = SQLAlchemy(app) Migrate(app, db) class Nuance(db.Model): __tablename__ = 'nuance' code = Column(String(3), primary_key=True, nullable=False) name = Column(String(255), unique=True, nullable=False) color = Column(String(6), nullable=False) class CandidatPresidentielle(db.Model): __tablename__ = 'candidat_presidentielle' id = Column(Integer, primary_key=True) last_name = Column(String(255)) first_name = Column(String(255)) slug = Column(String(255), unique=True) nuance_id = Column(String(3), ForeignKey('nuance.code')) nuance = relationship('Nuance') class CandidatLegislatives(db.Model): class Gender(enum.Enum): MALE = 'M' FEMALE = 'F' __tablename__ = 'candidat_legislatives' id = Column(Integer, primary_key=True) last_name = Column(String(255)) first_name = Column(String(255)) birth_date = Column(Date) gender = Column(Enum(Gender)) nuance_id = Column(String(3), ForeignKey('nuance.code')) nuance = relationship('Nuance') circonscription_id = Column(String(8), ForeignKey('circonscription.id')) circonscription = relationship('Circonscription') pane_number = Column(Integer) job = Column(String(255)) exitting = Column(Boolean) last_name_suppl = Column(String(255)) first_name_suppl = Column(String(255)) birth_date_suppl = Column(Date) gender_suppl = Column(Enum(Gender)) exitting_suppl = Column(Boolean) class Region(db.Model): __tablename__ = 'region' id = Column(Integer, primary_key=True) name = Column(String(32), unique=True) code = Column(Integer, unique=True) class Departement(db.Model): __tablename__ = 'departement' id = Column(Integer, primary_key=True) name = Column(String(32), unique=True) region_id = Column(Integer, ForeignKey('region.id')) region = relationship('Region') class Circonscription(db.Model): __tablename__ = 'circonscription' id = Column(String(8), primary_key=True) number = Column(Integer) label = Column(String(255)) departement_id = Column(Integer, ForeignKey("departement.id")) departement = relationship('Departement') geometry = Column(JSON) class Commune(db.Model): __tablename__ = 'commune' id = Column(Integer, primary_key=True) code = Column(Integer) name = Column(String(255), index=True) departement_id = Column(Integer, ForeignKey('departement.id')) departement = relationship('Departement') class BureauVote(db.Model): __tablename__ = 'bureauvote' id = Column(String(16), primary_key=True) label = Column(String(255)) code = Column(String(8)) epci_code = Column(Integer) epci_name = Column(String(255)) address = Column(String(255)) circonscription_id = Column(String(8), ForeignKey('circonscription.id')) circonscription = relationship('Circonscription') commune_id = Column(Integer, ForeignKey('commune.id')) commune = relationship('Commune') latitude = Column(Float, nullable=True) longitude = Column(Float, nullable=True) class VotePresidentielle(db.Model): __tablename__ = 'vote_presidentielle' id = Column(Integer, primary_key=True) bv_id = Column(String(16), ForeignKey('bureauvote.id')) bv = relationship('BureauVote') inscrits = Column(Integer) votants = Column(Integer) abstentions = Column(Integer) exprimes = Column(Integer) blancs = Column(Integer) nuls = Column(Integer) class VoteCandidatPresidentielle(db.Model): __tablename__ = 'vote_candidat_presidentielle' id = Column(Integer, primary_key=True) vote_id = Column(Integer, ForeignKey('vote_presidentielle.id')) vote = relationship('VotePresidentielle') candidat_id = Column(Integer, ForeignKey('candidat_presidentielle.id')) candidat = relationship('CandidatPresidentielle') voix = Column(Integer) class VoteLegislatives(db.Model): __tablename__ = 'vote_legislatives' id = Column(Integer, primary_key=True) circonscription_id = Column(String(8), ForeignKey('circonscription.id')) circonscription = relationship('Circonscription') round = Column(Integer, default=1) inscrits = Column(Integer) votants = Column(Integer) abstentions = Column(Integer) exprimes = Column(Integer) blancs = Column(Integer) nuls = Column(Integer) class VoteCandidatLegislatives(db.Model): __tablename__ = 'vote_candidat_legislatives' id = Column(Integer, primary_key=True) vote_id = Column(Integer, ForeignKey('vote_legislatives.id')) vote = relationship('VoteLegislatives') candidat_id = Column(Integer, ForeignKey('candidat_legislatives.id')) candidat = relationship('CandidatLegislatives') voix = Column(Integer) @app.get('/upload') def upload(): content = "" with open('elections-france-presidentielles-2022-1er-tour-par-bureau-de-vote.json') as f: data = json.load(f) #data = [] candidats = {candidat.last_name: candidat for candidat in CandidatPresidentielle.query.all()} regs = {reg.code: reg for reg in Region.query.all()} dpts = {dpt.id: dpt for dpt in Departement.query.all()} circos = {circo.id: circo for circo in Circonscription.query.all()} comms = {(comm.code, comm.name): comm for comm in Commune.query.all()} bvs = {bv.id: bv for bv in BureauVote.query.all()} vps = {vp.bv_id: vp for vp in VotePresidentielle.query.all()} vcps = {(vcp.vote_id, vcp.candidat_id): vcp for vcp in VoteCandidatPresidentielle.query.all()} for i, record in enumerate(tqdm(data)): try: fields = record['fields'] dpt_id = fields['dep_code'] dpt_name = fields['dep_name'] if dpt_name == "Corse-du-Sud": dpt_id = 201 elif dpt_name == "Haute-Corse": dpt_id = 202 elif dpt_name == "Guadeloupe": dpt_id = 971 elif dpt_name == "Martinique": dpt_id = 972 elif dpt_name == "Guyane": dpt_id = 973 elif dpt_name == "La Réunion": dpt_id = 974 elif dpt_name == "Saint-Pierre-et-Miquelon": dpt_id = 975 elif dpt_name == "Mayotte": dpt_id = 976 elif dpt_name == "Saint-Martin/Saint-Barthélemy": dpt_id = 977 elif dpt_name == "Wallis et Futuna": dpt_id = 986 elif dpt_name == "Polynésie française": dpt_id = 987 elif dpt_name == "Nouvelle-Calédonie": dpt_id = 988 else: dpt_id = int(dpt_id) reg_name = fields.get('reg_name', dpt_name) reg_code = int(fields.get('reg_code', dpt_id)) if reg_code in regs: reg = regs[reg_code] else: q = Region.query.filter(Region.code == reg_code) if True or not q.count(): reg = Region(code=reg_code, name=reg_name) db.session.add(reg) else: reg = q.one() regs[reg_code] = reg if dpt_id in dpts: dpt = dpts[dpt_id] else: q = Departement.query.filter(Departement.id == dpt_id) if True or not q.count(): dpt = Departement(id=dpt_id, name=dpt_name, region_id=reg.id) db.session.add(dpt) else: dpt = q.one() dpts[dpt_id] = dpt circo_label = fields['libelle_de_la_circonscription'] circo_nb = int(fields['code_de_la_circonscription']) circo_id = f"{dpt_id:003d}-{circo_nb:02d}" if circo_id in circos: circo = circos[circo_id] else: q = Circonscription.query.filter(Circonscription.id == circo_id) if True or not q.count(): circo = Circonscription(id=circo_id, number=circo_nb, label=circo_label, departement_id=dpt.id) db.session.add(circo) else: circo = q.one() circos[circo_id] = circo com_code = int(fields['com_code'].replace('2A', '201').replace('2B', '202')) com_name = fields['com_name'] if (com_code, com_name) in comms: com = comms[(com_code, com_name)] else: q = Commune.query.filter(Commune.code == com_code, Commune.name == com_name) if True or not q.count(): com = Commune(code=com_code, name=com_name, departement_id=dpt_id) db.session.add(com) else: com = q.one() comms[(com_code, com_name)] = com bv_code = fields['code_du_b_vote'] if bv_code.endswith('.0'): bv_code = bv_code[:-2] bv_id = f"{com_code}-{bv_code}" if bv_id in bvs: bv = bvs[bv_id] else: q = BureauVote.query.filter(BureauVote.id == bv_id) if True or not q.count(): bv_label = fields.get('lib_du_b_vote', "") bv_location = fields.get('location', None) bv_address = fields.get('address', "") bv_epci_name = fields.get('epci_name', "") bv_epci_code = int(fields.get('epci_code', "0")) bv = BureauVote(id=bv_id, code=bv_code, label=bv_label, address=bv_address, epci_name=bv_epci_name, epci_code=bv_epci_code, circonscription_id=circo.id, commune_id=com.id) if bv_location: bv.latitude, bv.longitude = bv_location db.session.add(bv) else: bv = q.one() bvs[bv_id] = bv # q = VotePresidentielle.query.filter(VotePresidentielle.bv_id == bv_id) # if not q.count(): if bv_id not in vps: vp = VotePresidentielle(bv_id=bv_id, inscrits=fields['inscrits'], abstentions=fields['abstentions'], votants=fields['votants'], exprimes=fields['exprimes'], nuls=fields['nuls'], blancs=fields['blancs']) vps[bv_id] = vp db.session.add(vp) else: vp = vps[bv_id] # vp = q.one() candidat = candidats[fields['nom']] # candidat = CandidatPresidentielle.query.filter( # CandidatPresidentielle.last_name == fields['nom']).one() if (vp.id, candidat.id) not in vcps: # q = VoteCandidatPresidentielle.query.filter( # VoteCandidatPresidentielle.vote_id == vp.id, # VoteCandidatPresidentielle.candidat_id == candidat.id, # ) # if not q.count(): vcp = VoteCandidatPresidentielle( vote_id=vp.id, candidat_id=candidat.id, voix=fields['voix'], ) vcps[(vp.id, candidat.id)] = vcp db.session.add(vcp) except Exception: print(record['fields']) raise if i % 50000 == 0: db.session.commit() db.session.commit() return "done" @app.get('/upload-circos') def upload_circos(): with open('france-circonscriptions-legislatives-2012.json') as f: data = json.load(f) for feature in tqdm(data['features']): prop = feature['properties'] dpt_id = prop['code_dpt'] num_circ = int(prop['num_circ']) if dpt_id == '2A': # Corse du Sud dpt_id = 201 elif dpt_id == '2B': # Haute-Corse dpt_id = 202 elif dpt_id == 'ZA': # Guadeloupe dpt_id = 971 elif dpt_id == 'ZB': # Martinique dpt_id = 972 elif dpt_id == 'ZC': # Guyane dpt_id = 973 elif dpt_id == 'ZD': # La Réunion dpt_id = 974 elif dpt_id == 'ZS': # Saint-Pierre-et-Miquelon dpt_id = 975 elif dpt_id == 'ZM': # Mayotte dpt_id = 976 elif dpt_id == 'ZX': # Saint-Martin / Saint-Barthélémy dpt_id = 977 elif dpt_id == 'ZW': # Wallis-et-Futuna dpt_id = 986 elif dpt_id == 'ZP': # Polynésie française dpt_id = 987 elif dpt_id == 'ZN': # Nouvelle-Calédonie dpt_id = 988 else: dpt_id = int(dpt_id) circ = Circonscription.query.filter(Circonscription.departement_id == dpt_id, Circonscription.number == num_circ).one() circ.geometry = feature['geometry']['coordinates'] db.session.add(circ) db.session.commit() return "done" @app.get('/upload-legislatives') def upload_legislatives(): with open('candidats-legislatives-2022.csv') as f: csvfile = csv.DictReader(f) for row in tqdm(csvfile): dpt_id = row['Code du département'] if dpt_id == '2A': # Corse du Sud dpt_id = 201 elif dpt_id == '2B': # Haute-Corse dpt_id = 202 elif dpt_id == 'ZA': # Guadeloupe dpt_id = 971 elif dpt_id == 'ZB': # Martinique dpt_id = 972 elif dpt_id == 'ZC': # Guyane dpt_id = 973 elif dpt_id == 'ZD': # La Réunion dpt_id = 974 elif dpt_id == 'ZS': # Saint-Pierre-et-Miquelon dpt_id = 975 elif dpt_id == 'ZM': # Mayotte dpt_id = 976 elif dpt_id == 'ZX': # Saint-Martin / Saint-Barthélémy dpt_id = 977 elif dpt_id == 'ZW': # Wallis-et-Futuna dpt_id = 986 elif dpt_id == 'ZP': # Polynésie française dpt_id = 987 elif dpt_id == 'ZN': # Nouvelle-Calédonie dpt_id = 988 elif dpt_id == 'ZZ': # Français⋅es établi⋅es à l'étranger dpt_id = 97 else: dpt_id = int(dpt_id) circo_nb = int(row['Code circonscription']) circo_id = f'{dpt_id:03d}-{circo_nb:02d}' pane_id = row['N° panneau'] last_name = row['Nom candidat'] first_name = row['Prénom candidat'] gender = CandidatLegislatives.Gender(row['Sexe candidat']) birth_date = datetime.strptime(row['Date naissance candidat'], '%d/%m/%Y').date() nuance_id = row['Nuance candidat'] pane_number = row['N° panneau'] job = row['Profession candidat'] exitting = row['Le candidat est sortant'] == 'oui' last_name_suppl = row['Nom remplaçant'] first_name_suppl = row['Prénom remplaçant'] birth_date_suppl = datetime.strptime(row['Date naiss. remplaçant'], '%d/%m/%Y').date() gender_suppl = CandidatLegislatives.Gender(row['Sexe remplaçant']) exitting_suppl = row['Le remplaçant est sortant'] == 'oui' q = CandidatLegislatives.query.filter( CandidatLegislatives.last_name == last_name, CandidatLegislatives.first_name == first_name, CandidatLegislatives.circonscription_id == circo_id, CandidatLegislatives.nuance_id == nuance_id, ) if not q.count(): candidat = CandidatLegislatives( last_name=last_name, first_name=first_name, birth_date=birth_date, gender=gender, nuance_id=nuance_id, circonscription_id=circo_id, pane_number=pane_number, job=job, exitting=exitting, last_name_suppl=last_name_suppl, first_name_suppl=first_name_suppl, gender_suppl=gender_suppl, exitting_suppl=exitting_suppl, ) db.session.add(candidat) db.session.commit() return "" @app.get('/draw/') def draw(map_type: str): groups = [ (Circonscription.query.filter((Circonscription.departement_id <= 95) | (Circonscription.departement_id == 201) | (Circonscription.departement_id == 202)), 30000), (Circonscription.query.filter(Circonscription.departement_id.in_([77, 78, 91, 95])), 60000), (Circonscription.query.filter(Circonscription.departement_id.in_([75, 92, 93, 94])), 200000), (Circonscription.query.filter(Circonscription.departement_id == 69), 60000), (Circonscription.query.filter(Circonscription.departement_id == 13), 60000), (Circonscription.query.filter(Circonscription.departement_id == 971), 40000), (Circonscription.query.filter(Circonscription.departement_id == 972), 40000), (Circonscription.query.filter(Circonscription.departement_id == 973), 10000), (Circonscription.query.filter(Circonscription.departement_id == 974), 40000), (Circonscription.query.filter(Circonscription.departement_id == 975), 40000), (Circonscription.query.filter(Circonscription.departement_id == 976), 40000), (Circonscription.query.filter(Circonscription.departement_id == 977), 60000), (Circonscription.query.filter(Circonscription.departement_id == 986), 60000), (Circonscription.query.filter(Circonscription.departement_id == 987), 5000), (Circonscription.query.filter(Circonscription.departement_id == 988), 20000), (Circonscription.query.filter(Circonscription.id == '097-01'), 30000), (Circonscription.query.filter(Circonscription.id == '097-02'), 30000), (Circonscription.query.filter(Circonscription.id == '097-03'), 30000), (Circonscription.query.filter(Circonscription.id == '097-04'), 30000), (Circonscription.query.filter(Circonscription.id == '097-05'), 30000), (Circonscription.query.filter(Circonscription.id == '097-06'), 30000), (Circonscription.query.filter(Circonscription.id == '097-07'), 30000), (Circonscription.query.filter(Circonscription.id == '097-08'), 30000), (Circonscription.query.filter(Circonscription.id == '097-09'), 30000), (Circonscription.query.filter(Circonscription.id == '097-10'), 30000), (Circonscription.query.filter(Circonscription.id == '097-11'), 30000), ] nuances = {n.code: n for n in Nuance.query.all()} content = "\n\n" content += "\n\nÉlections législatives - 2ème tour\n\n\n\n" w, h = 20000, 20000 voix_pres = VoteCandidatPresidentielle.query\ .filter(VoteCandidatPresidentielle.vote_id == VotePresidentielle.id)\ .filter(VoteCandidatPresidentielle.candidat_id == CandidatPresidentielle.id)\ .filter(BureauVote.id == VotePresidentielle.bv_id)\ .group_by(CandidatPresidentielle.nuance_id, BureauVote.circonscription_id)\ .with_entities(CandidatPresidentielle.nuance_id, BureauVote.circonscription_id, func.sum(VoteCandidatPresidentielle.voix).label('tot_voix'))\ .order_by(BureauVote.circonscription_id, desc('tot_voix')).all() per_circo_pres = {} for row in voix_pres: per_circo_pres.setdefault(row[1], OrderedDict()) per_circo_pres[row[1]][row[0]] = row[2] voix_leg_1 = VoteCandidatLegislatives.query\ .filter(VoteCandidatLegislatives.vote_id == VoteLegislatives.id)\ .filter(VoteLegislatives.round == 1)\ .filter(VoteCandidatLegislatives.candidat_id == CandidatLegislatives.id)\ .with_entities(CandidatLegislatives.circonscription_id, CandidatLegislatives.last_name, CandidatLegislatives.first_name, CandidatLegislatives.nuance_id, VoteCandidatLegislatives.voix)\ .order_by(CandidatLegislatives.circonscription_id, VoteCandidatLegislatives.voix.desc(), CandidatLegislatives.pane_number).all() voix_leg_2 = VoteCandidatLegislatives.query\ .filter(VoteCandidatLegislatives.vote_id == VoteLegislatives.id)\ .filter(VoteLegislatives.round == 2)\ .filter(VoteCandidatLegislatives.candidat_id == CandidatLegislatives.id)\ .with_entities(CandidatLegislatives.circonscription_id, CandidatLegislatives.last_name, CandidatLegislatives.first_name, CandidatLegislatives.nuance_id, VoteCandidatLegislatives.voix)\ .order_by(CandidatLegislatives.circonscription_id, VoteCandidatLegislatives.voix.desc(), CandidatLegislatives.pane_number).all() votes_leg_1 = {vl.circonscription_id: vl for vl in VoteLegislatives.query\ .filter(VoteLegislatives.round == 1).all()} votes_leg_2 = {vl.circonscription_id: vl for vl in VoteLegislatives.query\ .filter(VoteLegislatives.round == 2).all()} scores_1_n = {n: 0 for n in nuances} scores_2_n = {n: 0 for n in nuances} stats_1 = { 'inscrits': sum(vote.inscrits for vote in votes_leg_1.values() if vote.inscrits), 'abstentions': sum(vote.abstentions for vote in votes_leg_1.values() if vote.abstentions), 'votants': sum(vote.votants for vote in votes_leg_1.values() if vote.votants), 'exprimes': sum(vote.exprimes for vote in votes_leg_1.values() if vote.exprimes), 'blancs': sum(vote.blancs for vote in votes_leg_1.values() if vote.blancs), 'nuls': sum(vote.nuls for vote in votes_leg_1.values() if vote.nuls), } stats_2 = { 'inscrits': sum(vote.inscrits for vote in votes_leg_2.values() if vote.inscrits), 'abstentions': sum(vote.abstentions for vote in votes_leg_2.values() if vote.abstentions), 'votants': sum(vote.votants for vote in votes_leg_2.values() if vote.votants), 'exprimes': sum(vote.exprimes for vote in votes_leg_2.values() if vote.exprimes), 'blancs': sum(vote.blancs for vote in votes_leg_2.values() if vote.blancs), 'nuls': sum(vote.nuls for vote in votes_leg_2.values() if vote.nuls), } premier_tour, premier_tour_n = set(), {n: 0 for n in nuances} second_tour_1, second_tour_1_n = set(), {n: 0 for n in nuances} second_tour_2, second_tour_2_n = set(), {n: 0 for n in nuances} configurations = {} winners = {n: 0 for n in nuances} per_circo_leg_1, per_circo_leg_2 = {}, {} for row in voix_leg_1: per_circo_leg_1.setdefault(row[0], []) per_circo_leg_1[row[0]].append(row[1:]) for row in voix_leg_2: per_circo_leg_2.setdefault(row[0], []) per_circo_leg_2[row[0]].append(row[1:]) for circo_id, rows in per_circo_leg_1.items(): vote = votes_leg_1[circo_id] if vote.exprimes: winner_r = rows[0] if winner_r[3] / vote.exprimes >= 0.5 and winner_r[3] / vote.inscrits >= 0.25: premier_tour.add(f"{winner_r[1]} {winner_r[0]} ({winner_r[2]}) - {circo_id}") premier_tour_n[winner_r[2]] += 1 winners[winner_r[2]] += 1 else: second_tour_1.add(f"{winner_r[1]} {winner_r[0]} ({winner_r[2]}) - {circo_id}") second_tour_1_n[winner_r[2]] += 1 sec = rows[1] second_tour_2.add(f"{sec[1]} {sec[0]} ({sec[2]}) - {circo_id}") second_tour_2_n[sec[2]] += 1 tr = rows[2] config = [winner_r[2], sec[2]] if tr[3] / vote.inscrits >= 0.125: print("3ème position :", tr) second_tour_2.add(f"{tr[1]} {tr[0]} ({tr[2]}) - {circo_id}") second_tour_2_n[tr[2]] += 1 config.append(tr[2]) config = tuple(sorted(config)) configurations.setdefault(config, 0) configurations[config] += 1 for r in rows: scores_1_n[r[2]] += r[3] for circo_id, rows in per_circo_leg_2.items(): vote = votes_leg_2[circo_id] if vote.exprimes: winners[rows[0][2]] += 1 for r in rows: scores_2_n[r[2]] += r[3] for group, ratio in groups: minx, miny, maxx, maxy = w, h, 0, 0 svg = "" for circo in group: if circo.departement_id != 97: polygons = circo.geometry if isinstance(polygons[0][0][0], float): polygons = [polygons] else: polygons = [[[[0, 0], [0, 1], [1, 1], [1, 0]]]] color = 0xffffff scores = {} if map_type.startswith('p'): if circo.id in per_circo_pres: scores = per_circo_pres[circo.id] winner_id = next(iter(scores)) winner = nuances[winner_id] color = int(winner.color, 16) elif map_type.startswith('l'): if circo.id in per_circo_leg_2: if '1' in map_type: winner_r = per_circo_leg_1[circo.id][0] winner = nuances[winner_r[2]] if per_circo_leg_1[circo.id][0][3]: color = int(winner.color, 16) else: winner_r = per_circo_leg_2[circo.id][0] winner = nuances[winner_r[2]] if per_circo_leg_2[circo.id][0][3] or len(per_circo_leg_2[circo.id]) == 1: color = int(winner.color, 16) scores_1 = {f"{s[1]} {s[0]} ({s[2]})": s[3] for s in per_circo_leg_1[circo.id]} scores_2 = {f"{s[1]} {s[0]} ({s[2]})": s[3] for s in per_circo_leg_2[circo.id]} for polygon in polygons: if len(polygon) > 1: print(circo) polygon = polygon[0] cartesian = [(w * (p[0] + 180) / 360, h / 2 - w / (2 * pi) * log(tan(pi / 4 + p[1] * pi / 360))) for p in polygon] points = cartesian minx = min(minx, min(p[0] for p in points)) miny = min(miny, min(p[1] for p in points)) maxx = max(maxx, max(p[0] for p in points)) maxy = max(maxy, max(p[1] for p in points)) svg += f'\n' svg += f'\n' svg += "" svg += f"{circo.label} de {circo.departement.name}\n" tot_voix_1 = sum(scores_1.values()) tot_voix_2 = sum(scores_2.values()) svg += "\n2<sup>ème tour</sup>\n" for name, voix in scores_2.items(): if tot_voix_2: svg += f"{name} : {100 * voix / tot_voix_2:.02f} % ({voix})\n" else: svg += f"{name}\n" svg += "\n1<sup>er</sup> tour\n" for name, voix in scores_1.items(): if tot_voix_1: svg += f"{name} : {100 * voix / tot_voix_1:.02f} % ({voix})\n" else: svg += f"{name}\n" svg += "\n" svg += "\n\n" svg += "\n" svg = f'\n' + svg content += svg if not map_type.startswith('l'): content += "\n\n" return content content += "
\n" content += "

Second tour

\n" content += "

Nombre de sièges

\n" content += f"Sièges attribués : {sum(winners.values())}/577\n" content += "\n" content += "

Statistiques nationales

\n" if not stats_2['inscrits']: # Résultats pas encore arrivés # On truque pour éviter les divisions par 0 en attendant stats_2['inscrits'] = 1 stats_2['votants'] = 1 stats_2['exprimes'] = 1 content += "

Résultats nationaux

\n" content += "\n" content += "\n" content += "
\n" content += "

Premier tour

\n" content += "

Statistiques nationales

\n" content += "

Résultats nationaux

\n" content += "\n" content += "\n" content += "

Victoires au premier tour

\n" content += "\n" content += "

Seconds tours

\n" second_tour_tot_n = {n: second_tour_1_n[n] + second_tour_2_n[n] for n in nuances} second_tour_tot_n = {k: v for k, v in second_tour_tot_n.items() if v} content += "\n" content += "

Configurations

\n" content += "\n" content += "\n\n" return content @app.get('/circo/') def info_circo(circo_id: str): circo = Circonscription.query.filter(Circonscription.id == circo_id) if not circo.count(): abort(404) circo = circo.one() nuances = {n.code: n for n in Nuance.query.all()} voix_pres = VoteCandidatPresidentielle.query\ .filter(VoteCandidatPresidentielle.vote_id == VotePresidentielle.id)\ .filter(VoteCandidatPresidentielle.candidat_id == CandidatPresidentielle.id)\ .filter(BureauVote.id == VotePresidentielle.bv_id)\ .filter(BureauVote.circonscription_id == circo_id)\ .group_by(CandidatPresidentielle.last_name, CandidatPresidentielle.first_name, CandidatPresidentielle.nuance_id)\ .with_entities(CandidatPresidentielle.last_name, CandidatPresidentielle.first_name, CandidatPresidentielle.nuance_id, func.sum(VoteCandidatPresidentielle.voix).label('tot_voix'), func.sum(VotePresidentielle.inscrits), func.sum(VotePresidentielle.votants), func.sum(VotePresidentielle.abstentions), func.sum(VotePresidentielle.exprimes), func.sum(VotePresidentielle.blancs), func.sum(VotePresidentielle.nuls))\ .order_by(desc('tot_voix')).all() voix_pres_nuance = VoteCandidatPresidentielle.query\ .filter(VoteCandidatPresidentielle.vote_id == VotePresidentielle.id)\ .filter(VoteCandidatPresidentielle.candidat_id == CandidatPresidentielle.id)\ .filter(BureauVote.id == VotePresidentielle.bv_id)\ .filter(BureauVote.circonscription_id == circo_id)\ .group_by(CandidatPresidentielle.nuance_id)\ .with_entities(CandidatPresidentielle.nuance_id, func.sum(VoteCandidatPresidentielle.voix).label('tot_voix'))\ .order_by(desc('tot_voix')).all() vote_leg_1 = VoteLegislatives.query.filter(VoteLegislatives.round == 1, VoteLegislatives.circonscription_id == circo_id).one() vote_leg_2 = VoteLegislatives.query.filter(VoteLegislatives.round == 2, VoteLegislatives.circonscription_id == circo_id).one() voix_leg_1 = VoteCandidatLegislatives.query\ .filter(VoteCandidatLegislatives.vote_id == VoteLegislatives.id)\ .filter(VoteLegislatives.round == 1)\ .filter(VoteCandidatLegislatives.candidat_id == CandidatLegislatives.id)\ .filter(CandidatLegislatives.circonscription_id == circo_id)\ .with_entities(CandidatLegislatives.last_name, CandidatLegislatives.first_name, CandidatLegislatives.nuance_id, VoteCandidatLegislatives.voix)\ .order_by(VoteCandidatLegislatives.voix.desc(), CandidatLegislatives.pane_number).all() voix_leg_2 = VoteCandidatLegislatives.query\ .filter(VoteCandidatLegislatives.vote_id == VoteLegislatives.id)\ .filter(VoteLegislatives.round == 2)\ .filter(VoteCandidatLegislatives.candidat_id == CandidatLegislatives.id)\ .filter(CandidatLegislatives.circonscription_id == circo_id)\ .with_entities(CandidatLegislatives.last_name, CandidatLegislatives.first_name, CandidatLegislatives.nuance_id, VoteCandidatLegislatives.voix)\ .order_by(VoteCandidatLegislatives.voix.desc(), CandidatLegislatives.pane_number).all() has_results_1 = voix_leg_1[0][3] > 0 has_results_2 = voix_leg_2[0][3] > 0 communes = Commune.query.filter(Commune.id == BureauVote.commune_id)\ .filter(BureauVote.circonscription_id == circo_id).order_by(Commune.name).all() html = "\n\n" html += f"\n\nCirconscription {circo_id}\n" html += "\n\n\n" html += "Retour carte de France - Législatives 2ème tour
" html += "Retour carte de France - Présidentielles 1er tour" html += f"

{circo.departement.name} - {circo.label}

\n" svg = "" color = int(nuances[voix_leg_2[0][2] if has_results_2 else voix_leg_1[0][2]].color if has_results_1 else nuances[voix_pres[0][2]].color, 16) w, h = 500, 500 minx, miny, maxx, maxy = w, h, 0, 0 if circo.departement_id != 97: polygons = circo.geometry if isinstance(polygons[0][0][0], float): polygons = [polygons] else: polygons = [[[[0, 0], [0, 1], [1, 1], [1, 0]]]] for polygon in polygons: polygon = polygon[0] cartesian = [(w * (p[0] + 180) / 360, h / 2 - w / (2 * pi) * log(tan(pi / 4 + p[1] * pi / 360))) for p in polygon] points = cartesian minx = min(minx, min(p[0] for p in points)) miny = min(miny, min(p[1] for p in points)) maxx = max(maxx, max(p[0] for p in points)) maxy = max(maxy, max(p[1] for p in points)) svg += f'\n' svg += "" svg = f'\n' + svg html += svg + "\n" html += "

Communes

\n" html += f"Département : {circo.departement.name} - {circo.departement.id} ({circo.departement.region.name})\n" html += "
    \n" for commune in communes: html += f"
  • {commune.name}
  • \n" html += "
\n" html += "

Résultats législatives 2ème tour

\n" if has_results_2: tot_voix = sum(res[3] for res in voix_leg_2) html += "
    \n" for i, res in enumerate(voix_leg_2): html += f"
  • " if i == 0: html += "" html += f"{res[1]} {res[0]} ({res[2]}) : {100 * res[3] / tot_voix:.02f} % ({res[3]})" if i == 0: html += "" html += "
  • \n" html += "
\n" html += "Statistiques :\n" if vote_leg_2.inscrits == 0: vote_leg_2.inscrits = 1 vote_leg_2.votants = 1 vote_leg_2.exprimes = 1 html += "
    \n" html += f"
  • Inscrit⋅es : {vote_leg_2.inscrits}
  • \n" html += f"
  • Votant⋅es : {vote_leg_2.votants} ({100 * vote_leg_2.votants / vote_leg_2.inscrits:.02f} %)
  • \n" html += f"
  • Abstentionistes : {vote_leg_2.abstentions} ({100 * vote_leg_2.abstentions / vote_leg_2.inscrits:.02f} %)
  • \n" html += f"
  • Voix exprimées : {vote_leg_2.exprimes}
  • \n" html += f"
  • Bulletins blancs : {vote_leg_2.blancs} ({100 * vote_leg_2.blancs / vote_leg_2.exprimes:.02f} %)
  • \n" html += f"
  • Bulletins nuls : {vote_leg_2.nuls} ({100 * vote_leg_2.nuls / vote_leg_2.exprimes:.02f} %)
  • \n" html += "
\n" else: html += "Résultats indisponibles. Liste des candidats :\n" html += "
    \n" for res in voix_leg_2: html += f"
  • {res[1]} {res[0]} ({res[2]})
  • \n" html += "
\n" html += "

Résultats législatives 1er tour

\n" if has_results_1: tot_voix = sum(res[3] for res in voix_leg_1) html += "
    \n" for i, res in enumerate(voix_leg_1): qualified = False if i == 0: qualified = True elif voix_leg_1[0][3] / vote_leg_1.exprimes < 0.5 or voix_leg_1[0][3] / vote_leg_1.inscrits < 0.25: if i == 1: qualified = True elif i == 2 and res[3] / vote_leg_1.inscrits >= 0.125: qualified = True html += f"
  • " if qualified: html += "" html += f"{res[1]} {res[0]} ({res[2]}) : {100 * res[3] / tot_voix:.02f} % ({res[3]})" if qualified: html += "" html += "
  • \n" html += "
\n" html += "Statistiques :\n" html += "
    \n" html += f"
  • Inscrit⋅es : {vote_leg_1.inscrits}
  • \n" html += f"
  • Votant⋅es : {vote_leg_1.votants} ({100 * vote_leg_1.votants / vote_leg_1.inscrits:.02f} %)
  • \n" html += f"
  • Abstentionistes : {vote_leg_1.abstentions} ({100 * vote_leg_1.abstentions / vote_leg_1.inscrits:.02f} %)
  • \n" html += f"
  • Voix exprimées : {vote_leg_1.exprimes}
  • \n" html += f"
  • Bulletins blancs : {vote_leg_1.blancs} ({100 * vote_leg_1.blancs / vote_leg_1.exprimes:.02f} %)
  • \n" html += f"
  • Bulletins nuls : {vote_leg_1.nuls} ({100 * vote_leg_1.nuls / vote_leg_1.exprimes:.02f} %)
  • \n" html += "
\n" else: html += "Résultats indisponibles. Liste des candidats :\n" html += "
    \n" for res in voix_leg_1: html += f"
  • {res[1]} {res[0]} ({res[2]})
  • \n" html += "
\n" html += "

Rappels présidentielles 1er tour

\n" tot_voix = sum(res[3] for res in voix_pres) html += "Par candidat :\n" html += "
    \n" for res in voix_pres: html += f"
  • {res[1]} {res[0]} ({res[2]}) : {100 * res[3] / tot_voix:.02f} % ({res[3]})
  • \n" html += "
\n" html += "Par nuance :\n" html += "
    \n" for res in voix_pres_nuance: html += f"
  • {res[0]} : {100 * res[1] / tot_voix:.02f} % ({res[1]})
  • \n" html += "
\n" html += "Statistiques :\n" inscrits, votants, abstentions, exprimes, blancs, nuls = voix_pres[0][4:] html += "
    \n" html += f"
  • Inscrit⋅es : {inscrits}
  • \n" html += f"
  • Votant⋅es : {votants} ({100 * votants / inscrits:.02f} %)
  • \n" html += f"
  • Abstentionistes : {abstentions} ({100 * abstentions / inscrits:.02f} %)
  • \n" html += f"
  • Voix exprimées : {exprimes}
  • \n" html += f"
  • Bulletins blancs : {blancs} ({100 * blancs / exprimes:.02f} %)
  • \n" html += f"
  • Bulletins nuls : {nuls} ({100 * nuls / exprimes:.02f} %)
  • \n" html += "
\n" html += "
    \n" html += "\n\n" return html @app.get('/refresh/') def refresh(circo_id: str): circo = Circonscription.query.filter(Circonscription.id == circo_id) if not circo.count(): abort(404) circo = circo.one() vote = VoteLegislatives.query.filter(VoteLegislatives.circonscription_id == circo_id, VoteLegislatives.round == 2).one() dpt_id = circo.departement_id if dpt_id == 201: dpt_id = '02A' elif dpt_id == 202: dpt_id = '02B' elif dpt_id == 97: dpt_id = '099' else: dpt_id = f'{dpt_id:03d}' url = f"https://www.resultats-elections.interieur.gouv.fr/legislatives-2022/{dpt_id}/{dpt_id}{circo.number:02d}.html" resp = requests.get(url) if resp.status_code != 200: abort(resp.status_code) data = {'status': 'OK', 'resultats': {}, 'statistiques': {'inscrits': 0, 'votants': 0, 'abstentions': 0, 'exprimes': 0, 'blancs': 0, 'nuls': 0}} content = resp.content.decode('iso-8859-15').lower() if "voix" not in content or "rappel" not in content: return Response(json.dumps({'status': 'error', 'message': 'results are not available'}), mimetype='application/json', status=404) content = content[content.index('voix'):content.index('rappel')] lines = content.split('\n') for i, line in enumerate(lines): if ">m." in line or ">mme" in line: name = line.split('>')[-2].split('<')[0] full_name = name.split(' ', 1)[1] field = func.concat(func.lower(CandidatLegislatives.first_name), ' ', func.lower(CandidatLegislatives.last_name)) nuance_code = lines[i + 1].split('>')[-2].split('<')[0].upper() candidat = CandidatLegislatives.query.filter(CandidatLegislatives.circonscription_id == circo_id, CandidatLegislatives.nuance_id == nuance_code, field == full_name).one() vc = VoteCandidatLegislatives.query.filter(VoteCandidatLegislatives.vote_id == vote.id, VoteCandidatLegislatives.candidat_id == candidat.id).one() for j in range(2, 10): l = lines[i + j] candidate = l.split('>')[-2].split('<', 2)[0].replace(' ', '') if candidate.isnumeric(): vc.voix = int(candidate) data['resultats'][f"{candidat.first_name} {candidat.last_name} ({nuance_code})"] = vc.voix db.session.add(vc) break elif '>inscrits' in line or '>abstentions' in line or '>votants' in line\ or '>blancs' in line or '>nuls' in line or '>exprimés' in line: name = line.split('>')[-2].split('<')[0].replace('é', 'e') tot = int(lines[i + 1].split('>')[-2].split('<', 2)[0].replace(' ', '')) setattr(vote, name, tot) data['statistiques'][name] = tot db.session.add(vote) db.session.commit() return Response(json.dumps(data), mimetype='application/json') @app.get('/') def index(): return redirect('/draw/legislatives') if __name__ == '__main__': if Nuance.query.count() == 0: db.session.add_all([ Nuance(name="Divers extrême gauche", code="DXG", color="bb0000"), Nuance(name="Parti radical de gauche", code="RDG", color="ffc0c0"), Nuance(name="Nouvelle union populaire écologique et sociale", code="NUP", color="bb1840"), Nuance(name="Divers gauche", code="DVG", color="ffc0c0"), Nuance(name="Écologistes", code="ECO", color="77ff77"), Nuance(name="Divers", code="DIV", color="eeeeee"), Nuance(name="Régionalistes", code="REG", color="dcbfa3"), Nuance(name="Ensemble", code="ENS", color="ffeb00"), Nuance(name="Divers centre", code="DVC", color="fac577"), Nuance(name="Union des Démocrates et des Indépendants", code="UDI", color="00ffff"), Nuance(name="Les Républicains", code="LR", color="0066cc"), Nuance(name="Divers droite", code="DVD", color="adc1fd"), Nuance(name="Droite souverainiste", code="DSV", color="0082c4"), Nuance(name="Reconquête !", code="REC", color="404040"), Nuance(name="Rassemblement National", code="RN", color="0d378a"), Nuance(name="Divers extrême droite", code="DXD", color="404040"), CandidatPresidentielle(last_name="ARTHAUD", first_name="Nathalie", slug="nathalie_arthaud", nuance_id="DXG"), CandidatPresidentielle(last_name="POUTOU", first_name="Philippe", slug="philippe_poutou", nuance_id="DXG"), CandidatPresidentielle(last_name="ROUSSEL", first_name="Fabien", slug="fabien_roussel", nuance_id="NUP"), CandidatPresidentielle(last_name="MÉLENCHON", first_name="Jean-Luc", slug="jean_luc_melenchon", nuance_id="NUP"), CandidatPresidentielle(last_name="HIDALGO", first_name="Anne", slug="anne_hidalgo", nuance_id="NUP"), CandidatPresidentielle(last_name="JADOT", first_name="Yannick", slug="yannick_jadot", nuance_id="NUP"), CandidatPresidentielle(last_name="MACRON", first_name="Emmanuel", slug="emmanuel_macron", nuance_id="ENS"), CandidatPresidentielle(last_name="LASSALLE", first_name="Jean", slug="jean_lassalle", nuance_id="DIV"), CandidatPresidentielle(last_name="PÉCRESSE", first_name="Valérie", slug="valerie_pecresse", nuance_id="LR"), CandidatPresidentielle(last_name="DUPONT-AIGNAN", first_name="Nicolas", slug="nicolas_dupont_aignan", nuance_id="DSV"), CandidatPresidentielle(last_name="LE PEN", first_name="Marine", slug="marine_le_pen", nuance_id="RN"), CandidatPresidentielle(last_name="ZEMMOUR", first_name="Éric", slug="eric_zemmour", nuance_id="REC"), ]) db.session.commit() if VoteCandidatLegislatives.query.count() == 0: for c in Circonscription.query.order_by(Circonscription.id).all(): vote = VoteLegislatives(circonscription_id=c.id) db.session.add(vote) for candidat in CandidatLegislatives.query.filter(CandidatLegislatives.circonscription_id == c.id).all(): db.session.add(VoteCandidatLegislatives(candidat_id=candidat.id, vote_id=vote.id)) db.session.commit() app.run(debug=True)