Manage commands
This commit is contained in:
		| @@ -3,7 +3,7 @@ | |||||||
|  |  | ||||||
| from datetime import datetime | from datetime import datetime | ||||||
| from random import randint, uniform | from random import randint, uniform | ||||||
| from typing import Any, Tuple | from typing import Any, Tuple, Generator | ||||||
| # from ipaddress import IPv6Address | # from ipaddress import IPv6Address | ||||||
| from threading import Thread, RLock | from threading import Thread, RLock | ||||||
| import curses | import curses | ||||||
| @@ -129,7 +129,7 @@ class Squirrel(Hazelnut): | |||||||
|         self.hazel_manager = HazelManager(self) |         self.hazel_manager = HazelManager(self) | ||||||
|         self.inondator = Inondator(self) |         self.inondator = Inondator(self) | ||||||
|  |  | ||||||
|         self.add_system_message(f"Listening on {self.main_address[0]}:{self.main_address[1]}") |         self.add_system_message(f"Listening on {self.main_address[0]}:{self.main_address[1]}", ignore_debug=True) | ||||||
|         self.add_system_message(f"I am {self.id}") |         self.add_system_message(f"I am {self.id}") | ||||||
|  |  | ||||||
|     def new_hazel(self, address: str, port: int) -> Hazelnut: |     def new_hazel(self, address: str, port: int) -> Hazelnut: | ||||||
| @@ -168,6 +168,14 @@ class Squirrel(Hazelnut): | |||||||
|                 if hazelnut.id == hazel_id: |                 if hazelnut.id == hazel_id: | ||||||
|                     return hazelnut |                     return hazelnut | ||||||
|  |  | ||||||
|  |     def find_hazelnut_by_nickname(self, nickname: str) -> Generator[Hazelnut, Any, None]: | ||||||
|  |         """ | ||||||
|  |         Retrieve the hazelnuts that are known by their nicknames. | ||||||
|  |         """ | ||||||
|  |         for hazelnut in self.hazelnuts.values(): | ||||||
|  |             if hazelnut.nickname == nickname: | ||||||
|  |                 yield hazelnut | ||||||
|  |  | ||||||
|     def send_packet(self, client: Hazelnut, pkt: Packet) -> int: |     def send_packet(self, client: Hazelnut, pkt: Packet) -> int: | ||||||
|         """ |         """ | ||||||
|         Send a formatted packet to a client. |         Send a formatted packet to a client. | ||||||
| @@ -322,6 +330,9 @@ class Squirrel(Hazelnut): | |||||||
|         if not msg: |         if not msg: | ||||||
|             return |             return | ||||||
|  |  | ||||||
|  |         if msg.startswith("/"): | ||||||
|  |             return self.handle_command(msg[1:]) | ||||||
|  |  | ||||||
|         msg = f"{self.nickname}: {msg}" |         msg = f"{self.nickname}: {msg}" | ||||||
|         self.add_message(msg) |         self.add_message(msg) | ||||||
|  |  | ||||||
| @@ -376,6 +387,187 @@ class Squirrel(Hazelnut): | |||||||
|                 for c in reversed(demojized): |                 for c in reversed(demojized): | ||||||
|                     curses.ungetch(c) |                     curses.ungetch(c) | ||||||
|  |  | ||||||
|  |     def handle_command(self, command: str) -> None: | ||||||
|  |         """ | ||||||
|  |         The user sent a command. We analyse it and process what is needed. | ||||||
|  |         """ | ||||||
|  |         def resolve_address(address: str) -> str: | ||||||
|  |             # Resolve address | ||||||
|  |             try: | ||||||
|  |                 # Resolve DNS as an IPv6 | ||||||
|  |                 return socket.getaddrinfo(address, None, socket.AF_INET6)[0][4][0] | ||||||
|  |             except socket.gaierror: | ||||||
|  |                 # This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping | ||||||
|  |                 # to compute a valid IPv6. | ||||||
|  |                 # See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4 | ||||||
|  |                 try: | ||||||
|  |                     return "::ffff:" + socket.getaddrinfo(address, None, socket.AF_INET)[0][4][0] | ||||||
|  |                 except socket.gaierror: | ||||||
|  |                     raise ValueError(f"{address} is not resolvable") | ||||||
|  |  | ||||||
|  |         def resolve_port(port: str) -> int: | ||||||
|  |             try: | ||||||
|  |                 port = int(port) | ||||||
|  |                 if not 1 <= port <= 65565: | ||||||
|  |                     raise ValueError | ||||||
|  |                 return port | ||||||
|  |             except ValueError: | ||||||
|  |                 raise ValueError(f"{port} is not a valid port") | ||||||
|  |  | ||||||
|  |         args = command.split(" ") | ||||||
|  |         command, args = args[0].lower(), args[1:] | ||||||
|  |         if command == "help" or command == "usage": | ||||||
|  |             self.add_system_message("**/help**\t\t\t\tDisplay this help menu", italic=False, ignore_debug=True) | ||||||
|  |             self.add_system_message("**/connect address port**\t\tAdd this address in the potential neighbours", | ||||||
|  |                                     italic=False, ignore_debug=True) | ||||||
|  |             self.add_system_message("**/hello address port**\t\tSend short hello to the given neighbour", | ||||||
|  |                                     italic=False, ignore_debug=True) | ||||||
|  |             self.add_system_message("**/unban address port**\t\tReset the error counter of a given neighbour", | ||||||
|  |                                     italic=False, ignore_debug=True) | ||||||
|  |             self.add_system_message("**/info id|nickname|addr port**\tDisplay information about a neighbour", | ||||||
|  |                                     italic=False, ignore_debug=True) | ||||||
|  |             self.add_system_message("**/active**\t\t\tDisplay the list of all active neighbours.", | ||||||
|  |                                     italic=False, ignore_debug=True) | ||||||
|  |             self.add_system_message("**/potential**\t\t\tDisplay the list of all potential neighbours.", | ||||||
|  |                                     italic=False, ignore_debug=True) | ||||||
|  |             self.add_system_message("**/debug**\t\t\t\tToggle debug mode", italic=False, ignore_debug=True) | ||||||
|  |             self.add_system_message("**/emojis**\t\t\tToggle emojis support", italic=False, ignore_debug=True) | ||||||
|  |             self.add_system_message("**/markdown**\t\t\tToggle markdown support", italic=False, ignore_debug=True) | ||||||
|  |         elif command == "connect": | ||||||
|  |             if len(args) != 2: | ||||||
|  |                 self.add_system_message("Usage: /connect address port", italic=False, ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |             try: | ||||||
|  |                 address, port = resolve_address(args[0]), resolve_port(args[1]) | ||||||
|  |             except ValueError as e: | ||||||
|  |                 self.add_system_message(str(e), ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |  | ||||||
|  |             if (address, port) in self.hazelnuts: | ||||||
|  |                 self.add_system_message("There is already a known client with this address.", ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |             hazelnut = self.new_hazel(address, port) | ||||||
|  |             self.hazelnuts[(address, port)] = hazelnut | ||||||
|  |             self.add_system_message(f"Potential client successfully added! You can send a hello by running " | ||||||
|  |                                     f"\"/hello {address} {port}\".", ignore_debug=True) | ||||||
|  |         elif command == "hello": | ||||||
|  |             if len(args) != 2: | ||||||
|  |                 self.add_system_message("Usage: /hello address port", italic=False, ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |             try: | ||||||
|  |                 address, port = resolve_address(args[0]), resolve_port(args[1]) | ||||||
|  |             except ValueError as e: | ||||||
|  |                 self.add_system_message(str(e), ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |  | ||||||
|  |             if (address, port) not in self.hazelnuts: | ||||||
|  |                 self.add_system_message("This client is unknown. Please register it by running " | ||||||
|  |                                         f"\"/connect {address} {port}\"", ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |  | ||||||
|  |             hazelnut = self.find_hazelnut(address, port) | ||||||
|  |             self.send_packet(hazelnut, Packet.construct(HelloTLV.construct(8, self))) | ||||||
|  |  | ||||||
|  |             self.add_system_message("Hello successfully sent!", ignore_debug=True) | ||||||
|  |         elif command == "unban": | ||||||
|  |             if len(args) != 2: | ||||||
|  |                 self.add_system_message("Usage: /unban address port", italic=False, ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |             try: | ||||||
|  |                 address, port = resolve_address(args[0]), resolve_port(args[1]) | ||||||
|  |             except ValueError as e: | ||||||
|  |                 self.add_system_message(str(e), ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |  | ||||||
|  |             if (address, port) not in self.hazelnuts: | ||||||
|  |                 self.add_system_message("This client is unknown. Please register it by running " | ||||||
|  |                                         f"\"/connect {address} {port}\"", ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |  | ||||||
|  |             hazelnut = self.find_hazelnut(address, port) | ||||||
|  |             hazelnut.errors = 0 | ||||||
|  |  | ||||||
|  |             self.add_system_message("The client is unbanned.", ignore_debug=True) | ||||||
|  |         elif command == "info": | ||||||
|  |             if len(args) > 2: | ||||||
|  |                 self.add_system_message("Usage: /info me|id|nickname|addr port", italic=False, ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |  | ||||||
|  |             if not args: | ||||||
|  |                 hazelnuts = [self] | ||||||
|  |             elif len(args) == 2: | ||||||
|  |                 try: | ||||||
|  |                     address, port = resolve_address(args[0]), resolve_port(args[1]) | ||||||
|  |                 except ValueError as e: | ||||||
|  |                     self.add_system_message(str(e), ignore_debug=True) | ||||||
|  |                     return | ||||||
|  |  | ||||||
|  |                 if (address, port) not in self.hazelnuts: | ||||||
|  |                     self.add_system_message("This client is unknown. Please register it by running " | ||||||
|  |                                             f"\"/connect {address} {port}\"", ignore_debug=True) | ||||||
|  |                     return | ||||||
|  |  | ||||||
|  |                 hazelnuts = [self.find_hazelnut(address, port)] | ||||||
|  |             else: | ||||||
|  |                 hazelnuts = list(self.find_hazelnut_by_nickname(args[0])) | ||||||
|  |                 if args[0].isnumeric(): | ||||||
|  |                     identifier = int(args[0]) | ||||||
|  |                     hazelnuts.append(self.find_hazelnut_by_id(identifier)) | ||||||
|  |                 if not hazelnuts: | ||||||
|  |                     self.add_system_message("Unknown client.") | ||||||
|  |                     return | ||||||
|  |  | ||||||
|  |             for hazel in hazelnuts: | ||||||
|  |                 self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}", | ||||||
|  |                                         italic=False, ignore_debug=True) | ||||||
|  |                 self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}", | ||||||
|  |                                         italic=False, ignore_debug=True) | ||||||
|  |                 self.add_system_message(f"**Addresses:** " | ||||||
|  |                                         + ", ".join(f"{address}:{port}" for address, port in hazel.addresses), | ||||||
|  |                                         italic=False, ignore_debug=True) | ||||||
|  |         elif command == "active": | ||||||
|  |             if not self.active_hazelnuts: | ||||||
|  |                 self.add_system_message("No active neighbour.", italic=False, ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |  | ||||||
|  |             for hazel in self.active_hazelnuts: | ||||||
|  |                 self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}", | ||||||
|  |                                         italic=False, ignore_debug=True) | ||||||
|  |                 self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}", | ||||||
|  |                                         italic=False, ignore_debug=True) | ||||||
|  |                 self.add_system_message(f"**Addresses:** " | ||||||
|  |                                         + ", ".join(f"{address}:{port}" for address, port in hazel.addresses), | ||||||
|  |                                         italic=False, ignore_debug=True) | ||||||
|  |         elif command == "potential": | ||||||
|  |             if not self.potential_hazelnuts: | ||||||
|  |                 self.add_system_message("No potential neighbour.", italic=False, ignore_debug=True) | ||||||
|  |                 return | ||||||
|  |  | ||||||
|  |             for hazel in self.potential_hazelnuts: | ||||||
|  |                 self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}", | ||||||
|  |                                         italic=False, ignore_debug=True) | ||||||
|  |                 self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}", | ||||||
|  |                                         italic=False, ignore_debug=True) | ||||||
|  |                 self.add_system_message(f"**Addresses:** " | ||||||
|  |                                         + ", ".join(f"{address}:{port}" for address, port in hazel.addresses), | ||||||
|  |                                         italic=False, ignore_debug=True) | ||||||
|  |         elif command == "debug": | ||||||
|  |             self.squinnondation.debug ^= True | ||||||
|  |             self.add_system_message( | ||||||
|  |                 "Debug mode " + ("enabled" if self.squinnondation.debug else "disabled") + ".", ignore_debug=True) | ||||||
|  |         elif command == "emojis": | ||||||
|  |             self.squinnondation.no_emoji ^= True | ||||||
|  |             self.add_system_message( | ||||||
|  |                 "Emojis support " + ("disabled" if self.squinnondation.no_emoji else "enabled") + ".", | ||||||
|  |                 ignore_debug=True) | ||||||
|  |         elif command == "markdown": | ||||||
|  |             self.squinnondation.debug ^= True | ||||||
|  |             self.add_system_message( | ||||||
|  |                 "Markdown support " + ("disabled" if self.squinnondation.no_markdown else "enabled") + ".", | ||||||
|  |                 ignore_debug=True) | ||||||
|  |         else: | ||||||
|  |             self.add_system_message("Unknown command. Please do /help to see available commands.", ignore_debug=True) | ||||||
|  |  | ||||||
|     def add_message(self, msg: str) -> None: |     def add_message(self, msg: str) -> None: | ||||||
|         """ |         """ | ||||||
|         Store a new message into the history. |         Store a new message into the history. | ||||||
| @@ -464,12 +656,13 @@ class Squirrel(Hazelnut): | |||||||
|                     self.send_packet(hazelnut, pkt) |                     self.send_packet(hazelnut, pkt) | ||||||
|                     self.recent_messages[key][2].pop(key2) |                     self.recent_messages[key][2].pop(key2) | ||||||
|  |  | ||||||
|     def add_system_message(self, msg: str) -> None: |     def add_system_message(self, msg: str, italic: bool = True, ignore_debug: bool = False) -> None: | ||||||
|         """ |         """ | ||||||
|         Add a new system log message. |         Add a new system log message. | ||||||
|         """ |         """ | ||||||
|         if self.squinnondation.debug: |         if self.squinnondation.debug or ignore_debug: | ||||||
|             return self.add_message(f"system: *{msg}*" if not self.squinnondation.no_markdown else f"system: {msg}") |             return self.add_message( | ||||||
|  |                 f"system: *{msg}*" if not self.squinnondation.no_markdown and italic else f"system: {msg}") | ||||||
|  |  | ||||||
|     def print_markdown(self, pad: Any, y: int, x: int, msg: str, |     def print_markdown(self, pad: Any, y: int, x: int, msg: str, | ||||||
|                        bold: bool = False, italic: bool = False, underline: bool = False, strike: bool = False) -> int: |                        bold: bool = False, italic: bool = False, underline: bool = False, strike: bool = False) -> int: | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user