mirror of
https://gitlab.com/ddorn/tfjm-discord-bot.git
synced 2025-07-07 16:28:32 +02:00
🚧 WIP: better and more flexible tirages
This commit is contained in:
264
src/base_tirage.py
Normal file
264
src/base_tirage.py
Normal file
@ -0,0 +1,264 @@
|
||||
import asyncio
|
||||
import random
|
||||
from pprint import pprint
|
||||
|
||||
from io import StringIO
|
||||
from typing import Type, Union, Dict, List
|
||||
|
||||
import discord
|
||||
|
||||
from src.constants import *
|
||||
|
||||
|
||||
class Event(asyncio.Event):
|
||||
def __init__(self, team: str, value: Union[bool, int, str]):
|
||||
super(Event, self).__init__()
|
||||
self.value = value
|
||||
self.team = team
|
||||
self.response = None
|
||||
|
||||
|
||||
class Team:
|
||||
yaml_tag = "Team"
|
||||
|
||||
def __init__(self, team_role):
|
||||
self.name = team_role.name
|
||||
self.mention = team_role.mention
|
||||
|
||||
self.accepted_problems = [None, None]
|
||||
self.rejected = [set(), set()]
|
||||
|
||||
def __str__(self):
|
||||
s = StringIO()
|
||||
pprint(self.__dict__, stream=s)
|
||||
s.seek(0)
|
||||
return s.read()
|
||||
|
||||
__repr__ = __str__
|
||||
|
||||
def coeff(self, round):
|
||||
if len(self.rejected[round]) <= MAX_REFUSE:
|
||||
return 2
|
||||
else:
|
||||
return 2 - 0.5 * (len(self.rejected[round]) - MAX_REFUSE)
|
||||
|
||||
def details(self, round):
|
||||
|
||||
info = {
|
||||
# "Accepté": self.accepted_problems[round],
|
||||
"Refusés": ", ".join(p[0] for p in self.rejected[round])
|
||||
if self.rejected[round]
|
||||
else "aucun",
|
||||
"Coefficient": self.coeff(round),
|
||||
# "Ordre passage": self.passage_order[round],
|
||||
}
|
||||
|
||||
width = max(map(len, info))
|
||||
|
||||
return "\n".join(f"`{n.rjust(width)}`: {v}" for n, v in info.items())
|
||||
|
||||
return f""" - Accepté: {self.accepted_problems[round]}
|
||||
- Refusés: {", ".join(p[0] for p in self.rejected[round]) if self.rejected[round] else "aucun"}
|
||||
- Coefficient: {self.coeff(round)}
|
||||
- Ordre au tirage: {self.tirage_order[round]}
|
||||
- Ordre de passage: {self.passage_order[round]}
|
||||
"""
|
||||
|
||||
|
||||
class Poule:
|
||||
def __init__(self, poule, rnd):
|
||||
self.poule = poule
|
||||
self.rnd = rnd
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.poule}{self.rnd + 1}"
|
||||
|
||||
|
||||
class BaseTirage:
|
||||
def __init__(self, *teams: discord.Role, fmt=(3, 3)):
|
||||
assert sum(fmt) == len(teams)
|
||||
|
||||
self.teams: Dict[str, Team] = {t.name: Team(t) for t in teams}
|
||||
self.format = fmt
|
||||
self.queue = asyncio.Queue()
|
||||
self.poules: Dict[Poule, List[str]] = {}
|
||||
"""A mapping between the poule name and the list of teams in this poule."""
|
||||
|
||||
async def event(self, event: Event):
|
||||
event.set()
|
||||
await self.queue.put(event)
|
||||
await event.wait()
|
||||
return event.response
|
||||
|
||||
async def dice(self, trigram):
|
||||
return await self.event(Event(trigram, random.randint(1, 100)))
|
||||
|
||||
async def rproblem(self, trigram):
|
||||
team = self.teams[trigram]
|
||||
rnd = 0 if team.accepted_problems[0] is None else 1
|
||||
for poule, teams in self.poules.items():
|
||||
if trigram in teams and poule.rnd == rnd:
|
||||
break
|
||||
else:
|
||||
return await self.warn_wrong_team(None, trigram)
|
||||
|
||||
other_pbs = [self.teams[team].accepted_problems[rnd] for team in teams]
|
||||
available = [
|
||||
pb
|
||||
for pb in PROBLEMS
|
||||
if pb not in team.accepted_problems and pb not in other_pbs
|
||||
]
|
||||
return await self.event(Event(trigram, random.choice(available)))
|
||||
|
||||
async def next(self, typ, team=None):
|
||||
while True:
|
||||
event = await self.queue.get()
|
||||
if team is not None and event.team != team:
|
||||
await self.warn_wrong_team(team, event.team)
|
||||
elif not isinstance(event.value, typ):
|
||||
await self.warn_unwanted(typ, event.value)
|
||||
else:
|
||||
event.clear()
|
||||
return event
|
||||
event.clear()
|
||||
|
||||
async def run(self):
|
||||
|
||||
await self.info_start()
|
||||
|
||||
self.poules = await self.make_poules()
|
||||
|
||||
for poule in self.poules:
|
||||
await self.draw_poule(poule)
|
||||
|
||||
await self.info_finish()
|
||||
|
||||
async def get_dices(self, teams):
|
||||
dices = {t: None for t in teams}
|
||||
collisions = list(teams)
|
||||
while collisions:
|
||||
|
||||
for t in collisions:
|
||||
dices[t] = None
|
||||
collisions = []
|
||||
|
||||
while None in dices.values():
|
||||
event = await self.next(int)
|
||||
|
||||
# TODO: avoid KeyError
|
||||
if dices[event.team] is None:
|
||||
dices[event.team] = event.value
|
||||
else:
|
||||
await self.warn_twice(int)
|
||||
|
||||
if collisions:
|
||||
await self.warn_colisions(collisions)
|
||||
return dices
|
||||
|
||||
async def make_poules(self):
|
||||
poules = {}
|
||||
for rnd in (0, 1):
|
||||
await self.start_make_poule(rnd)
|
||||
|
||||
dices = await self.get_dices(self.teams)
|
||||
sorted_teams = sorted(self.teams, key=lambda t: dices[t])
|
||||
|
||||
idx = 0
|
||||
for i, qte in enumerate(self.format):
|
||||
letter = chr(ord("A") + i)
|
||||
poules[Poule(letter, rnd)] = sorted_teams[idx : idx + qte]
|
||||
idx += qte
|
||||
|
||||
await self.annonce_poules(poules)
|
||||
return poules
|
||||
|
||||
async def draw_poule(self, poule):
|
||||
# Trigrams in draw order
|
||||
trigrams = await self.draw_order(poule)
|
||||
|
||||
# Teams in draw order
|
||||
teams = [self.teams[tri] for tri in trigrams]
|
||||
current = 0
|
||||
while not all(team.accepted_problems[poule.rnd] for team in teams):
|
||||
team = teams[current]
|
||||
if team.accepted_problems[poule.rnd] is not None:
|
||||
# The team already accepted a problem
|
||||
current += 1
|
||||
continue
|
||||
|
||||
# Choose problem
|
||||
await self.start_select_pb(team)
|
||||
event = await self.next(str, team.name)
|
||||
# TODO: Add check for already selected / taken by someone else
|
||||
# This is not a bug for now, since it cannot happen yet
|
||||
await self.info_draw_pb(team, event.value, rnd)
|
||||
|
||||
# Accept it
|
||||
accept = await self.next(bool, team.name)
|
||||
if accept:
|
||||
team.accepted_problems[poule.rnd] = event.value
|
||||
await self.info_accepted(team, event.value)
|
||||
else:
|
||||
await self.info_rejected(team, event.value, rnd=poule.rnd)
|
||||
team.rejected[poule.rnd].add(event.value)
|
||||
|
||||
current += 1
|
||||
|
||||
await self.annonce_poule(poule)
|
||||
|
||||
async def draw_order(self, poule):
|
||||
await self.start_draw_order(poule)
|
||||
|
||||
teams = self.poules[poule]
|
||||
dices = await self.get_dices(teams)
|
||||
|
||||
order = sorted(self.teams, key=lambda t: dices[t], reverse=True)
|
||||
|
||||
await self.annonce_draw_order(order)
|
||||
return order
|
||||
|
||||
async def warn_unwanted(self, wanted: Type, got: Type):
|
||||
"""Called when a event of an unwanted type occurs."""
|
||||
|
||||
async def warn_wrong_team(self, expected, got):
|
||||
"""Called when a team that should not play now put an event"""
|
||||
|
||||
async def warn_colisions(self, collisions: List[str]):
|
||||
"""Called when there are collisions in a dice tirage."""
|
||||
|
||||
async def warn_twice(self, typ: Type):
|
||||
"""Called when an event appears once again and not wanted."""
|
||||
|
||||
async def start_make_poule(self, rnd):
|
||||
"""Called when it starts drawing the poules for round `rnd`"""
|
||||
|
||||
async def start_draw_order(self, poule):
|
||||
"""Called when we start to draw the order."""
|
||||
|
||||
async def start_select_pb(self, team):
|
||||
"""Called when a team needs to select a problem."""
|
||||
|
||||
async def annonce_poules(self, poules):
|
||||
"""Called when all poules are defined."""
|
||||
|
||||
async def annonce_draw_order(self, order):
|
||||
"""Called when the drawing order is defined."""
|
||||
|
||||
async def annonce_poule(self, poule):
|
||||
"""Called when the problems and order for a poule is known."""
|
||||
|
||||
async def info_start(self):
|
||||
"""Called at the start of the tirage."""
|
||||
|
||||
async def info_finish(self):
|
||||
"""Called when the tirage has ended."""
|
||||
|
||||
async def info_draw_pb(self, team, pb, rnd):
|
||||
"""Called when a team draws a problem."""
|
||||
|
||||
async def info_accepted(self, team, pb):
|
||||
"""Called when a team accepts a problem."""
|
||||
|
||||
async def info_rejected(self, team, pb, rnd):
|
||||
"""Called when a team rejects a problem,
|
||||
before it is added to the rejected set."""
|
Reference in New Issue
Block a user