Skip to content

Commit

Permalink
rename RsaPublic/PrivateKey -> Public/PrivateKey
Browse files Browse the repository at this point in the history
this is in preparation of supporting other key types

CMK-15083

Change-Id: I06ea07767977a9e05721afbfd0e76158809931fd
  • Loading branch information
hrantzsch authored and Zatcmk committed Nov 22, 2023
1 parent 5d42be1 commit befdf2c
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 69 deletions.
6 changes: 2 additions & 4 deletions cmk/gui/type_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
CertificatePEM,
CertificateWithPrivateKey,
EncryptedPrivateKeyPEM,
RsaPrivateKey,
PrivateKey,
)
from cmk.utils.crypto.password import Password, PasswordHash
from cmk.utils.labels import Labels
Expand Down Expand Up @@ -676,9 +676,7 @@ class Key(BaseModel):
def to_certificate_with_private_key(self, passphrase: Password) -> CertificateWithPrivateKey:
return CertificateWithPrivateKey(
certificate=Certificate.load_pem(CertificatePEM(self.certificate)),
private_key=RsaPrivateKey.load_pem(
EncryptedPrivateKeyPEM(self.private_key), passphrase
),
private_key=PrivateKey.load_pem(EncryptedPrivateKeyPEM(self.private_key), passphrase),
)

def fingerprint(self, algorithm: HashAlgorithm) -> str:
Expand Down
4 changes: 2 additions & 2 deletions cmk/gui/valuespec.py
Original file line number Diff line number Diff line change
Expand Up @@ -7804,7 +7804,7 @@ def _generate_ssh_key() -> SSHKeyPairValue:
# TODO: This method is the only reason we have to offer dump_legacy_pkcs1. Can we use
# dump_pem instead? The only difference is "-----BEGIN RSA PRIVATE KEY-----" (pkcs1) vs
# "-----BEGIN PRIVATE KEY-----".
key = certificate.RsaPrivateKey.generate(4096)
key = certificate.PrivateKey.generate(4096)
private_key = key.dump_legacy_pkcs1().str
public_key = key.public_key.dump_openssh()
return (private_key, public_key)
Expand Down Expand Up @@ -8016,7 +8016,7 @@ def _validate_private_key(value: str, varprefix: str) -> None:
raise MKUserError(varprefix, _("Encrypted private keys are not supported"))

try:
certificate.RsaPrivateKey.load_pem(certificate.PlaintextPrivateKeyPEM(value))
certificate.PrivateKey.load_pem(certificate.PlaintextPrivateKeyPEM(value))
except Exception:
raise MKUserError(varprefix, _("Invalid private key"))

Expand Down
12 changes: 6 additions & 6 deletions cmk/utils/backup/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
Certificate,
CertificatePEM,
EncryptedPrivateKeyPEM,
RsaPrivateKey,
RsaPublicKey,
PrivateKey,
PublicKey,
)
from cmk.utils.crypto.deprecated import (
AesCbcCipher,
Expand Down Expand Up @@ -136,12 +136,12 @@ def _get_encrypted_chunk(self) -> tuple[bytes, bool]:

return self._cipher.update(chunk), was_last_chunk

def _get_encryption_public_key(self, key_id: bytes) -> RsaPublicKey:
def _get_encryption_public_key(self, key_id: bytes) -> PublicKey:
key = self._get_key_spec(key_id)
return Certificate.load_pem(CertificatePEM(key["certificate"])).public_key

# logic from http://stackoverflow.com/questions/6309958/encrypting-a-file-with-rsa-in-python
def _derive_key(self, pubkey: RsaPublicKey, key_length: int) -> tuple[bytes, bytes]:
def _derive_key(self, pubkey: PublicKey, key_length: int) -> tuple[bytes, bytes]:
secret_key = os.urandom(key_length)
return secret_key, encrypt_for_rsa_key(pubkey, secret_key)

Expand Down Expand Up @@ -215,7 +215,7 @@ def read_field() -> bytes:

return file_version, encrypted_secret_key

def _get_encryption_private_key(self, key_id: bytes) -> RsaPrivateKey:
def _get_encryption_private_key(self, key_id: bytes) -> PrivateKey:
key = self._get_key_spec(key_id)

try:
Expand All @@ -228,7 +228,7 @@ def _get_encryption_private_key(self, key_id: bytes) -> RsaPrivateKey:
)

try:
return RsaPrivateKey.load_pem(
return PrivateKey.load_pem(
EncryptedPrivateKeyPEM(key["private_key"]), Password(passphrase)
)
except (ValueError, IndexError, TypeError, MKException):
Expand Down
12 changes: 6 additions & 6 deletions cmk/utils/certs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
Certificate,
CertificateSigningRequest,
CertificateWithPrivateKey,
RsaPrivateKey,
PrivateKey,
X509Name,
)

Expand Down Expand Up @@ -61,8 +61,8 @@ def rsa(self) -> RSAPrivateKey:
def load(cls, path: Path) -> RootCA:
cert = x509.load_pem_x509_certificate(pem_bytes := path.read_bytes())
key = load_pem_private_key(pem_bytes, None)
assert isinstance(key, RSAPrivateKey)
return cls(certificate=Certificate(cert), private_key=RsaPrivateKey(key))
assert isinstance(key, RSAPrivateKey) # TODO
return cls(certificate=Certificate(cert), private_key=PrivateKey(key))

@classmethod
def load_or_create(
Expand All @@ -89,8 +89,8 @@ def issue_new_certificate(
common_name: str,
validity: relativedelta = _DEFAULT_VALIDITY,
key_size: int = _DEFAULT_KEY_SIZE,
) -> tuple[Certificate, RsaPrivateKey]:
new_cert_key = RsaPrivateKey.generate(key_size)
) -> tuple[Certificate, PrivateKey]:
new_cert_key = PrivateKey.generate(key_size)
new_cert_csr = CertificateSigningRequest.create(
subject_name=X509Name.create(common_name=common_name),
subject_private_key=new_cert_key,
Expand Down Expand Up @@ -130,7 +130,7 @@ def write_cert_store(source_dir: Path, store_path: Path) -> None:
def _save_cert_chain(
path_pem: Path,
certificate_chain: Iterable[Certificate],
key: RsaPrivateKey,
key: PrivateKey,
) -> None:
path_pem.parent.mkdir(mode=0o770, parents=True, exist_ok=True)
with path_pem.open(mode="wb") as f:
Expand Down
68 changes: 34 additions & 34 deletions cmk/utils/crypto/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
contains the public key and certificate information, but no private key. Useful for validating
certificates and signatures.
RsaPublicKey/RsaPrivateKey
PublicKey/PrivateKey
probably don't have a direct use case on their own in our code base, at the moment.
"""
Expand Down Expand Up @@ -111,7 +111,7 @@ class CertificateWithPrivateKey(NamedTuple):
"""A bundle of a certificate and its matching private key"""

certificate: Certificate
private_key: RsaPrivateKey
private_key: PrivateKey

@classmethod
def generate_self_signed(
Expand All @@ -127,7 +127,7 @@ def generate_self_signed(
) -> CertificateWithPrivateKey:
"""Generate an RSA private key and create a self-signed certificated for it."""

private_key = RsaPrivateKey.generate(key_size)
private_key = PrivateKey.generate(key_size)
name = X509Name.create(
common_name=common_name,
organization_name=organization or f"Checkmk Site {omd_site()}",
Expand All @@ -147,7 +147,7 @@ def generate_self_signed(
return CertificateWithPrivateKey(certificate, private_key)

@property
def public_key(self) -> RsaPublicKey:
def public_key(self) -> PublicKey:
"""
Convenience accessor to the certificate's public key.
Expand Down Expand Up @@ -177,15 +177,15 @@ def load_combined_file_content(
)
) is None:
raise InvalidPEMError("Could not find encrypted private key")
key = RsaPrivateKey.load_pem(EncryptedPrivateKeyPEM(key_match.group(0)), passphrase)
key = PrivateKey.load_pem(EncryptedPrivateKeyPEM(key_match.group(0)), passphrase)
else:
if (
key_match := re.search(
r"-----BEGIN PRIVATE KEY-----[\s\w+/=]+-----END PRIVATE KEY-----", content
)
) is None:
raise InvalidPEMError("Could not find private key")
key = RsaPrivateKey.load_pem(PlaintextPrivateKeyPEM(key_match.group(0)), None)
key = PrivateKey.load_pem(PlaintextPrivateKeyPEM(key_match.group(0)), None)

return cls(
certificate=cert,
Expand Down Expand Up @@ -232,7 +232,7 @@ def __new__(
certificate_path: Path,
certificate: Certificate,
private_key_path: Path,
private_key: RsaPrivateKey,
private_key: PrivateKey,
) -> PersistedCertificateWithPrivateKey:
"""
Initialize the certificate bundle.
Expand Down Expand Up @@ -263,9 +263,9 @@ def read_files(
# bit verbose, as mypy thinks the PEM-NewTypes are bytes when I try to assign them directly
pk_pem = private_key_path.read_bytes()
if private_key_password is None:
key = RsaPrivateKey.load_pem(PlaintextPrivateKeyPEM(pk_pem))
key = PrivateKey.load_pem(PlaintextPrivateKeyPEM(pk_pem))
else:
key = RsaPrivateKey.load_pem(EncryptedPrivateKeyPEM(pk_pem), private_key_password)
key = PrivateKey.load_pem(EncryptedPrivateKeyPEM(pk_pem), private_key_password)

return PersistedCertificateWithPrivateKey(certificate_path, cert, private_key_path, key)

Expand Down Expand Up @@ -314,15 +314,15 @@ def _create(
cls,
*,
# subject info
subject_public_key: RsaPublicKey,
subject_public_key: PublicKey,
subject_name: X509Name,
subject_alt_dns_names: list[str] | None = None,
# cert properties
expiry: relativedelta,
start_date: datetime,
is_ca: bool = False,
# issuer info
issuer_signing_key: RsaPrivateKey,
issuer_signing_key: PrivateKey,
issuer_name: X509Name,
) -> Certificate:
"""
Expand Down Expand Up @@ -413,10 +413,10 @@ def serial_number_string(self) -> str:
return sn.to_bytes((sn.bit_length() + 7) // 8).hex(":")

@property
def public_key(self) -> RsaPublicKey:
def public_key(self) -> PublicKey:
pk = self._cert.public_key()
assert isinstance(pk, rsa.RSAPublicKey)
return RsaPublicKey(pk)
return PublicKey(pk)

@property
def subject(self) -> X509Name:
Expand Down Expand Up @@ -460,7 +460,7 @@ def verify_is_signed_by(self, signer: Certificate) -> None:
* if the signature scheme is not supported, see below
We assume the signature is made with PKCS1 v1.5 padding, as this is the only scheme
cryptography.io supports for X.509 certificates (see `RsaPublicKey.verify`). This is true
cryptography.io supports for X.509 certificates (see `PublicKey.verify`). This is true
for certificates created with `Certificate._create`, but might not be true for certificates
loaded from elsewhere.
"""
Expand Down Expand Up @@ -611,7 +611,7 @@ def _naive_utcnow() -> datetime:
return datetime.now(tz=timezone.utc).replace(tzinfo=None)


class RsaPrivateKey:
class PrivateKey:
"""
An unencrypted RSA private key.
Expand All @@ -622,25 +622,25 @@ def __init__(self, key: rsa.RSAPrivateKey) -> None:
self._key = key

@classmethod
def generate(cls, key_size: int) -> RsaPrivateKey:
return RsaPrivateKey(rsa.generate_private_key(public_exponent=65537, key_size=key_size))
def generate(cls, key_size: int) -> PrivateKey:
return PrivateKey(rsa.generate_private_key(public_exponent=65537, key_size=key_size))

@overload
@classmethod
def load_pem(cls, pem_data: PlaintextPrivateKeyPEM, password: None = None) -> RsaPrivateKey:
def load_pem(cls, pem_data: PlaintextPrivateKeyPEM, password: None = None) -> PrivateKey:
...

@overload
@classmethod
def load_pem(cls, pem_data: EncryptedPrivateKeyPEM, password: Password) -> RsaPrivateKey:
def load_pem(cls, pem_data: EncryptedPrivateKeyPEM, password: Password) -> PrivateKey:
...

@classmethod
def load_pem(
cls,
pem_data: EncryptedPrivateKeyPEM | PlaintextPrivateKeyPEM,
password: Password | None = None,
) -> RsaPrivateKey:
) -> PrivateKey:
"""
Decode a PKCS8 PEM encoded RSA private key.
Expand All @@ -654,7 +654,7 @@ def load_pem(
TypeError: when trying to load an EncryptedPrivateKeyPEM but no password is given.
This would be caught by mypy though.
>>> RsaPrivateKey.load_pem(EncryptedPrivateKeyPEM(""))
>>> PrivateKey.load_pem(EncryptedPrivateKeyPEM(""))
Traceback (most recent call last):
...
cmk.utils.crypto.certificate.InvalidPEMError
Expand Down Expand Up @@ -682,10 +682,10 @@ def load_pem(
... ])
... )
>>> RsaPrivateKey.load_pem(pem, Password("foo"))
<cmk.utils.crypto.certificate.RsaPrivateKey object at 0x...>
>>> PrivateKey.load_pem(pem, Password("foo"))
<cmk.utils.crypto.certificate.PrivateKey object at 0x...>
>>> RsaPrivateKey.load_pem(pem, Password("not foo"))
>>> PrivateKey.load_pem(pem, Password("not foo"))
Traceback (most recent call last):
...
cmk.utils.crypto.certificate.WrongPasswordError
Expand All @@ -695,7 +695,7 @@ def load_pem(
try:
deserialized = serialization.load_pem_private_key(pem_data.bytes, password=pw)
assert isinstance(deserialized, rsa.RSAPrivateKey)
return RsaPrivateKey(deserialized)
return PrivateKey(deserialized)
except ValueError as exception:
if str(exception) == "Bad decrypt. Incorrect password?":
raise WrongPasswordError
Expand Down Expand Up @@ -744,24 +744,24 @@ def dump_legacy_pkcs1(self) -> PlaintextPrivateKeyPEM:
return PlaintextPrivateKeyPEM(bytes_)

@property
def public_key(self) -> RsaPublicKey:
return RsaPublicKey(self._key.public_key())
def public_key(self) -> PublicKey:
return PublicKey(self._key.public_key())

def sign_data(
self, data: bytes, hash_algorithm: HashAlgorithm = HashAlgorithm.Sha512
) -> Signature:
return Signature(self._key.sign(data, padding.PKCS1v15(), hash_algorithm.value))


class RsaPublicKey:
class PublicKey:
def __init__(self, key: rsa.RSAPublicKey) -> None:
self._key = key

@classmethod
def load_pem(cls, pem_data: PublicKeyPEM) -> RsaPublicKey:
def load_pem(cls, pem_data: PublicKeyPEM) -> PublicKey:
deserialized = serialization.load_pem_public_key(pem_data.bytes)
assert isinstance(deserialized, rsa.RSAPublicKey)
return RsaPublicKey(deserialized)
return PublicKey(deserialized)

def dump_pem(self) -> PublicKeyPEM:
# TODO: Use SubjectPublicKeyInfo format rather than PKCS1. PKCS1 doesn't include an
Expand All @@ -781,7 +781,7 @@ def dump_openssh(self) -> str:
).decode("utf-8")

def __eq__(self, other: object) -> bool:
if not isinstance(other, RsaPublicKey):
if not isinstance(other, PublicKey):
return NotImplemented
return self._key.public_numbers() == other._key.public_numbers()

Expand Down Expand Up @@ -899,7 +899,7 @@ class CertificateSigningRequest:

@classmethod
def create(
cls, subject_name: X509Name, subject_private_key: RsaPrivateKey
cls, subject_name: X509Name, subject_private_key: PrivateKey
) -> CertificateSigningRequest:
"""Create a new Certificate Signing Request
Expand All @@ -918,10 +918,10 @@ def subject(self) -> X509Name:
return X509Name(self.csr.subject)

@property
def public_key(self) -> RsaPublicKey:
def public_key(self) -> PublicKey:
pk = self.csr.public_key()
assert isinstance(pk, rsa.RSAPublicKey)
return RsaPublicKey(pk)
return PublicKey(pk)

@property
def is_signature_valid(self) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions cmk/utils/crypto/deprecated.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def unpad_block(block: bytes) -> bytes:
return block[: -block[-1]]


def encrypt_for_rsa_key(recipient_key: certificate.RsaPublicKey, data: bytes) -> bytes:
def encrypt_for_rsa_key(recipient_key: certificate.PublicKey, data: bytes) -> bytes:
"""Deprecated. Do not use."""
return recipient_key._key.encrypt(
data,
Expand All @@ -62,7 +62,7 @@ def encrypt_for_rsa_key(recipient_key: certificate.RsaPublicKey, data: bytes) ->
)


def decrypt_with_rsa_key(recipient_key: certificate.RsaPrivateKey, data: bytes) -> bytes:
def decrypt_with_rsa_key(recipient_key: certificate.PrivateKey, data: bytes) -> bytes:
"""Deprecated. Do not use."""
return recipient_key._key.decrypt(
data,
Expand Down
Loading

0 comments on commit befdf2c

Please sign in to comment.