🎉 First working version

Signed-off-by: ynerant <ynerant@zamokv5.crans.org>
This commit is contained in:
ynerant 2021-05-11 12:30:45 +02:00
commit 5e6834c749
4 changed files with 395 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
__pycache__

137
chronodose.py Executable file
View File

@ -0,0 +1,137 @@
#!/usr/bin/env python3
from dataclasses import dataclass
from datetime import date
from math import acos, cos, pi, sin
import requests
from threading import Thread
from time import sleep
from irc import IRCClient
@dataclass
class Location:
longitude: float = 0.0
latitude: float = 0.0
city: str = ""
def distance(self, other: "Location") -> float:
earth_radius = 6378
phi_a, phi_b = self.latitude * pi / 180, other.latitude * pi / 180
lambda_a, lambda_b = self.longitude * pi / 180, other.longitude * pi / 180
unit_dist = acos(sin(phi_a) * sin(phi_b) \
+ cos(phi_a) * cos(phi_b) * cos(lambda_b - lambda_a))
return earth_radius * unit_dist
@dataclass
class CentreMetadata:
address: str = ""
phone_number: str = ""
business_hours: dict = None
@dataclass
class Centre:
departement: str = ""
nom: str = ""
url: str = ""
location: Location = None
metadata: CentreMetadata = None
prochain_rdv: str = ""
plateforme: str = "Doctolib"
type: str = "vaccination-center"
appointment_count: int = 0
internal_id: str = ""
vaccine_type: list[str] = None
appointment_by_phone_only: bool = False
erreur: any = None
last_scan_with_availabilities: str = ""
appointment_schedules: list[dict] = None
gid: str = ""
def check_dpt(dpt_number: int, position: Location, radius: int = 20):
"""
Recherche de rendez-vous disponibles pour les majeurs non-prioritaires
dans le département indiqué.
Renvoie une liste de couples (centre, nombre de doses dispo).
"""
res = requests.get(f'https://vitemadose.gitlab.io/vitemadose/{dpt_number}.json').json()
last_update = res['last_updated']
centres_dispo = res['centres_disponibles']
centres_indispo = res['centres_indisponibles']
print(len(centres_dispo), "centres disponibles sur", len(centres_indispo), "dans le", dpt_number)
places = []
for centre in centres_dispo:
centre = Centre(**centre)
centre.location = Location(**centre.location)
centre.metadata = CentreMetadata(**centre.metadata)
if centre.location.distance(position) > radius:
# Centre trop loin
continue
for schedule in centre.appointment_schedules:
if schedule['name'] == 'chronodose':
if schedule['total']:
# Places dispo en chronodose
places.append((centre, schedule['total']))
return places
def main():
gif = Location(latitude=48.7090418, longitude=2.1648068, city="Gif-sur-Yvette")
lyon = Location(latitude=45.7579502, longitude=4.8001017, city="Lyon")
chambéry = Location(latitude=45.5822142, longitude=5.8713341, city="Chambéry")
nantes = Location(latitude=47.2382007, longitude=-1.6300954, city="Nantes")
marseille = Location(latitude=43.2803692, longitude=5.3104571, city="Marseille")
irc_client = IRCClient('irc.crans.org', 'chronodose')
Thread(target=irc_client.start).start()
sleep(10)
irc_client.join('#chronodose')
irc_client.privmsg('#chronodose', 'coucou')
already_indicated = []
def msg(*mesg: str) -> None:
# Afficher un message dans la console et sur IRC
print(*mesg)
irc_client.privmsg('#chronodose', ' '.join(str(a) for a in mesg))
while True:
places = {}
for dpt, ville in [(91, gif), (92, gif), (94, gif), (78, gif), (69, lyon),
(73, chambéry), (44, nantes), (13, marseille)]:
places[dpt] = check_dpt(dpt, ville)
for dpt, places in places.items():
if not places:
print("Pas de dose disponible dans le", dpt)
continue
print(sum(place[1] for place in places), "doses disponibles dans le", dpt)
for centre, count in places:
if (centre.internal_id, date.today()) in already_indicated:
# Message déjà envoyé, on spam pas
continue
already_indicated.append((centre.internal_id, date.today()))
msg(count, "doses dans le centre de", centre.nom)
msg("Type de vaccin :", ", ".join(centre.vaccine_type))
msg(centre.metadata.address, centre.metadata.phone_number)
msg("Réserver sur", centre.url)
msg(" ")
# 5 minutes
sleep(300)
if __name__ == '__main__':
main()

20
codes.py Normal file
View File

@ -0,0 +1,20 @@
from enum import Enum
class Code(Enum):
CAP = 'CAP'
INVITE = 'INVITE'
JOIN = 'JOIN'
NICK = 'NICK'
NOTICE = 'NOTICE'
PASS = 'PASS'
PING = 'PING'
PONG = 'PONG'
PRIVMSG = 'PRIVMSG'
USER = 'USER'
WHOIS = 'WHOIS'
RPL_WELCOME = 1
RPL_WHOISREGNICK = 307
RPL_ENDOFWHOIS = 318
RPL_WHOISACCOUNT = 330
ERR_NICKNAMEINUSE = 433
ERR_ALREADYREGISTRED = 462

237
irc.py Normal file
View File

@ -0,0 +1,237 @@
import re
import socket
import ssl
import sys
import threading
from codes import Code
class IRCClient:
def __init__(self, server, nickname, port=None, tls=False, username=None, realname=None, encoding='utf-8', errors='ignore', capabilities=None):
self.server = server
self.nickname = nickname
if not self.nickname:
raise ValueError("Nick must not be empty")
if port is None:
if tls:
self.port = 6697
else:
self.port = 6667
else:
self.port = port
self.tls = tls
if not username:
self.username = self.nickname
else:
self.username = username
if not realname:
self.realname = self.username
else:
self.realname = realname
self.encoding = encoding
self.errors = errors
if capabilities is None:
capabilities = []
self.capabilities = capabilities
self.enabled_capabilities = []
self.socket_mutex = threading.Lock()
def start(self):
self.socket = socket.create_connection((self.server, self.port))
if self.tls:
self.raw_socket = self.socket
context = ssl.create_default_context()
self.socket = context.wrap_socket(self.raw_socket, server_hostname=self.server)
if self.capabilities:
self.cap_ls(True)
self.nick(self.nickname)
self.user(self.username, self.realname)
threads = []
while True:
data = b''
while not data or data[-1] != ord('\n'):
data += self.socket.recv(4096)
data = data.decode(self.encoding, self.errors).split('\r\n')
for command in data:
if command:
thread = threading.Thread(target=self.process_command, args=(command,))
thread.start()
threads.append(thread)
@staticmethod
def parse_command_params(params):
result = []
last_param = 0
for i, char in enumerate(params):
if char == ' ':
if params[i+1] == ':':
result.append(params[last_param:i])
result.append(params[i+2:])
last_param = len(params)
break
else:
result.append(params[last_param:i])
last_param = i+1
elif i == 0 and char == ':':
result.append(params[i+1:])
last_param = len(params)
break
if last_param != len(params):
result.append(params[last_param:])
return result
@staticmethod
def parse_tags(tags):
if tags[0] != '@':
raise ValueError("Invalid tags")
result = {}
tags = tags[1:].split(';')
for tag in tags:
if '=' in tag:
tag = tag.split('=', 1)
result[tag[0]] = tag[1]
else:
result[tag] = None
return result
def process_command(self, command):
match = re.match(r'^(?P<tags>@(?:(?:\+?(?:[0-9A-Za-z.-]+/)?[0-9A-Za-z-]+)(?:=[^\x00\r\n; ]+)?)?(?:;(?:\+?(?:[0-9A-Za-z.-]+/)?[0-9A-Za-z-]+)(?:=[^\x00\r\n; ]+)?)*)? *(?::(?P<target>[^ ]*))? *(?P<command>[a-zA-Z]+|[0-9]{3}) *(?:(?P<params>.*?))?$', command)
if match is None:
print('Unknown command:', command, file=sys.stderr)
return
tags = match.group('tags')
if tags is not None:
tags = self.parse_tags(tags)
else:
tags = {}
target = match.group('target')
command = match.group('command')
params = self.parse_command_params(match.group('params'))
print('<=', tags, target, command, params)
if command.isnumeric():
command = int(command)
try:
command = Code(command)
except ValueError:
self.on_command(command, *params, target=target, tags=tags)
if command == Code.CAP:
if params[1] == 'ACK':
self.on_cap_ack(params[2], target=target, tags=tags)
elif params[1] == 'LS':
self.on_cap_ls(params[2], target=target, tags=tags)
elif command == Code.INVITE:
self.on_invite(*params, target=target, tags=tags)
elif command == Code.JOIN:
self.on_join(*params, target=target, tags=tags)
elif command == Code.PING:
self.on_ping(*params, target=target, tags=tags)
elif command == Code.PRIVMSG:
self.on_privmsg(*params, target=target, tags=tags)
elif command == Code.RPL_ENDOFWHOIS:
self.on_endofwhois(*params, target=target, tags=tags)
elif command == Code.RPL_WELCOME:
self.on_welcome(*params, target=target, tags=tags)
elif command == Code.RPL_WHOISACCOUNT:
self.on_whoisaccount(*params, target=target, tags=tags)
elif command == Code.RPL_WHOISREGNICK:
self.on_whoisregnick(*params, target=target, tags=tags)
def cap_end(self):
self.socket_mutex.acquire()
self.socket.sendall(f'{Code.CAP.value} END\r\n'.encode(self.encoding))
self.socket_mutex.release()
def cap_ls(self, _302=False):
self.socket_mutex.acquire()
if self.capabilities:
if _302:
self.socket.sendall(f'{Code.CAP.value} LS 302\r\n'.encode(self.encoding))
else:
self.socket.sendall(f'{Code.CAP.value} LS\r\n'.encode(self.encoding))
self.socket_mutex.release()
def cap_req(self, capabilities=None):
self.socket_mutex.acquire()
if capabilities is None:
capabilities = self.capabilities
capabilities = ' '.join(capabilities)
self.socket.sendall(f'{Code.CAP.value} REQ :{capabilities}\r\n'.encode(self.encoding))
self.socket_mutex.release()
def invite(self, nickname, channel):
self.socket_mutex.acquire()
self.socket.sendall(f'{Code.INVITE.value} {nickname} {channel}\r\n'.encode(self.encoding))
self.socket_mutex.release()
def join(self, channel):
self.socket_mutex.acquire()
self.socket.sendall(f'{Code.JOIN.value} {channel}\r\n'.encode(self.encoding))
self.socket_mutex.release()
def nick(self, nickname):
self.socket_mutex.acquire()
self.socket.sendall(f'{Code.NICK.value} {nickname}\r\n'.encode(self.encoding))
self.socket_mutex.release()
def pong(self, data):
self.socket_mutex.acquire()
self.socket.sendall(f'{Code.PONG.value} :{data}\r\n'.encode(self.encoding))
self.socket_mutex.release()
def privmsg(self, channel, message):
self.socket_mutex.acquire()
messages = message.split('\n')
for message in messages:
self.socket.sendall(f'{Code.PRIVMSG.value} {channel} :{message}\r\n'.encode(self.encoding))
self.socket_mutex.release()
def user(self, username, realname, mode=0):
self.socket_mutex.acquire()
self.socket.sendall(f'{Code.USER.value} {username} {mode} * :{realname}\r\n'.encode(self.encoding))
self.socket_mutex.release()
def whois(self, *masks, target=None):
self.socket_mutex.acquire()
masks = ' '.join(masks)
if target is None:
self.socket.sendall(f'{Code.WHOIS.value} {masks}\r\n'.encode(self.encoding))
else:
self.socket.sendall(f'{Code.WHOIS.value} {target} {masks}\r\n'.encode(self.encoding))
self.socket_mutex.release()
def on_cap_ack(self, capabilities, target, tags):
self.enabled_capabilities = capabilities.split()
self.cap_end()
def on_cap_ls(self, capabilities, target, tags):
capabilities = capabilities.split()
self.enabled_capabilities = [capability for capability in self.capabilities if capability in capabilities]
self.cap_req(self.enabled_capabilities)
def on_endofwhois(self, nickname, whoisnickname, info, target, tags):
pass
def on_invite(self, nickname, channel, target, tags):
pass
def on_join(self, channel, accountname='*', realname=None, target=None, tags=None):
pass
def on_ping(self, data, target, tags):
self.pong(data)
def on_privmsg(self, channel, message, target, tags):
pass
def on_welcome(self, nickname, reply, target, tags):
pass
def on_whoisaccount(self, nickname, whoisnickname, account, info, target, tags):
pass
def on_whoisregnick(self, nickname, whoisnickname, reply, target, tags):
pass
def on_command(code, *params, target=None, tags=None):
pass