Removes somr locks that were blocking the threads

This commit is contained in:
Eichhornchen 2021-01-07 17:48:56 +01:00
parent 7e1323dc74
commit eae4f13066
1 changed files with 8 additions and 37 deletions

View File

@ -117,7 +117,7 @@ class User(Peer):
# Lock the refresh function in order to avoid concurrent refresh
self.refresh_lock = RLock()
# Lock functions that can be used by two threads to avoid concurrent refresh
# Lock functions that can be used by two threads to avoid concurrent writing
self.data_lock = RLock()
self.history = []
@ -182,25 +182,18 @@ class User(Peer):
Retrieve the peer that is known by its id. Return None if it is unknown.
The given identifier must be positive.
"""
self.data_lock.acquire()
if peer_id > 0:
for peer in self.neighbours.values():
if peer.id == peer_id:
return peer
self.data_lock.release()
def find_peer_by_nickname(self, nickname: str) -> Generator[Peer, Any, None]:
"""
Retrieve the peers that are known by their nicknames.
"""
self.data_lock.acquire()
for peer in self.neighbours.values():
if peer.nickname == nickname:
yield peer
self.data_lock.release()
def send_packet(self, client: Peer, pkt: Packet) -> int:
"""
@ -426,7 +419,6 @@ class User(Peer):
"""
The user sent a command. We analyse it and process what is needed.
"""
self.data_lock.acquire()
def resolve_address(address: str) -> str:
# Resolve address
@ -483,8 +475,10 @@ class User(Peer):
if (address, port) in self.neighbours:
self.add_system_message("There is already a known client with this address.", ignore_debug=True)
return
self.data_lock.acquire()
peer = self.new_peer(address, port)
self.neighbours[(address, port)] = peer
self.data_lock.release()
self.add_system_message(f"Potential client successfully added! You can send a hello by running "
f"\"/hello {address} {port}\".", ignore_debug=True)
elif command == "hello":
@ -531,7 +525,9 @@ class User(Peer):
return
if not args:
self.data_lock.acquire()
peers = [self]
self.data_lock.release()
elif len(args) == 2:
try:
address, port = resolve_address(args[0]), resolve_port(args[1])
@ -543,8 +539,9 @@ class User(Peer):
self.add_system_message("This client is unknown. Please register it by running "
f"\"/connect {address} {port}\"", ignore_debug=True)
return
self.data_lock.acquire()
peers = [self.find_peer(address, port)]
self.data_lock.release()
else:
peers = list(self.find_peer_by_nickname(args[0]))
if args[0].isnumeric():
@ -604,8 +601,6 @@ class User(Peer):
ignore_debug=True)
else:
self.add_system_message("Unknown command. Please do /help to see available commands.", ignore_debug=True)
self.data_lock.release()
def add_message(self, msg: str) -> None:
"""
@ -621,7 +616,6 @@ class User(Peer):
This add the message in the history if not already done.
Returns True iff the message was not already received previously.
"""
self.data_lock.acquire()
if (sender_id, nonce) not in self.received_messages:
# If it is a new message, add it to recent_messages
@ -638,29 +632,24 @@ class User(Peer):
self.add_message(msg) # for display purposes
self.received_messages[(sender_id, nonce)] = Message(msg, sender_id, nonce)
self.data_lock.release()
return True
def make_inundation_dict(self) -> dict:
"""
Takes the active peers dictionnary and returns a list of [peer, date+random, 0]
"""
self.data_lock.acquire()
res = dict()
peers = self.active_peers
for peer in peers:
if peer.symmetric:
next_send = uniform(1, 2)
res[peer.main_address] = [peer, time.time() + next_send, 0]
self.data_lock.release()
return res
def remove_from_inundation(self, peer: Peer, sender_id: int, nonce: int) -> None:
"""
Remove the sender from the list of neighbours to be inundated
"""
self.data_lock.acquire()
if (sender_id, nonce) in self.recent_messages:
# If a peer is late in its acknowledgement, the absence of the previous if causes an error.
for addr in peer.addresses:
@ -668,17 +657,14 @@ class User(Peer):
if not self.recent_messages[(sender_id, nonce)][2]: # If dictionnary is empty, remove the message
self.recent_messages.pop((sender_id, nonce), None)
self.data_lock.release()
def clean_inundation(self) -> None:
"""
Remove messages which are overdue (older than 2 minutes) from the inundation dictionnary.
"""
self.data_lock.acquire()
for key in self.recent_messages:
if time.time() - self.recent_messages[key][1] > 120:
self.recent_messages.pop(key)
self.data_lock.release()
def main_inundation(self) -> None:
@ -693,7 +679,7 @@ class User(Peer):
# send the packet if it is overdue
self.send_packet(self.recent_messages[key][2][key2][0], self.recent_messages[key][0])
# change the time until the next send
a = self.recent_messages[key][2][key2][2]
self.recent_messages[key][2][key2][2] = a + 1
@ -907,8 +893,6 @@ class User(Peer):
Returns a list of peers the user should contact if it does
not have enough symmetric neighbours.
"""
self.data_lock.acquire()
res = []
val = list(self.potential_peers)
lp = len(val)
@ -916,30 +900,22 @@ class User(Peer):
for i in range(min(lp, max(0, self.minNS - self.nbNS))):
a = randint(0, lp - 1)
res.append(val[a])
self.data_lock.release()
return res
def send_hello(self) -> None:
"""
Sends a long HelloTLV to all active neighbours.
"""
self.data_lock.acquire()
for peer in self.active_peers:
htlv = HelloTLV().construct(16, self, peer)
pkt = Packet().construct(htlv)
self.send_packet(peer, pkt)
self.data_lock.release()
def verify_activity(self) -> None:
"""
All neighbours that have not sent a HelloTLV in the last 2
minutes are considered not active.
"""
self.data_lock.acquire()
val = list(self.active_peers) # create a copy because the dict size will change
for peer in val:
@ -949,8 +925,6 @@ class User(Peer):
self.send_packet(peer, pkt)
peer.active = False
self.update_peer_table(peer)
self.data_lock.release()
def update_peer_table(self, peer: Peer) -> None:
"""
@ -978,7 +952,6 @@ class User(Peer):
Update the number of symmetric neighbours and
send all neighbours NeighbourTLV
"""
self.data_lock.acquire()
nb_ns = 0
# could send the same to all neighbour, but it means that neighbour
@ -995,8 +968,6 @@ class User(Peer):
else:
peer.symmetric = False
self.nbNS = nb_ns
self.data_lock.release()
def leave(self) -> None:
"""