Skip to content

Commit

Permalink
Add Approve Worker Nodes CSRs (#713)
Browse files Browse the repository at this point in the history
This commit adds an endpoint to approve CSRs from worker nodes via the CAPI provider
  • Loading branch information
mateoflorido authored Oct 18, 2024
1 parent 82985e3 commit db9fdd6
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/k8s/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22.6
require (
dario.cat/mergo v1.0.0
github.com/canonical/go-dqlite v1.22.0
github.com/canonical/k8s-snap-api v1.0.9
github.com/canonical/k8s-snap-api v1.0.10
github.com/canonical/lxd v0.0.0-20240822122218-e7b2a7a83230
github.com/canonical/microcluster/v3 v3.0.0-20240827143335-f7a4d3984970
github.com/go-logr/logr v1.4.2
Expand Down
4 changes: 2 additions & 2 deletions src/k8s/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0 h1:nvj0OLI3YqYXe
github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE=
github.com/canonical/go-dqlite v1.22.0 h1:DuJmfcREl4gkQJyvZzjl2GHFZROhbPyfdjDRQXpkOyw=
github.com/canonical/go-dqlite v1.22.0/go.mod h1:Uvy943N8R4CFUAs59A1NVaziWY9nJ686lScY7ywurfg=
github.com/canonical/k8s-snap-api v1.0.9 h1:WhbyVtnR0GIAdY1UYBIzkspfgodxrHjlpT9FbG4NIu4=
github.com/canonical/k8s-snap-api v1.0.9/go.mod h1:LDPoIYCeYnfgOFrwVPJ/4edGU264w7BB7g0GsVi36AY=
github.com/canonical/k8s-snap-api v1.0.10 h1:BoAw4Vr8mR8MWTKeZZxH5LmrF3JYGSZHDv+KEo5ifoU=
github.com/canonical/k8s-snap-api v1.0.10/go.mod h1:LDPoIYCeYnfgOFrwVPJ/4edGU264w7BB7g0GsVi36AY=
github.com/canonical/lxd v0.0.0-20240822122218-e7b2a7a83230 h1:YOqZ+/14OPZ+/TOXpRHIX3KLT0C+wZVpewKIwlGUmW0=
github.com/canonical/lxd v0.0.0-20240822122218-e7b2a7a83230/go.mod h1:YVGI7HStOKsV+cMyXWnJ7RaMPaeWtrkxyIPvGWbgACc=
github.com/canonical/microcluster/v3 v3.0.0-20240827143335-f7a4d3984970 h1:UrnpglbXELlxtufdk6DGDytu2JzyzuS3WTsOwPrkQLI=
Expand Down
87 changes: 87 additions & 0 deletions src/k8s/pkg/k8sd/api/capi_certificate_refresh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package api

import (
"fmt"
"net/http"

apiv1 "github.com/canonical/k8s-snap-api/api/v1"
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/lxd/lxd/response"
"github.com/canonical/microcluster/v3/state"
"golang.org/x/sync/errgroup"
certv1 "k8s.io/api/certificates/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// postApproveWorkerCSR approves the worker node CSR for the specified seed.
// The certificate approval process follows these steps:
// 1. The CAPI provider calls the /x/capi/refresh-certs/plan endpoint from a
// worker node, which generates a CSR and creates a CertificateSigningRequest
// object in the cluster.
// 2. The CAPI provider then calls the /k8sd/refresh-certs/run endpoint with
// the seed. This endpoint waits until the CSR is approved and the certificate
// is signed. Note that this is a blocking call.
// 3. The CAPI provider calls the /x/capi/refresh-certs/approve endpoint from
// any control plane node to approve the CSR.
// 4. The /x/capi/refresh-certs/run endpoint completes and returns once the
// certificate is approved and signed.
func (e *Endpoints) postApproveWorkerCSR(s state.State, r *http.Request) response.Response {
snap := e.provider.Snap()

req := apiv1.ClusterAPIApproveWorkerCSRRequest{}

if err := utils.NewStrictJSONDecoder(r.Body).Decode(&req); err != nil {
return response.BadRequest(fmt.Errorf("failed to parse request: %w", err))
}

if err := r.Body.Close(); err != nil {
return response.InternalError(fmt.Errorf("failed to close request body: %w", err))
}

client, err := snap.KubernetesClient("")
if err != nil {
return response.InternalError(fmt.Errorf("failed to get Kubernetes client: %w", err))
}

g, ctx := errgroup.WithContext(r.Context())

// CSR names
csrNames := []string{
fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed),
fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed),
fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed),
}

for _, csrName := range csrNames {
g.Go(func() error {
if err := client.WatchCertificateSigningRequest(
ctx,
csrName,
func(request *certv1.CertificateSigningRequest) (bool, error) {
request.Status.Conditions = append(request.Status.Conditions, certv1.CertificateSigningRequestCondition{
Type: certv1.CertificateApproved,
Status: corev1.ConditionTrue,
Reason: "ApprovedByCK8sCAPI",
Message: "This CSR was approved by the Canonical Kubernetes CAPI Provider",
LastUpdateTime: metav1.Now(),
})
_, err := client.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csrName, request, metav1.UpdateOptions{})
if err != nil {
return false, fmt.Errorf("failed to update CSR %s: %w", csrName, err)
}
return true, nil
},
); err != nil {
return fmt.Errorf("certificate signing request failed: %w", err)
}
return nil
})
}

if err := g.Wait(); err != nil {
return response.InternalError(fmt.Errorf("failed to approve worker node CSR: %w", err))
}

return response.SyncResponse(true, apiv1.ClusterAPIApproveWorkerCSRResponse{})
}
68 changes: 57 additions & 11 deletions src/k8s/pkg/k8sd/api/certificates_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,17 @@ func refreshCertsRunControlPlane(s state.State, r *http.Request, snap snap.Snap)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

if err := <-readyCh; err != nil {
log.Error(err, "Failed to refresh certificates")
select {
case err := <-readyCh:
if err != nil {
log.Error(err, "Failed to refresh certificates")
return
}
case <-ctx.Done():
log.Error(ctx.Err(), "Timeout waiting for certificates to be refreshed")
return
}

if err := snaputil.RestartControlPlaneServices(ctx, snap); err != nil {
log.Error(err, "Failed to restart control plane services")
}
Expand Down Expand Up @@ -325,22 +332,61 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo
return response.InternalError(fmt.Errorf("failed to generate kube-proxy kubeconfig: %w", err))
}

// Restart the services
if err := snap.RestartService(r.Context(), "kubelet"); err != nil {
return response.InternalError(fmt.Errorf("failed to restart kubelet: %w", err))
}
if err := snap.RestartService(r.Context(), "kube-proxy"); err != nil {
return response.InternalError(fmt.Errorf("failed to restart kube-proxy: %w", err))
}
// NOTE: Restart the worker services in a separate goroutine to avoid
// restarting the kube-proxy and kubelet, which would break the
// proxy connection and cause missed responses in the proxy side.
readyCh := make(chan error, 1)
go func() {
// NOTE: Create a new context independent of the request context to ensure
// the restart process is not cancelled by the client.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

select {
case err := <-readyCh:
if err != nil {
log.Error(err, "Failed to refresh certificates")
return
}
case <-ctx.Done():
log.Error(ctx.Err(), "Timeout waiting for certificates to be refreshed")
return
}

if err := snap.RestartService(ctx, "kubelet"); err != nil {
log.Error(err, "Failed to restart kubelet")
}
if err := snap.RestartService(ctx, "kube-proxy"); err != nil {
log.Error(err, "Failed to restart kube-proxy")
}
}()

cert, _, err := pkiutil.LoadCertificate(certificates.KubeletCert, "")
if err != nil {
return response.InternalError(fmt.Errorf("failed to load kubelet certificate: %w", err))
}

expirationTimeUNIX := cert.NotAfter.Unix()
return response.SyncResponse(true, apiv1.RefreshCertificatesRunResponse{
ExpirationSeconds: int(expirationTimeUNIX),

return response.ManualResponse(func(w http.ResponseWriter) (rerr error) {
defer func() {
readyCh <- rerr
close(readyCh)
}()
err := response.SyncResponse(true, apiv1.RefreshCertificatesRunResponse{
ExpirationSeconds: int(expirationTimeUNIX),
}).Render(w)
if err != nil {
return fmt.Errorf("failed to render response: %w", err)
}

f, ok := w.(http.Flusher)
if !ok {
return fmt.Errorf("ResponseWriter is not type http.Flusher")
}

f.Flush()
return nil
})

}
Expand Down
5 changes: 5 additions & 0 deletions src/k8s/pkg/k8sd/api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ func (e *Endpoints) Endpoints() []rest.Endpoint {
Path: apiv1.ClusterAPICertificatesRunRPC,
Post: rest.EndpointAction{Handler: e.postRefreshCertsRun, AccessHandler: e.ValidateNodeTokenAccessHandler("node-token"), AllowUntrusted: true},
},
{
Name: "ClusterAPI/RefreshCerts/Approve",
Path: apiv1.ClusterAPIApproveWorkerCSRRPC,
Post: rest.EndpointAction{Handler: e.postApproveWorkerCSR, AccessHandler: ValidateCAPIAuthTokenAccessHandler("capi-auth-token"), AllowUntrusted: true},
},
// Snap refreshes
{
Name: "Snap/Refresh",
Expand Down

0 comments on commit db9fdd6

Please sign in to comment.