Skip to content

Commit

Permalink
feat: node ipam controller
Browse files Browse the repository at this point in the history
Add node IPAM controller.

It supports two modes:
* RangeAllocator - classic mode (kubernetes does the same)
* CloudAllocator - Talos is responsible for setting PodCIDRs

Signed-off-by: Serge Logvinov <[email protected]>
  • Loading branch information
sergelogvinov committed Jun 28, 2024
1 parent 3b20bb0 commit ca3f706
Show file tree
Hide file tree
Showing 33 changed files with 4,048 additions and 194 deletions.
21 changes: 18 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ run:
# won't be reported. Default value is empty list, but there is
# no need to include all autogenerated files, we confidently recognize
# autogenerated files. If it's not please let us know.
exclude-files:
- charts/
- docs/
exclude-dirs:
- charts
- docs

# list of build tags, all linters use it. Default is empty list.
build-tags:
Expand Down Expand Up @@ -179,6 +179,8 @@ linters:
- perfsprint
- execinquery

- nlreturn

disable-all: false
fast: false

Expand All @@ -199,6 +201,19 @@ issues:
# Default value for this option is true.
exclude-use-default: false

# Which dirs to exclude: issues from them won't be reported.
# Can use regexp here: `generated.*`, regexp is applied on full path,
# including the path prefix if one is set.
exclude-dirs:
# copied from kubernetes repo
- pkg/names
- pkg/nodeipam/config
- pkg/utils/controller/node
- pkg/nodeipam/ipam/cidrset
exclude-files:
- cmd/talos-cloud-controller-manager/options/nodeipamcontroller.go
- pkg/nodeipam/ipam/range_allocator.go

# Maximum issues count per one linter. Set to 0 to disable. Default is 50.
max-issues-per-linter: 0

Expand Down
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ build: ## Build

.PHONY: run
run: build
./talos-cloud-controller-manager-$(ARCH) --v=5 --kubeconfig=kubeconfig --cloud-config=hack/ccm-config.yaml --controllers=cloud-node \
--use-service-account-credentials --leader-elect=false --bind-address=127.0.0.1 --authorization-always-allow-paths=/healthz,/livez,/readyz,/metrics
./talos-cloud-controller-manager-$(ARCH) --v=5 --kubeconfig=kubeconfig --cloud-config=hack/ccm-config.yaml --controllers=cloud-node,node-ipam-controller \
--allocate-node-cidrs \
--node-cidr-mask-size-ipv4=24 --node-cidr-mask-size-ipv6=112 \
--cidr-allocator-type=CloudAllocator \
--use-service-account-credentials --leader-elect=false --bind-address=127.0.0.1 --secure-port=0 --authorization-always-allow-paths=/healthz,/livez,/readyz,/metrics

.PHONY: lint
lint: ## Lint Code
Expand Down
25 changes: 20 additions & 5 deletions cmd/talos-cloud-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// This file should be written by each cloud provider.
// For an minimal working example, please refer to k8s.io/cloud-provider/sample/basic_main.go
// For more details, please refer to k8s.io/kubernetes/cmd/cloud-controller-manager/main.go

// Package main provides the CCM implementation.
package main

Expand All @@ -26,6 +22,7 @@ import (

"github.com/spf13/pflag"

kcmnames "github.com/siderolabs/talos-cloud-controller-manager/pkg/names"
"github.com/siderolabs/talos-cloud-controller-manager/pkg/talos"

"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -48,8 +45,26 @@ func main() {
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}

controllerInitializers := app.DefaultInitFuncConstructors
controllerAliases := names.CCMControllerAliases()

nodeIpamController := nodeIPAMController{}
nodeIpamController.nodeIPAMControllerOptions.NodeIPAMControllerConfiguration = &nodeIpamController.nodeIPAMControllerConfiguration
fss := cliflag.NamedFlagSets{}
command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, app.DefaultInitFuncConstructors, names.CCMControllerAliases(), fss, wait.NeverStop)
nodeIpamController.nodeIPAMControllerOptions.AddFlags(fss.FlagSet(kcmnames.NodeIpamController))

controllerInitializers[kcmnames.NodeIpamController] = app.ControllerInitFuncConstructor{
// "node-controller" is the shared identity of all node controllers, including node, node lifecycle, and node ipam.
// See https://github.com/kubernetes/kubernetes/pull/72764#issuecomment-453300990 for more context.
InitContext: app.ControllerInitContext{
ClientName: "node-controller",
},
Constructor: nodeIpamController.startNodeIpamControllerWrapper,
}

app.ControllersDisabledByDefault.Insert(kcmnames.NodeIpamController)
controllerAliases["nodeipam"] = kcmnames.NodeIpamController
command := app.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, controllerInitializers, controllerAliases, fss, wait.NeverStop)

command.Flags().VisitAll(func(flag *pflag.Flag) {
if flag.Name == "cloud-provider" {
Expand Down
251 changes: 251 additions & 0 deletions cmd/talos-cloud-controller-manager/nodeipamcontroller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// kubernetes/cmd/cloud-controller-manager/nodeipamcontroller.go
package main

import (
"context"
"errors"
"fmt"
"net"
"strings"

nodeipamcontrolleroptions "github.com/siderolabs/talos-cloud-controller-manager/cmd/talos-cloud-controller-manager/options"
nodeipamcontroller "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam"
nodeipamconfig "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/config"
ipam "github.com/siderolabs/talos-cloud-controller-manager/pkg/nodeipam/ipam"
talosclient "github.com/siderolabs/talos-cloud-controller-manager/pkg/talosclient"

cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
)

const (
// defaultNodeMaskCIDRIPv4 is default mask size for IPv4 node cidr.
defaultNodeMaskCIDRIPv4 = 24
// defaultNodeMaskCIDRIPv6 is default mask size for IPv6 node cidr.
defaultNodeMaskCIDRIPv6 = 64
)

type nodeIPAMController struct {
nodeIPAMControllerConfiguration nodeipamconfig.NodeIPAMControllerConfiguration
nodeIPAMControllerOptions nodeipamcontrolleroptions.NodeIPAMControllerOptions
}

func (nodeIpamController *nodeIPAMController) startNodeIpamControllerWrapper(
initContext app.ControllerInitContext,
completedConfig *cloudcontrollerconfig.CompletedConfig,
cloud cloudprovider.Interface,
) app.InitFunc {
klog.V(4).InfoS("nodeIpamController.startNodeIpamControllerWrapper() called")

allErrors := nodeIpamController.nodeIPAMControllerOptions.Validate()
if len(allErrors) > 0 {
klog.Fatal("NodeIPAM controller values are not properly set.")
}

nodeIpamController.nodeIPAMControllerOptions.ApplyTo(&nodeIpamController.nodeIPAMControllerConfiguration) //nolint:errcheck

return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
return startNodeIpamController(ctx, initContext, completedConfig, nodeIpamController.nodeIPAMControllerConfiguration, controllerContext, cloud)
}
}

func startNodeIpamController(
ctx context.Context,
initContext app.ControllerInitContext,
ccmConfig *cloudcontrollerconfig.CompletedConfig,
nodeIPAMConfig nodeipamconfig.NodeIPAMControllerConfiguration,
controllerCtx genericcontrollermanager.ControllerContext,
cloud cloudprovider.Interface,
) (controller.Interface, bool, error) {
// should we start nodeIPAM
if !ccmConfig.ComponentConfig.KubeCloudShared.AllocateNodeCIDRs {
return nil, false, nil
}

talos, err := talosclient.New(ctx)
if err != nil {
return nil, false, err
}

if ccmConfig.ComponentConfig.KubeCloudShared.ClusterCIDR == "" {
clusterCIDRs, err := talos.GetPodCIDRs(ctx)
if err != nil {
return nil, false, err
}

ccmConfig.ComponentConfig.KubeCloudShared.ClusterCIDR = strings.Join(clusterCIDRs, ",")
}

// failure: bad cidrs in config
clusterCIDRs, dualStack, err := processCIDRs(ccmConfig.ComponentConfig.KubeCloudShared.ClusterCIDR)
if err != nil {
return nil, false, err
}

// failure: more than one cidr but they are not configured as dual stack
if len(clusterCIDRs) > 1 && !dualStack {
return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and they are not configured as dual stack (at least one from each IPFamily", len(clusterCIDRs))
}

// failure: more than cidrs is not allowed even with dual stack
if len(clusterCIDRs) > 2 {
return nil, false, fmt.Errorf("len of clusters is:%v > more than max allowed of 2", len(clusterCIDRs))
}

svcCIDRs, err := talos.GetServiceCIDRs(ctx)
if err != nil {
return nil, false, err
}

serviceCIDRs, err := netutils.ParseCIDRs(svcCIDRs)
if err != nil {
return nil, false, err
}

nodeIPAMConfig.ServiceCIDR = svcCIDRs[0]
if len(svcCIDRs) > 1 {
nodeIPAMConfig.SecondaryServiceCIDR = svcCIDRs[1]
}

nodeCIDRMaskSizes, err := setNodeCIDRMaskSizes(nodeIPAMConfig, clusterCIDRs)
if err != nil {
return nil, false, err
}

klog.V(4).InfoS("nodeIpamController called", "clusterCIDRs", clusterCIDRs, "serviceCIDRs", serviceCIDRs, "nodeCIDRMaskSizes", nodeCIDRMaskSizes)

nodeIpamController, err := nodeipamcontroller.NewNodeIpamController(
ctx,
controllerCtx.InformerFactory.Core().V1().Nodes(),
cloud,
controllerCtx.ClientBuilder.ClientOrDie(initContext.ClientName),
clusterCIDRs,
serviceCIDRs,
nodeCIDRMaskSizes,
ipam.CIDRAllocatorType(ccmConfig.ComponentConfig.KubeCloudShared.CIDRAllocatorType),
)
if err != nil {
return nil, true, err
}

go nodeIpamController.Run(ctx)

return nil, true, nil
}

// processCIDRs is a helper function that works on a comma separated cidrs and returns
// a list of typed cidrs
// a flag if cidrs represents a dual stack
// error if failed to parse any of the cidrs.
func processCIDRs(cidrsList string) ([]*net.IPNet, bool, error) {
cidrsSplit := strings.Split(strings.TrimSpace(cidrsList), ",")

cidrs, err := netutils.ParseCIDRs(cidrsSplit)
if err != nil {
return nil, false, err
}

// if cidrs has an error then the previous call will fail
// safe to ignore error checking on next call
dualstack, _ := netutils.IsDualStackCIDRs(cidrs) //nolint:errcheck

return cidrs, dualstack, nil
}

// setNodeCIDRMaskSizes returns the IPv4 and IPv6 node cidr mask sizes to the value provided
// for --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 respectively. If value not provided,
// then it will return default IPv4 and IPv6 cidr mask sizes.
func setNodeCIDRMaskSizes(cfg nodeipamconfig.NodeIPAMControllerConfiguration, clusterCIDRs []*net.IPNet) ([]int, error) {
sortedSizes := func(maskSizeIPv4, maskSizeIPv6 int) []int {
nodeMaskCIDRs := make([]int, len(clusterCIDRs))

for idx, clusterCIDR := range clusterCIDRs {
if netutils.IsIPv6CIDR(clusterCIDR) {
nodeMaskCIDRs[idx] = maskSizeIPv6
} else {
nodeMaskCIDRs[idx] = maskSizeIPv4
}
}

return nodeMaskCIDRs
}

// --node-cidr-mask-size flag is incompatible with dual stack clusters.
ipv4Mask, ipv6Mask := defaultNodeMaskCIDRIPv4, defaultNodeMaskCIDRIPv6
isDualstack := len(clusterCIDRs) > 1

// case one: cluster is dualstack (i.e, more than one cidr)
if isDualstack {
// if --node-cidr-mask-size then fail, user must configure the correct dual-stack mask sizes (or use default)
if cfg.NodeCIDRMaskSize != 0 {
return nil, errors.New("usage of --node-cidr-mask-size is not allowed with dual-stack clusters")
}

if cfg.NodeCIDRMaskSizeIPv4 != 0 {
ipv4Mask = int(cfg.NodeCIDRMaskSizeIPv4)
}

if cfg.NodeCIDRMaskSizeIPv6 != 0 {
ipv6Mask = int(cfg.NodeCIDRMaskSizeIPv6)
}

return sortedSizes(ipv4Mask, ipv6Mask), nil
}

maskConfigured := cfg.NodeCIDRMaskSize != 0
maskV4Configured := cfg.NodeCIDRMaskSizeIPv4 != 0
maskV6Configured := cfg.NodeCIDRMaskSizeIPv6 != 0
isSingleStackIPv6 := netutils.IsIPv6CIDR(clusterCIDRs[0])

// original flag is set
if maskConfigured {
// original mask flag is still the main reference.
if maskV4Configured || maskV6Configured {
return nil, errors.New("usage of --node-cidr-mask-size-ipv4 and --node-cidr-mask-size-ipv6 is not allowed if --node-cidr-mask-size is set. For dual-stack clusters please unset it and use IPFamily specific flags") //nolint:lll
}

mask := int(cfg.NodeCIDRMaskSize)

return sortedSizes(mask, mask), nil
}

if maskV4Configured {
if isSingleStackIPv6 {
return nil, errors.New("usage of --node-cidr-mask-size-ipv4 is not allowed for a single-stack IPv6 cluster")
}

ipv4Mask = int(cfg.NodeCIDRMaskSizeIPv4)
}

// !maskV4Configured && !maskConfigured && maskV6Configured
if maskV6Configured {
if !isSingleStackIPv6 {
return nil, errors.New("usage of --node-cidr-mask-size-ipv6 is not allowed for a single-stack IPv4 cluster")
}

ipv6Mask = int(cfg.NodeCIDRMaskSizeIPv6)
}

return sortedSizes(ipv4Mask, ipv6Mask), nil
}
Loading

0 comments on commit ca3f706

Please sign in to comment.