From 4e479a6273581a03f0d8669a588cfe76fa1e1749 Mon Sep 17 00:00:00 2001 From: Peng Liu Date: Mon, 4 Mar 2024 07:59:16 +0000 Subject: [PATCH] Make podRef to be unique We used podRef with format / to identify the IP allocation. However, in scenarios like replicaSet, a different pod can reuse the pod name. This patch changes the format of podRef to /: to ensure each ip allocation will have a unique identifier. The legacy podRef format will be updated automatically during an upgrade. Signed-off-by: Peng Liu --- cmd/controlloop/controlloop.go | 3 + pkg/config/config.go | 1 + pkg/controlloop/entity_generators.go | 6 +- pkg/controlloop/pod.go | 25 +++--- pkg/controlloop/pod_controller_test.go | 10 +-- pkg/reconciler/ip_test.go | 62 +++++++------- pkg/reconciler/iploop.go | 77 ++++++++++++++++-- pkg/reconciler/iploop_test.go | 108 +++++++++++++++++++++++++ pkg/reconciler/wrappedPod.go | 6 +- pkg/reconciler/wrappedPod_test.go | 14 +++- pkg/storage/kubernetes/client.go | 9 +++ pkg/storage/kubernetes/ipam.go | 6 +- pkg/types/types.go | 6 +- 13 files changed, 259 insertions(+), 74 deletions(-) create mode 100644 pkg/reconciler/iploop_test.go diff --git a/cmd/controlloop/controlloop.go b/cmd/controlloop/controlloop.go index f1c9a50fa..d33aa2090 100644 --- a/cmd/controlloop/controlloop.go +++ b/cmd/controlloop/controlloop.go @@ -79,6 +79,9 @@ func main() { } defer watcher.Close() + // trigger one immediate reconcile before cron job start + reconciler.ReconcileIPs(errorChan) + reconcilerConfigWatcher, err := reconciler.NewConfigWatcher( reconcilerCronConfiguration, s, diff --git a/pkg/config/config.go b/pkg/config/config.go index ba94ce61c..e43bbba34 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -52,6 +52,7 @@ func LoadIPAMConfig(bytes []byte, envArgs string, extraConfigPaths ...string) (* } n.IPAM.PodName = string(args.K8S_POD_NAME) n.IPAM.PodNamespace = string(args.K8S_POD_NAMESPACE) + n.IPAM.PodUID = string(args.K8S_POD_UID) flatipam, foundflatfile, err := GetFlatIPAM(false, n.IPAM, extraConfigPaths...) if err != nil { diff --git a/pkg/controlloop/entity_generators.go b/pkg/controlloop/entity_generators.go index efceb9e6f..aeadf1f60 100644 --- a/pkg/controlloop/entity_generators.go +++ b/pkg/controlloop/entity_generators.go @@ -1,5 +1,3 @@ -//go:build test -// +build test package controlloop @@ -10,6 +8,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" nad "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" @@ -58,12 +57,13 @@ func nodeSpec(name string) *v1.Node { } } -func podSpec(name string, namespace string, nodeName string, networks ...string) *v1.Pod { +func podSpec(name string, namespace string, uid string, nodeName string, networks ...string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, Annotations: podNetworkSelectionElements(networks...), + UID: types.UID(uid), }, Spec: v1.PodSpec{ NodeName: nodeName, diff --git a/pkg/controlloop/pod.go b/pkg/controlloop/pod.go index 78839cd3b..65efbe089 100644 --- a/pkg/controlloop/pod.go +++ b/pkg/controlloop/pod.go @@ -36,6 +36,7 @@ import ( "github.com/k8snetworkplumbingwg/whereabouts/pkg/logging" wbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/storage/kubernetes" "github.com/k8snetworkplumbingwg/whereabouts/pkg/types" + "github.com/k8snetworkplumbingwg/whereabouts/pkg/reconciler" ) const ( @@ -177,6 +178,7 @@ func (pc *PodController) processNextWorkItem() bool { func (pc *PodController) garbageCollectPodIPs(pod *v1.Pod) error { podNamespace := pod.GetNamespace() podName := pod.GetName() + podUID := pod.GetUID() ifaceStatuses, err := podNetworkStatus(pod) if err != nil { @@ -199,12 +201,12 @@ func (pc *PodController) garbageCollectPodIPs(pod *v1.Pod) error { } logging.Verbosef("the NAD's config: %s", nad.Spec) - ipamConfig, err := ipamConfiguration(nad, podNamespace, podName, mountPath) + ipamConfig, err := ipamConfiguration(nad, podNamespace, podName, string(podUID), mountPath) if err != nil && isInvalidPluginType(err) { logging.Debugf("error while computing something: %v", err) continue } else if err != nil { - return fmt.Errorf("failed to create an IPAM configuration for the pod %s iface %s: %+v", podID(podNamespace, podName), ifaceStatus.Name, err) + return fmt.Errorf("failed to create an IPAM configuration for the pod %s iface %s: %+v", reconciler.ComposePodRef(*pod), ifaceStatus.Name, err) } var pools []*whereaboutsv1alpha1.IPPool @@ -222,7 +224,7 @@ func (pc *PodController) garbageCollectPodIPs(pod *v1.Pod) error { for _, pool := range pools { for allocationIndex, allocation := range pool.Spec.Allocations { - if allocation.PodRef == podID(podNamespace, podName) { + if allocation.PodRef == reconciler.ComposePodRef(*pod) { logging.Verbosef("stale allocation to cleanup: %+v", allocation) client := *wbclient.NewKubernetesClient(nil, pc.k8sClient, 0) @@ -260,13 +262,11 @@ func (pc *PodController) handleResult(pod *v1.Pod, err error) { return } - podNamespace := pod.GetNamespace() - podName := pod.GetName() currentRetries := pc.workqueue.NumRequeues(pod) if currentRetries <= maxRetries { logging.Verbosef( "re-queuing IP address reconciliation request for pod %s; retry #: %d", - podID(podNamespace, podName), + reconciler.ComposePodRef(*pod), currentRetries) pc.workqueue.AddRateLimited(pod) return @@ -329,7 +329,7 @@ func (pc *PodController) addressGarbageCollected(pod *v1.Pod, networkName string func (pc *PodController) addressGarbageCollectionFailed(pod *v1.Pod, err error) { logging.Errorf( "dropping pod [%s] deletion out of the queue - could not reconcile IP: %+v", - podID(pod.GetNamespace(), pod.GetName()), + reconciler.ComposePodRef(*pod), err) pc.workqueue.Forget(pod) @@ -340,7 +340,7 @@ func (pc *PodController) addressGarbageCollectionFailed(pod *v1.Pod, err error) v1.EventTypeWarning, addressGarbageCollectionFailed, "failed to garbage collect addresses for pod %s", - podID(pod.GetNamespace(), pod.GetName())) + reconciler.ComposePodRef(*pod)) } } @@ -351,14 +351,10 @@ func onPodDelete(queue workqueue.RateLimitingInterface, obj interface{}) { return } - logging.Verbosef("deleted pod [%s]", podID(pod.GetNamespace(), pod.GetName())) + logging.Verbosef("deleted pod [%s]", reconciler.ComposePodRef(*pod)) queue.Add(stripPod(pod)) // we only need the pod's metadata & its network-status annotations. Hence we strip it. } -func podID(podNamespace string, podName string) string { - return fmt.Sprintf("%s/%s", podNamespace, podName) -} - func podNetworkStatus(pod *v1.Pod) ([]nadv1.NetworkStatus, error) { var ifaceStatuses []nadv1.NetworkStatus networkStatus, found := pod.Annotations[nadv1.NetworkStatusAnnot] @@ -370,7 +366,7 @@ func podNetworkStatus(pod *v1.Pod) ([]nadv1.NetworkStatus, error) { return ifaceStatuses, nil } -func ipamConfiguration(nad *nadv1.NetworkAttachmentDefinition, podNamespace string, podName string, mountPath string) (*types.IPAMConfig, error) { +func ipamConfiguration(nad *nadv1.NetworkAttachmentDefinition, podNamespace string, podName string, podUID string, mountPath string) (*types.IPAMConfig, error) { mounterWhereaboutsConfigFilePath := mountPath + whereaboutsConfigPath ipamConfig, err := config.LoadIPAMConfiguration([]byte(nad.Spec.Config), "", mounterWhereaboutsConfigFilePath) @@ -379,6 +375,7 @@ func ipamConfiguration(nad *nadv1.NetworkAttachmentDefinition, podNamespace stri } ipamConfig.PodName = podName ipamConfig.PodNamespace = podNamespace + ipamConfig.PodUID = podUID ipamConfig.Kubernetes.KubeConfigPath = mountPath + ipamConfig.Kubernetes.KubeConfigPath // must use the mount path return ipamConfig, nil diff --git a/pkg/controlloop/pod_controller_test.go b/pkg/controlloop/pod_controller_test.go index 4d295b335..56a3d24ca 100644 --- a/pkg/controlloop/pod_controller_test.go +++ b/pkg/controlloop/pod_controller_test.go @@ -26,6 +26,7 @@ import ( "github.com/k8snetworkplumbingwg/whereabouts/pkg/api/whereabouts.cni.cncf.io/v1alpha1" wbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/client/clientset/versioned" fakewbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/client/clientset/versioned/fake" + "github.com/k8snetworkplumbingwg/whereabouts/pkg/reconciler" "github.com/k8snetworkplumbingwg/whereabouts/pkg/storage/kubernetes" ) @@ -62,6 +63,7 @@ var _ = Describe("IPControlLoop", func() { const ( networkName = "meganet" podName = "tiny-winy-pod" + podUID = "tiny-winy-pod-uid" nodeName = "hypernode" ) @@ -74,7 +76,7 @@ var _ = Describe("IPControlLoop", func() { ) BeforeEach(func() { - pod = podSpec(podName, namespace, nodeName, networkName) + pod = podSpec(podName, namespace, nodeName, podUID, networkName) node = nodeSpec(nodeName) k8sClient = fakek8sclient.NewSimpleClientset(pod, node) os.Setenv("NODENAME", nodeName) @@ -120,7 +122,7 @@ var _ = Describe("IPControlLoop", func() { ) BeforeEach(func() { - dummyNetworkPool = ipPool(kubernetes.PoolIdentifier{IpRange: dummyNetIPRange, NetworkName: kubernetes.UnnamedNetwork}, ipPoolsNamespace(), podReference(pod)) + dummyNetworkPool = ipPool(kubernetes.PoolIdentifier{IpRange: dummyNetIPRange, NetworkName: kubernetes.UnnamedNetwork}, ipPoolsNamespace(), reconciler.ComposePodRef(*pod)) wbClient = fakewbclient.NewSimpleClientset(dummyNetworkPool) }) @@ -288,10 +290,6 @@ func newFakeNetAttachDefClient(namespace string, networkAttachments ...nad.Netwo return netAttachDefClient, nil } -func podReference(pod *v1.Pod) string { - return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()) -} - func dummyWhereaboutsConfig() string { return `{ "datastore": "kubernetes", diff --git a/pkg/reconciler/ip_test.go b/pkg/reconciler/ip_test.go index 7e54d4e2d..12265296c 100644 --- a/pkg/reconciler/ip_test.go +++ b/pkg/reconciler/ip_test.go @@ -8,6 +8,8 @@ import ( "strings" "testing" + "github.com/google/uuid" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -18,6 +20,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + kubetypes "k8s.io/apimachinery/pkg/types" k8sclient "k8s.io/client-go/kubernetes" fakek8sclient "k8s.io/client-go/kubernetes/fake" @@ -44,11 +47,9 @@ var _ = Describe("Whereabouts IP reconciler", func() { var ( reconcileLooper *ReconcileLooper k8sClientSet k8sclient.Interface + pod *v1.Pod ) - Context("reconciling IP pools with a single running pod", func() { - var pod *v1.Pod - BeforeEach(func() { pod = generatePod(namespace, podName, ipInNetwork{ip: firstIPInRange, networkName: networkName}) k8sClientSet = fakek8sclient.NewSimpleClientset(pod) @@ -63,7 +64,7 @@ var _ = Describe("Whereabouts IP reconciler", func() { ) BeforeEach(func() { - pool = generateIPPoolSpec(ipRange, namespace, poolName, pod.Name) + pool = generateIPPoolSpec(ipRange, namespace, poolName, pod) wbClient = fakewbclient.NewSimpleClientset(pool) }) @@ -102,7 +103,7 @@ var _ = Describe("Whereabouts IP reconciler", func() { secondIPInRange = "10.10.10.2" ) - var pods []v1.Pod + var pods []*v1.Pod BeforeEach(func() { pods = nil @@ -115,7 +116,7 @@ var _ = Describe("Whereabouts IP reconciler", func() { if i == livePodIndex { k8sClientSet = fakek8sclient.NewSimpleClientset(pod) } - pods = append(pods, *pod) + pods = append(pods, pod) } }) @@ -128,11 +129,7 @@ var _ = Describe("Whereabouts IP reconciler", func() { ) BeforeEach(func() { - var podNames []string - for _, pod := range pods { - podNames = append(podNames, pod.Name) - } - pool = generateIPPoolSpec(ipRange, namespace, poolName, podNames...) + pool = generateIPPoolSpec(ipRange, namespace, poolName, pods...) wbClient = fakewbclient.NewSimpleClientset(pool) }) @@ -159,7 +156,7 @@ var _ = Describe("Whereabouts IP reconciler", func() { remainingAllocation := map[string]v1alpha1.IPAllocation{ "2": { - PodRef: fmt.Sprintf("%s/%s", namespace, pods[livePodIndex].Name), + PodRef: ComposePodRef(*pods[livePodIndex]), }, } Expect(poolAfterCleanup.Spec.Allocations).To(Equal(remainingAllocation)) @@ -211,8 +208,8 @@ var _ = Describe("Whereabouts IP reconciler", func() { }) BeforeEach(func() { - firstPool := generateIPPoolSpec(firstNetworkRange, namespace, firstPoolName, pods[0].GetName(), pods[2].GetName()) - secondPool := generateIPPoolSpec(secondNetworkRange, namespace, secondPoolName, pods[1].GetName()) + firstPool := generateIPPoolSpec(firstNetworkRange, namespace, firstPoolName, pods[0], pods[2]) + secondPool := generateIPPoolSpec(secondNetworkRange, namespace, secondPoolName, pods[1]) wbClient = fakewbclient.NewSimpleClientset(firstPool, secondPool) pools = append(pools, *firstPool, *secondPool) }) @@ -221,7 +218,7 @@ var _ = Describe("Whereabouts IP reconciler", func() { podIPs := []string{firstIPInRange, secondIPInRange, thirdIPInRange} for i := 0; i < numberOfPods; i++ { var clusterWideIP v1alpha1.OverlappingRangeIPReservation - ownerPodRef := fmt.Sprintf("%s/%s", namespace, pods[i].GetName()) + ownerPodRef := ComposePodRef(*pods[i]) _, err := wbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(namespace).Create(context.TODO(), generateClusterWideIPReservation(namespace, podIPs[i], ownerPodRef), metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) clusterWideIPs = append(clusterWideIPs, clusterWideIP) @@ -279,7 +276,7 @@ var _ = Describe("Whereabouts IP reconciler", func() { }) BeforeEach(func() { - firstPool := generateIPPoolSpec(networkRange, namespace, poolName, pods[0].GetName()) + firstPool := generateIPPoolSpec(networkRange, namespace, poolName, pods[0]) wbClient = fakewbclient.NewSimpleClientset(firstPool) pools = append(pools, *firstPool) }) @@ -288,7 +285,7 @@ var _ = Describe("Whereabouts IP reconciler", func() { podIPs := []string{ipv6FirstIPInRange} for i := 0; i < numberOfPods; i++ { var clusterWideIP v1alpha1.OverlappingRangeIPReservation - ownerPodRef := fmt.Sprintf("%s/%s", namespace, pods[i].GetName()) + ownerPodRef := ComposePodRef(*pods[i]) _, err := wbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(namespace).Create(context.TODO(), generateClusterWideIPReservation(namespace, podIPs[i], ownerPodRef), metav1.CreateOptions{}) Expect(err).NotTo(HaveOccurred()) clusterWideIPs = append(clusterWideIPs, clusterWideIP) @@ -319,20 +316,19 @@ var _ = Describe("Whereabouts IP reconciler", func() { BeforeEach(func() { var err error - pod, err = k8sClientSet.CoreV1().Pods(namespace).Create( - context.TODO(), - generatePendingPod(namespace, podName), - metav1.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) + pod = generatePendingPod(namespace, podName) + k8sClientSet = fakek8sclient.NewSimpleClientset(pod) - pool = generateIPPoolSpec(ipRange, namespace, poolName, pod.Name) + pool = generateIPPoolSpec(ipRange, namespace, poolName, pod) wbClient = fakewbclient.NewSimpleClientset(pool) reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout) Expect(err).NotTo(HaveOccurred()) }) It("can be reconciled", func() { - Expect(reconcileLooper.ReconcileIPPools(context.TODO())).NotTo(BeEmpty()) + ips, err := reconcileLooper.ReconcileIPPools(context.TODO()) + Expect(ips).NotTo(BeEmpty()) + Expect(err).NotTo(HaveOccurred()) }) }) }) @@ -380,10 +376,11 @@ var _ = Describe("IPReconciler", func() { ipCIDR = "192.168.14.0/24" namespace = "default" podName = "pod1" + podUID = "pod1-uid" ) BeforeEach(func() { - podRef := "default/pod1" + podRef := "default/pod1:pod1-uid" reservations := generateIPReservation(firstIPInRange, podRef) pool := generateIPPool(ipCIDR, podRef) @@ -403,10 +400,10 @@ var _ = Describe("IPReconciler", func() { Context("and they are actually multiple IPs", func() { BeforeEach(func() { - podRef := "default/pod2" + podRef := "default/pod2:pod2-uid" reservations := generateIPReservation("192.168.14.2", podRef) - pool := generateIPPool(ipCIDR, podRef, "default/pod2", "default/pod3") + pool := generateIPPool(ipCIDR, podRef, "default/pod2:pod2-uid", "default/pod3") orphanedIPAddr := OrphanedIPReservations{ Pool: dummyPool{orphans: reservations, pool: pool}, Allocations: reservations, @@ -425,8 +422,8 @@ var _ = Describe("IPReconciler", func() { Context("but the IP reservation owner does not match", func() { var reservationPodRef string BeforeEach(func() { - reservationPodRef = "default/pod2" - podRef := "default/pod1" + reservationPodRef = "default/pod2:pod2-uid" + podRef := "default/pod1:pod1-uid" reservations := generateIPReservation(firstIPInRange, podRef) erroredReservations := generateIPReservation(firstIPInRange, reservationPodRef) @@ -448,11 +445,11 @@ var _ = Describe("IPReconciler", func() { }) }) -func generateIPPoolSpec(ipRange string, namespace string, poolName string, podNames ...string) *v1alpha1.IPPool { +func generateIPPoolSpec(ipRange string, namespace string, poolName string, pods ...*v1.Pod) *v1alpha1.IPPool { allocations := map[string]v1alpha1.IPAllocation{} - for i, podName := range podNames { + for i, pod := range pods { allocations[fmt.Sprintf("%d", i+1)] = v1alpha1.IPAllocation{ - PodRef: fmt.Sprintf("%s/%s", namespace, podName), + PodRef: ComposePodRef(*pod), } } return &v1alpha1.IPPool{ @@ -484,6 +481,7 @@ func generatePod(namespace string, podName string, ipNetworks ...ipInNetwork) *v Name: podName, Namespace: namespace, Annotations: generatePodAnnotations(ipNetworks...), + UID: kubetypes.UID(uuid.NewString()), }, Spec: v1.PodSpec{ Containers: []v1.Container{ diff --git a/pkg/reconciler/iploop.go b/pkg/reconciler/iploop.go index cc2b8fd65..2cf16b634 100644 --- a/pkg/reconciler/iploop.go +++ b/pkg/reconciler/iploop.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "regexp" "strings" "time" @@ -54,17 +55,22 @@ func NewReconcileLooperWithClient(ctx context.Context, k8sClient *kubernetes.Cli return nil, logging.Errorf("failed to retrieve all IP pools: %v", err) } - pods, err := k8sClient.ListPods(ctx) - if err != nil { + looper := &ReconcileLooper{ + k8sClient: *k8sClient, + requestTimeout: timeout, + } + + // migrate the podRef format if needed + if err := looper.migrationPodRef(ctx, ipPools); err != nil { return nil, err } whereaboutsPodRefs := getPodRefsServedByWhereabouts(ipPools) - looper := &ReconcileLooper{ - k8sClient: *k8sClient, - liveWhereaboutsPods: indexPods(pods, whereaboutsPodRefs), - requestTimeout: timeout, + pods, err := k8sClient.ListPods(ctx) + if err != nil { + return nil, err } + looper.liveWhereaboutsPods = indexPods(pods, whereaboutsPodRefs) if err := looper.findOrphanedIPsPerPool(ipPools); err != nil { return nil, err @@ -170,8 +176,63 @@ func splitPodRef(podRef string) (string, string) { return namespacedName[0], namespacedName[1] } -func composePodRef(pod v1.Pod) string { - return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()) +func ComposePodRef(pod v1.Pod) string { + return fmt.Sprintf("%s/%s:%s", pod.GetNamespace(), pod.GetName(), pod.GetUID()) +} + +func (rl ReconcileLooper) convertLegacyPodRef(podRef string) (string, error) { + podInfo := strings.Split(podRef, "/") + if len(podInfo) != 2 { + return "", fmt.Errorf("podRef %s is invalid", podRef) + } + pod, err := rl.k8sClient.GetPod(podInfo[0], podInfo[1]) + if err != nil { + return "", err + } + return ComposePodRef(*pod), nil +} + +// migrationPodRef will migrate the podRef format from 'Namespace/PodName' to +// 'Namespace/PodName:PodUID' for existing pools. This shall be a one-time job. +func (rl ReconcileLooper) migrationPodRef(ctx context.Context, ipPools []storage.IPPool) error { + pattern := `^([^\s/]+)/([^\s/]+):([^\s/]+)$` + + reservations, err := rl.k8sClient.ListOverlappingIPs(ctx) + if err != nil { + return fmt.Errorf("failed to list overlappingrangeipreservations %v", err) + } + for _, ip := range reservations { + if regexp.MustCompile(pattern).MatchString(ip.Spec.PodRef) { + continue + } + ip.Spec.PodRef, err = rl.convertLegacyPodRef(ip.Spec.PodRef) + if err != nil { + return err + } + _, err := rl.k8sClient.UpdateOverlappingIP(ctx, &ip) + if err != nil { + return fmt.Errorf("failed to update overlappingrangeipreservation %s: %v", ip.Spec.PodRef, err) + } + } + + var ipReservations []types.IPReservation + for _, pool := range ipPools { + ipReservations = nil + for _, r := range pool.Allocations() { + if !regexp.MustCompile(pattern).MatchString(r.PodRef) { + r.PodRef, err = rl.convertLegacyPodRef(r.PodRef) + if err != nil { + return err + } + } + ipReservations = append(ipReservations, r) + } + if len(ipReservations) > 0 { + return pool.Update(ctx, ipReservations) + } + } + + return nil } func (rl ReconcileLooper) ReconcileIPPools(ctx context.Context) ([]net.IP, error) { diff --git a/pkg/reconciler/iploop_test.go b/pkg/reconciler/iploop_test.go new file mode 100644 index 000000000..32d5eca35 --- /dev/null +++ b/pkg/reconciler/iploop_test.go @@ -0,0 +1,108 @@ +package reconciler + +import ( + "context" + "fmt" + "regexp" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sclient "k8s.io/client-go/kubernetes" + fakek8sclient "k8s.io/client-go/kubernetes/fake" + + v1alpha1 "github.com/k8snetworkplumbingwg/whereabouts/pkg/api/whereabouts.cni.cncf.io/v1alpha1" + "github.com/k8snetworkplumbingwg/whereabouts/pkg/storage/kubernetes" + + wbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/client/clientset/versioned" + fakewbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/client/clientset/versioned/fake" +) + +var _ = FDescribe("MigrationPodRef", func() { + const ( + firstIPInRange = "10.10.10.1" + ipRange = "10.10.10.0/16" + namespace = "default" + networkName = "net1" + podName = "pod1" + timeout = 10 + poolName = "pool1" + ) + + var ( + reconcileLooper *ReconcileLooper + k8sClientSet k8sclient.Interface + pod *v1.Pod + ) + + Context("When migration is needed", func() { + var ( + pool *v1alpha1.IPPool + wbClient wbclient.Interface + ctx context.Context + ) + + BeforeEach(func() { + ctx = context.TODO() + pod = generatePod(namespace, podName, ipInNetwork{ip: firstIPInRange, networkName: networkName}) + k8sClientSet = fakek8sclient.NewSimpleClientset(pod) + pool = generateLegacyIPPoolSpec(ipRange, namespace, poolName, pod) + wbClient = fakewbclient.NewSimpleClientset(pool) + + ownerPodRef := ComposePodRef(*pod) + _, err := wbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(namespace).Create(context.TODO(), generateClusterWideIPReservation(namespace, firstIPInRange, ownerPodRef), metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should update PodRef for overlappingrangeipreservations and pools", func() { + ipPools, err := reconcileLooper.k8sClient.ListIPPools(ctx) + Expect(err).NotTo(HaveOccurred()) + err = reconcileLooper.migrationPodRef(ctx, ipPools) + Expect(err).NotTo(HaveOccurred()) + + found := false + pattern := `^([^\s/]+)/([^\s/]+):([^\s/]+)$` + // Allippools shall have been updated + ipPools, err = reconcileLooper.k8sClient.ListIPPools(ctx) + Expect(err).NotTo(HaveOccurred()) + for _, pool := range ipPools { + for _, r := range pool.Allocations() { + if !regexp.MustCompile(pattern).MatchString(r.PodRef) { + found = true + } + } + } + Expect(found).To(BeFalse()) + // All overlappingrangeipreservations shall have been updated + ips, err := reconcileLooper.k8sClient.ListOverlappingIPs(ctx) + Expect(err).NotTo(HaveOccurred()) + for _, ip := range ips { + if !regexp.MustCompile(pattern).MatchString(ip.Spec.PodRef) { + found = true + } + } + Expect(found).To(BeFalse()) + }) + }) +}) + +func generateLegacyIPPoolSpec(ipRange string, namespace string, poolName string, pods ...*v1.Pod) *v1alpha1.IPPool { + allocations := map[string]v1alpha1.IPAllocation{} + for i, pod := range pods { + allocations[fmt.Sprintf("%d", i+1)] = v1alpha1.IPAllocation{ + PodRef: fmt.Sprintf("%s/%s", pod.Namespace, pod.Name), + } + } + return &v1alpha1.IPPool{ + ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: poolName, ResourceVersion: "1"}, + Spec: v1alpha1.IPPoolSpec{ + Range: ipRange, + Allocations: allocations, + }, + } +} diff --git a/pkg/reconciler/wrappedPod.go b/pkg/reconciler/wrappedPod.go index cb5e871df..d243108ca 100644 --- a/pkg/reconciler/wrappedPod.go +++ b/pkg/reconciler/wrappedPod.go @@ -42,7 +42,7 @@ func indexPods(livePodList []v1.Pod, whereaboutsPodNames map[string]void) map[st podMap := map[string]podWrapper{} for _, pod := range livePodList { - podRef := composePodRef(pod) + podRef := ComposePodRef(pod) if _, isWhereaboutsPod := whereaboutsPodNames[podRef]; !isWhereaboutsPod { continue } @@ -64,7 +64,7 @@ func getFlatIPSet(pod v1.Pod) (map[string]void, error) { return ipSet, logging.Errorf( "could not parse network annotation %s for pod: %s; error: %v", networkStatusAnnotationValue, - composePodRef(pod), + ComposePodRef(pod), err) } @@ -76,7 +76,7 @@ func getFlatIPSet(pod v1.Pod) (map[string]void, error) { for _, ip := range network.IPs { ipSet[ip] = empty - logging.Debugf("Added IP %s for pod %s", ip, composePodRef(pod)) + logging.Debugf("Added IP %s for pod %s", ip, ComposePodRef(pod)) } } return ipSet, nil diff --git a/pkg/reconciler/wrappedPod_test.go b/pkg/reconciler/wrappedPod_test.go index aba34e686..8aecf9a79 100644 --- a/pkg/reconciler/wrappedPod_test.go +++ b/pkg/reconciler/wrappedPod_test.go @@ -12,6 +12,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) var _ = Describe("Pod Wrapper operations", func() { @@ -72,12 +73,13 @@ var _ = Describe("Pod Wrapper operations", func() { } } - generatePodSpecWithNameAndNamespace := func(name string, namespace string, ips ...string) v1.Pod { + generatePodSpecWithNameAndNamespace := func(name string, namespace string, uid string, ips ...string) v1.Pod { return v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Annotations: generateMultusNetworkStatusAnnotationFromIPs(ips...), Name: name, Namespace: namespace, + UID: types.UID(uid), }, } } @@ -138,6 +140,7 @@ var _ = Describe("Pod Wrapper operations", func() { ips []string name string namespace string + uid string } table.DescribeTable("", func(podsInfo ...podInfo) { @@ -145,9 +148,9 @@ var _ = Describe("Pod Wrapper operations", func() { whereaboutsPods := map[string]void{} for _, info := range podsInfo { - newPod := generatePodSpecWithNameAndNamespace(info.name, info.namespace, info.ips...) + newPod := generatePodSpecWithNameAndNamespace(info.name, info.namespace, info.uid, info.ips...) pods = append(pods, newPod) - whereaboutsPods[composePodRef(newPod)] = void{} + whereaboutsPods[ComposePodRef(newPod)] = void{} } expectedPodWrapper := map[string]podWrapper{} for _, info := range podsInfo { @@ -155,7 +158,7 @@ var _ = Describe("Pod Wrapper operations", func() { for _, ip := range info.ips { indexedPodIPs[ip] = void{} } - expectedPodWrapper[fmt.Sprintf("%s/%s", info.namespace, info.name)] = podWrapper{ips: indexedPodIPs} + expectedPodWrapper[fmt.Sprintf("%s/%s:%s", info.namespace, info.name, info.uid)] = podWrapper{ips: indexedPodIPs} } Expect(indexPods(pods, whereaboutsPods)).To(Equal(expectedPodWrapper)) @@ -165,17 +168,20 @@ var _ = Describe("Pod Wrapper operations", func() { ips: []string{"10.10.10.10"}, name: "pod1", namespace: "default", + uid: "pod1uid", }), table.Entry("when multiple pods are passed", podInfo{ ips: []string{"10.10.10.10"}, name: "pod1", namespace: "default", + uid: "pod1uid", }, podInfo{ ips: []string{"192.168.14.14", "200.200.200.200s"}, name: "pod200", namespace: "secretns", + uid: "pod2uid", })) }) }) diff --git a/pkg/storage/kubernetes/client.go b/pkg/storage/kubernetes/client.go index 2fdc517af..8720b66d9 100644 --- a/pkg/storage/kubernetes/client.go +++ b/pkg/storage/kubernetes/client.go @@ -135,3 +135,12 @@ func (i *Client) DeleteOverlappingIP(ctx context.Context, clusterWideIP *whereab return i.client.WhereaboutsV1alpha1().OverlappingRangeIPReservations(clusterWideIP.GetNamespace()).Delete( ctxWithTimeout, clusterWideIP.GetName(), metav1.DeleteOptions{}) } + +func (i *Client) UpdateOverlappingIP(ctx context.Context, clusterWideIP *whereaboutsv1alpha1.OverlappingRangeIPReservation) (*whereaboutsv1alpha1.OverlappingRangeIPReservation, error) { + ctxWithTimeout, cancel := context.WithTimeout(ctx, storage.RequestTimeout) + defer cancel() + + clusterWideIP, err := i.client.WhereaboutsV1alpha1().OverlappingRangeIPReservations(clusterWideIP.GetNamespace()).Update( + ctxWithTimeout, clusterWideIP, metav1.UpdateOptions{}) + return clusterWideIP, err +} diff --git a/pkg/storage/kubernetes/ipam.go b/pkg/storage/kubernetes/ipam.go index 299c3ee58..ae5c358e1 100644 --- a/pkg/storage/kubernetes/ipam.go +++ b/pkg/storage/kubernetes/ipam.go @@ -345,7 +345,7 @@ func (p *KubernetesIPPool) Update(ctx context.Context, reservations []whereabout // newLeaderElector creates a new leaderelection.LeaderElector and associated // channels by which to observe elections and depositions. -func newLeaderElector(clientset kubernetes.Interface, namespace string, podNamespace string, podID string, leaseDuration int, renewDeadline int, retryPeriod int) (*leaderelection.LeaderElector, chan struct{}, chan struct{}) { +func newLeaderElector(clientset kubernetes.Interface, namespace string, podNamespace string, podID string, podUID string, leaseDuration int, renewDeadline int, retryPeriod int) (*leaderelection.LeaderElector, chan struct{}, chan struct{}) { //log.WithField("context", "leaderelection") // leaderOK will block gRPC startup until it's closed. leaderOK := make(chan struct{}) @@ -360,7 +360,7 @@ func newLeaderElector(clientset kubernetes.Interface, namespace string, podNames }, Client: clientset.CoordinationV1(), LockConfig: resourcelock.ResourceLockConfig{ - Identity: fmt.Sprintf("%s/%s", podNamespace, podID), + Identity: fmt.Sprintf("%s/%s:%s", podNamespace, podID, podUID), }, } @@ -401,7 +401,7 @@ func IPManagement(ctx context.Context, mode int, ipamConf whereaboutstypes.IPAMC } // setup leader election - le, leader, deposed := newLeaderElector(client.clientSet, client.namespace, ipamConf.PodNamespace, ipamConf.PodName, ipamConf.LeaderLeaseDuration, ipamConf.LeaderRenewDeadline, ipamConf.LeaderRetryPeriod) + le, leader, deposed := newLeaderElector(client.clientSet, client.namespace, ipamConf.PodNamespace, ipamConf.PodName, ipamConf.PodUID, ipamConf.LeaderLeaseDuration, ipamConf.LeaderRenewDeadline, ipamConf.LeaderRetryPeriod) var wg sync.WaitGroup wg.Add(2) diff --git a/pkg/types/types.go b/pkg/types/types.go index 519a094a5..d91492904 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -70,6 +70,7 @@ type IPAMConfig struct { ConfigurationPath string `json:"configuration_path"` PodName string PodNamespace string + PodUID string NetworkName string `json:"network_name,omitempty"` } @@ -106,6 +107,7 @@ func (ic *IPAMConfig) UnmarshalJSON(data []byte) error { ConfigurationPath string `json:"configuration_path"` PodName string PodNamespace string + PodUID string NetworkName string `json:"network_name,omitempty"` } @@ -142,13 +144,14 @@ func (ic *IPAMConfig) UnmarshalJSON(data []byte) error { ConfigurationPath: ipamConfigAlias.ConfigurationPath, PodName: ipamConfigAlias.PodName, PodNamespace: ipamConfigAlias.PodNamespace, + PodUID: ipamConfigAlias.PodUID, NetworkName: ipamConfigAlias.NetworkName, } return nil } func (ic *IPAMConfig) GetPodRef() string { - return fmt.Sprintf("%s/%s", ic.PodNamespace, ic.PodName) + return fmt.Sprintf("%s/%s:%s", ic.PodNamespace, ic.PodName, ic.PodUID) } func backwardsCompatibleIPAddress(ip string) net.IP { @@ -166,6 +169,7 @@ type IPAMEnvArgs struct { GATEWAY cnitypes.UnmarshallableString `json:"gateway,omitempty"` K8S_POD_NAME cnitypes.UnmarshallableString //revive:disable-line K8S_POD_NAMESPACE cnitypes.UnmarshallableString //revive:disable-line + K8S_POD_UID cnitypes.UnmarshallableString //revive:disable-line K8S_POD_INFRA_CONTAINER_ID cnitypes.UnmarshallableString //revive:disable-line }