procrasturgence/procrasturgence/procrasturgence.py

249 lines
9.5 KiB
Python
Executable File

from collections import OrderedDict
import datetime
import hashlib
from io import BytesIO
import json
import os.path
import random
import requests
import subprocess
from zipfile import ZipFile
from bs4 import BeautifulSoup
from tweepy import API, Client, OAuth1UserHandler
from . import config
BASE_PATH = os.path.dirname(os.path.dirname(__file__))
LEG = 16
DEPUTES = {}
GROUPES_ID = [
"PO800490",
"PO800502",
"PO800526",
"PO800496",
"PO800538",
"PO800484",
"PO800514",
"PO800532",
"PO800508",
"PO800520",
"PO793087",
]
GROUPES = OrderedDict(**{k: None for k in GROUPES_ID})
def update_scrutins():
url = f"https://data.assemblee-nationale.fr/static/openData/repository/{LEG}/loi/scrutins/Scrutins.json.zip"
md5 = requests.get(f"{url}.md5").content.decode().strip()
local_md5_file = os.path.join(BASE_PATH, 'data', str(LEG), "Scrutins.md5")
if os.path.isfile(local_md5_file):
with open(local_md5_file) as f:
local_md5 = f.read().strip()
if local_md5 == md5:
print("Scrutins publics déjà à jour")
return
print(f"Mise à jour des scrutins publics de la {LEG}e législature…")
for _ in range(3):
zip_content = requests.get(url).content
if hashlib.md5(zip_content).hexdigest() == md5:
break
else:
print("Mauvaise signature, abandon")
exit(1)
with ZipFile(BytesIO(zip_content), "r") as zf:
for fileinfo in zf.infolist():
basename = os.path.basename(fileinfo.filename)
with open(os.path.join(BASE_PATH, 'data', str(LEG), 'Scrutins', basename), 'wb') as f:
f.write(zf.read(fileinfo.filename))
with open(local_md5_file, 'w') as f:
f.write(md5)
print("Mise à jour effectuée")
def update_amendements():
url = f"http://data.assemblee-nationale.fr/static/openData/repository/{LEG}/loi/amendements_div_legis/Amendements.json.zip"
md5 = requests.get(f"{url}.md5").content.decode().strip()
local_md5_file = os.path.join(BASE_PATH, 'data', str(LEG), "Amendements.md5")
if os.path.isfile(local_md5_file):
with open(local_md5_file) as f:
local_md5 = f.read().strip()
if local_md5 == md5:
print("Amendements déjà à jour")
return
print(f"Mise à jour des amendements de la {LEG}e législature…")
for _ in range(3):
zip_content = requests.get(url).content
if hashlib.md5(zip_content).hexdigest() == md5:
break
else:
print("Mauvaise signature, abandon")
exit(1)
with ZipFile(BytesIO(zip_content), "r") as zf:
for fileinfo in zf.infolist():
basename = fileinfo.filename.replace('json/', '')
dest = os.path.join(BASE_PATH, 'data', str(LEG), 'Amendements', basename)
if not os.path.isdir(os.path.dirname(dest)):
os.makedirs(os.path.dirname(dest))
with open(dest, 'wb') as f:
f.write(zf.read(fileinfo.filename))
with open(local_md5_file, 'w') as f:
f.write(md5)
print("Mise à jour effectuée")
def publish_amendement(obj, api, client):
with open(os.path.join(BASE_PATH, 'data', str(obj['legislature']), 'Scrutins', f"VTANR5L16V{obj['scrutin']}.json")) as f:
scrutin = json.load(f)['scrutin']
with open(os.path.join(BASE_PATH, 'data', str(obj['legislature']), 'Amendements', obj['amendement_path'])) as f:
amendement = json.load(f)['amendement']
content = amendement['corps']['contenuAuteur']['dispositif']
date = datetime.date.fromisoformat(scrutin['dateScrutin'])
now = datetime.date.today()
delta = (now - date).days
print(date, delta)
syn = scrutin['syntheseVote']
for g in scrutin['ventilationVotes']['organe']['groupes']['groupe']:
print(g['organeRef'])
print(scrutin['ventilationVotes']['organe']['groupes']['groupe'][-1])
print(sum(len((g['vote']['decompteNominatif']['pours'] or {'votant': []})['votant']) + len((g['vote']['decompteNominatif']['contres'] or {'votant': []})['votant']) for g in scrutin['ventilationVotes']['organe']['groupes']['groupe']))
with open('/tmp/img.html', 'w') as f:
f.write(content)
subprocess.run(['wkhtmltoimage', '/tmp/img.html', '/tmp/img.jpg'])
text_media = api.media_upload("/tmp/img.jpg")
api.create_media_metadata(text_media.media_id, BeautifulSoup(content.replace('</p>', '</p>\n\n'), features="lxml").get_text()[:1000])
vote_groupes = OrderedDict(**{k: None for k in GROUPES_ID})
for g in scrutin['ventilationVotes']['organe']['groupes']['groupe']:
vote_groupes[g['organeRef']] = g['vote']
# FIXME Faites mieux
syn_html = '<html lang="fr">\n<head>\n<meta http-equiv="content-type" content="text/html; charset=UTF-8" />\n</head>\n<body>\n'
syn_html += '<table style="border: 1px solid black;">\n<thead>\n'
syn_html += f"<tr><th colspan=\"4\"><h2>{scrutin['titre']}</h2></th></tr>\n"
syn_html += '</thead>\n<tbody>\n'
for i, (gid, voix) in enumerate(vote_groupes.items()):
if i % 4 == 0:
if i != 0:
syn_html += "</tr>\n"
syn_html += "<tr>\n"
syn_html += f"<td style=\"width: 25%; margin: 5px; padding: 1px; border: 2px solid {GROUPES[gid]['couleurAssociee']};\">\n"
syn_html += f"<strong>{GROUPES[gid]['libelleAbrege']}</strong><br>\n"
syn_html += "<span style=\"color: green;\">"
syn_html += int(voix['decompteVoix']['pour']) * " &#9679;"
syn_html += "</span>"
syn_html += "<span style=\"color: red;\">"
syn_html += int(voix['decompteVoix']['contre']) * " &#9679;"
syn_html += "</span>"
syn_html += "<span style=\"color: orange;\">"
syn_html += int(voix['decompteVoix']['abstentions']) * " &#9679;"
syn_html += "</span>"
syn_html += "<span style=\"color: gray;\">"
syn_html += (len(GROUPES[gid]['deputes']) - int(voix['decompteVoix']['pour']) - int(voix['decompteVoix']['contre']) - int(voix['decompteVoix']['abstentions'])) * " &#9679;"
syn_html += "</span>\n<hr>\n"
syn_html += f"Pour : {voix['decompteVoix']['pour']}<br>Contre : {voix['decompteVoix']['contre']}<br>Abstentions : {voix['decompteVoix']['abstentions']}\n"
syn_html += "</td>\n"
syn_html += "<td style=\"width: 25%; margin: 8px; padding: 2px; border: 2px solid black;\">\n"
syn_html += "<strong>Total</strong>\n"
syn_html += "<hr>\n"
syn_html += f"Pour : {syn['decompte']['pour']}<br>\n"
syn_html += f"Contre : {syn['decompte']['contre']}<br>\n"
syn_html += f"Abstentions : {syn['decompte']['abstentions']}<br>\n"
if scrutin['sort']['code'] == 'rejeté':
syn_html += "<span style=\"color: red;\">"
else:
syn_html += "<span style=\"color: green;\">"
syn_html += scrutin['sort']['libelle']
syn_html += "</span>\n"
syn_html += "</td>\n"
syn_html += "</tr>\n"
syn_html += "</tbody>\n"
syn_html += f"<tfoot>\n<tr>\n<th colspan=\"4\">\nScrutin n°{scrutin['numero']}, réalisé le {datetime.date.fromisoformat(scrutin['dateScrutin']):%d/%m/%Y}</th>\n</tr>\n</tfoot>\n"
syn_html += "</table>\n</body>\n</html>"
with open('/tmp/img.html', 'w') as f:
f.write(syn_html)
alt_text = "Tableau présentant les votes de l'amendement\n\n" + \
f"Exprimés : {syn['suffragesExprimes']}\n" + \
f"Pour : {syn['decompte']['pour']}\n" + \
f"Contre : {syn['decompte']['contre']}\n" + \
f"Abstentions : {syn['decompte']['abstentions']}\n\n" + \
scrutin['sort']['libelle']
subprocess.run(['wkhtmltoimage', '/tmp/img.html', '/tmp/img.jpg'])
result_media = api.media_upload("/tmp/img.jpg")
api.create_media_metadata(result_media.media_id, alt_text)
message = obj['message'].format(date=date, nb_jours=delta)
client.create_tweet(text=message, media_ids=[result_media.media_id, text_media.media_id])
def main():
auth = OAuth1UserHandler(
config.API_KEY,
config.API_KEY_SECRET,
config.ACCESS_TOKEN,
config.ACCESS_TOKEN_SECRET,
)
api = API(auth)
client = Client(
bearer_token=config.BEARER_TOKEN,
consumer_key=config.API_KEY,
consumer_secret=config.API_KEY_SECRET,
access_token=config.ACCESS_TOKEN,
access_token_secret=config.ACCESS_TOKEN_SECRET,
)
update_scrutins()
update_amendements()
with open(os.path.join(BASE_PATH, 'data', 'votes.json')) as f:
data = json.load(f)
for groupe_id in GROUPES_ID:
with open(os.path.join(BASE_PATH, 'data', str(LEG), 'Organes', f"{groupe_id}.json")) as f:
groupe = json.load(f)['organe']
groupe['deputes'] = set()
GROUPES[groupe_id] = groupe
for filename in os.listdir(os.path.join(BASE_PATH, 'data', str(LEG), 'Acteurs')):
with open(os.path.join(os.path.join(BASE_PATH, 'data', str(LEG), 'Acteurs', filename))) as f:
acteur = json.load(f)['acteur']
for mandat in acteur['mandats']['mandat']:
organes = mandat['organes']['organeRef']
if not isinstance(organes, list):
organes = [organes]
for organe in organes:
if organe in GROUPES:
groupe = GROUPES[organe]
groupe['deputes'].add(acteur['uid']['#text'])
for g in GROUPES.values():
print(g['libelle'], len(g['deputes']))
obj = random.choice(data)
publish_amendement(obj, api, client)