175 lines
5.9 KiB
Python
175 lines
5.9 KiB
Python
# Copyright (C) 2020 by eichhornchen, ÿnérant
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import socket
|
|
from argparse import ArgumentParser
|
|
from typing import Any, Tuple
|
|
|
|
|
|
class Squinnondation:
    """
    Command-line entry point of the client.

    Holds the parsed command-line arguments and starts the chat loop.
    """
    args: Any
    bind_address: str
    bind_port: int
    client_address: str
    client_port: int

    def parse_arguments(self) -> None:
        """
        Parse the command line and store the addresses and ports on the instance.

        Raises a ValueError if the bind port or the neighbour port is not between 1024 and 65535.
        """
        parser = ArgumentParser(description="MIRC client.")
        # NOTE: argparse only uses `default` for positionals declared with nargs='?' or '*';
        # as declared here, both positionals are required on the command line.
        parser.add_argument('bind_address', type=str, default="localhost",
                            help="Address of the client.")
        parser.add_argument('bind_port', type=int, default=2500,
                            help="Port of the client. Must be between 1024 and 65535.")
        parser.add_argument('--client_address', type=str, default="localhost",
                            help="Address of the first neighbour.")
        parser.add_argument('--client_port', type=int, default=2500,
                            help="Port of the first neighbour. Must be between 1024 and 65535.")
        parser.add_argument('--bind-only', '-b', action='store_true',
                            help="Don't connect to another client, only listen to connections.")
        self.args = parser.parse_args()

        # Both ports must be in range. The negation has to cover the whole conjunction:
        # `not A and B` would only reject a bad bind port when the neighbour port is valid.
        if not (1024 <= self.args.bind_port <= 65535 and 1024 <= self.args.client_port <= 65535):
            raise ValueError("Ports must be between 1024 and 65535.")

        self.bind_address = self.args.bind_address
        self.bind_port = self.args.bind_port
        self.client_address = self.args.client_address
        self.client_port = self.args.client_port

    @staticmethod
    def main() -> None:  # pragma: no cover
        """
        Run the client.

        Parse the arguments, create the local squirrel, send a greeting to the first
        neighbour unless --bind-only was given, then print every received message forever.
        """
        instance = Squinnondation()
        instance.parse_arguments()

        squirrel = Squirrel(input("Enter your nickname: "), instance.bind_address, instance.bind_port)

        if not instance.args.bind_only:
            hazelnut = Hazelnut(address=instance.client_address, port=instance.client_port)
            pkt = Packet()
            pkt.magic = 95
            pkt.version = 0
            pkt.body = TLV()
            msg = f"Hello world, my name is {squirrel.nickname}!"
            pkt.body.raw_data = msg.encode("UTF-8")
            # The declared length is the size of the encoded body, not the number of characters.
            pkt.body_length = len(pkt.body.raw_data)
            squirrel.send_packet(hazelnut, pkt)

        while True:
            pkt, addr = squirrel.receive_packet()
            print(f"received message: {pkt.body.raw_data.decode('UTF-8')}")
|
|
|
|
|
|
class TLV:
    """
    A Tag-Length-Value element: the payload carried in the body of a packet.

    It declares a numeric type and the raw bytes of the content.
    TODO: add subclasses for each type of TLV
    """
    type: int
    raw_data: bytes

    def validate_data(self) -> bool:
        """
        Check that the TLV is well-formed.

        The contract is to raise a ValueError on malformed content; for now nothing
        is checked and every TLV is reported as valid.
        TODO: Make some tests
        """
        well_formed = True
        return well_formed
|
|
|
|
|
|
class Packet:
    """
    A Packet is a wrapper around the data exchanged between clients.

    On the wire it is made of one magic byte, one version byte, the body length
    on two bytes (big endian), and then the body itself.
    """
    magic: int
    version: int
    body_length: int
    body: "TLV"

    def validate_data(self) -> bool:
        """
        Ensure that the packet is well-formed.

        Returns the result of the validation of the body.
        Raises a ValueError if the packet contains bad data.
        """
        if self.magic != 95:
            raise ValueError("The magic code of the packet must be 95, found: {:d}".format(self.magic))
        if self.version != 0:
            raise ValueError("The version of the packet is not supported: {:d}".format(self.version))
        # The upper bound is 1020, the limit announced in the error message below.
        if not (0 <= self.body_length <= 1020):
            raise ValueError("The body length of the packet is negative or too high. "
                             "It must be between 0 and 1020, found: {:d}".format(self.body_length))
        return self.body.validate_data()

    @staticmethod
    def unmarshal(data: bytes) -> "Packet":
        """
        Read raw data and build the packet wrapper.

        Bytes located after the declared body length are ignored.
        Raises a ValueError whenever the data is invalid, including when the data
        is shorter than the 4-byte header or than the declared body length.
        """
        # Without this guard, indexing the header of a short datagram raises an IndexError.
        if len(data) < 4:
            raise ValueError("The packet is too short, it must contain a header of 4 bytes, found: {:d}"
                             .format(len(data)))

        pkt = Packet()
        pkt.magic = data[0]
        pkt.version = data[1]
        pkt.body_length = int.from_bytes(data[2:4], byteorder="big")
        pkt.body = TLV()
        pkt.body.raw_data = data[4:4 + pkt.body_length]

        pkt.validate_data()

        # Slicing never fails: a truncated body would otherwise be silently accepted.
        if len(pkt.body.raw_data) != pkt.body_length:
            raise ValueError("The body of the packet is truncated: {:d} bytes were announced, found: {:d}"
                             .format(pkt.body_length, len(pkt.body.raw_data)))

        return pkt

    def marshal(self) -> bytes:
        """
        Compute the byte array data associated to the packet.

        The header uses the stored body_length, it is not recomputed from the body.
        """
        data = self.magic.to_bytes(1, "big")
        data += self.version.to_bytes(1, "big")
        data += self.body_length.to_bytes(2, "big")
        data += self.body.raw_data
        return data
|
|
|
|
|
|
class Hazelnut:
    """
    A hazelnut is a client, described by its nickname, its address and its port.
    """
    def __init__(self, nickname: str = "anonymous", address: str = "localhost", port: int = 2500):
        # Store the identity of the client; nothing else is done at construction.
        self.nickname, self.address, self.port = nickname, address, port
|
|
|
|
|
|
class Squirrel(Hazelnut):
    """
    The squirrel is the user of the program. It owns the socket and can speak
    with other clients, that are called hazelnuts.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Datagram socket over IPv4, bound to the address and port of this squirrel.
        udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket = udp_socket
        local_endpoint = (self.address, self.port)
        udp_socket.bind(local_endpoint)
        print(f"Listening on {self.address}:{self.port}")

    def send_packet(self, client: Hazelnut, pkt: Packet) -> int:
        """
        Serialize a packet and send it to a client.

        Returns what send_raw_data returns.
        """
        payload = pkt.marshal()
        return self.send_raw_data(client, payload)

    def send_raw_data(self, client: Hazelnut, data: bytes) -> int:
        """
        Send raw bytes to the address and port of a client.

        Returns the value returned by the socket (the number of bytes sent).
        """
        destination = (client.address, client.port)
        return self.socket.sendto(data, destination)

    def receive_packet(self) -> Tuple[Packet, Any]:
        """
        Wait for data on the socket and parse it into a Packet.

        Returns the packet and the address of the sender.
        TODO: Translate the address into the correct hazelnut.
        """
        raw, sender = self.receive_raw_data()
        packet = Packet.unmarshal(raw)
        return packet, sender

    def receive_raw_data(self) -> Tuple[bytes, Any]:
        """
        Wait for data on the socket and return it unparsed, with the address of the sender.

        At most 1024 bytes are read.
        TODO: Translate the address into the correct hazelnut.
        """
        buffer_size = 1024
        return self.socket.recvfrom(buffer_size)
|