From 1702d258bf12ffc8eb265460a01e02308451eddf Mon Sep 17 00:00:00 2001 From: eichhornchen Date: Thu, 7 Jan 2021 15:06:23 +0100 Subject: [PATCH] multicast --- squinnondation/messages.py | 16 +++++++ squinnondation/peers.py | 95 ++++++++++++++++++++++++++++++++++---- 2 files changed, 103 insertions(+), 8 deletions(-) diff --git a/squinnondation/messages.py b/squinnondation/messages.py index 5b8c4f8..a7d0673 100644 --- a/squinnondation/messages.py +++ b/squinnondation/messages.py @@ -199,6 +199,22 @@ class HelloTLV(TLV): if not self.is_long: user.send_packet(sender, Packet.construct(HelloTLV.construct(16, user, sender))) + def handle_multicast(self, user: Any, sender: Any) -> None: + if sender.id > 0 and sender.id != self.source_id: + user.add_system_message(f"A client known as the id {sender.id} declared that it uses " + f"the id {self.source_id}.") + sender.id = self.source_id + + if self.source_id == user.id: + sender.marked_as_banned = True + + if not sender.active: + sender.id = self.source_id # The sender we are given misses an id + + # Add entry to/actualize the active peers dictionnary + user.update_peer_table(sender) + user.add_system_message(f"{self.source_id} sent me a Hello on multicast") + @property def is_long(self) -> bool: return self.length == 16 diff --git a/squinnondation/peers.py b/squinnondation/peers.py index 584b86f..fa7dca3 100644 --- a/squinnondation/peers.py +++ b/squinnondation/peers.py @@ -10,6 +10,7 @@ import curses import re import socket import time +import struct from .messages import Packet, DataTLV, HelloTLV, GoAwayTLV, GoAwayType, NeighbourTLV, WarningTLV @@ -90,7 +91,7 @@ class User(Peer): """ def __init__(self, instance: Any, nickname: str): super().__init__(nickname, instance.bind_address, instance.bind_port) - + # Random identifier on 64 bits self.id = randint(0, 1 << 64 - 1) self.incr_nonce = 0 @@ -99,6 +100,13 @@ class User(Peer): self.socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) # Bind the socket self.socket.bind(self.main_address) + + # Create multicast socket + self.multicast_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + self.multicast_socket.bind(('', 1212)) #listen on all interfaces? + # To join the group, we need to give setsockopt a binary packed representation of the multicast group's address and on what interfaces we will listen (here all) + mreq = struct.pack("16s15s", socket.inet_pton(socket.AF_INET6, "ff02::4242:4242"), bytes(socket.INADDR_ANY)) #The string "16s15s" corresponds to the packing options: here it packs the arguments into a 16-byte string and a 15-byte string. + self.multicast_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) self.squinnondation = instance @@ -108,7 +116,7 @@ class User(Peer): # Lock the refresh function in order to avoid concurrent refresh self.refresh_lock = RLock() - # Lock function that can be used by two threads to avoid concurrent refresh + # Lock functions that can be used by two threads to avoid concurrent refresh self.data_lock = RLock() self.history = [] @@ -129,7 +137,7 @@ class User(Peer): self.nbNS = 0 self.minNS = 3 # minimal number of symmetric neighbours a user needs to have. - self.worm = Worm(self) + self.listener = Listener(self) self.neighbour_manager = PeerManager(self) self.inondator = Inondator(self) @@ -243,11 +251,11 @@ class User(Peer): Start asynchronous threads. """ # Kill subthreads when exitting the program - self.worm.setDaemon(True) + self.listener.setDaemon(True) self.neighbour_manager.setDaemon(True) self.inondator.setDaemon(True) - self.worm.start() + self.listener.start() self.neighbour_manager.start() self.inondator.start() @@ -998,11 +1006,49 @@ class User(Peer): self.send_packet(peer, pkt) exit(0) + + def send_hello_multicast(self) -> int: + """ + Send a short hello on the multicast group. + """ + htlv = HelloTLV().construct(8, self) + pkt = Packet().construct(htlv) + + res = self.send_multicast(pkt.marshal()) + return res + + def send_multicast(self, data: bytes) -> int: + """ + Send a packet on the multicast. + """ + return self.multicast_socket.sendto(data, ("ff02::4242:4242", 1212)) + + def receive_hello_multicast(self) -> Tuple[Packet, Peer]: + """ + Receive a packet from the multicast and translate it into a Python object. + """ + data, addr = self.receive_raw_data() + peer = self.find_peer(addr[0], addr[1]) + try: + pkt = Packet.unmarshal(data) + except ValueError as error: + # The packet contains an error. We memorize it. + peer.errors += 1 + self.add_system_message("An error occured on the multicast") + raise error + else: + return pkt, peer + + def receive_multicast(self) -> Tuple[bytes, Any]: + """ + Receive a packet from the socket. + """ + return self.multicast_socket.recvfrom(1024) -class Worm(Thread): +class Listener(Thread): """ - The worm is the peer listener. + It is the peer listener. It always waits for an incoming packet, then it treats it, and continues to wait. It is in a dedicated thread. """ @@ -1028,6 +1074,33 @@ class Worm(Thread): self.user.refresh_history() self.user.refresh_input() +class Multicastlistener(Thread): + """ + Used to listen on the multicast group to discover new people + """ + def __init__(self, user: User, *args, **kwargs): + super().__init__(*args, **kwargs) + self.user = user + + def run(self) -> None: + while True: + try: + pkt, peer = self.user.receive_hello_multicast() + except ValueError as error: + self.user.add_system_message("An error occurred while receiving a packet: {}".format(error)) + self.user.refresh_history() + self.user.refresh_input() + else: + if peer.banned: + # Ignore banned peers + continue + + for tlv in pkt.body: + if tlv.type==2 and tlv.length==8: # Only short hello TLVs allowed + tlv.handle_multicast(self.user, peer) + self.user.refresh_history() + self.user.refresh_input() + class PeerManager(Thread): """ @@ -1039,6 +1112,7 @@ class PeerManager(Thread): self.last_potential = 0 self.last_check = 0 self.last_neighbour = 0 + self.last_multicast = 0 htlv = HelloTLV().construct(8, self.user) pkt = Packet().construct(htlv) @@ -1068,7 +1142,12 @@ class PeerManager(Thread): if time.time() - self.last_neighbour > 60: self.user.send_neighbours() self.last_neighbour = time.time() - + + # For the multicast discovery : send a hello every minute. + if time.time() - self.last_multicast > 60: + self.user.send_hello_multicast() + self.last_multicast = time.time() + # Avoid infinite loops time.sleep(1)