249 lines
9.5 KiB
Python
Executable File
249 lines
9.5 KiB
Python
Executable File
from collections import OrderedDict
|
|
import datetime
|
|
import hashlib
|
|
from io import BytesIO
|
|
import json
|
|
import os.path
|
|
import random
|
|
import requests
|
|
import subprocess
|
|
from zipfile import ZipFile
|
|
|
|
from bs4 import BeautifulSoup
|
|
from tweepy import API, Client, OAuth1UserHandler
|
|
|
|
from . import config
|
|
|
|
|
|
# Directory holding this module, and its parent: BASE_PATH is two levels
# up from this file and is the root under which the 'data' tree is looked up.
_MODULE_DIR = os.path.dirname(__file__)
BASE_PATH = os.path.dirname(_MODULE_DIR)

# Legislature number used to build download URLs and local data paths.
LEG = 16

# NOTE(review): not referenced by any code visible in this file — confirm
# whether it is still needed.
DEPUTES = {}

# Identifiers of the political groups, in the order used to lay out the
# vote-summary table.
GROUPES_ID = [
    "PO800490", "PO800502", "PO800526", "PO800496",
    "PO800538", "PO800484", "PO800514", "PO800532",
    "PO800508", "PO800520", "PO793087",
]

# Group id -> group description. Every value starts as None; main() replaces
# it with the parsed 'organe' JSON of the group.
GROUPES = OrderedDict.fromkeys(GROUPES_ID)
|
|
|
|
|
|
def update_scrutins():
    """Refresh the local copy of the public votes ("scrutins") archive.

    The remote ``.md5`` file is compared with the checksum stored locally in
    ``data/<LEG>/Scrutins.md5``; when they match nothing is downloaded.
    Otherwise the zip archive is fetched (up to three attempts until its MD5
    matches), every file it contains is written flat into
    ``data/<LEG>/Scrutins`` and the new checksum is recorded.

    Raises:
        SystemExit: with status 1 when no download attempt matches the
            published checksum.
    """
    # Without a timeout ``requests`` may block forever on a stalled server.
    timeout = 60

    url = f"https://data.assemblee-nationale.fr/static/openData/repository/{LEG}/loi/scrutins/Scrutins.json.zip"
    md5 = requests.get(f"{url}.md5", timeout=timeout).content.decode().strip()

    data_dir = os.path.join(BASE_PATH, 'data', str(LEG))
    local_md5_file = os.path.join(data_dir, "Scrutins.md5")
    if os.path.isfile(local_md5_file):
        with open(local_md5_file, encoding='utf-8') as f:
            local_md5 = f.read().strip()
        if local_md5 == md5:
            print("Scrutins publics déjà à jour")
            return
    print(f"Mise à jour des scrutins publics de la {LEG}e législature…")

    for _ in range(3):
        zip_content = requests.get(url, timeout=timeout).content
        if hashlib.md5(zip_content).hexdigest() == md5:
            break
    else:
        print("Mauvaise signature, abandon")
        # ``exit`` is only injected by the ``site`` module; raising
        # SystemExit directly has the same effect and always works.
        raise SystemExit(1)

    # Create the target directory up front (also covers the parent of the
    # checksum file), as update_amendements() does for its own files.
    dest_dir = os.path.join(data_dir, 'Scrutins')
    os.makedirs(dest_dir, exist_ok=True)

    with ZipFile(BytesIO(zip_content), "r") as zf:
        for fileinfo in zf.infolist():
            # Directory entries have an empty basename; opening the
            # resulting path for writing would fail.
            if fileinfo.is_dir():
                continue
            # Only the basename is kept, so files are stored flat and an
            # archive member cannot be written outside dest_dir.
            basename = os.path.basename(fileinfo.filename)
            with open(os.path.join(dest_dir, basename), 'wb') as f:
                f.write(zf.read(fileinfo.filename))

    with open(local_md5_file, 'w', encoding='utf-8') as f:
        f.write(md5)

    print("Mise à jour effectuée")
|
|
|
|
|
|
def update_amendements():
    """Refresh the local copy of the amendments archive.

    The remote ``.md5`` file is compared with the checksum stored locally in
    ``data/<LEG>/Amendements.md5``; when they match nothing is downloaded.
    Otherwise the zip archive is fetched (up to three attempts until its MD5
    matches) and extracted under ``data/<LEG>/Amendements``, keeping the
    archive's sub-directories but with every ``json/`` path component text
    removed. The new checksum is then recorded.

    Raises:
        SystemExit: with status 1 when no download attempt matches the
            published checksum.
    """
    # Without a timeout ``requests`` may block forever on a stalled server.
    timeout = 60

    # HTTPS, like update_scrutins(): the checksum is only meaningful if it
    # cannot be altered in transit together with the archive.
    url = f"https://data.assemblee-nationale.fr/static/openData/repository/{LEG}/loi/amendements_div_legis/Amendements.json.zip"
    md5 = requests.get(f"{url}.md5", timeout=timeout).content.decode().strip()

    data_dir = os.path.join(BASE_PATH, 'data', str(LEG))
    local_md5_file = os.path.join(data_dir, "Amendements.md5")
    if os.path.isfile(local_md5_file):
        with open(local_md5_file, encoding='utf-8') as f:
            local_md5 = f.read().strip()
        if local_md5 == md5:
            print("Amendements déjà à jour")
            return
    print(f"Mise à jour des amendements de la {LEG}e législature…")

    for _ in range(3):
        zip_content = requests.get(url, timeout=timeout).content
        if hashlib.md5(zip_content).hexdigest() == md5:
            break
    else:
        print("Mauvaise signature, abandon")
        # ``exit`` is only injected by the ``site`` module; raising
        # SystemExit directly has the same effect and always works.
        raise SystemExit(1)

    dest_root = os.path.abspath(os.path.join(data_dir, 'Amendements'))
    os.makedirs(dest_root, exist_ok=True)

    with ZipFile(BytesIO(zip_content), "r") as zf:
        for fileinfo in zf.infolist():
            # Directory entries carry no data; the needed directories are
            # created below from the file paths themselves.
            if fileinfo.is_dir():
                continue
            basename = fileinfo.filename.replace('json/', '')
            dest = os.path.abspath(os.path.join(dest_root, basename))
            # Refuse members whose path (absolute or containing "..") would
            # land outside the extraction directory.
            if not dest.startswith(dest_root + os.sep):
                print(f"Entrée d'archive ignorée : {fileinfo.filename}")
                continue
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            with open(dest, 'wb') as f:
                f.write(zf.read(fileinfo.filename))

    with open(local_md5_file, 'w', encoding='utf-8') as f:
        f.write(md5)

    print("Mise à jour effectuée")
|
|
|
|
|
|
def _print_scrutin_debug(scrutin, date, delta):
    """Print diagnostic lines about the ballot: date, group refs, voter count."""
    groupes = scrutin['ventilationVotes']['organe']['groupes']['groupe']
    print(date, delta)
    for g in groupes:
        print(g['organeRef'])
    print(groupes[-1])
    # 'pours' / 'contres' may be null in the data, hence the fallbacks.
    print(sum(
        len((g['vote']['decompteNominatif']['pours'] or {'votant': []})['votant'])
        + len((g['vote']['decompteNominatif']['contres'] or {'votant': []})['votant'])
        for g in groupes
    ))


def _render_html_to_image(html, workdir, stem):
    """Write ``html`` to ``workdir`` and render it to a JPEG with wkhtmltoimage.

    Returns the path of the generated image. Raises RuntimeError when the
    tool produced no output file.
    """
    html_path = os.path.join(workdir, f"{stem}.html")
    image_path = os.path.join(workdir, f"{stem}.jpg")
    # The summary page declares charset=UTF-8, so write exactly that.
    with open(html_path, 'w', encoding='utf-8') as f:
        f.write(html)
    # The exit status is not enforced (as before); what matters is whether
    # an image was actually produced, which is checked just below.
    subprocess.run(['wkhtmltoimage', html_path, image_path])
    if not os.path.isfile(image_path):
        raise RuntimeError(f"wkhtmltoimage produced no image for {html_path}")
    return image_path


def _groupe_cell_html(groupe, voix):
    """Return the table cell showing one group's votes as coloured dots."""
    decompte = voix['decompteVoix']
    pour = int(decompte['pour'])
    contre = int(decompte['contre'])
    abstentions = int(decompte['abstentions'])
    # Members of the group that are in none of the three counts. A negative
    # value yields an empty string when multiplied, i.e. no gray dot.
    autres = len(groupe['deputes']) - pour - contre - abstentions
    couleur = groupe['couleurAssociee']
    libelle = groupe['libelleAbrege']
    return "".join([
        f"<td style=\"width: 25%; margin: 5px; padding: 1px; border: 2px solid {couleur};\">\n",
        f"<strong>{libelle}</strong><br>\n",
        "<span style=\"color: green;\">", pour * " ●", "</span>",
        "<span style=\"color: red;\">", contre * " ●", "</span>",
        "<span style=\"color: orange;\">", abstentions * " ●", "</span>",
        "<span style=\"color: gray;\">", autres * " ●", "</span>\n<hr>\n",
        f"Pour : {decompte['pour']}<br>Contre : {decompte['contre']}<br>Abstentions : {decompte['abstentions']}\n",
        "</td>\n",
    ])


def _total_cell_html(scrutin):
    """Return the table cell with the overall counts and the ballot outcome."""
    decompte = scrutin['syntheseVote']['decompte']
    sort = scrutin['sort']
    couleur = "red" if sort['code'] == 'rejeté' else "green"
    return "".join([
        "<td style=\"width: 25%; margin: 8px; padding: 2px; border: 2px solid black;\">\n",
        "<strong>Total</strong>\n",
        "<hr>\n",
        f"Pour : {decompte['pour']}<br>\n",
        f"Contre : {decompte['contre']}<br>\n",
        f"Abstentions : {decompte['abstentions']}<br>\n",
        f"<span style=\"color: {couleur};\">",
        sort['libelle'],
        "</span>\n",
        "</td>\n",
    ])


def _synthese_html(scrutin, cells, date):
    """Assemble the full HTML page: title, cells in rows of four, footer."""
    parts = [
        '<html lang="fr">\n<head>\n<meta http-equiv="content-type" content="text/html; charset=UTF-8" />\n</head>\n<body>\n',
        '<table style="border: 1px solid black;">\n<thead>\n',
        f"<tr><th colspan=\"4\"><h2>{scrutin['titre']}</h2></th></tr>\n",
        '</thead>\n<tbody>\n',
    ]
    # Rows of at most four cells, matching the colspan="4" header/footer.
    for start in range(0, len(cells), 4):
        parts.append("<tr>\n")
        parts.extend(cells[start:start + 4])
        parts.append("</tr>\n")
    parts.append("</tbody>\n")
    parts.append(
        f"<tfoot>\n<tr>\n<th colspan=\"4\">\nScrutin n°{scrutin['numero']}, réalisé le {date:%d/%m/%Y}</th>\n</tr>\n</tfoot>\n"
    )
    parts.append("</table>\n</body>\n</html>")
    return "".join(parts)


def publish_amendement(obj, api, client):
    """Tweet one amendment with two images: the vote summary and its text.

    Args:
        obj: entry describing what to publish. The keys read here are
            'legislature', 'scrutin', 'amendement_path' and 'message'
            (a format string that may use ``{date}`` and ``{nb_jours}``).
        api: object providing ``media_upload`` and ``create_media_metadata``
            (main() passes a tweepy ``API``).
        client: object providing ``create_tweet`` (main() passes a tweepy
            ``Client``).

    Raises:
        RuntimeError: when wkhtmltoimage does not produce an image.
    """
    # Function-scope import keeps this block self-contained.
    import tempfile

    data_dir = os.path.join(BASE_PATH, 'data', str(obj['legislature']))

    # The legislature in the file name follows obj['legislature'] instead of
    # a hard-coded 16, so it always agrees with the directory being read.
    # NOTE(review): assumes the "VTANR5L<leg>V<n>" naming holds for other
    # legislatures — confirm against the data set.
    scrutin_file = f"VTANR5L{obj['legislature']}V{obj['scrutin']}.json"
    with open(os.path.join(data_dir, 'Scrutins', scrutin_file), encoding='utf-8') as f:
        scrutin = json.load(f)['scrutin']

    with open(os.path.join(data_dir, 'Amendements', obj['amendement_path']), encoding='utf-8') as f:
        amendement = json.load(f)['amendement']

    content = amendement['corps']['contenuAuteur']['dispositif']

    date = datetime.date.fromisoformat(scrutin['dateScrutin'])
    delta = (datetime.date.today() - date).days
    syn = scrutin['syntheseVote']

    _print_scrutin_debug(scrutin, date, delta)

    # Votes per group, in GROUPES_ID order; groups only present in the
    # ballot are appended after them.
    vote_groupes = OrderedDict.fromkeys(GROUPES_ID)
    for g in scrutin['ventilationVotes']['organe']['groupes']['groupe']:
        vote_groupes[g['organeRef']] = g['vote']

    cells = []
    for gid, voix in vote_groupes.items():
        groupe = GROUPES.get(gid)
        # A group absent from the ballot, or one with no loaded description,
        # cannot be drawn; skip it instead of failing on a None lookup.
        if groupe is None or voix is None:
            print(f"Groupe {gid} ignoré (données manquantes)")
            continue
        cells.append(_groupe_cell_html(groupe, voix))
    cells.append(_total_cell_html(scrutin))

    # FIXME: do better (the HTML is still assembled by hand)
    syn_html = _synthese_html(scrutin, cells, date)

    alt_text = "Tableau présentant les votes de l'amendement\n\n" + \
        f"Exprimés : {syn['suffragesExprimes']}\n" + \
        f"Pour : {syn['decompte']['pour']}\n" + \
        f"Contre : {syn['decompte']['contre']}\n" + \
        f"Abstentions : {syn['decompte']['abstentions']}\n\n" + \
        scrutin['sort']['libelle']

    # A private temporary directory with one file pair per image: nothing is
    # shared with other runs, and a failed render can never cause a
    # previously generated image to be uploaded in its place.
    with tempfile.TemporaryDirectory() as workdir:
        text_image = _render_html_to_image(content, workdir, 'amendement')
        text_media = api.media_upload(text_image)
        # Alternative text: the amendment as plain text, cut at 1000 chars.
        text_alt = BeautifulSoup(content.replace('</p>', '</p>\n\n'), features="lxml").get_text()[:1000]
        api.create_media_metadata(text_media.media_id, text_alt)

        result_image = _render_html_to_image(syn_html, workdir, 'scrutin')
        result_media = api.media_upload(result_image)
        api.create_media_metadata(result_media.media_id, alt_text)

    message = obj['message'].format(date=date, nb_jours=delta)

    client.create_tweet(text=message, media_ids=[result_media.media_id, text_media.media_id])
|
|
|
|
|
|
def main():
    """Update the local data, load groups and members, then publish one vote.

    Steps: build the tweepy ``API`` and ``Client`` from ``config``; refresh
    the scrutins and amendements archives; load ``data/votes.json``; load
    each group listed in GROUPES_ID into GROUPES; attach to every group the
    uids of the actors having a mandate that references it; finally pick a
    random entry of ``votes.json`` and publish it.

    Raises:
        SystemExit: when ``votes.json`` contains no entry.
    """
    auth = OAuth1UserHandler(
        config.API_KEY,
        config.API_KEY_SECRET,
        config.ACCESS_TOKEN,
        config.ACCESS_TOKEN_SECRET,
    )
    api = API(auth)

    client = Client(
        bearer_token=config.BEARER_TOKEN,
        consumer_key=config.API_KEY,
        consumer_secret=config.API_KEY_SECRET,
        access_token=config.ACCESS_TOKEN,
        access_token_secret=config.ACCESS_TOKEN_SECRET,
    )

    update_scrutins()
    update_amendements()

    with open(os.path.join(BASE_PATH, 'data', 'votes.json'), encoding='utf-8') as f:
        data = json.load(f)

    leg_dir = os.path.join(BASE_PATH, 'data', str(LEG))

    for groupe_id in GROUPES_ID:
        with open(os.path.join(leg_dir, 'Organes', f"{groupe_id}.json"), encoding='utf-8') as f:
            groupe = json.load(f)['organe']
        # Filled below with the uids of the group's members.
        groupe['deputes'] = set()
        GROUPES[groupe_id] = groupe

    acteurs_dir = os.path.join(leg_dir, 'Acteurs')
    for filename in os.listdir(acteurs_dir):
        with open(os.path.join(acteurs_dir, filename), encoding='utf-8') as f:
            acteur = json.load(f)['acteur']
        # Same normalisation as for 'organeRef' below: a lone value is
        # wrapped in a list so that the loop sees mandates, not dict keys.
        mandats = acteur['mandats']['mandat']
        if not isinstance(mandats, list):
            mandats = [mandats]
        # NOTE(review): every mandate counts, whatever its dates — confirm
        # that past memberships are meant to be included.
        for mandat in mandats:
            organes = mandat['organes']['organeRef']
            if not isinstance(organes, list):
                organes = [organes]
            for organe in organes:
                if organe in GROUPES:
                    GROUPES[organe]['deputes'].add(acteur['uid']['#text'])

    for g in GROUPES.values():
        print(g['libelle'], len(g['deputes']))

    # random.choice() raises a bare IndexError on an empty sequence; fail
    # with an explicit message instead.
    if not data:
        raise SystemExit("Aucun vote à publier dans votes.json")

    obj = random.choice(data)
    publish_amendement(obj, api, client)
|