"""Decode and validate EU Digital COVID Certificates ("green certificates").

A certificate is read from the text of its QR code (``HC1:`` prefix, base45,
optional zlib compression, COSE/CBOR envelope) and exposed as dataclasses
for the three kinds of entries handled here: vaccination, test and recovery.
"""
import base64
import cbor2
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import \
    encode_dss_signature
from dataclasses import dataclass
import datetime
from enum import Enum
import os
from textwrap import wrap
from typing import Optional
import zlib

from . import base45


def _parse_iso_datetime(value: str) -> datetime.datetime:
    """Parse an ISO 8601 date or date-time string.

    ``datetime.fromisoformat`` only accepts a trailing ``Z`` (UTC designator)
    from Python 3.11 on, so it is rewritten as ``+00:00`` first.
    """
    if value.endswith(('Z', 'z')):
        value = value[:-1] + '+00:00'
    return datetime.datetime.fromisoformat(value)


@dataclass
class Result:
    """Fields shared by every kind of certificate entry."""

    disease: str  # tg, always 840539006
    country: str  # co
    certificate_issuer: str  # is
    certificate_identifier: str  # ci


@dataclass
class Vaccination(Result):
    """Vaccination entry (``v``) of a certificate."""

    dose_number: int  # dn
    total_series_of_doses: int  # sd
    date_of_vaccination: str  # dt
    vaccine: Optional[str] = None  # vp
    medical_product: Optional[str] = None  # mp
    manufacturer: Optional[str] = None  # ma

    def check(self) -> bool:
        """Return True when the series is complete and 7+ days old."""
        # Every dose of the series has been injected.
        valid = self.dose_number >= self.total_series_of_doses

        # Check the date of the last injection.
        # TODO: for AstraZeneca the delay is 28 days.
        date = datetime.date.fromisoformat(self.date_of_vaccination)
        today = datetime.date.today()
        delta = today - date
        valid = valid and delta.days >= 7

        return valid

    @staticmethod
    def deserialize(payload) -> "Vaccination":
        """Build a ``Vaccination`` from the short-keyed payload mapping.

        Raises ``KeyError`` if the payload contains a key that is not in
        the translation table.
        """
        translation = {
            'tg': 'disease',
            'vp': 'vaccine',
            'mp': 'medical_product',
            'ma': 'manufacturer',
            'dn': 'dose_number',
            'sd': 'total_series_of_doses',
            'dt': 'date_of_vaccination',
            'co': 'country',
            'is': 'certificate_issuer',
            'ci': 'certificate_identifier',
        }

        return Vaccination(**{translation[k]: v for k, v in payload.items()})


@dataclass
class Test(Result):
    """Test entry (``t``) of a certificate."""

    class TestResult(Enum):
        POSITIVE = '260373001'
        NEGATIVE = '260415000'

    date: str  # sc
    result: TestResult  # tr
    type_of_test: Optional[str] = None  # tt
    test_name: Optional[str] = None  # nm
    manufacturer: Optional[str] = None  # ma
    result_date: Optional[str] = None  # dr
    center: Optional[str] = None  # tc

    @staticmethod
    def deserialize(payload) -> "Test":
        """Build a ``Test`` from the short-keyed payload mapping.

        The ``result`` field is stored exactly as found in the payload
        (the raw code string). Raises ``KeyError`` if the payload contains
        a key that is not in the translation table.
        """
        translation = {
            'tg': 'disease',
            'tt': 'type_of_test',
            'nm': 'test_name',
            'ma': 'manufacturer',
            'sc': 'date',
            'dr': 'result_date',
            # Legacy alias: earlier versions of this table used 'dt' for
            # the result date; kept so such payloads still load.
            'dt': 'result_date',
            'tr': 'result',
            'tc': 'center',
            'co': 'country',
            'is': 'certificate_issuer',
            'ci': 'certificate_identifier',
        }

        return Test(**{translation[k]: v for k, v in payload.items()})

    def check(self) -> bool:
        """Return True when the test is negative and less than 3 days old."""
        # ``result`` may hold either the enum member or the raw code string
        # (that is what ``deserialize`` stores), so accept both forms.
        negative = Test.TestResult.NEGATIVE
        valid = self.result in (negative, negative.value)

        test_date = _parse_iso_datetime(self.date)
        # Use "now" in the same timezone (or naive, like the test date) so
        # the subtraction never mixes aware and naive datetimes.
        tzinfo = test_date.tzinfo
        delta = datetime.datetime.now(tzinfo) - test_date
        valid = valid and delta.days < 3  # valid for 72 hours

        return valid


@dataclass
class RecoveryStatement(Result):
    """Recovery entry (``r``) of a certificate."""

    positive_test_date: str  # fr
    valid_from: str  # df
    valid_until: str  # du

    @staticmethod
    def deserialize(payload) -> "RecoveryStatement":
        """Build a ``RecoveryStatement`` from the short-keyed payload.

        Raises ``KeyError`` if the payload contains a key that is not in
        the translation table.
        """
        translation = {
            'tg': 'disease',
            'fr': 'positive_test_date',
            'df': 'valid_from',
            'du': 'valid_until',
            'co': 'country',
            'is': 'certificate_issuer',
            'ci': 'certificate_identifier',
        }

        return RecoveryStatement(**{translation[k]: v
                                    for k, v in payload.items()})

    def check(self) -> bool:
        """Return True when the current time is within the validity range."""
        valid_from = _parse_iso_datetime(self.valid_from)
        valid_until = _parse_iso_datetime(self.valid_until)

        # "now" is taken in the timezone of ``valid_from`` (naive when the
        # stored value carries no offset).
        tzinfo = valid_from.tzinfo
        now = datetime.datetime.now(tzinfo)

        return valid_from <= now <= valid_until


@dataclass
class GreenCertificate:
    """A decoded certificate: COSE parts plus holder data and one entry."""

    header: dict
    payload: dict
    signature: bytes

    family_name: str  # nam.fn
    given_name: str  # nam.gn
    date_of_birth: str  # dob
    version: str  # ver
    vaccination: Optional[Vaccination] = None  # v
    test: Optional[Test] = None  # t
    recovery_statement: Optional[RecoveryStatement] = None  # r

    @property
    def result(self) -> Optional[Result]:
        """The entry carried by the certificate, or None if there is none.

        Precedence is vaccination, then test, then recovery statement.
        """
        if self.vaccination is not None:
            return self.vaccination
        elif self.test is not None:
            return self.test
        elif self.recovery_statement is not None:
            return self.recovery_statement
        return None

    @staticmethod
    def load(qrcode) -> "GreenCertificate":
        """Decode the text content of a QR code into a ``GreenCertificate``.

        Raises ``ValueError`` if the text does not start with ``HC1:``.
        """
        prefix = 'HC1:'
        if not qrcode.startswith(prefix):
            raise ValueError("QR code invalide.")

        # Strip only the leading prefix: 'H', 'C', '1' and ':' are all valid
        # base45 characters, so the same sequence may occur inside the data.
        plain_input = qrcode[len(prefix):]

        # The QR code content is base45-encoded.
        compressed_cose = base45.b45decode(plain_input)

        # Decompress if the data starts with a zlib header.
        cose = compressed_cose
        if len(compressed_cose) >= 2 and compressed_cose[0] == 0x78:
            fb = compressed_cose[1]
            if fb == 0x01 or fb == 0x5E or fb == 0x9C or fb == 0xDA:
                cose = zlib.decompress(compressed_cose)

        # Load the CBOR object; elements 0, 2 and 3 of its value are used
        # as the (encoded) header, the (encoded) payload and the signature.
        obj = cbor2.loads(cose)
        header = cbor2.loads(obj.value[0])
        payload = cbor2.loads(obj.value[2])
        signature = obj.value[3]

        return GreenCertificate.deserialize(header, payload, signature)

    @staticmethod
    def deserialize(header, payload, signature) -> "GreenCertificate":
        """Build a ``GreenCertificate`` from decoded COSE parts.

        Only the first item of the ``v``, ``t`` or ``r`` list is used, with
        that order of precedence.
        """
        # The useful data sits under claim -260, key 1.
        p = payload[-260][1]
        certificate = GreenCertificate(
            header=header,
            payload=payload,
            signature=signature,
            family_name=p['nam']['fn'],
            given_name=p['nam']['gn'],
            date_of_birth=p['dob'],
            version=p['ver'],
        )

        if 'v' in p:
            certificate.vaccination = Vaccination.deserialize(p['v'][0])
        elif 't' in p:
            certificate.test = Test.deserialize(p['t'][0])
        elif 'r' in p:
            certificate.recovery_statement = \
                RecoveryStatement.deserialize(p['r'][0])

        return certificate

    def check_signature(self) -> bool:
        """Verify the signature against the matching local certificate.

        The certificate is looked up in the ``certs`` directory next to
        this module, named after the base64 key identifier found under
        header key 4 (``/`` replaced by ``_``). If the file is missing, a
        message is printed and ``SystemExit(1)`` is raised.

        Returns True when verification succeeds, False on any failure.
        """
        kid = base64.b64encode(self.header[4]).decode()
        cert_name = kid.replace('/', '_')
        base_dir = os.path.dirname(__file__)
        cert_path = os.path.join(base_dir, 'certs', f"{cert_name}.pem")

        if not os.path.isfile(cert_path):
            print(f"Le certificat {kid} n'a pas été trouvé.")
            print("Utilisez l'option --dontcheck pour sauter "
                  "la vérification.")
            print("Si vous disposez du certificat, installez-le dans",
                  cert_path)
            raise SystemExit(1)

        with open(cert_path, 'rb') as f:
            cert_pem = f.read()

        cert = x509.load_pem_x509_certificate(cert_pem, default_backend())
        public_key = cert.public_key()

        # Rebuild the signed data structure.
        # NOTE(review): header and payload are re-encoded here rather than
        # reusing the exact bytes that were received; verification relies
        # on cbor2 reproducing the original encoding byte for byte.
        data = cbor2.dumps(
            ["Signature1", cbor2.dumps(self.header),
             bytes(), cbor2.dumps(self.payload)]
        )

        # The stored signature is the concatenation r || s of two
        # equal-length big-endian integers; convert it to DER.
        half = len(self.signature) // 2
        r = int.from_bytes(self.signature[:half], 'big')
        s = int.from_bytes(self.signature[half:], 'big')
        signature = encode_dss_signature(r, s)

        try:
            # Signature verification; raises on mismatch.
            public_key.verify(signature, data,
                              ec.ECDSA(cert.signature_hash_algorithm))
            return True
        except Exception:
            # Any failure (bad signature, unsupported key type, ...) means
            # the certificate is not verified; interpreter-exit signals
            # such as KeyboardInterrupt are deliberately not caught.
            return False

    def check(self) -> bool:
        """Return the validity of the carried entry, False if none."""
        result = self.result
        if result is None:
            return False
        return result.check()