Skip to content

Commit

Permalink
Addressing comments and bug-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
berkayoz committed Mar 29, 2024
1 parent 3270731 commit 63f1f69
Show file tree
Hide file tree
Showing 13 changed files with 113 additions and 103 deletions.
1 change: 0 additions & 1 deletion src/k8s/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/canonical/go-dqlite v1.21.0
github.com/canonical/lxd v0.0.0-20231002162033-38796399c135
github.com/canonical/microcluster v0.0.0-20240122235408-1525f8ea8d7a
github.com/mitchellh/mapstructure v1.5.0
github.com/moby/sys/mountinfo v0.7.1
github.com/onsi/gomega v1.30.0
github.com/pelletier/go-toml v1.9.5
Expand Down
2 changes: 0 additions & 2 deletions src/k8s/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,6 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/component/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func ReconcileDNSComponent(ctx context.Context, s snap.Snap, alreadyEnabled *boo
if err != nil {
return "", "", fmt.Errorf("failed to disable dns: %w", err)
}
return "", "", nil
return clusterConfig.Kubelet.ClusterDNS, clusterConfig.Kubelet.ClusterDomain, nil
}
return "", "", nil
}
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/api/cluster_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func postClusterBootstrap(m *microcluster.MicroCluster, s *state.State, r *http.
}

// Set timeout
timeout := utils.TimeoutFromCtx(s.Context, 60*time.Second)
timeout := utils.TimeoutFromCtx(r.Context(), 30*time.Second)

// Bootstrap the cluster
if err := m.NewCluster(hostname, req.Address, config, timeout); err != nil {
Expand Down
29 changes: 13 additions & 16 deletions src/k8s/pkg/k8sd/api/cluster_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"

api "github.com/canonical/k8s/api/v1"
"github.com/mitchellh/mapstructure"

"github.com/canonical/k8s/pkg/component"
"github.com/canonical/k8s/pkg/k8sd/database"
Expand Down Expand Up @@ -152,8 +151,9 @@ func putClusterConfig(s *state.State, r *http.Request) response.Response {
}
}

var dnsIP = newConfig.Kubelet.ClusterDNS
if req.Config.DNS != nil {
dnsIP, _, err := component.ReconcileDNSComponent(r.Context(), snap, oldConfig.DNS.Enabled, req.Config.DNS.Enabled, newConfig)
dnsIP, _, err = component.ReconcileDNSComponent(r.Context(), snap, oldConfig.DNS.Enabled, req.Config.DNS.Enabled, newConfig)
if err != nil {
return response.InternalError(fmt.Errorf("failed to reconcile dns: %w", err))
}
Expand All @@ -170,23 +170,20 @@ func putClusterConfig(s *state.State, r *http.Request) response.Response {
}); err != nil {
return response.InternalError(fmt.Errorf("database transaction to update cluster configuration failed: %w", err))
}
}

var data map[string]string
if err := mapstructure.Decode(types.NodeConfig{
ClusterDNS: dnsIP,
ClusterDomain: newConfig.Kubelet.ClusterDomain,
}, &data); err != nil {
return response.InternalError(fmt.Errorf("failed to encode node config: %w", err))
}
cmData := types.MapFromNodeConfig(types.NodeConfig{
ClusterDNS: &dnsIP,
ClusterDomain: &newConfig.Kubelet.ClusterDomain,
})

client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return response.InternalError(fmt.Errorf("failed to create kubernetes client: %w", err))
}
client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return response.InternalError(fmt.Errorf("failed to create kubernetes client: %w", err))
}

if _, err := client.UpdateConfigMap(r.Context(), "kube-system", "k8sd-config", data); err != nil {
return response.InternalError(fmt.Errorf("failed to update node config: %w", err))
}
if _, err := client.UpdateConfigMap(r.Context(), "kube-system", "k8sd-config", cmData); err != nil {
return response.InternalError(fmt.Errorf("failed to update node config: %w", err))
}

if req.Config.LocalStorage != nil {
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/api/cluster_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func postClusterJoin(m *microcluster.MicroCluster, s *state.State, r *http.Reque
return response.BadRequest(fmt.Errorf("invalid hostname %q: %w", req.Name, err))
}

timeout := utils.TimeoutFromCtx(s.Context, 30*time.Second)
timeout := utils.TimeoutFromCtx(r.Context(), 30*time.Second)

internalToken := types.InternalWorkerNodeToken{}
// Check if token is worker token
Expand Down
29 changes: 13 additions & 16 deletions src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/canonical/k8s/pkg/utils/vals"
"github.com/canonical/microcluster/state"
"github.com/mitchellh/mapstructure"
)

// onBootstrap is called after we bootstrap the first cluster node.
Expand Down Expand Up @@ -290,8 +289,9 @@ func onBootstrapControlPlane(s *state.State, initConfig map[string]string) error
}
}

var dnsIP = cfg.Kubelet.ClusterDNS
if cfg.DNS.Enabled != nil {
dnsIP, _, err := component.ReconcileDNSComponent(s.Context, snap, vals.Pointer(false), cfg.DNS.Enabled, cfg)
dnsIP, _, err = component.ReconcileDNSComponent(s.Context, snap, vals.Pointer(false), cfg.DNS.Enabled, cfg)
if err != nil {
return fmt.Errorf("failed to reconcile dns: %w", err)
}
Expand All @@ -307,23 +307,20 @@ func onBootstrapControlPlane(s *state.State, initConfig map[string]string) error
}); err != nil {
return fmt.Errorf("database transaction to update cluster configuration failed: %w", err)
}
}

var data map[string]string
if err := mapstructure.Decode(types.NodeConfig{
ClusterDNS: dnsIP,
ClusterDomain: cfg.Kubelet.ClusterDomain,
}, &data); err != nil {
return fmt.Errorf("failed to encode node config: %w", err)
}
cmData := types.MapFromNodeConfig(types.NodeConfig{
ClusterDNS: &dnsIP,
ClusterDomain: &cfg.Kubelet.ClusterDomain,
})

client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}
client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}

if _, err := client.UpdateConfigMap(s.Context, "kube-system", "k8sd-config", data); err != nil {
return fmt.Errorf("failed to update node configs: %w", err)
}
if _, err := client.UpdateConfigMap(s.Context, "kube-system", "k8sd-config", cmData); err != nil {
return fmt.Errorf("failed to update node configs: %w", err)
}

if cfg.LocalStorage.Enabled != nil {
Expand Down
16 changes: 15 additions & 1 deletion src/k8s/pkg/k8sd/app/hooks_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"time"

"github.com/canonical/k8s/pkg/k8sd/controllers"
"github.com/canonical/k8s/pkg/snap"
Expand All @@ -13,7 +14,20 @@ func onStart(s *state.State) error {
snap := snap.SnapFromContext(s.Context)

configController := controllers.NewNodeConfigurationController(snap, func(ctx context.Context) *k8s.Client {
return k8s.RetryNewClient(ctx, snap.KubernetesNodeRESTClientGetter("kube-system"))
for {
select {
case <-ctx.Done():
return nil
case <-time.After(3 * time.Second):
default:
}

client, err := k8s.NewClient(snap.KubernetesNodeRESTClientGetter("kube-system"))
if err != nil {
continue
}
return client
}
})
go configController.Run(s.Context)

Expand Down
16 changes: 7 additions & 9 deletions src/k8s/pkg/k8sd/controllers/node_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/canonical/k8s/pkg/snap"
snaputil "github.com/canonical/k8s/pkg/snap/util"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/mitchellh/mapstructure"
v1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -45,22 +44,21 @@ func (c *NodeConfigurationController) Run(ctx context.Context) {
}

func (c *NodeConfigurationController) reconcile(ctx context.Context, configMap *v1.ConfigMap) error {
var nodeConfig types.NodeConfig
if err := mapstructure.Decode(configMap.Data, &nodeConfig); err != nil {
return fmt.Errorf("failed to decode node config: %w", err)
}
nodeConfig := types.NodeConfigFromMap(configMap.Data)

kubeletUpdateMap := make(map[string]string)
var kubeletDeleteList []string

if nodeConfig.ClusterDNS != "" {
kubeletUpdateMap["--cluster-dns"] = nodeConfig.ClusterDNS
if nodeConfig.ClusterDNS != nil && *nodeConfig.ClusterDNS != "" {
kubeletUpdateMap["--cluster-dns"] = *nodeConfig.ClusterDNS
} else {
kubeletDeleteList = append(kubeletDeleteList, "--cluster-dns")
}

if nodeConfig.ClusterDomain != "" {
kubeletUpdateMap["--cluster-domain"] = nodeConfig.ClusterDomain
if nodeConfig.ClusterDomain != nil && *nodeConfig.ClusterDomain != "" {
kubeletUpdateMap["--cluster-domain"] = *nodeConfig.ClusterDomain
} else {
kubeletUpdateMap["--cluster-domain"] = "cluster.local"
}

mustRestartKubelet, err := snaputil.UpdateServiceArguments(c.snap, "kubelet", kubeletUpdateMap, kubeletDeleteList)
Expand Down
45 changes: 42 additions & 3 deletions src/k8s/pkg/k8sd/types/node_config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,46 @@
package types

type NodeConfig struct {
CloudProvider string `mapstructure:"cloud-provider,omitempty"`
ClusterDNS string `mapstructure:"cluster-dns,omitempty"`
ClusterDomain string `mapstructure:"cluster-domain,omitempty"`
CloudProvider *string
ClusterDNS *string
ClusterDomain *string
}

func NodeConfigFromMap(data map[string]string) NodeConfig {
nodeConfig := NodeConfig{}

cloudProvider, ok := data["cloud-provider"]
if ok {
nodeConfig.CloudProvider = &cloudProvider
}

clusterDNS, ok := data["cluster-dns"]
if ok {
nodeConfig.ClusterDNS = &clusterDNS
}

clusterDomain, ok := data["cluster-domain"]
if ok {
nodeConfig.ClusterDomain = &clusterDomain
}

return nodeConfig
}

func MapFromNodeConfig(nodeConfig NodeConfig) map[string]string {
data := make(map[string]string)

if nodeConfig.CloudProvider != nil {
data["cloud-provider"] = *nodeConfig.CloudProvider
}

if nodeConfig.ClusterDNS != nil {
data["cluster-dns"] = *nodeConfig.ClusterDNS
}

if nodeConfig.ClusterDomain != nil {
data["cluster-domain"] = *nodeConfig.ClusterDomain
}

return data
}
19 changes: 0 additions & 19 deletions src/k8s/pkg/utils/k8s/client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package k8s

import (
"context"
"fmt"
"time"

"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
Expand All @@ -25,20 +23,3 @@ func NewClient(restClientGetter genericclioptions.RESTClientGetter) (*Client, er
}
return &Client{clientset}, nil
}

func RetryNewClient(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter) *Client {
for {
select {
case <-ctx.Done():
return nil
case <-time.After(3 * time.Second):
default:
}

client, err := NewClient(restClientGetter)
if err != nil {
continue
}
return client
}
}
30 changes: 17 additions & 13 deletions src/k8s/pkg/utils/k8s/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func TestWatchConfigMap(t *testing.T) {
name string
configmap *corev1.ConfigMap
}{
{
name: "pass nil object",
configmap: nil,
},
{
name: "example configmap with values",
configmap: &corev1.ConfigMap{
Expand All @@ -40,10 +44,6 @@ func TestWatchConfigMap(t *testing.T) {
Data: map[string]string{},
},
},
{
name: "pass nil object",
configmap: nil,
},
}

clientset := fake.NewSimpleClientset()
Expand All @@ -52,10 +52,10 @@ func TestWatchConfigMap(t *testing.T) {

client := &Client{Interface: clientset}

var receivedMaps []*corev1.ConfigMap
doneCh := make(chan *corev1.ConfigMap)

go client.WatchConfigMap(ctx, "kube-system", "test-config", func(configMap *corev1.ConfigMap) error {
receivedMaps = append(receivedMaps, configMap)
doneCh <- configMap
if configMap == nil {
return fmt.Errorf("unexpected nil map test case error")
}
Expand All @@ -64,18 +64,22 @@ func TestWatchConfigMap(t *testing.T) {

defer watcher.Stop()

for i, tc := range tests {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
g := NewWithT(t)

watcher.Add(tc.configmap)
time.Sleep(100 * time.Millisecond)

if tc.configmap != nil {
g.Expect(receivedMaps[i].Data).To(Equal(tc.configmap.Data))
g.Expect(receivedMaps[i].Name).To(Equal(tc.configmap.Name))
g.Expect(receivedMaps[i].Namespace).To(Equal(tc.configmap.Namespace))
select {
case recv := <-doneCh:
if tc.configmap != nil {
g.Expect(recv.Data).To(Equal(tc.configmap.Data))
g.Expect(recv.Name).To(Equal(tc.configmap.Name))
g.Expect(recv.Namespace).To(Equal(tc.configmap.Namespace))
}
case <-time.After(time.Second):
t.Fatal("Timed out waiting for watch to complete")
}

})
}
}
Expand Down
23 changes: 3 additions & 20 deletions tests/integration/tests/test_config_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,12 @@ def test_config_propagation(instances: List[harness.Instance]):
nodes = util.ready_nodes(initial_node)
assert len(nodes) == 3, "all nodes should have joined cluster"

p = (
util.stubbornly(retries=5, delay_s=3)
.on(initial_node)
.until(lambda p: len(p.stdout.decode().replace("'", "")) > 0)
.exec(
[
"k8s",
"kubectl",
"get",
"service",
"coredns",
"-n",
"kube-system",
"-o=jsonpath='{.spec.clusterIP}'",
],
)
)
service_ip = p.stdout.decode().replace("'", "")
initial_node.exec(["k8s", "set", "dns.cluster-domain=integration.local"])

util.stubbornly(retries=5, delay_s=10).on(joining_cplane_node).until(
lambda p: f"--cluster-dns={service_ip}" in p.stdout.decode()
lambda p: f"--cluster-domain=integration.local" in p.stdout.decode()
).exec(["cat", "/var/snap/k8s/common/args/kubelet"])

util.stubbornly(retries=5, delay_s=10).on(joining_worker_node).until(
lambda p: f"--cluster-dns={service_ip}" in p.stdout.decode()
lambda p: f"--cluster-domain=integration.local" in p.stdout.decode()
).exec(["cat", "/var/snap/k8s/common/args/kubelet"])

0 comments on commit 63f1f69

Please sign in to comment.