From 89bb6ddc3af3ad6be8f72e517ace639c7b5fa7d3 Mon Sep 17 00:00:00 2001 From: Yohann D'ANELLO Date: Tue, 5 Jan 2021 23:59:35 +0100 Subject: [PATCH] Manage commands --- squinnondation/hazel.py | 203 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 198 insertions(+), 5 deletions(-) diff --git a/squinnondation/hazel.py b/squinnondation/hazel.py index 59f03d2..d56f9d9 100644 --- a/squinnondation/hazel.py +++ b/squinnondation/hazel.py @@ -3,7 +3,7 @@ from datetime import datetime from random import randint, uniform -from typing import Any, Tuple +from typing import Any, Tuple, Generator # from ipaddress import IPv6Address from threading import Thread, RLock import curses @@ -129,7 +129,7 @@ class Squirrel(Hazelnut): self.hazel_manager = HazelManager(self) self.inondator = Inondator(self) - self.add_system_message(f"Listening on {self.main_address[0]}:{self.main_address[1]}") + self.add_system_message(f"Listening on {self.main_address[0]}:{self.main_address[1]}", ignore_debug=True) self.add_system_message(f"I am {self.id}") def new_hazel(self, address: str, port: int) -> Hazelnut: @@ -168,6 +168,14 @@ class Squirrel(Hazelnut): if hazelnut.id == hazel_id: return hazelnut + def find_hazelnut_by_nickname(self, nickname: str) -> Generator[Hazelnut, Any, None]: + """ + Retrieve the hazelnuts that are known by their nicknames. + """ + for hazelnut in self.hazelnuts.values(): + if hazelnut.nickname == nickname: + yield hazelnut + def send_packet(self, client: Hazelnut, pkt: Packet) -> int: """ Send a formatted packet to a client. @@ -322,6 +330,9 @@ class Squirrel(Hazelnut): if not msg: return + if msg.startswith("/"): + return self.handle_command(msg[1:]) + msg = f"{self.nickname}: {msg}" self.add_message(msg) @@ -376,6 +387,187 @@ class Squirrel(Hazelnut): for c in reversed(demojized): curses.ungetch(c) + def handle_command(self, command: str) -> None: + """ + The user sent a command. We analyse it and process what is needed. + """ + def resolve_address(address: str) -> str: + # Resolve address + try: + # Resolve DNS as an IPv6 + return socket.getaddrinfo(address, None, socket.AF_INET6)[0][4][0] + except socket.gaierror: + # This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping + # to compute a valid IPv6. + # See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4 + try: + return "::ffff:" + socket.getaddrinfo(address, None, socket.AF_INET)[0][4][0] + except socket.gaierror: + raise ValueError(f"{address} is not resolvable") + + def resolve_port(port: str) -> int: + try: + port = int(port) + if not 1 <= port <= 65565: + raise ValueError + return port + except ValueError: + raise ValueError(f"{port} is not a valid port") + + args = command.split(" ") + command, args = args[0].lower(), args[1:] + if command == "help" or command == "usage": + self.add_system_message("**/help**\t\t\t\tDisplay this help menu", italic=False, ignore_debug=True) + self.add_system_message("**/connect address port**\t\tAdd this address in the potential neighbours", + italic=False, ignore_debug=True) + self.add_system_message("**/hello address port**\t\tSend short hello to the given neighbour", + italic=False, ignore_debug=True) + self.add_system_message("**/unban address port**\t\tReset the error counter of a given neighbour", + italic=False, ignore_debug=True) + self.add_system_message("**/info id|nickname|addr port**\tDisplay information about a neighbour", + italic=False, ignore_debug=True) + self.add_system_message("**/active**\t\t\tDisplay the list of all active neighbours.", + italic=False, ignore_debug=True) + self.add_system_message("**/potential**\t\t\tDisplay the list of all potential neighbours.", + italic=False, ignore_debug=True) + self.add_system_message("**/debug**\t\t\t\tToggle debug mode", italic=False, ignore_debug=True) + self.add_system_message("**/emojis**\t\t\tToggle emojis support", italic=False, ignore_debug=True) + self.add_system_message("**/markdown**\t\t\tToggle markdown support", italic=False, ignore_debug=True) + elif command == "connect": + if len(args) != 2: + self.add_system_message("Usage: /connect address port", italic=False, ignore_debug=True) + return + try: + address, port = resolve_address(args[0]), resolve_port(args[1]) + except ValueError as e: + self.add_system_message(str(e), ignore_debug=True) + return + + if (address, port) in self.hazelnuts: + self.add_system_message("There is already a known client with this address.", ignore_debug=True) + return + hazelnut = self.new_hazel(address, port) + self.hazelnuts[(address, port)] = hazelnut + self.add_system_message(f"Potential client successfully added! You can send a hello by running " + f"\"/hello {address} {port}\".", ignore_debug=True) + elif command == "hello": + if len(args) != 2: + self.add_system_message("Usage: /hello address port", italic=False, ignore_debug=True) + return + try: + address, port = resolve_address(args[0]), resolve_port(args[1]) + except ValueError as e: + self.add_system_message(str(e), ignore_debug=True) + return + + if (address, port) not in self.hazelnuts: + self.add_system_message("This client is unknown. Please register it by running " + f"\"/connect {address} {port}\"", ignore_debug=True) + return + + hazelnut = self.find_hazelnut(address, port) + self.send_packet(hazelnut, Packet.construct(HelloTLV.construct(8, self))) + + self.add_system_message("Hello successfully sent!", ignore_debug=True) + elif command == "unban": + if len(args) != 2: + self.add_system_message("Usage: /unban address port", italic=False, ignore_debug=True) + return + try: + address, port = resolve_address(args[0]), resolve_port(args[1]) + except ValueError as e: + self.add_system_message(str(e), ignore_debug=True) + return + + if (address, port) not in self.hazelnuts: + self.add_system_message("This client is unknown. Please register it by running " + f"\"/connect {address} {port}\"", ignore_debug=True) + return + + hazelnut = self.find_hazelnut(address, port) + hazelnut.errors = 0 + + self.add_system_message("The client is unbanned.", ignore_debug=True) + elif command == "info": + if len(args) > 2: + self.add_system_message("Usage: /info me|id|nickname|addr port", italic=False, ignore_debug=True) + return + + if not args: + hazelnuts = [self] + elif len(args) == 2: + try: + address, port = resolve_address(args[0]), resolve_port(args[1]) + except ValueError as e: + self.add_system_message(str(e), ignore_debug=True) + return + + if (address, port) not in self.hazelnuts: + self.add_system_message("This client is unknown. Please register it by running " + f"\"/connect {address} {port}\"", ignore_debug=True) + return + + hazelnuts = [self.find_hazelnut(address, port)] + else: + hazelnuts = list(self.find_hazelnut_by_nickname(args[0])) + if args[0].isnumeric(): + identifier = int(args[0]) + hazelnuts.append(self.find_hazelnut_by_id(identifier)) + if not hazelnuts: + self.add_system_message("Unknown client.") + return + + for hazel in hazelnuts: + self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}", + italic=False, ignore_debug=True) + self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}", + italic=False, ignore_debug=True) + self.add_system_message(f"**Addresses:** " + + ", ".join(f"{address}:{port}" for address, port in hazel.addresses), + italic=False, ignore_debug=True) + elif command == "active": + if not self.active_hazelnuts: + self.add_system_message("No active neighbour.", italic=False, ignore_debug=True) + return + + for hazel in self.active_hazelnuts: + self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}", + italic=False, ignore_debug=True) + self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}", + italic=False, ignore_debug=True) + self.add_system_message(f"**Addresses:** " + + ", ".join(f"{address}:{port}" for address, port in hazel.addresses), + italic=False, ignore_debug=True) + elif command == "potential": + if not self.potential_hazelnuts: + self.add_system_message("No potential neighbour.", italic=False, ignore_debug=True) + return + + for hazel in self.potential_hazelnuts: + self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}", + italic=False, ignore_debug=True) + self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}", + italic=False, ignore_debug=True) + self.add_system_message(f"**Addresses:** " + + ", ".join(f"{address}:{port}" for address, port in hazel.addresses), + italic=False, ignore_debug=True) + elif command == "debug": + self.squinnondation.debug ^= True + self.add_system_message( + "Debug mode " + ("enabled" if self.squinnondation.debug else "disabled") + ".", ignore_debug=True) + elif command == "emojis": + self.squinnondation.no_emoji ^= True + self.add_system_message( + "Emojis support " + ("disabled" if self.squinnondation.no_emoji else "enabled") + ".", + ignore_debug=True) + elif command == "markdown": + self.squinnondation.debug ^= True + self.add_system_message( + "Markdown support " + ("disabled" if self.squinnondation.no_markdown else "enabled") + ".", + ignore_debug=True) + else: + self.add_system_message("Unknown command. Please do /help to see available commands.", ignore_debug=True) + def add_message(self, msg: str) -> None: """ Store a new message into the history. @@ -464,12 +656,13 @@ class Squirrel(Hazelnut): self.send_packet(hazelnut, pkt) self.recent_messages[key][2].pop(key2) - def add_system_message(self, msg: str) -> None: + def add_system_message(self, msg: str, italic: bool = True, ignore_debug: bool = False) -> None: """ Add a new system log message. """ - if self.squinnondation.debug: - return self.add_message(f"system: *{msg}*" if not self.squinnondation.no_markdown else f"system: {msg}") + if self.squinnondation.debug or ignore_debug: + return self.add_message( + f"system: *{msg}*" if not self.squinnondation.no_markdown and italic else f"system: {msg}") def print_markdown(self, pad: Any, y: int, x: int, msg: str, bold: bool = False, italic: bool = False, underline: bool = False, strike: bool = False) -> int: