From bf0f9a723a3fb9b7a101adf2e4cfacd6b5648bff Mon Sep 17 00:00:00 2001 From: peppelinux Date: Thu, 11 Jun 2026 14:32:00 +0200 Subject: [PATCH] feat: X509 in MSO signer and further coeherence checks --- README.md | 63 ++++++++++++ docs/CERTIFICATE-CHAIN-VERIFICATION.md | 34 ++++++ docs/MSO.md | 15 ++- pymdoccbor/__init__.py | 2 +- pymdoccbor/mdoc/issuer.py | 7 +- pymdoccbor/mso/issuer.py | 65 +++++------- pymdoccbor/tests/test_06_mso_issuer.py | 137 +++++++++++++++++++++++++ pymdoccbor/tools.py | 12 +-- pymdoccbor/x509.py | 62 +++++++++++ 9 files changed, 342 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 771fec7..d23c65e 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,69 @@ assert mdoci.dumps() # >> mdoci.dumps() returns mdoc bytes ```` +### Issue an MDOC CBOR with X.509 certificate chain + +To embed a full `x5chain` (COSE header label 33) in the MSO unprotected header, +pass `x509_chain` to `MdocCborIssuer.new()` or `MsoIssuer`. The Document Signer +(DS) certificate must be first; intermediate certificates follow. The trusted +root (IACA) is usually omitted. + +Each element of `x509_chain` is an `X509ChainSource` with one of these types: + +| Type | Meaning | Example | +|------|---------|---------| +| `str` | **File path** to a PEM or DER certificate (not PEM text inline) | `"certs/ds.pem"` | +| `bytes` | PEM or DER content; PEM bundles with multiple certificates are expanded in order | `open("chain.pem", "rb").read()` | +| `cryptography.x509.Certificate` | Certificate object already loaded in memory | `ds_cert` | + +A `str` value is always read as a filesystem path. To pass PEM text, encode it as +`bytes` (for example `pem_text.encode("utf-8")`). + +````python +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from pymdoccbor.mdoc.issuer import MdocCborIssuer + +with open("certs/ds.pem", "rb") as f: + ds_cert = x509.load_pem_x509_certificate(f.read(), default_backend()) + +mdoci = MdocCborIssuer( + private_key=PKEY, + alg="ES256", +) + +mdoc = mdoci.new( + doctype="eu.europa.ec.eudiw.pid.1", + data=PID_DATA, + devicekeyinfo=PKEY, + validity={"issuance_date": "2025-01-17", "expiry_date": "2030-01-17"}, + x509_chain=[ + ds_cert, # Certificate object + "certs/intermediate.pem", # file path + open("certs/backup-intermediate.der", "rb").read(), # DER/PEM bytes + ], +) +```` + +For a single DS certificate, `cert_path` remains supported and is equivalent to +`x509_chain` with one entry. The two parameters are mutually exclusive. + +When issuing an MSO directly with `MsoIssuer`, use the same `x509_chain` parameter: + +````python +from pymdoccbor.mso.issuer import MsoIssuer + +msoi = MsoIssuer( + data=PID_DATA, + private_key=PKEY, + alg="ES256", + validity={"issuance_date": "2025-01-17", "expiry_date": "2030-01-17"}, + x509_chain=[ds_cert, intermediate_cert], +) + +mso = msoi.sign(doctype="eu.europa.ec.eudiw.pid.1", device_key=DEVICE_KEY) +```` + ### Issue an MSO alone MsoIssuer is a class that handles private keys, data processing, digests and signature operations. diff --git a/docs/CERTIFICATE-CHAIN-VERIFICATION.md b/docs/CERTIFICATE-CHAIN-VERIFICATION.md index 83d6046..2fd79a8 100644 --- a/docs/CERTIFICATE-CHAIN-VERIFICATION.md +++ b/docs/CERTIFICATE-CHAIN-VERIFICATION.md @@ -197,6 +197,40 @@ cert = x509.load_pem_x509_certificate(pem_data.encode(), default_backend()) The DS certificate is automatically extracted from the mDOC's Mobile Security Object (MSO). It is embedded in the COSE_Sign1 structure's unprotected header (label 33). +### Issuing an mDOC with an X.509 chain + +Since version 1.3.0, `MsoIssuer` and `MdocCborIssuer.new()` accept an `x509_chain` +parameter to populate the COSE `x5chain` header (label 33) during MSO issuance. + +```python +from pymdoccbor.mso.issuer import MsoIssuer + +msoi = MsoIssuer( + data=data, + private_key=ds_private_key, + alg="ES256", + validity={"issuance_date": "2025-01-17", "expiry_date": "2030-01-17"}, + x509_chain=[ + "certs/ds.pem", # Document Signer (first) + "certs/intermediate.pem", # optional intermediate(s) + ], +) + +mso = msoi.sign(doctype="org.iso.18013.5.1.mDL", device_key=device_key) +``` + +Each chain entry may be: + +- a file path (PEM or DER; PEM bundles with multiple certificates are expanded in order), +- raw PEM/DER bytes, or +- a `cryptography.x509.Certificate` object. + +With a single certificate, the header value is encoded as DER `bytes`. With two or +more certificates, it is encoded as a `list` of DER `bytes`, with the DS certificate +first. The trusted root (IACA) is typically not included in the chain. + +`cert_path` (single certificate) and `x509_chain` are mutually exclusive. + ## Certificate Chain Structure ``` diff --git a/docs/MSO.md b/docs/MSO.md index 4d0463d..95eee13 100644 --- a/docs/MSO.md +++ b/docs/MSO.md @@ -51,8 +51,19 @@ protected header. Other elements should not be present in the protected header. The DS certificate shall be included as a ‘x5chain’ element as described -in “draft-ietf-cose-x509-04”. It shall be included as an -unprotected header element. +in RFC 9360. It shall be included as an unprotected header element (COSE label 33). + +At issuance time, pass `x509_chain` to `MsoIssuer` or `MdocCborIssuer.new()`: + +```` +msoi = MsoIssuer( + data=data, + private_key=ds_private_key, + alg="ES256", + validity={"issuance_date": "2025-01-17", "expiry_date": "2030-01-17"}, + x509_chain=["ds.pem", "intermediate.pem"], +) +```` The input for the digest function is the binary data of the IssuerSignedItem. diff --git a/pymdoccbor/__init__.py b/pymdoccbor/__init__.py index c68196d..67bc602 100644 --- a/pymdoccbor/__init__.py +++ b/pymdoccbor/__init__.py @@ -1 +1 @@ -__version__ = "1.2.0" +__version__ = "1.3.0" diff --git a/pymdoccbor/mdoc/issuer.py b/pymdoccbor/mdoc/issuer.py index 807ef6b..c36641c 100644 --- a/pymdoccbor/mdoc/issuer.py +++ b/pymdoccbor/mdoc/issuer.py @@ -15,6 +15,7 @@ from pymdoccbor.mdoc.exceptions import InvalidStatusDescriptor from pymdoccbor.mso.issuer import MsoIssuer +from pymdoccbor.x509 import X509ChainSource logger = logging.getLogger("pymdoccbor") @@ -80,6 +81,7 @@ def new( validity: dict | None = None, devicekeyinfo: dict | CoseKey | str | None = None, cert_path: str | None = None, + x509_chain: list[X509ChainSource] | None = None, revocation: dict | None = None, status: dict | None = None ) -> dict: @@ -90,7 +92,8 @@ def new( :param doctype: str: document type :param validity: dict: validity info :param devicekeyinfo: Union[dict, CoseKey, str]: device key info - :param cert_path: str: path to the certificate + :param cert_path: str: path to a single certificate (PEM or DER) + :param x509_chain: list: X.509 chain for the MSO x5chain header (label 33) :param revocation: dict: revocation status dict it may include status_list and identifier_list keys :param status: dict: status dict with uri and idx per draft-ietf-oauth-status-list :return: dict: signed mdoc @@ -180,6 +183,7 @@ def new( msoi = MsoIssuer( data=data, cert_path=cert_path, + x509_chain=x509_chain, hsm=self.hsm, key_label=self.key_label, user_pin=self.user_pin, @@ -198,6 +202,7 @@ def new( private_key=self.private_key, alg=self.alg, cert_path=cert_path, + x509_chain=x509_chain, validity=validity, revocation=revocation, cert_info=self.cert_info diff --git a/pymdoccbor/mso/issuer.py b/pymdoccbor/mso/issuer.py index ae6fb8a..62037ac 100644 --- a/pymdoccbor/mso/issuer.py +++ b/pymdoccbor/mso/issuer.py @@ -4,13 +4,9 @@ import logging import secrets import uuid -from typing import Union import cbor2 from cbor_diag import cbor2diag -from cryptography import x509 -from cryptography.hazmat.primitives import serialization -from cryptography.x509 import Certificate from pycose.headers import Algorithm from pycose.keys import CoseKey from pycose.messages import Sign1Message @@ -18,7 +14,7 @@ from pymdoccbor import settings from pymdoccbor.exceptions import MsoPrivateKeyRequired from pymdoccbor.tools import shuffle_dict -from pymdoccbor.x509 import selfsigned_x509cert +from pymdoccbor.x509 import X509ChainSource, encode_x5chain, selfsigned_x509cert logger = logging.getLogger("pymdoccbor") @@ -33,6 +29,7 @@ def __init__( data: dict, validity: dict, cert_path: str | None = None, + x509_chain: list[X509ChainSource] | None = None, key_label: str | None = None, user_pin: str | None = None, lib_path: str | None = None, @@ -50,7 +47,11 @@ def __init__( :param data: dict: the data to sign :param validity: validity: the validity info of the mso - :param cert_path: str: the path to the certificate + :param cert_path: str: the path to a single certificate (PEM or DER) + :param x509_chain: list: X.509 chain for COSE header 33 (x5chain); each + item may be a file path, PEM/DER bytes, or a Certificate object. + The Document Signer certificate must be first. Mutually exclusive + with cert_path. :param key_label: str: key label :param user_pin: str: user pin :param lib_path: str: path to the library cryptographic library @@ -97,11 +98,18 @@ def __init__( self.revocation = revocation self.cert_path = cert_path + self.x509_chain = x509_chain self.cert_info = cert_info - if not self.cert_path and (not self.cert_info or not self.private_key): + if self.cert_path and self.x509_chain: + raise ValueError("cert_path and x509_chain are mutually exclusive") + + if not self.cert_path and not self.x509_chain and ( + not self.cert_info or not self.private_key + ): raise ValueError( - "cert_path or cert_info with a private key must be provided to properly insert a certificate" + "cert_path, x509_chain, or cert_info with a private key must be " + "provided to properly insert a certificate" ) alg_map = {"ES256": "sha256", "ES384": "sha384", "ES512": "sha512"} @@ -216,35 +224,15 @@ def sign( if self.revocation is not None: payload.update({"status": self.revocation}) - if self.cert_path: - # Try to load the certificate file - with open(self.cert_path, "rb") as file: - certificate = file.read() - _parsed_cert: Union[Certificate, None] = None - try: - _parsed_cert = x509.load_pem_x509_certificate(certificate) - except Exception: - logger.error( - f"Certificate at {self.cert_path} could not be loaded as PEM, trying DER" - ) - - if not _parsed_cert: - try: - _parsed_cert = x509.load_der_x509_certificate(certificate) - except Exception: - _err_msg = ( - f"Certificate at {self.cert_path} could not be loaded as DER" - ) - logger.error(_err_msg) - - if _parsed_cert: - cert = _parsed_cert - else: - raise Exception(f"Certificate at {self.cert_path} failed parse") - _cert = cert.public_bytes(getattr(serialization.Encoding, "DER")) + if self.x509_chain: + _cert = encode_x5chain(self.x509_chain) + elif self.cert_path: + _cert = encode_x5chain([self.cert_path]) else: if not self.cert_info: - raise ValueError("cert_info must be provided if cert_path is not set") + raise ValueError( + "cert_info must be provided if cert_path and x509_chain are not set" + ) logger.warning( "A self-signed certificate will be created using the provided " @@ -261,9 +249,6 @@ def sign( Algorithm: self.alg, # 33: _cert }, - # TODO: x509 (cbor2.CBORTag(33)) and federation trust_chain support (cbor2.CBORTag(27?)) here - # 33 means x509chain standing to rfc9360 - # in both protected and unprotected for interop purpose .. for now. uhdr={33: _cert}, payload=cbor2.dumps( cbor2.CBORTag(24, cbor2.dumps(payload, canonical=True)), @@ -281,11 +266,7 @@ def sign( phdr={ Algorithm: self.private_key.alg, # KID: self.private_key.kid, - # 33: _cert }, - # TODO: x509 (cbor2.CBORTag(33)) and federation trust_chain support (cbor2.CBORTag(27?)) here - # 33 means x509chain standing to rfc9360 - # in both protected and unprotected for interop purpose .. for now. uhdr={33: _cert}, payload=cbor2.dumps( cbor2.CBORTag(24, cbor2.dumps(payload, canonical=True)), diff --git a/pymdoccbor/tests/test_06_mso_issuer.py b/pymdoccbor/tests/test_06_mso_issuer.py index 66066d1..79de3d6 100644 --- a/pymdoccbor/tests/test_06_mso_issuer.py +++ b/pymdoccbor/tests/test_06_mso_issuer.py @@ -1,11 +1,73 @@ +from datetime import datetime, timedelta, timezone + +import pytest +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.x509.oid import NameOID +from pycose.headers import X5chain from pycose.messages import CoseMessage +from pymdoccbor.mdoc.issuer import MdocCborIssuer from pymdoccbor.mso.issuer import MsoIssuer +from pymdoccbor.mso.verifier import MsoVerifier from pymdoccbor.tests.cert_data import CERT_DATA from pymdoccbor.tests.micov_data import MICOV_DATA from pymdoccbor.tests.pkey import PKEY +def _generate_test_certificates(): + root_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) + root_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Test Root CA"), + x509.NameAttribute(NameOID.COMMON_NAME, "Test Root CA"), + ]) + root_cert = ( + x509.CertificateBuilder() + .subject_name(root_subject) + .issuer_name(root_subject) + .public_key(root_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .add_extension( + x509.BasicConstraints(ca=True, path_length=None), + critical=True, + ) + .sign(root_key, hashes.SHA256(), default_backend()) + ) + + ds_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) + ds_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Test DS"), + x509.NameAttribute(NameOID.COMMON_NAME, "Test Document Signer"), + ]) + ds_cert = ( + x509.CertificateBuilder() + .subject_name(ds_subject) + .issuer_name(root_subject) + .public_key(ds_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(timezone.utc)) + .not_valid_after(datetime.now(timezone.utc) + timedelta(days=365)) + .sign(root_key, hashes.SHA256(), default_backend()) + ) + + return root_cert, ds_cert + + +def _validity(): + return {"issuance_date": "2024-12-31", "expiry_date": "2050-12-31"} + + +def _issuer_auth_x5chain(issuer_auth): + msov = MsoVerifier(issuer_auth) + return msov.raw_public_keys + + def test_mso_issuer_fail(): try: MsoIssuer(None, None) @@ -46,3 +108,78 @@ def test_mso_issuer_sign(): mso = msoi.sign() assert isinstance(mso, CoseMessage) + + +def test_mso_issuer_x509_chain_single_certificate(): + _, ds_cert = _generate_test_certificates() + + msoi = MsoIssuer( + data=MICOV_DATA, + private_key=PKEY, + validity=_validity(), + alg="ES256", + x509_chain=[ds_cert], + ) + + mso = msoi.sign() + x5chain = mso.uhdr[X5chain] + + assert isinstance(x5chain, bytes) + assert x5chain == ds_cert.public_bytes(serialization.Encoding.DER) + + +def test_mso_issuer_x509_chain_multiple_certificates(): + root_cert, ds_cert = _generate_test_certificates() + + msoi = MsoIssuer( + data=MICOV_DATA, + private_key=PKEY, + validity=_validity(), + alg="ES256", + x509_chain=[ds_cert, root_cert], + ) + + mso = msoi.sign() + x5chain = mso.uhdr[X5chain] + + assert isinstance(x5chain, list) + assert len(x5chain) == 2 + assert x5chain[0] == ds_cert.public_bytes(serialization.Encoding.DER) + assert x5chain[1] == root_cert.public_bytes(serialization.Encoding.DER) + + +def test_mso_issuer_cert_path_and_x509_chain_are_mutually_exclusive(): + _, ds_cert = _generate_test_certificates() + + with pytest.raises(ValueError, match="mutually exclusive"): + MsoIssuer( + data=MICOV_DATA, + private_key=PKEY, + validity=_validity(), + alg="ES256", + cert_path="unused.pem", + x509_chain=[ds_cert], + ) + + +def test_mdoc_cbor_issuer_x509_chain_in_issuer_auth(): + root_cert, ds_cert = _generate_test_certificates() + + mdoc = MdocCborIssuer( + private_key=PKEY, + alg="ES256", + cert_info=CERT_DATA, + ) + mdoc.new( + data=MICOV_DATA, + doctype="org.micov.medical.1", + validity=_validity(), + x509_chain=[ds_cert, root_cert], + ) + + issuer_auth = mdoc.signed["documents"][0]["issuerSigned"]["issuerAuth"] + x5chain = _issuer_auth_x5chain(issuer_auth) + + assert len(x5chain) == 2 + assert x5chain[0] == ds_cert.public_bytes(serialization.Encoding.DER) + assert x5chain[1] == root_cert.public_bytes(serialization.Encoding.DER) diff --git a/pymdoccbor/tools.py b/pymdoccbor/tools.py index 96fadca..95ae19e 100644 --- a/pymdoccbor/tools.py +++ b/pymdoccbor/tools.py @@ -19,20 +19,14 @@ def bytes2CoseSign1(data: bytes) -> Sign1Message: return decoded -def cborlist2CoseSign1(data: list) -> Sign1Message: +def cborlist2CoseSign1(data: list | tuple) -> Sign1Message: """ Gets cbor2 decoded COSE Sign1 as a list and return a COSE_Sign1 object - :param data: list: the COSE Sign1 as a list + :param data: list | tuple: the COSE Sign1 as a list (cbor2 may decode arrays as tuples) :return: Sign1Message: the COSE Sign1 object """ - decoded = Sign1Message.decode( - cbor2.dumps( - cbor2.CBORTag(18, value=data) - ) - ) - - return decoded + return Sign1Message.from_cose_obj(list(data), allow_unknown_attributes=True) def pretty_print(cbor_loaded: dict) -> None: diff --git a/pymdoccbor/x509.py b/pymdoccbor/x509.py index 525053b..ce2ac1e 100644 --- a/pymdoccbor/x509.py +++ b/pymdoccbor/x509.py @@ -8,6 +8,68 @@ from cryptography.x509.oid import NameOID from pycose.keys import CoseKey +X509ChainSource = Union[str, bytes, Certificate] + + +def load_x509_certificates_from_bytes(data: bytes) -> list[Certificate]: + """ + Load one or more X.509 certificates from PEM or DER bytes. + + :param data: bytes: PEM (single or bundle) or DER certificate data + :return: list[Certificate]: parsed certificates in file order + """ + try: + certs = x509.load_pem_x509_certificates(data) + if certs: + return certs + except ValueError: + pass + + return [x509.load_der_x509_certificate(data)] + + +def load_x509_certificates_from_source(source: X509ChainSource) -> list[Certificate]: + """ + Load one or more X.509 certificates from a file path, bytes, or Certificate. + + :param source: str | bytes | Certificate: certificate source + :return: list[Certificate]: parsed certificates + """ + if isinstance(source, Certificate): + return [source] + + if isinstance(source, str): + with open(source, "rb") as cert_file: + data = cert_file.read() + else: + data = source + + return load_x509_certificates_from_bytes(data) + + +def encode_x5chain(sources: list[X509ChainSource]) -> Union[bytes, list[bytes]]: + """ + Build the COSE x5chain value (header label 33) from certificate sources. + + A single certificate is encoded as DER bytes; multiple certificates are + encoded as a list of DER bytes with the Document Signer certificate first. + + :param sources: list of file paths, PEM/DER bytes, or Certificate objects + :return: bytes or list[bytes]: value for COSE unprotected header 33 + """ + der_certs: list[bytes] = [] + for source in sources: + for cert in load_x509_certificates_from_source(source): + der_certs.append(cert.public_bytes(serialization.Encoding.DER)) + + if not der_certs: + raise ValueError("x509_chain must contain at least one certificate") + + if len(der_certs) == 1: + return der_certs[0] + + return der_certs + def selfsigned_x509cert( cert_info: dict[str, Any],