Skip to content

Commit

Permalink
Pass localhost address from the start hook instead of embedding into …
Browse files Browse the repository at this point in the history
…cluster config (#775)
  • Loading branch information
berkayoz authored Nov 12, 2024
1 parent 13689bd commit 00ab5b0
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 57 deletions.
21 changes: 21 additions & 0 deletions src/k8s/pkg/k8sd/app/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package app
import (
"context"
"fmt"
"net"

"github.com/canonical/k8s/pkg/k8sd/setup"
"github.com/canonical/k8s/pkg/snap"
snaputil "github.com/canonical/k8s/pkg/snap/util"
mctypes "github.com/canonical/microcluster/v3/rest/types"
)

func startControlPlaneServices(ctx context.Context, snap snap.Snap, datastore string) error {
Expand Down Expand Up @@ -58,3 +60,22 @@ func waitApiServerReady(ctx context.Context, snap snap.Snap) error {

return nil
}

func DetermineLocalhostAddress(clusterMembers []mctypes.ClusterMember) (string, error) {
// Check if any of the cluster members have an IPv6 address, if so return "::1"
// if one member has an IPv6 address, other members should also have IPv6 interfaces
for _, clusterMember := range clusterMembers {
memberAddress := clusterMember.Address.Addr().String()
nodeIP := net.ParseIP(memberAddress)
if nodeIP == nil {
return "", fmt.Errorf("failed to parse node IP address %q", memberAddress)
}

if nodeIP.To4() == nil {
return "[::1]", nil
}
}

// If no IPv6 addresses are found this means the cluster is IPv4 only
return "127.0.0.1", nil
}
120 changes: 120 additions & 0 deletions src/k8s/pkg/k8sd/app/cluster_util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package app_test

import (
"net/netip"
"testing"

"github.com/canonical/k8s/pkg/k8sd/app"
mctypes "github.com/canonical/microcluster/v3/rest/types"
. "github.com/onsi/gomega"
)

func TestDetermineLocalhostAddress(t *testing.T) {
t.Run("IPv4Only", func(t *testing.T) {
g := NewWithT(t)

mockMembers := []mctypes.ClusterMember{
{
ClusterMemberLocal: mctypes.ClusterMemberLocal{
Name: "node1",
Address: mctypes.AddrPort{
AddrPort: netip.MustParseAddrPort("10.1.0.1:1234"),
},
},
},
{
ClusterMemberLocal: mctypes.ClusterMemberLocal{
Name: "node2",
Address: mctypes.AddrPort{
AddrPort: netip.MustParseAddrPort("10.1.0.2:1234"),
},
},
},
{
ClusterMemberLocal: mctypes.ClusterMemberLocal{
Name: "node3",
Address: mctypes.AddrPort{
AddrPort: netip.MustParseAddrPort("10.1.0.3:1234"),
},
},
},
}

localhostAddress, err := app.DetermineLocalhostAddress(mockMembers)

g.Expect(err).NotTo(HaveOccurred())
g.Expect(localhostAddress).To(Equal("127.0.0.1"))
})

t.Run("IPv6Only", func(t *testing.T) {
g := NewWithT(t)

mockMembers := []mctypes.ClusterMember{
{
ClusterMemberLocal: mctypes.ClusterMemberLocal{
Name: "node1",
Address: mctypes.AddrPort{
AddrPort: netip.MustParseAddrPort("[fda1:8e75:b6ef::]:1234"),
},
},
},
{
ClusterMemberLocal: mctypes.ClusterMemberLocal{
Name: "node2",
Address: mctypes.AddrPort{
AddrPort: netip.MustParseAddrPort("[fd51:d664:aca3::]:1234"),
},
},
},
{
ClusterMemberLocal: mctypes.ClusterMemberLocal{
Name: "node3",
Address: mctypes.AddrPort{
AddrPort: netip.MustParseAddrPort("[fda3:c11d:3cda::]:1234"),
},
},
},
}

localhostAddress, err := app.DetermineLocalhostAddress(mockMembers)

g.Expect(err).NotTo(HaveOccurred())
g.Expect(localhostAddress).To(Equal("[::1]"))
})

t.Run("IPv4_IPv6_Mixed", func(t *testing.T) {
g := NewWithT(t)

mockMembers := []mctypes.ClusterMember{
{
ClusterMemberLocal: mctypes.ClusterMemberLocal{
Name: "node1",
Address: mctypes.AddrPort{
AddrPort: netip.MustParseAddrPort("10.1.0.1:1234"),
},
},
},
{
ClusterMemberLocal: mctypes.ClusterMemberLocal{
Name: "node2",
Address: mctypes.AddrPort{
AddrPort: netip.MustParseAddrPort("[fd51:d664:aca3::]:1234"),
},
},
},
{
ClusterMemberLocal: mctypes.ClusterMemberLocal{
Name: "node3",
Address: mctypes.AddrPort{
AddrPort: netip.MustParseAddrPort("10.1.0.3:1234"),
},
},
},
}

localhostAddress, err := app.DetermineLocalhostAddress(mockMembers)

g.Expect(err).NotTo(HaveOccurred())
g.Expect(localhostAddress).To(Equal("[::1]"))
})
}
2 changes: 0 additions & 2 deletions src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,6 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst
localhostAddress = "127.0.0.1"
}

cfg.Network.LocalhostAddress = utils.Pointer(localhostAddress)

// Create directories
if err := setup.EnsureAllDirectories(snap); err != nil {
return fmt.Errorf("failed to create directories: %w", err)
Expand Down
18 changes: 18 additions & 0 deletions src/k8s/pkg/k8sd/app/hooks_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,24 @@ func (a *App) onStart(ctx context.Context, s state.State) error {
func(ctx context.Context) (types.ClusterConfig, error) {
return databaseutil.GetClusterConfig(ctx, s)
},
func() (string, error) {
c, err := s.Leader()
if err != nil {
return "", fmt.Errorf("failed to get leader client: %w", err)
}

clusterMembers, err := c.GetClusterMembers(ctx)
if err != nil {
return "", fmt.Errorf("failed to get cluster members: %w", err)
}

localhostAddress, err := DetermineLocalhostAddress(clusterMembers)
if err != nil {
return "", fmt.Errorf("failed to determine localhost address: %w", err)
}

return localhostAddress, nil
},
func(ctx context.Context, dnsIP string) error {
if err := s.Database().Transaction(ctx, func(ctx context.Context, tx *sql.Tx) error {
if _, err := database.SetClusterConfig(ctx, tx, types.ClusterConfig{
Expand Down
7 changes: 6 additions & 1 deletion src/k8s/pkg/k8sd/controllers/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,19 @@ func NewFeatureController(opts FeatureControllerOpts) *FeatureController {
func (c *FeatureController) Run(
ctx context.Context,
getClusterConfig func(context.Context) (types.ClusterConfig, error),
getLocalhostAddress func() (string, error),
notifyDNSChangedIP func(ctx context.Context, dnsIP string) error,
setFeatureStatus func(ctx context.Context, name types.FeatureName, featureStatus types.FeatureStatus) error,
) {
c.waitReady()
ctx = log.NewContext(ctx, log.FromContext(ctx).WithValues("controller", "feature"))

go c.reconcileLoop(ctx, getClusterConfig, setFeatureStatus, features.Network, c.triggerNetworkCh, c.reconciledNetworkCh, func(cfg types.ClusterConfig) (types.FeatureStatus, error) {
return features.Implementation.ApplyNetwork(ctx, c.snap, cfg.APIServer, cfg.Network, cfg.Annotations)
localhostAddress, err := getLocalhostAddress()
if err != nil {
return types.FeatureStatus{Enabled: false, Message: "failed to determine the localhost address"}, fmt.Errorf("failed to get localhost address: %w", err)
}
return features.Implementation.ApplyNetwork(ctx, c.snap, localhostAddress, cfg.APIServer, cfg.Network, cfg.Annotations)
})

go c.reconcileLoop(ctx, getClusterConfig, setFeatureStatus, features.Gateway, c.triggerGatewayCh, c.reconciledGatewayCh, func(cfg types.ClusterConfig) (types.FeatureStatus, error) {
Expand Down
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/features/calico/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
// deployment.
// ApplyNetwork returns an error if anything fails. The error is also wrapped in the .Message field of the
// returned FeatureStatus.
func ApplyNetwork(ctx context.Context, snap snap.Snap, apiserver types.APIServer, network types.Network, annotations types.Annotations) (types.FeatureStatus, error) {
func ApplyNetwork(ctx context.Context, snap snap.Snap, _ string, apiserver types.APIServer, network types.Network, annotations types.Annotations) (types.FeatureStatus, error) {
m := snap.HelmClient()

if !network.GetEnabled() {
Expand Down
12 changes: 6 additions & 6 deletions src/k8s/pkg/k8sd/features/calico/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestDisabled(t *testing.T) {
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, nil)
status, err := calico.ApplyNetwork(context.Background(), snapM, "127.0.0.1", apiserver, network, nil)

g.Expect(err).To(MatchError(applyErr))
g.Expect(status.Enabled).To(BeFalse())
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestDisabled(t *testing.T) {
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, nil)
status, err := calico.ApplyNetwork(context.Background(), snapM, "127.0.0.1", apiserver, network, nil)

g.Expect(err).ToNot(HaveOccurred())
g.Expect(status.Enabled).To(BeFalse())
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestEnabled(t *testing.T) {
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, defaultAnnotations)
status, err := calico.ApplyNetwork(context.Background(), snapM, "127.0.0.1", apiserver, network, defaultAnnotations)

g.Expect(err).To(HaveOccurred())
g.Expect(status.Enabled).To(BeFalse())
Expand All @@ -132,7 +132,7 @@ func TestEnabled(t *testing.T) {
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, defaultAnnotations)
status, err := calico.ApplyNetwork(context.Background(), snapM, "127.0.0.1", apiserver, network, defaultAnnotations)

g.Expect(err).To(HaveOccurred())
g.Expect(status.Enabled).To(BeFalse())
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestEnabled(t *testing.T) {
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, defaultAnnotations)
status, err := calico.ApplyNetwork(context.Background(), snapM, "127.0.0.1", apiserver, network, defaultAnnotations)

g.Expect(err).To(MatchError(applyErr))
g.Expect(status.Enabled).To(BeFalse())
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestEnabled(t *testing.T) {
SecurePort: ptr.To(6443),
}

status, err := calico.ApplyNetwork(context.Background(), snapM, apiserver, network, defaultAnnotations)
status, err := calico.ApplyNetwork(context.Background(), snapM, "127.0.0.1", apiserver, network, defaultAnnotations)

g.Expect(err).ToNot(HaveOccurred())
g.Expect(status.Enabled).To(BeTrue())
Expand Down
5 changes: 3 additions & 2 deletions src/k8s/pkg/k8sd/features/cilium/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cilium
import (
"context"
"fmt"
"strings"

"github.com/canonical/k8s/pkg/client/helm"
"github.com/canonical/k8s/pkg/k8sd/types"
Expand Down Expand Up @@ -31,7 +32,7 @@ var (
// deployment.
// ApplyNetwork returns an error if anything fails. The error is also wrapped in the .Message field of the
// returned FeatureStatus.
func ApplyNetwork(ctx context.Context, snap snap.Snap, apiserver types.APIServer, network types.Network, annotations types.Annotations) (types.FeatureStatus, error) {
func ApplyNetwork(ctx context.Context, snap snap.Snap, localhostAddress string, apiserver types.APIServer, network types.Network, annotations types.Annotations) (types.FeatureStatus, error) {
m := snap.HelmClient()

if !network.GetEnabled() {
Expand Down Expand Up @@ -124,7 +125,7 @@ func ApplyNetwork(ctx context.Context, snap snap.Snap, apiserver types.APIServer
"disableEnvoyVersionCheck": true,
// socketLB requires an endpoint to the apiserver that's not managed by the kube-proxy
// so we point to the localhost:secureport to talk to either the kube-apiserver or the kube-apiserver-proxy
"k8sServiceHost": network.GetLocalhostAddress(),
"k8sServiceHost": strings.Trim(localhostAddress, "[]"), // Cilium already adds the brackets for ipv6 addresses, so we need to remove them
"k8sServicePort": apiserver.GetSecurePort(),
// This flag enables the runtime device detection which is set to true by default in Cilium 1.16+
"enableRuntimeDeviceDetection": true,
Expand Down
Loading

0 comments on commit 00ab5b0

Please sign in to comment.