tfjm-discord-bot/src/cogs/dev.py

360 lines
11 KiB
Python

import asyncio
import re
import traceback
from contextlib import redirect_stdout
from io import StringIO
from pprint import pprint
from textwrap import indent
from typing import Union
import discord
from discord import TextChannel, PermissionOverwrite, Message, ChannelType
from discord.ext.commands import (
command,
has_role,
Bot,
Cog,
ExtensionNotLoaded,
Context,
is_owner,
)
from discord.utils import get
from ptpython.repl import embed
from src.constants import *
from src.core import CustomBot
from src.errors import TfjmError
from src.utils import fg, french_join
# Short aliases accepted wherever a cog/extension name is expected.
# Values containing a dot are complete module paths; the other values are
# bare names that still need a package prefix before they can be imported.
COGS_SHORTCUTS = dict(
    bt="src.base_tirage",
    c="src.constants",
    d="tirages",
    e="errors",
    m="misc",
    t="teams",
    u="src.utils",
    v="dev",
)
# Parses a whole eval message and captures the code to run in the `query`
# group. The adjacent raw strings are concatenated into a single pattern.
RE_QUERY = re.compile(
    r"^! ?e(val)?[ \n]+"  # "!e" / "!eval" (optional space after "!"), then whitespace
    r"(`{1,3}(py(thon)?\n)?)?"  # optional opening backticks, optional py/python tag
    r"(?P<query>.*?)"  # the code itself, matched lazily
    r"\n?(`{1,3})?\n?$",  # optional closing backticks and trailing newline
    flags=re.DOTALL,  # let "." cross newlines so multi-line code is captured
)
# Developer tooling for the bot: an emergency interactive console, hot
# (re)loading of extensions, a disabled one-off server setup script, message
# utilities and an owner-only Python `eval` command.
#
# NOTE: this class is documented with comments rather than a class docstring
# on purpose: discord.py uses a cog's docstring as its description in the
# help output. For the same reason the (French) docstrings of the commands
# below are user-facing help text and must not be reworded casually.
class DevCog(Cog, name="Dev tools"):
    def __init__(self, bot: CustomBot):
        # The bot this cog is attached to.
        self.bot = bot
        # Variables created by previous `eval` queries. They are merged into
        # the namespace of every new query so state persists between calls.
        self.eval_locals = {}

    @command(name="interrupt")
    @has_role(Role.DEV)
    async def interrupt_cmd(self, ctx):
        """
        (dev) Ouvre une console là où un @dev m'a lancé. :warning:
        A utiliser en dernier recours:
        - le bot sera inactif pendant ce temps.
        - toutes les commandes seront executées à sa reprise.
        """
        await ctx.send(
            "J'ai été arrêté et une console interactive a été ouverte là où je tourne. "
            "Toutes les commandes rateront tant que cette console est ouverte.\n"
            "Soyez rapides, je déteste les opérations à coeur ouvert... :confounded:"
        )

        # Utility functions
        # `send` is never called in this method: it is made available to the
        # person typing in the console through the `locals()` passed below.
        def send(msg, channel=None):
            # A channel may be given as an object or as a raw channel ID.
            if isinstance(channel, int):
                channel = self.bot.get_channel(channel)
            channel = channel or ctx.channel
            asyncio.create_task(channel.send(msg))

        try:
            # Open a ptpython console with access to this module's globals and
            # this function's locals (`self`, `ctx`, `send`).
            await embed(
                globals(), locals(), vi_mode=True, return_asyncio_coroutine=True
            )
        except EOFError:
            # Deliberately ignored -- presumably raised when the console is
            # closed (e.g. Ctrl-D); TODO confirm against ptpython.
            pass

        await ctx.send("Tout va mieux !")

    def full_cog_name(self, name):
        """
        Expand a shortcut or bare cog name into a dotted module path.

        Shortcuts listed in COGS_SHORTCUTS are resolved first. A name that
        still contains no dot is then prefixed with ``src.cogs.``.
        """
        name = COGS_SHORTCUTS.get(name, name)
        if "." not in name:
            name = f"src.cogs.{name}"
        return name

    @command(
        name="reload", aliases=["r"], usage=f"[{'|'.join(COGS_SHORTCUTS.values())}]"
    )
    @has_role(Role.DEV)
    async def reload_cmd(self, ctx, name=None):
        """
        (dev) Recharge une catégorie de commandes.
        A utiliser quand le code change. Arguments
        possibles: `teams`, `tirages`, `dev`.
        """
        # Without an argument, reload the whole bot.
        if name is None:
            self.bot.reload()
            await ctx.send(":tada: The bot was reloaded !")
            return

        name = self.full_cog_name(name)
        try:
            self.bot.reload_extension(name)
        except ExtensionNotLoaded:
            # The extension was never loaded: load it instead of failing.
            await ctx.invoke(self.load_cmd, name)
            return
        except Exception:
            # Tell the user, then re-raise so the error is still reported
            # through the normal error handling path.
            await ctx.send(f":grimacing: **{name}** n'a pas pu être rechargée.")
            raise
        else:
            await ctx.send(f":tada: L'extension **{name}** a bien été rechargée.")

    @command(name="load", aliases=["l"])
    @has_role(Role.DEV)
    async def load_cmd(self, ctx, name):
        """
        (dev) Ajoute une catégorie de commandes.
        Permet d'ajouter dynamiquement un cog sans redémarrer le bot.
        """
        name = self.full_cog_name(name)
        try:
            self.bot.load_extension(name)
        except Exception:
            # Tell the user, then re-raise so the error is still reported.
            await ctx.send(f":grimacing: **{name}** n'a pas pu être chargée.")
            raise
        else:
            await ctx.send(f":tada: L'extension **{name}** a bien été ajoutée !")

    # noinspection PyUnreachableCode
    @command(name="setup")
    @has_role(Role.DEV)
    async def setup_roles(self, ctx: Context, *teams: discord.Role):
        """
        (dev) Commande temporaire pour setup le serveur.
        """
        # The command is currently DISABLED by this unconditional return.
        # Everything below is a stack of one-off server setup scripts, each
        # one switched off by a `return` placed in front of it. They are kept
        # for reference only and none of them runs.
        return

        # --- disabled script: give the "finalist" role to the given teams ---
        finalist = get(ctx.guild.roles, name=Role.FINALISTE)
        assert finalist
        for t in teams:
            m: discord.Member
            for m in t.members:
                await m.add_roles(finalist)

        await ctx.send(
            f"{french_join(t.mention for t in teams)} ont été ajouté en finale !"
        )

        return

        # --- disabled script: create private jury voice channels ---
        guild: discord.Guild = ctx.guild
        nothing = PermissionOverwrite(read_messages=False)
        see = PermissionOverwrite(read_messages=True)
        # orga = get(guild.roles, name=f"Orga {t}")
        for t in TOURNOIS[3:]:
            jury = get(guild.roles, name=f"Jury {t}")
            for p in "AB":
                await guild.create_voice_channel(
                    f"blabla-jury-poule-{p}",
                    overwrites={guild.default_role: nothing, jury: see},
                    category=get(guild.categories, name=t),
                )

        return

        # --- disabled script: open the "aide" channel to orga and jury ---
        # NOTE(review): `orga` is not assigned before use in this script; it
        # would need fixing before being re-enabled.
        aide: TextChannel = get(guild.text_channels, name="aide")
        for t in TOURNOIS:
            await aide.set_permissions(orga, overwrite=see)
            await aide.set_permissions(jury, overwrite=see)

        return

        # --- disabled script: recreate the per-tournament "cro" channels ---
        tournois = {
            tournoi: get(guild.categories, name=tournoi) for tournoi in TOURNOIS
        }
        for ch in guild.text_channels:
            print(repr(ch.category))

        for tournoi, cat in tournois.items():
            if tournoi == "Lyon":
                continue

            jury_channel: TextChannel = get(
                guild.text_channels, category=cat, name="cro"
            )
            await jury_channel.delete()
            # jury = get(guild.roles, name=f"Jury {tournoi}")
            orga = get(guild.roles, name=f"Orga {tournoi}")
            ov = {
                guild.default_role: nothing,
                # jury: see,
                orga: see,
            }
            await guild.create_text_channel(
                f"cro-{tournoi}", category=cat, overwrites=ov
            )
            await ctx.send(str(jury_channel))

    @command(name="send")
    @has_role(Role.DEV)
    async def send_cmd(self, ctx, *msg):
        """(dev) Envoie un message."""
        # Remove the invocation first so only the bot's message remains.
        await ctx.message.delete()
        await ctx.send(" ".join(msg))

    @command(name="del")
    @has_role(Role.CNO)
    async def del_range_cmd(self, ctx: Context, id1: Message, id2: Message):
        """
        (cno) Supprime les messages entre les deux IDs en argument.
        """
        channel: TextChannel = id1.channel

        # Accept the two bounds in either order: message IDs grow with time,
        # so the smaller ID is the older message.
        older, newer = sorted((id1, id2), key=lambda message: message.id)

        # limit=None fetches the whole range; the default limit would
        # silently stop after 100 messages.
        between = [
            message
            async for message in channel.history(
                limit=None, before=newer, after=older
            )
        ]

        # Include both bounds, keeping each message once even when the two
        # bounds are the same message.
        unique = {message.id: message for message in [*between, older, newer]}
        to_delete = list(unique.values())

        # A bulk delete accepts at most 100 messages per call.
        for start in range(0, len(to_delete), 100):
            await channel.delete_messages(to_delete[start : start + 100])

        await ctx.message.delete()

    def _extract_query(self, content):
        """
        Return the code contained in an eval message.

        Raises:
            TfjmError: if `content` does not match RE_QUERY or holds no code.
        """
        match = RE_QUERY.match(content)
        query = match.group("query") if match else ""
        if not query:
            raise TfjmError("No query found.")
        return query

    async def eval(self, msg: Message) -> discord.Embed:
        """
        Run the Python code contained in `msg` and describe the outcome.

        A query that looks like statements (it contains ``=``, ``return``,
        ``await``, ``:`` or a newline) is wrapped in an ``async def`` and
        awaited; anything else is evaluated as a single expression. Text
        printed by the code is captured.

        Returns:
            A green embed with the query, its value and captured output, or a
            red embed with the query and the traceback if the code raised.

        Raises:
            TfjmError: if no code could be extracted from the message.

        SECURITY: this executes arbitrary code inside the bot process. It must
        only ever be reachable by fully trusted users.
        """
        # These names look unused, but they are handed to the evaluated code
        # through `locals()` below. Do not rename or remove them.
        guild: discord.Guild = msg.guild
        # `msg.guild` is None in private messages.
        roles = guild.roles if guild else []
        members = guild.members if guild else []
        hugs_cog = self.bot.get_cog("Divers")
        # The "Divers" cog may not be loaded (e.g. while it is being fixed).
        hugs = getattr(hugs_cog, "hugs", None)
        channel: TextChannel = msg.channel
        send = lambda text: asyncio.create_task(channel.send(text))

        query = self._extract_query(msg.content)

        if any(word in query for word in ("=", "return", "await", ":", "\n")):
            lines = query.splitlines()
            # Make the last line the result of the query when it looks like a
            # plain expression (not an assignment, a return or a nested line).
            if (
                "return" not in lines[-1]
                and "=" not in lines[-1]
                and not lines[-1].startswith(" ")
            ):
                lines[-1] = f"return {lines[-1]}"
            query = "\n".join(lines)
            # The `finally` saves the query's local variables for later
            # queries, even when the code raises.
            full_query = (
                "async def query():\n"
                "    try:\n"
                f"{indent(query, ' ' * 8)}\n"
                "    finally:\n"
                "        self.eval_locals.update(locals())\n"
            )
        else:
            full_query = query

        # Later sources win: saved eval variables override this method's
        # locals, which override the module globals.
        globs = {**globals(), **locals(), **self.eval_locals}
        stdout = StringIO()
        try:
            with redirect_stdout(stdout):
                # Only the wrapped form contains a newline.
                if "\n" in full_query:
                    locs = {}
                    exec(full_query, globs, locs)
                    resp = await locs["query"]()
                else:
                    resp = eval(query, globs)
        except Exception as e:
            tb = StringIO()
            traceback.print_tb(e.__traceback__, file=tb)
            # Some exceptions have an empty message, and embed titles are
            # capped at 256 characters.
            title = (str(e) or type(e).__name__)[:256]
            embed = discord.Embed(title=title, color=discord.Colour.red())
            embed.add_field(
                name="Query", value=f"```py\n{full_query}\n```", inline=False
            )
            embed.add_field(
                name="Traceback", value=self.to_field_value(tb), inline=False
            )
        else:
            out = StringIO()
            pprint(resp, out)
            embed = discord.Embed(title="Result", color=discord.Colour.green())
            embed.add_field(name="Query", value=f"```py\n{full_query}```", inline=False)
            value = self.to_field_value(out)
            if resp is not None and value:
                embed.add_field(name="Value", value=value, inline=False)

        printed = self.to_field_value(stdout)
        if printed:
            embed.add_field(name="Standard output", value=printed, inline=False)

        embed.set_footer(text="You may edit your message.")
        return embed

    def to_field_value(self, string: Union[str, StringIO]) -> Union[str, None]:
        """
        Format text as a ``py`` code block for an embed field.

        Args:
            string: the text, or a StringIO whose whole content is used.

        Returns:
            The fenced text, or None when there is nothing to show. Text
            longer than 1000 characters keeps only its first and last 500
            characters, joined by a ``...`` line.
        """
        if isinstance(string, StringIO):
            string.seek(0)
            string = string.read()
        if not string:
            return None
        if len(string) > 1000:
            string = string[:500] + "\n...\n" + string[-500:]
        return f"```py\n{string}```"

    @command(name="eval", aliases=["e"])
    @is_owner()
    async def eval_cmd(self, ctx: Context):
        """(dev) Evalue l'entrée."""
        # Expose the invocation context to the evaluated code.
        self.eval_locals["ctx"] = ctx
        embed = await self.eval(ctx.message)
        resp = await ctx.send(embed=embed)

        def check(before, after):
            # Only react to edits of the message that invoked the command.
            return after.id == ctx.message.id

        # Re-run the query every time the author edits the message, until no
        # edit happens for 10 minutes.
        while True:
            try:
                _, after = await self.bot.wait_for(
                    "message_edit", check=check, timeout=600
                )
            except asyncio.TimeoutError:
                break
            embed = await self.eval(after)
            await resp.edit(embed=embed)

        # Remove the "You may edit your message"
        embed.set_footer()
        try:
            await resp.edit(embed=embed)
        except discord.NotFound:
            # The answer was deleted in the meantime: nothing left to update.
            pass

    @Cog.listener()
    async def on_message(self, msg: Message):
        """Print every private message to the console, with its IDs."""
        ch: TextChannel = msg.channel
        if ch.type == ChannelType.private:
            log_entry = (
                f"{fg(msg.author.name)}: {msg.content}\n"
                f"MSG_ID: {fg(msg.id, 0x03A678)}\n"
                f"CHA_ID: {fg(msg.channel.id, 0x03A678)}"
            )
            print(log_entry)
def setup(bot: CustomBot):
    """Extension entry point: create the dev cog and attach it to `bot`."""
    dev_cog = DevCog(bot)
    bot.add_cog(dev_cog)