"""Maubot plugin that relays YouTube live chat messages into a Matrix room."""

import asyncio
from typing import Optional, Type

from mautrix.types import RoomID
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
from maubot import Plugin
from pytchat import LiveChatAsync


class Config(BaseProxyConfig):
    """Plugin configuration holding the target room and the YouTube source id."""

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy the keys this plugin reads from the base config."""
        helper.copy("room_id")
        helper.copy("youtube_channel_id")


class YoutubeLivechatBot(Plugin):
    """Forward every message of a YouTube live chat to a Matrix room."""

    # Active pytchat listener; None until start() has created one.
    livechat: Optional[LiveChatAsync] = None
    # Matrix room that receives the relayed messages.
    room: RoomID
    # Value of the "youtube_channel_id" config key, handed to LiveChatAsync.
    youtube_channel_id: str

    async def start(self) -> None:
        """Load the configuration and begin listening to the live chat."""
        self.on_external_config_update()
        # This coroutine already runs on the event loop, so the listener is
        # awaited directly; loop.run_until_complete() would raise
        # "This event loop is already running".
        await self.search_livechats()

    async def stop(self) -> None:
        """Terminate the live chat listener, if one was started."""
        # The event loop is deliberately left running: it is obtained from
        # asyncio.get_event_loop() and is not owned by this plugin, so
        # stopping it would halt everything else scheduled on it.
        if self.livechat is not None:
            self.livechat.terminate()

    def on_external_config_update(self) -> None:
        """Reload the configuration and cache the values used at runtime."""
        self.config.load_and_update()
        self.room = RoomID(self.config["room_id"])
        self.youtube_channel_id = self.config["youtube_channel_id"]

    @classmethod
    def get_config_class(cls) -> Type[Config]:
        """Return the configuration class used by this plugin."""
        return Config

    async def search_livechats(self) -> None:
        """Create the live chat listener that feeds on_youtube_message."""
        # Do not leak a previous listener when start() is called again.
        if self.livechat is not None:
            self.livechat.terminate()
        # NOTE(review): the configured value is passed as LiveChatAsync's
        # first positional argument; confirm that pytchat accepts a channel
        # id there rather than a video id.
        self.livechat = LiveChatAsync(
            self.youtube_channel_id,
            callback=self.on_youtube_message,
        )

    async def on_youtube_message(self, chatdata) -> None:
        """Relay one batch of chat items to the configured Matrix room.

        Args:
            chatdata: Batch delivered by pytchat; its ``items`` are relayed
                one by one and ``tick_async()`` is awaited afterwards.
        """
        if not self.livechat.is_alive():
            self.livechat.terminate()
            await chatdata.tick_async()
            return

        for item in chatdata.items:
            # Author names and messages are untrusted user input, so raw HTML
            # is not allowed through. This also keeps the literal
            # "<author>" brackets from being treated as an HTML tag.
            await self.client.send_markdown(
                self.room,
                f"**<{item.author.name}>** {item.message}",
                allow_html=False,
            )
        await chatdata.tick_async()