import os
from typing import Any, Dict, Optional, Tuple, Union

from asgiref.sync import async_to_sync
from nio import *


class Matrix:
    """
    Utility class to manage interaction with the Matrix homeserver.

    This logs in to the @corres2mathbot account (which must be created
    beforehand). The access token is then stored on disk and reused.
    Everything is done with this bot account, which is a server administrator.

    Tasks are normally asynchronous, but for compatibility the public methods
    are made synchronous with ``async_to_sync``.

    When the ``SYNAPSE_PASSWORD`` environment variable is not set, a
    ``FakeMatrixClient`` is used instead of a real client and every public
    method returns ``None``.
    """
    # Connection settings of the bot account.
    _HOMESERVER_URL = "https://correspondances-maths.fr"
    _BOT_USER_ID = "@corres2mathbot:correspondances-maths.fr"

    # Files (relative to the working directory) where credentials are stored.
    _TOKEN_FILE = ".matrix_token"
    _DEVICE_FILE = ".matrix_device"

    # Last access token and device id that were read from disk or obtained
    # through a login.
    _token: Optional[str] = None
    _device_id: Optional[str] = None

    @classmethod
    async def _get_client(cls) -> Union[AsyncClient, "FakeMatrixClient"]:
        """
        Retrieve a client for the bot account.

        If no password is configured, a ``FakeMatrixClient`` is returned.
        If a token and a device id are stored on disk, they are reused.
        Otherwise, log in with the password and store the credentials.
        """
        password = os.getenv("SYNAPSE_PASSWORD")
        if not password:
            return FakeMatrixClient()

        client = AsyncClient(cls._HOMESERVER_URL, cls._BOT_USER_ID)
        client.user_id = cls._BOT_USER_ID

        # Both files are needed to restore a session: only reuse stored
        # credentials when the two of them exist and are non-empty, otherwise
        # fall through to a fresh login.
        if os.path.isfile(cls._TOKEN_FILE) and os.path.isfile(cls._DEVICE_FILE):
            with open(cls._DEVICE_FILE, "r") as f:
                device_id = f.read().rstrip(" \t\r\n")
            with open(cls._TOKEN_FILE, "r") as f:
                token = f.read().rstrip(" \t\r\n")
            if token and device_id:
                cls._device_id = device_id
                cls._token = token
                client.device_id = device_id
                client.access_token = token
                return client

        await client.login(password=password, device_name="Plateforme")

        # If the login did not yield credentials, do not persist empty
        # values: they would be reloaded by every later call.
        if not client.access_token or not client.device_id:
            return client

        cls._token = client.access_token
        cls._device_id = client.device_id
        with open(cls._TOKEN_FILE, "w") as f:
            f.write(cls._token)
        with open(cls._DEVICE_FILE, "w") as f:
            f.write(cls._device_id)
        return client

    @classmethod
    @async_to_sync
    async def set_display_name(cls, name: str) -> Union[ProfileSetDisplayNameResponse, ProfileSetDisplayNameError]:
        """
        Set the display name of the bot account.

        Args:
            name (str): The new display name.
        """
        client = await cls._get_client()
        return await client.set_displayname(name)

    @classmethod
    @async_to_sync
    async def set_avatar(cls, avatar_url: str) -> Union[ProfileSetAvatarResponse, ProfileSetAvatarError]:
        """
        Set the display avatar of the bot account.

        Args:
            avatar_url (str): The URL of the avatar to apply.
        """
        client = await cls._get_client()
        return await client.set_avatar(avatar_url)

    @classmethod
    @async_to_sync
    async def upload(
            cls,
            data_provider: DataProvider,
            content_type: str = "application/octet-stream",
            filename: Optional[str] = None,
            encrypt: bool = False,
            monitor: Optional[TransferMonitor] = None,
            filesize: Optional[int] = None,
    ) -> Tuple[Union[UploadResponse, UploadError], Optional[Dict[str, Any]]]:
        """
        Upload a file to the content repository.

        Returns a tuple containing:

        - Either a `UploadResponse` if the request was successful, or a
          `UploadError` if there was an error with the request
        - A dict with file decryption info if encrypt is ``True``,
          else ``None``.

        Args:
            data_provider (Callable, SynchronousFile, AsyncFile): A function
                returning the data to upload or a file object. File objects
                must be opened in binary mode (``mode="r+b"``). Callables
                returning a path string, Path, async iterable or aiofiles
                open binary file object allow the file data to be read in an
                asynchronous and lazy way (without reading the entire file
                into memory). Returning a synchronous iterable or standard
                open binary file object will still allow the data to be read
                lazily, but not asynchronously.

                The function will be called again if the upload fails
                due to a server timeout, in which case it must restart
                from the beginning.
                Callables receive two arguments: the total number of
                429 "Too many request" errors that occurred, and the total
                number of server timeout exceptions that occurred, thus
                cleanup operations can be performed for retries if necessary.

            content_type (str): The content MIME type of the file,
                e.g. "image/png".
                Defaults to "application/octet-stream", corresponding to a
                generic binary file.
                Custom values are ignored if encrypt is ``True``.

            filename (str, optional): The file's original name.

            encrypt (bool): If the file's content should be encrypted,
                necessary for files that will be sent to encrypted rooms.
                Defaults to ``False``.

            monitor (TransferMonitor, optional): If a ``TransferMonitor``
                object is passed, it will be updated by this function while
                uploading. From this object, statistics such as currently
                transferred bytes or estimated remaining time can be
                gathered while the upload is running as a task; it also
                allows for pausing and cancelling.

            filesize (int, optional): Size in bytes for the file to transfer.
                If left as ``None``, some servers might refuse the upload.
        """
        client = await cls._get_client()
        return await client.upload(data_provider, content_type, filename, encrypt, monitor, filesize)

    @classmethod
    @async_to_sync
    async def create_room(
            cls,
            visibility: RoomVisibility = RoomVisibility.private,
            alias: Optional[str] = None,
            name: Optional[str] = None,
            topic: Optional[str] = None,
            room_version: Optional[str] = None,
            federate: bool = True,
            is_direct: bool = False,
            preset: Optional[RoomPreset] = None,
            invite=(),
            initial_state=(),
            power_level_override: Optional[Dict[str, Any]] = None,
    ) -> Union[RoomCreateResponse, RoomCreateError]:
        """
        Create a new room.

        Returns either a `RoomCreateResponse` if the request was successful or
        a `RoomCreateError` if there was an error with the request.

        Args:
            visibility (RoomVisibility): whether to have the room published in
                the server's room directory or not.
                Defaults to ``RoomVisibility.private``.

            alias (str, optional): The desired canonical alias local part.
                For example, if set to "foo" and the room is created on the
                "example.com" server, the room alias will be
                "#foo:example.com".

            name (str, optional): A name to set for the room.

            topic (str, optional): A topic to set for the room.

            room_version (str, optional): The room version to set.
                If not specified, the homeserver will use its default setting.
                If a version not supported by the homeserver is specified,
                a 400 ``M_UNSUPPORTED_ROOM_VERSION`` error will be returned.

            federate (bool): Whether to allow users from other homeservers
                from joining the room. Defaults to ``True``.
                Cannot be changed later.

            is_direct (bool): If this should be considered a
                direct messaging room.
                If ``True``, the server will set the ``is_direct`` flag on
                ``m.room.member events`` sent to the users in ``invite``.
                Defaults to ``False``.

            preset (RoomPreset, optional): The selected preset will set
                various rules for the room.
                If unspecified, the server will choose a preset from the
                ``visibility``: ``RoomVisibility.public`` equates to
                ``RoomPreset.public_chat``, and
                ``RoomVisibility.private`` equates to a
                ``RoomPreset.private_chat``.

            invite (list): A list of user id to invite to the room.

            initial_state (list): A list of state event dicts to send when
                the room is created.
                For example, a room could be made encrypted immediately by
                having a ``m.room.encryption`` event dict.

            power_level_override (dict): A ``m.room.power_levels content``
                dict to override the default.
                The dict will be applied on top of the generated
                ``m.room.power_levels`` event before it is sent to the room.
        """
        client = await cls._get_client()
        return await client.room_create(
            visibility, alias, name, topic, room_version, federate, is_direct, preset, invite, initial_state,
            power_level_override)

    @classmethod
    async def resolve_room_alias(cls, room_alias: str) -> Optional[str]:
        """
        Resolve a room alias to a room ID.

        Return None if the alias does not exist.

        This coroutine is not wrapped with ``async_to_sync`` because it is
        awaited by the other methods of this class.

        Args:
            room_alias (str): The alias to resolve, e.g. "#foo:example.com".
        """
        client = await cls._get_client()
        resp: RoomResolveAliasResponse = await client.room_resolve_alias(room_alias)
        if isinstance(resp, RoomResolveAliasResponse):
            return resp.room_id
        return None

    @classmethod
    @async_to_sync
    async def invite(cls, room_id: str, user_id: str) -> Union[RoomInviteResponse, RoomInviteError]:
        """
        Invite a user to a room.

        Returns either a `RoomInviteResponse` if the request was successful or
        a `RoomInviteError` if there was an error with the request.

        Args:
            room_id (str): The room id of the room that the user will be
                invited to. A room alias (starting with "#") is resolved
                to its room id first.
            user_id (str): The user id of the user that should be invited.
        """
        client = await cls._get_client()
        if room_id.startswith("#"):
            room_id = await cls.resolve_room_alias(room_id)
        return await client.room_invite(room_id, user_id)

    @classmethod
    @async_to_sync
    async def kick(cls, room_id: str, user_id: str, reason: Optional[str] = None) \
            -> Union[RoomKickResponse, RoomKickError]:
        """
        Kick a user from a room, or withdraw their invitation.

        Kicking a user adjusts their membership to "leave" with an optional
        reason.

        Returns either a `RoomKickResponse` if the request was successful or
        a `RoomKickError` if there was an error with the request.

        Args:
            room_id (str): The room id of the room that the user will be
                kicked from. A room alias (starting with "#") is resolved
                to its room id first.
            user_id (str): The user_id of the user that should be kicked.
            reason (str, optional): A reason for which the user is kicked.
        """
        client = await cls._get_client()
        if room_id.startswith("#"):
            room_id = await cls.resolve_room_alias(room_id)
        return await client.room_kick(room_id, user_id, reason)

    @classmethod
    @async_to_sync
    async def set_room_power_level(cls, room_id: str, user_id: str, power_level: int)\
            -> Union[RoomPutStateResponse, RoomPutStateError]:
        """
        Put a given power level to a user in a certain room.

        Returns either a `RoomPutStateResponse` if the request was successful or
        a `RoomPutStateError` if there was an error with the request.
        Returns ``None`` when no homeserver is configured.

        Args:
            room_id (str): The room id of the room where the power level
                of the user should be updated. A room alias (starting with
                "#") is resolved to its room id first.
            user_id (str): The user_id of the user which power level should
                be updated.
            power_level (int): The target power level to give.
        """
        client = await cls._get_client()
        # The fake client answers None to every call, so there is no current
        # power levels event to read and update.
        if isinstance(client, FakeMatrixClient):
            return None

        if room_id.startswith("#"):
            room_id = await cls.resolve_room_alias(room_id)
        resp = await client.room_get_state_event(room_id, "m.room.power_levels")
        content = resp.content
        # Create the "users" map when the current event does not have one.
        content.setdefault("users", {})[user_id] = power_level
        return await client.room_put_state(room_id, "m.room.power_levels", content=content, state_key=resp.state_key)

    @classmethod
    @async_to_sync
    async def set_room_power_level_event(cls, room_id: str, event: str, power_level: int)\
            -> Union[RoomPutStateResponse, RoomPutStateError]:
        """
        Define the minimal power level to have to send a certain event type
        in a given room.

        Returns either a `RoomPutStateResponse` if the request was successful or
        a `RoomPutStateError` if there was an error with the request.
        Returns ``None`` when no homeserver is configured.

        Args:
            room_id (str): The room id of the room where the power level
                of the event should be updated. A room alias (starting with
                "#") is resolved to its room id first.
            event (str): The event name which minimal power level should
                be updated. Names starting with "m." are stored in the
                "events" map; any other name is stored as a top-level key
                of the power levels content.
            power_level (int): The target power level to give.
        """
        client = await cls._get_client()
        # The fake client answers None to every call, so there is no current
        # power levels event to read and update.
        if isinstance(client, FakeMatrixClient):
            return None

        if room_id.startswith("#"):
            room_id = await cls.resolve_room_alias(room_id)
        resp = await client.room_get_state_event(room_id, "m.room.power_levels")
        content = resp.content
        if event.startswith("m."):
            # Create the "events" map when the current event does not have one.
            content.setdefault("events", {})[event] = power_level
        else:
            content[event] = power_level
        return await client.room_put_state(room_id, "m.room.power_levels", content=content, state_key=resp.state_key)

    @classmethod
    @async_to_sync
    async def set_room_avatar(cls, room_id: str, avatar_uri: str)\
            -> Union[RoomPutStateResponse, RoomPutStateError]:
        """
        Define the avatar of a room.

        Returns either a `RoomPutStateResponse` if the request was successful or
        a `RoomPutStateError` if there was an error with the request.

        Args:
            room_id (str): The room id of the room where the avatar
                should be changed. A room alias (starting with "#") is
                resolved to its room id first.
            avatar_uri (str): The internal avatar URI to apply.
        """
        client = await cls._get_client()
        if room_id.startswith("#"):
            room_id = await cls.resolve_room_alias(room_id)
        return await client.room_put_state(room_id, "m.room.avatar", content={
            "url": avatar_uri
        }, state_key="")


class FakeMatrixClient:
    """
    Simulate a Matrix client to run tests, if no Matrix homeserver is connected.

    Every attribute access returns a coroutine function that accepts any
    arguments and returns ``None``.
    """

    def __getattribute__(self, item):
        async def func(*_, **_2):
            return None
        return func