pkcs11_ca_service package¶
Submodules¶
pkcs11_ca_service.acme_account module¶
ACME account module
- class pkcs11_ca_service.acme_account.AcmeAccount(kwargs: Dict[str, str | int])¶
Bases: DataClassObject
Class to represent an ACME account
- contact: str¶
- contact_as_list() List[str]¶
Get the contacts for this account as a JSON list. They are stored in base64url in the database.
- Returns:
The json list
- Return type:
list[str]
- created: str¶
- db: DataBaseObject¶
- db_fields: Dict[str, Type[str] | Type[int]] = {'contact': <class 'str'>, 'created': <class 'str'>, 'id': <class 'str'>, 'public_key_pem': <class 'str'>, 'status': <class 'str'>}¶
- db_reference_fields: Dict[str, str] = {}¶
- db_table_name: str = 'acme_account'¶
- db_unique_fields: List[str] = ['id', 'public_key_pem']¶
- id: str¶
- public_key_pem: str¶
- response_data() Dict[str, Any]¶
The json view for an acme account
Returns: Dict[str, Any]
- status: str¶
- class pkcs11_ca_service.acme_account.AcmeAccountInput(*, serial: int | None = None, id: str | None = None, public_key_pem: str | None = None)¶
Bases: InputObject
Class to represent an acme account matching from HTTP post data
- id: str | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'id': FieldInfo(annotation=Union[str, NoneType], required=False), 'public_key_pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'serial': FieldInfo(annotation=Union[int, NoneType], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- public_key_pem: str | None¶
- serial: int | None¶
- pkcs11_ca_service.acme_account.contact_from_payload(payload: Dict[str, Any]) str¶
Encode the acme new account contacts to base64url
- Parameters:
payload (dict[str, Any]) – The payload part of the acme JWS.
- Returns:
The json list contacts as a base64url encoded string
- Return type:
str
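The storage format described above (a JSON list encoded as base64url) can be sketched with the standard library. This is a minimal illustration, not the package's implementation: the helper names `encode_contacts` and `decode_contacts` are hypothetical, the `contact` payload key is taken from RFC 8555, and stripping the `=` padding is an assumption.

```python
import base64
import json
from typing import Any, Dict, List


def encode_contacts(payload: Dict[str, Any]) -> str:
    """Serialize the payload's contact list to JSON and base64url-encode it."""
    # "contact" is the field name used by RFC 8555 newAccount payloads.
    contacts: List[str] = payload.get("contact", [])
    raw = json.dumps(contacts).encode("utf-8")
    # Assumption: padding is stripped, as is conventional for JOSE base64url.
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


def decode_contacts(encoded: str) -> List[str]:
    """Reverse encode_contacts: restore padding, decode, parse the JSON list."""
    padded = encoded + "=" * (-len(encoded) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


payload = {"contact": ["mailto:admin@example.org"], "termsOfServiceAgreed": True}
stored = encode_contacts(payload)
print(stored)
print(decode_contacts(stored))
```

The decode step mirrors what `contact_as_list()` is documented to return: the original list of contact strings.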
pkcs11_ca_service.acme_lib module¶
ACME lib module
Handle the ACME requests
FIXME: handle and check for expired acme objects
- exception pkcs11_ca_service.acme_lib.NoSuchKID(message: str = 'No such KID')¶
Bases: Exception
Class to handle no such KID
- async pkcs11_ca_service.acme_lib.account_authzs(acme_authz_input: AcmeAuthorizationInput) List[AcmeAuthorization]¶
All acme authorizations for the account
Parameters: acme_authz_input (AcmeAuthorizationInput): The AcmeAuthorizationInput for the DB search.
Returns: List[AcmeAuthorization]
- async pkcs11_ca_service.acme_lib.account_exists(acme_account_input: AcmeAccountInput) AcmeAccount | None¶
If the acme account exists (in DB)
Parameters: acme_account_input (AcmeAccountInput): The AcmeAccountInput for the DB search.
Returns: Union[AcmeAccount, None]
- pkcs11_ca_service.acme_lib.account_id_from_kid(kid: str) str¶
Get the account id from the acme KID by only keeping the id part. Example KID: ‘https://acme-server/path-to-acme/acct/djXYss-mx-L0wy3V47OBoNkLiyOQNUObz’
Parameters: kid (str): The acme KID.
Returns: str
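Keeping only the id part of a KID amounts to taking the last path segment of the URL. The sketch below shows one way to do that; the function name is hypothetical and the real implementation may differ.

```python
def account_id_from_kid_sketch(kid: str) -> str:
    """Return the last path segment of the KID URL."""
    # rstrip guards against a trailing slash; rsplit keeps only the final part.
    return kid.rstrip("/").rsplit("/", 1)[-1]


kid = "https://acme-server/path-to-acme/acct/djXYss-mx-L0wy3V47OBoNkLiyOQNUObz"
print(account_id_from_kid_sketch(kid))
```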
- async pkcs11_ca_service.acme_lib.authz_response(_: AcmeAccount, jws: Dict[str, Any]) JSONResponse¶
List the acme authorization
Parameters: _ (AcmeAccount): The acme account that made this request. Here for uniformity with the other ACME functions. jws (Dict[str, Any]): The JWS.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.acme_lib.cert_response(account: AcmeAccount, jws: Dict[str, Any]) Response¶
Fetch the issued certificate. The issuer's certificate is included, so the response is a chain.
Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any]): The JWS.
Returns: fastapi.responses.Response
- async pkcs11_ca_service.acme_lib.chall_response(account: AcmeAccount, jws: Dict[str, Any], background_tasks: BackgroundTasks) JSONResponse¶
List or execute the acme challenge
If the payload is an empty JSON dict ‘{}’, the server executes the challenge immediately after the HTTP request finishes.
Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any]): The JWS. background_tasks (fastapi.background.BackgroundTasks): The list of background tasks which will execute the challenge.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.acme_lib.execute_challenge(public_key_pem: str, order: AcmeOrder, authz: AcmeAuthorization, challenge: Dict[str, str]) None¶
Execute the acme challenge.
Parameters: public_key_pem (str): The acme account's public key in PEM form. order (AcmeOrder): The acme order for this challenge. authz (AcmeAuthorization): The acme AcmeAuthorization for this challenge. challenge (Dict[str, str]): The challenge.
- async pkcs11_ca_service.acme_lib.existing_account_response(account: AcmeAccount) JSONResponse¶
List an existing acme account
Parameters: account (AcmeAccount): The acme account class object.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.acme_lib.finalize_order_response(_: AcmeAccount, jws: Dict[str, Any]) JSONResponse¶
Finalize the order by sending a csr which the acme CA will sign as long as it conforms to the order's identifiers.
Parameters: _ (AcmeAccount): The acme account that made this request. Here for uniformity with the other ACME functions. jws (Dict[str, Any]): The JWS.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.acme_lib.handle_acme_routes(request: Request, background_tasks: BackgroundTasks) Response¶
All acme routes except new-nonce and directory are handled from here.
Parameters: request (fastapi.Request): The http request. background_tasks (fastapi.background.BackgroundTasks): Background tasks to be run after the http request has finished.
Returns: fastapi.Response
- pkcs11_ca_service.acme_lib.http_01_challenge(url: str, token: str, key_authorization: str) bool¶
Execute the http-01 ACME challenge. Returns true if successful, false if failed. Retries 3 times, each after 3 seconds, if the challenge fails.
Parameters: url (str): The challenge’s URL. token (str): The challenge’s token. key_authorization (str): The challenge’s key_authorization.
Returns: bool
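The retry behaviour described above can be sketched as a small loop. This is an illustration under stated assumptions, not the package's code: the name `http_01_check` is hypothetical, the `fetch` and `sleep` callables are injected so the example runs without network access, the `token` parameter is omitted because the URL is assumed to already contain it, and "retry 3 times" is read as one initial attempt plus up to three retries.

```python
import time
from typing import Callable


def http_01_check(
    url: str,
    key_authorization: str,
    fetch: Callable[[str], str],
    retries: int = 3,
    delay: float = 3.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Return True once the fetched body equals the key authorization."""
    for attempt in range(1 + retries):
        try:
            if fetch(url).strip() == key_authorization:
                return True
        except OSError:
            pass  # Treat connection problems as a failed attempt.
        if attempt < retries:
            sleep(delay)  # Wait before the next retry.
    return False


# Demo with a fake fetcher that succeeds on the third attempt.
responses = iter(["not ready", "not ready", "token.thumbprint"])
ok = http_01_check(
    "http://example.org/.well-known/acme-challenge/token",
    "token.thumbprint",
    fetch=lambda _url: next(responses),
    sleep=lambda _seconds: None,  # skip real waiting in this demo
)
print(ok)
```

Injecting `fetch` keeps the comparison logic testable; a real caller would pass a function that performs the HTTP GET.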
- pkcs11_ca_service.acme_lib.is_expired(expiry_date: str) bool¶
If the current date is past the expiry date
Parameters: expiry_date (str): The expiry date as string in %Y-%m-%dT%H:%M:%SZ
Returns: bool
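A check of this kind can be sketched with `datetime.strptime` and the documented format string. The function name and the optional `now` parameter are hypothetical, and the trailing `Z` is assumed to mean UTC.

```python
from datetime import datetime, timezone
from typing import Optional

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def is_expired_sketch(expiry_date: str, now: Optional[datetime] = None) -> bool:
    """Return True if the current time is past the expiry date."""
    # Assumption: the literal 'Z' suffix denotes UTC.
    expiry = datetime.strptime(expiry_date, TIME_FORMAT).replace(tzinfo=timezone.utc)
    current = now if now is not None else datetime.now(timezone.utc)
    return current > expiry


print(is_expired_sketch("2000-01-01T00:00:00Z"))
print(is_expired_sketch("2999-01-01T00:00:00Z"))
```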
- async pkcs11_ca_service.acme_lib.key_change_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse¶
Change key for the acme account
Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any]): The JWS.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.acme_lib.new_account_response(jws: Dict[str, Any], request_url: str) JSONResponse¶
Create a new acme account
Parameters: jws (Dict[str, Any]): The ACME jws. request_url (str): The request url.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.acme_lib.new_authz_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse¶
An ACME pre-authorization
Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any]): The JWS.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.acme_lib.new_order_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse¶
Create a new acme order
Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any]): The JWS.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.acme_lib.order_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse¶
List the acme order
Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any]): The JWS.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.acme_lib.orders_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse¶
List acme orders
Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any]): The JWS.
Returns: fastapi.responses.JSONResponse
- pkcs11_ca_service.acme_lib.pem_from_jws(jws: Dict[str, Any]) str¶
Get a public key in PEM form from a JWS.
Parameters: jws (Dict[str, Any]): The JWS.
Returns: str
- pkcs11_ca_service.acme_lib.random_string() str¶
Generate a random string, only base64url chars
Returns: str
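One standard-library way to obtain a random string restricted to base64url characters is `secrets.token_urlsafe`. Whether the package uses it is not stated here, so the sketch below is only an assumption; the function name and the default of 32 random bytes are hypothetical.

```python
import secrets
import string

# The base64url alphabet: letters, digits, '-' and '_'.
BASE64URL_ALPHABET = set(string.ascii_letters + string.digits + "-_")


def random_string_sketch(num_bytes: int = 32) -> str:
    """Return an unpadded base64url string built from num_bytes random bytes."""
    return secrets.token_urlsafe(num_bytes)


value = random_string_sketch()
print(value)
print(set(value) <= BASE64URL_ALPHABET)
```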
- async pkcs11_ca_service.acme_lib.revoke_cert_response(jws: Dict[str, Any], request_url: str) Response¶
Revoke an ACME-issued cert. The request is authorized either by the normal ACME account signature or by a JWS signed with the cert's private key.
Parameters: jws (Dict[str, Any]): The JWS. request_url (str): The request url.
Returns: fastapi.responses.Response
- async pkcs11_ca_service.acme_lib.sign_cert(csr: CertificationRequest, order: AcmeOrder) None¶
Sign the validated csr into a certificate.
Parameters: csr (asn1crypto.csr.CertificationRequest): The csr. order (AcmeOrder): The acme order for this request.
- async pkcs11_ca_service.acme_lib.sunet_acme_authz(account: AcmeAccount, token: str) JSONResponse¶
A SUNET type challenge ACME pre-authorization
Parameters: account (AcmeAccount): The acme account that made this request. token (str): The token for the pre-authorization.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.acme_lib.update_account_response(account: AcmeAccount, jws: Dict[str, Any]) JSONResponse¶
Update the acme account
Parameters: account (AcmeAccount): The acme account that made this request. jws (Dict[str, Any]): The JWS.
Returns: fastapi.responses.JSONResponse
- pkcs11_ca_service.acme_lib.validate_csr(csr: CertificationRequest, order: AcmeOrder) None¶
Validate a csr against its order's identifiers.
Parameters: csr (asn1crypto.csr.CertificationRequest): The csr. order (AcmeOrder): The acme order for this request.
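The core of such a validation is a comparison between the names requested in the CSR and the identifiers in the order. The sketch below assumes the DNS names have already been extracted from the CSR and that identifiers use the RFC 8555 shape `{"type": "dns", "value": ...}`. The function name is hypothetical, and it returns a bool for illustration, whereas the documented function returns None.

```python
from typing import Dict, Iterable, List


def names_match_identifiers(
    csr_dns_names: Iterable[str], identifiers: List[Dict[str, str]]
) -> bool:
    """True if the CSR requests exactly the DNS names listed in the order."""
    ordered = {i["value"].lower() for i in identifiers if i.get("type") == "dns"}
    requested = {name.lower() for name in csr_dns_names}
    # An exact set match rejects both missing and extra names.
    return requested == ordered


order_identifiers = [
    {"type": "dns", "value": "example.org"},
    {"type": "dns", "value": "www.example.org"},
]
print(names_match_identifiers(["www.example.org", "example.org"], order_identifiers))
print(names_match_identifiers(["example.org", "evil.example"], order_identifiers))
```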
- pkcs11_ca_service.acme_lib.validate_jwk(jwk: Dict[str, Any], alg: str, signed_data: str, signature: str) None¶
Validate a JWK signature against the specified alg.
Parameters: jwk (Dict[str, Any]): The JWK. alg (str): The signature algorithm. signed_data (str): The signed data in base64url form. signature (str): The signature in base64url form.
Returns: None
- async pkcs11_ca_service.acme_lib.validate_jws(input_data: Dict[str, Any], request_url: str) None¶
Validate an acme JWS by either acme KID or JWK. Also validates url, nonce and alg according to acme.
Parameters: input_data (Dict[str, Any]): The JWS. request_url (str): The request url.
Returns: None
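The structural part of these checks (KID or JWK, url, nonce, alg) can be sketched on the decoded protected header. This is not the package's code: the function names and the `ALLOWED_ALGS` set are hypothetical, and signature verification and nonce freshness are deliberately left out.

```python
import base64
import json
from typing import Any, Dict

ALLOWED_ALGS = {"ES256", "ES384", "ES512", "RS256"}  # assumed allow-list


def b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")


def b64url_decode(data: str) -> bytes:
    return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))


def check_protected_header(jws: Dict[str, Any], request_url: str) -> Dict[str, Any]:
    """Decode the protected header and apply the structural checks."""
    header = json.loads(b64url_decode(jws["protected"]))
    # RFC 8555 requires exactly one of 'kid' and 'jwk'.
    if ("kid" in header) == ("jwk" in header):
        raise ValueError("exactly one of 'kid' and 'jwk' must be present")
    if header.get("url") != request_url:
        raise ValueError("header url does not match the request url")
    if not header.get("nonce"):
        raise ValueError("missing nonce")
    if header.get("alg") not in ALLOWED_ALGS:
        raise ValueError("unsupported alg")
    return header


url = "https://acme-server/path-to-acme/new-order"
protected = {"alg": "ES256", "kid": "https://acme-server/acct/abc", "nonce": "n1", "url": url}
jws = {"protected": b64url_encode(json.dumps(protected).encode("utf-8"))}
header = check_protected_header(jws, url)
print(header["kid"])
```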
- async pkcs11_ca_service.acme_lib.validate_kid(kid: str, signed_data: str, signature: str) None¶
Validate an acme KID signature. Raises cryptography.exceptions.InvalidSignature if the signature is invalid, or fastapi.HTTPException if the KID does not exist.
Parameters: kid (str): The acme KID. signed_data (str): The signed data in base64url form. signature (str): The signature in base64url form.
- pkcs11_ca_service.acme_lib.validate_signature(signer_public_key_data: str, signature: str, signed_data: str) None¶
Validate a signature. Raises cryptography.exceptions.InvalidSignature or fastapi.HTTPException if validation fails.
Parameters: signer_public_key_data (str): Public key in PEM form. signature (str): The signature in base64url form. signed_data (str): The signed data in base64url form.
pkcs11_ca_service.acme_order module¶
ACME order module
- class pkcs11_ca_service.acme_order.AcmeOrder(kwargs: Dict[str, str | int])¶
Bases: DataClassObject
Class to represent an ACME order
- account: int¶
- authorizations: str¶
- authorizations_as_list() List[str]¶
Get the order's authorizations as a list. They are stored in base64url.
Returns: List[str]
- certificate: str¶
- created: str¶
- db: DataBaseObject¶
- db_fields: Dict[str, Type[str] | Type[int]] = {'account': <class 'int'>, 'authorizations': <class 'str'>, 'certificate': <class 'str'>, 'created': <class 'str'>, 'expires': <class 'str'>, 'finalize': <class 'str'>, 'id': <class 'str'>, 'identifiers': <class 'str'>, 'issued_certificate': <class 'str'>, 'not_after': <class 'str'>, 'not_before': <class 'str'>, 'status': <class 'str'>}¶
- db_reference_fields: Dict[str, str] = {'account': 'acme_account(serial)'}¶
- db_table_name: str = 'acme_order'¶
- db_unique_fields: List[str] = ['id', 'issued_certificate']¶
- expires: str¶
- finalize: str¶
- id: str¶
- identifiers: str¶
- identifiers_as_list() List[Dict[str, str]]¶
Get the order's identifiers as a list. They are stored in base64url.
Returns: List[Dict[str, str]]
- issued_certificate: str¶
- not_after: str¶
- not_before: str¶
- response_data() Dict[str, Any]¶
The json view for an acme order
Returns: Dict[str, Any]
- status: str¶
- class pkcs11_ca_service.acme_order.AcmeOrderInput(*, id: str | None = None, account: int | None = None, issued_certificate: str | None = None)¶
Bases: InputObject
Class to represent an acme order matching from HTTP post data
- account: int | None¶
- id: str | None¶
- issued_certificate: str | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'account': FieldInfo(annotation=Union[int, NoneType], required=False), 'id': FieldInfo(annotation=Union[str, NoneType], required=False), 'issued_certificate': FieldInfo(annotation=Union[str, NoneType], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- pkcs11_ca_service.acme_order.authorizations_from_list(auths: List[str]) str¶
Base64url encode the acme authorizations.
Parameters: auths (List[str]): List of auths.
Returns: str
- pkcs11_ca_service.acme_order.identifiers_from_payload(payload: Dict[str, Any]) str¶
Encode the acme order identifiers to base64url
Parameters: payload (Dict[str, Any]): The payload part of the acme JWS.
Returns: str
pkcs11_ca_service.asn1 module¶
ASN1 module, mostly using asn1crypto
- pkcs11_ca_service.asn1.aia_and_cdp_exts(issuer_path: str, timestamping_usage: bool = False) Extensions¶
Create AIA and CDP extensions.
Parameters: issuer_path (str): Path to issuer CA.
Returns: asn1crypto.x509.Extensions
- pkcs11_ca_service.asn1.cert_as_der(pem: str) bytes¶
Certificate in DER form.
Parameters: pem (str): PEM certificate input data.
Returns: bytes
- pkcs11_ca_service.asn1.cert_from_der(der: bytes) str¶
Get cert from DER form.
Parameters: der (bytes): DER encoded cert.
Returns: str
- pkcs11_ca_service.asn1.cert_is_ca(pem: str) bool¶
If certificate is a CA.
Parameters: pem (str): PEM certificate input data.
Returns: bool
- pkcs11_ca_service.asn1.cert_is_self_signed(pem: str) bool¶
If certificate is self-signed (root CA)
Parameters: pem (str): PEM certificate input data.
Returns: bool
- pkcs11_ca_service.asn1.cert_issued_by_ca(cert_pem: str, issuer_pem: str) bool¶
If cert is signed by a CA
Parameters: cert_pem (str): cert PEM input data. issuer_pem (str): CA PEM input data.
Returns: bool
- pkcs11_ca_service.asn1.cert_pem_serial_number(pem: str) int¶
Get serial number from cert in pem form.
Parameters: pem (str): PEM certificate input data.
Returns: int
- pkcs11_ca_service.asn1.cert_revoked(serial_number: int, crl_pem: str) bool¶
Check if certs serial number is revoked in the CRL.
Parameters: serial_number (int): Serial number to check. crl_pem (str): PEM CRL input data.
Returns: bool
- pkcs11_ca_service.asn1.cert_revoked_time(serial_number: int, pem: str) Tuple[datetime, CRLReason | None]¶
Get the revocation time and, if present, the revocation reason for a revoked certificate's serial number from the CRL.
Parameters: serial_number (int): Serial number for the revoked certificate. pem (str): PEM CRL input data.
Returns: Tuple[datetime.datetime, Union[asn1_crl.CRLReason, None]]
- pkcs11_ca_service.asn1.create_jwt_header_str(pub_key: bytes, priv_key: bytes, url: str) str¶
Create jwt header string.
Parameters: pub_key (bytes): Public key bytes. priv_key (bytes): Private key bytes. url (str): URL to request.
Returns: str
- pkcs11_ca_service.asn1.crl_as_der(pem: str) bytes¶
CRL in DER form.
Parameters: pem (str): PEM CRL input data.
Returns: bytes
- pkcs11_ca_service.asn1.crl_expired(pem: str) bool¶
Check if CRL has expired from its next_update field compared to current time.
Parameters: pem (str): PEM CRL input data.
Returns: bool
- pkcs11_ca_service.asn1.csr_from_der(der: bytes) str¶
Get CSR from DER form.
Parameters: der (bytes): DER encoded csr.
Returns: str
- pkcs11_ca_service.asn1.from_base64url(b64url: str) bytes¶
Decode base64url.
Parameters: b64url (str): Input data.
Returns: bytes
- pkcs11_ca_service.asn1.jwk_key_to_pem(data: Dict[str, str]) str¶
Get pem for public key from jwk key.
Parameters: data (Dict[str, str]): Input data.
Returns: str
- pkcs11_ca_service.asn1.jwk_thumbprint(jwk: Dict[str, str]) bytes¶
Get JWK thumbprint from JWK. https://www.rfc-editor.org/rfc/rfc7638
Parameters: jwk (Dict[str, str]): The JWK.
Returns: bytes
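RFC 7638 defines the thumbprint as a hash over a canonical JSON object that contains only the required members of the key, sorted by name and written without whitespace. The sketch below follows that recipe with SHA-256; the function name is hypothetical and the package's own implementation may differ in details.

```python
import hashlib
import json
from typing import Dict

# Required members per key type, as listed in RFC 7638 section 3.2.
REQUIRED_MEMBERS = {
    "EC": ("crv", "kty", "x", "y"),
    "RSA": ("e", "kty", "n"),
    "oct": ("k", "kty"),
}


def jwk_thumbprint_sketch(jwk: Dict[str, str]) -> bytes:
    """Return the SHA-256 JWK thumbprint as raw bytes."""
    # Drop optional members such as 'kid' or 'alg' before hashing.
    members = {name: jwk[name] for name in REQUIRED_MEMBERS[jwk["kty"]]}
    # Lexicographic key order and no whitespace give the canonical form.
    canonical = json.dumps(members, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).digest()


# Placeholder coordinate values, not a real key.
example_jwk = {"kty": "EC", "crv": "P-256", "x": "AAAA", "y": "BBBB", "kid": "ignored"}
print(jwk_thumbprint_sketch(example_jwk).hex())
```

Because optional members are dropped and keys are sorted, the same key always yields the same thumbprint regardless of how the JWK was serialized.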
- pkcs11_ca_service.asn1.not_before_not_after_from_cert(pem: str) Tuple[str, str]¶
Get not_before and not_after from cert.
Parameters: pem (str): PEM certificate input data.
Returns: Tuple[str, str]
- pkcs11_ca_service.asn1.ocsp_decode(data: str) bytes¶
Decode OCSP data.
Parameters: data (str): Input data.
Returns: bytes
- pkcs11_ca_service.asn1.ocsp_encode(data: bytes) str¶
Encode OCSP data.
Parameters: data (bytes): Input data.
Returns: str
- pkcs11_ca_service.asn1.pem_cert_to_key_hash(pem: str) bytes¶
Get the key hash from pem data.
Parameters: pem (str): PEM input data.
Returns: bytes
- pkcs11_ca_service.asn1.pem_cert_to_name_dict(pem: str) Dict[str, str]¶
Get the subject name from pem data as a dict.
Parameters: pem (str): PEM input data.
Returns: Dict[str, str]
- pkcs11_ca_service.asn1.pem_cert_verify_signature(pem: str, signature: bytes, signed_data: bytes) None¶
Verify a signature made by the certificate's private key. Raises cryptography.exceptions.InvalidSignature if the signature is invalid, or ValueError if the public key is not supported.
Potentially fails if the signature is made using nonstandard hashing of the data.
Parameters: pem (str): PEM input data. signature (bytes): The signature. signed_data (bytes): The signed data.
- pkcs11_ca_service.asn1.pem_key_to_jwk(pem: str) Dict[str, str]¶
Get jwk key from pem.
Parameters: pem (str): Input data.
Returns: Dict[str, str]
- pkcs11_ca_service.asn1.pem_to_sha256_fingerprint(pem: str) str¶
Get the sha256 hash from pem data.
Parameters: pem (str): PEM input data.
Returns: str
- pkcs11_ca_service.asn1.public_key_info_to_pem(public_key_info: PublicKeyInfo) str¶
Get pem from public key info data.
Parameters: public_key_info (asn1crypto.keys.PublicKeyInfo): Input data.
Returns: str
- pkcs11_ca_service.asn1.public_key_pem_from_cert(pem: str) str¶
Get public key in pem from cert.
Parameters: pem (str): PEM certificate input data.
Returns: str
- pkcs11_ca_service.asn1.public_key_pem_from_csr(pem: str) str¶
Get public key in pem from csr.
Parameters: pem (str): csr input data.
Returns: str
- pkcs11_ca_service.asn1.public_key_pem_to_sha1_fingerprint(pem: str) str¶
Get the sha1 fingerprint for the public key in pem data.
See https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.2
Parameters: pem (str): PEM input data.
Returns: str
- pkcs11_ca_service.asn1.public_key_verify_signature(public_key_info_pem: str, signature: bytes, signed_data: bytes) None¶
Verify a signature with a public key. Raises cryptography.exceptions.InvalidSignature if the signature is invalid, or ValueError if the public key is not supported.
Potentially fails if the signature is made using nonstandard hashing of the data.
Parameters: public_key_info_pem (str): Public key info in PEM form. signature (bytes): The signature. signed_data (bytes): The signed data.
- pkcs11_ca_service.asn1.this_update_next_update_from_crl(pem: str) Tuple[str, str]¶
Get this_update and next_update from crl.
Parameters: pem (str): PEM CRL input data.
Returns: Tuple[str, str]
- pkcs11_ca_service.asn1.to_base64url(data: bytes) str¶
Encode to base64url.
Parameters: data (bytes): Input data.
Returns: str
pkcs11_ca_service.auth module¶
Module which handles the authorization
- async pkcs11_ca_service.auth.authorized_by(request: Request) int¶
Authorize a request to our http server.
Returns the ID of the public key in DB which was used to authorize the request.
Parameters: request (fastapi.Request): The entire HTTP request which we can extract the JWT and nonce from.
Returns: int
pkcs11_ca_service.base module¶
Base file which contains the abstract base classes
- class pkcs11_ca_service.base.DataBaseObject¶
Bases: ABC
Abstract base class for database classes
- abstract async classmethod delete(table_name: str, unique_field: str, data: str | int) None¶
Delete data object in DB.
Parameters: table_name (str): Name of the DB table. unique_field (str): Name of a unique_field. data (Union[str, int]): The data in the unique field to delete.
Returns: None
- abstract async classmethod load(table_name: str, input_search: Dict[str, str | int], fields: List[str], unique_fields: List[str]) List[Dict[str, str | int]]¶
Load data objects from DB, returns a list of dict with fields as keys
Parameters: table_name (str): Name of the DB table. input_search (Dict[str, Union[str, int]]): Data to search for. fields (List[str]): Fields to retrieve. unique_fields (List[str]): Data for the unique fields.
Returns: List[Dict[str, Union[str, int]]]
- pkcs11_session: PKCS11Session¶
- abstract async classmethod revoke_data_for_ca(ca_serial: int) Dict[str, str]¶
Get last CRL for a CA
Parameters: ca_serial (int): The database row serial for the CA
Returns: Dict[str, str]
- abstract async classmethod save(table_name: str, fields: Dict[str, str | int], unique_fields: List[str]) int¶
Save data object in DB. Return the DB ID for the data object.
Parameters: table_name (str): Name of the DB table. fields (Dict[str, Union[str, int]]): Data for the fields. unique_fields (List[str]): Data for the unique fields.
Returns: int
- abstract async classmethod startup(tables: List[str], fields: List[Dict[str, Type[str] | Type[int]]], reference_fields: List[Dict[str, str]], unique_fields: List[List[str]]) bool¶
Startup for the database. Creates all tables, creates and inserts the root CA and the healthcheck PKCS11 key if they do not exist, and loads the trusted admin keys. Returns true if the DB startup succeeded.
Parameters: tables (List[str]): Table names. fields (List[Dict[str, Union[Type[str], Type[int]]]]): Fields and their data type. reference_fields (List[Dict[str, str]]): Reference field names and their references. unique_fields (List[List[str]]): Unique field names.
Returns: bool
- abstract async classmethod update(table_name: str, fields: Dict[str, str | int], unique_fields: List[str]) None¶
Update data object in DB.
Parameters: table_name (str): Name of the DB table. fields (Dict[str, Union[str, int]]): Data for the fields. unique_fields (List[str]): Data for the unique fields.
Returns: None
- class pkcs11_ca_service.base.DataClassObject(kwargs: Dict[str, str | int])¶
Bases: ABC
Abstract base class for data classes
- db: DataBaseObject¶
- db_data() Dict[str, str | int]¶
Gather only the data vars matching the DB fields.
Returns: Dict[str, Union[str, int]]
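Gathering only the instance variables that match the DB fields can be sketched as a dictionary filter over `vars(self)`. The class `ExampleRecord` below is hypothetical and exists only to illustrate the idea; it does not inherit from the package's classes and stores nothing in a database.

```python
from typing import Dict, Type, Union


class ExampleRecord:
    """Hypothetical data class; not part of pkcs11_ca_service."""

    db_fields: Dict[str, Union[Type[str], Type[int]]] = {"id": str, "status": str}

    def __init__(self, kwargs: Dict[str, Union[str, int]]) -> None:
        # Store every supplied value as an instance attribute.
        for key, value in kwargs.items():
            setattr(self, key, value)

    def db_data(self) -> Dict[str, Union[str, int]]:
        """Keep only instance attributes whose names appear in db_fields."""
        return {k: v for k, v in vars(self).items() if k in self.db_fields}


record = ExampleRecord({"id": "abc", "status": "valid", "cached_note": "not stored"})
print(record.db_data())
```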
- db_fields: Dict[str, Type[str] | Type[int]]¶
- db_reference_fields: Dict[str, str]¶
- db_table_name: str¶
- db_unique_fields: List[str]¶
- async delete() None¶
Delete data object from its database.
Returns: None
- async save(field_set_to_serial: str | None = None) int¶
Save data object to its database. Return its serial/ID.
Parameters: field_set_to_serial (Union[str, None] = None): Set a field to its serial/ID field. For example if CA is a root CA then set its ‘issuer’ to its own ‘serial’ number
Returns: int
- async update() None¶
Update data object in its database.
Returns: None
- class pkcs11_ca_service.base.InputObject¶
Bases: BaseModel
FastAPI input object for HTTP post data
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- async pkcs11_ca_service.base.db_load_data_class(db_data_class: Type[DataClassObject], input_object: InputObject) List[DataClassObject]¶
Load the data objects in the DB that match the search data in the input object's fields.
Parameters: db_data_class (Type[DataClassObject]): Which class the object will be. input_object (InputObject): Object with search data.
Returns: List[DataClassObject]
pkcs11_ca_service.ca module¶
Module to handle certificate authorities
- class pkcs11_ca_service.ca.Ca(kwargs: Dict[str, str | int])¶
Bases: DataClassObject
Class to represent a certificate authority
- db: DataBaseObject¶
- db_fields: Dict[str, Type[str] | Type[int]] = {'authorized_by': <class 'int'>, 'created': <class 'str'>, 'csr': <class 'int'>, 'fingerprint': <class 'str'>, 'issuer': <class 'int'>, 'not_after': <class 'str'>, 'not_before': <class 'str'>, 'path': <class 'str'>, 'pem': <class 'str'>, 'pkcs11_key': <class 'int'>, 'serial_number': <class 'str'>}¶
- db_reference_fields: Dict[str, str] = {'authorized_by': 'public_key(serial)', 'csr': 'csr(serial)', 'pkcs11_key': 'pkcs11_key(serial)'}¶
- db_table_name: str = 'ca'¶
- db_unique_fields: List[str] = ['pem', 'pkcs11_key', 'fingerprint', 'path']¶
- async is_revoked() bool¶
If CA has been revoked
Returns: bool
- async issuer_pem() str¶
The issuer for this CA in PEM form
Returns: str
- async revoke(auth_by: int, reason: int | None = None) None¶
Revoke the certificate authority. See https://github.com/wbond/asn1crypto/blob/b5f03e6f9797c691a3b812a5bb1acade3a1f4eeb/asn1crypto/crl.py#L97
Parameters: auth_by (int): The revoker public key DB id. reason (Union[int, None] = None): The revocation reason code, if any.
Returns: None
- class pkcs11_ca_service.ca.CaInput(*, name_dict: Dict[str, str] | None = None, pem: str | None = None, key_label: str | None = None, key_type: str | None = None, issuer_pem: str | None = None, path: str | None = None, pkcs11_key: int | None = None, serial_number: str | None = None)¶
Bases: InputObject
Class to represent ca matching from HTTP post data
- issuer_pem: str | None¶
- key_label: str | None¶
- key_type: str | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'issuer_pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'key_label': FieldInfo(annotation=Union[str, NoneType], required=False), 'key_type': FieldInfo(annotation=Union[str, NoneType], required=False), 'name_dict': FieldInfo(annotation=Union[Dict[str, str], NoneType], required=False), 'path': FieldInfo(annotation=Union[str, NoneType], required=False), 'pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'pkcs11_key': FieldInfo(annotation=Union[int, NoneType], required=False), 'serial_number': FieldInfo(annotation=Union[str, NoneType], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- name_dict: Dict[str, str] | None¶
- path: str | None¶
- pem: str | None¶
- pkcs11_key: int | None¶
- serial_number: str | None¶
- async pkcs11_ca_service.ca.search(input_object: InputObject) JSONResponse¶
Get cas matching the input_object pattern(s). Returns a fastAPI json response with a list of the cas.
Parameters: input_object (InputObject): The search pattern data.
Returns: fastapi.responses.JSONResponse
pkcs11_ca_service.certificate module¶
Module to handle certificates
- class pkcs11_ca_service.certificate.Certificate(kwargs: Dict[str, str | int])¶
Bases: DataClassObject
Class to represent a certificate
- db: DataBaseObject¶
- db_fields: Dict[str, Type[str] | Type[int]] = {'authorized_by': <class 'int'>, 'created': <class 'str'>, 'csr': <class 'int'>, 'fingerprint': <class 'str'>, 'issuer': <class 'int'>, 'not_after': <class 'str'>, 'not_before': <class 'str'>, 'pem': <class 'str'>, 'public_key': <class 'int'>, 'serial_number': <class 'str'>}¶
- db_reference_fields: Dict[str, str] = {'authorized_by': 'public_key(serial)', 'csr': 'csr(serial)', 'issuer': 'ca(serial)', 'public_key': 'public_key(serial)'}¶
- db_table_name: str = 'certificate'¶
- db_unique_fields: List[str] = ['pem', 'fingerprint']¶
- async is_revoked() bool¶
If certificate has been revoked
Returns: bool
- async issuer_pem() str¶
The issuer for this certificate in PEM form
Returns: str
- async revoke(auth_by: int, reason: int | None = None) None¶
Revoke the certificate. See https://github.com/wbond/asn1crypto/blob/b5f03e6f9797c691a3b812a5bb1acade3a1f4eeb/asn1crypto/crl.py#L97
Parameters: auth_by (int): The revoker public key DB id. reason (Union[int, None] = None): The revocation reason code, if any.
Returns: None
- class pkcs11_ca_service.certificate.CertificateInput(*, pem: str | None = None, fingerprint: str | None = None, serial_number: str | None = None)¶
Bases: InputObject
Class to represent certificate matching from HTTP post data
- fingerprint: str | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'fingerprint': FieldInfo(annotation=Union[str, NoneType], required=False), 'pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'serial_number': FieldInfo(annotation=Union[str, NoneType], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- pem: str | None¶
- serial_number: str | None¶
- async pkcs11_ca_service.certificate.search(input_object: InputObject) JSONResponse¶
Get certificates matching the input_object pattern(s). Returns a fastAPI json response with a list of the certificates.
Parameters: input_object (InputObject): The search pattern data.
Returns: fastapi.responses.JSONResponse
pkcs11_ca_service.cmc module¶
CMC functions
- async pkcs11_ca_service.cmc.cmc_handle_request(data: bytes) bytes¶
Handle and extract CMS CMC request
- async pkcs11_ca_service.cmc.cmc_revoke(revoke_data: bytes) None¶
Revoke a certificate based on the CMC RevokeRequest
- async pkcs11_ca_service.cmc.create_cert_from_csr(csr_data: CertificationRequest) Certificate¶
Create cert from a csr
- async pkcs11_ca_service.cmc.create_cmc_response(controls: Controls, created_certs: Dict[int, Certificate], failed: bool) bytes¶
Create a CMS response containing a CMC package
- async pkcs11_ca_service.cmc.create_cmc_response_packet(controls: Controls, created_certs: Dict[int, Certificate], failed: bool) PKIResponse¶
Create a CMC response package. Revoke cert(s) if the request had a RevokeRequest(s).
- pkcs11_ca_service.cmc.create_csr_from_crmf(cert_req: CertReqMsg) CertificationRequest¶
Manually convert the CRMF request into a CSR, since the database uses CSR as its data type and currently every cert has a CSR from which it is created.
pkcs11_ca_service.config module¶
Config module
pkcs11_ca_service.crl module¶
Module to handle crls
- class pkcs11_ca_service.crl.Crl(kwargs: Dict[str, str | int])¶
Bases: DataClassObject
Class to represent a crl
- db: DataBaseObject¶
- db_fields: Dict[str, Type[str] | Type[int]] = {'authorized_by': <class 'int'>, 'created': <class 'str'>, 'issuer': <class 'int'>, 'next_update': <class 'str'>, 'pem': <class 'str'>, 'this_update': <class 'str'>}¶
- db_reference_fields: Dict[str, str] = {'authorized_by': 'public_key(serial)', 'issuer': 'ca(serial)', 'public_key': 'public_key(serial)'}¶
- db_table_name: str = 'crl'¶
- db_unique_fields: List[str] = ['pem']¶
- class pkcs11_ca_service.crl.CrlInput(*, pem: str | None = None, ca_pem: str | None = None)¶
Bases: InputObject
Class to represent crl matching from HTTP post data
- ca_pem: str | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'ca_pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'pem': FieldInfo(annotation=Union[str, NoneType], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- pem: str | None¶
- async pkcs11_ca_service.crl.search(input_object: InputObject) JSONResponse¶
Get crls matching the input_object pattern(s). Returns a FastAPI JSON response with a list of the crls.
Parameters: input_object (InputObject): The search pattern data.
Returns: fastapi.responses.JSONResponse
pkcs11_ca_service.csr module¶
Module to handle csrs
- class pkcs11_ca_service.csr.Csr(kwargs: Dict[str, str | int])¶
Bases: DataClassObject
Class to represent a csr
- db: DataBaseObject¶
- db_fields: Dict[str, Type[str] | Type[int]] = {'authorized_by': <class 'int'>, 'created': <class 'str'>, 'pem': <class 'str'>, 'public_key': <class 'int'>}¶
- db_reference_fields: Dict[str, str] = {'authorized_by': 'public_key(serial)', 'public_key': 'public_key(serial)'}¶
- db_table_name: str = 'csr'¶
- db_unique_fields: List[str] = ['pem']¶
- class pkcs11_ca_service.csr.CsrInput(*, pem: str | None = None, ca_pem: str | None = None)¶
Bases: InputObject
Class to represent csr matching from HTTP post data
- ca_pem: str | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'ca_pem': FieldInfo(annotation=Union[str, NoneType], required=False), 'pem': FieldInfo(annotation=Union[str, NoneType], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- pem: str | None¶
- async pkcs11_ca_service.csr.search(input_object: InputObject) JSONResponse¶
Get csrs matching the input_object pattern(s). Returns a FastAPI JSON response with a list of the csrs.
Parameters: input_object (InputObject): The search pattern data.
Returns: fastapi.responses.JSONResponse
pkcs11_ca_service.error module¶
Module which has all our exceptions
- exception pkcs11_ca_service.error.DBInitFail(message: str = 'DB init fail')¶
Bases: Exception
Class to handle DB init
- exception pkcs11_ca_service.error.NoSuchDBObject(message: str = 'No such object in DB')¶
Bases: Exception
Class to handle objects not existing in DB
- exception pkcs11_ca_service.error.UnsupportedJWTAlgorithm(message: str = 'Unsupported JWT algorithm')¶
Bases: Exception
Class to handle unsupported JWT algorithms
- exception pkcs11_ca_service.error.WrongDataType(message: str = 'Wrong data type')¶
Bases: Exception
Class to handle wrong data types
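The exception signatures above each take an optional message with a default value. A minimal sketch of how such a class could be defined and caught follows. Only the class name and default message are taken from the signature; the class body and the `describe_lookup` helper are hypothetical.

```python
class NoSuchDBObject(Exception):
    """Raised when a requested object does not exist in the DB."""

    def __init__(self, message: str = "No such object in DB") -> None:
        # Assumption: the message is kept on the instance and also
        # handed to the Exception base class.
        self.message = message
        super().__init__(message)


def describe_lookup(found: bool) -> str:
    """Hypothetical caller that turns the exception into a readable result."""
    try:
        if not found:
            raise NoSuchDBObject()
        return "ok"
    except NoSuchDBObject as err:
        return f"error: {err.message}"
```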
pkcs11_ca_service.main module¶
Main module, FastAPI runs from here
- class pkcs11_ca_service.main.RevokeInput(*, pem: str, reason: int | None = None)¶
Bases: InputObject
Class to represent revoke specification matching from HTTP post data
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'pem': FieldInfo(annotation=str, required=True), 'reason': FieldInfo(annotation=Union[int, NoneType], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- pem: str¶
- reason: int | None¶
- async pkcs11_ca_service.main.get_acme_directory() JSONResponse¶
ACME GET directory endpoint
Returns: Response
- async pkcs11_ca_service.main.get_acme_new_nonce() Response¶
ACME GET new-nonce endpoint
Returns: Response
- async pkcs11_ca_service.main.get_ca(ca_path: str) Response¶
/ca, GET method.
Get a ca.
Parameters: ca_path (str): The unique CA path.
Returns: fastapi.Response
- async pkcs11_ca_service.main.get_ca_search(request: Request) JSONResponse¶
/search/ca, GET method.
Fetch all cas.
Parameters: request (fastapi.Request): The entire HTTP request.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.get_certificate_search(request: Request) JSONResponse¶
/search/certificate, GET method.
Fetch all certificates.
Parameters: request (fastapi.Request): The entire HTTP request.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.get_crl(crl_path: str) Response¶
/crl, GET method.
Get a CRL.
Parameters: crl_path (str): The unique CRL path.
Returns: fastapi.Response
- async pkcs11_ca_service.main.get_crl_search(request: Request) JSONResponse¶
/search/crl, GET method.
Fetch all crls.
Parameters: request (fastapi.Request): The entire HTTP request.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.get_csr_search(request: Request) JSONResponse¶
/search/csr, GET method.
Fetch all csrs.
Parameters: request (fastapi.Request): The entire HTTP request.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.get_healthcheck(request: Request) JSONResponse¶
/healthcheck, GET method.
Do a healthcheck. Sign some data.
Parameters: request (fastapi.Request): The entire HTTP request.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.get_new_nonce() Response¶
Get new nonce, GET method.
Returns: fastapi.Response
- async pkcs11_ca_service.main.get_ocsp(ocsp_path: str) Response¶
/ocsp, GET method.
Return an OCSP response.
Parameters: ocsp_path (str): OCSP path.
Returns: fastapi.Response
- async pkcs11_ca_service.main.get_public_key_search(request: Request) JSONResponse¶
/search/public_key, GET method.
Fetch all public keys.
Parameters: request (fastapi.Request): The entire HTTP request.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.head_acme_new_nonce() Response¶
ACME HEAD new-nonce endpoint
Returns: Response
- async pkcs11_ca_service.main.head_new_nonce() Response¶
Get new nonce, HEAD method.
Returns: fastapi.Response
- async pkcs11_ca_service.main.is_revoked(request: Request, revoke_input: RevokeInput) JSONResponse¶
/is_revoked, POST method.
Check whether a certificate/CA is revoked.
Parameters: request (fastapi.Request): The entire HTTP request. revoke_input (RevokeInput): The revoke specification.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_acme_routes(request: Request, background_tasks: BackgroundTasks) Response¶
Handle all ACME endpoints except for the /directory endpoint.
Parameters: request (Request): The HTTP request. background_tasks (BackgroundTasks): The list of background tasks to append to, for example executing challenges.
Returns: Response
- async pkcs11_ca_service.main.post_ca(request: Request, ca_input: CaInput) JSONResponse¶
/ca, POST method.
Post/create a new ca.
Parameters: request (fastapi.Request): The entire HTTP request. ca_input (CaInput): The ca data.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_ca_search(request: Request, ca_input: CaInput) JSONResponse¶
/search/ca, POST method.
Fetch all cas that match the search pattern.
Parameters: request (fastapi.Request): The entire HTTP request. ca_input (CaInput): The search pattern.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_certificate_search(request: Request, certificate_input: CertificateInput) JSONResponse¶
/search/certificate, POST method.
Fetch all certificates that match the search pattern.
Parameters: request (fastapi.Request): The entire HTTP request. certificate_input (CertificateInput): The search pattern.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_cmc(request: Request) Response¶
CMC fixme
- async pkcs11_ca_service.main.post_crl(request: Request, crl_input: CrlInput) JSONResponse¶
/crl, POST method.
Create a new crl.
Parameters: request (fastapi.Request): The entire HTTP request. crl_input (CrlInput): The crl data.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_crl_search(request: Request, crl_input: CrlInput) JSONResponse¶
/search/crl, POST method.
Fetch all crls that match the search pattern.
Parameters: request (fastapi.Request): The entire HTTP request. crl_input (CrlInput): The search pattern.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_csr_search(request: Request, csr_input: CsrInput) JSONResponse¶
/search/csr, POST method.
Fetch all csrs that match the search pattern.
Parameters: request (fastapi.Request): The entire HTTP request. csr_input (CsrInput): The search pattern.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_ocsp(request: Request) Response¶
/ocsp, POST method.
Return an OCSP response.
Parameters: request (fastapi.Request): The entire HTTP request.
Returns: fastapi.Response
- async pkcs11_ca_service.main.post_pkcs11_sign(request: Request) JSONResponse¶
/pkcs11_sign, POST method.
- async pkcs11_ca_service.main.post_public_key(request: Request, public_key_input: PublicKeyInput) JSONResponse¶
/public_key, POST method.
Post/create a new public key.
Parameters: request (fastapi.Request): The entire HTTP request. public_key_input (PublicKeyInput): The public key data.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_public_key_search(request: Request, public_key_input: PublicKeyInput) JSONResponse¶
/search/public_key, POST method.
Fetch all public keys that match the search pattern.
Parameters: request (fastapi.Request): The entire HTTP request. public_key_input (PublicKeyInput): The search pattern.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_revoke(request: Request, revoke_input: RevokeInput) JSONResponse¶
/revoke, POST method.
Revoke a certificate/CA, as a CA is really just a certificate.
Parameters: request (fastapi.Request): The entire HTTP request. revoke_input (RevokeInput): The revoke specification.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_sign_csr(request: Request, csr_input: CsrInput) JSONResponse¶
/sign_csr, POST method.
Post/create a new csr. Creates and returns a certificate.
If a cert has CA:TRUE then treat it as a normal cert since we don't have its private key.
Parameters: request (fastapi.Request): The entire HTTP request. csr_input (CsrInput): The csr data.
Returns: fastapi.responses.JSONResponse
- async pkcs11_ca_service.main.post_timestamp(request: Request) Response¶
Timestamp fixme
pkcs11_ca_service.nonce module¶
Module which handles nonces
- pkcs11_ca_service.nonce.generate_nonce() str¶
Generate, store and return a nonce
Returns: str
- pkcs11_ca_service.nonce.hash_nonce(nonce_base64url: str) str¶
Hash a nonce. Nonces are stored hashed with SHA-256 for the same reason passwords are hashed.
Parameters: nonce_base64url (str): base64url encoded nonce to be hashed.
Returns: str
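As an illustration of the hashing step described for `hash_nonce`, here is a minimal sketch using the standard library. The function name `hash_nonce_sketch`, the hex digest output, and hashing the encoded string rather than the decoded bytes are all assumptions; the source only states that SHA-256 is used.

```python
import hashlib


def hash_nonce_sketch(nonce_base64url: str) -> str:
    """Return the SHA-256 hex digest of a base64url encoded nonce."""
    # Assumption: the encoded string itself is hashed, not the decoded bytes,
    # and the digest is stored as lowercase hex.
    return hashlib.sha256(nonce_base64url.encode("utf-8")).hexdigest()
```

Storing only the digest means that a leaked nonce table does not directly reveal usable nonces.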
- pkcs11_ca_service.nonce.nonce_response(status_code: int = 200) Response¶
Create HTTP response containing a new nonce.
Returns: fastapi.Response
- async pkcs11_ca_service.nonce.verify_nonce(nonce: str) bool¶
Verify a nonce, if verified then delete it from the list of nonces.
Parameters: nonce (str): base64url encoded nonce to be verified.
Returns: bool
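The verify-then-delete behaviour makes each nonce single-use. A rough sketch of that idea is shown below, with an in-memory set standing in for the real nonce store. The names `issue_nonce` and `consume_nonce` are hypothetical, the functions are synchronous for brevity, and the service's actual storage is not shown here.

```python
import hashlib
import secrets

# Assumption: an in-memory set stands in for wherever the service keeps nonces.
_NONCES: set[str] = set()


def _digest(nonce: str) -> str:
    """Hash a nonce so that only the digest is stored."""
    return hashlib.sha256(nonce.encode("utf-8")).hexdigest()


def issue_nonce() -> str:
    """Create a random URL-safe nonce and remember its digest."""
    nonce = secrets.token_urlsafe(32)
    _NONCES.add(_digest(nonce))
    return nonce


def consume_nonce(nonce: str) -> bool:
    """Return True once for a known nonce, then forget it."""
    digest = _digest(nonce)
    if digest in _NONCES:
        _NONCES.remove(digest)
        return True
    return False
```

Calling `consume_nonce` a second time with the same value returns False, which is what protects against replayed requests.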
pkcs11_ca_service.ocsp module¶
OCSP functions
- async pkcs11_ca_service.ocsp.ocsp_response(request: bytes, encoded: bool = False) bytes¶
Create an OCSP response
Parameters: request (bytes): OCSP request in bytes. encoded (bool): Whether the request is base64 and URL encoded.
Returns: bytes
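For OCSP over HTTP GET, RFC 6960 places the URL-encoded base64 encoding of the DER request in the path. The sketch below shows how such a path could be produced and decoded with the standard library. Both function names are hypothetical, and this is not necessarily how `ocsp_response` handles the `encoded` flag internally.

```python
import base64
from urllib.parse import quote, unquote


def encode_ocsp_path(der_request: bytes) -> str:
    """Client side: base64 encode the request, then URL encode the result."""
    return quote(base64.b64encode(der_request).decode("ascii"), safe="")


def decode_ocsp_path(ocsp_path: str) -> bytes:
    """Server side: undo the URL encoding, then the base64 encoding."""
    # unquote leaves a literal '+' alone, so the base64 text is not damaged.
    b64_text = unquote(ocsp_path)
    return base64.b64decode(b64_text, validate=True)
```

The URL encoding matters because standard base64 output can contain `+`, `/` and `=`, all of which have special meaning inside a URL.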
pkcs11_ca_service.pkcs11_key module¶
Module to handle pkcs11 keys
- class pkcs11_ca_service.pkcs11_key.Pkcs11Key(kwargs: Dict[str, str | int])¶
Bases: DataClassObject
Class to represent a PKCS11 key
- db: DataBaseObject¶
- db_fields: Dict[str, Type[str] | Type[int]] = {'authorized_by': <class 'int'>, 'created': <class 'str'>, 'key_label': <class 'str'>, 'key_type': <class 'str'>, 'public_key': <class 'int'>}¶
- db_reference_fields: Dict[str, str] = {'authorized_by': 'public_key(serial)', 'public_key': 'public_key(serial)'}¶
- db_table_name: str = 'pkcs11_key'¶
- db_unique_fields: List[str] = ['public_key', 'key_label']¶
- class pkcs11_ca_service.pkcs11_key.Pkcs11KeyInput(*, key_label: str | None = None, serial: int | None = None, public_key: int | None = None, key_type: str | None = None)¶
Bases: InputObject
Class to represent PKCS11 key matching from HTTP post data
- key_label: str | None¶
- key_type: str | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'key_label': FieldInfo(annotation=Union[str, NoneType], required=False), 'key_type': FieldInfo(annotation=Union[str, NoneType], required=False), 'public_key': FieldInfo(annotation=Union[int, NoneType], required=False), 'serial': FieldInfo(annotation=Union[int, NoneType], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- public_key: int | None¶
- serial: int | None¶
pkcs11_ca_service.pkcs11_sign module¶
Module to handle /pkcs11_sign endpoint
- async pkcs11_ca_service.pkcs11_sign.pkcs11_sign(request: Dict[str, Any]) JSONResponse¶
Sign input data in the request schema format using a key in a PKCS11 device. Raises an HTTP error if the operation fails or the data format is invalid.
Parameters: request (Dict[str, Any]): The input data.
Returns: fastapi.responses.JSONResponse
- pkcs11_ca_service.pkcs11_sign.validate_input(request: Dict[str, Any]) None¶
Validate the json input with our schema, raises HTTP error 400 if invalid
Parameters: request (Dict[str, Any]): The input.
Returns: None
pkcs11_ca_service.postgres_db module¶
Module which handles database queries
- class pkcs11_ca_service.postgres_db.PostgresDB¶
Bases: DataBaseObject
Class to use with Postgres DB
- async classmethod delete(table_name: str, unique_field: str, data: str | int) None¶
Delete data object in DB.
Parameters: table_name (str): Name of the DB table. unique_field (str): Name of a unique_field. data (Union[str, int]): The data in the unique field to delete.
Returns: None
- async classmethod load(table_name: str, input_search: Dict[str, str | int], fields: List[str], unique_fields: List[str]) List[Dict[str, str | int]]¶
Load data objects from DB. Returns a list of dicts with the fields as keys.
Parameters: table_name (str): Name of the DB table. input_search (Dict[str, Union[str, int]]): Data to search for. fields (List[str]): Fields to retrieve. unique_fields (List[str]): Data for the unique fields.
Returns: List[Dict[str, Union[str, int]]]
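To illustrate how a search dict like `input_search` can be turned into a query without placing values in the SQL text, here is a hedged sketch that builds a parameterized SELECT with PostgreSQL-style numbered placeholders. The helper `build_select` is hypothetical, and the query that `PostgresDB.load` really issues is not documented here.

```python
from typing import Dict, List, Tuple, Union


def build_select(
    table_name: str,
    input_search: Dict[str, Union[str, int]],
    fields: List[str],
) -> Tuple[str, List[Union[str, int]]]:
    """Build a parameterized SELECT; values never enter the SQL text."""
    # Assumption: table and field names come from trusted class attributes
    # such as db_table_name and db_fields, never from user input.
    query = f"SELECT {', '.join(fields)} FROM {table_name}"
    args: List[Union[str, int]] = []
    conditions: List[str] = []
    for index, (name, value) in enumerate(input_search.items(), start=1):
        # Each search value becomes a numbered placeholder ($1, $2, ...).
        conditions.append(f"{name} = ${index}")
        args.append(value)
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    return query, args
```

The returned query string and argument list would then be passed together to the database driver, which binds the values safely.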
- pool: Pool¶
- async classmethod revoke_data_for_ca(ca_serial: int) Dict[str, str]¶
Get the last CRL for a CA.
Parameters: ca_serial (int): The database row serial for the CA.
Returns: Dict[str, str]
- async classmethod save(table_name: str, fields: Dict[str, str | int], unique_fields: List[str]) int¶
Save data object in DB. Return the DB ID for the data object.
Parameters: table_name (str): Name of the DB table. fields (Dict[str, Union[str, int]]): Data for the fields. unique_fields (List[str]): Data for the unique fields.
Returns: int
- async classmethod startup(tables: List[str], fields: List[Dict[str, Type[str] | Type[int]]], reference_fields: List[Dict[str, str]], unique_fields: List[List[str]]) bool¶
Startup for the database. Creates all tables, creates and inserts the root CA and the healthcheck PKCS11 key if they do not exist, and loads the trusted admin keys. Returns True if DB startup succeeded.
Parameters: tables (List[str]): Table names. fields (List[Dict[str, Union[Type[str], Type[int]]]]): Fields and their data type. reference_fields (List[Dict[str, str]]): Reference field names and their references. unique_fields (List[List[str]]): Unique field names.
Returns: bool
- async classmethod update(table_name: str, fields: Dict[str, str | int], unique_fields: List[str]) None¶
Update data object in DB.
Parameters: table_name (str): Name of the DB table. fields (Dict[str, Union[str, int]]): Data for the fields. unique_fields (List[str]): Data for the unique fields.
Returns: None
pkcs11_ca_service.public_key module¶
Module to handle public keys
- class pkcs11_ca_service.public_key.PublicKey(kwargs: Dict[str, str | int])¶
Bases: DataClassObject
Class to represent a public key
- db: DataBaseObject¶
- db_fields: Dict[str, Type[str] | Type[int]] = {'admin': <class 'int'>, 'authorized_by': <class 'int'>, 'created': <class 'str'>, 'fingerprint': <class 'str'>, 'info': <class 'str'>, 'pem': <class 'str'>}¶
- db_reference_fields: Dict[str, str] = {}¶
- db_table_name: str = 'public_key'¶
- db_unique_fields: List[str] = ['pem', 'fingerprint']¶
- class pkcs11_ca_service.public_key.PublicKeyInput(*, pem: str | None = None, fingerprint: str | None = None, admin: int | None = None)¶
Bases: InputObject
Class to represent public key matching from HTTP post data
- admin: int | None¶
- fingerprint: str | None¶
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'admin': FieldInfo(annotation=Union[int, NoneType], required=False), 'fingerprint': FieldInfo(annotation=Union[str, NoneType], required=False), 'pem': FieldInfo(annotation=Union[str, NoneType], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- pem: str | None¶
- async pkcs11_ca_service.public_key.search(input_object: InputObject) JSONResponse¶
Get public keys matching the input_object pattern(s). Returns a FastAPI JSON response with a list of the public keys.
Parameters: input_object (InputObject): The search pattern data.
Returns: fastapi.responses.JSONResponse
pkcs11_ca_service.route_functions module¶
Route functions
- async pkcs11_ca_service.route_functions.ca_request(ca_input: CaInput) Ca¶
Get CA object.
Parameters: ca_input (CaInput): CA input object.
Returns: Ca
- async pkcs11_ca_service.route_functions.crl_request(auth_by: int, issuer_obj: Ca) str¶
Get CRL.
Parameters: auth_by (int): Who is the author, comes from the auth.py file. issuer_obj (Ca): CA object of its issuer.
Returns: str
- async pkcs11_ca_service.route_functions.healthcheck() JSONResponse¶
Healthcheck: query the DB and sign some data. On failure, send HTTP status code 503.
Returns: JSONResponse
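The healthcheck pattern described above (run the checks, answer 503 if any of them fails) can be sketched as follows. The callables `query_db` and `sign_data` and the response body keys are assumptions made for illustration; the real function returns a JSONResponse and its body is not documented here.

```python
from typing import Callable, Dict, Tuple


def healthcheck_result(
    query_db: Callable[[], None],
    sign_data: Callable[[], None],
) -> Tuple[int, Dict[str, str]]:
    """Run both checks and map the outcome to an HTTP status code."""
    try:
        query_db()   # hypothetical check that the database answers
        sign_data()  # hypothetical check that the signing key works
    except Exception:
        # Any failure is reported as 503 Service Unavailable.
        return 503, {"healthcheck": "fail"}
    return 200, {"healthcheck": "ok"}
```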
- async pkcs11_ca_service.route_functions.pkcs11_key_request(pkcs11_key_input: Pkcs11KeyInput) Pkcs11Key¶
Get pkcs11 key object.
Parameters: pkcs11_key_input (Pkcs11KeyInput): PKCS11 key input object.
Returns: Pkcs11Key
- async pkcs11_ca_service.route_functions.public_key_request(public_key_input: PublicKeyInput) PublicKey¶
Get public key object.
Parameters: public_key_input (PublicKeyInput): Public key input object.
Returns: PublicKey
- async pkcs11_ca_service.route_functions.sign_csr(auth_by: int, issuer_obj: Ca, csr_obj: Csr, public_key_obj: PublicKey) str¶
- Parameters:
auth_by – DB public key id.
issuer_obj – Which CA should sign the CSR.
csr_obj – Which CSR object will be signed.
public_key_obj – Which public key object created this csr.
- Returns:
str
pkcs11_ca_service.startup module¶
Startup module
- async pkcs11_ca_service.startup.startup() None¶
Startup main function
Returns: None
pkcs11_ca_service.timestamp module¶
Timestamp functions
- class pkcs11_ca_service.timestamp.Timestamp(kwargs: Dict[str, str | int])¶
Bases: DataClassObject
Class to represent a timestamp
- cert_key_label: str¶
- created: str¶
- db: DataBaseObject¶
- db_fields: Dict[str, Type[str] | Type[int]] = {'cert_key_label': <class 'str'>, 'created': <class 'str'>, 'gen_time': <class 'str'>, 'serial_number': <class 'str'>}¶
- db_reference_fields: Dict[str, str] = {}¶
- db_table_name: str = 'timestamp'¶
- db_unique_fields: List[str] = ['serial_number']¶
- gen_time: str¶
- serial_number: str¶
- class pkcs11_ca_service.timestamp.TimestampInput(*, serial_number: str | None = None)¶
Bases: InputObject
Class to represent a timestamp matching from HTTP post data
- model_config: ClassVar[ConfigDict] = {}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'serial_number': FieldInfo(annotation=Union[str, NoneType], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- serial_number: str | None¶
- async pkcs11_ca_service.timestamp.create_timestamp_certificate(issuer_path: str, key_label: str, subject_name: Dict[str, str], key_type: str, signer_key_label: str, signer_subject_name: Dict[str, str], signer_key_type: str) str¶
Create a timestamp certificate signed by a PKCS11 key. Returns the PEM encoded certificate
Parameters: issuer_path (str): The path for CRL and AIA extension to write into the cert. key_label (str): The pkcs11 key label to create the certificate’s key. subject_name (Dict[str, str]): The certificate’s subject name dict. key_type (str): The certificate’s key type. signer_key_label (str): The pkcs11 key label to sign the certificate. signer_subject_name (Dict[str, str]): The certificate issuer’s subject name dict. signer_key_type (str): The certificate signer’s key type.
Returns: str
- async pkcs11_ca_service.timestamp.create_timestamp_response(hashed_message: bytes, nonce: int | None = None) bytes¶
Create a timestamp response.
Parameters: hashed_message (bytes): The hashed message. nonce (Union[int, None]): The request nonce.
Returns: bytes
- async pkcs11_ca_service.timestamp.create_timestamp_response_packet(hashed_message: bytes, nonce: int | None) TSTInfo¶
Create a timestamp response asn1 package
Parameters: hashed_message (bytes): The timestamp request. nonce (Union[int, None]): The nonce
Returns: asn1crypto.tsp.TSTInfo
- async pkcs11_ca_service.timestamp.create_timestamp_signed_data(hashed_message: bytes, nonce: int | None) ContentInfo¶
Create a timestamp signed data asn1 structure
Parameters: hashed_message (bytes): The timestamp request. nonce (Union[int, None]): The nonce
Returns: asn1crypto.cms.ContentInfo
- async pkcs11_ca_service.timestamp.timestamp_handle_request(data: bytes) bytes¶
Handle and respond to a timestamp request.
Parameters: data (bytes): The timestamp request.
Returns: bytes
pkcs11_ca_service.validate module¶
Module to validate data
- pkcs11_ca_service.validate.validate_input_string(input_string: str) None¶
Validate an input string. We only allow letters, numbers, the characters [+, -, _, /, =, ' '] and Swedish [å, ä, ö].
Parameters: input_string (str): Input data.
Returns: None
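An allow-list check like the one described can be expressed with a single regular expression. The sketch below is an approximation only: it assumes "letters" means ASCII letters, includes the uppercase forms Å, Ä, Ö, and raises ValueError as a stand-in for whatever error the service actually raises. The name `check_input_string` is hypothetical.

```python
import re

# Letters, digits, the listed punctuation, space, and Swedish å, ä, ö.
# Assumption: uppercase Å, Ä, Ö are accepted as well.
_ALLOWED = re.compile(r"[A-Za-z0-9+\-_/= åäöÅÄÖ]*")


def check_input_string(input_string: str) -> None:
    """Raise ValueError if the string contains a character outside the allow-list."""
    # fullmatch requires every character of the input to be in the allowed set.
    if _ALLOWED.fullmatch(input_string) is None:
        raise ValueError("input contains characters that are not allowed")
```

An allow-list rejects anything unexpected by default, which is generally safer than trying to enumerate dangerous characters.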
Module contents¶
Python CA service