This repository has been archived by the owner on Nov 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathca_client.py
275 lines (224 loc) · 10.7 KB
/
ca_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implements the requires side handling of the 'tls-certificates' interface.
`CAClient`_ is the type providing integration with a certificate authority
charm providing the tls-certificates interface::
from pathlib import Path
from ops.charm import CharmBase
from ops.lib.tls_certificates.ca_client import CAClient
class MyCharm(CharmBase):
TLS_CONFIG_PATH = Path("/tls/config/path/for/your/app")
TLS_KEY_PATH = TLS_CONFIG_PATH / 'key.pem'
TLS_CERT_PATH = TLS_CONFIG_PATH / 'cert.pem'
TLS_CA_CERT_PATH = TLS_CONFIG_PATH / 'ca.pem'
def __init__(self, *args):
super().__init__(*args)
self.ca_client = CAClient(self, 'ca-client')
self.framework.observe(self.ca_client.on.tls_config_ready,
self._on_tls_config_ready)
self.framework.observe(self.ca_client.on.ca_available,
self._on_ca_available)
def _on_ca_available(self, event):
# Obtain a common name and a list of subject alternative names to
# place into certificates to be generated by a certificate
# authority and expose them to the CA.
self.ca_client.request_server_certificate(common_name, sans)
def _on_tls_config_ready(self, event):
# When TLS config is ready, a CA certificate, requested certificate
# and key will be available from an instance of CAClient. It can be
# written to the target files and used by the target application.
self.TLS_KEY_PATH.write_bytes(self.ca_client.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
self.TLS_CERT_PATH.write_bytes(
self.ca_client.certificate.public_bytes(encoding=serialization.Encoding.PEM))
self.TLS_CA_CERT_PATH.write_bytes(
self.ca_client.ca_certificate.public_bytes(encoding=serialization.Encoding.PEM))
# Reconfigure and reload your application after writing to files.
"""
import json
import logging
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509 import load_pem_x509_certificate
from ops.framework import (
Object,
EventBase,
EventSetBase,
EventSource,
StoredState
)
from ops.model import ModelError, BlockedStatus, WaitingStatus
logger = logging.getLogger(__name__)
class TLSCertificatesError(ModelError):
"""A base class for all errors raised by interface-tls-certificates.
The error provides the attribute self.status to indicate what status and
message the Unit should use based on the status of this relation. For
example, if there is no relation to a CA, it will raise a
BlockedStatus('Missing relation <relation-name>')
"""
def __init__(self, kind, message, relation_name):
super().__init__()
self.status = kind('{}: {}'.format(message, relation_name))
class CAClientError(TLSCertificatesError):
"""An error specific to the CAClient class"""
class CAAvailable(EventBase):
"""Event emitted by CAClient.on.ca_available.
This event will be emitted by CAClient when a new unit of a CA charm
joins the relation. If there are multiple units joining one by one,
multiple events will be triggered.
The expected response from a handler of that event is to request a
certificate from the CA via the API provided by CAClient.
"""
class TLSConfigReady(EventBase):
"""Event emitted by CAClient.on.tls_config_ready.
This event will be emitted by CAClient when a remote CA unit.
The expected response from a handler of that event is to request a
certificate from the CA via the API provided by CAClient.
"""
class CAClientEvents(EventSetBase):
"""Events emitted by the CAClient class."""
ca_available = EventSource(CAAvailable)
tls_config_ready = EventSource(TLSConfigReady)
class CAClient(Object):
"""Provides a client type that handles the interaction with CA charms.
It mainly provides:
* an indication that a CA unit is available to accept requests
for certificates;
* a method to provide details (CN, SANs) to the CA for
generating certificates;
* an indication that a certificate and a key have been generated;
* a way to retrieve the generated certificate and key as well as
a CA certificate.
"""
on = CAClientEvents()
_stored = StoredState()
def __init__(self, charm, relation_name):
"""
:param charm: the charm object to be used as a parent object.
:type charm: :class: `ops.charm.CharmBase`
"""
super().__init__(charm, relation_name)
self._relation_name = relation_name
self._common_name = None
self._sans = None
self._stored.set_default(ca_certificate=None, key=None,
certificate=None)
self.framework.observe(charm.on[relation_name].relation_joined,
self._on_relation_joined)
self.framework.observe(charm.on[relation_name].relation_changed,
self._on_relation_changed)
@property
def is_joined(self):
rel = self.framework.model.get_relation(self._relation_name)
return rel is not None
@property
def is_ready(self):
return all(p is not None for p in (self._stored.certificate,
self._stored.key,
self._stored.ca_certificate))
@property
def _is_certificate_requested(self):
rel = self.framework.model.get_relation(self._relation_name)
if rel is None:
return False
common_name = rel.data[self.framework.model.unit].get('common_name')
sans = rel.data[self.framework.model.unit].get('sans')
if common_name is None or sans is None:
return False
return True
@property
def certificate(self):
if not self._is_certificate_requested:
raise CAClientError(BlockedStatus,
'a certificate request has not been sent',
self._relation_name)
if self._stored.certificate is None:
raise CAClientError(WaitingStatus,
'a certificate has not been obtained yet.',
self._relation_name)
return load_pem_x509_certificate(
self._stored.certificate.encode('utf-8'),
backend=default_backend())
@property
def key(self):
if not self._is_certificate_requested:
raise CAClientError(BlockedStatus,
'a certificate request has not been sent',
self._relation_name)
if self._stored.key is None:
raise CAClientError(WaitingStatus,
'a key has not been obtained yet.',
self._relation_name)
return load_pem_private_key(self._stored.key.encode('utf-8'),
password=None,
backend=default_backend())
@property
def ca_certificate(self):
if not self._is_certificate_requested:
raise CAClientError(BlockedStatus,
'a certificate request has not been sent',
self._relation_name)
if self._stored.ca_certificate is None:
raise CAClientError(WaitingStatus,
'a CA certificate has not been obtained yet.',
self._relation_name)
return load_pem_x509_certificate(
self._stored.ca_certificate.encode('utf-8'),
backend=default_backend())
def _on_relation_joined(self, event):
self.on.ca_available.emit()
def request_server_certificate(self, common_name, sans):
"""Request a new server certificate.
If arguments have not changed from a previous request, then a different
certificate will not be generated. This method can be useful if a list
of SANS has changed during the lifetime of a charm and a new
certificate needs to be generated.
:param common_name: a new common name to use in a certificate.
:type common_name: str
:param sans: a list of Subject Alternative Names to use in a
certificate.
:type common_name: list(str)
"""
rel = self.framework.model.get_relation(self._relation_name)
if rel is None:
raise CAClientError(BlockedStatus, 'missing relation',
self._relation_name)
logger.info('Requesting a CA certificate. Common name: {}, SANS: {}'
''.format(common_name, sans))
rel_data = rel.data[self.model.unit]
rel_data['common_name'] = common_name
rel_data['sans'] = json.dumps(sans)
def _on_relation_changed(self, event):
# easy-rsa is not HA so there is only one unit to work with and Vault
# uses one leader unit to write responses and does not (at the time
# of writing) rely on app relation data.
remote_data = event.relation.data[event.unit]
munged_name = self.model.unit.name.replace("/", "_")
cert = remote_data.get('{}.server.cert'.format(munged_name))
key = remote_data.get('{}.server.key'.format(munged_name))
ca = remote_data.get('ca')
if not self._is_certificate_requested:
return
if cert is None or key is None or ca is None:
message = ('A CA has not yet exposed a requested certificate, '
'key and CA certificate.')
logger.info(message)
raise CAClientError(WaitingStatus, message, self._relation_name)
self._stored.certificate = cert
self._stored.key = key
self._stored.ca_certificate = ca
self.on.tls_config_ready.emit()