"""Fetch Assemblée nationale open data and tweet a randomly chosen amendment vote.

``main`` refreshes the local copies of the public-vote and amendment
archives, loads the political groups and their members, picks one entry
from ``data/votes.json`` at random and posts it as a tweet with two images:
the vote breakdown and the text of the amendment.
"""

from collections import OrderedDict
import datetime
import hashlib
from html import escape
from io import BytesIO
import json
import os.path
import random
import subprocess
import sys
import tempfile
from zipfile import ZipFile

from bs4 import BeautifulSoup
import requests
from tweepy import API, Client, OAuth1UserHandler

from . import config

BASE_PATH = os.path.dirname(os.path.dirname(__file__))
LEG = 16
DEPUTES = {}
GROUPES_ID = [
    "PO800490",
    "PO800502",
    "PO800526",
    "PO800496",
    "PO800538",
    "PO800484",
    "PO800514",
    "PO800532",
    "PO800508",
    "PO800520",
    "PO793087",
]
# Filled by main() with the parsed "organe" record of each group, plus a
# 'deputes' set of member uids.
GROUPES = OrderedDict(**{k: None for k in GROUPES_ID})

# Seconds to wait on the open-data server before giving up on a request.
_HTTP_TIMEOUT = 60
# Number of download attempts before a checksum mismatch is considered fatal.
_DOWNLOAD_ATTEMPTS = 3

# NOTE(review): every HTML tag inside the string literals of this file was
# lost in the copy under review (only the text between tags survived). The
# markup and this stylesheet are a best-effort reconstruction of the visible
# structure; compare them with the repository version before merging.
_SYNTHESE_HEAD = (
    '<!DOCTYPE html>\n<html>\n<head>\n<meta charset="utf-8">\n'
    "<style>\n"
    "body { font-family: sans-serif; }\n"
    "td { vertical-align: top; padding: 0.5em; }\n"
    ".pour { color: #2e7d32; }\n"
    ".contre { color: #c62828; }\n"
    ".abstention { color: #f9a825; }\n"
    ".absent { color: #bdbdbd; }\n"
    ".adopte { color: #2e7d32; font-weight: bold; }\n"
    ".rejete { color: #c62828; font-weight: bold; }\n"
    "</style>\n"
    "</head>\n<body>\n"
)


def _update_archive(url, md5_name, dest_subdir, member_path,
                    up_to_date_msg, start_msg):
    """Download and unpack an open-data zip archive if its checksum changed.

    Args:
        url: Address of the zip archive; ``url + ".md5"`` must serve its MD5.
        md5_name: File name, under ``data/<LEG>/``, that stores the checksum
            of the archive extracted last time.
        dest_subdir: Directory, under ``data/<LEG>/``, receiving the members.
        member_path: Callable mapping an archive member name to its path
            relative to ``dest_subdir``.
        up_to_date_msg: Message printed when nothing has to be done.
        start_msg: Message printed before downloading.

    Exits the process with status 1 when the download never matches the
    published checksum, or when a member would be written outside
    ``dest_subdir``.
    """
    md5 = requests.get(f"{url}.md5", timeout=_HTTP_TIMEOUT).content.decode().strip()
    local_md5_file = os.path.join(BASE_PATH, 'data', str(LEG), md5_name)
    if os.path.isfile(local_md5_file):
        with open(local_md5_file) as f:
            if f.read().strip() == md5:
                print(up_to_date_msg)
                return

    print(start_msg)
    for _ in range(_DOWNLOAD_ATTEMPTS):
        zip_content = requests.get(url, timeout=_HTTP_TIMEOUT).content
        if hashlib.md5(zip_content).hexdigest() == md5:
            break
    else:
        # No attempt produced a payload matching the published checksum.
        print("Mauvaise signature, abandon")
        sys.exit(1)

    dest_root = os.path.realpath(os.path.join(BASE_PATH, 'data', str(LEG), dest_subdir))
    os.makedirs(dest_root, exist_ok=True)
    with ZipFile(BytesIO(zip_content), "r") as zf:
        for fileinfo in zf.infolist():
            if fileinfo.is_dir():
                # Directory entries carry no data; parents are created below.
                continue
            dest = os.path.realpath(
                os.path.join(dest_root, member_path(fileinfo.filename)))
            # Member names come from a remote archive: refuse anything that
            # would land outside the destination directory.
            if not dest.startswith(dest_root + os.sep):
                print(f"Chemin invalide dans l'archive : {fileinfo.filename}, abandon")
                sys.exit(1)
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            with open(dest, 'wb') as f:
                f.write(zf.read(fileinfo.filename))

    # Record the checksum only once everything has been extracted, so that an
    # interrupted run is retried next time.
    with open(local_md5_file, 'w') as f:
        f.write(md5)
    print("Mise à jour effectuée")


def update_scrutins():
    """Refresh ``data/<LEG>/Scrutins`` from the public-vote archive.

    Every member is written flat into the directory, under its base name.
    """
    _update_archive(
        url=f"https://data.assemblee-nationale.fr/static/openData/repository/{LEG}/loi/scrutins/Scrutins.json.zip",
        md5_name="Scrutins.md5",
        dest_subdir="Scrutins",
        member_path=os.path.basename,
        up_to_date_msg="Scrutins publics déjà à jour",
        start_msg=f"Mise à jour des scrutins publics de la {LEG}e législature…",
    )


def update_amendements():
    """Refresh ``data/<LEG>/Amendements`` from the amendment archive.

    The archive layout is kept, minus the ``json/`` path component.
    """
    _update_archive(
        # HTTPS, like update_scrutins: over plain HTTP both the archive and
        # its checksum could be altered in transit.
        url=f"https://data.assemblee-nationale.fr/static/openData/repository/{LEG}/loi/amendements_div_legis/Amendements.json.zip",
        md5_name="Amendements.md5",
        dest_subdir="Amendements",
        member_path=lambda name: name.replace('json/', ''),
        up_to_date_msg="Amendements déjà à jour",
        start_msg=f"Mise à jour des amendements de la {LEG}e législature…",
    )


def _upload_html_image(api, html_source, alt_text):
    """Render ``html_source`` with wkhtmltoimage and upload it with alt text.

    A fresh temporary directory is used for each render, so a failed render
    cannot silently re-upload the image produced by a previous call: the
    upload fails on the missing file instead.

    Returns the media object returned by ``api.media_upload``.
    """
    with tempfile.TemporaryDirectory() as workdir:
        html_path = os.path.join(workdir, 'img.html')
        image_path = os.path.join(workdir, 'img.jpg')
        with open(html_path, 'w', encoding="utf-8") as f:
            f.write(html_source)
        subprocess.run(['wkhtmltoimage', html_path, image_path])
        media = api.media_upload(image_path)
    api.create_media_metadata(media.media_id, alt_text)
    return media


def _render_synthese_html(scrutin, vote_groupes):
    """Build the HTML page summarising a vote, one table cell per group.

    Args:
        scrutin: The ``scrutin`` record of a public vote.
        vote_groupes: Mapping of group id to that group's ``vote`` record,
            or ``None`` when the group does not appear in the vote.

    Groups without a vote record are left out. Cells are laid out four per
    row, followed by a final "Total" cell.

    NOTE(review): tags and class names are reconstructed (see _SYNTHESE_HEAD).
    """
    syn = scrutin['syntheseVote']
    parts = [
        _SYNTHESE_HEAD,
        f"<h1>{escape(str(scrutin['titre']), quote=False)}</h1>\n",
        "<table>\n",
    ]

    # A group missing from this vote has no record to display; the original
    # loop crashed on it.
    rendered = [(gid, voix) for gid, voix in vote_groupes.items() if voix is not None]
    for i, (gid, voix) in enumerate(rendered):
        if i % 4 == 0:
            if i != 0:
                parts.append("</tr>\n")
            parts.append("<tr>\n")
        decompte = voix['decompteVoix']
        pour = int(decompte['pour'])
        contre = int(decompte['contre'])
        abstentions = int(decompte['abstentions'])
        # Members of the group who did not take part in the vote.
        absents = len(GROUPES[gid]['deputes']) - pour - contre - abstentions

        parts.append("<td>\n")
        libelle = escape(str(GROUPES[gid]['libelleAbrege']), quote=False)
        parts.append(f"<b>{libelle}</b><br>\n")
        # One dot per member, grouped by position.
        for css_class, count in (
            ("pour", pour),
            ("contre", contre),
            ("abstention", abstentions),
            ("absent", absents),
        ):
            parts.append(f'<span class="{css_class}">' + count * " ●" + "</span>")
        parts.append("\n<br>\n")
        parts.append(
            f"Pour : {decompte['pour']}<br>"
            f"Contre : {decompte['contre']}<br>"
            f"Abstentions : {decompte['abstentions']}\n"
        )
        parts.append("</td>\n")

    if not rendered:
        # Keep the table well-formed when no group could be displayed.
        parts.append("<tr>\n")

    parts.append("<td>\n")
    parts.append("<b>Total</b>\n")
    parts.append("<br>\n")
    parts.append(f"Pour : {syn['decompte']['pour']}<br>\n")
    parts.append(f"Contre : {syn['decompte']['contre']}<br>\n")
    parts.append(f"Abstentions : {syn['decompte']['abstentions']}<br>\n")
    sort_class = "rejete" if scrutin['sort']['code'] == 'rejeté' else "adopte"
    parts.append(
        f'<span class="{sort_class}">'
        + escape(str(scrutin['sort']['libelle']), quote=False)
        + "</span>\n"
    )
    parts.append("</td>\n</tr>\n</table>\n")

    date_scrutin = datetime.date.fromisoformat(scrutin['dateScrutin'])
    parts.append(
        f"<p>\nScrutin n°{scrutin['numero']}, réalisé le {date_scrutin:%d/%m/%Y}</p>\n"
        "</body>\n</html>\n"
    )
    return "".join(parts)


def publish_amendement(obj, api, client):
    """Tweet one amendment vote with two images and their alt texts.

    Args:
        obj: Entry of ``data/votes.json``. The keys read here are
            ``legislature``, ``scrutin``, ``amendement_path`` and ``message``
            (a format string receiving ``date`` and ``nb_jours``).
        api: tweepy ``API`` instance, used for media upload and alt text.
        client: tweepy ``Client`` instance, used to post the tweet.

    The first attached image is the vote breakdown, the second one the
    amendment text. ``GROUPES`` must have been populated (see ``main``).
    """
    data_dir = os.path.join(BASE_PATH, 'data', str(obj['legislature']))
    # The legislature number is part of the file name; it used to be
    # hard-coded to 16 while the directory already followed obj['legislature'].
    scrutin_file = f"VTANR5L{obj['legislature']}V{obj['scrutin']}.json"
    with open(os.path.join(data_dir, 'Scrutins', scrutin_file), encoding="utf-8") as f:
        scrutin = json.load(f)['scrutin']
    with open(os.path.join(data_dir, 'Amendements', obj['amendement_path']),
              encoding="utf-8") as f:
        amendement = json.load(f)['amendement']

    content = amendement['corps']['contenuAuteur']['dispositif']
    date = datetime.date.fromisoformat(scrutin['dateScrutin'])
    delta = (datetime.date.today() - date).days
    syn = scrutin['syntheseVote']
    groupes_scrutin = scrutin['ventilationVotes']['organe']['groupes']['groupe']

    # Diagnostic output, kept from the original implementation.
    print(date, delta)
    for g in groupes_scrutin:
        print(g['organeRef'])
    print(groupes_scrutin[-1])
    print(sum(
        len((g['vote']['decompteNominatif']['pours'] or {'votant': []})['votant'])
        + len((g['vote']['decompteNominatif']['contres'] or {'votant': []})['votant'])
        for g in groupes_scrutin
    ))

    # Image of the amendment text; its alt text is the plain text, cut to
    # 1000 characters.
    # NOTE(review): the two replace() arguments were stripped in the copy
    # under review; '</p>' is a reconstruction (blank line after paragraphs).
    text_alt = BeautifulSoup(
        content.replace('</p>', '</p>\n\n'), features="lxml"
    ).get_text()[:1000]
    text_media = _upload_html_image(api, content, text_alt)

    # Start from the known groups so the cells keep the order of GROUPES_ID.
    vote_groupes = OrderedDict((gid, None) for gid in GROUPES_ID)
    for g in groupes_scrutin:
        vote_groupes[g['organeRef']] = g['vote']

    syn_html = _render_synthese_html(scrutin, vote_groupes)
    alt_text = "Tableau présentant les votes de l'amendement\n\n" + \
        f"Exprimés : {syn['suffragesExprimes']}\n" + \
        f"Pour : {syn['decompte']['pour']}\n" + \
        f"Contre : {syn['decompte']['contre']}\n" + \
        f"Abstentions : {syn['decompte']['abstentions']}\n\n" + \
        scrutin['sort']['libelle']
    result_media = _upload_html_image(api, syn_html, alt_text)

    message = obj['message'].format(date=date, nb_jours=delta)
    client.create_tweet(text=message, media_ids=[result_media.media_id, text_media.media_id])


def main():
    """Refresh the data, load the groups and publish one random vote."""
    auth = OAuth1UserHandler(
        config.API_KEY,
        config.API_KEY_SECRET,
        config.ACCESS_TOKEN,
        config.ACCESS_TOKEN_SECRET,
    )
    api = API(auth)
    client = Client(
        bearer_token=config.BEARER_TOKEN,
        consumer_key=config.API_KEY,
        consumer_secret=config.API_KEY_SECRET,
        access_token=config.ACCESS_TOKEN,
        access_token_secret=config.ACCESS_TOKEN_SECRET,
    )

    update_scrutins()
    update_amendements()

    with open(os.path.join(BASE_PATH, 'data', 'votes.json'), encoding="utf-8") as f:
        data = json.load(f)

    for groupe_id in GROUPES_ID:
        with open(os.path.join(BASE_PATH, 'data', str(LEG), 'Organes', f"{groupe_id}.json"),
                  encoding="utf-8") as f:
            groupe = json.load(f)['organe']
        groupe['deputes'] = set()
        GROUPES[groupe_id] = groupe

    acteurs_dir = os.path.join(BASE_PATH, 'data', str(LEG), 'Acteurs')
    for filename in os.listdir(acteurs_dir):
        with open(os.path.join(acteurs_dir, filename), encoding="utf-8") as f:
            acteur = json.load(f)['acteur']
        # Same normalisation as for 'organeRef' below: a lone value may not
        # be wrapped in a list.
        mandats = acteur['mandats']['mandat']
        if not isinstance(mandats, list):
            mandats = [mandats]
        for mandat in mandats:
            organes = mandat['organes']['organeRef']
            if not isinstance(organes, list):
                organes = [organes]
            for organe in organes:
                if organe in GROUPES:
                    GROUPES[organe]['deputes'].add(acteur['uid']['#text'])

    for g in GROUPES.values():
        print(g['libelle'], len(g['deputes']))

    obj = random.choice(data)
    publish_amendement(obj, api, client)