From db9fdd621975611f6a6545f96fc819a89528b3f5 Mon Sep 17 00:00:00 2001 From: Mateo Florido <32885896+mateoflorido@users.noreply.github.com> Date: Thu, 17 Oct 2024 21:32:30 -0500 Subject: [PATCH] Add Approve Worker Nodes CSRs (#713) This commit adds an endpoint to approve CSRs from worker nodes via the CAPI provider --- src/k8s/go.mod | 2 +- src/k8s/go.sum | 4 +- .../pkg/k8sd/api/capi_certificate_refresh.go | 87 +++++++++++++++++++ src/k8s/pkg/k8sd/api/certificates_refresh.go | 68 ++++++++++++--- src/k8s/pkg/k8sd/api/endpoints.go | 5 ++ 5 files changed, 152 insertions(+), 14 deletions(-) create mode 100644 src/k8s/pkg/k8sd/api/capi_certificate_refresh.go diff --git a/src/k8s/go.mod b/src/k8s/go.mod index 1b3a893fb..00d2f0681 100644 --- a/src/k8s/go.mod +++ b/src/k8s/go.mod @@ -5,7 +5,7 @@ go 1.22.6 require ( dario.cat/mergo v1.0.0 github.com/canonical/go-dqlite v1.22.0 - github.com/canonical/k8s-snap-api v1.0.9 + github.com/canonical/k8s-snap-api v1.0.10 github.com/canonical/lxd v0.0.0-20240822122218-e7b2a7a83230 github.com/canonical/microcluster/v3 v3.0.0-20240827143335-f7a4d3984970 github.com/go-logr/logr v1.4.2 diff --git a/src/k8s/go.sum b/src/k8s/go.sum index 9620b105b..0ce367e78 100644 --- a/src/k8s/go.sum +++ b/src/k8s/go.sum @@ -99,8 +99,8 @@ github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0 h1:nvj0OLI3YqYXe github.com/bugsnag/panicwrap v0.0.0-20151223152923-e2c28503fcd0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywRkfhyM/+dE= github.com/canonical/go-dqlite v1.22.0 h1:DuJmfcREl4gkQJyvZzjl2GHFZROhbPyfdjDRQXpkOyw= github.com/canonical/go-dqlite v1.22.0/go.mod h1:Uvy943N8R4CFUAs59A1NVaziWY9nJ686lScY7ywurfg= -github.com/canonical/k8s-snap-api v1.0.9 h1:WhbyVtnR0GIAdY1UYBIzkspfgodxrHjlpT9FbG4NIu4= -github.com/canonical/k8s-snap-api v1.0.9/go.mod h1:LDPoIYCeYnfgOFrwVPJ/4edGU264w7BB7g0GsVi36AY= +github.com/canonical/k8s-snap-api v1.0.10 h1:BoAw4Vr8mR8MWTKeZZxH5LmrF3JYGSZHDv+KEo5ifoU= +github.com/canonical/k8s-snap-api v1.0.10/go.mod h1:LDPoIYCeYnfgOFrwVPJ/4edGU264w7BB7g0GsVi36AY= github.com/canonical/lxd v0.0.0-20240822122218-e7b2a7a83230 h1:YOqZ+/14OPZ+/TOXpRHIX3KLT0C+wZVpewKIwlGUmW0= github.com/canonical/lxd v0.0.0-20240822122218-e7b2a7a83230/go.mod h1:YVGI7HStOKsV+cMyXWnJ7RaMPaeWtrkxyIPvGWbgACc= github.com/canonical/microcluster/v3 v3.0.0-20240827143335-f7a4d3984970 h1:UrnpglbXELlxtufdk6DGDytu2JzyzuS3WTsOwPrkQLI= diff --git a/src/k8s/pkg/k8sd/api/capi_certificate_refresh.go b/src/k8s/pkg/k8sd/api/capi_certificate_refresh.go new file mode 100644 index 000000000..51ed19c75 --- /dev/null +++ b/src/k8s/pkg/k8sd/api/capi_certificate_refresh.go @@ -0,0 +1,87 @@ +package api + +import ( + "fmt" + "net/http" + + apiv1 "github.com/canonical/k8s-snap-api/api/v1" + "github.com/canonical/k8s/pkg/utils" + "github.com/canonical/lxd/lxd/response" + "github.com/canonical/microcluster/v3/state" + "golang.org/x/sync/errgroup" + certv1 "k8s.io/api/certificates/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// postApproveWorkerCSR approves the worker node CSR for the specified seed. +// The certificate approval process follows these steps: +// 1. The CAPI provider calls the /x/capi/refresh-certs/plan endpoint from a +// worker node, which generates a CSR and creates a CertificateSigningRequest +// object in the cluster. +// 2. The CAPI provider then calls the /k8sd/refresh-certs/run endpoint with +// the seed. This endpoint waits until the CSR is approved and the certificate +// is signed. Note that this is a blocking call. +// 3. The CAPI provider calls the /x/capi/refresh-certs/approve endpoint from +// any control plane node to approve the CSR. +// 4. The /x/capi/refresh-certs/run endpoint completes and returns once the +// certificate is approved and signed. +func (e *Endpoints) postApproveWorkerCSR(s state.State, r *http.Request) response.Response { + snap := e.provider.Snap() + + req := apiv1.ClusterAPIApproveWorkerCSRRequest{} + + if err := utils.NewStrictJSONDecoder(r.Body).Decode(&req); err != nil { + return response.BadRequest(fmt.Errorf("failed to parse request: %w", err)) + } + + if err := r.Body.Close(); err != nil { + return response.InternalError(fmt.Errorf("failed to close request body: %w", err)) + } + + client, err := snap.KubernetesClient("") + if err != nil { + return response.InternalError(fmt.Errorf("failed to get Kubernetes client: %w", err)) + } + + g, ctx := errgroup.WithContext(r.Context()) + + // CSR names + csrNames := []string{ + fmt.Sprintf("k8sd-%d-worker-kubelet-serving", req.Seed), + fmt.Sprintf("k8sd-%d-worker-kubelet-client", req.Seed), + fmt.Sprintf("k8sd-%d-worker-kube-proxy-client", req.Seed), + } + + for _, csrName := range csrNames { + g.Go(func() error { + if err := client.WatchCertificateSigningRequest( + ctx, + csrName, + func(request *certv1.CertificateSigningRequest) (bool, error) { + request.Status.Conditions = append(request.Status.Conditions, certv1.CertificateSigningRequestCondition{ + Type: certv1.CertificateApproved, + Status: corev1.ConditionTrue, + Reason: "ApprovedByCK8sCAPI", + Message: "This CSR was approved by the Canonical Kubernetes CAPI Provider", + LastUpdateTime: metav1.Now(), + }) + _, err := client.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csrName, request, metav1.UpdateOptions{}) + if err != nil { + return false, fmt.Errorf("failed to update CSR %s: %w", csrName, err) + } + return true, nil + }, + ); err != nil { + return fmt.Errorf("certificate signing request failed: %w", err) + } + return nil + }) + } + + if err := g.Wait(); err != nil { + return response.InternalError(fmt.Errorf("failed to approve worker node CSR: %w", err)) + } + + return response.SyncResponse(true, apiv1.ClusterAPIApproveWorkerCSRResponse{}) +} diff --git a/src/k8s/pkg/k8sd/api/certificates_refresh.go b/src/k8s/pkg/k8sd/api/certificates_refresh.go index 0c6a6749c..804c5bf81 100644 --- a/src/k8s/pkg/k8sd/api/certificates_refresh.go +++ b/src/k8s/pkg/k8sd/api/certificates_refresh.go @@ -143,10 +143,17 @@ func refreshCertsRunControlPlane(s state.State, r *http.Request, snap snap.Snap) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - if err := <-readyCh; err != nil { - log.Error(err, "Failed to refresh certificates") + select { + case err := <-readyCh: + if err != nil { + log.Error(err, "Failed to refresh certificates") + return + } + case <-ctx.Done(): + log.Error(ctx.Err(), "Timeout waiting for certificates to be refreshed") return } + if err := snaputil.RestartControlPlaneServices(ctx, snap); err != nil { log.Error(err, "Failed to restart control plane services") } @@ -325,13 +332,34 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo return response.InternalError(fmt.Errorf("failed to generate kube-proxy kubeconfig: %w", err)) } - // Restart the services - if err := snap.RestartService(r.Context(), "kubelet"); err != nil { - return response.InternalError(fmt.Errorf("failed to restart kubelet: %w", err)) - } - if err := snap.RestartService(r.Context(), "kube-proxy"); err != nil { - return response.InternalError(fmt.Errorf("failed to restart kube-proxy: %w", err)) - } + // NOTE: Restart the worker services in a separate goroutine to avoid + // restarting the kube-proxy and kubelet, which would break the + // proxy connection and cause missed responses in the proxy side. + readyCh := make(chan error, 1) + go func() { + // NOTE: Create a new context independent of the request context to ensure + // the restart process is not cancelled by the client. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + select { + case err := <-readyCh: + if err != nil { + log.Error(err, "Failed to refresh certificates") + return + } + case <-ctx.Done(): + log.Error(ctx.Err(), "Timeout waiting for certificates to be refreshed") + return + } + + if err := snap.RestartService(ctx, "kubelet"); err != nil { + log.Error(err, "Failed to restart kubelet") + } + if err := snap.RestartService(ctx, "kube-proxy"); err != nil { + log.Error(err, "Failed to restart kube-proxy") + } + }() cert, _, err := pkiutil.LoadCertificate(certificates.KubeletCert, "") if err != nil { @@ -339,8 +367,26 @@ func refreshCertsRunWorker(s state.State, r *http.Request, snap snap.Snap) respo } expirationTimeUNIX := cert.NotAfter.Unix() - return response.SyncResponse(true, apiv1.RefreshCertificatesRunResponse{ - ExpirationSeconds: int(expirationTimeUNIX), + + return response.ManualResponse(func(w http.ResponseWriter) (rerr error) { + defer func() { + readyCh <- rerr + close(readyCh) + }() + err := response.SyncResponse(true, apiv1.RefreshCertificatesRunResponse{ + ExpirationSeconds: int(expirationTimeUNIX), + }).Render(w) + if err != nil { + return fmt.Errorf("failed to render response: %w", err) + } + + f, ok := w.(http.Flusher) + if !ok { + return fmt.Errorf("ResponseWriter is not type http.Flusher") + } + + f.Flush() + return nil }) } diff --git a/src/k8s/pkg/k8sd/api/endpoints.go b/src/k8s/pkg/k8sd/api/endpoints.go index 69a9753a4..4ae9946e3 100644 --- a/src/k8s/pkg/k8sd/api/endpoints.go +++ b/src/k8s/pkg/k8sd/api/endpoints.go @@ -155,6 +155,11 @@ func (e *Endpoints) Endpoints() []rest.Endpoint { Path: apiv1.ClusterAPICertificatesRunRPC, Post: rest.EndpointAction{Handler: e.postRefreshCertsRun, AccessHandler: e.ValidateNodeTokenAccessHandler("node-token"), AllowUntrusted: true}, }, + { + Name: "ClusterAPI/RefreshCerts/Approve", + Path: apiv1.ClusterAPIApproveWorkerCSRRPC, + Post: rest.EndpointAction{Handler: e.postApproveWorkerCSR, AccessHandler: ValidateCAPIAuthTokenAccessHandler("capi-auth-token"), AllowUntrusted: true}, + }, // Snap refreshes { Name: "Snap/Refresh",