squinnondation/squinnondation/peers.py

1116 lines
45 KiB
Python

# Copyright (C) 2020 by eichhornchen, ÿnérant
# SPDX-License-Identifier: GPL-3.0-or-later
from datetime import datetime
from random import randint, uniform
from typing import Any, Tuple, Generator
# from ipaddress import IPv6Address
from threading import Thread, RLock
import curses
import re
import socket
import time
from .messages import Packet, DataTLV, HelloTLV, GoAwayTLV, GoAwayType, NeighbourTLV, WarningTLV
class Peer:
"""
A connected peer, with its socket.
"""
def __init__(self, nickname: str = None, address: str = "localhost", port: int = 2500):
self.nickname = nickname
self.id = -1
self.last_hello_time = 0
self.last_long_hello_time = 0
self.symmetric = False
self.active = False
self.errors = 0
self.marked_as_banned = False
try:
# Resolve DNS as an IPv6
address = socket.getaddrinfo(address, None, socket.AF_INET6)[0][4][0]
except socket.gaierror:
# This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping
# to compute a valid IPv6.
# See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4
address = "::ffff:" + socket.getaddrinfo(address, None, socket.AF_INET)[0][4][0]
self.addresses = set()
self.addresses.add((address, port))
@property
def potential(self) -> bool:
return not self.active and not self.banned
@potential.setter
def potential(self, value: bool) -> None:
self.active = not value
@property
def main_address(self) -> Tuple[str, int]:
"""
A client can have multiple addresses.
We contact it only on one of them.
"""
return list(self.addresses)[0]
@property
def banned(self) -> bool:
"""
If a client send more than 5 invalid packets, we don't trust it anymore.
"""
return self.errors >= 5 or self.marked_as_banned
def __repr__(self):
return self.nickname or str(self.id) or str(self.main_address)
def __str__(self):
return repr(self)
def merge(self, other: "Peer") -> "Peer":
"""
Merge two peers that are actually the same client.
The symmetric and active properties are kept from the original client.
"""
self.errors = max(self.errors, other.errors)
self.last_hello_time = max(self.last_hello_time, other.last_hello_time)
self.last_long_hello_time = max(self.last_hello_time, other.last_long_hello_time)
self.addresses.update(self.addresses)
self.addresses.update(other.addresses)
self.id = self.id if self.id > 0 else other.id
self.marked_as_banned = other.marked_as_banned
return self
class User(Peer):
"""
The user of the program. It can speak with other peers.
"""
def __init__(self, instance: Any, nickname: str):
super().__init__(nickname, instance.bind_address, instance.bind_port)
# Random identifier on 64 bits
self.id = randint(0, 1 << 64 - 1)
self.incr_nonce = 0
# Create UDP socket
self.socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
# Bind the socket
self.socket.bind(self.main_address)
self.squinnondation = instance
self.input_buffer = ""
self.input_index = 0
self.last_line = -1
# Lock the refresh function in order to avoid concurrent refresh
self.refresh_lock = RLock()
# Lock function that can be used by two threads to avoid concurrent refresh
self.data_lock = RLock()
self.history = []
self.received_messages = dict()
self.recent_messages = dict() # of the form [Pkt(DataTLV), date of first reception,
# dict(neighbour, date of the next send, nb of times it has already been sent)]
self.history_pad = curses.newpad(curses.LINES - 2, curses.COLS)
self.input_pad = curses.newpad(1, curses.COLS)
self.emoji_pad = curses.newpad(18, 12)
self.emoji_panel_page = -1
if curses.can_change_color():
curses.init_color(curses.COLOR_WHITE, 1000, 1000, 1000)
for i in range(curses.COLOR_BLACK + 1, curses.COLOR_WHITE):
curses.init_pair(i + 1, i, curses.COLOR_BLACK)
# dictionnaries of neighbours
self.neighbours = dict()
self.nbNS = 0
self.minNS = 3 # minimal number of symmetric neighbours a user needs to have.
self.worm = Worm(self)
self.neighbour_manager = PeerManager(self)
self.inondator = Inondator(self)
self.add_system_message(f"Listening on {self.main_address[0]}:{self.main_address[1]}", ignore_debug=True)
self.add_system_message(f"I am {self.id}")
def new_peer(self, address: str, port: int) -> Peer:
"""
Returns a new peer (with no id nor nickname)
"""
peer = Peer(address=address, port=port)
return peer
@property
def active_peers(self) -> set:
return set(peer for peer in self.neighbours.values() if peer.active)
@property
def potential_peers(self) -> set:
return set(peer for peer in self.neighbours.values() if peer.potential)
def find_peer(self, address: str, port: int) -> Peer:
"""
Translate an address into a peer. If this peer does not exist,
creates a new peer.
"""
self.data_lock.acquire()
if (address, port) in self.neighbours:
return self.neighbours[(address, port)]
peer = Peer(address=address, port=port)
self.neighbours[(address, port)] = peer
self.data_lock.release()
return peer
def find_peer_by_id(self, peer_id: int) -> Peer:
"""
Retrieve the peer that is known by its id. Return None if it is unknown.
The given identifier must be positive.
"""
self.data_lock.acquire()
if peer_id > 0:
for peer in self.neighbours.values():
if peer.id == peer_id:
return peer
self.data_lock.release()
def find_peer_by_nickname(self, nickname: str) -> Generator[Peer, Any, None]:
"""
Retrieve the peers that are known by their nicknames.
"""
self.data_lock.acquire()
for peer in self.neighbours.values():
if peer.nickname == nickname:
yield peer
self.data_lock.release()
def send_packet(self, client: Peer, pkt: Packet) -> int:
"""
Send a formatted packet to a client.
"""
if len(pkt) > 1024:
# The packet is too large to be sent by the protocol. We split the packet in subpackets.
return sum(self.send_packet(client, subpkt) for subpkt in pkt.split(1024))
res = self.send_raw_data(client, pkt.marshal())
return res
def send_raw_data(self, client: Peer, data: bytes) -> int:
"""
Send a raw packet to a client.
"""
return self.socket.sendto(data, client.main_address)
def receive_packet(self) -> Tuple[Packet, Peer]:
"""
Receive a packet from the socket and translate it into a Python object.
Warning: the process is blocking, it should be ran inside a dedicated thread.
"""
data, addr = self.receive_raw_data()
peer = self.find_peer(addr[0], addr[1])
if peer.banned:
# The client already sent errored packets
return Packet.construct(), peer
try:
pkt = Packet.unmarshal(data)
except ValueError as error:
# The packet contains an error. We memorize it and warn the other user.
peer.errors += 1
self.send_packet(peer, Packet.construct(WarningTLV.construct(
f"An error occured while reading your packet: {error}")))
if peer.banned:
self.send_packet(peer, Packet.construct(WarningTLV.construct(
"You got banned since you sent too much errored packets.")))
raise ValueError("Client is banned since there were too many errors.", error)
raise error
else:
return pkt, peer
def receive_raw_data(self) -> Tuple[bytes, Any]:
"""
Receive a packet from the socket.
"""
return self.socket.recvfrom(1024)
def start_threads(self) -> None:
"""
Start asynchronous threads.
"""
# Kill subthreads when exitting the program
self.worm.setDaemon(True)
self.neighbour_manager.setDaemon(True)
self.inondator.setDaemon(True)
self.worm.start()
self.neighbour_manager.start()
self.inondator.start()
def wait_for_key(self) -> None:
"""
Infinite loop where we are waiting for a key of the user.
"""
while True:
self.refresh_history()
self.refresh_input()
if not self.squinnondation.no_emoji:
self.refresh_emoji_pad()
try:
key = self.squinnondation.screen.get_wch(
curses.LINES - 1, min(3 + len(self.nickname) + self.input_index, curses.COLS - 4))
except curses.error:
continue
except KeyboardInterrupt:
# Exit the program and send GoAway to neighbours
self.leave()
return
if key == curses.KEY_MOUSE:
try:
_, x, y, _, attr = curses.getmouse()
self.handle_mouse_click(y, x, attr)
continue
except curses.error:
# This is not a valid click
continue
self.handle_key_pressed(key)
def handle_key_pressed(self, key: str) -> None: # noqa: C901
"""
Process the key press from the user.
"""
if key == "\x7f" or key == curses.KEY_BACKSPACE: # backspace
# delete character at the good position
if self.input_index:
self.input_index -= 1
self.input_buffer = self.input_buffer[:self.input_index] + self.input_buffer[self.input_index + 1:]
return
elif key == curses.KEY_DC: # key
if self.input_index < len(self.input_buffer):
self.input_buffer = self.input_buffer[:self.input_index] + self.input_buffer[self.input_index + 1:]
return
elif key == curses.KEY_LEFT:
# Navigate in the message to the left
self.input_index = max(0, self.input_index - 1)
return
elif key == curses.KEY_RIGHT:
# Navigate in the message to the right
self.input_index = min(len(self.input_buffer), self.input_index + 1)
return
elif key == curses.KEY_UP:
# Scroll up in the history
self.last_line = min(max(curses.LINES - 3, self.last_line - 1), len(self.history) - 1)
return
elif key == curses.KEY_DOWN:
# Scroll down in the history
self.last_line = min(len(self.history) - 1, self.last_line + 1)
return
elif key == curses.KEY_PPAGE:
# Page up in the history
self.last_line = min(max(curses.LINES - 3, self.last_line - (curses.LINES - 3)), len(self.history) - 1)
return
elif key == curses.KEY_NPAGE:
# Page down in the history
self.last_line = min(len(self.history) - 1, self.last_line + (curses.LINES - 3))
return
elif key == curses.KEY_HOME:
# Place the cursor at the beginning of the typing word
self.input_index = 0
return
elif key == curses.KEY_END:
# Place the cursor at the end of the typing word
self.input_index = len(self.input_buffer)
return
elif isinstance(key, int):
# Unmanaged complex key
self.add_system_message(str(key))
return
elif key != "\n":
# Insert the pressed key in the current message
new_buffer = self.input_buffer[:self.input_index] + key + self.input_buffer[self.input_index:]
if len(DataTLV.construct(f"{self.nickname}: {new_buffer}", None)) > 255 - 8 - 4:
# The message is too long to be sent once. We don't allow the user to type any other character.
curses.beep()
return
self.input_buffer = new_buffer
self.input_index += 1
return
# Send message to neighbours
msg = self.input_buffer
self.input_buffer = ""
self.input_index = 0
if not msg:
return
if msg.startswith("/"):
return self.handle_command(msg[1:])
msg = f"{self.nickname}: {msg}"
self.add_message(msg)
pkt = Packet.construct(DataTLV.construct(msg, self))
for peer in self.active_peers:
self.send_packet(peer, pkt)
def handle_mouse_click(self, y: int, x: int, attr: int) -> None:
"""
The user clicks on the screen, at coordinates (y, x).
According to the position, we can indicate what can be done.
"""
if not self.squinnondation.no_emoji:
if y == curses.LINES - 1 and x >= curses.COLS - 3:
# Click on the emoji, open or close the emoji pad
self.emoji_panel_page *= -1
elif self.emoji_panel_page > 0 and y == curses.LINES - 4 and x >= curses.COLS - 5:
# Open next emoji page
self.emoji_panel_page += 1
elif self.emoji_panel_page > 1 and y == curses.LINES - curses.LINES // 2 - 1 \
and x >= curses.COLS - 5:
# Open previous emoji page
self.emoji_panel_page -= 1
elif self.emoji_panel_page > 0 and y >= curses.LINES // 2 - 1 and x >= curses.COLS // 2 - 1:
pad_y, pad_x = y - (curses.LINES - curses.LINES // 2) + 1, \
(x - (curses.COLS - curses.COLS // 3) + 1) // 2
# Click on an emoji on the pad to autocomplete an emoji
self.click_on_emoji_pad(pad_y, pad_x)
def click_on_emoji_pad(self, pad_y: int, pad_x: int) -> None:
"""
The emoji pad contains the list of all available emojis.
Clicking on a emoji auto-complete the emoji in the input pad.
"""
import emoji
from emoji import unicode_codes
height, width = self.emoji_pad.getmaxyx()
height -= 1
width -= 1
emojis = list(unicode_codes.UNICODE_EMOJI)
emojis = [c for c in emojis if len(c) == 1]
size = (height - 2) * (width - 4) // 2
page = emojis[(self.emoji_panel_page - 1) * size:self.emoji_panel_page * size]
index = pad_y * (width - 4) // 2 + pad_x
char = page[index]
if char:
demojized = emoji.demojize(char)
if char != demojized:
for c in reversed(demojized):
curses.ungetch(c)
def handle_command(self, command: str) -> None:
"""
The user sent a command. We analyse it and process what is needed.
"""
self.data_lock.acquire()
def resolve_address(address: str) -> str:
# Resolve address
try:
# Resolve DNS as an IPv6
return socket.getaddrinfo(address, None, socket.AF_INET6)[0][4][0]
except socket.gaierror:
# This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping
# to compute a valid IPv6.
# See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4
try:
return "::ffff:" + socket.getaddrinfo(address, None, socket.AF_INET)[0][4][0]
except socket.gaierror:
raise ValueError(f"{address} is not resolvable")
def resolve_port(port: str) -> int:
try:
port = int(port)
if not 1 <= port <= 65565:
raise ValueError
return port
except ValueError:
raise ValueError(f"{port} is not a valid port")
args = command.split(" ")
command, args = args[0].lower(), args[1:]
if command == "help" or command == "usage":
self.add_system_message("**/help**\t\t\t\tDisplay this help menu", italic=False, ignore_debug=True)
self.add_system_message("**/connect address port**\t\tAdd this address in the potential neighbours",
italic=False, ignore_debug=True)
self.add_system_message("**/hello address port**\t\tSend short hello to the given neighbour",
italic=False, ignore_debug=True)
self.add_system_message("**/unban address port**\t\tReset the error counter of a given neighbour",
italic=False, ignore_debug=True)
self.add_system_message("**/info id|nickname|addr port**\tDisplay information about a neighbour",
italic=False, ignore_debug=True)
self.add_system_message("**/active**\t\t\tDisplay the list of all active neighbours.",
italic=False, ignore_debug=True)
self.add_system_message("**/potential**\t\t\tDisplay the list of all potential neighbours.",
italic=False, ignore_debug=True)
self.add_system_message("**/debug**\t\t\t\tToggle debug mode", italic=False, ignore_debug=True)
self.add_system_message("**/emojis**\t\t\tToggle emojis support", italic=False, ignore_debug=True)
self.add_system_message("**/markdown**\t\t\tToggle markdown support", italic=False, ignore_debug=True)
elif command == "connect":
if len(args) != 2:
self.add_system_message("Usage: /connect address port", italic=False, ignore_debug=True)
return
try:
address, port = resolve_address(args[0]), resolve_port(args[1])
except ValueError as e:
self.add_system_message(str(e), ignore_debug=True)
return
if (address, port) in self.neighbours:
self.add_system_message("There is already a known client with this address.", ignore_debug=True)
return
peer = self.new_peer(address, port)
self.neighbours[(address, port)] = peer
self.add_system_message(f"Potential client successfully added! You can send a hello by running "
f"\"/hello {address} {port}\".", ignore_debug=True)
elif command == "hello":
if len(args) != 2:
self.add_system_message("Usage: /hello address port", italic=False, ignore_debug=True)
return
try:
address, port = resolve_address(args[0]), resolve_port(args[1])
except ValueError as e:
self.add_system_message(str(e), ignore_debug=True)
return
if (address, port) not in self.neighbours:
self.add_system_message("This client is unknown. Please register it by running "
f"\"/connect {address} {port}\"", ignore_debug=True)
return
peer = self.find_peer(address, port)
self.send_packet(peer, Packet.construct(HelloTLV.construct(8, self)))
self.add_system_message("Hello successfully sent!", ignore_debug=True)
elif command == "unban":
if len(args) != 2:
self.add_system_message("Usage: /unban address port", italic=False, ignore_debug=True)
return
try:
address, port = resolve_address(args[0]), resolve_port(args[1])
except ValueError as e:
self.add_system_message(str(e), ignore_debug=True)
return
if (address, port) not in self.neighbours:
self.add_system_message("This client is unknown. Please register it by running "
f"\"/connect {address} {port}\"", ignore_debug=True)
return
peer = self.find_peer(address, port)
peer.errors = 0
self.add_system_message("The client is unbanned.", ignore_debug=True)
elif command == "info":
if len(args) > 2:
self.add_system_message("Usage: /info me|id|nickname|addr port", italic=False, ignore_debug=True)
return
if not args:
peers = [self]
elif len(args) == 2:
try:
address, port = resolve_address(args[0]), resolve_port(args[1])
except ValueError as e:
self.add_system_message(str(e), ignore_debug=True)
return
if (address, port) not in self.neighbours:
self.add_system_message("This client is unknown. Please register it by running "
f"\"/connect {address} {port}\"", ignore_debug=True)
return
peers = [self.find_peer(address, port)]
else:
peers = list(self.find_peer_by_nickname(args[0]))
if args[0].isnumeric():
identifier = int(args[0])
peers.append(self.find_peer_by_id(identifier))
if not peers:
self.add_system_message("Unknown client.")
return
for peer in peers:
self.add_system_message(f"**Identifier:** {peer.id or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Nickname:** {peer.nickname or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Addresses:** "
+ ", ".join(f"{address}:{port}" for address, port in peer.addresses),
italic=False, ignore_debug=True)
elif command == "active":
if not self.active_peers:
self.add_system_message("No active neighbour.", italic=False, ignore_debug=True)
return
for peer in self.active_peers:
self.add_system_message(f"**Identifier:** {peer.id or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Nickname:** {peer.nickname or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Addresses:** "
+ ", ".join(f"{address}:{port}" for address, port in peer.addresses),
italic=False, ignore_debug=True)
elif command == "potential":
if not self.potential_peers:
self.add_system_message("No potential neighbour.", italic=False, ignore_debug=True)
return
for peer in self.potential_peers:
self.add_system_message(f"**Identifier:** {peer.id or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Nickname:** {peer.nickname or '<*unknown*>'}",
italic=False, ignore_debug=True)
self.add_system_message(f"**Addresses:** "
+ ", ".join(f"{address}:{port}" for address, port in peer.addresses),
italic=False, ignore_debug=True)
elif command == "debug":
self.squinnondation.debug ^= True
self.add_system_message(
"Debug mode " + ("enabled" if self.squinnondation.debug else "disabled") + ".", ignore_debug=True)
elif command == "emojis":
self.squinnondation.no_emoji ^= True
self.add_system_message(
"Emojis support " + ("disabled" if self.squinnondation.no_emoji else "enabled") + ".",
ignore_debug=True)
elif command == "markdown":
self.squinnondation.debug ^= True
self.add_system_message(
"Markdown support " + ("disabled" if self.squinnondation.no_markdown else "enabled") + ".",
ignore_debug=True)
else:
self.add_system_message("Unknown command. Please do /help to see available commands.", ignore_debug=True)
self.data_lock.release()
def add_message(self, msg: str) -> None:
"""
Store a new message into the history.
"""
self.history.append(msg)
if self.last_line == len(self.history) - 2:
self.last_line += 1
def receive_message_from(self, tlv: DataTLV, msg: str, sender_id: int, nonce: int, relay: Peer) -> bool:
"""
This method is called by a DataTLV, sent by a real person.
This add the message in the history if not already done.
Returns True iff the message was not already received previously.
"""
self.data_lock.acquire()
if (sender_id, nonce) not in self.received_messages:
# If it is a new message, add it to recent_messages
d = self.make_inundation_dict()
pkt = Packet().construct(tlv)
self.recent_messages[(sender_id, nonce)] = [pkt, time.time(), d]
# in all cases, remove the sender from the list of neighbours to be inundated
self.remove_from_inundation(relay, sender_id, nonce)
if (sender_id, nonce) in self.received_messages:
return False
self.add_message(msg) # for display purposes
self.received_messages[(sender_id, nonce)] = Message(msg, sender_id, nonce)
self.data_lock.release()
return True
def make_inundation_dict(self) -> dict:
"""
Takes the active peers dictionnary and returns a list of [peer, date+random, 0]
"""
self.data_lock.acquire()
res = dict()
peers = self.active_peers
for peer in peers:
if peer.symmetric:
next_send = uniform(1, 2)
res[peer.main_address] = [peer, time.time() + next_send, 0]
self.data_lock.release()
return res
def remove_from_inundation(self, peer: Peer, sender_id: int, nonce: int) -> None:
"""
Remove the sender from the list of neighbours to be inundated
"""
self.data_lock.acquire()
if (sender_id, nonce) in self.recent_messages:
# If a peer is late in its acknowledgement, the absence of the previous if causes an error.
for addr in peer.addresses:
self.recent_messages[(sender_id, nonce)][2].pop(addr, None)
if not self.recent_messages[(sender_id, nonce)][2]: # If dictionnary is empty, remove the message
self.recent_messages.pop((sender_id, nonce), None)
self.data_lock.release()
def clean_inundation(self) -> None:
"""
Remove messages which are overdue (older than 2 minutes) from the inundation dictionnary.
"""
self.data_lock.acquire()
for key in self.recent_messages:
if time.time() - self.recent_messages[key][1] > 120:
self.recent_messages.pop(key)
self.data_lock.release()
def main_inundation(self) -> None:
"""
The main inundation function.
"""
for key in self.recent_messages:
k = list(self.recent_messages[key][2].keys())
for key2 in k:
if time.time() >= self.recent_messages[key][2][key2][1]:
self.add_system_message(f"inundating {self.recent_messages[key][2][key2][0].id} with message {key}")
# send the packet if it is overdue
self.send_packet(self.recent_messages[key][2][key2][0], self.recent_messages[key][0])
# change the time until the next send
a = self.recent_messages[key][2][key2][2]
self.recent_messages[key][2][key2][2] = a + 1
next_send = uniform(2 ** (a - 1), 2 ** a)
self.recent_messages[key][2][key2][1] = time.time() + next_send
if self.recent_messages[key][2][key2][2] >= 5: # the neighbour is not reactive enough
gatlv = GoAwayTLV().construct(GoAwayType.TIMEOUT, f"{self.id} No acknowledge")
pkt = Packet().construct(gatlv)
peer = self.recent_messages[key][2][key2][0]
self.send_packet(peer, pkt)
self.recent_messages[key][2].pop(key2)
def add_system_message(self, msg: str, italic: bool = True, ignore_debug: bool = False) -> None:
"""
Add a new system log message.
"""
if self.squinnondation.debug or ignore_debug:
return self.add_message(
f"system: *{msg}*" if not self.squinnondation.no_markdown and italic else f"system: {msg}")
def print_markdown(self, pad: Any, y: int, x: int, msg: str,
bold: bool = False, italic: bool = False, underline: bool = False, strike: bool = False) -> int:
"""
Parse a markdown-formatted text and format the text as bold, italic or text text.
***text***: bold, italic
**text**: bold
*text*: italic
__text__: underline
_text_: italic
~~text~~: strikethrough
"""
# Replace :emoji_name: by the good emoji
if not self.squinnondation.no_emoji:
import emoji
msg = emoji.emojize(msg, use_aliases=True)
if self.squinnondation.no_markdown:
pad.addstr(y, x, msg)
return len(msg)
underline_match = re.match("(.*)__(.*)__(.*)", msg)
if underline_match:
before, text, after = underline_match.group(1), underline_match.group(2), underline_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline)
len_mid = self.print_markdown(pad, y, x + len_before, text, bold, italic, not underline)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline)
return len_before + len_mid + len_after
italic_match = re.match("(.*)_(.*)_(.*)", msg)
if italic_match:
before, text, after = italic_match.group(1), italic_match.group(2), italic_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline)
len_mid = self.print_markdown(pad, y, x + len_before, text, bold, not italic, underline)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline)
return len_before + len_mid + len_after
bold_italic_match = re.match("(.*)\\*\\*\\*(.*)\\*\\*\\*(.*)", msg)
if bold_italic_match:
before, text, after = bold_italic_match.group(1), bold_italic_match.group(2),\
bold_italic_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike)
len_mid = self.print_markdown(pad, y, x + len_before, text, not bold, not italic, underline, strike)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike)
return len_before + len_mid + len_after
bold_match = re.match("(.*)\\*\\*(.*)\\*\\*(.*)", msg)
if bold_match:
before, text, after = bold_match.group(1), bold_match.group(2), bold_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike)
len_mid = self.print_markdown(pad, y, x + len_before, text, not bold, italic, underline, strike)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike)
return len_before + len_mid + len_after
italic_match = re.match("(.*)\\*(.*)\\*(.*)", msg)
if italic_match:
before, text, after = italic_match.group(1), italic_match.group(2), italic_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike)
len_mid = self.print_markdown(pad, y, x + len_before, text, bold, not italic, underline, strike)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike)
return len_before + len_mid + len_after
strike_match = re.match("(.*)~~(.*)~~(.*)", msg)
if strike_match:
before, text, after = strike_match.group(1), strike_match.group(2), strike_match.group(3)
len_before = self.print_markdown(pad, y, x, before, bold, italic, underline, strike)
len_mid = self.print_markdown(pad, y, x + len_before, text, bold, italic, underline, not strike)
len_after = self.print_markdown(pad, y, x + len_before + len_mid, after, bold, italic, underline, strike)
return len_before + len_mid + len_after
size = len(msg)
attrs = 0
attrs |= curses.A_BOLD if bold else 0
attrs |= curses.A_ITALIC if italic else 0
attrs |= curses.A_UNDERLINE if underline else 0
if strike:
msg = "".join(c + "\u0336" for c in msg)
remaining_lines = curses.LINES - 3 - (y + x // (curses.COLS - 1)) + 1
if remaining_lines > 0:
# Don't print the end of the line if it is too long
space_left_on_line = (curses.COLS - 2) - (x % (curses.COLS - 1))
msg = msg[:space_left_on_line + max(0, (curses.COLS - 1) * (remaining_lines - 1))]
if msg:
pad.addstr(y + x // (curses.COLS - 1), x % (curses.COLS - 1), msg, attrs)
return size
def refresh_history(self) -> None:
"""
Rewrite the history of the messages.
"""
self.refresh_lock.acquire()
y, x = self.squinnondation.screen.getmaxyx()
if curses.is_term_resized(curses.LINES, curses.COLS):
curses.resizeterm(y, x)
self.history_pad.resize(curses.LINES - 2, curses.COLS - 1)
self.input_pad.resize(1, curses.COLS - 1)
self.history_pad.erase()
y_offset = 0
for i, msg in enumerate(self.history[max(0, self.last_line - curses.LINES + 3):self.last_line + 1]):
if i + y_offset > curses.LINES - 3:
break
msg = re.sub("([^:]*): (.*)", "<\\1> \\2", msg, 1)
if not re.match("<.*> .*", msg):
msg = "<unknown> " + msg
match = re.match("<(.*)> (.*)", msg)
nickname = match.group(1)
msg = match.group(2)
full_msg = f"<{nickname}> {msg}"
color_id = sum(ord(c) for c in nickname) % 6 + 1
true_width = self.print_markdown(self.history_pad, i + y_offset, 0, full_msg)
self.history_pad.addstr(i + y_offset, 1, nickname, curses.A_BOLD | curses.color_pair(color_id + 1))
y_offset += true_width // (curses.COLS - 1)
self.history_pad.refresh(0, 0, 0, 0, curses.LINES - 2, curses.COLS)
self.refresh_lock.release()
def refresh_input(self) -> None:
"""
Redraw input line. Must not be called while the message is not sent.
"""
self.refresh_lock.acquire()
self.input_pad.erase()
color_id = sum(ord(c) for c in self.nickname) % 6 + 1
self.input_pad.addstr(0, 0, "<")
self.input_pad.addstr(0, 1, self.nickname, curses.A_BOLD | curses.color_pair(color_id + 1))
self.input_pad.addstr(0, 1 + len(self.nickname), "> ")
msg = self.input_buffer
if self.input_index >= curses.COLS - len(self.nickname) - 7:
msg = msg[self.input_index - (curses.COLS - len(self.nickname) - 7):self.input_index]
msg = msg[:curses.COLS - len(self.nickname) - 7]
self.input_pad.addstr(0, 3 + len(self.nickname), msg)
if not self.squinnondation.no_emoji:
self.input_pad.addstr(0, self.input_pad.getmaxyx()[1] - 3, "😀")
self.input_pad.refresh(0, 0, curses.LINES - 1, 0, curses.LINES - 1, curses.COLS - 1)
self.refresh_lock.release()
def refresh_emoji_pad(self) -> None:
"""
Display the emoji pad if necessary.
"""
if self.squinnondation.no_emoji:
return
from emoji import unicode_codes
self.refresh_lock.acquire()
self.emoji_pad.erase()
if self.emoji_panel_page > 0:
height, width = curses.LINES // 2, curses.COLS // 3
self.emoji_pad.resize(height + 1, width + 1)
self.emoji_pad.addstr(0, 0, "" + (width - 2) * "" + "")
self.emoji_pad.addstr(0, (width - 14) // 2, " == EMOJIS == ")
for i in range(1, height):
self.emoji_pad.addstr(i, 0, "" + (width - 2) * " " + "")
self.emoji_pad.addstr(height - 1, 0, "" + (width - 2) * "" + "")
emojis = list(unicode_codes.UNICODE_EMOJI)
emojis = [c for c in emojis if len(c) == 1]
size = (height - 2) * (width - 4) // 2
page = emojis[(self.emoji_panel_page - 1) * size:self.emoji_panel_page * size]
if self.emoji_panel_page != 1:
self.emoji_pad.addstr(1, width - 2, "")
if len(page) == size:
self.emoji_pad.addstr(height - 2, width - 2, "")
for i in range(height - 2):
for j in range((width - 4) // 2 + 1):
index = i * (width - 4) // 2 + j
if index < len(page):
self.emoji_pad.addstr(i + 1, 2 * j + 1, page[index])
self.emoji_pad.refresh(0, 0, curses.LINES - height - 2, curses.COLS - width - 2,
curses.LINES - 2, curses.COLS - 2)
self.refresh_lock.release()
def potential_to_contact(self) -> list:
"""
Returns a list of peers the user should contact if it does
not have enough symmetric neighbours.
"""
self.data_lock.acquire()
res = []
val = list(self.potential_peers)
lp = len(val)
for i in range(min(lp, max(0, self.minNS - self.nbNS))):
a = randint(0, lp - 1)
res.append(val[a])
self.data_lock.release()
return res
def send_hello(self) -> None:
"""
Sends a long HelloTLV to all active neighbours.
"""
self.data_lock.acquire()
for peer in self.active_peers:
htlv = HelloTLV().construct(16, self, peer)
pkt = Packet().construct(htlv)
self.send_packet(peer, pkt)
self.data_lock.release()
def verify_activity(self) -> None:
"""
All neighbours that have not sent a HelloTLV in the last 2
minutes are considered not active.
"""
self.data_lock.acquire()
val = list(self.active_peers) # create a copy because the dict size will change
for peer in val:
if time.time() - peer.last_hello_time > 2 * 60:
gatlv = GoAwayTLV().construct(GoAwayType.TIMEOUT, "you did not talk to me")
pkt = Packet().construct(gatlv)
self.send_packet(peer, pkt)
peer.active = False
self.update_peer_table(peer)
self.data_lock.release()
def update_peer_table(self, peer: Peer) -> None:
"""
We insert the peer into our table of clients.
If there is a collision with the address / the ID, then we merge clients into a unique one.
"""
self.data_lock.acquire()
for addr in peer.addresses:
if addr in self.neighbours:
# Merge with the previous peer
old_peer = self.neighbours[addr]
peer.merge(old_peer)
self.neighbours[addr] = peer
for other_peer in list(self.neighbours.values()):
if other_peer.id == peer.id > 0 and other_peer != peer:
# The peer with the same id is known as a different address. We merge everything
peer.merge(other_peer)
self.data_lock.release()
def send_neighbours(self) -> None:
"""
Update the number of symmetric neighbours and
send all neighbours NeighbourTLV
"""
self.data_lock.acquire()
nb_ns = 0
# could send the same to all neighbour, but it means that neighbour
# A could receive a message with itself in it -> if the others do not pay attention, trouble
for peer in self.active_peers:
if time.time() - peer.last_long_hello_time <= 2 * 60:
nb_ns += 1
peer.symmetric = True
ntlv = NeighbourTLV().construct(*peer.main_address)
pkt = Packet().construct(ntlv)
for destination in self.active_peers:
if destination.id != peer.id:
self.send_packet(destination, pkt)
else:
peer.symmetric = False
self.nbNS = nb_ns
self.data_lock.release()
def leave(self) -> None:
"""
The program is exited. We send a GoAway to our neighbours, then close the program.
"""
# Last inundation
self.main_inundation()
self.clean_inundation()
# Broadcast a GoAway
gatlv = GoAwayTLV().construct(GoAwayType.EXIT, "I am leaving! Good bye!")
pkt = Packet.construct(gatlv)
for peer in self.active_peers:
self.send_packet(peer, pkt)
exit(0)
class Worm(Thread):
"""
The worm is the peer listener.
It always waits for an incoming packet, then it treats it, and continues to wait.
It is in a dedicated thread.
"""
def __init__(self, user: User, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
def run(self) -> None:
while True:
try:
pkt, peer = self.user.receive_packet()
except ValueError as error:
self.user.add_system_message("An error occurred while receiving a packet: {}".format(error))
self.user.refresh_history()
self.user.refresh_input()
else:
if peer.banned:
# Ignore banned peers
continue
for tlv in pkt.body:
tlv.handle(self.user, peer)
self.user.refresh_history()
self.user.refresh_input()
class PeerManager(Thread):
"""
A process to cleanly manage the user's neighbours
"""
def __init__(self, user: User, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
self.last_potential = 0
self.last_check = 0
self.last_neighbour = 0
htlv = HelloTLV().construct(8, self.user)
pkt = Packet().construct(htlv)
self.hellopkt = pkt
def run(self) -> None:
while True:
# First part of neighbour management: ensure the user has enough
# symmetric neighbours.
if time.time() - self.last_potential > 30:
to_contact = self.user.potential_to_contact()
for peer in to_contact:
self.user.send_packet(peer, self.hellopkt)
self.last_potential = time.time()
# Second part: send long HelloTLVs to neighbours every 30 seconds
if time.time() - self.last_check > 30:
self.user.add_system_message(f"I have {len(self.user.active_peers)} friends")
self.user.send_hello()
self.last_check = time.time()
# Third part: get rid of inactive neighbours
self.user.verify_activity()
# Fourth part: verify symmetric neighbours and send NeighbourTLV every minute
if time.time() - self.last_neighbour > 60:
self.user.send_neighbours()
self.last_neighbour = time.time()
# Avoid infinite loops
time.sleep(1)
class Inondator(Thread):
"""
A process to manage the inondation
"""
def __init__(self, user: User, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
self.last_check = 0
def run(self) -> None:
while True:
# clean the dictionnary
if time.time() - self.last_check > 30:
self.user.clean_inundation()
self.last_check = time.time()
# inundate
self.user.main_inundation()
# Avoid infinite loops
time.sleep(1)
class Message:
"""
This class symbolises the data sent by a real client, excluding system messages.
This is useful to check unicity or to save and load messages.
"""
content: str
# TODO: Replace the id by the good (potential) peer
sender_id: int
nonce: int
created_at: datetime
def __init__(self, content: str, sender_id: int, nonce: int, created_at: datetime = None):
self.content = content
self.sender_id = sender_id
self.nonce = nonce
self.created_at = created_at or datetime.now()