Merge branch 'commands' into 'master'
Commands See merge request ynerant/squinnondation!7
This commit is contained in:
		@@ -3,7 +3,7 @@
 | 
			
		||||
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from random import randint, uniform
 | 
			
		||||
from typing import Any, Tuple
 | 
			
		||||
from typing import Any, Tuple, Generator
 | 
			
		||||
# from ipaddress import IPv6Address
 | 
			
		||||
from threading import Thread, RLock
 | 
			
		||||
import curses
 | 
			
		||||
@@ -129,7 +129,7 @@ class Squirrel(Hazelnut):
 | 
			
		||||
        self.hazel_manager = HazelManager(self)
 | 
			
		||||
        self.inondator = Inondator(self)
 | 
			
		||||
 | 
			
		||||
        self.add_system_message(f"Listening on {self.main_address[0]}:{self.main_address[1]}")
 | 
			
		||||
        self.add_system_message(f"Listening on {self.main_address[0]}:{self.main_address[1]}", ignore_debug=True)
 | 
			
		||||
        self.add_system_message(f"I am {self.id}")
 | 
			
		||||
 | 
			
		||||
    def new_hazel(self, address: str, port: int) -> Hazelnut:
 | 
			
		||||
@@ -168,6 +168,14 @@ class Squirrel(Hazelnut):
 | 
			
		||||
                if hazelnut.id == hazel_id:
 | 
			
		||||
                    return hazelnut
 | 
			
		||||
 | 
			
		||||
    def find_hazelnut_by_nickname(self, nickname: str) -> Generator[Hazelnut, Any, None]:
 | 
			
		||||
        """
 | 
			
		||||
        Retrieve the hazelnuts that are known by their nicknames.
 | 
			
		||||
        """
 | 
			
		||||
        for hazelnut in self.hazelnuts.values():
 | 
			
		||||
            if hazelnut.nickname == nickname:
 | 
			
		||||
                yield hazelnut
 | 
			
		||||
 | 
			
		||||
    def send_packet(self, client: Hazelnut, pkt: Packet) -> int:
 | 
			
		||||
        """
 | 
			
		||||
        Send a formatted packet to a client.
 | 
			
		||||
@@ -262,12 +270,16 @@ class Squirrel(Hazelnut):
 | 
			
		||||
        """
 | 
			
		||||
        Process the key press from the user.
 | 
			
		||||
        """
 | 
			
		||||
        if key == "\x7f":  # backspace
 | 
			
		||||
        if key == "\x7f" or key == curses.KEY_BACKSPACE:  # backspace
 | 
			
		||||
            # delete character at the good position
 | 
			
		||||
            if self.input_index:
 | 
			
		||||
                self.input_index -= 1
 | 
			
		||||
                self.input_buffer = self.input_buffer[:self.input_index] + self.input_buffer[self.input_index + 1:]
 | 
			
		||||
            return
 | 
			
		||||
        elif key == curses.KEY_DC:  # key
 | 
			
		||||
            if self.input_index < len(self.input_buffer):
 | 
			
		||||
                self.input_buffer = self.input_buffer[:self.input_index] + self.input_buffer[self.input_index + 1:]
 | 
			
		||||
            return
 | 
			
		||||
        elif key == curses.KEY_LEFT:
 | 
			
		||||
            # Navigate in the message to the left
 | 
			
		||||
            self.input_index = max(0, self.input_index - 1)
 | 
			
		||||
@@ -322,6 +334,9 @@ class Squirrel(Hazelnut):
 | 
			
		||||
        if not msg:
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        if msg.startswith("/"):
 | 
			
		||||
            return self.handle_command(msg[1:])
 | 
			
		||||
 | 
			
		||||
        msg = f"{self.nickname}: {msg}"
 | 
			
		||||
        self.add_message(msg)
 | 
			
		||||
 | 
			
		||||
@@ -376,6 +391,187 @@ class Squirrel(Hazelnut):
 | 
			
		||||
                for c in reversed(demojized):
 | 
			
		||||
                    curses.ungetch(c)
 | 
			
		||||
 | 
			
		||||
    def handle_command(self, command: str) -> None:  # noqa: C901
 | 
			
		||||
        """
 | 
			
		||||
        The user sent a command. We analyse it and process what is needed.
 | 
			
		||||
        """
 | 
			
		||||
        def resolve_address(address: str) -> str:
 | 
			
		||||
            # Resolve address
 | 
			
		||||
            try:
 | 
			
		||||
                # Resolve DNS as an IPv6
 | 
			
		||||
                return socket.getaddrinfo(address, None, socket.AF_INET6)[0][4][0]
 | 
			
		||||
            except socket.gaierror:
 | 
			
		||||
                # This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping
 | 
			
		||||
                # to compute a valid IPv6.
 | 
			
		||||
                # See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4
 | 
			
		||||
                try:
 | 
			
		||||
                    return "::ffff:" + socket.getaddrinfo(address, None, socket.AF_INET)[0][4][0]
 | 
			
		||||
                except socket.gaierror:
 | 
			
		||||
                    raise ValueError(f"{address} is not resolvable")
 | 
			
		||||
 | 
			
		||||
        def resolve_port(port: str) -> int:
 | 
			
		||||
            try:
 | 
			
		||||
                port = int(port)
 | 
			
		||||
                if not 1 <= port <= 65565:
 | 
			
		||||
                    raise ValueError
 | 
			
		||||
                return port
 | 
			
		||||
            except ValueError:
 | 
			
		||||
                raise ValueError(f"{port} is not a valid port")
 | 
			
		||||
 | 
			
		||||
        args = command.split(" ")
 | 
			
		||||
        command, args = args[0].lower(), args[1:]
 | 
			
		||||
        if command == "help" or command == "usage":
 | 
			
		||||
            self.add_system_message("**/help**\t\t\t\tDisplay this help menu", italic=False, ignore_debug=True)
 | 
			
		||||
            self.add_system_message("**/connect address port**\t\tAdd this address in the potential neighbours",
 | 
			
		||||
                                    italic=False, ignore_debug=True)
 | 
			
		||||
            self.add_system_message("**/hello address port**\t\tSend short hello to the given neighbour",
 | 
			
		||||
                                    italic=False, ignore_debug=True)
 | 
			
		||||
            self.add_system_message("**/unban address port**\t\tReset the error counter of a given neighbour",
 | 
			
		||||
                                    italic=False, ignore_debug=True)
 | 
			
		||||
            self.add_system_message("**/info id|nickname|addr port**\tDisplay information about a neighbour",
 | 
			
		||||
                                    italic=False, ignore_debug=True)
 | 
			
		||||
            self.add_system_message("**/active**\t\t\tDisplay the list of all active neighbours.",
 | 
			
		||||
                                    italic=False, ignore_debug=True)
 | 
			
		||||
            self.add_system_message("**/potential**\t\t\tDisplay the list of all potential neighbours.",
 | 
			
		||||
                                    italic=False, ignore_debug=True)
 | 
			
		||||
            self.add_system_message("**/debug**\t\t\t\tToggle debug mode", italic=False, ignore_debug=True)
 | 
			
		||||
            self.add_system_message("**/emojis**\t\t\tToggle emojis support", italic=False, ignore_debug=True)
 | 
			
		||||
            self.add_system_message("**/markdown**\t\t\tToggle markdown support", italic=False, ignore_debug=True)
 | 
			
		||||
        elif command == "connect":
 | 
			
		||||
            if len(args) != 2:
 | 
			
		||||
                self.add_system_message("Usage: /connect address port", italic=False, ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
            try:
 | 
			
		||||
                address, port = resolve_address(args[0]), resolve_port(args[1])
 | 
			
		||||
            except ValueError as e:
 | 
			
		||||
                self.add_system_message(str(e), ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            if (address, port) in self.hazelnuts:
 | 
			
		||||
                self.add_system_message("There is already a known client with this address.", ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
            hazelnut = self.new_hazel(address, port)
 | 
			
		||||
            self.hazelnuts[(address, port)] = hazelnut
 | 
			
		||||
            self.add_system_message(f"Potential client successfully added! You can send a hello by running "
 | 
			
		||||
                                    f"\"/hello {address} {port}\".", ignore_debug=True)
 | 
			
		||||
        elif command == "hello":
 | 
			
		||||
            if len(args) != 2:
 | 
			
		||||
                self.add_system_message("Usage: /hello address port", italic=False, ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
            try:
 | 
			
		||||
                address, port = resolve_address(args[0]), resolve_port(args[1])
 | 
			
		||||
            except ValueError as e:
 | 
			
		||||
                self.add_system_message(str(e), ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            if (address, port) not in self.hazelnuts:
 | 
			
		||||
                self.add_system_message("This client is unknown. Please register it by running "
 | 
			
		||||
                                        f"\"/connect {address} {port}\"", ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            hazelnut = self.find_hazelnut(address, port)
 | 
			
		||||
            self.send_packet(hazelnut, Packet.construct(HelloTLV.construct(8, self)))
 | 
			
		||||
 | 
			
		||||
            self.add_system_message("Hello successfully sent!", ignore_debug=True)
 | 
			
		||||
        elif command == "unban":
 | 
			
		||||
            if len(args) != 2:
 | 
			
		||||
                self.add_system_message("Usage: /unban address port", italic=False, ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
            try:
 | 
			
		||||
                address, port = resolve_address(args[0]), resolve_port(args[1])
 | 
			
		||||
            except ValueError as e:
 | 
			
		||||
                self.add_system_message(str(e), ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            if (address, port) not in self.hazelnuts:
 | 
			
		||||
                self.add_system_message("This client is unknown. Please register it by running "
 | 
			
		||||
                                        f"\"/connect {address} {port}\"", ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            hazelnut = self.find_hazelnut(address, port)
 | 
			
		||||
            hazelnut.errors = 0
 | 
			
		||||
 | 
			
		||||
            self.add_system_message("The client is unbanned.", ignore_debug=True)
 | 
			
		||||
        elif command == "info":
 | 
			
		||||
            if len(args) > 2:
 | 
			
		||||
                self.add_system_message("Usage: /info me|id|nickname|addr port", italic=False, ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            if not args:
 | 
			
		||||
                hazelnuts = [self]
 | 
			
		||||
            elif len(args) == 2:
 | 
			
		||||
                try:
 | 
			
		||||
                    address, port = resolve_address(args[0]), resolve_port(args[1])
 | 
			
		||||
                except ValueError as e:
 | 
			
		||||
                    self.add_system_message(str(e), ignore_debug=True)
 | 
			
		||||
                    return
 | 
			
		||||
 | 
			
		||||
                if (address, port) not in self.hazelnuts:
 | 
			
		||||
                    self.add_system_message("This client is unknown. Please register it by running "
 | 
			
		||||
                                            f"\"/connect {address} {port}\"", ignore_debug=True)
 | 
			
		||||
                    return
 | 
			
		||||
 | 
			
		||||
                hazelnuts = [self.find_hazelnut(address, port)]
 | 
			
		||||
            else:
 | 
			
		||||
                hazelnuts = list(self.find_hazelnut_by_nickname(args[0]))
 | 
			
		||||
                if args[0].isnumeric():
 | 
			
		||||
                    identifier = int(args[0])
 | 
			
		||||
                    hazelnuts.append(self.find_hazelnut_by_id(identifier))
 | 
			
		||||
                if not hazelnuts:
 | 
			
		||||
                    self.add_system_message("Unknown client.")
 | 
			
		||||
                    return
 | 
			
		||||
 | 
			
		||||
            for hazel in hazelnuts:
 | 
			
		||||
                self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}",
 | 
			
		||||
                                        italic=False, ignore_debug=True)
 | 
			
		||||
                self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}",
 | 
			
		||||
                                        italic=False, ignore_debug=True)
 | 
			
		||||
                self.add_system_message("**Addresses:** "
 | 
			
		||||
                                        + ", ".join(f"{address}:{port}" for address, port in hazel.addresses),
 | 
			
		||||
                                        italic=False, ignore_debug=True)
 | 
			
		||||
        elif command == "active":
 | 
			
		||||
            if not self.active_hazelnuts:
 | 
			
		||||
                self.add_system_message("No active neighbour.", italic=False, ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            for hazel in self.active_hazelnuts:
 | 
			
		||||
                self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}",
 | 
			
		||||
                                        italic=False, ignore_debug=True)
 | 
			
		||||
                self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}",
 | 
			
		||||
                                        italic=False, ignore_debug=True)
 | 
			
		||||
                self.add_system_message("**Addresses:** "
 | 
			
		||||
                                        + ", ".join(f"{address}:{port}" for address, port in hazel.addresses),
 | 
			
		||||
                                        italic=False, ignore_debug=True)
 | 
			
		||||
        elif command == "potential":
 | 
			
		||||
            if not self.potential_hazelnuts:
 | 
			
		||||
                self.add_system_message("No potential neighbour.", italic=False, ignore_debug=True)
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            for hazel in self.potential_hazelnuts:
 | 
			
		||||
                self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}",
 | 
			
		||||
                                        italic=False, ignore_debug=True)
 | 
			
		||||
                self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}",
 | 
			
		||||
                                        italic=False, ignore_debug=True)
 | 
			
		||||
                self.add_system_message("**Addresses:** "
 | 
			
		||||
                                        + ", ".join(f"{address}:{port}" for address, port in hazel.addresses),
 | 
			
		||||
                                        italic=False, ignore_debug=True)
 | 
			
		||||
        elif command == "debug":
 | 
			
		||||
            self.squinnondation.debug ^= True
 | 
			
		||||
            self.add_system_message(
 | 
			
		||||
                "Debug mode " + ("enabled" if self.squinnondation.debug else "disabled") + ".", ignore_debug=True)
 | 
			
		||||
        elif command == "emojis":
 | 
			
		||||
            self.squinnondation.no_emoji ^= True
 | 
			
		||||
            self.add_system_message(
 | 
			
		||||
                "Emojis support " + ("disabled" if self.squinnondation.no_emoji else "enabled") + ".",
 | 
			
		||||
                ignore_debug=True)
 | 
			
		||||
        elif command == "markdown":
 | 
			
		||||
            self.squinnondation.no_markdown ^= True
 | 
			
		||||
            self.add_system_message(
 | 
			
		||||
                "Markdown support " + ("disabled" if self.squinnondation.no_markdown else "enabled") + ".",
 | 
			
		||||
                ignore_debug=True)
 | 
			
		||||
        else:
 | 
			
		||||
            self.add_system_message("Unknown command. Please do /help to see available commands.", ignore_debug=True)
 | 
			
		||||
 | 
			
		||||
    def add_message(self, msg: str) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        Store a new message into the history.
 | 
			
		||||
@@ -464,12 +660,13 @@ class Squirrel(Hazelnut):
 | 
			
		||||
                    self.send_packet(hazelnut, pkt)
 | 
			
		||||
                    self.recent_messages[key][2].pop(key2)
 | 
			
		||||
 | 
			
		||||
    def add_system_message(self, msg: str) -> None:
 | 
			
		||||
    def add_system_message(self, msg: str, italic: bool = True, ignore_debug: bool = False) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        Add a new system log message.
 | 
			
		||||
        """
 | 
			
		||||
        if self.squinnondation.debug:
 | 
			
		||||
            return self.add_message(f"system: *{msg}*" if not self.squinnondation.no_markdown else f"system: {msg}")
 | 
			
		||||
        if self.squinnondation.debug or ignore_debug:
 | 
			
		||||
            return self.add_message(
 | 
			
		||||
                f"system: *{msg}*" if not self.squinnondation.no_markdown and italic else f"system: {msg}")
 | 
			
		||||
 | 
			
		||||
    def print_markdown(self, pad: Any, y: int, x: int, msg: str,
 | 
			
		||||
                       bold: bool = False, italic: bool = False, underline: bool = False, strike: bool = False) -> int:
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user