squinnondation/squinnondation/peers.py

1217 lines
50 KiB
Python

# Copyright (C) 2020 by eichhornchen, ÿnérant
# SPDX-License-Identifier: GPL-3.0-or-later
from datetime import datetime
from random import randint, uniform
from typing import Any, Tuple, Generator
from threading import Thread, Semaphore
import curses
import re
import socket
import time
import struct
from .messages import Packet, DataTLV, HelloTLV, GoAwayTLV, GoAwayType, NeighbourTLV, WarningTLV
class Peer:
"""
A connected peer, with its socket.
"""
def __init__(self, nickname: str = None, address: str = "localhost", port: int = 2500):
self.nickname = nickname
self.id = -1
self.last_hello_time = 0
self.last_long_hello_time = 0
self.symmetric = False
self.active = False
self.errors = 0
try:
# Resolve DNS as an IPv6
address = socket.getaddrinfo(address, None, socket.AF_INET6)[0][4][0]
except socket.gaierror:
# This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping
# to compute a valid IPv6.
# See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4
address = "::ffff:" + socket.getaddrinfo(address, None, socket.AF_INET)[0][4][0]
self.addresses = set()
self.addresses.add((address, port))
self.main_address = (address, port)
@property
def potential(self) -> bool:
return not self.active and not self.banned and not isinstance(self, User)
@potential.setter
def potential(self, value: bool) -> None:
self.active = not value
@property
def banned(self) -> bool:
"""
If a client send more than 5 invalid packets, we don't trust it anymore.
"""
return self.errors >= 5 or isinstance(self, User)
def __repr__(self):
return self.nickname or str(self.id) or str(self.main_address)
def __str__(self):
return repr(self)
def merge(self, other: "Peer") -> "Peer":
"""
Merge two peers that are actually the same client.
The symmetric and active properties are kept from the original client.
"""
self.errors = max(self.errors, other.errors)
self.last_hello_time = max(self.last_hello_time, other.last_hello_time)
self.last_long_hello_time = max(self.last_hello_time, other.last_long_hello_time)
self.addresses.update(self.addresses)
self.addresses.update(other.addresses)
if self.main_address[1] == 1212: # always prefer the non-multicast address
self.main_address = other.main_address
elif other.main_address[1] == 1212:
other.main_address = self.main_address
self.id = self.id if self.id > 0 else other.id
return self
class User(Peer):
"""
The user of the program. It can speak with other peers.
"""
def __init__(self, instance: Any, nickname: str):
super().__init__(nickname, instance.bind_address, instance.bind_port)
# Random identifier on 64 bits
self.id = randint(0, 1 << 64 - 1)
self.incr_nonce = 0
# Create UDP socket
self.socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
# Bind the socket
self.socket.bind(self.main_address)
self.squinnondation = instance
if self.squinnondation.multicast:
# Create multicast socket
self.multicast_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
# Allows address to be reused
self.multicast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.multicast_socket.bind(('', 1212)) # listen on all interfaces?
# To join the group, we need to give setsockopt a binary packed representation of the multicast
# group's address and on what interfaces we will listen (here all)
mreq = struct.pack("16s15s", socket.inet_pton(socket.AF_INET6, "ff02::4242:4242"), bytes(socket.INADDR_ANY))
# The string "16s15s" corresponds to the packing options: here it packs the arguments into a 16-byte
# string and a 15-byte string.
self.multicast_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
self.input_buffer = ""
self.input_index = 0
self.last_line = -1
# Lock the refresh function in order to avoid concurrent refresh
self.refresh_lock = Semaphore()
# Lock functions that can be used by two threads to avoid concurrent writing
self.data_lock = Semaphore()
self.history = []
self.received_messages = dict()
self.recent_messages = dict() # of the form [Pkt(DataTLV), date of first reception,
# dict(neighbour, date of the next send, nb of times it has already been sent)]
self.history_pad = curses.newpad(curses.LINES - 2, curses.COLS)
self.input_pad = curses.newpad(1, curses.COLS)
self.emoji_pad = curses.newpad(18, 12)
self.emoji_panel_page = -1
if curses.can_change_color():
curses.init_color(curses.COLOR_WHITE, 1000, 1000, 1000)
for i in range(curses.COLOR_BLACK + 1, curses.COLOR_WHITE):
curses.init_pair(i + 1, i, curses.COLOR_BLACK)
# dictionnaries of neighbours
self.neighbours = {self.main_address: self}
self.nbNS = 0
self.minNS = 3 # minimal number of symmetric neighbours a user needs to have.
self.listener = Listener(self)
self.neighbour_manager = PeerManager(self)
self.inondator = Inondator(self)
if self.squinnondation.multicast:
self.multicastlistener = Multicastlistener(self)
self.add_system_message(f"Listening on {self.main_address[0]}:{self.main_address[1]}", ignore_debug=True)
self.add_system_message(f"I am {self.id}")
def new_peer(self, address: str, port: int) -> Peer:
"""
Returns a new peer (with no id nor nickname)
"""
peer = Peer(address=address, port=port)
return peer
@property
def active_peers(self) -> set:
return set(peer for peer in self.neighbours.values() if peer.active)
@property
def potential_peers(self) -> set:
return set(peer for peer in self.neighbours.values() if peer.potential)
def find_peer(self, address: str, port: int) -> Peer:
"""
Translate an address into a peer. If this peer does not exist,
creates a new peer.
"""
if (address, port) in self.neighbours:
return self.neighbours[(address, port)]
self.data_lock.acquire(timeout=1)
peer = Peer(address=address, port=port)
self.neighbours[(address, port)] = peer
self.data_lock.release()
return peer
def find_peer_by_id(self, peer_id: int) -> Peer:
"""
Retrieve the peer that is known by its id. Return None if it is unknown.
The given identifier must be positive.
"""
if peer_id > 0:
for peer in self.neighbours.values():
if peer.id == peer_id:
return peer
def find_peer_by_nickname(self, nickname: str) -> Generator[Peer, Any, None]:
"""
Retrieve the peers that are known by their nicknames.
"""
for peer in self.neighbours.values():
if peer.nickname == nickname:
yield peer
def send_packet(self, client: Peer, pkt: Packet) -> int:
"""
Send a formatted packet to a client.
"""
if len(pkt) > 1024:
# The packet is too large to be sent by the protocol. We split the packet in subpackets.
return sum(self.send_packet(client, subpkt) for subpkt in pkt.split(1024))
res = self.send_raw_data(client, pkt.marshal())
return res
def send_raw_data(self, client: Peer, data: bytes) -> int:
"""
Send a raw packet to a client.
"""
return self.socket.sendto(data, client.main_address)
def receive_packet(self) -> Tuple[Packet, Peer]:
"""
Receive a packet from the socket and translate it into a Python object.
Warning: the process is blocking, it should be ran inside a dedicated thread.
"""
data, addr = self.receive_raw_data()
peer = self.find_peer(addr[0], addr[1])
if peer.banned:
# The client already sent errored packets
return Packet.construct(), peer
try:
pkt = Packet.unmarshal(data)
except ValueError as error:
# The packet contains an error. We memorize it and warn the other user.
peer.errors += 1
self.send_packet(peer, Packet.construct(WarningTLV.construct(
f"An error occured while reading your packet: {error}")))
if peer.banned:
self.send_packet(peer, Packet.construct(WarningTLV.construct(
"You got banned since you sent too much errored packets.")))
raise ValueError("Client is banned since there were too many errors.", error)
raise error
else:
return pkt, peer
def receive_raw_data(self) -> Tuple[bytes, Any]:
"""
Receive a packet from the socket.
"""
return self.socket.recvfrom(1024)
def start_threads(self) -> None:
"""
Start asynchronous threads.
"""
# Kill subthreads when exitting the program
self.listener.setDaemon(True)
self.neighbour_manager.setDaemon(True)
self.inondator.setDaemon(True)
self.listener.start()
self.neighbour_manager.start()
self.inondator.start()
if self.squinnondation.multicast:
self.multicastlistener.setDaemon(True)
self.multicastlistener.start()
def wait_for_key(self) -> None:
"""
Infinite loop where we are waiting for a key of the user.
"""
while True:
self.refresh_history()
self.refresh_input()
self.refresh_emoji_pad()
if not self.squinnondation.no_emoji:
self.refresh_emoji_pad()
try:
key = self.squinnondation.screen.get_wch(
curses.LINES - 1, min(3 + len(self.nickname) + self.input_index, curses.COLS - 4))
except curses.error:
continue
except KeyboardInterrupt:
# Exit the program and send GoAway to neighbours
self.leave()
return
if key == curses.KEY_MOUSE:
try:
_, x, y, _, attr = curses.getmouse()
self.handle_mouse_click(y, x, attr)
continue
except curses.error:
# This is not a valid click
continue
self.handle_key_pressed(key)
def handle_key_pressed(self, key: str) -> None: # noqa: C901
"""
Process the key press from the user.
"""
if key == "\x7f" or key == curses.KEY_BACKSPACE: # backspace
# delete character at the good position
if self.input_index:
self.input_index -= 1
self.input_buffer = self.input_buffer[:self.input_index] + self.input_buffer[self.input_index + 1:]
return
elif key == curses.KEY_DC: # key
if self.input_index < len(self.input_buffer):
self.input_buffer = self.input_buffer[:self.input_index] + self.input_buffer[self.input_index + 1:]
return
elif key == curses.KEY_LEFT:
# Navigate in the message to the left
self.input_index = max(0, self.input_index - 1)
return
elif key == curses.KEY_RIGHT:
# Navigate in the message to the right
self.input_index = min(len(self.input_buffer), self.input_index + 1)
return
elif key == curses.KEY_UP:
# Scroll up in the history
self.last_line = min(max(curses.LINES - 3, self.last_line - 1), len(self.history) - 1)
return
elif key == curses.KEY_DOWN:
# Scroll down in the history
self.last_line = min(len(self.history) - 1, self.last_line + 1)
return
elif key == curses.KEY_PPAGE:
# Page up in the history
self.last_line = min(max(curses.LINES - 3, self.last_line - (curses.LINES - 3)), len(self.history) - 1)
return
elif key == curses.KEY_NPAGE:
# Page down in the history
self.last_line = min(len(self.history) - 1, self.last_line + (curses.LINES - 3))
return
elif key == curses.KEY_HOME:
# Place the cursor at the beginning of the typing word
self.input_index = 0
return
elif key == curses.KEY_END:
# Place the cursor at the end of the typing word
self.input_index = len(self.input_buffer)
return
elif isinstance(key, int):
# Unmanaged complex key
return
elif key != "\n":
# Insert the pressed key in the current message
new_buffer = self.input_buffer[:self.input_index] + key + self.input_buffer[self.input_index:]
if len(DataTLV.construct(f"{self.nickname}: {new_buffer}", None)) > 255 - 8 - 4:
# The message is too long to be sent once. We don't allow the user to type any other character.
curses.beep()
return
self.input_buffer = new_buffer
self.input_index += 1
return
# Send message to neighbours
msg = self.input_buffer
self.input_buffer = ""
self.input_index = 0
if not msg:
return
if msg.startswith("/"):
return self.handle_command(msg[1:])
msg = f"{self.nickname}: {msg}"
self.add_message(msg)
pkt = Packet.construct(DataTLV.construct(msg, self))
for peer in self.active_peers:
self.send_packet(peer, pkt)
def handle_mouse_click(self, y: int, x: int, attr: int) -> None:
"""
The user clicks on the screen, at coordinates (y, x).
According to the position, we can indicate what can be done.
"""
if not self.squinnondation.no_emoji:
if y == curses.LINES - 1 and x >= curses.COLS - 3:
# Click on the emoji, open or close the emoji pad
self.emoji_panel_page *= -1
elif self.emoji_panel_page > 0 and y == curses.LINES - 4 and x >= curses.COLS - 5:
# Open next emoji page
self.emoji_panel_page += 1
elif self.emoji_panel_page > 1 and y == curses.LINES - curses.LINES // 2 - 1 \
and x >= curses.COLS - 5:
# Open previous emoji page
self.emoji_panel_page -= 1
elif self.emoji_panel_page > 0 and y >= curses.LINES // 2 - 1 and x >= curses.COLS // 2 - 1:
pad_y, pad_x = y - (curses.LINES - curses.LINES // 2) + 1, \
(x - (curses.COLS - curses.COLS // 3) + 1) // 2
# Click on an emoji on the pad to autocomplete an emoji
self.click_on_emoji_pad(pad_y, pad_x)
def click_on_emoji_pad(self, pad_y: int, pad_x: int) -> None:
"""
The emoji pad contains the list of all available emojis.
Clicking on a emoji auto-complete the emoji in the input pad.
"""
import emoji
from emoji import unicode_codes
height, width = self.emoji_pad.getmaxyx()
height -= 1
width -= 1
emojis = list(unicode_codes.UNICODE_EMOJI)
emojis = [c for c in emojis if len(c) == 1]
size = (height - 2) * (width - 4) // 2
page = emojis[(self.emoji_panel_page - 1) * size:self.emoji_panel_page * size]
index = pad_y * (width - 4) // 2 + pad_x
char = page[index]
if char:
demojized = emoji.demojize(char)
if char != demojized:
for c in reversed(demojized):
curses.ungetch(c)
def handle_command(self, command: str) -> None:
"""
The user sent a command. We analyse it and process what is needed.
"""
def resolve_address(address: str) -> str:
# Resolve address
try:
# Resolve DNS as an IPv6
return socket.getaddrinfo(address, None, socket.AF_INET6)[0][4][0]
except socket.gaierror:
# This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping
# to compute a valid IPv6.
# See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4
try:
return "::ffff:" + socket.getaddrinfo(address, None, socket.AF_INET)[0][4][0]
except socket.gaierror:
raise ValueError(f"{address} is not resolvable")
def resolve_port(port: str) -> int:
try:
port = int(port)
if not 1 <= port <= 65565:
raise ValueError
return port
except ValueError:
raise ValueError(f"{port} is not a valid port")
args = command.split(" ")
command, args = args[0].lower(), args[1:]
if command == "help" or command == "usage":
self.add_system_message("**/help**\t\t\t\tDisplay this help menu", italic=False, ignore_debug=True)
self.add_system_message("**/connect address port**\t\tAdd this address in the potential neighbours",
italic=False, ignore_debug=True)
self.add_system_message("**/hello address port**\t\tSend short hello to the given neighbour",
italic=False, ignore_debug=True)
self.add_system_message("**/unban address port**\t\tReset the error counter of a given neighbour",
italic=False, ignore_debug=True)
self.add_system_message("**/info id|nickname|addr port**\tDisplay information about a neighbour",
italic=False, ignore_debug=True)
self.add_system_message("**/active**\t\t\tDisplay the list of all active neighbours.",
italic=False, ignore_debug=True)
self.add_system_message("**/potential**\t\t\tDisplay the list of all potential neighbours.",
italic=False, ignore_debug=True)
self.add_system_message("**/debug**\t\t\t\tToggle debug mode", italic=False, ignore_debug=True)
self.add_system_message("**/emojis**\t\t\tToggle emojis support", italic=False, ignore_debug=True)
self.add_system_message("**/markdown**\t\t\tToggle markdown support", italic=False, ignore_debug=True)
elif command == "connect":
if len(args) != 2:
self.add_system_message("Usage: /connect address port", italic=False, ignore_debug=True)
return
try:
address, port = resolve_address(args[0]), resolve_port(args[1])
except ValueError as e:
self.add_system_message(str(e), ignore_debug=True)
return
if (address, port) in self.neighbours:
self.add_system_message("There is already a known client with this address.", ignore_debug=True)
return
self.data_lock.acquire(timeout=1)
peer = self.new_peer(address, port)
self.neighbours[(address, port)] = peer
self.data_lock.release()
self.add_system_message(f"Potential client successfully added! You can send a hello by running "
f"\"/hello {address} {port}\".", ignore_debug=True)
elif command == "hello":
if len(args) != 2:
self.add_system_message("Usage: /hello address port", italic=False, ignore_debug=True)
return
try:
address, port = resolve_address(args[0]), resolve_port(args[1])
except ValueError as e:
self.add_system_message(str(e), ignore_debug=True)
return
if (address, port) not in self.neighbours:
self.add_system_message("This client is unknown. Please register it by running "
f"\"/connect {address} {port}\"", ignore_debug=True)
return
peer = self.find_peer(address, port)
self.send_packet(peer, Packet.construct(HelloTLV.construct(8, self)))
self.add_system_message("Hello successfully sent!", ignore_debug=True)
elif command == "unban":
if len(args) != 2:
self.add_system_message("Usage: /unban address port", italic=False, ignore_debug=True)
return
try:
address, port = resolve_address(args[0]), resolve_port(args[1])
except ValueError as e:
self.add_system_message(str(e), ignore_debug=True)
return
if (address, port) not in self.neighbours:
self.add_system_message("This client is unknown. Please register it by running "
f"\"/connect {address} {port}\"", ignore_debug=True)
return
peer = self.find_peer(address, port)
peer.errors = 0
self.add_system_message("The client is unbanned.", ignore_debug=True)
elif command == "info":
if len(args) > 2:
self.add_system_message("Usage: /info me|id|nickname|addr port", italic=False, ignore_debug=True)
return
if not args:
self.data_lock.acquire(timeout=1)
peers = [self]
self.data_lock.release()
elif len(args) == 2:
try:
address, port = resolve_address(args[0]), resolve_port(args[1])
except ValueError as e:
self.add_system_message(str(e), ignore_debug=True)
return
if (address, port) not in self.neighbours:
self.add_system_message("This client is unknown. Please register it by running "
f"\"/connect {address} {port}\"", ignore_debug=True)
return
self.data_lock.acquire(timeout=1)
peers = [self.find_peer(address, port)]
self.data_lock.release()
else:
peers = list(self.find_peer_by_nickname(args[0]))
if args[0].isnumeric():
identifier = int(args[0])
peers.append(self.find_peer_by_id(identifier))
if not peers:
self.add_system_message("Unknown client.")
return
for peer in peers:
self.add_system_message(f"**Identifier:** {peer.id or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Nickname:** {peer.nickname or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Addresses:** "
+ ", ".join(f"{address}:{port}" for address, port in peer.addresses),
italic=False, ignore_debug=True)
elif command == "active":
if not self.active_peers:
self.add_system_message("No active neighbour.", italic=False, ignore_debug=True)
return
for peer in self.active_peers:
self.add_system_message(f"**Identifier:** {peer.id or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Nickname:** {peer.nickname or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Addresses:** "
+ ", ".join(f"{address}:{port}" for address, port in peer.addresses),
italic=False, ignore_debug=True)
elif command == "potential":
if not self.potential_peers:
self.add_system_message("No potential neighbour.", italic=False, ignore_debug=True)
return
for peer in self.potential_peers:
self.add_system_message(f"**Identifier:** {peer.id or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Nickname:** {peer.nickname or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Addresses:** "
+ ", ".join(f"{address}:{port}" for address, port in peer.addresses),
italic=False, ignore_debug=True)
elif command == "debug":
self.squinnondation.debug ^= True
self.add_system_message(
"Debug mode " + ("enabled" if self.squinnondation.debug else "disabled") + ".", ignore_debug=True)
elif command == "emojis":
self.squinnondation.no_emoji ^= True
self.add_system_message(
"Emojis support " + ("disabled" if self.squinnondation.no_emoji else "enabled") + ".",
ignore_debug=True)
elif command == "markdown":
self.squinnondation.debug ^= True
self.add_system_message(
"Markdown support " + ("disabled" if self.squinnondation.no_markdown else "enabled") + ".",
ignore_debug=True)
else:
self.add_system_message("Unknown command. Please do /help to see available commands.", ignore_debug=True)
def add_message(self, msg: str) -> None:
"""
Store a new message into the history.
"""
self.history.append(msg)
if self.last_line == len(self.history) - 2:
self.last_line += 1
def receive_message_from(self, tlv: DataTLV, msg: str, sender_id: int, nonce: int, relay: Peer) -> bool:
"""
This method is called by a DataTLV, sent by a real person.
This add the message in the history if not already done.
Returns True iff the message was not already received previously.
"""
self.data_lock.acquire(timeout=1)
if (sender_id, nonce) not in self.received_messages:
# If it is a new message, add it to recent_messages
d = self.make_inundation_dict()
pkt = Packet().construct(tlv)
self.recent_messages[(sender_id, nonce)] = [pkt, time.time(), d]
self.data_lock.release()
# in all cases, remove the sender from the list of neighbours to be inundated
self.remove_from_inundation(relay, sender_id, nonce)
if (sender_id, nonce) in self.received_messages:
return False
self.data_lock.acquire(timeout=1)
self.add_message(msg) # for display purposes
self.received_messages[(sender_id, nonce)] = Message(msg, sender_id, nonce)
self.data_lock.release()
return True
def make_inundation_dict(self) -> dict:
"""
Takes the active peers dictionnary and returns a list of [peer, date+random, 0]
"""
res = dict()
peers = self.active_peers
for peer in peers:
if peer.symmetric:
next_send = uniform(1, 2)
res[peer.main_address] = [peer, time.time() + next_send, 0]
return res
def remove_from_inundation(self, peer: Peer, sender_id: int, nonce: int) -> None:
"""
Remove the sender from the list of neighbours to be inundated
"""
self.data_lock.acquire(timeout=1)
if (sender_id, nonce) in self.recent_messages:
# If a peer is late in its acknowledgement, the absence of the previous if causes an error.
for addr in peer.addresses:
self.recent_messages[(sender_id, nonce)][2].pop(addr, None)
if not self.recent_messages[(sender_id, nonce)][2]: # If dictionnary is empty, remove the message
self.recent_messages.pop((sender_id, nonce), None)
self.data_lock.release()
def clean_inundation(self) -> None:
"""
Remove messages which are overdue (older than 2 minutes) from the inundation dictionnary.
"""
for key in self.recent_messages.copy():
if time.time() - self.recent_messages[key][1] > 120:
self.data_lock.acquire(timeout=1)
self.recent_messages.pop(key)
self.data_lock.release()
def main_inundation(self) -> None:
"""
The main inundation function.
"""
self.data_lock.acquire(timeout=1)
for key in self.recent_messages.copy():
k = list(self.recent_messages[key][2].keys())
for key2 in k:
if time.time() >= self.recent_messages[key][2][key2][1]:
self.add_system_message(f"inundating {self.recent_messages[key][2][key2][0].id} with message {key}")
# send the packet if it is overdue
self.send_packet(self.recent_messages[key][2][key2][0], self.recent_messages[key][0])
# change the time until the next send
a = self.recent_messages[key][2][key2][2]
self.recent_messages[key][2][key2][2] = a + 1
next_send = uniform(2 ** (a - 1), 2 ** a)
self.recent_messages[key][2][key2][1] = time.time() + next_send
if self.recent_messages[key][2][key2][2] >= 5: # the neighbour is not reactive enough
gatlv = GoAwayTLV().construct(GoAwayType.TIMEOUT, f"{self.id} No acknowledge")
pkt = Packet().construct(gatlv)
peer = self.recent_messages[key][2][key2][0]
self.send_packet(peer, pkt)
self.recent_messages[key][2].pop(key2)
self.data_lock.release()
def add_system_message(self, msg: str, italic: bool = True, ignore_debug: bool = False) -> None:
"""
Add a new system log message.
"""
if self.squinnondation.debug or ignore_debug:
return self.add_message(
f"system: *{msg}*" if not self.squinnondation.no_markdown and italic else f"system: {msg}")
def print_markdown(self, pad: Any, y: int, x: int, msg: str,
bold: bool = False, italic: bool = False, underline: bool = False, strike: bool = False) -> int:
"""
Parse a markdown-formatted text and format the text as bold, italic or text text.
***text***: bold, italic
**text**: bold
*text*: italic
__text__: underline
_text_: italic
~~text~~: strikethrough
"""
msg = msg.replace("\0", "")
# Replace :emoji_name: by the good emoji
if not self.squinnondation.no_emoji:
import emoji
msg = emoji.emojize(msg, use_aliases=True)
if self.squinnondation.no_markdown:
try:
pad.addstr(y, x, msg)
except curses.error:
# Should not happen
pass
return len(msg)
underline_match = re.match("(.*)__(.*)__(.*)", msg)
if underline_match:
before, text, after = underline_match.group(1), underline_match.group(2), underline_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline)
len_mid = self.print_markdown(pad, y, x + len_before, text, bold, italic, not underline)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline)
return len_before + len_mid + len_after
italic_match = re.match("(.*)_(.*)_(.*)", msg)
if italic_match:
before, text, after = italic_match.group(1), italic_match.group(2), italic_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline)
len_mid = self.print_markdown(pad, y, x + len_before, text, bold, not italic, underline)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline)
return len_before + len_mid + len_after
bold_italic_match = re.match("(.*)\\*\\*\\*(.*)\\*\\*\\*(.*)", msg)
if bold_italic_match:
before, text, after = bold_italic_match.group(1), bold_italic_match.group(2),\
bold_italic_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike)
len_mid = self.print_markdown(pad, y, x + len_before, text, not bold, not italic, underline, strike)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike)
return len_before + len_mid + len_after
bold_match = re.match("(.*)\\*\\*(.*)\\*\\*(.*)", msg)
if bold_match:
before, text, after = bold_match.group(1), bold_match.group(2), bold_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike)
len_mid = self.print_markdown(pad, y, x + len_before, text, not bold, italic, underline, strike)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike)
return len_before + len_mid + len_after
italic_match = re.match("(.*)\\*(.*)\\*(.*)", msg)
if italic_match:
before, text, after = italic_match.group(1), italic_match.group(2), italic_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike)
len_mid = self.print_markdown(pad, y, x + len_before, text, bold, not italic, underline, strike)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike)
return len_before + len_mid + len_after
strike_match = re.match("(.*)~~(.*)~~(.*)", msg)
if strike_match:
before, text, after = strike_match.group(1), strike_match.group(2), strike_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike)
len_mid = self.print_markdown(pad, y, x + len_before, text, bold, italic, underline, not strike)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike)
return len_before + len_mid + len_after
size = len(msg)
attrs = 0
attrs |= curses.A_BOLD if bold else 0
attrs |= curses.A_ITALIC if italic else 0
attrs |= curses.A_UNDERLINE if underline else 0
if strike:
msg = "".join(c + "\u0336" for c in msg)
remaining_lines = curses.LINES - 3 - (y + x // (curses.COLS - 1)) + 1
if remaining_lines > 0:
# Don't print the end of the line if it is too long
space_left_on_line = (curses.COLS - 2) - (x % (curses.COLS - 1))
msg = msg[:space_left_on_line + max(0, (curses.COLS - 1) * (remaining_lines - 1))]
if msg:
try:
pad.addstr(y + x // (curses.COLS - 1), x % (curses.COLS - 1), msg, attrs)
except curses.error:
# Should not happen
pass
return size
def refresh_history(self) -> None:
"""
Rewrite the history of the messages.
"""
self.refresh_lock.acquire(timeout=1)
y, x = self.squinnondation.screen.getmaxyx()
if curses.is_term_resized(curses.LINES, curses.COLS):
curses.resizeterm(y, x)
self.history_pad.resize(curses.LINES - 2, curses.COLS - 1)
self.input_pad.resize(1, curses.COLS - 1)
self.history_pad.erase()
y_offset = 0
for i, msg in enumerate(self.history[max(0, self.last_line - curses.LINES + 3):self.last_line + 1]):
if i + y_offset > curses.LINES - 3:
break
msg = re.sub("([^:]*): (.*)", "<\\1> \\2", msg, 1)
if not re.match("<.*> .*", msg):
msg = "<unknown> " + msg
match = re.match("<(.*)> (.*)", msg)
nickname = match.group(1)
msg = match.group(2)
full_msg = f"<{nickname}> {msg}"
color_id = sum(ord(c) for c in nickname) % 6 + 1
true_width = self.print_markdown(self.history_pad, i + y_offset, 0, full_msg)
self.history_pad.addstr(i + y_offset, 1, nickname, curses.A_BOLD | curses.color_pair(color_id + 1))
y_offset += true_width // (curses.COLS - 1)
self.history_pad.refresh(0, 0, 0, 0, curses.LINES - 2, curses.COLS)
self.refresh_lock.release()
def refresh_input(self) -> None:
"""
Redraw input line. Must not be called while the message is not sent.
"""
self.refresh_lock.acquire(timeout=1)
self.input_pad.erase()
color_id = sum(ord(c) for c in self.nickname) % 6 + 1
self.input_pad.addstr(0, 0, "<")
self.input_pad.addstr(0, 1, self.nickname, curses.A_BOLD | curses.color_pair(color_id + 1))
self.input_pad.addstr(0, 1 + len(self.nickname), "> ")
msg = self.input_buffer
if self.input_index >= curses.COLS - len(self.nickname) - 7:
msg = msg[self.input_index - (curses.COLS - len(self.nickname) - 7):self.input_index]
msg = msg[:curses.COLS - len(self.nickname) - 7]
self.input_pad.addstr(0, 3 + len(self.nickname), msg)
if not self.squinnondation.no_emoji:
self.input_pad.addstr(0, self.input_pad.getmaxyx()[1] - 3, "😀")
self.input_pad.refresh(0, 0, curses.LINES - 1, 0, curses.LINES - 1, curses.COLS - 1)
self.refresh_lock.release()
def refresh_emoji_pad(self) -> None:
"""
Display the emoji pad if necessary.
"""
if self.squinnondation.no_emoji:
return
from emoji import unicode_codes
self.refresh_lock.acquire(timeout=1)
self.emoji_pad.erase()
if self.emoji_panel_page > 0:
height, width = curses.LINES // 2, curses.COLS // 3
self.emoji_pad.resize(height + 1, width + 1)
self.emoji_pad.addstr(0, 0, "" + (width - 2) * "" + "")
self.emoji_pad.addstr(0, (width - 14) // 2, " == EMOJIS == ")
for i in range(1, height):
self.emoji_pad.addstr(i, 0, "" + (width - 2) * " " + "")
self.emoji_pad.addstr(height - 1, 0, "" + (width - 2) * "" + "")
emojis = list(unicode_codes.UNICODE_EMOJI)
emojis = [c for c in emojis if len(c) == 1]
size = (height - 2) * (width - 4) // 2
page = emojis[(self.emoji_panel_page - 1) * size:self.emoji_panel_page * size]
if self.emoji_panel_page != 1:
self.emoji_pad.addstr(1, width - 2, "")
if len(page) == size:
self.emoji_pad.addstr(height - 2, width - 2, "")
for i in range(height - 2):
for j in range((width - 4) // 2 + 1):
index = i * (width - 4) // 2 + j
if index < len(page):
self.emoji_pad.addstr(i + 1, 2 * j + 1, page[index])
self.emoji_pad.refresh(0, 0, curses.LINES - height - 2, curses.COLS - width - 2,
curses.LINES - 2, curses.COLS - 2)
self.refresh_lock.release()
def potential_to_contact(self) -> list:
"""
Returns a list of peers the user should contact if it does
not have enough symmetric neighbours.
"""
res = []
val = list(self.potential_peers)
lp = len(val)
for i in range(min(lp, max(0, self.minNS - self.nbNS))):
a = randint(0, lp - 1)
res.append(val[a])
return res
def send_hello(self) -> None:
"""
Sends a long HelloTLV to all active neighbours.
"""
for peer in self.active_peers:
htlv = HelloTLV().construct(16, self, peer)
pkt = Packet().construct(htlv)
self.send_packet(peer, pkt)
def verify_activity(self) -> None:
"""
All neighbours that have not sent a HelloTLV in the last 2
minutes are considered not active.
"""
val = list(self.active_peers) # create a copy because the dict size will change
for peer in val:
if time.time() - peer.last_hello_time > 2 * 60:
gatlv = GoAwayTLV().construct(GoAwayType.TIMEOUT, "you did not talk to me")
pkt = Packet().construct(gatlv)
self.send_packet(peer, pkt)
peer.active = False
self.update_peer_table(peer)
def update_peer_table(self, peer: Peer) -> None:
"""
We insert the peer into our table of clients.
If there is a collision with the address / the ID, then we merge clients into a unique one.
"""
self.data_lock.acquire(timeout=1)
for addr in peer.addresses:
if addr in self.neighbours:
# Merge with the previous peer
old_peer = self.neighbours[addr]
if isinstance(old_peer, User):
peer, old_peer = old_peer, peer
peer.merge(old_peer)
self.neighbours[addr] = peer
for other_peer in list(self.neighbours.values()):
if other_peer.id == peer.id > 0 and other_peer != peer:
# The peer with the same id is known as a different address. We merge everything
if isinstance(other_peer, User):
peer, old_peer = other_peer, peer
peer.merge(other_peer)
for other_addr in peer.addresses:
self.neighbours[other_addr] = peer
self.data_lock.release()
def send_neighbours(self) -> None:
"""
Update the number of symmetric neighbours and
send all neighbours NeighbourTLV
"""
nb_ns = 0
# could send the same to all neighbour, but it means that neighbour
# A could receive a message with itself in it -> if the others do not pay attention, trouble
for peer in self.active_peers:
if time.time() - peer.last_long_hello_time <= 2 * 60:
nb_ns += 1
peer.symmetric = True
ntlv = NeighbourTLV().construct(*peer.main_address)
pkt = Packet().construct(ntlv)
for destination in self.active_peers:
if destination.id != peer.id:
self.send_packet(destination, pkt)
else:
peer.symmetric = False
self.nbNS = nb_ns
def leave(self) -> None:
"""
The program is exited. We send a GoAway to our neighbours, then close the program.
"""
# Last inundation
self.data_lock.release()
self.refresh_lock.release()
self.main_inundation()
self.clean_inundation()
# Broadcast a GoAway
gatlv = GoAwayTLV().construct(GoAwayType.EXIT, "I am leaving! Good bye!")
pkt = Packet.construct(gatlv)
for peer in self.active_peers:
self.send_packet(peer, pkt)
exit(0)
def send_hello_multicast(self) -> int:
"""
Send a short hello on the multicast group.
"""
htlv = HelloTLV().construct(8, self)
pkt = Packet().construct(htlv)
res = self.send_multicast(pkt.marshal())
return res
def send_multicast(self, data: bytes) -> int:
"""
Send a packet on the multicast.
"""
return self.multicast_socket.sendto(data, ("ff02::4242:4242", 1212))
def receive_hello_multicast(self) -> Tuple[Packet, Peer]:
"""
Receive a packet from the multicast and translate it into a Python object.
"""
data, addr = self.receive_multicast()
peer = self.find_peer(addr[0], addr[1])
try:
pkt = Packet.unmarshal(data)
except ValueError as error:
# The packet contains an error. We memorize it.
peer.errors += 1
self.add_system_message("An error occured on the multicast")
raise error
else:
return pkt, peer
def receive_multicast(self) -> Tuple[bytes, Any]:
"""
Receive a packet from the socket.
"""
return self.multicast_socket.recvfrom(1024)
class Listener(Thread):
"""
It is the peer listener.
It always waits for an incoming packet, then it treats it, and continues to wait.
It is in a dedicated thread.
"""
def __init__(self, user: User, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
def run(self) -> None:
while True:
try:
pkt, peer = self.user.receive_packet()
except ValueError as error:
self.user.add_system_message("An error occurred while receiving a packet: {}".format(error))
self.user.refresh_history()
self.user.refresh_input()
self.user.refresh_emoji_pad()
else:
if peer.banned:
# Ignore banned peers
continue
for tlv in pkt.body:
tlv.handle(self.user, peer)
self.user.refresh_history()
self.user.refresh_input()
self.user.refresh_emoji_pad()
class Multicastlistener(Thread):
"""
Used to listen on the multicast group to discover new people
"""
def __init__(self, user: User, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
def run(self) -> None:
self.user.add_system_message("running")
while True:
try:
pkt, peer = self.user.receive_hello_multicast()
except ValueError as error:
self.user.add_system_message("An error occurred while receiving a packet: {}".format(error))
self.user.refresh_history()
self.user.refresh_input()
self.user.refresh_emoji_pad()
else:
if peer.banned:
# Ignore banned peers
continue
for tlv in pkt.body:
# We are only supposed to receive HelloTlVs via this communication mean
self.user.add_system_message(f"Via multicast {peer.addresses}:")
tlv.handle(self.user, peer)
self.user.refresh_history()
self.user.refresh_input()
self.user.refresh_emoji_pad()
class PeerManager(Thread):
"""
A process to cleanly manage the user's neighbours
"""
def __init__(self, user: User, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
self.last_potential = 0
self.last_check = 0
self.last_neighbour = 0
self.last_multicast = 0
htlv = HelloTLV().construct(8, self.user)
pkt = Packet().construct(htlv)
self.hellopkt = pkt
def run(self) -> None:
while True:
# First part of neighbour management: ensure the user has enough
# symmetric neighbours.
if time.time() - self.last_potential > 30:
to_contact = self.user.potential_to_contact()
for peer in to_contact:
self.user.send_packet(peer, self.hellopkt)
self.last_potential = time.time()
# Second part: send long HelloTLVs to neighbours every 30 seconds
if time.time() - self.last_check > 30:
self.user.add_system_message(f"I have {len(self.user.active_peers)} friends")
self.user.send_hello()
self.last_check = time.time()
# Third part: get rid of inactive neighbours
self.user.verify_activity()
# Fourth part: verify symmetric neighbours and send NeighbourTLV every minute
if time.time() - self.last_neighbour > 60:
self.user.send_neighbours()
self.last_neighbour = time.time()
if self.user.squinnondation.multicast:
# For the multicast discovery : send a hello every minute.
if time.time() - self.last_multicast > 60:
self.user.send_hello_multicast()
self.last_multicast = time.time()
self.user.refresh_history()
self.user.refresh_input()
self.user.refresh_emoji_pad()
# Avoid infinite loops
time.sleep(1)
class Inondator(Thread):
"""
A process to manage the inondation
"""
def __init__(self, user: User, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
self.last_check = 0
def run(self) -> None:
while True:
try:
# clean the dictionnary
if time.time() - self.last_check > 30:
self.user.clean_inundation()
self.last_check = time.time()
# inundate
self.user.main_inundation()
self.user.refresh_history()
self.user.refresh_input()
self.user.refresh_emoji_pad()
except Exception as e:
self.user.add_system_message(f"An error occured while inondating: {e}", ignore_debug=True)
# Avoid infinite loops
time.sleep(1)
class Message:
"""
This class symbolises the data sent by a real client, excluding system messages.
This is useful to check unicity or to save and load messages.
"""
content: str
sender_id: int
nonce: int
created_at: datetime
def __init__(self, content: str, sender_id: int, nonce: int, created_at: datetime = None):
self.content = content
self.sender_id = sender_id
self.nonce = nonce
self.created_at = created_at or datetime.now()