From b50cf5cbf86de4b7451e71164ee1b20169999c35 Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Tue, 26 Nov 2024 14:27:35 +0000 Subject: [PATCH 1/4] Checks k8s-related port availability in PreInitChecks PreInitChecks is called on bootstrap or when joining another Kubernetes cluster. Kubernetes and its services open up several ports; if they're already in use, we cannot progress. Adding these checks will make these error cases more explainable to the user, rather than a generic bootstrap / join error. --- src/k8s/pkg/k8sd/app/hooks_bootstrap.go | 4 +-- src/k8s/pkg/k8sd/app/hooks_join.go | 2 +- src/k8s/pkg/snap/interface.go | 2 +- src/k8s/pkg/snap/mock/mock.go | 2 +- src/k8s/pkg/snap/snap.go | 45 ++++++++++++++++++++++--- src/k8s/pkg/snap/snap_test.go | 18 ++++++++-- src/k8s/pkg/utils/net.go | 31 +++++++++++++++++ 7 files changed, 92 insertions(+), 12 deletions(-) create mode 100644 src/k8s/pkg/utils/net.go diff --git a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go index 1386edca9..2a5c1a2dc 100644 --- a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go +++ b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go @@ -232,7 +232,7 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT } // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg); err != nil { + if err := snap.PreInitChecks(ctx, cfg, false); err != nil { return fmt.Errorf("pre-init checks failed for worker node: %w", err) } @@ -420,7 +420,7 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst cfg.Certificates.K8sdPrivateKey = utils.Pointer(certificates.K8sdPrivateKey) // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg); err != nil { + if err := snap.PreInitChecks(ctx, cfg, true); err != nil { return fmt.Errorf("pre-init checks failed for bootstrap node: %w", err) } diff --git a/src/k8s/pkg/k8sd/app/hooks_join.go b/src/k8s/pkg/k8sd/app/hooks_join.go index ac67b526f..2c8bbc37a 100644 --- a/src/k8s/pkg/k8sd/app/hooks_join.go +++ b/src/k8s/pkg/k8sd/app/hooks_join.go @@ -138,7 +138,7 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri } // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg); err != nil { + if err := snap.PreInitChecks(ctx, cfg, true); err != nil { return fmt.Errorf("pre-init checks failed for joining node: %w", err) } diff --git a/src/k8s/pkg/snap/interface.go b/src/k8s/pkg/snap/interface.go index f2ea533df..553e65da3 100644 --- a/src/k8s/pkg/snap/interface.go +++ b/src/k8s/pkg/snap/interface.go @@ -66,5 +66,5 @@ type Snap interface { K8sdClient(address string) (k8sd.Client, error) // k8sd client - PreInitChecks(ctx context.Context, config types.ClusterConfig) error // pre-init checks before k8s-snap can start + PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error // pre-init checks before k8s-snap can start } diff --git a/src/k8s/pkg/snap/mock/mock.go b/src/k8s/pkg/snap/mock/mock.go index 21846253d..1e4071f86 100644 --- a/src/k8s/pkg/snap/mock/mock.go +++ b/src/k8s/pkg/snap/mock/mock.go @@ -245,7 +245,7 @@ func (s *Snap) SnapctlSet(ctx context.Context, args ...string) error { return s.SnapctlSetErr } -func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig) error { +func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error { s.PreInitChecksCalledWith = append(s.PreInitChecksCalledWith, config) return s.PreInitChecksErr } diff --git a/src/k8s/pkg/snap/snap.go b/src/k8s/pkg/snap/snap.go index 057d30cde..7aebd11f9 100644 --- a/src/k8s/pkg/snap/snap.go +++ b/src/k8s/pkg/snap/snap.go @@ -328,11 +328,11 @@ func (s *snap) SnapctlSet(ctx context.Context, args ...string) error { return s.runCommand(ctx, append([]string{"snapctl", "set"}, args...)) } -func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig) error { - // TODO: check for available ports for k8s-dqlite, apiserver, containerd, etc +func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error { + if err := checkK8sServicePorts(config, isControlPlane); err != nil { + return fmt.Errorf("Encountered error(s) while verifying port availability for Kubernetes services: %w", err) + } - // NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time - // This works around the issue by running them once during the install hook for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} { if err := s.runCommand(ctx, []string{filepath.Join(s.snapDir, "bin", binary), "--version"}); err != nil { return fmt.Errorf("%q binary could not run: %w", binary, err) @@ -354,4 +354,41 @@ func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig) er return nil } +func checkK8sServicePorts(config types.ClusterConfig, isControlPlane bool) error { + // NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time + // This works around the issue by running them once during the install hook + ports := map[string]int{ + // Default values from official Kubernetes documentation. + "kubelet": 10250, + "kubelet-healthz": 10248, + "kube-proxy-healhz": 10256, + "kube-proxy-metrics": 10249, + "k8s-dqlite": config.Datastore.GetK8sDqlitePort(), + "loadbalancer": config.LoadBalancer.GetBGPPeerPort(), + } + + if isControlPlane { + ports["kube-apiserver"] = config.APIServer.GetSecurePort() + ports["kube-scheduler"] = 10259 + ports["kube-controller-manager"] = 10257 + } else { + ports["kube-apiserver-proxy"] = config.APIServer.GetSecurePort() + } + + var allErrors []error + for service, port := range ports { + if port == 0 { + continue + } + if open, err := utils.IsLocalPortOpen(port); err != nil { + // Could not open port due to error. + allErrors = append(allErrors, fmt.Errorf("could not check port %d (needed by: %s): %w", port, service, err)) + } else if open { + allErrors = append(allErrors, fmt.Errorf("port %d (needed by: %s) is already in use.", port, service)) + } + } + + return errors.Join(allErrors...) +} + var _ Snap = &snap{} diff --git a/src/k8s/pkg/snap/snap_test.go b/src/k8s/pkg/snap/snap_test.go index e99ff0384..76eda2a8a 100644 --- a/src/k8s/pkg/snap/snap_test.go +++ b/src/k8s/pkg/snap/snap_test.go @@ -3,6 +3,7 @@ package snap_test import ( "context" "fmt" + "net" "os" "path/filepath" "testing" @@ -142,7 +143,7 @@ func TestSnap(t *testing.T) { }) conf := types.ClusterConfig{} - err = snap.PreInitChecks(context.Background(), conf) + err = snap.PreInitChecks(context.Background(), conf, true) g.Expect(err).To(Not(HaveOccurred())) expectedCalls := []string{} for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} { @@ -150,6 +151,17 @@ func TestSnap(t *testing.T) { } g.Expect(mockRunner.CalledWithCommand).To(ConsistOf(expectedCalls)) + t.Run("Fail port already in use", func(t *testing.T) { + g := NewWithT(t) + // Open a port which will be checked (kubelet). + l, err := net.Listen("tcp", ":10250") + g.Expect(err).To(Not(HaveOccurred())) + defer l.Close() + + err = snap.PreInitChecks(context.Background(), conf, true) + g.Expect(err).To(HaveOccurred()) + }) + t.Run("Fail socket exists", func(t *testing.T) { g := NewWithT(t) // Create the containerd.sock file, which should cause the check to fail. @@ -160,7 +172,7 @@ func TestSnap(t *testing.T) { f.Close() defer os.Remove(f.Name()) - err = snap.PreInitChecks(context.Background(), conf) + err = snap.PreInitChecks(context.Background(), conf, true) g.Expect(err).To(HaveOccurred()) }) @@ -168,7 +180,7 @@ func TestSnap(t *testing.T) { g := NewWithT(t) mockRunner.Err = fmt.Errorf("some error") - err := snap.PreInitChecks(context.Background(), conf) + err := snap.PreInitChecks(context.Background(), conf, true) g.Expect(err).To(HaveOccurred()) }) }) diff --git a/src/k8s/pkg/utils/net.go b/src/k8s/pkg/utils/net.go new file mode 100644 index 000000000..78b564c86 --- /dev/null +++ b/src/k8s/pkg/utils/net.go @@ -0,0 +1,31 @@ +package utils + +import ( + "errors" + "net" + "os" + "strconv" + "syscall" + "time" +) + +// IsLocalPortOpen checks if the given local port is already open or not. +func IsLocalPortOpen(port int) (bool, error) { + if err := checkPort("localhost", port, 500*time.Millisecond); err == nil { + return true, nil + } else if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, syscall.ECONNREFUSED) { + return false, nil + } else { + // could not open due to error, couldn't check. + return false, err + } +} + +func checkPort(host string, port int, timeout time.Duration) error { + conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, strconv.Itoa(port)), timeout) + if err != nil { + return err + } + conn.Close() + return nil +} From a5ed1484d3c0755cf82cabe9ad064466024718d4 Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Wed, 27 Nov 2024 19:08:01 +0000 Subject: [PATCH 2/4] PreInitChecks: check non-default Kubernetes service ports Previously, we've only checked if the default Kubernetes service prots are open. However, they can be changed, or disabled, which we did not take into account. With this, we will. --- src/k8s/pkg/k8sd/app/hooks_bootstrap.go | 18 ++++- src/k8s/pkg/k8sd/app/hooks_join.go | 11 ++- src/k8s/pkg/k8sd/types/k8s_service_configs.go | 72 +++++++++++++++++++ src/k8s/pkg/snap/interface.go | 2 +- src/k8s/pkg/snap/mock/mock.go | 2 +- src/k8s/pkg/snap/snap.go | 55 ++++++++------ src/k8s/pkg/snap/snap_test.go | 15 ++-- src/k8s/pkg/utils/net.go | 7 +- 8 files changed, 147 insertions(+), 35 deletions(-) create mode 100644 src/k8s/pkg/k8sd/types/k8s_service_configs.go diff --git a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go index 2a5c1a2dc..112c02116 100644 --- a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go +++ b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go @@ -231,8 +231,14 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT Annotations: response.Annotations, } + serviceConfigs := types.K8sServiceConfigs{ + IsControlPlane: false, + ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs, + ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs, + } + // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg, false); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil { return fmt.Errorf("pre-init checks failed for worker node: %w", err) } @@ -419,8 +425,16 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst cfg.Certificates.K8sdPublicKey = utils.Pointer(certificates.K8sdPublicKey) cfg.Certificates.K8sdPrivateKey = utils.Pointer(certificates.K8sdPrivateKey) + serviceConfigs := types.K8sServiceConfigs{ + IsControlPlane: true, + ExtraNodeKubeSchedulerArgs: bootstrapConfig.ExtraNodeKubeSchedulerArgs, + ExtraNodeKubeControllerManagerArgs: bootstrapConfig.ExtraNodeKubeControllerManagerArgs, + ExtraNodeKubeletArgs: bootstrapConfig.ExtraNodeKubeletArgs, + ExtraNodeKubeProxyArgs: bootstrapConfig.ExtraNodeKubeProxyArgs, + } + // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg, true); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil { return fmt.Errorf("pre-init checks failed for bootstrap node: %w", err) } diff --git a/src/k8s/pkg/k8sd/app/hooks_join.go b/src/k8s/pkg/k8sd/app/hooks_join.go index 2c8bbc37a..3cdfd5315 100644 --- a/src/k8s/pkg/k8sd/app/hooks_join.go +++ b/src/k8s/pkg/k8sd/app/hooks_join.go @@ -9,6 +9,7 @@ import ( databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util" "github.com/canonical/k8s/pkg/k8sd/pki" "github.com/canonical/k8s/pkg/k8sd/setup" + "github.com/canonical/k8s/pkg/k8sd/types" "github.com/canonical/k8s/pkg/log" "github.com/canonical/k8s/pkg/utils" "github.com/canonical/k8s/pkg/utils/control" @@ -137,8 +138,16 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri return fmt.Errorf("failed to initialize control plane certificates: %w", err) } + serviceConfigs := types.K8sServiceConfigs{ + IsControlPlane: true, + ExtraNodeKubeSchedulerArgs: joinConfig.ExtraNodeKubeSchedulerArgs, + ExtraNodeKubeControllerManagerArgs: joinConfig.ExtraNodeKubeControllerManagerArgs, + ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs, + ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs, + } + // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg, true); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil { return fmt.Errorf("pre-init checks failed for joining node: %w", err) } diff --git a/src/k8s/pkg/k8sd/types/k8s_service_configs.go b/src/k8s/pkg/k8sd/types/k8s_service_configs.go new file mode 100644 index 000000000..693b60f72 --- /dev/null +++ b/src/k8s/pkg/k8sd/types/k8s_service_configs.go @@ -0,0 +1,72 @@ +package types + +import ( + "net" +) + +const ( + // Default values for Kubernetes services. + KubeControllerManagerPort = "10257" + KubeSchedulerPort = "10259" + KubeletPort = "10250" + KubeletHealthzPort = "10248" + KubeletReadOnlyPort = "10255" + KubeProxyHealthzPort = "10256" + KubeProxyMetricsPort = "10249" +) + +type K8sServiceConfigs struct { + IsControlPlane bool + ExtraNodeKubeControllerManagerArgs map[string]*string + ExtraNodeKubeSchedulerArgs map[string]*string + ExtraNodeKubeletArgs map[string]*string + ExtraNodeKubeProxyArgs map[string]*string +} + +func (s *K8sServiceConfigs) GetKubeControllerManagerPort() string { + return getConfigOrDefault(s.ExtraNodeKubeControllerManagerArgs, "--secure-port", KubeControllerManagerPort) +} + +func (s *K8sServiceConfigs) GetKubeSchedulerPort() string { + return getConfigOrDefault(s.ExtraNodeKubeSchedulerArgs, "--secure-port", KubeSchedulerPort) +} + +func (s *K8sServiceConfigs) GetKubeletPort() string { + return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--port", KubeletPort) +} + +func (s *K8sServiceConfigs) GetKubeletHealthzPort() string { + return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--healthz-port", KubeletHealthzPort) +} + +func (s *K8sServiceConfigs) GetKubeletReadOnlyPort() string { + return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--read-only-port", KubeletReadOnlyPort) +} + +func (s *K8sServiceConfigs) GetKubeProxyHealthzPort() (string, error) { + address := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--healthz-bind-address", "") + if address == "" { + return KubeProxyHealthzPort, nil + } + _, port, err := net.SplitHostPort(address) + return port, err +} + +func (s *K8sServiceConfigs) GetKubeProxyMetricsPort() (string, error) { + address := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--metrics-bind-address", "") + if address == "" { + return KubeProxyMetricsPort, nil + } + _, port, err := net.SplitHostPort(address) + return port, err +} + +func getConfigOrDefault(serviceArgs map[string]*string, optionName, defaultValue string) string { + if serviceArgs == nil { + return defaultValue + } else if val, ok := serviceArgs[optionName]; !ok || val == nil { + return defaultValue + } else { + return *val + } +} diff --git a/src/k8s/pkg/snap/interface.go b/src/k8s/pkg/snap/interface.go index 553e65da3..48906f39f 100644 --- a/src/k8s/pkg/snap/interface.go +++ b/src/k8s/pkg/snap/interface.go @@ -66,5 +66,5 @@ type Snap interface { K8sdClient(address string) (k8sd.Client, error) // k8sd client - PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error // pre-init checks before k8s-snap can start + PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error // pre-init checks before k8s-snap can start } diff --git a/src/k8s/pkg/snap/mock/mock.go b/src/k8s/pkg/snap/mock/mock.go index 1e4071f86..72ce49a5d 100644 --- a/src/k8s/pkg/snap/mock/mock.go +++ b/src/k8s/pkg/snap/mock/mock.go @@ -245,7 +245,7 @@ func (s *Snap) SnapctlSet(ctx context.Context, args ...string) error { return s.SnapctlSetErr } -func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error { +func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error { s.PreInitChecksCalledWith = append(s.PreInitChecksCalledWith, config) return s.PreInitChecksErr } diff --git a/src/k8s/pkg/snap/snap.go b/src/k8s/pkg/snap/snap.go index 7aebd11f9..804e51965 100644 --- a/src/k8s/pkg/snap/snap.go +++ b/src/k8s/pkg/snap/snap.go @@ -8,6 +8,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "strings" "github.com/canonical/k8s/pkg/client/dqlite" @@ -328,11 +329,13 @@ func (s *snap) SnapctlSet(ctx context.Context, args ...string) error { return s.runCommand(ctx, append([]string{"snapctl", "set"}, args...)) } -func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, isControlPlane bool) error { - if err := checkK8sServicePorts(config, isControlPlane); err != nil { +func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error { + if err := checkK8sServicePorts(config, serviceConfigs); err != nil { return fmt.Errorf("Encountered error(s) while verifying port availability for Kubernetes services: %w", err) } + // NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time + // This works around the issue by running them once during the install hook for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} { if err := s.runCommand(ctx, []string{filepath.Join(s.snapDir, "bin", binary), "--version"}); err != nil { return fmt.Errorf("%q binary could not run: %w", binary, err) @@ -354,37 +357,47 @@ func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, is return nil } -func checkK8sServicePorts(config types.ClusterConfig, isControlPlane bool) error { - // NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time - // This works around the issue by running them once during the install hook - ports := map[string]int{ +func checkK8sServicePorts(config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error { + var allErrors []error + ports := map[string]string{ // Default values from official Kubernetes documentation. - "kubelet": 10250, - "kubelet-healthz": 10248, - "kube-proxy-healhz": 10256, - "kube-proxy-metrics": 10249, - "k8s-dqlite": config.Datastore.GetK8sDqlitePort(), - "loadbalancer": config.LoadBalancer.GetBGPPeerPort(), + "kubelet": serviceConfigs.GetKubeletPort(), + "kubelet-healthz": serviceConfigs.GetKubeletHealthzPort(), + "kubelet-read-only": serviceConfigs.GetKubeletReadOnlyPort(), + "k8s-dqlite": strconv.Itoa(config.Datastore.GetK8sDqlitePort()), + "loadbalancer": strconv.Itoa(config.LoadBalancer.GetBGPPeerPort()), } - if isControlPlane { - ports["kube-apiserver"] = config.APIServer.GetSecurePort() - ports["kube-scheduler"] = 10259 - ports["kube-controller-manager"] = 10257 + if port, err := serviceConfigs.GetKubeProxyHealthzPort(); err != nil { + allErrors = append(allErrors, err) } else { - ports["kube-apiserver-proxy"] = config.APIServer.GetSecurePort() + ports["kube-proxy-healhz"] = port + } + + if port, err := serviceConfigs.GetKubeProxyMetricsPort(); err != nil { + allErrors = append(allErrors, err) + } else { + ports["kube-proxy-metrics"] = port + } + + if serviceConfigs.IsControlPlane { + ports["kube-apiserver"] = strconv.Itoa(config.APIServer.GetSecurePort()) + ports["kube-scheduler"] = serviceConfigs.GetKubeSchedulerPort() + ports["kube-controller-manager"] = serviceConfigs.GetKubeControllerManagerPort() + } else { + ports["kube-apiserver-proxy"] = strconv.Itoa(config.APIServer.GetSecurePort()) } - var allErrors []error for service, port := range ports { - if port == 0 { + if port == "0" { + // Some ports may be set to 0 in order to disable them. No need to check. continue } if open, err := utils.IsLocalPortOpen(port); err != nil { // Could not open port due to error. - allErrors = append(allErrors, fmt.Errorf("could not check port %d (needed by: %s): %w", port, service, err)) + allErrors = append(allErrors, fmt.Errorf("could not check port %s (needed by: %s): %w", port, service, err)) } else if open { - allErrors = append(allErrors, fmt.Errorf("port %d (needed by: %s) is already in use.", port, service)) + allErrors = append(allErrors, fmt.Errorf("port %s (needed by: %s) is already in use.", port, service)) } } diff --git a/src/k8s/pkg/snap/snap_test.go b/src/k8s/pkg/snap/snap_test.go index 76eda2a8a..95b834849 100644 --- a/src/k8s/pkg/snap/snap_test.go +++ b/src/k8s/pkg/snap/snap_test.go @@ -142,8 +142,9 @@ func TestSnap(t *testing.T) { ContainerdBaseDir: containerdDir, }) conf := types.ClusterConfig{} + serviceConfigs := types.K8sServiceConfigs{} - err = snap.PreInitChecks(context.Background(), conf, true) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs) g.Expect(err).To(Not(HaveOccurred())) expectedCalls := []string{} for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} { @@ -154,11 +155,15 @@ func TestSnap(t *testing.T) { t.Run("Fail port already in use", func(t *testing.T) { g := NewWithT(t) // Open a port which will be checked (kubelet). - l, err := net.Listen("tcp", ":10250") + port := "9999" + serviceConfigs := types.K8sServiceConfigs{ + ExtraNodeKubeletArgs: map[string]*string{"--port": &port}, + } + l, err := net.Listen("tcp", fmt.Sprintf(":%s", port)) g.Expect(err).To(Not(HaveOccurred())) defer l.Close() - err = snap.PreInitChecks(context.Background(), conf, true) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs) g.Expect(err).To(HaveOccurred()) }) @@ -172,7 +177,7 @@ func TestSnap(t *testing.T) { f.Close() defer os.Remove(f.Name()) - err = snap.PreInitChecks(context.Background(), conf, true) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs) g.Expect(err).To(HaveOccurred()) }) @@ -180,7 +185,7 @@ func TestSnap(t *testing.T) { g := NewWithT(t) mockRunner.Err = fmt.Errorf("some error") - err := snap.PreInitChecks(context.Background(), conf, true) + err := snap.PreInitChecks(context.Background(), conf, serviceConfigs) g.Expect(err).To(HaveOccurred()) }) }) diff --git a/src/k8s/pkg/utils/net.go b/src/k8s/pkg/utils/net.go index 78b564c86..453b44289 100644 --- a/src/k8s/pkg/utils/net.go +++ b/src/k8s/pkg/utils/net.go @@ -4,13 +4,12 @@ import ( "errors" "net" "os" - "strconv" "syscall" "time" ) // IsLocalPortOpen checks if the given local port is already open or not. -func IsLocalPortOpen(port int) (bool, error) { +func IsLocalPortOpen(port string) (bool, error) { if err := checkPort("localhost", port, 500*time.Millisecond); err == nil { return true, nil } else if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, syscall.ECONNREFUSED) { @@ -21,8 +20,8 @@ func IsLocalPortOpen(port int) (bool, error) { } } -func checkPort(host string, port int, timeout time.Duration) error { - conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, strconv.Itoa(port)), timeout) +func checkPort(host, port string, timeout time.Duration) error { + conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), timeout) if err != nil { return err } From b97b1b2ab736905b3c901422156746efaf091bf1 Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Fri, 29 Nov 2024 09:22:14 +0000 Subject: [PATCH 3/4] Replaces IsLocalPortOpen check Trying to listen on the given port will instead allow us to check against all addresses. --- src/k8s/pkg/snap/snap.go | 2 +- src/k8s/pkg/utils/net.go | 23 +++++++---------------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/src/k8s/pkg/snap/snap.go b/src/k8s/pkg/snap/snap.go index 804e51965..bfc4157ad 100644 --- a/src/k8s/pkg/snap/snap.go +++ b/src/k8s/pkg/snap/snap.go @@ -396,7 +396,7 @@ func checkK8sServicePorts(config types.ClusterConfig, serviceConfigs types.K8sSe if open, err := utils.IsLocalPortOpen(port); err != nil { // Could not open port due to error. allErrors = append(allErrors, fmt.Errorf("could not check port %s (needed by: %s): %w", port, service, err)) - } else if open { + } else if !open { allErrors = append(allErrors, fmt.Errorf("port %s (needed by: %s) is already in use.", port, service)) } } diff --git a/src/k8s/pkg/utils/net.go b/src/k8s/pkg/utils/net.go index 453b44289..1a573875c 100644 --- a/src/k8s/pkg/utils/net.go +++ b/src/k8s/pkg/utils/net.go @@ -2,29 +2,20 @@ package utils import ( "errors" + "fmt" "net" - "os" "syscall" - "time" ) // IsLocalPortOpen checks if the given local port is already open or not. func IsLocalPortOpen(port string) (bool, error) { - if err := checkPort("localhost", port, 500*time.Millisecond); err == nil { - return true, nil - } else if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, syscall.ECONNREFUSED) { + // Without an address, Listen will listen on all addresses. + if l, err := net.Listen("tcp", fmt.Sprintf(":%s", port)); errors.Is(err, syscall.EADDRINUSE) { return false, nil - } else { - // could not open due to error, couldn't check. + } else if err != nil { return false, err + } else { + l.Close() + return true, nil } } - -func checkPort(host, port string, timeout time.Duration) error { - conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), timeout) - if err != nil { - return err - } - conn.Close() - return nil -} From 3b34268847fe6ab6255b12ef97116be6f0df397c Mon Sep 17 00:00:00 2001 From: Claudiu Belu Date: Mon, 2 Dec 2024 13:01:01 +0000 Subject: [PATCH 4/4] Address comments --- src/k8s/pkg/k8sd/app/hooks_bootstrap.go | 6 +- src/k8s/pkg/k8sd/app/hooks_join.go | 3 +- src/k8s/pkg/k8sd/types/k8s_service_configs.go | 1 - src/k8s/pkg/snap/interface.go | 2 +- src/k8s/pkg/snap/mock/mock.go | 2 +- src/k8s/pkg/snap/snap.go | 53 +---------------- src/k8s/pkg/snap/snap_test.go | 8 +-- src/k8s/pkg/utils/checks/checks.go | 59 +++++++++++++++++++ 8 files changed, 71 insertions(+), 63 deletions(-) create mode 100644 src/k8s/pkg/utils/checks/checks.go diff --git a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go index 112c02116..ec93418a9 100644 --- a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go +++ b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go @@ -232,13 +232,12 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT } serviceConfigs := types.K8sServiceConfigs{ - IsControlPlane: false, ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs, ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs, } // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, false); err != nil { return fmt.Errorf("pre-init checks failed for worker node: %w", err) } @@ -426,7 +425,6 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst cfg.Certificates.K8sdPrivateKey = utils.Pointer(certificates.K8sdPrivateKey) serviceConfigs := types.K8sServiceConfigs{ - IsControlPlane: true, ExtraNodeKubeSchedulerArgs: bootstrapConfig.ExtraNodeKubeSchedulerArgs, ExtraNodeKubeControllerManagerArgs: bootstrapConfig.ExtraNodeKubeControllerManagerArgs, ExtraNodeKubeletArgs: bootstrapConfig.ExtraNodeKubeletArgs, @@ -434,7 +432,7 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst } // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, true); err != nil { return fmt.Errorf("pre-init checks failed for bootstrap node: %w", err) } diff --git a/src/k8s/pkg/k8sd/app/hooks_join.go b/src/k8s/pkg/k8sd/app/hooks_join.go index 3cdfd5315..5bf692f05 100644 --- a/src/k8s/pkg/k8sd/app/hooks_join.go +++ b/src/k8s/pkg/k8sd/app/hooks_join.go @@ -139,7 +139,6 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri } serviceConfigs := types.K8sServiceConfigs{ - IsControlPlane: true, ExtraNodeKubeSchedulerArgs: joinConfig.ExtraNodeKubeSchedulerArgs, ExtraNodeKubeControllerManagerArgs: joinConfig.ExtraNodeKubeControllerManagerArgs, ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs, @@ -147,7 +146,7 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri } // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg, serviceConfigs); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, true); err != nil { return fmt.Errorf("pre-init checks failed for joining node: %w", err) } diff --git a/src/k8s/pkg/k8sd/types/k8s_service_configs.go b/src/k8s/pkg/k8sd/types/k8s_service_configs.go index 693b60f72..fbaf0e4de 100644 --- a/src/k8s/pkg/k8sd/types/k8s_service_configs.go +++ b/src/k8s/pkg/k8sd/types/k8s_service_configs.go @@ -16,7 +16,6 @@ const ( ) type K8sServiceConfigs struct { - IsControlPlane bool ExtraNodeKubeControllerManagerArgs map[string]*string ExtraNodeKubeSchedulerArgs map[string]*string ExtraNodeKubeletArgs map[string]*string diff --git a/src/k8s/pkg/snap/interface.go b/src/k8s/pkg/snap/interface.go index 48906f39f..ed1d8d223 100644 --- a/src/k8s/pkg/snap/interface.go +++ b/src/k8s/pkg/snap/interface.go @@ -66,5 +66,5 @@ type Snap interface { K8sdClient(address string) (k8sd.Client, error) // k8sd client - PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error // pre-init checks before k8s-snap can start + PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error // pre-init checks before k8s-snap can start } diff --git a/src/k8s/pkg/snap/mock/mock.go b/src/k8s/pkg/snap/mock/mock.go index 72ce49a5d..22abd0c6e 100644 --- a/src/k8s/pkg/snap/mock/mock.go +++ b/src/k8s/pkg/snap/mock/mock.go @@ -245,7 +245,7 @@ func (s *Snap) SnapctlSet(ctx context.Context, args ...string) error { return s.SnapctlSetErr } -func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error { +func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error { s.PreInitChecksCalledWith = append(s.PreInitChecksCalledWith, config) return s.PreInitChecksErr } diff --git a/src/k8s/pkg/snap/snap.go b/src/k8s/pkg/snap/snap.go index bfc4157ad..ea61ec862 100644 --- a/src/k8s/pkg/snap/snap.go +++ b/src/k8s/pkg/snap/snap.go @@ -8,7 +8,6 @@ import ( "os" "os/exec" "path/filepath" - "strconv" "strings" "github.com/canonical/k8s/pkg/client/dqlite" @@ -19,6 +18,7 @@ import ( "github.com/canonical/k8s/pkg/k8sd/types" "github.com/canonical/k8s/pkg/log" "github.com/canonical/k8s/pkg/utils" + "github.com/canonical/k8s/pkg/utils/checks" "github.com/moby/sys/mountinfo" "gopkg.in/yaml.v2" "k8s.io/cli-runtime/pkg/genericclioptions" @@ -329,8 +329,8 @@ func (s *snap) SnapctlSet(ctx context.Context, args ...string) error { return s.runCommand(ctx, append([]string{"snapctl", "set"}, args...)) } -func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error { - if err := checkK8sServicePorts(config, serviceConfigs); err != nil { +func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error { + if err := checks.CheckK8sServicePorts(config, serviceConfigs, isControlPlane); err != nil { return fmt.Errorf("Encountered error(s) while verifying port availability for Kubernetes services: %w", err) } @@ -357,51 +357,4 @@ func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, se return nil } -func checkK8sServicePorts(config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs) error { - var allErrors []error - ports := map[string]string{ - // Default values from official Kubernetes documentation. - "kubelet": serviceConfigs.GetKubeletPort(), - "kubelet-healthz": serviceConfigs.GetKubeletHealthzPort(), - "kubelet-read-only": serviceConfigs.GetKubeletReadOnlyPort(), - "k8s-dqlite": strconv.Itoa(config.Datastore.GetK8sDqlitePort()), - "loadbalancer": strconv.Itoa(config.LoadBalancer.GetBGPPeerPort()), - } - - if port, err := serviceConfigs.GetKubeProxyHealthzPort(); err != nil { - allErrors = append(allErrors, err) - } else { - ports["kube-proxy-healhz"] = port - } - - if port, err := serviceConfigs.GetKubeProxyMetricsPort(); err != nil { - allErrors = append(allErrors, err) - } else { - ports["kube-proxy-metrics"] = port - } - - if serviceConfigs.IsControlPlane { - ports["kube-apiserver"] = strconv.Itoa(config.APIServer.GetSecurePort()) - ports["kube-scheduler"] = serviceConfigs.GetKubeSchedulerPort() - ports["kube-controller-manager"] = serviceConfigs.GetKubeControllerManagerPort() - } else { - ports["kube-apiserver-proxy"] = strconv.Itoa(config.APIServer.GetSecurePort()) - } - - for service, port := range ports { - if port == "0" { - // Some ports may be set to 0 in order to disable them. No need to check. - continue - } - if open, err := utils.IsLocalPortOpen(port); err != nil { - // Could not open port due to error. - allErrors = append(allErrors, fmt.Errorf("could not check port %s (needed by: %s): %w", port, service, err)) - } else if !open { - allErrors = append(allErrors, fmt.Errorf("port %s (needed by: %s) is already in use.", port, service)) - } - } - - return errors.Join(allErrors...) -} - var _ Snap = &snap{} diff --git a/src/k8s/pkg/snap/snap_test.go b/src/k8s/pkg/snap/snap_test.go index 95b834849..0f66857a7 100644 --- a/src/k8s/pkg/snap/snap_test.go +++ b/src/k8s/pkg/snap/snap_test.go @@ -144,7 +144,7 @@ func TestSnap(t *testing.T) { conf := types.ClusterConfig{} serviceConfigs := types.K8sServiceConfigs{} - err = snap.PreInitChecks(context.Background(), conf, serviceConfigs) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true) g.Expect(err).To(Not(HaveOccurred())) expectedCalls := []string{} for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} { @@ -163,7 +163,7 @@ func TestSnap(t *testing.T) { g.Expect(err).To(Not(HaveOccurred())) defer l.Close() - err = snap.PreInitChecks(context.Background(), conf, serviceConfigs) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true) g.Expect(err).To(HaveOccurred()) }) @@ -177,7 +177,7 @@ func TestSnap(t *testing.T) { f.Close() defer os.Remove(f.Name()) - err = snap.PreInitChecks(context.Background(), conf, serviceConfigs) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true) g.Expect(err).To(HaveOccurred()) }) @@ -185,7 +185,7 @@ func TestSnap(t *testing.T) { g := NewWithT(t) mockRunner.Err = fmt.Errorf("some error") - err := snap.PreInitChecks(context.Background(), conf, serviceConfigs) + err := snap.PreInitChecks(context.Background(), conf, serviceConfigs, true) g.Expect(err).To(HaveOccurred()) }) }) diff --git a/src/k8s/pkg/utils/checks/checks.go b/src/k8s/pkg/utils/checks/checks.go new file mode 100644 index 000000000..f8c1fe16c --- /dev/null +++ b/src/k8s/pkg/utils/checks/checks.go @@ -0,0 +1,59 @@ +package checks + +import ( + "errors" + "fmt" + "strconv" + + "github.com/canonical/k8s/pkg/k8sd/types" + "github.com/canonical/k8s/pkg/utils" +) + +// CheckK8sServicePorts verifies that the Kubernetes-related ports are free to be used. +// The ports checked depends on whether a node is a control plane node, or a worker node. +func CheckK8sServicePorts(config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error { + var allErrors []error + ports := map[string]string{ + // Default values from official Kubernetes documentation. + "kubelet": serviceConfigs.GetKubeletPort(), + "kubelet-healthz": serviceConfigs.GetKubeletHealthzPort(), + "kubelet-read-only": serviceConfigs.GetKubeletReadOnlyPort(), + "k8s-dqlite": strconv.Itoa(config.Datastore.GetK8sDqlitePort()), + "loadbalancer": strconv.Itoa(config.LoadBalancer.GetBGPPeerPort()), + } + + if port, err := serviceConfigs.GetKubeProxyHealthzPort(); err != nil { + allErrors = append(allErrors, err) + } else { + ports["kube-proxy-healhz"] = port + } + + if port, err := serviceConfigs.GetKubeProxyMetricsPort(); err != nil { + allErrors = append(allErrors, err) + } else { + ports["kube-proxy-metrics"] = port + } + + if isControlPlane { + ports["kube-apiserver"] = strconv.Itoa(config.APIServer.GetSecurePort()) + ports["kube-scheduler"] = serviceConfigs.GetKubeSchedulerPort() + ports["kube-controller-manager"] = serviceConfigs.GetKubeControllerManagerPort() + } else { + ports["kube-apiserver-proxy"] = strconv.Itoa(config.APIServer.GetSecurePort()) + } + + for service, port := range ports { + if port == "0" { + // Some ports may be set to 0 in order to disable them. No need to check. + continue + } + if open, err := utils.IsLocalPortOpen(port); err != nil { + // Could not open port due to error. + allErrors = append(allErrors, fmt.Errorf("could not check port %s (needed by: %s): %w", port, service, err)) + } else if !open { + allErrors = append(allErrors, fmt.Errorf("port %s (needed by: %s) is already in use.", port, service)) + } + } + + return errors.Join(allErrors...) +}