import datetime import hashlib from io import BytesIO import json import os.path import random import requests import subprocess from zipfile import ZipFile from tweepy import API, Client, OAuth1UserHandler from . import config LEG = 16 BASE_PATH = os.path.dirname(os.path.dirname(__file__)) def update_scrutins(): url = f"https://data.assemblee-nationale.fr/static/openData/repository/{LEG}/loi/scrutins/Scrutins.json.zip" md5 = requests.get(f"{url}.md5").content.decode().strip() local_md5_file = os.path.join(BASE_PATH, 'data', str(LEG), "Scrutins.md5") if os.path.isfile(local_md5_file): with open(local_md5_file) as f: local_md5 = f.read().strip() if local_md5 == md5: print("Scrutins publics déjà à jour") return print(f"Mise à jour des scrutins publics de la {LEG}e législature…") for _ in range(3): zip_content = requests.get(url).content if hashlib.md5(zip_content).hexdigest() == md5: break else: print("Mauvaise signature, abandon") exit(1) with ZipFile(BytesIO(zip_content), "r") as zf: for fileinfo in zf.infolist(): basename = os.path.basename(fileinfo.filename) with open(os.path.join(BASE_PATH, 'data', str(LEG), 'Scrutins', basename), 'wb') as f: f.write(zf.read(fileinfo.filename)) with open(local_md5_file, 'w') as f: f.write(md5) print("Mise à jour effectuée") def update_amendements(): url = f"http://data.assemblee-nationale.fr/static/openData/repository/{LEG}/loi/amendements_div_legis/Amendements.json.zip" md5 = requests.get(f"{url}.md5").content.decode().strip() local_md5_file = os.path.join(BASE_PATH, 'data', str(LEG), "Amendements.md5") if os.path.isfile(local_md5_file): with open(local_md5_file) as f: local_md5 = f.read().strip() if local_md5 == md5: print("Amendements déjà à jour") return print(f"Mise à jour des amendements de la {LEG}e législature…") for _ in range(3): zip_content = requests.get(url).content if hashlib.md5(zip_content).hexdigest() == md5: break else: print("Mauvaise signature, abandon") exit(1) with ZipFile(BytesIO(zip_content), "r") as zf: for fileinfo in zf.infolist(): basename = fileinfo.filename.replace('json/', '') dest = os.path.join(BASE_PATH, 'data', str(LEG), 'Amendements', basename) if not os.path.isdir(os.path.dirname(dest)): os.makedirs(os.path.dirname(dest)) with open(dest, 'wb') as f: f.write(zf.read(fileinfo.filename)) with open(local_md5_file, 'w') as f: f.write(md5) print("Mise à jour effectuée") def publish_amendement(obj, api, client): with open(os.path.join(BASE_PATH, 'data', str(obj['legislature']), 'Scrutins', f"VTANR5L16V{obj['scrutin']}.json")) as f: scrutin = json.load(f)['scrutin'] with open(os.path.join(BASE_PATH, 'data', str(obj['legislature']), 'Amendements', obj['amendement_path'])) as f: amendement = json.load(f)['amendement'] content = amendement['corps']['contenuAuteur']['dispositif'] date = datetime.date.fromisoformat(scrutin['dateScrutin']) now = datetime.date.today() delta = (now - date).days print(date, delta) syn = scrutin['syntheseVote'] with open('/tmp/img.html', 'w') as f: f.write(content) subprocess.run(['wkhtmltoimage', '/tmp/img.html', '/tmp/img.jpg']) text_media = api.media_upload("/tmp/img.jpg") api.create_media_metadata(text_media.media_id, content[:1000]) # FIXME Faites mieux syn_text = f"

Pour : {syn['decompte']['pour']}

\n" + \ f"

Contre : {syn['decompte']['contre']}

\n" + \ f"

Abstentions : {syn['decompte']['abstentions']}

\n" + \ f"

Exprimés : {syn['suffragesExprimes']}" with open('/tmp/img.html', 'w') as f: f.write(syn_text) subprocess.run(['wkhtmltoimage', '/tmp/img.html', '/tmp/img.jpg']) result_media = api.media_upload("/tmp/img.jpg") api.create_media_metadata(result_media.media_id, syn_text[:1000]) message = obj['message'].format(date=date, nb_jours = delta) client.create_tweet(text=message, media_ids=[result_media.media_id, text_media.media_id])) def main(): auth = OAuth1UserHandler( config.API_KEY, config.API_KEY_SECRET, config.ACCESS_TOKEN, config.ACCESS_TOKEN_SECRET, ) api = API(auth) client = Client( bearer_token=config.BEARER_TOKEN, consumer_key=config.API_KEY, consumer_secret=config.API_KEY_SECRET, access_token=config.ACCESS_TOKEN, access_token_secret=config.ACCESS_TOKEN_SECRET, ) update_scrutins() update_amendements() with open(os.path.join(BASE_PATH, 'data', 'votes.json')) as f: data = json.load(f) obj = random.choice(data) publish_amendement(obj, api, client)