2021-01-06 20:47:07 +00:00
# Copyright (C) 2020 by eichhornchen, ÿnérant
# SPDX-License-Identifier: GPL-3.0-or-later
from datetime import datetime
from random import randint , uniform
from typing import Any , Tuple , Generator
# from ipaddress import IPv6Address
from threading import Thread , RLock
import curses
import re
import socket
import time
2021-01-07 14:06:23 +00:00
import struct
2021-01-06 20:47:07 +00:00
from . messages import Packet , DataTLV , HelloTLV , GoAwayTLV , GoAwayType , NeighbourTLV , WarningTLV
class Peer:
    """
    A remote client known to the user.

    A peer is identified by a set of (address, port) pairs and carries the
    bookkeeping used by the rest of the module: hello timestamps, the
    symmetric/active flags, an error counter and a manual ban flag.
    """
    def __init__(self, nickname: str = None, address: str = "localhost", port: int = 2500):
        """
        Create a peer from a host name (or IP literal) and a port.

        The host is resolved with ``socket.getaddrinfo``. The first IPv6 result
        is kept; when IPv6 resolution fails, the first IPv4 result is stored as
        an IPv4-mapped IPv6 address. A ``socket.gaierror`` raised by the IPv4
        fallback is not caught here.
        """
        self.nickname = nickname
        # -1 is the placeholder used until a real identifier is known.
        self.id = -1
        self.last_hello_time = 0
        self.last_long_hello_time = 0
        self.symmetric = False
        self.active = False
        self.errors = 0
        self.marked_as_banned = False

        try:
            # Resolve DNS as an IPv6
            address = socket.getaddrinfo(address, None, socket.AF_INET6)[0][4][0]
        except socket.gaierror:
            # This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping
            # to compute a valid IPv6.
            # See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4
            address = "::ffff:" + socket.getaddrinfo(address, None, socket.AF_INET)[0][4][0]

        self.addresses = {(address, port)}

    @property
    def potential(self) -> bool:
        """
        A peer is potential when it is neither active nor banned.
        """
        return not self.active and not self.banned

    @potential.setter
    def potential(self, value: bool) -> None:
        # Only the active flag is stored; "potential" is its negation.
        self.active = not value

    @property
    def main_address(self) -> Tuple[str, int]:
        """
        A client can have multiple addresses.
        We contact it only on one of them.
        """
        return list(self.addresses)[0]

    @property
    def banned(self) -> bool:
        """
        If a client sent at least 5 invalid packets, or was explicitly marked
        as banned, we don't trust it anymore.
        """
        return self.errors >= 5 or self.marked_as_banned

    def __repr__(self):
        """
        Display the nickname when known, else the identifier when known,
        else the main address.
        """
        if self.nickname:
            return self.nickname
        # str(-1) is truthy, so the identifier must be tested explicitly,
        # otherwise the address fallback can never be reached.
        if self.id > 0:
            return str(self.id)
        return str(self.main_address)

    def __str__(self):
        return repr(self)

    def merge(self, other: "Peer") -> "Peer":
        """
        Merge two peers that are actually the same client.
        The symmetric and active properties are kept from the original client.

        Returns ``self`` once it has absorbed the data of ``other``.
        """
        self.errors = max(self.errors, other.errors)
        self.last_hello_time = max(self.last_hello_time, other.last_hello_time)
        # Compare long-hello timestamps with each other, not with the short-hello one.
        self.last_long_hello_time = max(self.last_long_hello_time, other.last_long_hello_time)
        self.addresses.update(other.addresses)
        self.id = self.id if self.id > 0 else other.id
        # NOTE(review): the ban flag of `other` overrides ours, so a merge can lift
        # a manual ban. Kept as in the original; confirm this is intended.
        self.marked_as_banned = other.marked_as_banned
        return self
class User ( Peer ) :
"""
The user of the program . It can speak with other peers .
"""
def __init__(self, instance: Any, nickname: str):
    """
    Set up the local user: identity, sockets, curses pads and worker objects.

    :param instance: the application object; its ``bind_address`` and
        ``bind_port`` give the UDP endpoint, and it is kept as
        ``self.squinnondation``.
    :param nickname: the name displayed for the local user.
    """
    super().__init__(nickname, instance.bind_address, instance.bind_port)

    # Random identifier on 64 bits.
    # The parentheses matter: `1 << 64 - 1` is parsed as `1 << 63`.
    self.id = randint(0, (1 << 64) - 1)
    self.incr_nonce = 0

    # Create UDP socket
    self.socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
    # Bind the socket
    self.socket.bind(self.main_address)

    # Create multicast socket
    self.multicast_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    self.multicast_socket.bind(('', 1212))  # listen on all interfaces?
    # To join the group, setsockopt expects a packed `struct ipv6_mreq`: the 16-byte
    # group address followed by an unsigned int interface index (0 lets the system choose).
    group = socket.inet_pton(socket.AF_INET6, "ff02::4242:4242")
    mreq = group + struct.pack("@I", 0)
    self.multicast_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)

    self.squinnondation = instance

    self.input_buffer = ""
    self.input_index = 0
    self.last_line = -1

    # Lock the refresh function in order to avoid concurrent refresh
    self.refresh_lock = RLock()
    # Lock functions that can be used by two threads to avoid concurrent access
    self.data_lock = RLock()

    self.history = []
    self.received_messages = dict()
    self.recent_messages = dict()  # of the form [Pkt(DataTLV), date of first reception,
    # dict(neighbour, date of the next send, nb of times it has already been sent)]
    self.history_pad = curses.newpad(curses.LINES - 2, curses.COLS)
    self.input_pad = curses.newpad(1, curses.COLS)
    self.emoji_pad = curses.newpad(18, 12)
    self.emoji_panel_page = -1

    curses.init_color(curses.COLOR_WHITE, 1000, 1000, 1000)
    # One colour pair per non-black, non-white colour, on a black background.
    for i in range(curses.COLOR_BLACK + 1, curses.COLOR_WHITE):
        curses.init_pair(i + 1, i, curses.COLOR_BLACK)

    # dictionary of neighbours, keyed by (address, port)
    self.neighbours = dict()
    self.nbNS = 0
    self.minNS = 3  # minimal number of symmetric neighbours a user needs to have.

    self.listener = Listener(self)
    self.neighbour_manager = PeerManager(self)
    self.inondator = Inondator(self)

    self.add_system_message(f"Listening on {self.main_address[0]}:{self.main_address[1]}", ignore_debug=True)
    self.add_system_message(f"I am {self.id}")
def new_peer(self, address: str, port: int) -> Peer:
    """
    Build a brand new peer for the given endpoint.
    The peer has no identifier and no nickname yet, and is not registered
    in the neighbours table by this method.
    """
    return Peer(address=address, port=port)
@property
def active_peers ( self ) - > set :
return set ( peer for peer in self . neighbours . values ( ) if peer . active )
@property
def potential_peers ( self ) - > set :
return set ( peer for peer in self . neighbours . values ( ) if peer . potential )
def find_peer ( self , address : str , port : int ) - > Peer :
"""
Translate an address into a peer . If this peer does not exist ,
creates a new peer .
"""
self . data_lock . acquire ( )
if ( address , port ) in self . neighbours :
return self . neighbours [ ( address , port ) ]
peer = Peer ( address = address , port = port )
self . neighbours [ ( address , port ) ] = peer
self . data_lock . release ( )
return peer
def find_peer_by_id ( self , peer_id : int ) - > Peer :
"""
Retrieve the peer that is known by its id . Return None if it is unknown .
The given identifier must be positive .
"""
self . data_lock . acquire ( )
if peer_id > 0 :
for peer in self . neighbours . values ( ) :
if peer . id == peer_id :
return peer
self . data_lock . release ( )
def find_peer_by_nickname ( self , nickname : str ) - > Generator [ Peer , Any , None ] :
"""
Retrieve the peers that are known by their nicknames .
"""
self . data_lock . acquire ( )
for peer in self . neighbours . values ( ) :
if peer . nickname == nickname :
yield peer
self . data_lock . release ( )
def send_packet ( self , client : Peer , pkt : Packet ) - > int :
"""
Send a formatted packet to a client .
"""
if len ( pkt ) > 1024 :
# The packet is too large to be sent by the protocol. We split the packet in subpackets.
return sum ( self . send_packet ( client , subpkt ) for subpkt in pkt . split ( 1024 ) )
res = self . send_raw_data ( client , pkt . marshal ( ) )
return res
def send_raw_data ( self , client : Peer , data : bytes ) - > int :
"""
Send a raw packet to a client .
"""
return self . socket . sendto ( data , client . main_address )
def receive_packet(self) -> Tuple[Packet, Peer]:
    """
    Read one datagram from the socket and decode it into a Packet.
    Warning: the call is blocking, it should be run inside a dedicated thread.

    A banned sender gets an empty packet back without any decoding. When
    decoding fails, the error counter of the sender is incremented, a warning
    is sent back, and a ValueError is raised.
    """
    raw, origin = self.receive_raw_data()
    sender = self.find_peer(origin[0], origin[1])

    if sender.banned:
        # The client already sent errored packets
        return Packet.construct(), sender

    try:
        decoded = Packet.unmarshal(raw)
    except ValueError as error:
        # The packet contains an error. We memorize it and warn the other user.
        sender.errors += 1
        warning = WarningTLV.construct(f"An error occured while reading your packet: {error}")
        self.send_packet(sender, Packet.construct(warning))
        if not sender.banned:
            raise error
        ban_notice = WarningTLV.construct("You got banned since you sent too much errored packets.")
        self.send_packet(sender, Packet.construct(ban_notice))
        raise ValueError("Client is banned since there were too many errors.", error)

    return decoded, sender
def receive_raw_data ( self ) - > Tuple [ bytes , Any ] :
"""
Receive a packet from the socket .
"""
return self . socket . recvfrom ( 1024 )
def start_threads ( self ) - > None :
"""
Start asynchronous threads .
"""
# Kill subthreads when exitting the program
2021-01-07 14:06:23 +00:00
self . listener . setDaemon ( True )
2021-01-06 20:47:07 +00:00
self . neighbour_manager . setDaemon ( True )
self . inondator . setDaemon ( True )
2021-01-07 14:06:23 +00:00
self . listener . start ( )
2021-01-06 20:47:07 +00:00
self . neighbour_manager . start ( )
self . inondator . start ( )
def wait_for_key(self) -> None:
    """
    Main input loop: redraw the interface, wait for a key or a mouse event,
    and dispatch it. Only returns when the user interrupts the program.
    """
    while True:
        self.refresh_history()
        self.refresh_input()
        if not self.squinnondation.no_emoji:
            self.refresh_emoji_pad()

        cursor_column = min(3 + len(self.nickname) + self.input_index, curses.COLS - 4)
        try:
            key = self.squinnondation.screen.get_wch(curses.LINES - 1, cursor_column)
        except curses.error:
            continue
        except KeyboardInterrupt:
            # Exit the program and send GoAway to neighbours
            self.leave()
            return

        if key != curses.KEY_MOUSE:
            self.handle_key_pressed(key)
            continue

        try:
            _, x, y, _, attr = curses.getmouse()
            self.handle_mouse_click(y, x, attr)
        except curses.error:
            # This is not a valid click
            pass
def handle_key_pressed ( self , key : str ) - > None : # noqa: C901
"""
Process the key press from the user .
"""
if key == " \x7f " or key == curses . KEY_BACKSPACE : # backspace
# delete character at the good position
if self . input_index :
self . input_index - = 1
self . input_buffer = self . input_buffer [ : self . input_index ] + self . input_buffer [ self . input_index + 1 : ]
return
elif key == curses . KEY_DC : # key
if self . input_index < len ( self . input_buffer ) :
self . input_buffer = self . input_buffer [ : self . input_index ] + self . input_buffer [ self . input_index + 1 : ]
return
elif key == curses . KEY_LEFT :
# Navigate in the message to the left
self . input_index = max ( 0 , self . input_index - 1 )
return
elif key == curses . KEY_RIGHT :
# Navigate in the message to the right
self . input_index = min ( len ( self . input_buffer ) , self . input_index + 1 )
return
elif key == curses . KEY_UP :
# Scroll up in the history
self . last_line = min ( max ( curses . LINES - 3 , self . last_line - 1 ) , len ( self . history ) - 1 )
return
elif key == curses . KEY_DOWN :
# Scroll down in the history
self . last_line = min ( len ( self . history ) - 1 , self . last_line + 1 )
return
elif key == curses . KEY_PPAGE :
# Page up in the history
self . last_line = min ( max ( curses . LINES - 3 , self . last_line - ( curses . LINES - 3 ) ) , len ( self . history ) - 1 )
return
elif key == curses . KEY_NPAGE :
# Page down in the history
self . last_line = min ( len ( self . history ) - 1 , self . last_line + ( curses . LINES - 3 ) )
return
elif key == curses . KEY_HOME :
# Place the cursor at the beginning of the typing word
self . input_index = 0
return
elif key == curses . KEY_END :
# Place the cursor at the end of the typing word
self . input_index = len ( self . input_buffer )
return
elif isinstance ( key , int ) :
# Unmanaged complex key
self . add_system_message ( str ( key ) )
return
elif key != " \n " :
# Insert the pressed key in the current message
new_buffer = self . input_buffer [ : self . input_index ] + key + self . input_buffer [ self . input_index : ]
if len ( DataTLV . construct ( f " { self . nickname } : { new_buffer } " , None ) ) > 255 - 8 - 4 :
# The message is too long to be sent once. We don't allow the user to type any other character.
curses . beep ( )
return
self . input_buffer = new_buffer
self . input_index + = 1
return
# Send message to neighbours
msg = self . input_buffer
self . input_buffer = " "
self . input_index = 0
if not msg :
return
if msg . startswith ( " / " ) :
return self . handle_command ( msg [ 1 : ] )
msg = f " { self . nickname } : { msg } "
self . add_message ( msg )
pkt = Packet . construct ( DataTLV . construct ( msg , self ) )
for peer in self . active_peers :
self . send_packet ( peer , pkt )
def handle_mouse_click ( self , y : int , x : int , attr : int ) - > None :
"""
The user clicks on the screen , at coordinates ( y , x ) .
According to the position , we can indicate what can be done .
"""
if not self . squinnondation . no_emoji :
if y == curses . LINES - 1 and x > = curses . COLS - 3 :
# Click on the emoji, open or close the emoji pad
self . emoji_panel_page * = - 1
elif self . emoji_panel_page > 0 and y == curses . LINES - 4 and x > = curses . COLS - 5 :
# Open next emoji page
self . emoji_panel_page + = 1
elif self . emoji_panel_page > 1 and y == curses . LINES - curses . LINES / / 2 - 1 \
and x > = curses . COLS - 5 :
# Open previous emoji page
self . emoji_panel_page - = 1
elif self . emoji_panel_page > 0 and y > = curses . LINES / / 2 - 1 and x > = curses . COLS / / 2 - 1 :
pad_y , pad_x = y - ( curses . LINES - curses . LINES / / 2 ) + 1 , \
( x - ( curses . COLS - curses . COLS / / 3 ) + 1 ) / / 2
# Click on an emoji on the pad to autocomplete an emoji
self . click_on_emoji_pad ( pad_y , pad_x )
def click_on_emoji_pad(self, pad_y: int, pad_x: int) -> None:
    """
    The emoji pad contains the list of all available emojis.
    Clicking on an emoji auto-completes the emoji in the input pad, by
    pushing the characters of its textual alias back into the curses input queue.

    A click that does not land on an emoji of the current page is ignored.
    """
    import emoji
    from emoji import unicode_codes

    height, width = self.emoji_pad.getmaxyx()
    height -= 1
    width -= 1

    # Only single-codepoint emojis are displayed on the pad.
    emojis = [c for c in unicode_codes.UNICODE_EMOJI if len(c) == 1]
    size = (height - 2) * (width - 4) // 2
    page = emojis[(self.emoji_panel_page - 1) * size:self.emoji_panel_page * size]
    index = pad_y * (width - 4) // 2 + pad_x

    # The last page may be shorter than `size`, and the computed index can be
    # negative: without this guard the lookup raises IndexError or silently
    # wraps around to the end of the page.
    if not 0 <= index < len(page):
        return

    char = page[index]
    if char:
        demojized = emoji.demojize(char)
        if char != demojized:
            # ungetch pushes on a stack, so feed the characters in reverse order.
            for c in reversed(demojized):
                curses.ungetch(c)
def handle_command ( self , command : str ) - > None :
"""
The user sent a command . We analyse it and process what is needed .
"""
self . data_lock . acquire ( )
def resolve_address ( address : str ) - > str :
# Resolve address
try :
# Resolve DNS as an IPv6
return socket . getaddrinfo ( address , None , socket . AF_INET6 ) [ 0 ] [ 4 ] [ 0 ]
except socket . gaierror :
# This is not a valid IPv6. Assume it can be resolved as an IPv4, and we use IPv4-mapping
# to compute a valid IPv6.
# See https://fr.wikipedia.org/wiki/Adresse_IPv6_mappant_IPv4
try :
return " ::ffff: " + socket . getaddrinfo ( address , None , socket . AF_INET ) [ 0 ] [ 4 ] [ 0 ]
except socket . gaierror :
raise ValueError ( f " { address } is not resolvable " )
def resolve_port ( port : str ) - > int :
try :
port = int ( port )
if not 1 < = port < = 65565 :
raise ValueError
return port
except ValueError :
raise ValueError ( f " { port } is not a valid port " )
args = command . split ( " " )
command , args = args [ 0 ] . lower ( ) , args [ 1 : ]
if command == " help " or command == " usage " :
self . add_system_message ( " **/help** \t \t \t \t Display this help menu " , italic = False , ignore_debug = True )
self . add_system_message ( " **/connect address port** \t \t Add this address in the potential neighbours " ,
italic = False , ignore_debug = True )
self . add_system_message ( " **/hello address port** \t \t Send short hello to the given neighbour " ,
italic = False , ignore_debug = True )
self . add_system_message ( " **/unban address port** \t \t Reset the error counter of a given neighbour " ,
italic = False , ignore_debug = True )
self . add_system_message ( " **/info id|nickname|addr port** \t Display information about a neighbour " ,
italic = False , ignore_debug = True )
self . add_system_message ( " **/active** \t \t \t Display the list of all active neighbours. " ,
italic = False , ignore_debug = True )
self . add_system_message ( " **/potential** \t \t \t Display the list of all potential neighbours. " ,
italic = False , ignore_debug = True )
self . add_system_message ( " **/debug** \t \t \t \t Toggle debug mode " , italic = False , ignore_debug = True )
self . add_system_message ( " **/emojis** \t \t \t Toggle emojis support " , italic = False , ignore_debug = True )
self . add_system_message ( " **/markdown** \t \t \t Toggle markdown support " , italic = False , ignore_debug = True )
elif command == " connect " :
if len ( args ) != 2 :
self . add_system_message ( " Usage: /connect address port " , italic = False , ignore_debug = True )
return
try :
address , port = resolve_address ( args [ 0 ] ) , resolve_port ( args [ 1 ] )
except ValueError as e :
self . add_system_message ( str ( e ) , ignore_debug = True )
return
if ( address , port ) in self . neighbours :
self . add_system_message ( " There is already a known client with this address. " , ignore_debug = True )
return
peer = self . new_peer ( address , port )
self . neighbours [ ( address , port ) ] = peer
self . add_system_message ( f " Potential client successfully added! You can send a hello by running "
f " \" /hello { address } { port } \" . " , ignore_debug = True )
elif command == " hello " :
if len ( args ) != 2 :
self . add_system_message ( " Usage: /hello address port " , italic = False , ignore_debug = True )
return
try :
address , port = resolve_address ( args [ 0 ] ) , resolve_port ( args [ 1 ] )
except ValueError as e :
self . add_system_message ( str ( e ) , ignore_debug = True )
return
if ( address , port ) not in self . neighbours :
self . add_system_message ( " This client is unknown. Please register it by running "
f " \" /connect { address } { port } \" " , ignore_debug = True )
return
peer = self . find_peer ( address , port )
self . send_packet ( peer , Packet . construct ( HelloTLV . construct ( 8 , self ) ) )
self . add_system_message ( " Hello successfully sent! " , ignore_debug = True )
elif command == " unban " :
if len ( args ) != 2 :
self . add_system_message ( " Usage: /unban address port " , italic = False , ignore_debug = True )
return
try :
address , port = resolve_address ( args [ 0 ] ) , resolve_port ( args [ 1 ] )
except ValueError as e :
self . add_system_message ( str ( e ) , ignore_debug = True )
return
if ( address , port ) not in self . neighbours :
self . add_system_message ( " This client is unknown. Please register it by running "
f " \" /connect { address } { port } \" " , ignore_debug = True )
return
peer = self . find_peer ( address , port )
peer . errors = 0
self . add_system_message ( " The client is unbanned. " , ignore_debug = True )
elif command == " info " :
if len ( args ) > 2 :
self . add_system_message ( " Usage: /info me|id|nickname|addr port " , italic = False , ignore_debug = True )
return
if not args :
peers = [ self ]
elif len ( args ) == 2 :
try :
address , port = resolve_address ( args [ 0 ] ) , resolve_port ( args [ 1 ] )
except ValueError as e :
self . add_system_message ( str ( e ) , ignore_debug = True )
return
if ( address , port ) not in self . neighbours :
self . add_system_message ( " This client is unknown. Please register it by running "
f " \" /connect { address } { port } \" " , ignore_debug = True )
return
peers = [ self . find_peer ( address , port ) ]
else :
peers = list ( self . find_peer_by_nickname ( args [ 0 ] ) )
if args [ 0 ] . isnumeric ( ) :
identifier = int ( args [ 0 ] )
peers . append ( self . find_peer_by_id ( identifier ) )
if not peers :
self . add_system_message ( " Unknown client. " )
return
for peer in peers :
self . add_system_message ( f " **Identifier:** { peer . id or ' <*unknown*> ' } " ,
italic = False , ignore_debug = True )
self . add_system_message ( f " **Nickname:** { peer . nickname or ' <*unknown*> ' } " ,
italic = False , ignore_debug = True )
self . add_system_message ( f " **Addresses:** "
+ " , " . join ( f " { address } : { port } " for address , port in peer . addresses ) ,
italic = False , ignore_debug = True )
elif command == " active " :
if not self . active_peers :
self . add_system_message ( " No active neighbour. " , italic = False , ignore_debug = True )
return
for peer in self . active_peers :
self . add_system_message ( f " **Identifier:** { peer . id or ' <*unknown*> ' } " ,
italic = False , ignore_debug = True )
self . add_system_message ( f " **Nickname:** { peer . nickname or ' <*unknown*> ' } " ,
italic = False , ignore_debug = True )
self . add_system_message ( f " **Addresses:** "
+ " , " . join ( f " { address } : { port } " for address , port in peer . addresses ) ,
italic = False , ignore_debug = True )
elif command == " potential " :
if not self . potential_peers :
self . add_system_message ( " No potential neighbour. " , italic = False , ignore_debug = True )
return
for peer in self . potential_peers :
self . add_system_message ( f " **Identifier:** { peer . id or ' <*unknown*> ' } " ,
italic = False , ignore_debug = True )
self . add_system_message ( f " **Nickname:** { peer . nickname or ' <*unknown*> ' } " ,
italic = False , ignore_debug = True )
self . add_system_message ( f " **Addresses:** "
+ " , " . join ( f " { address } : { port } " for address , port in peer . addresses ) ,
italic = False , ignore_debug = True )
elif command == " debug " :
self . squinnondation . debug ^ = True
self . add_system_message (
" Debug mode " + ( " enabled " if self . squinnondation . debug else " disabled " ) + " . " , ignore_debug = True )
elif command == " emojis " :
self . squinnondation . no_emoji ^ = True
self . add_system_message (
" Emojis support " + ( " disabled " if self . squinnondation . no_emoji else " enabled " ) + " . " ,
ignore_debug = True )
elif command == " markdown " :
self . squinnondation . debug ^ = True
self . add_system_message (
" Markdown support " + ( " disabled " if self . squinnondation . no_markdown else " enabled " ) + " . " ,
ignore_debug = True )
else :
self . add_system_message ( " Unknown command. Please do /help to see available commands. " , ignore_debug = True )
self . data_lock . release ( )
def add_message ( self , msg : str ) - > None :
"""
Store a new message into the history .
"""
self . history . append ( msg )
if self . last_line == len ( self . history ) - 2 :
self . last_line + = 1
def receive_message_from ( self , tlv : DataTLV , msg : str , sender_id : int , nonce : int , relay : Peer ) - > bool :
"""
This method is called by a DataTLV , sent by a real person .
This add the message in the history if not already done .
Returns True iff the message was not already received previously .
"""
self . data_lock . acquire ( )
if ( sender_id , nonce ) not in self . received_messages :
# If it is a new message, add it to recent_messages
d = self . make_inundation_dict ( )
pkt = Packet ( ) . construct ( tlv )
self . recent_messages [ ( sender_id , nonce ) ] = [ pkt , time . time ( ) , d ]
# in all cases, remove the sender from the list of neighbours to be inundated
self . remove_from_inundation ( relay , sender_id , nonce )
if ( sender_id , nonce ) in self . received_messages :
return False
self . add_message ( msg ) # for display purposes
self . received_messages [ ( sender_id , nonce ) ] = Message ( msg , sender_id , nonce )
self . data_lock . release ( )
return True
def make_inundation_dict ( self ) - > dict :
"""
Takes the active peers dictionnary and returns a list of [ peer , date + random , 0 ]
"""
self . data_lock . acquire ( )
res = dict ( )
peers = self . active_peers
for peer in peers :
if peer . symmetric :
next_send = uniform ( 1 , 2 )
res [ peer . main_address ] = [ peer , time . time ( ) + next_send , 0 ]
self . data_lock . release ( )
return res
def remove_from_inundation ( self , peer : Peer , sender_id : int , nonce : int ) - > None :
"""
Remove the sender from the list of neighbours to be inundated
"""
self . data_lock . acquire ( )
if ( sender_id , nonce ) in self . recent_messages :
# If a peer is late in its acknowledgement, the absence of the previous if causes an error.
for addr in peer . addresses :
self . recent_messages [ ( sender_id , nonce ) ] [ 2 ] . pop ( addr , None )
if not self . recent_messages [ ( sender_id , nonce ) ] [ 2 ] : # If dictionnary is empty, remove the message
self . recent_messages . pop ( ( sender_id , nonce ) , None )
self . data_lock . release ( )
def clean_inundation ( self ) - > None :
"""
Remove messages which are overdue ( older than 2 minutes ) from the inundation dictionnary .
"""
self . data_lock . acquire ( )
for key in self . recent_messages :
if time . time ( ) - self . recent_messages [ key ] [ 1 ] > 120 :
self . recent_messages . pop ( key )
self . data_lock . release ( )
def main_inundation ( self ) - > None :
"""
The main inundation function .
"""
for key in self . recent_messages :
k = list ( self . recent_messages [ key ] [ 2 ] . keys ( ) )
for key2 in k :
if time . time ( ) > = self . recent_messages [ key ] [ 2 ] [ key2 ] [ 1 ] :
self . add_system_message ( f " inundating { self . recent_messages [ key ] [ 2 ] [ key2 ] [ 0 ] . id } with message { key } " )
# send the packet if it is overdue
self . send_packet ( self . recent_messages [ key ] [ 2 ] [ key2 ] [ 0 ] , self . recent_messages [ key ] [ 0 ] )
# change the time until the next send
a = self . recent_messages [ key ] [ 2 ] [ key2 ] [ 2 ]
self . recent_messages [ key ] [ 2 ] [ key2 ] [ 2 ] = a + 1
next_send = uniform ( 2 * * ( a - 1 ) , 2 * * a )
self . recent_messages [ key ] [ 2 ] [ key2 ] [ 1 ] = time . time ( ) + next_send
if self . recent_messages [ key ] [ 2 ] [ key2 ] [ 2 ] > = 5 : # the neighbour is not reactive enough
gatlv = GoAwayTLV ( ) . construct ( GoAwayType . TIMEOUT , f " { self . id } No acknowledge " )
pkt = Packet ( ) . construct ( gatlv )
peer = self . recent_messages [ key ] [ 2 ] [ key2 ] [ 0 ]
self . send_packet ( peer , pkt )
self . recent_messages [ key ] [ 2 ] . pop ( key2 )
def add_system_message ( self , msg : str , italic : bool = True , ignore_debug : bool = False ) - > None :
"""
Add a new system log message .
"""
if self . squinnondation . debug or ignore_debug :
return self . add_message (
f " system: * { msg } * " if not self . squinnondation . no_markdown and italic else f " system: { msg } " )
def print_markdown ( self , pad : Any , y : int , x : int , msg : str ,
bold : bool = False , italic : bool = False , underline : bool = False , strike : bool = False ) - > int :
"""
Parse a markdown - formatted text and format the text as bold , italic or text text .
* * * text * * * : bold , italic
* * text * * : bold
* text * : italic
__text__ : underline
_text_ : italic
~ ~ text ~ ~ : strikethrough
"""
# Replace :emoji_name: by the good emoji
if not self . squinnondation . no_emoji :
import emoji
msg = emoji . emojize ( msg , use_aliases = True )
if self . squinnondation . no_markdown :
pad . addstr ( y , x , msg )
return len ( msg )
underline_match = re . match ( " (.*)__(.*)__(.*) " , msg )
if underline_match :
before , text , after = underline_match . group ( 1 ) , underline_match . group ( 2 ) , underline_match . group ( 3 )
len_before = self . print_markdown ( pad , y , x , before , bold , italic , underline )
len_mid = self . print_markdown ( pad , y , x + len_before , text , bold , italic , not underline )
len_after = self . print_markdown ( pad , y , x + len_before + len_mid , after , bold , italic , underline )
return len_before + len_mid + len_after
italic_match = re . match ( " (.*)_(.*)_(.*) " , msg )
if italic_match :
before , text , after = italic_match . group ( 1 ) , italic_match . group ( 2 ) , italic_match . group ( 3 )
len_before = self . print_markdown ( pad , y , x , before , bold , italic , underline )
len_mid = self . print_markdown ( pad , y , x + len_before , text , bold , not italic , underline )
len_after = self . print_markdown ( pad , y , x + len_before + len_mid , after , bold , italic , underline )
return len_before + len_mid + len_after
bold_italic_match = re . match ( " (.*) \\ * \\ * \\ *(.*) \\ * \\ * \\ *(.*) " , msg )
if bold_italic_match :
before , text , after = bold_italic_match . group ( 1 ) , bold_italic_match . group ( 2 ) , \
bold_italic_match . group ( 3 )
len_before = self . print_markdown ( pad , y , x , before , bold , italic , underline , strike )
len_mid = self . print_markdown ( pad , y , x + len_before , text , not bold , not italic , underline , strike )
len_after = self . print_markdown ( pad , y , x + len_before + len_mid , after , bold , italic , underline , strike )
return len_before + len_mid + len_after
bold_match = re . match ( " (.*) \\ * \\ *(.*) \\ * \\ *(.*) " , msg )
if bold_match :
before , text , after = bold_match . group ( 1 ) , bold_match . group ( 2 ) , bold_match . group ( 3 )
len_before = self . print_markdown ( pad , y , x , before , bold , italic , underline , strike )
len_mid = self . print_markdown ( pad , y , x + len_before , text , not bold , italic , underline , strike )
len_after = self . print_markdown ( pad , y , x + len_before + len_mid , after , bold , italic , underline , strike )
return len_before + len_mid + len_after
italic_match = re . match ( " (.*) \\ *(.*) \\ *(.*) " , msg )
if italic_match :
before , text , after = italic_match . group ( 1 ) , italic_match . group ( 2 ) , italic_match . group ( 3 )
len_before = self . print_markdown ( pad , y , x , before , bold , italic , underline , strike )
len_mid = self . print_markdown ( pad , y , x + len_before , text , bold , not italic , underline , strike )
len_after = self . print_markdown ( pad , y , x + len_before + len_mid , after , bold , italic , underline , strike )
return len_before + len_mid + len_after
strike_match = re . match ( " (.*)~~(.*)~~(.*) " , msg )
if strike_match :
before , text , after = strike_match . group ( 1 ) , strike_match . group ( 2 ) , strike_match . group ( 3 )
len_before = self . print_markdown ( pad , y , x , before , bold , italic , underline , strike )
len_mid = self . print_markdown ( pad , y , x + len_before , text , bold , italic , underline , not strike )
len_after = self . print_markdown ( pad , y , x + len_before + len_mid , after , bold , italic , underline , strike )
return len_before + len_mid + len_after
size = len ( msg )
attrs = 0
attrs | = curses . A_BOLD if bold else 0
attrs | = curses . A_ITALIC if italic else 0
attrs | = curses . A_UNDERLINE if underline else 0
if strike :
msg = " " . join ( c + " \u0336 " for c in msg )
remaining_lines = curses . LINES - 3 - ( y + x / / ( curses . COLS - 1 ) ) + 1
if remaining_lines > 0 :
# Don't print the end of the line if it is too long
space_left_on_line = ( curses . COLS - 2 ) - ( x % ( curses . COLS - 1 ) )
msg = msg [ : space_left_on_line + max ( 0 , ( curses . COLS - 1 ) * ( remaining_lines - 1 ) ) ]
if msg :
pad . addstr ( y + x / / ( curses . COLS - 1 ) , x % ( curses . COLS - 1 ) , msg , attrs )
return size
def refresh_history(self) -> None:
    """
    Redraw the history pad from the stored messages.

    If the terminal was resized, curses and both pads are resized first.
    The slice of ``self.history`` that ends at ``self.last_line`` is then
    printed through ``print_markdown``, and the nickname of each sender is
    drawn once more in bold with a colour derived from its characters.
    """
    # ``with`` releases the lock even when a curses call raises; with a bare
    # acquire()/release() pair an exception would leave the lock held.
    with self.refresh_lock:
        y, x = self.squinnondation.screen.getmaxyx()
        if curses.is_term_resized(curses.LINES, curses.COLS):
            curses.resizeterm(y, x)
            self.history_pad.resize(curses.LINES - 2, curses.COLS - 1)
            self.input_pad.resize(1, curses.COLS - 1)

        self.history_pad.erase()

        # Only the last lines of the history, up to ``last_line``, can be displayed
        first_line = max(0, self.last_line - curses.LINES + 3)
        y_offset = 0
        for i, msg in enumerate(self.history[first_line:self.last_line + 1]):
            if i + y_offset > curses.LINES - 3:
                break

            # Split the sender from the text; ``count`` is passed by keyword
            # because passing it positionally is deprecated.
            msg = re.sub(" ([^:]*): (.*) ", " < \\ 1> \\ 2 ", msg, count=1)
            if not re.match(" <.*> .* ", msg):
                msg = " <unknown> " + msg
            match = re.match(" <(.*)> (.*) ", msg)
            nickname = match.group(1)
            msg = match.group(2)
            full_msg = f" < {nickname} > {msg} "
            color_id = sum(ord(c) for c in nickname) % 6 + 1

            true_width = self.print_markdown(self.history_pad, i + y_offset, 0, full_msg)
            # Draw the nickname over the printed text, in bold and in colour
            self.history_pad.addstr(i + y_offset, 1, nickname, curses.A_BOLD | curses.color_pair(color_id + 1))
            # A message wider than the screen pushes the following ones down
            y_offset += true_width // (curses.COLS - 1)

        self.history_pad.refresh(0, 0, 0, 0, curses.LINES - 2, curses.COLS)
def refresh_input(self) -> None:
    """
    Redraw the input line. Must not be called while the message is not sent.

    The line shows the nickname of the user, in bold and in colour, followed
    by the part of ``self.input_buffer`` that fits on the screen and, unless
    emojis are disabled, an emoji near the right edge of the pad.
    """
    # ``with`` guarantees that the lock is released if a curses call raises.
    with self.refresh_lock:
        self.input_pad.erase()

        color_id = sum(ord(c) for c in self.nickname) % 6 + 1
        self.input_pad.addstr(0, 0, " < ")
        self.input_pad.addstr(0, 1, self.nickname, curses.A_BOLD | curses.color_pair(color_id + 1))
        self.input_pad.addstr(0, 1 + len(self.nickname), " > ")

        # Number of characters of the message that can be displayed
        visible_width = curses.COLS - len(self.nickname) - 7
        msg = self.input_buffer
        if self.input_index >= visible_width:
            # The cursor is beyond the visible part: show the text that precedes it
            msg = msg[self.input_index - visible_width:self.input_index]
        msg = msg[:visible_width]
        self.input_pad.addstr(0, 3 + len(self.nickname), msg)

        if not self.squinnondation.no_emoji:
            self.input_pad.addstr(0, self.input_pad.getmaxyx()[1] - 3, " 😀 ")

        self.input_pad.refresh(0, 0, curses.LINES - 1, 0, curses.LINES - 1, curses.COLS - 1)
def refresh_emoji_pad(self) -> None:
    """
    Display the emoji pad if necessary.

    Nothing is done when emojis are disabled. Otherwise the pad is erased
    and, when ``self.emoji_panel_page`` is positive, a framed panel holding
    the requested page of single-character emojis is drawn.
    """
    if self.squinnondation.no_emoji:
        return

    # Local import: the emoji package is only needed when emojis are enabled
    from emoji import unicode_codes

    # ``with`` guarantees that the lock is released if a curses call raises.
    with self.refresh_lock:
        self.emoji_pad.erase()

        if self.emoji_panel_page > 0:
            height, width = curses.LINES // 2, curses.COLS // 3
            self.emoji_pad.resize(height + 1, width + 1)

            # Frame and title of the panel
            self.emoji_pad.addstr(0, 0, " ┏ " + (width - 2) * " ━ " + " ┓ ")
            self.emoji_pad.addstr(0, (width - 14) // 2, " == EMOJIS == ")
            for i in range(1, height):
                self.emoji_pad.addstr(i, 0, " ┃ " + (width - 2) * " " + " ┃ ")
            self.emoji_pad.addstr(height - 1, 0, " ┗ " + (width - 2) * " ━ " + " ┛ ")

            # Keep only the emojis made of one character, then take the current page
            emojis = list(unicode_codes.UNICODE_EMOJI)
            emojis = [c for c in emojis if len(c) == 1]
            size = (height - 2) * (width - 4) // 2
            page = emojis[(self.emoji_panel_page - 1) * size:self.emoji_panel_page * size]

            # Arrows tell whether a previous / a next page exists
            if self.emoji_panel_page != 1:
                self.emoji_pad.addstr(1, width - 2, " ⬆ ")
            if len(page) == size:
                self.emoji_pad.addstr(height - 2, width - 2, " ⬇ ")

            # NOTE(review): ``i * (width - 4) // 2`` evaluates as
            # ``(i * (width - 4)) // 2`` and each row draws
            # ``(width - 4) // 2 + 1`` cells; confirm that this matches the code
            # that maps a click to an emoji before changing the arithmetic.
            for i in range(height - 2):
                for j in range((width - 4) // 2 + 1):
                    index = i * (width - 4) // 2 + j
                    if index < len(page):
                        self.emoji_pad.addstr(i + 1, 2 * j + 1, page[index])

            self.emoji_pad.refresh(0, 0, curses.LINES - height - 2, curses.COLS - width - 2,
                                   curses.LINES - 2, curses.COLS - 2)
def potential_to_contact(self) -> list:
    """
    Returns a list of peers the user should contact if it does
    not have enough symmetric neighbours.

    At most ``self.minNS - self.nbNS`` peers are returned, and never more
    than the number of potential peers. The returned peers are distinct.
    """
    from random import sample

    # ``with`` guarantees that the lock is released whatever happens.
    with self.data_lock:
        candidates = list(self.potential_peers)
        missing = max(0, self.minNS - self.nbNS)
        # sample() draws without replacement: drawing random indexes one by
        # one could return the same peer several times, and so contact fewer
        # peers than required.
        return sample(candidates, min(len(candidates), missing))
def send_hello(self) -> None:
    """
    Sends a long HelloTLV to all active neighbours.

    One packet is built per peer, since the TLV is constructed with the
    destination peer as an argument.
    """
    # ``with`` guarantees that the lock is released even if building or
    # sending a packet raises.
    with self.data_lock:
        for peer in self.active_peers:
            htlv = HelloTLV().construct(16, self, peer)
            pkt = Packet().construct(htlv)
            self.send_packet(peer, pkt)
def verify_activity(self) -> None:
    """
    All neighbours that have not sent a HelloTLV in the last 2
    minutes are considered not active.

    Such a neighbour receives a GoAwayTLV of type TIMEOUT, is marked as
    not active and the peer table is updated with it.
    """
    # ``with`` guarantees that the lock is released even if sending raises.
    with self.data_lock:
        # Work on a copy, because the set of active peers changes in the loop
        for peer in list(self.active_peers):
            if time.time() - peer.last_hello_time > 2 * 60:
                gatlv = GoAwayTLV().construct(GoAwayType.TIMEOUT, " you did not talk to me ")
                pkt = Packet().construct(gatlv)
                self.send_packet(peer, pkt)
                peer.active = False
                self.update_peer_table(peer)
def update_peer_table(self, peer: Peer) -> None:
    """
    We insert the peer into our table of clients.
    If there is a collision with the address / the ID, then we merge clients into a unique one.

    Every address of the peer is mapped to the peer in ``self.neighbours``.
    """
    # ``with`` guarantees that the lock is released even if a merge raises.
    with self.data_lock:
        # Iterate over a snapshot: merge() is defined elsewhere and may add
        # addresses to the peer, and a set must not grow while it is iterated.
        for addr in list(peer.addresses):
            if addr in self.neighbours:
                # Merge with the previous peer
                peer.merge(self.neighbours[addr])
            self.neighbours[addr] = peer

        for other_peer in list(self.neighbours.values()):
            if other_peer.id == peer.id > 0 and other_peer != peer:
                # The peer with the same id is known as a different address. We merge everything
                peer.merge(other_peer)
def send_neighbours(self) -> None:
    """
    Update the number of symmetric neighbours and
    send all neighbours NeighbourTLV

    An active peer is symmetric when its last long hello is at most two
    minutes old. The main address of each symmetric peer is sent to every
    other active peer, and ``self.nbNS`` receives the number of symmetric peers.
    """
    # ``with`` guarantees that the lock is released even if sending raises.
    with self.data_lock:
        nb_ns = 0
        # could send the same to all neighbour, but it means that neighbour
        # A could receive a message with itself in it -> if the others do not pay attention, trouble
        for peer in self.active_peers:
            if time.time() - peer.last_long_hello_time <= 2 * 60:
                nb_ns += 1
                peer.symmetric = True
                ntlv = NeighbourTLV().construct(*peer.main_address)
                pkt = Packet().construct(ntlv)
                for destination in self.active_peers:
                    if destination.id != peer.id:
                        self.send_packet(destination, pkt)
            else:
                peer.symmetric = False
        self.nbNS = nb_ns
def leave(self) -> None:
    """
    The program is exited. We send a GoAway to our neighbours, then close the program.

    This method never returns: it raises SystemExit with the status 0.
    """
    # Last inundation
    self.main_inundation()
    self.clean_inundation()

    # Broadcast a GoAway
    gatlv = GoAwayTLV().construct(GoAwayType.EXIT, " I am leaving! Good bye! ")
    # Built as everywhere else in this file, from an instance of Packet
    pkt = Packet().construct(gatlv)
    for peer in self.active_peers:
        self.send_packet(peer, pkt)

    # exit() is a helper installed by the site module and may be missing;
    # raising SystemExit is what it does and is always available.
    raise SystemExit(0)
2021-01-07 14:06:23 +00:00
def send_hello_multicast(self) -> int:
    """
    Send a short hello on the multicast group.

    Returns the value given back by ``send_multicast``.
    """
    # A hello of length 8, built without any destination peer
    short_hello = HelloTLV().construct(8, self)
    payload = Packet().construct(short_hello).marshal()
    return self.send_multicast(payload)
def send_multicast(self, data: bytes) -> int:
    """
    Send raw data on the multicast group.

    Returns the value given back by ``sendto`` on the multicast socket.
    """
    # Address and port of the multicast group
    group = (" ff02::4242:4242 ", 1212)
    sent = self.multicast_socket.sendto(data, group)
    return sent
def receive_hello_multicast(self) -> Tuple[Packet, Peer]:
    """
    Receive a packet from the multicast and translate it into a Python object.

    Returns the parsed packet together with the peer that sent it.
    If the data can not be parsed, the error counter of the peer is
    incremented, a system message is added and the ValueError is raised again.
    """
    # Read on the multicast socket: this method previously called
    # receive_raw_data(), leaving receive_multicast() unused.
    data, addr = self.receive_multicast()
    peer = self.find_peer(addr[0], addr[1])
    try:
        pkt = Packet.unmarshal(data)
    except ValueError:
        # The packet contains an error. We memorize it.
        peer.errors += 1
        self.add_system_message(" An error occured on the multicast ")
        # A bare raise keeps the original traceback
        raise
    return pkt, peer
2021-01-06 20:47:07 +00:00
2021-01-07 14:06:23 +00:00
def receive_multicast(self) -> Tuple[bytes, Any]:
    """
    Receive a packet from the multicast socket.

    At most 1024 bytes are read. Returns what ``recvfrom`` returns.
    """
    received = self.multicast_socket.recvfrom(1024)
    return received
2021-01-06 20:47:07 +00:00
2021-01-07 14:06:23 +00:00
class Listener(Thread):
    """
    It is the peer listener.
    It waits for an incoming packet, handles each of its TLVs, and waits again.
    It runs in a dedicated thread.
    """

    def __init__(self, user: User, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.user = user

    def run(self) -> None:
        """
        Receive packets forever and let each TLV handle itself.
        A packet that can not be parsed is reported as a system message.
        """
        while True:
            try:
                pkt, peer = self.user.receive_packet()
            except ValueError as error:
                self.user.add_system_message(" An error occurred while receiving a packet: {} ".format(error))
            else:
                if peer.banned:
                    # Ignore banned peers, and do not redraw anything
                    continue

                for tlv in pkt.body:
                    tlv.handle(self.user, peer)

            # Reached after an error as well as after a handled packet
            self.user.refresh_history()
            self.user.refresh_input()
2021-01-07 14:06:23 +00:00
class Multicastlistener(Thread):
    """
    Thread that listens on the multicast group to discover new people.
    """

    def __init__(self, user: User, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.user = user

    def run(self) -> None:
        """
        Receive multicast packets forever and handle their short hello TLVs.
        A packet that can not be parsed is reported as a system message.
        """
        while True:
            try:
                pkt, peer = self.user.receive_hello_multicast()
            except ValueError as error:
                self.user.add_system_message(" An error occurred while receiving a packet: {} ".format(error))
            else:
                if peer.banned:
                    # Ignore banned peers, and do not redraw anything
                    continue

                for tlv in pkt.body:
                    # Only short hello TLVs allowed
                    is_short_hello = tlv.type == 2 and tlv.length == 8
                    if is_short_hello:
                        tlv.handle_multicast(self.user, peer)

            # Reached after an error as well as after a handled packet
            self.user.refresh_history()
            self.user.refresh_input()
2021-01-06 20:47:07 +00:00
class PeerManager(Thread):
    """
    A thread that periodically takes care of the neighbours of the user.
    """

    def __init__(self, user: User, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.user = user

        # Time of the last execution of each periodic task; 0 makes every
        # task run during the first iteration of the loop.
        self.last_potential = 0
        self.last_check = 0
        self.last_neighbour = 0
        self.last_multicast = 0

        # The short hello sent to potential peers is built once and reused
        self.hellopkt = Packet().construct(HelloTLV().construct(8, self.user))

    def run(self) -> None:
        """
        Run the periodic tasks forever, with one second of sleep between two rounds.
        """
        while True:
            # First part of neighbour management: ensure the user has enough
            # symmetric neighbours.
            since_potential = time.time() - self.last_potential
            if since_potential > 30:
                for peer in self.user.potential_to_contact():
                    self.user.send_packet(peer, self.hellopkt)
                self.last_potential = time.time()

            # Second part: send long HelloTLVs to neighbours every 30 seconds
            since_check = time.time() - self.last_check
            if since_check > 30:
                self.user.add_system_message(f" I have {len(self.user.active_peers)} friends ")
                self.user.send_hello()
                self.last_check = time.time()

            # Third part: get rid of inactive neighbours
            self.user.verify_activity()

            # Fourth part: verify symmetric neighbours and send NeighbourTLV every minute
            since_neighbour = time.time() - self.last_neighbour
            if since_neighbour > 60:
                self.user.send_neighbours()
                self.last_neighbour = time.time()

            # For the multicast discovery: send a hello every minute.
            since_multicast = time.time() - self.last_multicast
            if since_multicast > 60:
                self.user.send_hello_multicast()
                self.last_multicast = time.time()

            # Avoid a busy loop
            time.sleep(1)
class Inondator(Thread):
    """
    A thread that manages the inundation.
    """

    def __init__(self, user: User, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.user = user
        # Time of the last cleaning; 0 makes the first iteration clean
        self.last_check = 0

    def run(self) -> None:
        """
        Inundate forever, once per second, and clean the inundation data
        when more than 30 seconds have elapsed since the last cleaning.
        """
        while True:
            # Clean the dictionary
            cleaning_is_due = time.time() - self.last_check > 30
            if cleaning_is_due:
                self.user.clean_inundation()
                self.last_check = time.time()

            # Inundate
            self.user.main_inundation()

            # Avoid a busy loop
            time.sleep(1)
class Message:
    """
    The data sent by a real client, excluding system messages.
    This is useful to check uniqueness or to save and load messages.
    """
    content: str
    # TODO: Replace the id by the good (potential) peer
    sender_id: int
    nonce: int
    created_at: datetime

    def __init__(self, content: str, sender_id: int, nonce: int, created_at: datetime = None):
        # Without a creation date, the message is dated from now
        if not created_at:
            created_at = datetime.now()
        self.content, self.sender_id, self.nonce = content, sender_id, nonce
        self.created_at = created_at