diff --git a/squinnondation/peers.py b/squinnondation/peers.py index 40a390a..60a448b 100644 --- a/squinnondation/peers.py +++ b/squinnondation/peers.py @@ -117,7 +117,7 @@ class User(Peer): # Lock the refresh function in order to avoid concurrent refresh self.refresh_lock = RLock() - # Lock functions that can be used by two threads to avoid concurrent refresh + # Lock functions that can be used by two threads to avoid concurrent writing self.data_lock = RLock() self.history = [] @@ -182,25 +182,18 @@ class User(Peer): Retrieve the peer that is known by its id. Return None if it is unknown. The given identifier must be positive. """ - self.data_lock.acquire() - if peer_id > 0: for peer in self.neighbours.values(): if peer.id == peer_id: return peer - - self.data_lock.release() def find_peer_by_nickname(self, nickname: str) -> Generator[Peer, Any, None]: """ Retrieve the peers that are known by their nicknames. """ - self.data_lock.acquire() - for peer in self.neighbours.values(): if peer.nickname == nickname: yield peer - self.data_lock.release() def send_packet(self, client: Peer, pkt: Packet) -> int: """ @@ -426,7 +419,6 @@ class User(Peer): """ The user sent a command. We analyse it and process what is needed. """ - self.data_lock.acquire() def resolve_address(address: str) -> str: # Resolve address @@ -483,8 +475,10 @@ class User(Peer): if (address, port) in self.neighbours: self.add_system_message("There is already a known client with this address.", ignore_debug=True) return + self.data_lock.acquire() peer = self.new_peer(address, port) self.neighbours[(address, port)] = peer + self.data_lock.release() self.add_system_message(f"Potential client successfully added! You can send a hello by running " f"\"/hello {address} {port}\".", ignore_debug=True) elif command == "hello": @@ -531,7 +525,9 @@ class User(Peer): return if not args: + self.data_lock.acquire() peers = [self] + self.data_lock.release() elif len(args) == 2: try: address, port = resolve_address(args[0]), resolve_port(args[1]) @@ -543,8 +539,9 @@ class User(Peer): self.add_system_message("This client is unknown. Please register it by running " f"\"/connect {address} {port}\"", ignore_debug=True) return - + self.data_lock.acquire() peers = [self.find_peer(address, port)] + self.data_lock.release() else: peers = list(self.find_peer_by_nickname(args[0])) if args[0].isnumeric(): @@ -604,8 +601,6 @@ class User(Peer): ignore_debug=True) else: self.add_system_message("Unknown command. Please do /help to see available commands.", ignore_debug=True) - - self.data_lock.release() def add_message(self, msg: str) -> None: """ @@ -621,7 +616,6 @@ class User(Peer): This add the message in the history if not already done. Returns True iff the message was not already received previously. """ - self.data_lock.acquire() if (sender_id, nonce) not in self.received_messages: # If it is a new message, add it to recent_messages @@ -638,29 +632,24 @@ class User(Peer): self.add_message(msg) # for display purposes self.received_messages[(sender_id, nonce)] = Message(msg, sender_id, nonce) - self.data_lock.release() return True def make_inundation_dict(self) -> dict: """ Takes the active peers dictionnary and returns a list of [peer, date+random, 0] """ - self.data_lock.acquire() - res = dict() peers = self.active_peers for peer in peers: if peer.symmetric: next_send = uniform(1, 2) res[peer.main_address] = [peer, time.time() + next_send, 0] - self.data_lock.release() return res def remove_from_inundation(self, peer: Peer, sender_id: int, nonce: int) -> None: """ Remove the sender from the list of neighbours to be inundated """ - self.data_lock.acquire() if (sender_id, nonce) in self.recent_messages: # If a peer is late in its acknowledgement, the absence of the previous if causes an error. for addr in peer.addresses: @@ -668,17 +657,14 @@ class User(Peer): if not self.recent_messages[(sender_id, nonce)][2]: # If dictionnary is empty, remove the message self.recent_messages.pop((sender_id, nonce), None) - self.data_lock.release() def clean_inundation(self) -> None: """ Remove messages which are overdue (older than 2 minutes) from the inundation dictionnary. """ - self.data_lock.acquire() for key in self.recent_messages: if time.time() - self.recent_messages[key][1] > 120: self.recent_messages.pop(key) - self.data_lock.release() def main_inundation(self) -> None: @@ -693,7 +679,7 @@ class User(Peer): # send the packet if it is overdue self.send_packet(self.recent_messages[key][2][key2][0], self.recent_messages[key][0]) - + # change the time until the next send a = self.recent_messages[key][2][key2][2] self.recent_messages[key][2][key2][2] = a + 1 @@ -907,8 +893,6 @@ class User(Peer): Returns a list of peers the user should contact if it does not have enough symmetric neighbours. """ - self.data_lock.acquire() - res = [] val = list(self.potential_peers) lp = len(val) @@ -916,30 +900,22 @@ class User(Peer): for i in range(min(lp, max(0, self.minNS - self.nbNS))): a = randint(0, lp - 1) res.append(val[a]) - - self.data_lock.release() return res def send_hello(self) -> None: """ Sends a long HelloTLV to all active neighbours. """ - self.data_lock.acquire() - for peer in self.active_peers: htlv = HelloTLV().construct(16, self, peer) pkt = Packet().construct(htlv) self.send_packet(peer, pkt) - - self.data_lock.release() def verify_activity(self) -> None: """ All neighbours that have not sent a HelloTLV in the last 2 minutes are considered not active. """ - self.data_lock.acquire() - val = list(self.active_peers) # create a copy because the dict size will change for peer in val: @@ -949,8 +925,6 @@ class User(Peer): self.send_packet(peer, pkt) peer.active = False self.update_peer_table(peer) - - self.data_lock.release() def update_peer_table(self, peer: Peer) -> None: """ @@ -978,7 +952,6 @@ class User(Peer): Update the number of symmetric neighbours and send all neighbours NeighbourTLV """ - self.data_lock.acquire() nb_ns = 0 # could send the same to all neighbour, but it means that neighbour @@ -995,8 +968,6 @@ class User(Peer): else: peer.symmetric = False self.nbNS = nb_ns - - self.data_lock.release() def leave(self) -> None: """