multicast

This commit is contained in:
eichhornchen 2021-01-07 15:06:23 +01:00
parent df3298a39c
commit 1702d258bf
2 changed files with 103 additions and 8 deletions

View File

@ -199,6 +199,22 @@ class HelloTLV(TLV):
if not self.is_long:
user.send_packet(sender, Packet.construct(HelloTLV.construct(16, user, sender)))
def handle_multicast(self, user: Any, sender: Any) -> None:
if sender.id > 0 and sender.id != self.source_id:
user.add_system_message(f"A client known as the id {sender.id} declared that it uses "
f"the id {self.source_id}.")
sender.id = self.source_id
if self.source_id == user.id:
sender.marked_as_banned = True
if not sender.active:
sender.id = self.source_id # The sender we are given misses an id
# Add entry to/actualize the active peers dictionnary
user.update_peer_table(sender)
user.add_system_message(f"{self.source_id} sent me a Hello on multicast")
@property
def is_long(self) -> bool:
return self.length == 16

View File

@ -10,6 +10,7 @@ import curses
import re
import socket
import time
import struct
from .messages import Packet, DataTLV, HelloTLV, GoAwayTLV, GoAwayType, NeighbourTLV, WarningTLV
@ -90,7 +91,7 @@ class User(Peer):
"""
def __init__(self, instance: Any, nickname: str):
super().__init__(nickname, instance.bind_address, instance.bind_port)
# Random identifier on 64 bits
self.id = randint(0, 1 << 64 - 1)
self.incr_nonce = 0
@ -99,6 +100,13 @@ class User(Peer):
self.socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
# Bind the socket
self.socket.bind(self.main_address)
# Create multicast socket
self.multicast_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.multicast_socket.bind(('', 1212)) #listen on all interfaces?
# To join the group, we need to give setsockopt a binary packed representation of the multicast group's address and on what interfaces we will listen (here all)
mreq = struct.pack("16s15s", socket.inet_pton(socket.AF_INET6, "ff02::4242:4242"), bytes(socket.INADDR_ANY)) #The string "16s15s" corresponds to the packing options: here it packs the arguments into a 16-byte string and a 15-byte string.
self.multicast_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
self.squinnondation = instance
@ -108,7 +116,7 @@ class User(Peer):
# Lock the refresh function in order to avoid concurrent refresh
self.refresh_lock = RLock()
# Lock function that can be used by two threads to avoid concurrent refresh
# Lock functions that can be used by two threads to avoid concurrent refresh
self.data_lock = RLock()
self.history = []
@ -129,7 +137,7 @@ class User(Peer):
self.nbNS = 0
self.minNS = 3 # minimal number of symmetric neighbours a user needs to have.
self.worm = Worm(self)
self.listener = Listener(self)
self.neighbour_manager = PeerManager(self)
self.inondator = Inondator(self)
@ -243,11 +251,11 @@ class User(Peer):
Start asynchronous threads.
"""
# Kill subthreads when exitting the program
self.worm.setDaemon(True)
self.listener.setDaemon(True)
self.neighbour_manager.setDaemon(True)
self.inondator.setDaemon(True)
self.worm.start()
self.listener.start()
self.neighbour_manager.start()
self.inondator.start()
@ -998,11 +1006,49 @@ class User(Peer):
self.send_packet(peer, pkt)
exit(0)
def send_hello_multicast(self) -> int:
"""
Send a short hello on the multicast group.
"""
htlv = HelloTLV().construct(8, self)
pkt = Packet().construct(htlv)
res = self.send_multicast(pkt.marshal())
return res
def send_multicast(self, data: bytes) -> int:
"""
Send a packet on the multicast.
"""
return self.multicast_socket.sendto(data, ("ff02::4242:4242", 1212))
def receive_hello_multicast(self) -> Tuple[Packet, Peer]:
"""
Receive a packet from the multicast and translate it into a Python object.
"""
data, addr = self.receive_raw_data()
peer = self.find_peer(addr[0], addr[1])
try:
pkt = Packet.unmarshal(data)
except ValueError as error:
# The packet contains an error. We memorize it.
peer.errors += 1
self.add_system_message("An error occured on the multicast")
raise error
else:
return pkt, peer
def receive_multicast(self) -> Tuple[bytes, Any]:
"""
Receive a packet from the socket.
"""
return self.multicast_socket.recvfrom(1024)
class Worm(Thread):
class Listener(Thread):
"""
The worm is the peer listener.
It is the peer listener.
It always waits for an incoming packet, then it treats it, and continues to wait.
It is in a dedicated thread.
"""
@ -1028,6 +1074,33 @@ class Worm(Thread):
self.user.refresh_history()
self.user.refresh_input()
class Multicastlistener(Thread):
"""
Used to listen on the multicast group to discover new people
"""
def __init__(self, user: User, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
def run(self) -> None:
while True:
try:
pkt, peer = self.user.receive_hello_multicast()
except ValueError as error:
self.user.add_system_message("An error occurred while receiving a packet: {}".format(error))
self.user.refresh_history()
self.user.refresh_input()
else:
if peer.banned:
# Ignore banned peers
continue
for tlv in pkt.body:
if tlv.type==2 and tlv.length==8: # Only short hello TLVs allowed
tlv.handle_multicast(self.user, peer)
self.user.refresh_history()
self.user.refresh_input()
class PeerManager(Thread):
"""
@ -1039,6 +1112,7 @@ class PeerManager(Thread):
self.last_potential = 0
self.last_check = 0
self.last_neighbour = 0
self.last_multicast = 0
htlv = HelloTLV().construct(8, self.user)
pkt = Packet().construct(htlv)
@ -1068,7 +1142,12 @@ class PeerManager(Thread):
if time.time() - self.last_neighbour > 60:
self.user.send_neighbours()
self.last_neighbour = time.time()
# For the multicast discovery : send a hello every minute.
if time.time() - self.last_multicast > 60:
self.user.send_hello_multicast()
self.last_multicast = time.time()
# Avoid infinite loops
time.sleep(1)