diff --git a/squinnondation/hazel.py b/squinnondation/hazel.py deleted file mode 100644 index a1e006a..0000000 --- a/squinnondation/hazel.py +++ /dev/null @@ -1,1062 +0,0 @@ -# Copyright (C) 2020 by eichhornchen, ΓΏnΓ©rant -# SPDX-License-Identifier: GPL-3.0-or-later - -from datetime import datetime -from random import randint, uniform -from typing import Any, Tuple, Generator -# from ipaddress import IPv6Address -from threading import Thread, RLock -import curses -import re -import socket -import time - -from .messages import Packet, DataTLV, HelloTLV, GoAwayTLV, GoAwayType, NeighbourTLV, WarningTLV - - -class Hazelnut: - """ - A hazelnut is a connected client, with its socket. - """ - def __init__(self, nickname: str = None, address: str = "localhost", port: int = 2500): - self.nickname = nickname - self.id = -1 - self.last_hello_time = 0 - self.last_long_hello_time = 0 - self.symmetric = False - self.active = False - self.errors = 0 - - try: - # Resolve DNS as an IPv6 - address = socket.getaddrinfo(address, None, socket.AF_INET6)[0][4][0] - except socket.gaierror: - # This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping - # to compute a valid IPv6. - # See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4 - address = "::ffff:" + socket.getaddrinfo(address, None, socket.AF_INET)[0][4][0] - - self.addresses = set() - self.addresses.add((address, port)) - - @property - def potential(self) -> bool: - return not self.active and not self.banned - - @potential.setter - def potential(self, value: bool) -> None: - self.active = not value - - @property - def main_address(self) -> Tuple[str, int]: - """ - A client can have multiple addresses. - We contact it only on one of them. - """ - return list(self.addresses)[0] - - @property - def banned(self) -> bool: - """ - If a client send more than 5 invalid packets, we don't trust it anymore. - """ - return self.errors >= 5 - - def __repr__(self): - return self.nickname or str(self.id) or str(self.main_address) - - def __str__(self): - return repr(self) - - def merge(self, other: "Hazelnut") -> "Hazelnut": - """ - Merge the hazelnut data with one other. - The symmetric and active properties are kept from the original client. - """ - self.errors = max(self.errors, other.errors) - self.last_hello_time = max(self.last_hello_time, other.last_hello_time) - self.last_long_hello_time = max(self.last_hello_time, other.last_long_hello_time) - self.addresses.update(self.addresses) - self.addresses.update(other.addresses) - self.id = self.id if self.id > 0 else other.id - return self - - -class Squirrel(Hazelnut): - """ - The squirrel is the user of the program. It can speak with other clients, that are called hazelnuts. - """ - def __init__(self, instance: Any, nickname: str): - super().__init__(nickname, instance.bind_address, instance.bind_port) - - # Random identifier on 64 bits - self.id = randint(0, 1 << 64 - 1) - self.incr_nonce = 0 - - # Create UDP socket - self.socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) - # Bind the socket - self.socket.bind(self.main_address) - - self.squinnondation = instance - - self.input_buffer = "" - self.input_index = 0 - self.last_line = -1 - - # Lock the refresh function in order to avoid concurrent refresh - self.refresh_lock = RLock() - - self.history = [] - self.received_messages = dict() - self.recent_messages = dict() # of the form [Pkt(DataTLV), date of first reception, - # dict(neighbour, date of the next send, nb of times it has already been sent)] - self.history_pad = curses.newpad(curses.LINES - 2, curses.COLS) - self.input_pad = curses.newpad(1, curses.COLS) - self.emoji_pad = curses.newpad(18, 12) - self.emoji_panel_page = -1 - - curses.init_color(curses.COLOR_WHITE, 1000, 1000, 1000) - for i in range(curses.COLOR_BLACK + 1, curses.COLOR_WHITE): - curses.init_pair(i + 1, i, curses.COLOR_BLACK) - - # dictionnaries of neighbours - self.hazelnuts = dict() - self.nbNS = 0 - self.minNS = 3 # minimal number of symmetric neighbours a squirrel needs to have. - - self.worm = Worm(self) - self.hazel_manager = HazelManager(self) - self.inondator = Inondator(self) - - self.add_system_message(f"Listening on {self.main_address[0]}:{self.main_address[1]}", ignore_debug=True) - self.add_system_message(f"I am {self.id}") - - def new_hazel(self, address: str, port: int) -> Hazelnut: - """ - Returns a new hazelnut (with no id nor nickname) - """ - hazelnut = Hazelnut(address=address, port=port) - return hazelnut - - @property - def active_hazelnuts(self) -> set: - return set(hazelnut for hazelnut in self.hazelnuts.values() if hazelnut.active) - - @property - def potential_hazelnuts(self) -> set: - return set(hazelnut for hazelnut in self.hazelnuts.values() if hazelnut.potential) - - def find_hazelnut(self, address: str, port: int) -> Hazelnut: - """ - Translate an address into a hazelnut. If this hazelnut does not exist, - creates a new hazelnut. - """ - if (address, port) in self.hazelnuts: - return self.hazelnuts[(address, port)] - hazelnut = Hazelnut(address=address, port=port) - self.hazelnuts[(address, port)] = hazelnut - return hazelnut - - def find_hazelnut_by_id(self, hazel_id: int) -> Hazelnut: - """ - Retrieve the hazelnut that is known by its id. Return None if it is unknown. - The given identifier must be positive. - """ - if hazel_id > 0: - for hazelnut in self.hazelnuts.values(): - if hazelnut.id == hazel_id: - return hazelnut - - def find_hazelnut_by_nickname(self, nickname: str) -> Generator[Hazelnut, Any, None]: - """ - Retrieve the hazelnuts that are known by their nicknames. - """ - for hazelnut in self.hazelnuts.values(): - if hazelnut.nickname == nickname: - yield hazelnut - - def send_packet(self, client: Hazelnut, pkt: Packet) -> int: - """ - Send a formatted packet to a client. - """ - if len(pkt) > 1024: - # The packet is too large to be sent by the protocol. We split the packet in subpackets. - return sum(self.send_packet(client, subpkt) for subpkt in pkt.split(1024)) - res = self.send_raw_data(client, pkt.marshal()) - return res - - def send_raw_data(self, client: Hazelnut, data: bytes) -> int: - """ - Send a raw packet to a client. - """ - return self.socket.sendto(data, client.main_address) - - def receive_packet(self) -> Tuple[Packet, Hazelnut]: - """ - Receive a packet from the socket and translate it into a Python object. - Warning: the process is blocking, it should be ran inside a dedicated thread. - """ - data, addr = self.receive_raw_data() - hazelnut = self.find_hazelnut(addr[0], addr[1]) - if hazelnut.banned: - # The client already sent errored packets - return Packet.construct(), hazelnut - try: - pkt = Packet.unmarshal(data) - except ValueError as error: - # The packet contains an error. We memorize it and warn the other user. - hazelnut.errors += 1 - self.send_packet(hazelnut, Packet.construct(WarningTLV.construct( - f"An error occured while reading your packet: {error}"))) - if hazelnut.banned: - self.send_packet(hazelnut, Packet.construct(WarningTLV.construct( - "You got banned since you sent too much errored packets."))) - raise ValueError("Client is banned since there were too many errors.", error) - raise error - else: - return pkt, hazelnut - - def receive_raw_data(self) -> Tuple[bytes, Any]: - """ - Receive a packet from the socket. - """ - return self.socket.recvfrom(1024) - - def start_threads(self) -> None: - """ - Start asynchronous threads. - """ - # Kill subthreads when exitting the program - self.worm.setDaemon(True) - self.hazel_manager.setDaemon(True) - self.inondator.setDaemon(True) - - self.worm.start() - self.hazel_manager.start() - self.inondator.start() - - def wait_for_key(self) -> None: - """ - Infinite loop where we are waiting for a key of the user. - """ - while True: - self.refresh_history() - self.refresh_input() - if not self.squinnondation.no_emoji: - self.refresh_emoji_pad() - try: - key = self.squinnondation.screen.get_wch( - curses.LINES - 1, min(3 + len(self.nickname) + self.input_index, curses.COLS - 4)) - except curses.error: - continue - except KeyboardInterrupt: - # Exit the program and send GoAway to neighbours - self.leave() - return - - if key == curses.KEY_MOUSE: - try: - _, x, y, _, attr = curses.getmouse() - self.handle_mouse_click(y, x, attr) - continue - except curses.error: - # This is not a valid click - continue - - self.handle_key_pressed(key) - - def handle_key_pressed(self, key: str) -> None: # noqa: C901 - """ - Process the key press from the user. - """ - if key == "\x7f" or key == curses.KEY_BACKSPACE: # backspace - # delete character at the good position - if self.input_index: - self.input_index -= 1 - self.input_buffer = self.input_buffer[:self.input_index] + self.input_buffer[self.input_index + 1:] - return - elif key == curses.KEY_DC: # key - if self.input_index < len(self.input_buffer): - self.input_buffer = self.input_buffer[:self.input_index] + self.input_buffer[self.input_index + 1:] - return - elif key == curses.KEY_LEFT: - # Navigate in the message to the left - self.input_index = max(0, self.input_index - 1) - return - elif key == curses.KEY_RIGHT: - # Navigate in the message to the right - self.input_index = min(len(self.input_buffer), self.input_index + 1) - return - elif key == curses.KEY_UP: - # Scroll up in the history - self.last_line = min(max(curses.LINES - 3, self.last_line - 1), len(self.history) - 1) - return - elif key == curses.KEY_DOWN: - # Scroll down in the history - self.last_line = min(len(self.history) - 1, self.last_line + 1) - return - elif key == curses.KEY_PPAGE: - # Page up in the history - self.last_line = min(max(curses.LINES - 3, self.last_line - (curses.LINES - 3)), len(self.history) - 1) - return - elif key == curses.KEY_NPAGE: - # Page down in the history - self.last_line = min(len(self.history) - 1, self.last_line + (curses.LINES - 3)) - return - elif key == curses.KEY_HOME: - # Place the cursor at the beginning of the typing word - self.input_index = 0 - return - elif key == curses.KEY_END: - # Place the cursor at the end of the typing word - self.input_index = len(self.input_buffer) - return - elif isinstance(key, int): - # Unmanaged complex key - return - elif key != "\n": - # Insert the pressed key in the current message - new_buffer = self.input_buffer[:self.input_index] + key + self.input_buffer[self.input_index:] - if len(DataTLV.construct(f"{self.nickname}: {new_buffer}", None)) > 255 - 8 - 4: - # The message is too long to be sent once. We don't allow the user to type any other character. - curses.beep() - return - self.input_buffer = new_buffer - self.input_index += 1 - return - - # Send message to neighbours - msg = self.input_buffer - self.input_buffer = "" - self.input_index = 0 - - if not msg: - return - - if msg.startswith("/"): - return self.handle_command(msg[1:]) - - msg = f"{self.nickname}: {msg}" - self.add_message(msg) - - pkt = Packet.construct(DataTLV.construct(msg, self)) - for hazelnut in self.active_hazelnuts: - self.send_packet(hazelnut, pkt) - - def handle_mouse_click(self, y: int, x: int, attr: int) -> None: - """ - The user clicks on the screen, at coordinates (y, x). - According to the position, we can indicate what can be done. - """ - - if not self.squinnondation.no_emoji: - if y == curses.LINES - 1 and x >= curses.COLS - 3: - # Click on the emoji, open or close the emoji pad - self.emoji_panel_page *= -1 - elif self.emoji_panel_page > 0 and y == curses.LINES - 4 and x >= curses.COLS - 5: - # Open next emoji page - self.emoji_panel_page += 1 - elif self.emoji_panel_page > 1 and y == curses.LINES - curses.LINES // 2 - 1 \ - and x >= curses.COLS - 5: - # Open previous emoji page - self.emoji_panel_page -= 1 - elif self.emoji_panel_page > 0 and y >= curses.LINES // 2 - 1 and x >= curses.COLS // 2 - 1: - pad_y, pad_x = y - (curses.LINES - curses.LINES // 2) + 1, \ - (x - (curses.COLS - curses.COLS // 3) + 1) // 2 - # Click on an emoji on the pad to autocomplete an emoji - self.click_on_emoji_pad(pad_y, pad_x) - - def click_on_emoji_pad(self, pad_y: int, pad_x: int) -> None: - """ - The emoji pad contains the list of all available emojis. - Clicking on a emoji auto-complete the emoji in the input pad. - """ - import emoji - from emoji import unicode_codes - - height, width = self.emoji_pad.getmaxyx() - height -= 1 - width -= 1 - - emojis = list(unicode_codes.UNICODE_EMOJI) - emojis = [c for c in emojis if len(c) == 1] - size = (height - 2) * (width - 4) // 2 - page = emojis[(self.emoji_panel_page - 1) * size:self.emoji_panel_page * size] - index = pad_y * (width - 4) // 2 + pad_x - char = page[index] - if char: - demojized = emoji.demojize(char) - if char != demojized: - for c in reversed(demojized): - curses.ungetch(c) - - def handle_command(self, command: str) -> None: # noqa: C901 - """ - The user sent a command. We analyse it and process what is needed. - """ - def resolve_address(address: str) -> str: - # Resolve address - try: - # Resolve DNS as an IPv6 - return socket.getaddrinfo(address, None, socket.AF_INET6)[0][4][0] - except socket.gaierror: - # This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping - # to compute a valid IPv6. - # See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4 - try: - return "::ffff:" + socket.getaddrinfo(address, None, socket.AF_INET)[0][4][0] - except socket.gaierror: - raise ValueError(f"{address} is not resolvable") - - def resolve_port(port: str) -> int: - try: - port = int(port) - if not 1 <= port <= 65565: - raise ValueError - return port - except ValueError: - raise ValueError(f"{port} is not a valid port") - - args = command.split(" ") - command, args = args[0].lower(), args[1:] - if command == "help" or command == "usage": - self.add_system_message("**/help**\t\t\t\tDisplay this help menu", italic=False, ignore_debug=True) - self.add_system_message("**/connect address port**\t\tAdd this address in the potential neighbours", - italic=False, ignore_debug=True) - self.add_system_message("**/hello address port**\t\tSend short hello to the given neighbour", - italic=False, ignore_debug=True) - self.add_system_message("**/unban address port**\t\tReset the error counter of a given neighbour", - italic=False, ignore_debug=True) - self.add_system_message("**/info id|nickname|addr port**\tDisplay information about a neighbour", - italic=False, ignore_debug=True) - self.add_system_message("**/active**\t\t\tDisplay the list of all active neighbours.", - italic=False, ignore_debug=True) - self.add_system_message("**/potential**\t\t\tDisplay the list of all potential neighbours.", - italic=False, ignore_debug=True) - self.add_system_message("**/debug**\t\t\t\tToggle debug mode", italic=False, ignore_debug=True) - self.add_system_message("**/emojis**\t\t\tToggle emojis support", italic=False, ignore_debug=True) - self.add_system_message("**/markdown**\t\t\tToggle markdown support", italic=False, ignore_debug=True) - elif command == "connect": - if len(args) != 2: - self.add_system_message("Usage: /connect address port", italic=False, ignore_debug=True) - return - try: - address, port = resolve_address(args[0]), resolve_port(args[1]) - except ValueError as e: - self.add_system_message(str(e), ignore_debug=True) - return - - if (address, port) in self.hazelnuts: - self.add_system_message("There is already a known client with this address.", ignore_debug=True) - return - hazelnut = self.new_hazel(address, port) - self.hazelnuts[(address, port)] = hazelnut - self.add_system_message(f"Potential client successfully added! You can send a hello by running " - f"\"/hello {address} {port}\".", ignore_debug=True) - elif command == "hello": - if len(args) != 2: - self.add_system_message("Usage: /hello address port", italic=False, ignore_debug=True) - return - try: - address, port = resolve_address(args[0]), resolve_port(args[1]) - except ValueError as e: - self.add_system_message(str(e), ignore_debug=True) - return - - if (address, port) not in self.hazelnuts: - self.add_system_message("This client is unknown. Please register it by running " - f"\"/connect {address} {port}\"", ignore_debug=True) - return - - hazelnut = self.find_hazelnut(address, port) - self.send_packet(hazelnut, Packet.construct(HelloTLV.construct(8, self))) - - self.add_system_message("Hello successfully sent!", ignore_debug=True) - elif command == "unban": - if len(args) != 2: - self.add_system_message("Usage: /unban address port", italic=False, ignore_debug=True) - return - try: - address, port = resolve_address(args[0]), resolve_port(args[1]) - except ValueError as e: - self.add_system_message(str(e), ignore_debug=True) - return - - if (address, port) not in self.hazelnuts: - self.add_system_message("This client is unknown. Please register it by running " - f"\"/connect {address} {port}\"", ignore_debug=True) - return - - hazelnut = self.find_hazelnut(address, port) - hazelnut.errors = 0 - - self.add_system_message("The client is unbanned.", ignore_debug=True) - elif command == "info": - if len(args) > 2: - self.add_system_message("Usage: /info me|id|nickname|addr port", italic=False, ignore_debug=True) - return - - if not args: - hazelnuts = [self] - elif len(args) == 2: - try: - address, port = resolve_address(args[0]), resolve_port(args[1]) - except ValueError as e: - self.add_system_message(str(e), ignore_debug=True) - return - - if (address, port) not in self.hazelnuts: - self.add_system_message("This client is unknown. Please register it by running " - f"\"/connect {address} {port}\"", ignore_debug=True) - return - - hazelnuts = [self.find_hazelnut(address, port)] - else: - hazelnuts = list(self.find_hazelnut_by_nickname(args[0])) - if args[0].isnumeric(): - identifier = int(args[0]) - hazelnuts.append(self.find_hazelnut_by_id(identifier)) - if not hazelnuts: - self.add_system_message("Unknown client.") - return - - for hazel in hazelnuts: - self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}", - italic=False, ignore_debug=True) - self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}", - italic=False, ignore_debug=True) - self.add_system_message("**Addresses:** " - + ", ".join(f"{address}:{port}" for address, port in hazel.addresses), - italic=False, ignore_debug=True) - elif command == "active": - if not self.active_hazelnuts: - self.add_system_message("No active neighbour.", italic=False, ignore_debug=True) - return - - for hazel in self.active_hazelnuts: - self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}", - italic=False, ignore_debug=True) - self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}", - italic=False, ignore_debug=True) - self.add_system_message("**Addresses:** " - + ", ".join(f"{address}:{port}" for address, port in hazel.addresses), - italic=False, ignore_debug=True) - elif command == "potential": - if not self.potential_hazelnuts: - self.add_system_message("No potential neighbour.", italic=False, ignore_debug=True) - return - - for hazel in self.potential_hazelnuts: - self.add_system_message(f"**Identifier:** {hazel.id or '<*unknown*>'}", - italic=False, ignore_debug=True) - self.add_system_message(f"**Nickname:** {hazel.nickname or '<*unknown*>'}", - italic=False, ignore_debug=True) - self.add_system_message("**Addresses:** " - + ", ".join(f"{address}:{port}" for address, port in hazel.addresses), - italic=False, ignore_debug=True) - elif command == "debug": - self.squinnondation.debug ^= True - self.add_system_message( - "Debug mode " + ("enabled" if self.squinnondation.debug else "disabled") + ".", ignore_debug=True) - elif command == "emojis": - self.squinnondation.no_emoji ^= True - self.add_system_message( - "Emojis support " + ("disabled" if self.squinnondation.no_emoji else "enabled") + ".", - ignore_debug=True) - elif command == "markdown": - self.squinnondation.no_markdown ^= True - self.add_system_message( - "Markdown support " + ("disabled" if self.squinnondation.no_markdown else "enabled") + ".", - ignore_debug=True) - else: - self.add_system_message("Unknown command. Please do /help to see available commands.", ignore_debug=True) - - def add_message(self, msg: str) -> None: - """ - Store a new message into the history. - """ - self.history.append(msg) - if self.last_line == len(self.history) - 2: - self.last_line += 1 - - def receive_message_from(self, tlv: DataTLV, msg: str, sender_id: int, nonce: int, relay: Hazelnut) -> bool: - """ - This method is called by a DataTLV, sent by a real person. - This add the message in the history if not already done. - Returns True iff the message was not already received previously. - """ - if (sender_id, nonce) not in self.received_messages: - # If it is a new message, add it to recent_messages - d = self.make_inundation_dict() - pkt = Packet().construct(tlv) - self.recent_messages[(sender_id, nonce)] = [pkt, time.time(), d] - - # in all cases, remove the sender from the list of neighbours to be inundated - self.remove_from_inundation(relay, sender_id, nonce) - - if (sender_id, nonce) in self.received_messages: - return False - - self.add_message(msg) # for display purposes - self.received_messages[(sender_id, nonce)] = Message(msg, sender_id, nonce) - return True - - def make_inundation_dict(self) -> dict: - """ - Takes the active hazels dictionnary and returns a list of [hazel, date+random, 0] - """ - res = dict() - hazels = self.active_hazelnuts - for hazel in hazels: - if hazel.symmetric: - next_send = uniform(1, 2) - res[hazel.main_address] = [hazel, time.time() + next_send, 0] - return res - - def remove_from_inundation(self, hazel: Hazelnut, sender_id: int, nonce: int) -> None: - """ - Remove the sender from the list of neighbours to be inundated - """ - if (sender_id, nonce) in self.recent_messages: - # If a peer is late in its acknowledgement, the absence of the previous if causes an error. - for addr in hazel.addresses: - self.recent_messages[(sender_id, nonce)][2].pop(addr, None) - - if not self.recent_messages[(sender_id, nonce)][2]: # If dictionnary is empty, remove the message - self.recent_messages.pop((sender_id, nonce), None) - - def clean_inundation(self) -> None: - """ - Remove messages which are overdue (older than 2 minutes) from the inundation dictionnary. - """ - for key in self.recent_messages: - if time.time() - self.recent_messages[key][1] > 120: - self.recent_messages.pop(key) - - def main_inundation(self) -> None: - """ - The main inundation function. - """ - for key in self.recent_messages: - k = list(self.recent_messages[key][2].keys()) - for key2 in k: - if time.time() >= self.recent_messages[key][2][key2][1]: - self.add_system_message(f"inundating {self.recent_messages[key][2][key2][0].id} with message {key}") - - # send the packet if it is overdue - self.send_packet(self.recent_messages[key][2][key2][0], self.recent_messages[key][0]) - - # change the time until the next send - a = self.recent_messages[key][2][key2][2] - self.recent_messages[key][2][key2][2] = a + 1 - next_send = uniform(2 ** (a - 1), 2 ** a) - self.recent_messages[key][2][key2][1] = time.time() + next_send - - if self.recent_messages[key][2][key2][2] >= 5: # the neighbour is not reactive enough - gatlv = GoAwayTLV().construct(GoAwayType.TIMEOUT, f"{self.id} No acknowledge") - pkt = Packet().construct(gatlv) - hazelnut = self.recent_messages[key][2][key2][0] - self.send_packet(hazelnut, pkt) - self.recent_messages[key][2].pop(key2) - - def add_system_message(self, msg: str, italic: bool = True, ignore_debug: bool = False) -> None: - """ - Add a new system log message. - """ - if self.squinnondation.debug or ignore_debug: - return self.add_message( - f"system: *{msg}*" if not self.squinnondation.no_markdown and italic else f"system: {msg}") - - def print_markdown(self, pad: Any, y: int, x: int, msg: str, - bold: bool = False, italic: bool = False, underline: bool = False, strike: bool = False) -> int: - """ - Parse a markdown-formatted text and format the text as bold, italic or text text. - ***text***: bold, italic - **text**: bold - *text*: italic - __text__: underline - _text_: italic - ~~text~~: strikethrough - """ - # Replace :emoji_name: by the good emoji - if not self.squinnondation.no_emoji: - import emoji - msg = emoji.emojize(msg, use_aliases=True) - - if self.squinnondation.no_markdown: - pad.addstr(y, x, msg) - return len(msg) - - underline_match = re.match("(.*)__(.*)__(.*)", msg) - if underline_match: - before, text, after = underline_match.group(1), underline_match.group(2), underline_match.group(3) - len_before = self.print_markdown(pad, y, x, before, bold, italic, underline) - len_mid = self.print_markdown(pad, y, x + len_before, text, bold, italic, not underline) - len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline) - return len_before + len_mid + len_after - - italic_match = re.match("(.*)_(.*)_(.*)", msg) - if italic_match: - before, text, after = italic_match.group(1), italic_match.group(2), italic_match.group(3) - len_before = self.print_markdown(pad, y, x, before, bold, italic, underline) - len_mid = self.print_markdown(pad, y, x + len_before, text, bold, not italic, underline) - len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline) - return len_before + len_mid + len_after - - bold_italic_match = re.match("(.*)\\*\\*\\*(.*)\\*\\*\\*(.*)", msg) - if bold_italic_match: - before, text, after = bold_italic_match.group(1), bold_italic_match.group(2),\ - bold_italic_match.group(3) - len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike) - len_mid = self.print_markdown(pad, y, x + len_before, text, not bold, not italic, underline, strike) - len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike) - return len_before + len_mid + len_after - - bold_match = re.match("(.*)\\*\\*(.*)\\*\\*(.*)", msg) - if bold_match: - before, text, after = bold_match.group(1), bold_match.group(2), bold_match.group(3) - len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike) - len_mid = self.print_markdown(pad, y, x + len_before, text, not bold, italic, underline, strike) - len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike) - return len_before + len_mid + len_after - - italic_match = re.match("(.*)\\*(.*)\\*(.*)", msg) - if italic_match: - before, text, after = italic_match.group(1), italic_match.group(2), italic_match.group(3) - len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike) - len_mid = self.print_markdown(pad, y, x + len_before, text, bold, not italic, underline, strike) - len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike) - return len_before + len_mid + len_after - - strike_match = re.match("(.*)~~(.*)~~(.*)", msg) - if strike_match: - before, text, after = strike_match.group(1), strike_match.group(2), strike_match.group(3) - len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike) - len_mid = self.print_markdown(pad, y, x + len_before, text, bold, italic, underline, not strike) - len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike) - return len_before + len_mid + len_after - - size = len(msg) - - attrs = 0 - attrs |= curses.A_BOLD if bold else 0 - attrs |= curses.A_ITALIC if italic else 0 - attrs |= curses.A_UNDERLINE if underline else 0 - if strike: - msg = "".join(c + "\u0336" for c in msg) - - remaining_lines = curses.LINES - 3 - (y + x // (curses.COLS - 1)) + 1 - if remaining_lines > 0: - # Don't print the end of the line if it is too long - space_left_on_line = (curses.COLS - 2) - (x % (curses.COLS - 1)) - msg = msg[:space_left_on_line + max(0, (curses.COLS - 1) * (remaining_lines - 1))] - if msg: - pad.addstr(y + x // (curses.COLS - 1), x % (curses.COLS - 1), msg, attrs) - return size - - def refresh_history(self) -> None: - """ - Rewrite the history of the messages. - """ - self.refresh_lock.acquire() - - y, x = self.squinnondation.screen.getmaxyx() - if curses.is_term_resized(curses.LINES, curses.COLS): - curses.resizeterm(y, x) - self.history_pad.resize(curses.LINES - 2, curses.COLS - 1) - self.input_pad.resize(1, curses.COLS - 1) - - self.history_pad.erase() - - y_offset = 0 - for i, msg in enumerate(self.history[max(0, self.last_line - curses.LINES + 3):self.last_line + 1]): - if i + y_offset > curses.LINES - 3: - break - - msg = re.sub("([^:]*): (.*)", "<\\1> \\2", msg, 1) - - if not re.match("<.*> .*", msg): - msg = " " + msg - match = re.match("<(.*)> (.*)", msg) - nickname = match.group(1) - msg = match.group(2) - full_msg = f"<{nickname}> {msg}" - color_id = sum(ord(c) for c in nickname) % 6 + 1 - true_width = self.print_markdown(self.history_pad, i + y_offset, 0, full_msg) - self.history_pad.addstr(i + y_offset, 1, nickname, curses.A_BOLD | curses.color_pair(color_id + 1)) - y_offset += true_width // (curses.COLS - 1) - self.history_pad.refresh(0, 0, 0, 0, curses.LINES - 2, curses.COLS) - - self.refresh_lock.release() - - def refresh_input(self) -> None: - """ - Redraw input line. Must not be called while the message is not sent. - """ - self.refresh_lock.acquire() - - self.input_pad.erase() - color_id = sum(ord(c) for c in self.nickname) % 6 + 1 - self.input_pad.addstr(0, 0, "<") - self.input_pad.addstr(0, 1, self.nickname, curses.A_BOLD | curses.color_pair(color_id + 1)) - self.input_pad.addstr(0, 1 + len(self.nickname), "> ") - msg = self.input_buffer - if self.input_index >= curses.COLS - len(self.nickname) - 7: - msg = msg[self.input_index - (curses.COLS - len(self.nickname) - 7):self.input_index] - msg = msg[:curses.COLS - len(self.nickname) - 7] - self.input_pad.addstr(0, 3 + len(self.nickname), msg) - if not self.squinnondation.no_emoji: - self.input_pad.addstr(0, self.input_pad.getmaxyx()[1] - 3, "πŸ˜€") - self.input_pad.refresh(0, 0, curses.LINES - 1, 0, curses.LINES - 1, curses.COLS - 1) - - self.refresh_lock.release() - - def refresh_emoji_pad(self) -> None: - """ - Display the emoji pad if necessary. - """ - if self.squinnondation.no_emoji: - return - - from emoji import unicode_codes - - self.refresh_lock.acquire() - - self.emoji_pad.erase() - - if self.emoji_panel_page > 0: - height, width = curses.LINES // 2, curses.COLS // 3 - self.emoji_pad.resize(height + 1, width + 1) - self.emoji_pad.addstr(0, 0, "┏" + (width - 2) * "━" + "β”“") - self.emoji_pad.addstr(0, (width - 14) // 2, " == EMOJIS == ") - for i in range(1, height): - self.emoji_pad.addstr(i, 0, "┃" + (width - 2) * " " + "┃") - self.emoji_pad.addstr(height - 1, 0, "β”—" + (width - 2) * "━" + "β”›") - - emojis = list(unicode_codes.UNICODE_EMOJI) - emojis = [c for c in emojis if len(c) == 1] - size = (height - 2) * (width - 4) // 2 - page = emojis[(self.emoji_panel_page - 1) * size:self.emoji_panel_page * size] - - if self.emoji_panel_page != 1: - self.emoji_pad.addstr(1, width - 2, "⬆") - if len(page) == size: - self.emoji_pad.addstr(height - 2, width - 2, "⬇") - - for i in range(height - 2): - for j in range((width - 4) // 2 + 1): - index = i * (width - 4) // 2 + j - if index < len(page): - self.emoji_pad.addstr(i + 1, 2 * j + 1, page[index]) - - self.emoji_pad.refresh(0, 0, curses.LINES - height - 2, curses.COLS - width - 2, - curses.LINES - 2, curses.COLS - 2) - - self.refresh_lock.release() - - def potential_to_contact(self) -> list: - """ - Returns a list of hazelnuts the squirrel should contact if it does - not have enough symmetric neighbours. - """ - res = [] - val = list(self.potential_hazelnuts) - lp = len(val) - - for i in range(min(lp, max(0, self.minNS - self.nbNS))): - a = randint(0, lp - 1) - res.append(val[a]) - return res - - def send_hello(self) -> None: - """ - Sends a long HelloTLV to all active neighbours. - """ - for hazelnut in self.active_hazelnuts: - htlv = HelloTLV().construct(16, self, hazelnut) - pkt = Packet().construct(htlv) - self.send_packet(hazelnut, pkt) - - def verify_activity(self) -> None: - """ - All neighbours that have not sent a HelloTLV in the last 2 - minutes are considered not active. - """ - val = list(self.active_hazelnuts) # create a copy because the dict size will change - - for hazelnut in val: - if time.time() - hazelnut.last_hello_time > 2 * 60: - gatlv = GoAwayTLV().construct(GoAwayType.TIMEOUT, "you did not talk to me") - pkt = Packet().construct(gatlv) - self.send_packet(hazelnut, pkt) - hazelnut.active = False - self.update_hazelnut_table(hazelnut) - - def update_hazelnut_table(self, hazelnut: Hazelnut) -> None: - """ - We insert the hazelnut into our table of clients. - If there is a collision with the address / the ID, then we merge clients into a unique one. - """ - for addr in hazelnut.addresses: - if addr in self.hazelnuts: - # Merge with the previous hazel - old_hazel = self.hazelnuts[addr] - hazelnut.merge(old_hazel) - self.hazelnuts[addr] = hazelnut - - for other_hazel in list(self.hazelnuts.values()): - if other_hazel.id == hazelnut.id > 0 and other_hazel != hazelnut: - # The hazelnut with the same id is known as a different address. We merge everything - hazelnut.merge(other_hazel) - - def send_neighbours(self) -> None: - """ - Update the number of symmetric neighbours and - send all neighbours NeighbourTLV - """ - nb_ns = 0 - # could send the same to all neighbour, but it means that neighbour - # A could receive a message with itself in it -> if the others do not pay attention, trouble - for hazelnut in self.active_hazelnuts: - if time.time() - hazelnut.last_long_hello_time <= 2 * 60: - nb_ns += 1 - hazelnut.symmetric = True - ntlv = NeighbourTLV().construct(*hazelnut.main_address) - pkt = Packet().construct(ntlv) - for destination in self.active_hazelnuts: - if destination.id != hazelnut.id: - self.send_packet(destination, pkt) - else: - hazelnut.symmetric = False - self.nbNS = nb_ns - - def leave(self) -> None: - """ - The program is exited. We send a GoAway to our neighbours, then close the program. - """ - # Last inundation - self.main_inundation() - self.clean_inundation() - - # Broadcast a GoAway - gatlv = GoAwayTLV().construct(GoAwayType.EXIT, "I am leaving! Good bye!") - pkt = Packet.construct(gatlv) - for hazelnut in self.active_hazelnuts: - self.send_packet(hazelnut, pkt) - - exit(0) - - -class Worm(Thread): - """ - The worm is the hazel listener. - It always waits for an incoming packet, then it treats it, and continues to wait. - It is in a dedicated thread. - """ - def __init__(self, squirrel: Squirrel, *args, **kwargs): - super().__init__(*args, **kwargs) - self.squirrel = squirrel - - def run(self) -> None: - while True: - try: - pkt, hazelnut = self.squirrel.receive_packet() - except ValueError as error: - self.squirrel.add_system_message("An error occurred while receiving a packet: {}".format(error)) - self.squirrel.refresh_history() - self.squirrel.refresh_input() - else: - if hazelnut.banned: - # Ignore banned hazelnuts - continue - - for tlv in pkt.body: - tlv.handle(self.squirrel, hazelnut) - self.squirrel.refresh_history() - self.squirrel.refresh_input() - - -class HazelManager(Thread): - """ - A process to cleanly manage the squirrel's neighbours - """ - def __init__(self, squirrel: Squirrel, *args, **kwargs): - super().__init__(*args, **kwargs) - self.squirrel = squirrel - self.last_potential = 0 - self.last_check = 0 - self.last_neighbour = 0 - - htlv = HelloTLV().construct(8, self.squirrel) - pkt = Packet().construct(htlv) - self.hellopkt = pkt - - def run(self) -> None: - while True: - # First part of neighbour management: ensure the squirrel has enough - # symmetric neighbours. - if time.time() - self.last_potential > 30: - to_contact = self.squirrel.potential_to_contact() - - for hazel in to_contact: - self.squirrel.send_packet(hazel, self.hellopkt) - self.last_potential = time.time() - - # Second part: send long HelloTLVs to neighbours every 30 seconds - if time.time() - self.last_check > 30: - self.squirrel.add_system_message(f"I have {len(self.squirrel.active_hazelnuts)} friends") - self.squirrel.send_hello() - self.last_check = time.time() - - # Third part: get rid of inactive neighbours - self.squirrel.verify_activity() - - # Fourth part: verify symmetric neighbours and send NeighbourTLV every minute - if time.time() - self.last_neighbour > 60: - self.squirrel.send_neighbours() - self.last_neighbour = time.time() - - # Avoid infinite loops - time.sleep(1) - - -class Inondator(Thread): - """ - A process to manage the inondation - """ - def __init__(self, squirrel: Squirrel, *args, **kwargs): - super().__init__(*args, **kwargs) - self.squirrel = squirrel - self.last_check = 0 - - def run(self) -> None: - while True: - # clean the dictionnary - if time.time() - self.last_check > 30: - self.squirrel.clean_inundation() - self.last_check = time.time() - - # inundate - self.squirrel.main_inundation() - - # Avoid infinite loops - time.sleep(1) - - -class Message: - """ - This class symbolises the data sent by a real client, excluding system messages. - This is useful to check unicity or to save and load messages. - """ - content: str - # TODO: Replace the id by the good (potential) hazel - sender_id: int - nonce: int - created_at: datetime - - def __init__(self, content: str, sender_id: int, nonce: int, created_at: datetime = None): - self.content = content - self.sender_id = sender_id - self.nonce = nonce - self.created_at = created_at or datetime.now() diff --git a/squinnondation/peers.py b/squinnondation/peers.py index 584b86f..102cd70 100644 --- a/squinnondation/peers.py +++ b/squinnondation/peers.py @@ -120,7 +120,8 @@ class User(Peer): self.emoji_pad = curses.newpad(18, 12) self.emoji_panel_page = -1 - curses.init_color(curses.COLOR_WHITE, 1000, 1000, 1000) + if curses.can_change_color(): + curses.init_color(curses.COLOR_WHITE, 1000, 1000, 1000) for i in range(curses.COLOR_BLACK + 1, curses.COLOR_WHITE): curses.init_pair(i + 1, i, curses.COLOR_BLACK)