plateforme-corres2math/corres2math/matrix.py

351 lines
14 KiB
Python

import os
from typing import Tuple
from asgiref.sync import async_to_sync
from nio import *
class Matrix:
"""
Utility class to manage interaction with the Matrix homeserver.
This log in the @corres2mathbot account (must be created before).
The access token is then stored.
All is done with this bot account, that is a server administrator.
Tasks are normally asynchronous, but for compatibility we make
them synchronous.
"""
_token: str = None
_device_id: str = None
@classmethod
async def _get_client(cls) -> Union[AsyncClient, "FakeMatrixClient"]:
"""
Retrieve the bot account.
If not logged, log in and store access token.
"""
if not os.getenv("SYNAPSE_PASSWORD"):
return FakeMatrixClient()
client = AsyncClient("https://correspondances-maths.fr", "@corres2mathbot:correspondances-maths.fr")
client.user_id = "@corres2mathbot:correspondances-maths.fr"
if os.path.isfile(".matrix_token"):
with open(".matrix_device", "r") as f:
cls._device_id = f.read().rstrip(" \t\r\n")
client.device_id = cls._device_id
with open(".matrix_token", "r") as f:
cls._token = f.read().rstrip(" \t\r\n")
client.access_token = cls._token
return client
await client.login(password=os.getenv("SYNAPSE_PASSWORD"), device_name="Plateforme")
cls._token = client.access_token
cls._device_id = client.device_id
with open(".matrix_token", "w") as f:
f.write(cls._token)
with open(".matrix_device", "w") as f:
f.write(cls._device_id)
return client
@classmethod
@async_to_sync
async def set_display_name(cls, name: str) -> Union[ProfileSetDisplayNameResponse, ProfileSetDisplayNameError]:
"""
Set the display name of the bot account.
"""
client = await cls._get_client()
return await client.set_displayname(name)
@classmethod
@async_to_sync
async def set_avatar(cls, avatar_url: str) -> Union[ProfileSetAvatarResponse, ProfileSetAvatarError]:
"""
Set the display avatar of the bot account.
"""
client = await cls._get_client()
return await client.set_avatar(avatar_url)
@classmethod
@async_to_sync
async def upload(
cls,
data_provider: DataProvider,
content_type: str = "application/octet-stream",
filename: Optional[str] = None,
encrypt: bool = False,
monitor: Optional[TransferMonitor] = None,
filesize: Optional[int] = None,
) -> Tuple[Union[UploadResponse, UploadError], Optional[Dict[str, Any]]]:
"""
Upload a file to the content repository.
Returns a tuple containing:
- Either a `UploadResponse` if the request was successful, or a
`UploadError` if there was an error with the request
- A dict with file decryption info if encrypt is ``True``,
else ``None``.
Args:
data_provider (Callable, SynchronousFile, AsyncFile): A function
returning the data to upload or a file object. File objects
must be opened in binary mode (``mode="r+b"``). Callables
returning a path string, Path, async iterable or aiofiles
open binary file object allow the file data to be read in an
asynchronous and lazy way (without reading the entire file
into memory). Returning a synchronous iterable or standard
open binary file object will still allow the data to be read
lazily, but not asynchronously.
The function will be called again if the upload fails
due to a server timeout, in which case it must restart
from the beginning.
Callables receive two arguments: the total number of
429 "Too many request" errors that occured, and the total
number of server timeout exceptions that occured, thus
cleanup operations can be performed for retries if necessary.
content_type (str): The content MIME type of the file,
e.g. "image/png".
Defaults to "application/octet-stream", corresponding to a
generic binary file.
Custom values are ignored if encrypt is ``True``.
filename (str, optional): The file's original name.
encrypt (bool): If the file's content should be encrypted,
necessary for files that will be sent to encrypted rooms.
Defaults to ``False``.
monitor (TransferMonitor, optional): If a ``TransferMonitor``
object is passed, it will be updated by this function while
uploading.
From this object, statistics such as currently
transferred bytes or estimated remaining time can be gathered
while the upload is running as a task; it also allows
for pausing and cancelling.
filesize (int, optional): Size in bytes for the file to transfer.
If left as ``None``, some servers might refuse the upload.
"""
client = await cls._get_client()
return await client.upload(data_provider, content_type, filename, encrypt, monitor, filesize)
@classmethod
@async_to_sync
async def create_room(
cls,
visibility: RoomVisibility = RoomVisibility.private,
alias: Optional[str] = None,
name: Optional[str] = None,
topic: Optional[str] = None,
room_version: Optional[str] = None,
federate: bool = True,
is_direct: bool = False,
preset: Optional[RoomPreset] = None,
invite=(),
initial_state=(),
power_level_override: Optional[Dict[str, Any]] = None,
) -> Union[RoomCreateResponse, RoomCreateError]:
"""
Create a new room.
Returns either a `RoomCreateResponse` if the request was successful or
a `RoomCreateError` if there was an error with the request.
Args:
visibility (RoomVisibility): whether to have the room published in
the server's room directory or not.
Defaults to ``RoomVisibility.private``.
alias (str, optional): The desired canonical alias local part.
For example, if set to "foo" and the room is created on the
"example.com" server, the room alias will be
"#foo:example.com".
name (str, optional): A name to set for the room.
topic (str, optional): A topic to set for the room.
room_version (str, optional): The room version to set.
If not specified, the homeserver will use its default setting.
If a version not supported by the homeserver is specified,
a 400 ``M_UNSUPPORTED_ROOM_VERSION`` error will be returned.
federate (bool): Whether to allow users from other homeservers from
joining the room. Defaults to ``True``.
Cannot be changed later.
is_direct (bool): If this should be considered a
direct messaging room.
If ``True``, the server will set the ``is_direct`` flag on
``m.room.member events`` sent to the users in ``invite``.
Defaults to ``False``.
preset (RoomPreset, optional): The selected preset will set various
rules for the room.
If unspecified, the server will choose a preset from the
``visibility``: ``RoomVisibility.public`` equates to
``RoomPreset.public_chat``, and
``RoomVisibility.private`` equates to a
``RoomPreset.private_chat``.
invite (list): A list of user id to invite to the room.
initial_state (list): A list of state event dicts to send when
the room is created.
For example, a room could be made encrypted immediatly by
having a ``m.room.encryption`` event dict.
power_level_override (dict): A ``m.room.power_levels content`` dict
to override the default.
The dict will be applied on top of the generated
``m.room.power_levels`` event before it is sent to the room.
"""
client = await cls._get_client()
return await client.room_create(
visibility, alias, name, topic, room_version, federate, is_direct, preset, invite, initial_state,
power_level_override)
@classmethod
async def resolve_room_alias(cls, room_alias: str) -> Optional[str]:
"""
Resolve a room alias to a room ID.
Return None if the alias does not exist.
"""
client = await cls._get_client()
resp: RoomResolveAliasResponse = await client.room_resolve_alias(room_alias)
if isinstance(resp, RoomResolveAliasResponse):
return resp.room_id
return None
@classmethod
@async_to_sync
async def invite(cls, room_id: str, user_id: str) -> Union[RoomInviteResponse, RoomInviteError]:
"""
Invite a user to a room.
Returns either a `RoomInviteResponse` if the request was successful or
a `RoomInviteError` if there was an error with the request.
Args:
room_id (str): The room id of the room that the user will be
invited to.
user_id (str): The user id of the user that should be invited.
"""
client = await cls._get_client()
if room_id.startswith("#"):
room_id = await cls.resolve_room_alias(room_id)
return await client.room_invite(room_id, user_id)
@classmethod
@async_to_sync
async def kick(cls, room_id: str, user_id: str, reason: str = None) -> Union[RoomKickResponse, RoomInviteError]:
"""
Kick a user from a room, or withdraw their invitation.
Kicking a user adjusts their membership to "leave" with an optional
reason.
Returns either a `RoomKickResponse` if the request was successful or
a `RoomKickError` if there was an error with the request.
Args:
room_id (str): The room id of the room that the user will be
kicked from.
user_id (str): The user_id of the user that should be kicked.
reason (str, optional): A reason for which the user is kicked.
"""
client = await cls._get_client()
if room_id.startswith("#"):
room_id = await cls.resolve_room_alias(room_id)
return await client.room_kick(room_id, user_id, reason)
@classmethod
@async_to_sync
async def set_room_power_level(cls, room_id: str, user_id: str, power_level: int)\
-> Union[RoomPutStateResponse, RoomPutStateError]:
"""
Put a given power level to a user in a certain room.
Returns either a `RoomPutStateResponse` if the request was successful or
a `RoomPutStateError` if there was an error with the request.
Args:
room_id (str): The room id of the room where the power level
of the user should be updated.
user_id (str): The user_id of the user which power level should
be updated.
power_level (int): The target power level to give.
"""
client = await cls._get_client()
if room_id.startswith("#"):
room_id = await cls.resolve_room_alias(room_id)
resp = await client.room_get_state_event(room_id, "m.room.power_levels")
content = resp.content
content["users"][user_id] = power_level
return await client.room_put_state(room_id, "m.room.power_levels", content=content, state_key=resp.state_key)
@classmethod
@async_to_sync
async def set_room_power_level_event(cls, room_id: str, event: str, power_level: int)\
-> Union[RoomPutStateResponse, RoomPutStateError]:
"""
Define the minimal power level to have to send a certain event type
in a given room.
Returns either a `RoomPutStateResponse` if the request was successful or
a `RoomPutStateError` if there was an error with the request.
Args:
room_id (str): The room id of the room where the power level
of the event should be updated.
event (str): The event name which minimal power level should
be updated.
power_level (int): The target power level to give.
"""
client = await cls._get_client()
if room_id.startswith("#"):
room_id = await cls.resolve_room_alias(room_id)
resp = await client.room_get_state_event(room_id, "m.room.power_levels")
content = resp.content
if event.startswith("m."):
content["events"][event] = power_level
else:
content[event] = power_level
return await client.room_put_state(room_id, "m.room.power_levels", content=content, state_key=resp.state_key)
@classmethod
@async_to_sync
async def set_room_avatar(cls, room_id: str, avatar_uri: str)\
-> Union[RoomPutStateResponse, RoomPutStateError]:
"""
Define the avatar of a room.
Returns either a `RoomPutStateResponse` if the request was successful or
a `RoomPutStateError` if there was an error with the request.
Args:
room_id (str): The room id of the room where the avatar
should be changed.
avatar_uri (str): The internal avatar URI to apply.
"""
client = await cls._get_client()
if room_id.startswith("#"):
room_id = await cls.resolve_room_alias(room_id)
return await client.room_put_state(room_id, "m.room.avatar", content={
"url": avatar_uri
}, state_key="")
class FakeMatrixClient:
"""
Simulate a Matrix client to run tests, if no Matrix homeserver is connected.
"""
def __getattribute__(self, item):
async def func(*_, **_2):
return None
return func