diff --git a/pscheck/models.py b/pscheck/models.py new file mode 100644 index 0000000..6181f5a --- /dev/null +++ b/pscheck/models.py @@ -0,0 +1,248 @@ +import base64 +import cbor2 +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric.utils import \ + encode_dss_signature +from dataclasses import dataclass +import datetime +from enum import Enum +import os +from textwrap import wrap +from typing import Optional +import zlib + +from . import base45 + + +@dataclass +class Result: + disease: str # tg, always 840539006 + country: str # co + certificate_issuer: str # is + certificate_identifier: str # ci + + +@dataclass +class Vaccination(Result): + vaccine: str # vp + medical_product: str # mp + manufacturer: str # ma + dose_number: int # dn + total_series_of_doses: int # sd + date_of_vaccination: str # dt + + def check(self) -> bool: + # Toutes les doses ont bien été injectées + valid = self.dose_number >= self.total_series_of_doses + # Vérification de la date + # TODO: Pour Astrazeneca, c'est 28 jours + date = datetime.date.fromisoformat(self.date_of_vaccination) + today = datetime.date.today() + delta = today - date + valid = valid and delta.days >= 7 + + return valid + + @staticmethod + def deserialize(payload) -> "Vaccination": + translation = { + 'tg': 'disease', + 'vp': 'vaccine', + 'mp': 'medical_product', + 'ma': 'manufacturer', + 'dn': 'dose_number', + 'sd': 'total_series_of_doses', + 'dt': 'date_of_vaccination', + 'co': 'country', + 'is': 'certificate_issuer', + 'ci': 'certificate_identifier', + } + return Vaccination(**{translation[k]: v for k, v in payload.items()}) + + +@dataclass +class Test(Result): + class TestResult(Enum): + POSITIVE = '260373001' + NEGATIVE = '260415000' + + type_of_test: str # tt + test_name: str # nm + manufacturer: str # ma + date: str # sc + result_date: str # dr + result: TestResult # tr + center: str # tc + + @staticmethod + def deserialize(payload) -> "Test": + translation = { + 'tg': 'disease', + 'tt': 'type_of_test', + 'nm': 'test_name', + 'ma': 'manufacturer', + 'sc': 'date', + 'dt': 'result_date', + 'tr': 'result', + 'tc': 'center', + 'co': 'country', + 'is': 'certificate_issuer', + 'ci': 'certificate_identifier', + } + return Test(**{translation[k]: v for k, v in payload.items()}) + + def check(self) -> bool: + valid = self.result == TestResult.NEGATIVE + test_date = datetime.datetime.fromisoformat(self.result_date) + tzinfo = test_date.tzinfo + delta = datetime.datetime.now(tzinfo) - test_date + valid = valid and delta.days < 3 # 72h de validité + + return valid + + +@dataclass +class RecoveryStatement(Result): + positive_test_date: str # fr + valid_from: str # df + valid_until: str # du + + @staticmethod + def deserialize(payload) -> "RecoveryStatement": + translation = { + 'tg': 'disease', + 'fr': 'postive_test_date', + 'df': 'valid_from', + 'du': 'valid_until', + 'co': 'country', + 'is': 'certificate_issuer', + 'ci': 'certificate_identifier', + } + return RecoveryStatement(**{translation[k]: v + for k, v in payload.items()}) + + def check(self) -> bool: + valid_from = datetime.datetime.fromisoformat(self.valid_from) + valid_unti = datetime.datetime.fromisoformat(self.valid_until) + tzinfo = valid_from.tzinfo + now = datetime.datetime.now(tzinfo) + return valid_from <= now <= valid_until + + +@dataclass +class GreenCertificate: + header: dict + payload: dict + signature: str + + family_name: str # nam.fn + given_name: str # nam.gn + date_of_birth: str # nam.dob + version: str # ver + vaccination: Optional[Vaccination] = None # v + test: Optional[Test] = None # t + recovery_statement: Optional[RecoveryStatement] = None # r + + @property + def result(self) -> Result: + if self.vaccination is not None: + return self.vaccination + elif self.test is not None: + return self.test + elif self.recovery_statement is not None: + return self.recovery_statement + + @staticmethod + def load(qrcode) -> "GreenCertificate": + if not qrcode.startswith('HC1:'): + raise ValueError("QR code invalide.") + + plain_input = qrcode.replace('HC1:', '') + + # QR Code en base 45 + compressed_cose = base45.b45decode(plain_input) + + # Décompression ZIP si nécessaire + cose = compressed_cose + if compressed_cose[0] == 0x78: + fb = compressed_cose[1] + if fb == 0x01 or fb == 0x5E or fb == 0x9C or fb == 0xDA: + cose = zlib.decompress(compressed_cose) + + # Chargement de l'objet CBOR + obj = cbor2.loads(cose) + header = cbor2.loads(obj.value[0]) + payload = cbor2.loads(obj.value[2]) + signature = obj.value[3] + + return GreenCertificate.deserialize(header, payload, signature) + + @staticmethod + def deserialize(header, payload, signature) -> "GreenCertificate": + # Information utile + p = payload[-260][1] + + certificate = GreenCertificate( + header=header, + payload=payload, + signature=signature, + family_name=p['nam']['fn'], + given_name=p['nam']['gn'], + date_of_birth=p['dob'], + version=p['ver'], + ) + + if 'v' in p: + certificate.vaccination = Vaccination.deserialize(p['v'][0]) + elif 't' in p: + certificate.test = Test.deserialize(p['t'][0]) + elif 'r' in p: + certificate.recovery_statement = \ + RecoveryStatement.deserialize(p['r'][0]) + + return certificate + + def check_signature(self) -> bool: + kid = base64.b64encode(self.header[4]).decode() + cert_name = kid.replace('/', '_') + + base_dir = os.path.dirname(__file__) + cert_path = os.path.join(base_dir, 'certs', f"{cert_name}.pem") + if not os.path.isfile(cert_path): + print(f"Le certificat {kid} n'a pas été trouvé.") + print("Utilisez l'option --dontcheck pour sauter " + "la vérification.") + print("Si vous disposez du certificat, installez-le dans", + cert_path) + exit(1) + with open(os.path.join(base_dir, 'certs', f"{cert_name}.pem")) as f: + cert_asc = f.read() + + cert = x509.load_pem_x509_certificate(cert_asc.encode(), + default_backend()) + + public_key = cert.public_key() + + # Calcul de la bonne signature et des données signées + data = cbor2.dumps( + ["Signature1", cbor2.dumps(self.header), + bytes(), cbor2.dumps(self.payload)] + ) + + l = len(self.signature) + r = int.from_bytes(self.signature[:l // 2], 'big') + s = int.from_bytes(self.signature[l // 2:], 'big') + signature = encode_dss_signature(r, s) + try: + # Vérification de la signature + public_key.verify(signature, data, + ec.ECDSA(cert.signature_hash_algorithm)) + return True + except: + return False + + + def check(self) -> bool: + return self.result.check() diff --git a/pscheck/pscheck.py b/pscheck/pscheck.py index 8c48ecd..bdc618b 100644 --- a/pscheck/pscheck.py +++ b/pscheck/pscheck.py @@ -1,21 +1,10 @@ #!/usr/bin/env python import argparse -import base64 -import cbor2 -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives.asymmetric.utils import \ - encode_dss_signature -import datetime import json -import os import sys -from textwrap import wrap -import zlib -from . import base45 +from .models import GreenCertificate def read_qrcode(file) -> str: @@ -41,128 +30,29 @@ def analyse_qrcode(qrcode: str, additional_info: bool = False, Renvoie la validité du pass sous forme de booléen. """ - if not qrcode.startswith('HC1:'): - raise ValueError("QR code invalide.") - - plain_input = qrcode.replace('HC1:', '') - - # QR Code en base 45 - compressed_cose = base45.b45decode(plain_input) - - # Décompression ZIP si nécessaire - cose = compressed_cose - if compressed_cose[0] == 0x78: - fb = compressed_cose[1] - if fb == 0x01 or fb == 0x5E or fb == 0x9C or fb == 0xDA: - cose = zlib.decompress(compressed_cose) - - # Chargement de l'objet CBOR - obj = cbor2.loads(cose) - header = cbor2.loads(obj.value[0]) - payload = cbor2.loads(obj.value[2]) - signature = obj.value[3] + certificate = GreenCertificate.load(qrcode) if check_signature: # Récupération du certificat utilisé - kid = base64.b64encode(header[4]).decode() - cert_name = kid.replace('/', '_') - - base_dir = os.path.dirname(__file__) - cert_path = os.path.join(base_dir, 'certs', f"{cert_name}.pem") - if not os.path.isfile(cert_path): - print(f"Le certificat {kid} n'a pas été trouvé.") - print("Utilisez l'option --dontcheck pour sauter " - "la vérification.") - print("Si vous disposez du certificat, installez-le dans", - cert_path) - exit(1) - with open(os.path.join(base_dir, 'certs', f"{cert_name}.pem")) as f: - cert_asc = f.read() - - cert = x509.load_pem_x509_certificate(cert_asc.encode(), - default_backend()) - - public_key = cert.public_key() - - # Calcul de la bonne signature et des données signées - data = cbor2.dumps( - ["Signature1", cbor2.dumps(header), - bytes(), cbor2.dumps(payload)] - ) - - l = len(signature) - r = int.from_bytes(signature[:l // 2], 'big') - s = int.from_bytes(signature[l // 2:], 'big') - signature = encode_dss_signature(r, s) - try: - # Vérification de la signature - public_key.verify(signature, data, - ec.ECDSA(cert.signature_hash_algorithm)) - valid = True - except: - valid = False + valid = certificate.check_signature() else: valid = True print("Attention : la signature du QR code n'a pas été vérifiée.") - # Information utile - p = payload[-260][1] - - if 'v' in p: - # Les vaccins sont valides 7 jours après la dernière dose - vaccin = p['v'][0] - # Toutes les doses sont requises - valid = valid and vaccin['dn'] == vaccin['sd'] - # Vérification de la date - date = datetime.date.fromisoformat(vaccin['dt']) - today = datetime.date.today() - delta = today - date - valid = valid and delta.days >= 7 - elif 't' in p: - # Les tests négatifs sont valables moins de 72h - test = p['t'][0] - assert test['tr'] in ['260373001', '260415000'] - - if test['tr'] == 260373001: - # Test positif - valid = False - - test_date = datetime.datetime.fromisoformat(test['sc']) - tzinfo = test_date.tzinfo - delta = datetime.datetime.now(tzinfo) - test_date - valid = valid and delta.days < 3 # 3 jours de validité - elif 'r' in p: - # les tests positifs entre 11 et 182 jours après - test = p['r'][0] - valid_from = datetime.datetime.fromisoformat(test['df']) - valid_until = datetime.datetime.fromisoformat(test['du']) - tzinfo = valid_from.tzinfo - valid = valid and \ - valid_from <= datetime.datetime.now(tzinfo) <= valid_until - else: - print("Type de passe inconnu.") - + valid = valid and certificate.check() if valid: print("Pass sanitaire valide") - print("Nom :", p['nam']['fn']) - print("Prénom :", p['nam']['gn']) - print("Date de naissance :", p['dob']) + print("Nom :", certificate.family_name) + print("Prénom :", certificate.given_name) + print("Date de naissance :", certificate.date_of_birth) if additional_info: # TODO Meilleur affichage - if 'v' in p: - # Vaccination - print("Informations de vaccination :", - json.dumps(p['v'][0], indent=2)) - elif 't' in p: - print("Informations de test :", - json.dumps(p['t'][0], indent=2)) - elif 'r' in p: - print("Informations de test :", - json.dumps(p['r'][0], indent=2)) + print("Informations supplémentaires :", + json.dumps(certificate.result.__dict__, indent=2)) else: print("Pass sanitaire invalide")