diff --git a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go index 1386edca9..ec93418a9 100644 --- a/src/k8s/pkg/k8sd/app/hooks_bootstrap.go +++ b/src/k8s/pkg/k8sd/app/hooks_bootstrap.go @@ -231,8 +231,13 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT Annotations: response.Annotations, } + serviceConfigs := types.K8sServiceConfigs{ + ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs, + ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs, + } + // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, false); err != nil { return fmt.Errorf("pre-init checks failed for worker node: %w", err) } @@ -419,8 +424,15 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst cfg.Certificates.K8sdPublicKey = utils.Pointer(certificates.K8sdPublicKey) cfg.Certificates.K8sdPrivateKey = utils.Pointer(certificates.K8sdPrivateKey) + serviceConfigs := types.K8sServiceConfigs{ + ExtraNodeKubeSchedulerArgs: bootstrapConfig.ExtraNodeKubeSchedulerArgs, + ExtraNodeKubeControllerManagerArgs: bootstrapConfig.ExtraNodeKubeControllerManagerArgs, + ExtraNodeKubeletArgs: bootstrapConfig.ExtraNodeKubeletArgs, + ExtraNodeKubeProxyArgs: bootstrapConfig.ExtraNodeKubeProxyArgs, + } + // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, true); err != nil { return fmt.Errorf("pre-init checks failed for bootstrap node: %w", err) } diff --git a/src/k8s/pkg/k8sd/app/hooks_join.go b/src/k8s/pkg/k8sd/app/hooks_join.go index ac67b526f..5bf692f05 100644 --- a/src/k8s/pkg/k8sd/app/hooks_join.go +++ b/src/k8s/pkg/k8sd/app/hooks_join.go @@ -9,6 +9,7 @@ import ( databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util" "github.com/canonical/k8s/pkg/k8sd/pki" "github.com/canonical/k8s/pkg/k8sd/setup" + "github.com/canonical/k8s/pkg/k8sd/types" "github.com/canonical/k8s/pkg/log" "github.com/canonical/k8s/pkg/utils" "github.com/canonical/k8s/pkg/utils/control" @@ -137,8 +138,15 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri return fmt.Errorf("failed to initialize control plane certificates: %w", err) } + serviceConfigs := types.K8sServiceConfigs{ + ExtraNodeKubeSchedulerArgs: joinConfig.ExtraNodeKubeSchedulerArgs, + ExtraNodeKubeControllerManagerArgs: joinConfig.ExtraNodeKubeControllerManagerArgs, + ExtraNodeKubeletArgs: joinConfig.ExtraNodeKubeletArgs, + ExtraNodeKubeProxyArgs: joinConfig.ExtraNodeKubeProxyArgs, + } + // Pre-init checks - if err := snap.PreInitChecks(ctx, cfg); err != nil { + if err := snap.PreInitChecks(ctx, cfg, serviceConfigs, true); err != nil { return fmt.Errorf("pre-init checks failed for joining node: %w", err) } diff --git a/src/k8s/pkg/k8sd/types/k8s_service_configs.go b/src/k8s/pkg/k8sd/types/k8s_service_configs.go new file mode 100644 index 000000000..fbaf0e4de --- /dev/null +++ b/src/k8s/pkg/k8sd/types/k8s_service_configs.go @@ -0,0 +1,71 @@ +package types + +import ( + "net" +) + +const ( + // Default values for Kubernetes services. + KubeControllerManagerPort = "10257" + KubeSchedulerPort = "10259" + KubeletPort = "10250" + KubeletHealthzPort = "10248" + KubeletReadOnlyPort = "10255" + KubeProxyHealthzPort = "10256" + KubeProxyMetricsPort = "10249" +) + +type K8sServiceConfigs struct { + ExtraNodeKubeControllerManagerArgs map[string]*string + ExtraNodeKubeSchedulerArgs map[string]*string + ExtraNodeKubeletArgs map[string]*string + ExtraNodeKubeProxyArgs map[string]*string +} + +func (s *K8sServiceConfigs) GetKubeControllerManagerPort() string { + return getConfigOrDefault(s.ExtraNodeKubeControllerManagerArgs, "--secure-port", KubeControllerManagerPort) +} + +func (s *K8sServiceConfigs) GetKubeSchedulerPort() string { + return getConfigOrDefault(s.ExtraNodeKubeSchedulerArgs, "--secure-port", KubeSchedulerPort) +} + +func (s *K8sServiceConfigs) GetKubeletPort() string { + return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--port", KubeletPort) +} + +func (s *K8sServiceConfigs) GetKubeletHealthzPort() string { + return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--healthz-port", KubeletHealthzPort) +} + +func (s *K8sServiceConfigs) GetKubeletReadOnlyPort() string { + return getConfigOrDefault(s.ExtraNodeKubeletArgs, "--read-only-port", KubeletReadOnlyPort) +} + +func (s *K8sServiceConfigs) GetKubeProxyHealthzPort() (string, error) { + address := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--healthz-bind-address", "") + if address == "" { + return KubeProxyHealthzPort, nil + } + _, port, err := net.SplitHostPort(address) + return port, err +} + +func (s *K8sServiceConfigs) GetKubeProxyMetricsPort() (string, error) { + address := getConfigOrDefault(s.ExtraNodeKubeProxyArgs, "--metrics-bind-address", "") + if address == "" { + return KubeProxyMetricsPort, nil + } + _, port, err := net.SplitHostPort(address) + return port, err +} + +func getConfigOrDefault(serviceArgs map[string]*string, optionName, defaultValue string) string { + if serviceArgs == nil { + return defaultValue + } else if val, ok := serviceArgs[optionName]; !ok || val == nil { + return defaultValue + } else { + return *val + } +} diff --git a/src/k8s/pkg/snap/interface.go b/src/k8s/pkg/snap/interface.go index f2ea533df..ed1d8d223 100644 --- a/src/k8s/pkg/snap/interface.go +++ b/src/k8s/pkg/snap/interface.go @@ -66,5 +66,5 @@ type Snap interface { K8sdClient(address string) (k8sd.Client, error) // k8sd client - PreInitChecks(ctx context.Context, config types.ClusterConfig) error // pre-init checks before k8s-snap can start + PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error // pre-init checks before k8s-snap can start } diff --git a/src/k8s/pkg/snap/mock/mock.go b/src/k8s/pkg/snap/mock/mock.go index 21846253d..22abd0c6e 100644 --- a/src/k8s/pkg/snap/mock/mock.go +++ b/src/k8s/pkg/snap/mock/mock.go @@ -245,7 +245,7 @@ func (s *Snap) SnapctlSet(ctx context.Context, args ...string) error { return s.SnapctlSetErr } -func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig) error { +func (s *Snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error { s.PreInitChecksCalledWith = append(s.PreInitChecksCalledWith, config) return s.PreInitChecksErr } diff --git a/src/k8s/pkg/snap/snap.go b/src/k8s/pkg/snap/snap.go index 057d30cde..ea61ec862 100644 --- a/src/k8s/pkg/snap/snap.go +++ b/src/k8s/pkg/snap/snap.go @@ -18,6 +18,7 @@ import ( "github.com/canonical/k8s/pkg/k8sd/types" "github.com/canonical/k8s/pkg/log" "github.com/canonical/k8s/pkg/utils" + "github.com/canonical/k8s/pkg/utils/checks" "github.com/moby/sys/mountinfo" "gopkg.in/yaml.v2" "k8s.io/cli-runtime/pkg/genericclioptions" @@ -328,8 +329,10 @@ func (s *snap) SnapctlSet(ctx context.Context, args ...string) error { return s.runCommand(ctx, append([]string{"snapctl", "set"}, args...)) } -func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig) error { - // TODO: check for available ports for k8s-dqlite, apiserver, containerd, etc +func (s *snap) PreInitChecks(ctx context.Context, config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error { + if err := checks.CheckK8sServicePorts(config, serviceConfigs, isControlPlane); err != nil { + return fmt.Errorf("Encountered error(s) while verifying port availability for Kubernetes services: %w", err) + } // NOTE(neoaggelos): in some environments the Kubernetes might hang when running for the first time // This works around the issue by running them once during the install hook diff --git a/src/k8s/pkg/snap/snap_test.go b/src/k8s/pkg/snap/snap_test.go index e99ff0384..0f66857a7 100644 --- a/src/k8s/pkg/snap/snap_test.go +++ b/src/k8s/pkg/snap/snap_test.go @@ -3,6 +3,7 @@ package snap_test import ( "context" "fmt" + "net" "os" "path/filepath" "testing" @@ -141,8 +142,9 @@ func TestSnap(t *testing.T) { ContainerdBaseDir: containerdDir, }) conf := types.ClusterConfig{} + serviceConfigs := types.K8sServiceConfigs{} - err = snap.PreInitChecks(context.Background(), conf) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true) g.Expect(err).To(Not(HaveOccurred())) expectedCalls := []string{} for _, binary := range []string{"kube-apiserver", "kube-controller-manager", "kube-scheduler", "kube-proxy", "kubelet"} { @@ -150,6 +152,21 @@ func TestSnap(t *testing.T) { } g.Expect(mockRunner.CalledWithCommand).To(ConsistOf(expectedCalls)) + t.Run("Fail port already in use", func(t *testing.T) { + g := NewWithT(t) + // Open a port which will be checked (kubelet). + port := "9999" + serviceConfigs := types.K8sServiceConfigs{ + ExtraNodeKubeletArgs: map[string]*string{"--port": &port}, + } + l, err := net.Listen("tcp", fmt.Sprintf(":%s", port)) + g.Expect(err).To(Not(HaveOccurred())) + defer l.Close() + + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true) + g.Expect(err).To(HaveOccurred()) + }) + t.Run("Fail socket exists", func(t *testing.T) { g := NewWithT(t) // Create the containerd.sock file, which should cause the check to fail. @@ -160,7 +177,7 @@ func TestSnap(t *testing.T) { f.Close() defer os.Remove(f.Name()) - err = snap.PreInitChecks(context.Background(), conf) + err = snap.PreInitChecks(context.Background(), conf, serviceConfigs, true) g.Expect(err).To(HaveOccurred()) }) @@ -168,7 +185,7 @@ func TestSnap(t *testing.T) { g := NewWithT(t) mockRunner.Err = fmt.Errorf("some error") - err := snap.PreInitChecks(context.Background(), conf) + err := snap.PreInitChecks(context.Background(), conf, serviceConfigs, true) g.Expect(err).To(HaveOccurred()) }) }) diff --git a/src/k8s/pkg/utils/checks/checks.go b/src/k8s/pkg/utils/checks/checks.go new file mode 100644 index 000000000..f8c1fe16c --- /dev/null +++ b/src/k8s/pkg/utils/checks/checks.go @@ -0,0 +1,59 @@ +package checks + +import ( + "errors" + "fmt" + "strconv" + + "github.com/canonical/k8s/pkg/k8sd/types" + "github.com/canonical/k8s/pkg/utils" +) + +// CheckK8sServicePorts verifies that the Kubernetes-related ports are free to be used. +// The ports checked depends on whether a node is a control plane node, or a worker node. +func CheckK8sServicePorts(config types.ClusterConfig, serviceConfigs types.K8sServiceConfigs, isControlPlane bool) error { + var allErrors []error + ports := map[string]string{ + // Default values from official Kubernetes documentation. + "kubelet": serviceConfigs.GetKubeletPort(), + "kubelet-healthz": serviceConfigs.GetKubeletHealthzPort(), + "kubelet-read-only": serviceConfigs.GetKubeletReadOnlyPort(), + "k8s-dqlite": strconv.Itoa(config.Datastore.GetK8sDqlitePort()), + "loadbalancer": strconv.Itoa(config.LoadBalancer.GetBGPPeerPort()), + } + + if port, err := serviceConfigs.GetKubeProxyHealthzPort(); err != nil { + allErrors = append(allErrors, err) + } else { + ports["kube-proxy-healhz"] = port + } + + if port, err := serviceConfigs.GetKubeProxyMetricsPort(); err != nil { + allErrors = append(allErrors, err) + } else { + ports["kube-proxy-metrics"] = port + } + + if isControlPlane { + ports["kube-apiserver"] = strconv.Itoa(config.APIServer.GetSecurePort()) + ports["kube-scheduler"] = serviceConfigs.GetKubeSchedulerPort() + ports["kube-controller-manager"] = serviceConfigs.GetKubeControllerManagerPort() + } else { + ports["kube-apiserver-proxy"] = strconv.Itoa(config.APIServer.GetSecurePort()) + } + + for service, port := range ports { + if port == "0" { + // Some ports may be set to 0 in order to disable them. No need to check. + continue + } + if open, err := utils.IsLocalPortOpen(port); err != nil { + // Could not open port due to error. + allErrors = append(allErrors, fmt.Errorf("could not check port %s (needed by: %s): %w", port, service, err)) + } else if !open { + allErrors = append(allErrors, fmt.Errorf("port %s (needed by: %s) is already in use.", port, service)) + } + } + + return errors.Join(allErrors...) +} diff --git a/src/k8s/pkg/utils/net.go b/src/k8s/pkg/utils/net.go new file mode 100644 index 000000000..1a573875c --- /dev/null +++ b/src/k8s/pkg/utils/net.go @@ -0,0 +1,21 @@ +package utils + +import ( + "errors" + "fmt" + "net" + "syscall" +) + +// IsLocalPortOpen checks if the given local port is already open or not. +func IsLocalPortOpen(port string) (bool, error) { + // Without an address, Listen will listen on all addresses. + if l, err := net.Listen("tcp", fmt.Sprintf(":%s", port)); errors.Is(err, syscall.EADDRINUSE) { + return false, nil + } else if err != nil { + return false, err + } else { + l.Close() + return true, nil + } +}