From a423fc86cb77c04fc271b5a84c57bd70a679f58c Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Wed, 27 Nov 2024 19:08:01 +0000 Subject: [PATCH] PreInitChecks: check non-default Kubernetes service ports Previously, we've only checked if the default Kubernetes service ports are open. However, they can be changed, or disabled, which we did not take into account. With this, we will. --- src/k8s/pkg/k8sd/app/hooks_bootstrap.go | 18 ++++- src/k8s/pkg/k8sd/app/hooks_join.go | 11 ++- src/k8s/pkg/k8sd/types/k8s_service_configs.go | 72 +++++++++++++++++++ src/k8s/pkg/snap/interface.go | 2 +- src/k8s/pkg/snap/mock/mock.go | 2 +- src/k8s/pkg/snap/snap.go | 55 ++++++++------ src/k8s/pkg/snap/snap_test.go | 15 ++-- src/k8s/pkg/utils/net.go | 7 +- 8 files changed, 147 insertions(+), 35 deletions(-) create mode 100644 src/k8s/pkg/k8sd/types/k8s_service_configs.go diff --git a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go index 2a5c1a2dc1..112c021169 100644 --- a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go +++ b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go @@ -231,8 +231,14 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT Annotations: response.Annotations, } + serviceConfigs := types.K8sServiceConfigs{ + IsControlPlane: false, + ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs, + ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs, + } + // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg, false); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil { return fmt.Errorf("pre-init checks failed for worker node: %w", err) } @@ -419,8 +425,16 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst cfg.Certificates.K8sdPublicKey = utils.Pointer(certificates.K8sdPublicKey) cfg.Certificates.K8sdPrivateKey = utils.Pointer(certificates.K8sdPrivateKey) + serviceConfigs := types.K8sServiceConfigs{ + IsControlPlane: true, + ExtraNodeKubeSchedulerArgs: 
bootstrapConfig.ExtraNodeKubeSchedulerArgs, + ExtraNodeKubeControllerManagerArgs: bootstrapConfig.ExtraNodeKubeControllerManagerArgs, + ExtraNodeKubeletArgs: bootstrapConfig.ExtraNodeKubeletArgs, + ExtraNodeKubeProxyArgs: bootstrapConfig.ExtraNodeKubeProxyArgs, + } + // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg, true); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil { return fmt.Errorf("pre-init checks failed for bootstrap node: %w", err) } diff --git a/src/k8s/pkg/k8sd/app/hooks_join.go b/src/k8s/pkg/k8sd/app/hooks_join.go index 2c8bbc37a2..3cdfd5315e 100644 --- a/src/k8s/pkg/k8sd/app/hooks_join.go +++ b/src/k8s/pkg/k8sd/app/hooks_join.go @@ -9,6 +9,7 @@ import ( databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util" "github.com/canonical/k8s/pkg/k8sd/pki" "github.com/canonical/k8s/pkg/k8sd/setup" + "github.com/canonical/k8s/pkg/k8sd/types" "github.com/canonical/k8s/pkg/log" "github.com/canonical/k8s/pkg/utils" "github.com/canonical/k8s/pkg/utils/control" @@ -137,8 +138,16 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri return fmt.Errorf("failed to initialize control plane certificates: %w", err) } + serviceConfigs := types.K8sServiceConfigs{ + IsControlPlane: true, + ExtraNodeKubeSchedulerArgs: joinConfig.ExtraNodeKubeSchedulerArgs, + ExtraNodeKubeControllerManagerArgs: joinConfig.ExtraNodeKubeControllerManagerArgs, + ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs, + ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs, + } + // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg, true); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil { return fmt.Errorf("pre-init checks failed for joining node: %w", err) } diff --git a/src/k8s/pkg/k8sd/types/k8s_service_configs.go b/src/k8s/pkg/k8sd/types/k8s_service_configs.go new file mode 100644 index 0000000000..693b60f72c --- /dev/null +++ 
b/src/k8s/pkg/k8sd/types/k8s_service_configs.go @@ -0,0 +1,72 @@ +package types + +import ( + "net" +) + +const ( + // Default values for Kubernetes services. + KubeControllerManagerPort = "10257" + KubeSchedulerPort = "10259" + KubeletPort = "10250" + KubeletHealthzPort = "10248" + KubeletReadOnlyPort = "10255" + KubeProxyHealthzPort = "10256" + KubeProxyMetricsPort = "10249" +) + +type K8sServiceConfigs struct { + IsControlPlane bool + ExtraNodeKubeControllerManagerArgs map[string]*string + ExtraNodeKubeSchedulerArgs map[string]*string + ExtraNodeKubeletArgs map[string]*string + ExtraNodeKubeProxyArgs map[string]*string +} + +func (s *K8sServiceConfigs) GetKubeControllerManagerPort() string { + return getConfigOrDefault(s.ExtraNodeKubeControllerManagerArgs, "--secure-port", KubeControllerManagerPort) +} + +func (s *K8sServiceConfigs) GetKubeSchedulerPort() string { + return getConfigOrDefault(s.ExtraNodeKubeSchedulerArgs, "--secure-port", KubeSchedulerPort) +} + +func (s *K8sServiceConfigs) GetKubeletPort() string { + return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--port", KubeletPort) +} + +func (s *K8sServiceConfigs) GetKubeletHealthzPort() string { + return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--healthz-port", KubeletHealthzPort) +} + +func (s *K8sServiceConfigs) GetKubeletReadOnlyPort() string { + return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--read-only-port", KubeletReadOnlyPort) +} + +func (s *K8sServiceConfigs) GetKubeProxyHealthzPort() (string, error) { + address := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--healthz-bind-address", "") + if address == "" { + return KubeProxyHealthzPort, nil + } + _, port, err := net.SplitHostPort(address) + return port, err +} + +func (s *K8sServiceConfigs) GetKubeProxyMetricsPort() (string, error) { + address := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--metrics-bind-address", "") + if address == "" { + return KubeProxyMetricsPort, nil + } + _, port, err := net.SplitHostPort(address) + return 
port, err +} + +func getConfigOrDefault(serviceArgs map[string]*string, optionName, defaultValue string) string { + if serviceArgs == nil { + return defaultValue + } else if val, ok := serviceArgs[optionName]; !ok || val == nil { + return defaultValue + } else { + return *val + } +} diff --git a/src/k8s/pkg/snap/interface.go b/src/k8s/pkg/snap/interface.go index 553e65da38..48906f39f1 100644 --- a/src/k8s/pkg/snap/interface.go +++ b/src/k8s/pkg/snap/interface.go @@ -66,5 +66,5 @@ type Snap interface { K8sdClient(address string) (k8sd.Client, error) // k8sd client - PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error // pre-init checks before k8s-snap can start + PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error // pre-init checks before k8s-snap can start } diff --git a/src/k8s/pkg/snap/mock/mock.go b/src/k8s/pkg/snap/mock/mock.go index 1e4071f86a..72ce49a5d7 100644 --- a/src/k8s/pkg/snap/mock/mock.go +++ b/src/k8s/pkg/snap/mock/mock.go @@ -245,7 +245,7 @@ func (s *Snap) SnapctlSet(ctx context.Context, args ...string) error { return s.SnapctlSetErr } -func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error { +func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error { s.PreInitChecksCalledWith = append(s.PreInitChecksCalledWith, config) return s.PreInitChecksErr } diff --git a/src/k8s/pkg/snap/snap.go b/src/k8s/pkg/snap/snap.go index 7aebd11f91..804e51965d 100644 --- a/src/k8s/pkg/snap/snap.go +++ b/src/k8s/pkg/snap/snap.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "strings" "github.com/canonical/k8s/pkg/client/dqlite" @@ -328,11 +329,13 @@ func (s *snap) SnapctlSet(ctx context.Context, args ...string) error { return s.runCommand(ctx, append([]string{"snapctl", "set"}, args...)) } -func (s *snap) PreInitChecks(ctx context.Context, 
config types.ClusterConfig, isControlPlane bool) error { - if err := checkK8sServicePorts(config, isControlPlane); err != nil { +func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error { + if err := checkK8sServicePorts(config, serviceConfigs); err != nil { return fmt.Errorf("Encountered error(s) while verifying port availability for Kubernetes services: %w", err) } + // NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time + // This works around the issue by running them once during the install hook for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} { if err := s.runCommand(ctx, []string{filepath.Join(s.snapDir, "bin", binary), "--version"}); err != nil { return fmt.Errorf("%q binary could not run: %w", binary, err) @@ -354,37 +357,47 @@ func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, is return nil } -func checkK8sServicePorts(config types.ClusterConfig, isControlPlane bool) error { - // NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time - // This works around the issue by running them once during the install hook - ports := map[string]int{ +func checkK8sServicePorts(config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error { + var allErrors []error + ports := map[string]string{ // Default values from official Kubernetes documentation. 
- "kubelet": 10250, - "kubelet-healthz": 10248, - "kube-proxy-healhz": 10256, - "kube-proxy-metrics": 10249, - "k8s-dqlite": config.Datastore.GetK8sDqlitePort(), - "loadbalancer": config.LoadBalancer.GetBGPPeerPort(), + "kubelet": serviceConfigs.GetKubeletPort(), + "kubelet-healthz": serviceConfigs.GetKubeletHealthzPort(), + "kubelet-read-only": serviceConfigs.GetKubeletReadOnlyPort(), + "k8s-dqlite": strconv.Itoa(config.Datastore.GetK8sDqlitePort()), + "loadbalancer": strconv.Itoa(config.LoadBalancer.GetBGPPeerPort()), } - if isControlPlane { - ports["kube-apiserver"] = config.APIServer.GetSecurePort() - ports["kube-scheduler"] = 10259 - ports["kube-controller-manager"] = 10257 + if port, err := serviceConfigs.GetKubeProxyHealthzPort(); err != nil { + allErrors = append(allErrors, err) } else { - ports["kube-apiserver-proxy"] = config.APIServer.GetSecurePort() + ports["kube-proxy-healhz"] = port + } + + if port, err := serviceConfigs.GetKubeProxyMetricsPort(); err != nil { + allErrors = append(allErrors, err) + } else { + ports["kube-proxy-metrics"] = port + } + + if serviceConfigs.IsControlPlane { + ports["kube-apiserver"] = strconv.Itoa(config.APIServer.GetSecurePort()) + ports["kube-scheduler"] = serviceConfigs.GetKubeSchedulerPort() + ports["kube-controller-manager"] = serviceConfigs.GetKubeControllerManagerPort() + } else { + ports["kube-apiserver-proxy"] = strconv.Itoa(config.APIServer.GetSecurePort()) } - var allErrors []error for service, port := range ports { - if port == 0 { + if port == "0" { + // Some ports may be set to 0 in order to disable them. No need to check. continue } if open, err := utils.IsLocalPortOpen(port); err != nil { // Could not open port due to error. 
- allErrors = append(allErrors, fmt.Errorf("could not check port %d (needed by: %s): %w", port, service, err)) + allErrors = append(allErrors, fmt.Errorf("could not check port %s (needed by: %s): %w", port, service, err)) } else if open { - allErrors = append(allErrors, fmt.Errorf("port %d (needed by: %s) is already in use.", port, service)) + allErrors = append(allErrors, fmt.Errorf("port %s (needed by: %s) is already in use.", port, service)) } } diff --git a/src/k8s/pkg/snap/snap_test.go b/src/k8s/pkg/snap/snap_test.go index 76eda2a8a1..95b8348490 100644 --- a/src/k8s/pkg/snap/snap_test.go +++ b/src/k8s/pkg/snap/snap_test.go @@ -142,8 +142,9 @@ func TestSnap(t *testing.T) { ContainerdBaseDir: containerdDir, }) conf := types.ClusterConfig{} + serviceConfigs := types.K8sServiceConfigs{} - err = snap.PreInitChecks(context.Background(), conf, true) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs) g.Expect(err).To(Not(HaveOccurred())) expectedCalls := []string{} for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} { @@ -154,11 +155,15 @@ func TestSnap(t *testing.T) { t.Run("Fail port already in use", func(t *testing.T) { g := NewWithT(t) // Open a port which will be checked (kubelet). 
- l, err := net.Listen("tcp", ":10250") + port := "9999" + serviceConfigs := types.K8sServiceConfigs{ + ExtraNodeKubeletArgs: map[string]*string{"--port": &port}, + } + l, err := net.Listen("tcp", fmt.Sprintf(":%s", port)) g.Expect(err).To(Not(HaveOccurred())) defer l.Close() - err = snap.PreInitChecks(context.Background(), conf, true) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs) g.Expect(err).To(HaveOccurred()) }) @@ -172,7 +177,7 @@ func TestSnap(t *testing.T) { f.Close() defer os.Remove(f.Name()) - err = snap.PreInitChecks(context.Background(), conf, true) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs) g.Expect(err).To(HaveOccurred()) }) @@ -180,7 +185,7 @@ func TestSnap(t *testing.T) { g := NewWithT(t) mockRunner.Err = fmt.Errorf("some error") - err := snap.PreInitChecks(context.Background(), conf, true) + err := snap.PreInitChecks(context.Background(), conf, serviceConfigs) g.Expect(err).To(HaveOccurred()) }) }) diff --git a/src/k8s/pkg/utils/net.go b/src/k8s/pkg/utils/net.go index 78b564c864..453b442893 100644 --- a/src/k8s/pkg/utils/net.go +++ b/src/k8s/pkg/utils/net.go @@ -4,13 +4,12 @@ import ( "errors" "net" "os" - "strconv" "syscall" "time" ) // IsLocalPortOpen checks if the given local port is already open or not. -func IsLocalPortOpen(port int) (bool, error) { +func IsLocalPortOpen(port string) (bool, error) { if err := checkPort("localhost", port, 500*time.Millisecond); err == nil { return true, nil } else if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, syscall.ECONNREFUSED) { @@ -21,8 +20,8 @@ func IsLocalPortOpen(port int) (bool, error) { } } -func checkPort(host string, port int, timeout time.Duration) error { - conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, strconv.Itoa(port)), timeout) +func checkPort(host, port string, timeout time.Duration) error { + conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), timeout) if err != nil { return err }