forked from couchbase/cbgt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcbauth.go
315 lines (257 loc) · 8.28 KB
/
cbauth.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
// Copyright 2019-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package cbgt
import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"sync"
"sync/atomic"
"unsafe"
"github.com/couchbase/cbauth"
"github.com/couchbase/cbauth/cbauthimpl"
log "github.com/couchbase/clog"
"github.com/couchbase/go-couchbase/cbdatasource"
cbtls "github.com/couchbase/goutils/tls"
)
func init() {
securityCtx = &SecurityContext{
notifiers: make(map[string]ConfigRefreshNotifier),
}
}
// Paths of the PEM files consulted when the security settings are refreshed.
// All of them default to the empty string.
var (
	// TLSCAFile is the path whose raw contents become
	// SecuritySetting.CACertInBytes. The server certificate material is only
	// loaded when TLSCAFile, TLSCertFile and TLSKeyFile are all non-empty.
	TLSCAFile string

	// TLSCertFile is the path of the server certificate handed to
	// LoadX509KeyPair together with TLSKeyFile.
	TLSCertFile string

	// TLSKeyFile is the path of the private key matching TLSCertFile.
	TLSKeyFile string

	// ClientCertFile is the path of the client certificate. The client
	// key pair is only loaded when both ClientCertFile and ClientKeyFile
	// are non-empty.
	ClientCertFile string

	// ClientKeyFile is the path of the private key matching ClientCertFile.
	ClientKeyFile string
)
// SecuritySetting is a snapshot of the security configuration assembled by
// SecurityContext.refresh and published for readers through
// GetSecuritySetting.
type SecuritySetting struct {
// TLSConfig is the value returned by cbauth.GetTLSConfig; it is nil until
// a certificate/TLS-config change has been processed.
TLSConfig *cbauth.TLSConfig
// ClientAuthType is the value returned by cbauth.GetClientCertAuthType; it
// is set together with TLSConfig.
ClientAuthType *tls.ClientAuthType
// EncryptionEnabled mirrors EncryptData of the cluster encryption config.
EncryptionEnabled bool
// DisableNonSSLPorts mirrors DisableNonSSLPorts of the cluster encryption
// config.
DisableNonSSLPorts bool
// ServerCertificate is the key pair loaded from TLSCertFile/TLSKeyFile.
ServerCertificate tls.Certificate
// CACertInBytes holds the raw contents of TLSCAFile.
CACertInBytes []byte
// ClientCertificate is the key pair loaded from
// ClientCertFile/ClientKeyFile.
ClientCertificate tls.Certificate
}
var currentSetting unsafe.Pointer = unsafe.Pointer(new(SecuritySetting))
func GetSecuritySetting() *SecuritySetting {
return (*SecuritySetting)(atomic.LoadPointer(¤tSetting))
}
// securityCtx is the package-wide SecurityContext; it is created in init.
var securityCtx *SecurityContext

// RegisterSecurityNotifications subscribes the package-wide SecurityContext
// to cbauth's config refresh callbacks, so that securityCtx.refresh runs
// whenever cbauth reports a security change.
func RegisterSecurityNotifications() {
	onRefresh := securityCtx.refresh
	cbauth.RegisterConfigRefreshCallback(onRefresh)
}
// SecurityContext lets us register multiple TLS config
// update callbacks and acts as a wrapper for handling
// config changes.
type SecurityContext struct {
// mutex guards notifiers: it is write-locked while a callback is
// registered and read-locked while the callbacks are being dispatched.
mutex sync.RWMutex
// notifiers maps the key given at registration to its callback.
notifiers map[string]ConfigRefreshNotifier
}
// Bit flags OR-ed together into the status value that is passed to every
// registered ConfigRefreshNotifier.
const (
// AuthChange_encryption (1) is set when EncryptionEnabled changed.
AuthChange_encryption = 1 << iota
// AuthChange_nonSSLPorts (2) is set when DisableNonSSLPorts changed.
AuthChange_nonSSLPorts
// AuthChange_certificates (4) is set when cbauth reported a
// certificate/TLS-config change.
AuthChange_certificates
// AuthChange_clientCertificates (8) is set when cbauth reported a client
// certificate change.
AuthChange_clientCertificates
)
// ConfigRefreshNotifier defines the SecuritySetting's refresh
// callback signature. The status argument is a bitwise OR of the
// AuthChange_* flags describing what changed; a returned error is logged
// by the dispatcher.
type ConfigRefreshNotifier func(status int) error
// RegisterConfigRefreshCallback stores cb under key in the package-wide
// SecurityContext, replacing any callback previously registered with the
// same key, and logs the registration.
func RegisterConfigRefreshCallback(key string, cb ConfigRefreshNotifier) {
	// Scope the write lock to the map update only; logging happens after
	// the lock has been released.
	func() {
		securityCtx.mutex.Lock()
		defer securityCtx.mutex.Unlock()
		securityCtx.notifiers[key] = cb
	}()

	log.Printf("cbauth: key: %s registered for tls config updates", key)
}
func (c *SecurityContext) refresh(code uint64) error {
log.Printf("cbauth: received security change notification, code: %v", code)
newSetting := SecuritySetting{}
var encryptionEnabled, disableNonSSLPorts bool
oldSetting := GetSecuritySetting()
if oldSetting != nil {
temp := *oldSetting
newSetting = temp
encryptionEnabled = oldSetting.EncryptionEnabled
disableNonSSLPorts = oldSetting.DisableNonSSLPorts
}
if code&cbauthimpl.CFG_CHANGE_CERTS_TLSCONFIG != 0 {
if err := c.refreshConfigAndCert(&newSetting); err != nil {
return err
}
}
if code&cbauthimpl.CFG_CHANGE_CLUSTER_ENCRYPTION != 0 {
if err := c.refreshEncryption(&newSetting); err != nil {
return err
}
}
if code&cbauthimpl.CFG_CHANGE_CLIENT_CERTS_TLSCONFIG != 0 {
if err := c.refreshClientCert(&newSetting); err != nil {
return err
}
}
atomic.StorePointer(¤tSetting, unsafe.Pointer(&newSetting))
if code&cbauthimpl.CFG_CHANGE_CERTS_TLSCONFIG != 0 ||
encryptionEnabled != newSetting.EncryptionEnabled ||
code&cbauthimpl.CFG_CHANGE_CLIENT_CERTS_TLSCONFIG != 0 {
if err := c.refreshClients(GetSecuritySetting()); err != nil {
return err
}
}
c.mutex.RLock()
// This will notify tlsConfig/limits changes to all the subscribers like
// dcp feeds, http servers and grpc servers;
// Notifying every certificate change irrespective of the encryption status.
var status int
if encryptionEnabled != newSetting.EncryptionEnabled {
status |= AuthChange_encryption
}
if disableNonSSLPorts != newSetting.DisableNonSSLPorts {
status |= AuthChange_nonSSLPorts
}
if code&cbauthimpl.CFG_CHANGE_CERTS_TLSCONFIG != 0 {
status |= AuthChange_certificates
}
if code&cbauthimpl.CFG_CHANGE_CLIENT_CERTS_TLSCONFIG != 0 {
status |= AuthChange_clientCertificates
}
if status != 0 {
for key, notifier := range c.notifiers {
go func(key string, notify ConfigRefreshNotifier) {
log.Printf("cbauth: notifying configs change for key: %v", key)
if err := notify(status); err != nil {
log.Errorf("cbauth: notify failed, for key: %v: err: %v", key, err)
}
}(key, notifier)
}
} else {
log.Printf("cbauth: encryption settings not affected")
}
c.mutex.RUnlock()
return nil
}
// refreshConfigAndCert re-reads the TLS configuration and the client
// certificate auth type from cbauth into configs. When TLSCAFile,
// TLSCertFile and TLSKeyFile are all set it also reloads the server key pair
// and the raw CA certificate bytes from disk.
//
// It returns the first error reported by cbauth or by loading the files.
// configs.TLSConfig and configs.ClientAuthType are already updated by the
// time the files are read.
func (c *SecurityContext) refreshConfigAndCert(configs *SecuritySetting) error {
	tlsCfg, err := cbauth.GetTLSConfig()
	if err != nil {
		log.Warnf("cbauth: GetTLSConfig failed, err: %v", err)
		return err
	}

	authType, err := cbauth.GetClientCertAuthType()
	if err != nil {
		log.Warnf("cbauth: GetClientCertAuthType failed, err: %v", err)
		return err
	}

	configs.TLSConfig, configs.ClientAuthType = &tlsCfg, &authType

	// Without a complete set of file paths there is nothing to load.
	haveServerFiles := len(TLSCAFile) != 0 &&
		len(TLSCertFile) != 0 &&
		len(TLSKeyFile) != 0
	if !haveServerFiles {
		return nil
	}

	// The passphrase comes from the TLS config that was just fetched.
	keyPair, err := cbtls.LoadX509KeyPair(TLSCertFile, TLSKeyFile,
		tlsCfg.PrivateKeyPassphrase)
	if err != nil {
		log.Errorf("cbauth: LoadX509KeyPair err: %v", err)
		return err
	}

	caBytes, err := os.ReadFile(TLSCAFile)
	if err != nil {
		log.Errorf("cbauth: Certificate read err: %v", err)
		return err
	}

	configs.ServerCertificate, configs.CACertInBytes = keyPair, caBytes

	return nil
}
// refreshEncryption copies the EncryptData and DisableNonSSLPorts values of
// cbauth's cluster encryption config into configs. It returns cbauth's
// error, leaving configs untouched, when the config cannot be fetched.
func (c *SecurityContext) refreshEncryption(configs *SecuritySetting) error {
	encCfg, err := cbauth.GetClusterEncryptionConfig()
	if err != nil {
		log.Warnf("cbauth: GetClusterEncryptionConfig err: %v", err)
		return err
	}

	configs.EncryptionEnabled, configs.DisableNonSSLPorts =
		encCfg.EncryptData, encCfg.DisableNonSSLPorts

	return nil
}
// refreshClientCert reloads the client key pair from ClientCertFile and
// ClientKeyFile into configs.ClientCertificate. It is a no-op when either
// path is empty.
//
// The private key passphrase is taken from configs.TLSConfig when that has
// been populated; otherwise a nil passphrase is used.
//
// It returns the error from loading the key pair. In that case
// configs.ClientCertificate is left untouched, so a failed reload never
// replaces a previously loaded certificate with an empty one.
func (c *SecurityContext) refreshClientCert(configs *SecuritySetting) error {
	if len(ClientCertFile) == 0 || len(ClientKeyFile) == 0 {
		return nil
	}

	var clientPrivateKeyPassPhrase []byte
	if configs.TLSConfig != nil {
		clientPrivateKeyPassPhrase = configs.TLSConfig.ClientPrivateKeyPassphrase
	}

	cert, err := cbtls.LoadX509KeyPair(ClientCertFile, ClientKeyFile,
		clientPrivateKeyPassPhrase)
	if err != nil {
		log.Errorf("cbauth: LoadX509KeyPair (client cert) err: %v", err)
		// Propagate the failure, as refreshConfigAndCert does for the
		// server key pair, instead of reporting success with a zero cert.
		return err
	}

	configs.ClientCertificate = cert

	return nil
}
// refreshClients pushes the given settings to the package's own security
// config first and to go-couchbase/cbdatasource second, then closes all
// cached couchbase.Bucket instances. It returns the first error reported by
// either update; the cached buckets are only closed when both succeed.
func (c *SecurityContext) refreshClients(configs *SecuritySetting) error {
	err := updateSecurityConfig(configs.EncryptionEnabled, configs.CACertInBytes)
	if err != nil {
		log.Warnf("cbauth: Error updating TLS data, err: %v", err)
		return err
	}

	// FetchRootCAs is evaluated only after updateSecurityConfig succeeded,
	// so it observes the pool that was just installed.
	dsConfig := &cbdatasource.SecurityConfig{
		EncryptData:        configs.EncryptionEnabled,
		DisableNonSSLPorts: configs.DisableNonSSLPorts,
		Certificates:       []tls.Certificate{configs.ClientCertificate},
		RootCAs:            FetchRootCAs(),
	}
	if err = cbdatasource.UpdateSecurityConfig(dsConfig); err != nil {
		log.Warnf("cbauth: Error updating go-couchbase/cbdatasource's"+
			" TLS data, err: %v", err)
		return err
	}

	// Close all cached couchbase.Bucket instances, so new ones can be
	// setup with the new config.
	statsCBBktMap.closeAllCouchbaseBuckets()

	return nil
}
// ----------------------------------------------------------------
// security config for gocbcore DCP Agents
//
// securityConfig is the state kept in currSecurityConfig; it is written by
// updateSecurityConfig and read by FetchRootCAs.
type securityConfig struct {
// encryptData records whether encryption was enabled at the last update.
encryptData bool
// rootCAs is the pool parsed from the CA certificate bytes; it is only
// replaced by an update that has encryption enabled.
rootCAs *x509.CertPool
}
var (
	// currSecurityConfigMutex guards the contents of currSecurityConfig.
	currSecurityConfigMutex sync.RWMutex

	// currSecurityConfig is the security config shared by the cached
	// agents; it is allocated in init.
	currSecurityConfig *securityConfig
)

// init installs a zero-valued securityConfig, i.e. encryption disabled and
// no root CAs.
func init() {
	currSecurityConfig = new(securityConfig)
}
// updateSecurityConfig records the encryption flag in currSecurityConfig
// and, when encryption is enabled, replaces the root CA pool with one parsed
// from the PEM data in caCertInBytes. On success it forces the cached
// gocbcore agents to reconnect.
//
// It returns an error when encryption is enabled and no certificate could be
// parsed from caCertInBytes; in that case the encryption flag has already
// been recorded, the previous pool is kept and no reconnect is forced.
func updateSecurityConfig(encryptData bool, caCertInBytes []byte) error {
	currSecurityConfigMutex.Lock()
	defer currSecurityConfigMutex.Unlock()

	currSecurityConfig.encryptData = encryptData

	if encryptData {
		pool := x509.NewCertPool()
		if appended := pool.AppendCertsFromPEM(caCertInBytes); !appended {
			log.Warnf("updateSecurityConfig: error appending certificate(s): %v", appended)
			return fmt.Errorf("error obtaining certificate(s)")
		}
		currSecurityConfig.rootCAs = pool
	}

	// force reconnect cached gocbcore.Agents and DCPAgents
	statsAgentsMap.forceReconnectAgents()
	dcpAgentMap.forceReconnectAgents()

	return nil
}
// FetchRootCAs returns the root CA pool recorded by the last successful
// update while encryption is enabled, and nil while it is disabled.
func FetchRootCAs() *x509.CertPool {
	currSecurityConfigMutex.RLock()
	defer currSecurityConfigMutex.RUnlock()

	if !currSecurityConfig.encryptData {
		return nil
	}
	return currSecurityConfig.rootCAs
}