pkcs11_ca_service package

Submodules

pkcs11_ca_service.acme_account module

ACME account module

class pkcs11_ca_service.acme_account.AcmeAccount(kwargs: Dict[str, str | int])

Bases: DataClassObject

Class to represent an ACME account

contact: str
contact_as_list() List[str]

Get the contacts for this account as a json list They are stored in base64url in the database.

Returns:

The json list

Return type:

list[str]

created: str
db: DataBaseObject
db_fields: Dict[str, Type[str] | Type[int]] = {'contact': <class 'str'>, 'created': <class 'str'>, 'id': <class 'str'>, 'public_key_pem': <class 'str'>, 'status': <class 'str'>}
db_reference_fields: Dict[str, str] = {}
db_table_name: str = 'acme_account'
db_unique_fields: List[str] = ['id', 'public_key_pem']
id: str
public_key_pem: str
response_data() Dict[str, Any]

The json view for an acme account

Returns: Dict[str, Any]

status: str
class pkcs11_ca_service.acme_account.AcmeAccountInput(*, serial: int | None = None, id: str | None = None, public_key_pem: str | None = None)

Bases: InputObject

Class to represent an acme account matching from HTTP post data

id: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'id': FieldInfo(annotation=Union[str, NoneType], required=False), 'public_key_pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'serial': FieldInfo(annotation=Union[int, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

public_key_pem: str | None
serial: int | None
pkcs11_ca_service.acme_account.contact_from_payload(payload: Dict[str, Any]) str

Encode the acme new account contacts to base64url

Parameters:

payload (dict[str, Any]) – The payload part pf the acme JWS.

Returns:

The json list contacts as a base64url encoded string

Return type:

str

pkcs11_ca_service.acme_authorization module

ACME authorization module

class pkcs11_ca_service.acme_authorization.AcmeAuthorization(kwargs: Dict[str, str | int])

Bases: DataClassObject

Class to represent an ACME authorization

account: int
challenges: str
challenges_as_list() List[Dict[str, str]]

Get the authorizations challenges as list. They are stored in base64url.

Returns: List[Dict[str, str]]

created: str
db: DataBaseObject
db_fields: Dict[str, Type[str] | Type[int]] = {'account': <class 'int'>, 'challenges': <class 'str'>, 'created': <class 'str'>, 'expires': <class 'str'>, 'id': <class 'str'>, 'identifier': <class 'str'>, 'status': <class 'str'>, 'wildcard': <class 'int'>}
db_reference_fields: Dict[str, str] = {'acme_order': 'acme_order(serial)'}
db_table_name: str = 'acme_authorization'
db_unique_fields: List[str] = ['id']
expires: str
id: str
identifier: str
identifier_as_dict() Dict[str, str]

Get the orders identifiers as list. They are stored in base64url.

Returns: List[Dict[str, str]]

response_data() Dict[str, Any]

The json view for an acme authorization

Returns: Dict[str, Any]

status: str
wildcard: int
class pkcs11_ca_service.acme_authorization.AcmeAuthorizationInput(*, account: int | None = None, id: str | None = None)

Bases: InputObject

Class to represent an acme authorization matching from HTTP post data

account: int | None
id: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'account': FieldInfo(annotation=Union[int, NoneType], required=False), 'id': FieldInfo(annotation=Union[str, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

pkcs11_ca_service.acme_authorization.challenges_from_list(challenges: List[Dict[str, str]]) str

Base64url encode the acme challenges.

Parameters: challenges (List[Dict[str, str]]): List of challenges

Returns: str

pkcs11_ca_service.acme_lib module

ACME lib module

Handle the ACME requests

FIXME: handle and check for expired acme objects

exception pkcs11_ca_service.acme_lib.NoSuchKID(message: str = 'No such KID')

Bases: Exception

Class to handle no such kid

async pkcs11_ca_service.acme_lib.account_authzs(acme_authz_input: AcmeAuthorizationInput) List[AcmeAuthorization]

All acme authorizations for the account

Parameters: acme_authz_input (AcmeAuthorizationInput): The AcmeAuthorizationInput for the DB search.

Returns: Union[AcmeAuthorization, None]

async pkcs11_ca_service.acme_lib.account_exists(acme_account_input: AcmeAccountInput) AcmeAccount | None

If the acme account exists (in DB)

Parameters: acme_account_input (AcmeAccountInput): The AcmeAccountInput for the DB search.

Returns: Union[AcmeAccount, None]

pkcs11_ca_service.acme_lib.account_id_from_kid(kid: str) str

Get the account id from the acme KID by only keeping the id part. Example KID: ‘https://acme-server/path-to-acme/acct/djXYss-mx-L0wy3V47OBoNkLiyOQNUObz

Parameters: kid (str): The acme KID.

Returns: str

async pkcs11_ca_service.acme_lib.authz_response(_: AcmeAccount, jws: Dict[str, Any]) JSONResponse

List the acme authorization

Parameters: _ (AcmeAccount): The acme account that made this request. Here for uniformity with the other ACME functions jws (Dict[str, Any): The JWS.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.acme_lib.cert_response(account: AcmeAccount, jws: Dict[str, Any]) Response

Fetch the issued certificate. The certs issuer is included, it’s a chain.

Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any): The JWS.

Returns: fastapi.responses.Response

async pkcs11_ca_service.acme_lib.chall_response(account: AcmeAccount, jws: Dict[str, Any], background_tasks: BackgroundTasks) JSONResponse

List or execute the acme challenge

The challenge will be executed if the payload is an empty json dict ‘{}’ by the server immediately after the http request finishes

Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any): The JWS. background_tasks (fastapi.background.BackgroundTasks): The list of background tasks which will execute the challenge.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.acme_lib.execute_challenge(public_key_pem: str, order: AcmeOrder, authz: AcmeAuthorization, challenge: Dict[str, str]) None

Execute the acme challenge.

Parameters: public_key_pem (str): The acme accounts public key in PEM form. order (AcmeOrder): The acme order for this challenge. authz (AcmeAuthorization): The acme AcmeAuthorization for this challenge. challenge (Dict[str, Any]): The challenge.

async pkcs11_ca_service.acme_lib.existing_account_response(account: AcmeAccount) JSONResponse

List an existing acme account

Parameters: account (AcmeAccount): The acme account class object.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.acme_lib.finalize_order_response(_: AcmeAccount, jws: Dict[str, Any]) JSONResponse
Finalize the order by sending a csr which the acme CA will sign

as long as it conforms to the orders identifiers

Parameters: _ (AcmeAccount): The acme account that made this request. Here for uniformity with the other ACME functions jws (Dict[str, Any): The JWS.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.acme_lib.handle_acme_routes(request: Request, background_tasks: BackgroundTasks) Response

All acme routes except new-nonce and directory are handled from here.

Parameters: request (fastapi.Request): The http request. background_tasks (fastapi.background.BackgroundTasks): A background to be run after the http request has finished.

Returns: fastapi.Response

pkcs11_ca_service.acme_lib.http_01_challenge(url: str, token: str, key_authorization: str) bool

Execute the http-01 ACME challenge, return true if successful, false if failed Retry 3 times each after 3 seconds if the challenge fails.

Parameters: url (str): The challenge’s URL token (str): The challenge’s token key_authorization (str): The challenge’s key_authorization

Returns: bool

pkcs11_ca_service.acme_lib.is_expired(expiry_date: str) bool

If the current date is past the expiry date

Parameters: expiry_date (str): The expiry date as string in %Y-%m-%dT%H:%M:%SZ

Returns: bool

async pkcs11_ca_service.acme_lib.key_change_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse

Change key for the acme account

Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any): The JWS.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.acme_lib.new_account_response(jws: Dict[str, Any], request_url: str) JSONResponse

Create a new acme account

Parameters: jws (Dict[str, Any): The ACME jws

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.acme_lib.new_authz_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse

An ACME pre-authorization

Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any): The JWS.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.acme_lib.new_order_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse

Create a new acme order

Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any): The JWS.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.acme_lib.order_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse

List the acme order

Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any): The JWS.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.acme_lib.orders_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse

List acme orders

Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any): The JWS.

Returns: fastapi.responses.JSONResponse

pkcs11_ca_service.acme_lib.pem_from_jws(jws: Dict[str, Any]) str

Get a public key in PEM form from a JWS.

Parameters: jws (Dict[str, Any): The JWS.

Returns: str

pkcs11_ca_service.acme_lib.random_string() str

Generate a random string, only base64url chars

Returns: str

async pkcs11_ca_service.acme_lib.revoke_cert_response(jws: Dict[str, Any], request_url: str) Response

Revoke an acme issued cert. Authorization can be normal acme or the JWS being signed by the certs private key.

Parameters: jws (Dict[str, Any]): The JWS. request_url (str): The request url.

Returns: fastapi.responses.Response

async pkcs11_ca_service.acme_lib.sign_cert(csr: CertificationRequest, order: AcmeOrder) None

Sign the validated csr into a certificate.

Parameters: csr (asn1crypto.csr.CertificationRequest): The csr. order (AcmeOrder): The acme order for this request.

async pkcs11_ca_service.acme_lib.sunet_acme_authz(account: AcmeAccount, token: str) JSONResponse

A SUNET type challenge ACME pre-authorization

Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any): The JWS.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.acme_lib.update_account_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse

Update the acme account

Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any): The JWS.

Returns: fastapi.responses.JSONResponse

pkcs11_ca_service.acme_lib.validate_csr(csr: CertificationRequest, order: AcmeOrder) None

Validate a csr to its orders identifications.

Parameters: csr (asn1crypto.csr.CertificationRequest): The csr. order (AcmeOrder): The acme order for this request.

pkcs11_ca_service.acme_lib.validate_jwk(jwk: Dict[str, Any], alg: str, signed_data: str, signature: str) None

Validate a JWK signature to the specified alg

Parameters: jwk (Dict[str, Any]): The JWK. alg (str): The signature algorithm. signed_data (str): The signed data in base64url form. signature (str): The signature in base64url form.

Returns: fastapi.Response

async pkcs11_ca_service.acme_lib.validate_jws(input_data: Dict[str, Any], request_url: str) None

Validate an acme JWS by either acme KID or JWK. Also validates url, nonce and alg according to acme

Parameters: input_data (Dict[str, Any]): The JWS. request_url (str): The request url.

Returns: fastapi.Response

async pkcs11_ca_service.acme_lib.validate_kid(kid: str, signed_data: str, signature: str) None

Validate an acme KID signature Raises cryptography.exceptions.InvalidSignature or fastapi.HTTPException if KID not exists.

Parameters: kid (str): The acme KID. signed_data (str): The signed data in base64url form. signature (str): The signature in base64url form.

pkcs11_ca_service.acme_lib.validate_signature(signer_public_key_data: str, signature: str, signed_data: str) None

Validate a signature Raises cryptography.exceptions.InvalidSignature or fastapi.HTTPException if failed

Parameters: signer_public_key_data (str): Public key in PEM form signature (str): The signature in base64url form. signed_data (str): The signed data in base64url form.

pkcs11_ca_service.acme_order module

ACME order module

class pkcs11_ca_service.acme_order.AcmeOrder(kwargs: Dict[str, str | int])

Bases: DataClassObject

Class to represent an ACME order

account: int
authorizations: str
authorizations_as_list() List[str]

Get the orders authorizations as list They are stored in base64url

Returns: List[str]

certificate: str
created: str
db: DataBaseObject
db_fields: Dict[str, Type[str] | Type[int]] = {'account': <class 'int'>, 'authorizations': <class 'str'>, 'certificate': <class 'str'>, 'created': <class 'str'>, 'expires': <class 'str'>, 'finalize': <class 'str'>, 'id': <class 'str'>, 'identifiers': <class 'str'>, 'issued_certificate': <class 'str'>, 'not_after': <class 'str'>, 'not_before': <class 'str'>, 'status': <class 'str'>}
db_reference_fields: Dict[str, str] = {'account': 'acme_account(serial)'}
db_table_name: str = 'acme_order'
db_unique_fields: List[str] = ['id', 'issued_certificate']
expires: str
finalize: str
id: str
identifiers: str
identifiers_as_list() List[Dict[str, str]]

Get the orders identifiers as list. They are stored in base64url.

Returns: List[Dict[str, str]]

issued_certificate: str
not_after: str
not_before: str
response_data() Dict[str, Any]

The json view for an acme order

Returns: Dict[str, Any]

status: str
class pkcs11_ca_service.acme_order.AcmeOrderInput(*, id: str | None = None, account: int | None = None, issued_certificate: str | None = None)

Bases: InputObject

Class to represent an acme order matching from HTTP post data

account: int | None
id: str | None
issued_certificate: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'account': FieldInfo(annotation=Union[int, NoneType], required=False), 'id': FieldInfo(annotation=Union[str, NoneType], required=False), 'issued_certificate': FieldInfo(annotation=Union[str, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

pkcs11_ca_service.acme_order.authorizations_from_list(auths: List[str]) str

Base64url encode the acme authorizations.

Parameters: auths (List[str]): List of auths.

Returns: str

pkcs11_ca_service.acme_order.identifiers_from_payload(payload: Dict[str, Any]) str

Encode the acme order identifiers to base64url

Parameters: payload (Dict[str, Any): The payload part pf the acme JWS.

Returns: str

pkcs11_ca_service.asn1 module

ASN1 module, mostly using asn1crypto

pkcs11_ca_service.asn1.aia_and_cdp_exts(issuer_path: str, timestamping_usage: bool = False) Extensions

Create AIA and CDP extensions.

Parameters: issuer_path (str): Path to issuer CA.

Returns: asn1crypto.x509.Extensions

pkcs11_ca_service.asn1.cert_as_der(pem: str) bytes

Certificate in DER form.

Parameters: pem (str): PEM certificate input data.

Returns: bytes

pkcs11_ca_service.asn1.cert_from_der(der: bytes) str

Get cert from DER form.

Parameters: der (bytes): DER encoded cert.

Returns: str

pkcs11_ca_service.asn1.cert_is_ca(pem: str) bool

If certificate is a CA.

Parameters: pem (str): PEM certificate input data.

Returns: bool

pkcs11_ca_service.asn1.cert_is_self_signed(pem: str) bool

If certificate is self-signed (root CA)

Parameters: pem (str): PEM certificate input data.

Returns: bool

pkcs11_ca_service.asn1.cert_issued_by_ca(cert_pem: str, issuer_pem: str) bool

If cert is signed by a CA

Parameters: cert_pem (str): cert PEM input data. issuer (str): CA PEM input data.

Returns: bool

pkcs11_ca_service.asn1.cert_pem_serial_number(pem: str) int

Get serial number from cert in pem form.

Parameters: pem (str): PEM certificate input data.

Returns: int

pkcs11_ca_service.asn1.cert_revoked(serial_number: int, crl_pem: str) bool

Check if certs serial number is revoked in the CRL.

Parameters: serial_number (int): Serial number to check crl_pem (str): PEN CRL input data.

Returns: bool

pkcs11_ca_service.asn1.cert_revoked_time(serial_number: int, pem: str) Tuple[datetime, CRLReason | None]

Check if CRL has expired from its next_update field compared to current time.

Parameters: serial_number (int): Serial number for the revoked certificate. pem (str): PEM CRL input data.

Returns: Tuple[datetime.datetime, Union[asn1_crl.CRLReason, None]]

pkcs11_ca_service.asn1.create_jwt_header_str(pub_key: bytes, priv_key: bytes, url: str) str

Create jwt header string.

Parameters: pub_key (bytes): Public key bytes. priv_key (bytes): Private key bytes. url (str): URL to request.

Returns: str

pkcs11_ca_service.asn1.crl_as_der(pem: str) bytes

CRL in DER form.

Parameters: pem (str): PEM CRL input data.

Returns: bytes

pkcs11_ca_service.asn1.crl_expired(pem: str) bool

Check if CRL has expired from its next_update field compared to current time.

Parameters: pem (str): PEM CRL input data.

Returns: bool

pkcs11_ca_service.asn1.csr_from_der(der: bytes) str

Get CSR from DER form.

Parameters: der (bytes): DER encoded csr.

Returns: str

pkcs11_ca_service.asn1.from_base64url(b64url: str) bytes

Decode base64url.

Parameters: b64url (str): Input data.

Returns: bytes

pkcs11_ca_service.asn1.jwk_key_to_pem(data: Dict[str, str]) str

Get pem for public key from jwk key.

Parameters: data (Dict[str, str]): Input data.

Returns: str

pkcs11_ca_service.asn1.jwk_thumbprint(jwk: Dict[str, str]) bytes

Get JWK thumbprint from JWK. https://www.rfc-editor.org/rfc/rfc7638

Parameters: jwk (Dict[str, str]): The JWK.

Returns: bytes

pkcs11_ca_service.asn1.not_before_not_after_from_cert(pem: str) Tuple[str, str]

Get not_before and not_after from cert.

Parameters: pem (str): PEM certificate input data.

Returns: Tuple[str, str]

pkcs11_ca_service.asn1.ocsp_decode(data: str) bytes

Decode OCSP data.

Parameters: data (str): Input data.

Returns: bytes

pkcs11_ca_service.asn1.ocsp_encode(data: bytes) str

Encode OCSP data.

Parameters: data (bytes): Input data.

Returns: str

pkcs11_ca_service.asn1.pem_cert_to_key_hash(pem: str) bytes

Get the key hash from pem data as a dict.

Parameters: pem (str): PEM input data.

Returns: bytes

pkcs11_ca_service.asn1.pem_cert_to_name_dict(pem: str) Dict[str, str]

Get the subject name dict from pem data as a dict.

Parameters: pem (str): PEM input data.

Returns: Dict[str, str]

pkcs11_ca_service.asn1.pem_cert_verify_signature(pem: str, signature: bytes, signed_data: bytes) None

Verify signature done by the certificates private key raises cryptography.exceptions.InvalidSignature if invalid signature or ValueError if the public key is not supported.

Potentially fails if the signature is made using nonstandard hashing of the data.

Parameters: pem (str): PEM input data. signature (bytes): The signature. signed_data (bytes): The signed data.

pkcs11_ca_service.asn1.pem_key_to_jwk(pem: str) Dict[str, str]

Get jwk key from pem.

Parameters: pem (str): Input data.

Returns: Dict[str, str]

pkcs11_ca_service.asn1.pem_to_sha256_fingerprint(pem: str) str

Get the sha256 hash from pem data.

Parameters: pem (str): PEM input data.

Returns: str

pkcs11_ca_service.asn1.public_key_info_to_pem(public_key_info: PublicKeyInfo) str

Get pem from public key info data.

Parameters: public_key_info (asn1crypto.keys.PublicKeyInfo): Input data.

Returns: str

pkcs11_ca_service.asn1.public_key_pem_from_cert(pem: str) str

Get public key in pem from cert.

Parameters: pem (str): PEM certificate input data.

Returns: str

pkcs11_ca_service.asn1.public_key_pem_from_csr(pem: str) str

Get public key in pem from csr.

Parameters: pem (str): csr input data.

Returns: str

pkcs11_ca_service.asn1.public_key_pem_to_sha1_fingerprint(pem: str) str

Get the sha1 fingerprint for the public key in pem data.

See https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.2

Parameters: pem (str): PEM input data.

Returns: str

pkcs11_ca_service.asn1.public_key_verify_signature(public_key_info_pem: str, signature: bytes, signed_data: bytes) None

Verify signature with a public key raises cryptography.exceptions.InvalidSignature if invalid signature or ValueError if the public key is not supported.

Potentially fails if the signature is made using nonstandard hashing of the data.

Parameters: public_key_info_pem (str): Public key in info in PEM form

pkcs11_ca_service.asn1.this_update_next_update_from_crl(pem: str) Tuple[str, str]

Get this_update and next_update from crl.

Parameters: pem (str): PEM CRL input data.

Returns: Tuple[str, str]

pkcs11_ca_service.asn1.to_base64url(data: bytes) str

Encode to base64url.

Parameters: data (bytes): Input data.

Returns: str

pkcs11_ca_service.auth module

Module which handle the authorization

async pkcs11_ca_service.auth.authorized_by(request: Request) int

Authorize a request to our http server.

Returns the ID of the public key in DB which was used to authorize the request.

Parameters: request (fastapi.Request): The entire HTTP request which we can extract the JWT and nonce from.

Returns: int

pkcs11_ca_service.base module

Base file which contains the abstract base classes

class pkcs11_ca_service.base.DataBaseObject

Bases: ABC

Abstract base class for database classes

abstract async classmethod delete(table_name: str, unique_field: str, data: str | int) None

Delete data object in DB.

Parameters: table_name (str): Name of the DB table. unique_field (str): Name of a unique_field. data (Union[str, int]): The data in the unique field to delete.

Returns: None

abstract async classmethod load(table_name: str, input_search: Dict[str, str | int], fields: List[str], unique_fields: List[str]) List[Dict[str, str | int]]

Load data objects from DB, returns a list of dict with fields as keys

Parameters: table_name (str): Name of the DB table. input_search (Dict[str, Union[str, int]]): Data to_search for. fields (List[str]): Fields to retrieve. unique_fields (List[str]): Data for the unique fields.

Returns: List[Dict[str, Union[str, int]]]

pkcs11_session: PKCS11Session
abstract async classmethod revoke_data_for_ca(ca_serial: int) Dict[str, str]

Get last CRL for an CA

Parameters: pem (str): The database row serial for the CA

Returns: Dict[str, str]

abstract async classmethod save(table_name: str, fields: Dict[str, str | int], unique_fields: List[str]) int

Save data object in DB. Return the DB ID for the data object.

Parameters: table_name (str): Name of the DB table. fields (Dict[str, Union[str, int]]): Data for the fields. unique_fields (List[str]): Data for the unique fields.

Returns: int

abstract async classmethod startup(tables: List[str], fields: List[Dict[str, Type[str] | Type[int]]], reference_fields: List[Dict[str, str]], unique_fields: List[List[str]]) bool

Startup for the database. Creates all tables, create and insert the root ca and healthcheck PKCS11 key, if not exists and loads trusted admin keys, return true if DB startup ok

Parameters: tables (List[str]): Table names. fields (List[Dict[str, Union[Type[str], Type[int]]]]): Fields and their data type. reference_fields (List[Dict[str, str]]): Reference field names and their references. unique_fields (List[List[str]]): Unique field names.

Returns: bool

abstract async classmethod update(table_name: str, fields: Dict[str, str | int], unique_fields: List[str]) None

Update data object in DB.

Parameters: table_name (str): Name of the DB table. fields (Dict[str, Union[str, int]]): Data for the fields. unique_fields (List[str]): Data for the unique fields.

Returns: None

class pkcs11_ca_service.base.DataClassObject(kwargs: Dict[str, str | int])

Bases: ABC

Abstract base class for data classes

db: DataBaseObject
db_data() Dict[str, str | int]

Gather only the data vars matching the DB fields.

Returns: Dict[str, Union[str, int]]

db_fields: Dict[str, Type[str] | Type[int]]
db_reference_fields: Dict[str, str]
db_table_name: str
db_unique_fields: List[str]
async delete() None

Delete data object from its database.

Returns: None

async save(field_set_to_serial: str | None = None) int

Save data object to its database. Return its serial/ID.

Parameters: field_set_to_serial (Union[str, None] = None): Set a field to its serial/ID field. For example if CA is a root CA then set its ‘issuer’ to its own ‘serial’ number

Returns: int

async update() None

Update data object from its database.

Returns: None

class pkcs11_ca_service.base.InputObject

Bases: BaseModel

FastAPI input object for HTTP post data

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

async pkcs11_ca_service.base.db_load_data_class(db_data_class: Type[DataClassObject], input_object: InputObject) List[DataClassObject]

Load data objects from search data from its data fields in DB.

Parameters: db_data_class (Type[DataClassObject]): Which class the object will be. input_object (InputObject): Object with search data.

Returns: int

pkcs11_ca_service.ca module

Module to handle certificate authorities

class pkcs11_ca_service.ca.Ca(kwargs: Dict[str, str | int])

Bases: DataClassObject

Class to represent a certificate authority

db: DataBaseObject
db_fields: Dict[str, Type[str] | Type[int]] = {'authorized_by': <class 'int'>, 'created': <class 'str'>, 'csr': <class 'int'>, 'fingerprint': <class 'str'>, 'issuer': <class 'int'>, 'not_after': <class 'str'>, 'not_before': <class 'str'>, 'path': <class 'str'>, 'pem': <class 'str'>, 'pkcs11_key': <class 'int'>, 'serial_number': <class 'str'>}
db_reference_fields: Dict[str, str] = {'authorized_by': 'public_key(serial)', 'csr': 'csr(serial)', 'pkcs11_key': 'pkcs11_key(serial)'}
db_table_name: str = 'ca'
db_unique_fields: List[str] = ['pem', 'pkcs11_key', 'fingerprint', 'path']
async is_revoked() bool

If CA has been revoked

Returns: bool

async issuer_pem() str

The issuer for this CA in PEM form

Returns: str

async revoke(auth_by: int, reason: int | None = None) None

Revoke the certificate authority https://github.com/wbond/asn1crypto/blob/b5f03e6f9797c691a3b812a5bb1acade3a1f4eeb/asn1crypto/crl.py#L97

Parameters: auth_by (int): The revoker public key DB id

Returns: str

class pkcs11_ca_service.ca.CaInput(*, name_dict: Dict[str, str] | None = None, pem: str | None = None, key_label: str | None = None, key_type: str | None = None, issuer_pem: str | None = None, path: str | None = None, pkcs11_key: int | None = None, serial_number: str | None = None)

Bases: InputObject

Class to represent ca matching from HTTP post data

issuer_pem: str | None
key_label: str | None
key_type: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'issuer_pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'key_label': FieldInfo(annotation=Union[str, NoneType], required=False), 'key_type': FieldInfo(annotation=Union[str, NoneType], required=False), 'name_dict': FieldInfo(annotation=Union[Dict[str, str], NoneType], required=False), 'path': FieldInfo(annotation=Union[str, NoneType], required=False), 'pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'pkcs11_key': FieldInfo(annotation=Union[int, NoneType], required=False), 'serial_number': FieldInfo(annotation=Union[str, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

name_dict: Dict[str, str] | None
path: str | None
pem: str | None
pkcs11_key: int | None
serial_number: str | None
async pkcs11_ca_service.ca.search(input_object: InputObject) JSONResponse

Get cas matching the input_object pattern(s). Returns a fastAPI json response with a list of the cas.

Parameters: input_object (InputObject): The search pattern data.

Returns: fastapi.responses.JSONResponse

pkcs11_ca_service.certificate module

Module to handle certificates

class pkcs11_ca_service.certificate.Certificate(kwargs: Dict[str, str | int])

Bases: DataClassObject

Class to represent a certificate

db: DataBaseObject
db_fields: Dict[str, Type[str] | Type[int]] = {'authorized_by': <class 'int'>, 'created': <class 'str'>, 'csr': <class 'int'>, 'fingerprint': <class 'str'>, 'issuer': <class 'int'>, 'not_after': <class 'str'>, 'not_before': <class 'str'>, 'pem': <class 'str'>, 'public_key': <class 'int'>, 'serial_number': <class 'str'>}
db_reference_fields: Dict[str, str] = {'authorized_by': 'public_key(serial)', 'csr': 'csr(serial)', 'issuer': 'ca(serial)', 'public_key': 'public_key(serial)'}
db_table_name: str = 'certificate'
db_unique_fields: List[str] = ['pem', 'fingerprint']
async is_revoked() bool

If certificate has been revoked

Returns: bool

async issuer_pem() str

The issuer for this certificate in PEM form

Returns: str

async revoke(auth_by: int, reason: int | None = None) None

Revoke the certificate. https://github.com/wbond/asn1crypto/blob/b5f03e6f9797c691a3b812a5bb1acade3a1f4eeb/asn1crypto/crl.py#L97

Parameters: auth_by (int): The revoker public key DB id

Returns: str

class pkcs11_ca_service.certificate.CertificateInput(*, pem: str | None = None, fingerprint: str | None = None, serial_number: str | None = None)

Bases: InputObject

Class to represent certificate matching from HTTP post data

fingerprint: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'fingerprint': FieldInfo(annotation=Union[str, NoneType], required=False), 'pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'serial_number': FieldInfo(annotation=Union[str, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

pem: str | None
serial_number: str | None
async pkcs11_ca_service.certificate.search(input_object: InputObject) JSONResponse

Get certificates matching the input_object pattern(s). Returns a fastAPI json response with a list of the certificates.

Parameters: input_object (InputObject): The search pattern data.

Returns: fastapi.responses.JSONResponse

pkcs11_ca_service.cmc module

CMC functions

async pkcs11_ca_service.cmc.cmc_handle_request(data: bytes) bytes

Handle and extract CMS CMC request

async pkcs11_ca_service.cmc.cmc_revoke(revoke_data: bytes) None

Revoke a certificate based on the CMC RevokeRequest

async pkcs11_ca_service.cmc.create_cert_from_csr(csr_data: CertificationRequest) Certificate

Create cert from a csr

async pkcs11_ca_service.cmc.create_cmc_response(controls: Controls, created_certs: Dict[int, Certificate], failed: bool) bytes

Create a CMS response containing a CMC package

async pkcs11_ca_service.cmc.create_cmc_response_packet(controls: Controls, created_certs: Dict[int, Certificate], failed: bool) PKIResponse

Create a CMC response package. Revoke cert(s) if the request had a RevokeRequest(s).

pkcs11_ca_service.cmc.create_csr_from_crmf(cert_req: CertReqMsg) CertificationRequest

Manually handle the CRMF request into a CSR since we use CSR as data type in the database and currently all certs have a csr which the are created from

pkcs11_ca_service.config module

Config module

pkcs11_ca_service.crl module

Module to handle crls

class pkcs11_ca_service.crl.Crl(kwargs: Dict[str, str | int])

Bases: DataClassObject

Class to represent a crl

db: DataBaseObject
db_fields: Dict[str, Type[str] | Type[int]] = {'authorized_by': <class 'int'>, 'created': <class 'str'>, 'issuer': <class 'int'>, 'next_update': <class 'str'>, 'pem': <class 'str'>, 'this_update': <class 'str'>}
db_reference_fields: Dict[str, str] = {'authorized_by': 'public_key(serial)', 'issuer': 'ca(serial)', 'public_key': 'public_key(serial)'}
db_table_name: str = 'crl'
db_unique_fields: List[str] = ['pem']
class pkcs11_ca_service.crl.CrlInput(*, pem: str | None = None, ca_pem: str | None = None)

Bases: InputObject

Class to represent crl matching from HTTP post data

ca_pem: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'ca_pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'pem': FieldInfo(annotation=Union[str, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

pem: str | None
async pkcs11_ca_service.crl.search(input_object: InputObject) JSONResponse

Get crls matching the input_object pattern(s). Returns a fastAPI json response with a list of the crls.

Parameters: input_object (InputObject): The search pattern data.

Returns: fastapi.responses.JSONResponse

pkcs11_ca_service.csr module

Module to handle csrs

class pkcs11_ca_service.csr.Csr(kwargs: Dict[str, str | int])

Bases: DataClassObject

Class to represent a csr

db: DataBaseObject
db_fields: Dict[str, Type[str] | Type[int]] = {'authorized_by': <class 'int'>, 'created': <class 'str'>, 'pem': <class 'str'>, 'public_key': <class 'int'>}
db_reference_fields: Dict[str, str] = {'authorized_by': 'public_key(serial)', 'public_key': 'public_key(serial)'}
db_table_name: str = 'csr'
db_unique_fields: List[str] = ['pem']
class pkcs11_ca_service.csr.CsrInput(*, pem: str | None = None, ca_pem: str | None = None)

Bases: InputObject

Class to represent csr matching from HTTP post data

ca_pem: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'ca_pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'pem': FieldInfo(annotation=Union[str, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

pem: str | None
async pkcs11_ca_service.csr.search(input_object: InputObject) JSONResponse

Get csrs matching the input_object pattern(s). Returns a fastAPI json response with a list of the csrs.

Parameters: input_object (InputObject): The search pattern data.

Returns: fastapi.responses.JSONResponse

pkcs11_ca_service.error module

Module which have all our exceptions

exception pkcs11_ca_service.error.DBInitFail(message: str = 'DB init fail')

Bases: Exception

Class to handle DB init

exception pkcs11_ca_service.error.NoSuchDBObject(message: str = 'No such object in DB')

Bases: Exception

Class to handle objects not existing in DB

exception pkcs11_ca_service.error.UnsupportedJWTAlgorithm(message: str = 'Unsupported JWT algorithm')

Bases: Exception

Class to handle unsupported JTW algorithms

exception pkcs11_ca_service.error.WrongDataType(message: str = 'Wrong data type')

Bases: Exception

Class to handle wrong data types

pkcs11_ca_service.main module

Main module, FastAPI runs from here

class pkcs11_ca_service.main.RevokeInput(*, pem: str, reason: int | None = None)

Bases: InputObject

Class to represent revoke specification matching from HTTP post data

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'pem': FieldInfo(annotation=str, required=True), 'reason': FieldInfo(annotation=Union[int, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

pem: str
reason: int | None
async pkcs11_ca_service.main.get_acme_directory() JSONResponse

ACME GET directory endpoint

Returns: Response

async pkcs11_ca_service.main.get_acme_new_nonce() Response

ACME GET new-nonce endpoint

Returns: Response

async pkcs11_ca_service.main.get_ca(ca_path: str) Response

/ca, GET method.

Get a ca.

Parameters: ca_path (str): The unique CA path.

Returns: fastapi.Response

/search/ca, GET method.

Fetch all cas.

Parameters: request (fastapi.Request): The entire HTTP request.

Returns: fastapi.responses.JSONResponse

/search/certificate, GET method.

Fetch all certificates.

Parameters: request (fastapi.Request): The entire HTTP request.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.main.get_crl(crl_path: str) Response

/ca, GET method.

Get a CRL.

Parameters: crl_path (str): The unique CRL path.

Returns: fastapi.Response

/search/crl, GET method.

Fetch all crls.

Parameters: request (fastapi.Request): The entire HTTP request.

Returns: fastapi.responses.JSONResponse

/search/csr, GET method.

Fetch all csrs.

Parameters: request (fastapi.Request): The entire HTTP request.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.main.get_healthcheck(request: Request) JSONResponse

/healthcheck, GET method.

Do a healthcheck. Sign some data.

Parameters: request (fastapi.Request): The entire HTTP request.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.main.get_new_nonce() Response

Get new nonce, GET method.

Returns: fastapi.Response

async pkcs11_ca_service.main.get_ocsp(ocsp_path: str) Response

/ocsp, GET method.

Return an OCSP response.

Parameters: ocsp_path (str): OCSP path.

Returns: fastapi.Response

/search/public_key, GET method.

Fetch all public keys.

Parameters: request (fastapi.Request): The entire HTTP request.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.main.head_acme_new_nonce() Response

ACME HEAD new-nonce endpoint

Returns: Response

async pkcs11_ca_service.main.head_new_nonce() Response

Get new nonce, HEAD method.

Returns: fastapi.Response

async pkcs11_ca_service.main.is_revoked(request: Request, revoke_input: RevokeInput) JSONResponse

/is_revoked, POST method.

If a certificate/CA is revoked

Parameters: request (fastapi.Request): The entire HTTP request. revoke_input (PublicKeyInput): The revoke specification

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.main.post_acme_routes(request: Request, background_tasks: BackgroundTasks) Response

Handle all acme endpoints except for /directory endpoint

Parameters: request (Request): The http request background_tasks (BackgroundTasks): The list of background task to append to, for example executing challenges

Returns: Response

async pkcs11_ca_service.main.post_ca(request: Request, ca_input: CaInput) JSONResponse

/ca, POST method.

Post/create a new ca.

Parameters: request (fastapi.Request): The entire HTTP request. ca_input (CaInput): The ca data.

Returns: fastapi.responses.JSONResponse

/search/ca, POST method.

Fetch all cas which matches the search pattern.

Parameters: request (fastapi.Request): The entire HTTP request. ca_input (CaInput): The search pattern.

Returns: fastapi.responses.JSONResponse

/search/certificate, POST method.

Fetch all certificates which matches the search pattern.

Parameters: request (fastapi.Request): The entire HTTP request. certificate_input (CertificateInput): The search pattern.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.main.post_cmc(request: Request) Response

CMC fixme

async pkcs11_ca_service.main.post_crl(request: Request, crl_input: CrlInput) JSONResponse

/crl, POST method.

Create a new crl.

Parameters: request (fastapi.Request): The entire HTTP request. crl_input (CrlInput): The crl data.

Returns: fastapi.responses.JSONResponse

/search/crl, POST method.

Fetch all crls which matches the search pattern.

Parameters: request (fastapi.Request): The entire HTTP request. crl_input (CrlInput): The search pattern.

Returns: fastapi.responses.JSONResponse

/search/csr, POST method.

Fetch all csrs which matches the search pattern.

Parameters: request (fastapi.Request): The entire HTTP request. csr_input (CsrInput): The search pattern.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.main.post_ocsp(request: Request) Response

/ocsp, POST method.

Return an OCSP response.

Parameters: request (fastapi.Request): The entire HTTP request.

Returns: fastapi.Response

async pkcs11_ca_service.main.post_pkcs11_sign(request: Request) JSONResponse

/pkcs11_sign, POST method.

async pkcs11_ca_service.main.post_public_key(request: Request, public_key_input: PublicKeyInput) JSONResponse

/public_key, POST method.

Post/create a new public key.

Parameters: request (fastapi.Request): The entire HTTP request. public_key_input (PublicKeyInput): The public key data.

Returns: fastapi.responses.JSONResponse

/search/public_key, POST method.

Fetch all public keys which matches the search pattern.

Parameters: request (fastapi.Request): The entire HTTP request. public_key_input (PublicKeyInput): The search pattern.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.main.post_revoke(request: Request, revoke_input: RevokeInput) JSONResponse

/revoke, POST method.

Revoke a certificate/CA as a CA is really just a certificate.

Parameters: request (fastapi.Request): The entire HTTP request. revoke_input (PublicKeyInput): The revoke specification

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.main.post_sign_csr(request: Request, csr_input: CsrInput) JSONResponse

/sign_csr, POST method.

Post/create a new csr, will create and return certificate.

If a cert has CA:TRUE then treat it as a normal cert since we dont have its private key

Parameters: request (fastapi.Request): The entire HTTP request. csr_input (CsrInput): The csr data.

Returns: fastapi.responses.JSONResponse

async pkcs11_ca_service.main.post_timestamp(request: Request) Response

Timestamp fixme

pkcs11_ca_service.nonce module

Module which handle nonces

pkcs11_ca_service.nonce.generate_nonce() str

Generate, store and returns a nonce

Returns: str

pkcs11_ca_service.nonce.hash_nonce(nonce_base64url: str) str

Hash a nonce, we store nonces hashed by sha256 for the same reason as hashing passwords.

Parameters: nonce_base64url (str): base64url encoded nonce to be hashed.

Returns: str

pkcs11_ca_service.nonce.nonce_response(status_code: int = 200) Response

Create HTTP response containing a new nonce.

Returns: fastapi.Response

async pkcs11_ca_service.nonce.verify_nonce(nonce: str) bool

Verify a nonce, if verified then delete it from the list of nonces.

Parameters: nonce (str): base64url encoded nonce to be verified.

Returns: bool

pkcs11_ca_service.ocsp module

OCSP functions

async pkcs11_ca_service.ocsp.ocsp_response(request: bytes, encoded: bool = False) bytes

Create an OCSP response

Parameters: request (bytes): OCSP request in bytes. encoded (str): If the request is base64 + url encoded

Returns: bytes

pkcs11_ca_service.pkcs11_key module

Module to handle pkcs11 keys

class pkcs11_ca_service.pkcs11_key.Pkcs11Key(kwargs: Dict[str, str | int])

Bases: DataClassObject

Class to represent a PKCS11 key

db: DataBaseObject
db_fields: Dict[str, Type[str] | Type[int]] = {'authorized_by': <class 'int'>, 'created': <class 'str'>, 'key_label': <class 'str'>, 'key_type': <class 'str'>, 'public_key': <class 'int'>}
db_reference_fields: Dict[str, str] = {'authorized_by': 'public_key(serial)', 'public_key': 'public_key(serial)'}
db_table_name: str = 'pkcs11_key'
db_unique_fields: List[str] = ['public_key', 'key_label']
class pkcs11_ca_service.pkcs11_key.Pkcs11KeyInput(*, key_label: str | None = None, serial: int | None = None, public_key: int | None = None, key_type: str | None = None)

Bases: InputObject

Class to represent PKCS11 key matching from HTTP post data

key_label: str | None
key_type: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'key_label': FieldInfo(annotation=Union[str, NoneType], required=False), 'key_type': FieldInfo(annotation=Union[str, NoneType], required=False), 'public_key': FieldInfo(annotation=Union[int, NoneType], required=False), 'serial': FieldInfo(annotation=Union[int, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

public_key: int | None
serial: int | None

pkcs11_ca_service.pkcs11_sign module

Module to handle /pkcs11_sign endpoint

async pkcs11_ca_service.pkcs11_sign.pkcs11_sign(request: Dict[str, Any]) JSONResponse

Sign input data in the request schema format using a key in a pkcs11 device. Raises HTTP error if operation or data formats failed/invalid.

Parameters: request (Dict[str, Any]): The input data.

Returns: fastapi.responses.JSONResponse

pkcs11_ca_service.pkcs11_sign.validate_input(request: Dict[str, Any]) None

Validate the json input with our schema, raises HTTP error 400 if invalid

Parameters: request (Dict[str, Any]): The input.

Returns: None

pkcs11_ca_service.postgres_db module

Module which handles database queries

class pkcs11_ca_service.postgres_db.PostgresDB

Bases: DataBaseObject

Class to use with Postgres DB

async classmethod delete(table_name: str, unique_field: str, data: str | int) None

Delete data object in DB.

Parameters: table_name (str): Name of the DB table. unique_field (str): Name of a unique_field. data (Union[str, int]): The data in the unique field to delete.

Returns: None

async classmethod load(table_name: str, input_search: Dict[str, str | int], fields: List[str], unique_fields: List[str]) List[Dict[str, str | int]]

Load data objects from DB, returns a list of dict with fields as keys

Parameters: table_name (str): Name of the DB table. input_search (Dict[str, Union[str, int]]): Data to_search for. fields (List[str]): Fields to retrieve. unique_fields (List[str]): Data for the unique fields.

Returns: List[Dict[str, Union[str, int]]]

pool: Pool
async classmethod revoke_data_for_ca(ca_serial: int) Dict[str, str]

Get last CRL for an CA

Parameters: pem (str): The database row serial for the CA

Returns: Dict[str, str]

async classmethod save(table_name: str, fields: Dict[str, str | int], unique_fields: List[str]) int

Save data object in DB. Return the DB ID for the data object.

Parameters: table_name (str): Name of the DB table. fields (Dict[str, Union[str, int]]): Data for the fields. unique_fields (List[str]): Data for the unique fields.

Returns: int

async classmethod startup(tables: List[str], fields: List[Dict[str, Type[str] | Type[int]]], reference_fields: List[Dict[str, str]], unique_fields: List[List[str]]) bool

Startup for the database. Creates all tables, create and insert the root ca and healthcheck PKCS11 key, if not exists and loads trusted admin keys, return true if DB startup ok

Parameters: tables (List[str]): Table names. fields (List[Dict[str, Union[Type[str], Type[int]]]]): Fields and their data type. reference_fields (List[Dict[str, str]]): Reference field names and their references. unique_fields (List[List[str]]): Unique field names.

Returns: bool

async classmethod update(table_name: str, fields: Dict[str, str | int], unique_fields: List[str]) None

Update data object in DB.

Parameters: table_name (str): Name of the DB table. fields (Dict[str, Union[str, int]]): Data for the fields. unique_fields (List[str]): Data for the unique fields.

Returns: None

pkcs11_ca_service.public_key module

Module to handle public keys

class pkcs11_ca_service.public_key.PublicKey(kwargs: Dict[str, str | int])

Bases: DataClassObject

Class to represent a public key

db: DataBaseObject
db_fields: Dict[str, Type[str] | Type[int]] = {'admin': <class 'int'>, 'authorized_by': <class 'int'>, 'created': <class 'str'>, 'fingerprint': <class 'str'>, 'info': <class 'str'>, 'pem': <class 'str'>}
db_reference_fields: Dict[str, str] = {}
db_table_name: str = 'public_key'
db_unique_fields: List[str] = ['pem', 'fingerprint']
class pkcs11_ca_service.public_key.PublicKeyInput(*, pem: str | None = None, fingerprint: str | None = None, admin: int | None = None)

Bases: InputObject

Class to represent public key matching from HTTP post data

admin: int | None
fingerprint: str | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'admin': FieldInfo(annotation=Union[int, NoneType], required=False), 'fingerprint': FieldInfo(annotation=Union[str, NoneType], required=False), 'pem': FieldInfo(annotation=Union[str, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

pem: str | None
async pkcs11_ca_service.public_key.search(input_object: InputObject) JSONResponse

Get public keys matching the input_object pattern(s). Returns a fastAPI json response with a list of the public keys.

Parameters: input_object (InputObject): The search pattern data.

Returns: fastapi.responses.JSONResponse

pkcs11_ca_service.route_functions module

Route functions

async pkcs11_ca_service.route_functions.ca_request(ca_input: CaInput) Ca

Get CA object.

Parameters: ca_input (CaInput): CA input object.

Returns: Ca

async pkcs11_ca_service.route_functions.crl_request(auth_by: int, issuer_obj: Ca) str

Get CRL.

Parameters: auth_by (int): Who is the author, comes from the auth.py file issuer_object (Ca): CA object of its issuer,

Returns: str

async pkcs11_ca_service.route_functions.healthcheck() JSONResponse

Healthcheck, query the DB, sign som data. If fail then send http status code 503.

Returns: JSONResponse

async pkcs11_ca_service.route_functions.pkcs11_key_request(pkcs11_key_input: Pkcs11KeyInput) Pkcs11Key

Get pkcs11 key object.

Parameters: pkcs11_key_input (Pkcs11KeyInput): PKCS11 key input object.

Returns: Pkcs11Key

async pkcs11_ca_service.route_functions.public_key_request(public_key_input: PublicKeyInput) PublicKey

Get public key object.

Parameters: public_key_input (CaInput): Public key input object.

Returns: PublicKey

async pkcs11_ca_service.route_functions.sign_csr(auth_by: int, issuer_obj: Ca, csr_obj: Csr, public_key_obj: PublicKey) str
Parameters:
  • auth_by – DB public key id.

  • issuer_obj – Which CA should sign the CSR.

  • csr_obj – Which CSR object will be signed.

  • public_key_obj – Which public key object created this csr.

Returns:

str

pkcs11_ca_service.startup module

Startup module

async pkcs11_ca_service.startup.startup() None

Startup main function

Returns: None

pkcs11_ca_service.timestamp module

Timestamp functions

class pkcs11_ca_service.timestamp.Timestamp(kwargs: Dict[str, str | int])

Bases: DataClassObject

Class to represent a timestamp

cert_key_label: str
created: str
db: DataBaseObject
db_fields: Dict[str, Type[str] | Type[int]] = {'cert_key_label': <class 'str'>, 'created': <class 'str'>, 'gen_time': <class 'str'>, 'serial_number': <class 'str'>}
db_reference_fields: Dict[str, str] = {}
db_table_name: str = 'timestamp'
db_unique_fields: List[str] = ['serial_number']
gen_time: str
serial_number: str
class pkcs11_ca_service.timestamp.TimestampInput(*, serial_number: str | None = None)

Bases: InputObject

Class to represent a timestamp matching from HTTP post data

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'serial_number': FieldInfo(annotation=Union[str, NoneType], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

serial_number: str | None
async pkcs11_ca_service.timestamp.create_timestamp_certificate(issuer_path: str, key_label: str, subject_name: Dict[str, str], key_type: str, signer_key_label: str, signer_subject_name: Dict[str, str], signer_key_type: str) str

Create a timestamp certificate signed by a PKCS11 key. Returns the PEM encoded certificate

Parameters: issuer_path (str): The path for CRL and AIA extension to write into the cert key_label (str): The pkcs11 key label to create the certificate’s key. subject_name (Dict[str, str]): The certificate’s subject name dict. key_type (str): The certificate’s key type. signer_key_label (str): The pkcs11 key label to sign the certificate. signer_subject_name (Dict[str, str]): The certificate’s issuers subject name dict. signer_key_type (str): The certificate’s signers key type.

Returns: str

async pkcs11_ca_service.timestamp.create_timestamp_response(hashed_message: bytes, nonce: int | None = None) bytes

Create a timestamp certificate.

Parameters: hashed_message (bytes): The hashed message. nonce (Union[int, None]): The request nonce.

Returns: bytes

async pkcs11_ca_service.timestamp.create_timestamp_response_packet(hashed_message: bytes, nonce: int | None) TSTInfo

Create a timestamp response asn1 package

Parameters: hashed_message (bytes): The timestamp request. nonce (Union[int, None]): The nonce

Returns: asn1crypto.tsp.TSTInfo

async pkcs11_ca_service.timestamp.create_timestamp_signed_data(hashed_message: bytes, nonce: int | None) ContentInfo

Create a timestamp signed data asn1 structure

Parameters: hashed_message (bytes): The timestamp request. nonce (Union[int, None]): The nonce

Returns: asn1crypto.cms.ContentInfo

async pkcs11_ca_service.timestamp.timestamp_handle_request(data: bytes) bytes

Handle and respond to a timestamp request.

Parameters: data (bytes): The timestamp request.

Returns: bytes

pkcs11_ca_service.validate module

Module to validate data

pkcs11_ca_service.validate.validate_input_string(input_string: str) None

Validate an input string. We only allow letters, numbers, [+, -, _, /, =, ‘ ‘, ] and swedish [å, ä, ö]

Parameters: input_string (str): Input data.

Returns: None

Module contents

Python CA service