Utilisation d'objets

Signed-off-by: Yohann D'ANELLO <ynerant@crans.org>
This commit is contained in:
Yohann D'ANELLO 2021-09-01 15:33:19 +02:00
parent 163cbc27c0
commit 07410eded3
Signed by: ynerant
GPG Key ID: 3A75C55819C8CF85
2 changed files with 257 additions and 119 deletions

248
pscheck/models.py Normal file
View File

@ -0,0 +1,248 @@
import base64
import cbor2
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import \
encode_dss_signature
from dataclasses import dataclass
import datetime
from enum import Enum
import os
from textwrap import wrap
from typing import Optional
import zlib
from . import base45
@dataclass
class Result:
disease: str # tg, always 840539006
country: str # co
certificate_issuer: str # is
certificate_identifier: str # ci
@dataclass
class Vaccination(Result):
vaccine: str # vp
medical_product: str # mp
manufacturer: str # ma
dose_number: int # dn
total_series_of_doses: int # sd
date_of_vaccination: str # dt
def check(self) -> bool:
# Toutes les doses ont bien été injectées
valid = self.dose_number >= self.total_series_of_doses
# Vérification de la date
# TODO: Pour Astrazeneca, c'est 28 jours
date = datetime.date.fromisoformat(self.date_of_vaccination)
today = datetime.date.today()
delta = today - date
valid = valid and delta.days >= 7
return valid
@staticmethod
def deserialize(payload) -> "Vaccination":
translation = {
'tg': 'disease',
'vp': 'vaccine',
'mp': 'medical_product',
'ma': 'manufacturer',
'dn': 'dose_number',
'sd': 'total_series_of_doses',
'dt': 'date_of_vaccination',
'co': 'country',
'is': 'certificate_issuer',
'ci': 'certificate_identifier',
}
return Vaccination(**{translation[k]: v for k, v in payload.items()})
@dataclass
class Test(Result):
class TestResult(Enum):
POSITIVE = '260373001'
NEGATIVE = '260415000'
type_of_test: str # tt
test_name: str # nm
manufacturer: str # ma
date: str # sc
result_date: str # dr
result: TestResult # tr
center: str # tc
@staticmethod
def deserialize(payload) -> "Test":
translation = {
'tg': 'disease',
'tt': 'type_of_test',
'nm': 'test_name',
'ma': 'manufacturer',
'sc': 'date',
'dt': 'result_date',
'tr': 'result',
'tc': 'center',
'co': 'country',
'is': 'certificate_issuer',
'ci': 'certificate_identifier',
}
return Test(**{translation[k]: v for k, v in payload.items()})
def check(self) -> bool:
valid = self.result == TestResult.NEGATIVE
test_date = datetime.datetime.fromisoformat(self.result_date)
tzinfo = test_date.tzinfo
delta = datetime.datetime.now(tzinfo) - test_date
valid = valid and delta.days < 3 # 72h de validité
return valid
@dataclass
class RecoveryStatement(Result):
positive_test_date: str # fr
valid_from: str # df
valid_until: str # du
@staticmethod
def deserialize(payload) -> "RecoveryStatement":
translation = {
'tg': 'disease',
'fr': 'postive_test_date',
'df': 'valid_from',
'du': 'valid_until',
'co': 'country',
'is': 'certificate_issuer',
'ci': 'certificate_identifier',
}
return RecoveryStatement(**{translation[k]: v
for k, v in payload.items()})
def check(self) -> bool:
valid_from = datetime.datetime.fromisoformat(self.valid_from)
valid_unti = datetime.datetime.fromisoformat(self.valid_until)
tzinfo = valid_from.tzinfo
now = datetime.datetime.now(tzinfo)
return valid_from <= now <= valid_until
@dataclass
class GreenCertificate:
header: dict
payload: dict
signature: str
family_name: str # nam.fn
given_name: str # nam.gn
date_of_birth: str # nam.dob
version: str # ver
vaccination: Optional[Vaccination] = None # v
test: Optional[Test] = None # t
recovery_statement: Optional[RecoveryStatement] = None # r
@property
def result(self) -> Result:
if self.vaccination is not None:
return self.vaccination
elif self.test is not None:
return self.test
elif self.recovery_statement is not None:
return self.recovery_statement
@staticmethod
def load(qrcode) -> "GreenCertificate":
if not qrcode.startswith('HC1:'):
raise ValueError("QR code invalide.")
plain_input = qrcode.replace('HC1:', '')
# QR Code en base 45
compressed_cose = base45.b45decode(plain_input)
# Décompression ZIP si nécessaire
cose = compressed_cose
if compressed_cose[0] == 0x78:
fb = compressed_cose[1]
if fb == 0x01 or fb == 0x5E or fb == 0x9C or fb == 0xDA:
cose = zlib.decompress(compressed_cose)
# Chargement de l'objet CBOR
obj = cbor2.loads(cose)
header = cbor2.loads(obj.value[0])
payload = cbor2.loads(obj.value[2])
signature = obj.value[3]
return GreenCertificate.deserialize(header, payload, signature)
@staticmethod
def deserialize(header, payload, signature) -> "GreenCertificate":
# Information utile
p = payload[-260][1]
certificate = GreenCertificate(
header=header,
payload=payload,
signature=signature,
family_name=p['nam']['fn'],
given_name=p['nam']['gn'],
date_of_birth=p['dob'],
version=p['ver'],
)
if 'v' in p:
certificate.vaccination = Vaccination.deserialize(p['v'][0])
elif 't' in p:
certificate.test = Test.deserialize(p['t'][0])
elif 'r' in p:
certificate.recovery_statement = \
RecoveryStatement.deserialize(p['r'][0])
return certificate
def check_signature(self) -> bool:
kid = base64.b64encode(self.header[4]).decode()
cert_name = kid.replace('/', '_')
base_dir = os.path.dirname(__file__)
cert_path = os.path.join(base_dir, 'certs', f"{cert_name}.pem")
if not os.path.isfile(cert_path):
print(f"Le certificat {kid} n'a pas été trouvé.")
print("Utilisez l'option --dontcheck pour sauter "
"la vérification.")
print("Si vous disposez du certificat, installez-le dans",
cert_path)
exit(1)
with open(os.path.join(base_dir, 'certs', f"{cert_name}.pem")) as f:
cert_asc = f.read()
cert = x509.load_pem_x509_certificate(cert_asc.encode(),
default_backend())
public_key = cert.public_key()
# Calcul de la bonne signature et des données signées
data = cbor2.dumps(
["Signature1", cbor2.dumps(self.header),
bytes(), cbor2.dumps(self.payload)]
)
l = len(self.signature)
r = int.from_bytes(self.signature[:l // 2], 'big')
s = int.from_bytes(self.signature[l // 2:], 'big')
signature = encode_dss_signature(r, s)
try:
# Vérification de la signature
public_key.verify(signature, data,
ec.ECDSA(cert.signature_hash_algorithm))
return True
except:
return False
def check(self) -> bool:
return self.result.check()

View File

@ -1,21 +1,10 @@
#!/usr/bin/env python
import argparse
import base64
import cbor2
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import \
encode_dss_signature
import datetime
import json
import os
import sys
from textwrap import wrap
import zlib
from . import base45
from .models import GreenCertificate
def read_qrcode(file) -> str:
@ -41,128 +30,29 @@ def analyse_qrcode(qrcode: str, additional_info: bool = False,
Renvoie la validité du pass sous forme de booléen.
"""
if not qrcode.startswith('HC1:'):
raise ValueError("QR code invalide.")
plain_input = qrcode.replace('HC1:', '')
# QR Code en base 45
compressed_cose = base45.b45decode(plain_input)
# Décompression ZIP si nécessaire
cose = compressed_cose
if compressed_cose[0] == 0x78:
fb = compressed_cose[1]
if fb == 0x01 or fb == 0x5E or fb == 0x9C or fb == 0xDA:
cose = zlib.decompress(compressed_cose)
# Chargement de l'objet CBOR
obj = cbor2.loads(cose)
header = cbor2.loads(obj.value[0])
payload = cbor2.loads(obj.value[2])
signature = obj.value[3]
certificate = GreenCertificate.load(qrcode)
if check_signature:
# Récupération du certificat utilisé
kid = base64.b64encode(header[4]).decode()
cert_name = kid.replace('/', '_')
base_dir = os.path.dirname(__file__)
cert_path = os.path.join(base_dir, 'certs', f"{cert_name}.pem")
if not os.path.isfile(cert_path):
print(f"Le certificat {kid} n'a pas été trouvé.")
print("Utilisez l'option --dontcheck pour sauter "
"la vérification.")
print("Si vous disposez du certificat, installez-le dans",
cert_path)
exit(1)
with open(os.path.join(base_dir, 'certs', f"{cert_name}.pem")) as f:
cert_asc = f.read()
cert = x509.load_pem_x509_certificate(cert_asc.encode(),
default_backend())
public_key = cert.public_key()
# Calcul de la bonne signature et des données signées
data = cbor2.dumps(
["Signature1", cbor2.dumps(header),
bytes(), cbor2.dumps(payload)]
)
l = len(signature)
r = int.from_bytes(signature[:l // 2], 'big')
s = int.from_bytes(signature[l // 2:], 'big')
signature = encode_dss_signature(r, s)
try:
# Vérification de la signature
public_key.verify(signature, data,
ec.ECDSA(cert.signature_hash_algorithm))
valid = True
except:
valid = False
valid = certificate.check_signature()
else:
valid = True
print("Attention : la signature du QR code n'a pas été vérifiée.")
# Information utile
p = payload[-260][1]
if 'v' in p:
# Les vaccins sont valides 7 jours après la dernière dose
vaccin = p['v'][0]
# Toutes les doses sont requises
valid = valid and vaccin['dn'] == vaccin['sd']
# Vérification de la date
date = datetime.date.fromisoformat(vaccin['dt'])
today = datetime.date.today()
delta = today - date
valid = valid and delta.days >= 7
elif 't' in p:
# Les tests négatifs sont valables moins de 72h
test = p['t'][0]
assert test['tr'] in ['260373001', '260415000']
if test['tr'] == 260373001:
# Test positif
valid = False
test_date = datetime.datetime.fromisoformat(test['sc'])
tzinfo = test_date.tzinfo
delta = datetime.datetime.now(tzinfo) - test_date
valid = valid and delta.days < 3 # 3 jours de validité
elif 'r' in p:
# les tests positifs entre 11 et 182 jours après
test = p['r'][0]
valid_from = datetime.datetime.fromisoformat(test['df'])
valid_until = datetime.datetime.fromisoformat(test['du'])
tzinfo = valid_from.tzinfo
valid = valid and \
valid_from <= datetime.datetime.now(tzinfo) <= valid_until
else:
print("Type de passe inconnu.")
valid = valid and certificate.check()
if valid:
print("Pass sanitaire valide")
print("Nom :", p['nam']['fn'])
print("Prénom :", p['nam']['gn'])
print("Date de naissance :", p['dob'])
print("Nom :", certificate.family_name)
print("Prénom :", certificate.given_name)
print("Date de naissance :", certificate.date_of_birth)
if additional_info:
# TODO Meilleur affichage
if 'v' in p:
# Vaccination
print("Informations de vaccination :",
json.dumps(p['v'][0], indent=2))
elif 't' in p:
print("Informations de test :",
json.dumps(p['t'][0], indent=2))
elif 'r' in p:
print("Informations de test :",
json.dumps(p['r'][0], indent=2))
print("Informations supplémentaires :",
json.dumps(certificate.result.__dict__, indent=2))
else:
print("Pass sanitaire invalide")