Skip to content

Commit

Permalink
Make podRef to be unique
Browse files Browse the repository at this point in the history
We used podRef with format <namespace>/<pod_name> to identify the IP
allocation. However, in scenarios like replicaSet, a different
pod can reuse the pod name. This patch changes the format of podRef
to <namespace>/<pod_name>:<pod_uid> to ensure each ip allocation
will have a unique identifier. The legacy podRef format will be
updated automatically during an upgrade.

Signed-off-by: Peng Liu <[email protected]>
  • Loading branch information
pliurh committed Mar 5, 2024
1 parent c9e55dd commit 9fa1805
Show file tree
Hide file tree
Showing 14 changed files with 269 additions and 82 deletions.
3 changes: 3 additions & 0 deletions cmd/controlloop/controlloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func main() {
}
defer watcher.Close()

// trigger one immediate reconcile before cron job start
reconciler.ReconcileIPs(errorChan)

reconcilerConfigWatcher, err := reconciler.NewConfigWatcher(
reconcilerCronConfiguration,
s,
Expand Down
18 changes: 10 additions & 8 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,9 @@ var _ = Describe("Whereabouts functionality", func() {

Context("deleting a pod from the statefulset", func() {
var (
containerID string
podRef string
containerID string
podRef string
decomposedPodRef []string
)

BeforeEach(func() {
Expand All @@ -443,9 +444,11 @@ var _ = Describe("Whereabouts functionality", func() {
containerID = ipPool.Spec.Allocations["1"].ContainerID
podRef = ipPool.Spec.Allocations["1"].PodRef

decomposedPodRef := strings.Split(podRef, "/")
decomposedPodRef = strings.Split(podRef, ":")
Expect(decomposedPodRef).To(HaveLen(2))
podName := decomposedPodRef[1]
podNameNamespace := strings.Split(decomposedPodRef[0], "/")
Expect(podNameNamespace).To(HaveLen(2))
podName := podNameNamespace[1]

rightNow := int64(0)
Expect(clientInfo.Client.CoreV1().Pods(namespace).Delete(
Expand Down Expand Up @@ -476,8 +479,7 @@ var _ = Describe("Whereabouts functionality", func() {
metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(ipPool.Spec.Allocations).NotTo(BeEmpty())

Expect(allocationForPodRef(podRef, *ipPool).ContainerID).NotTo(Equal(containerID))
Expect(allocationForPodName(decomposedPodRef[0], *ipPool).ContainerID).NotTo(Equal(containerID))
})
})
})
Expand Down Expand Up @@ -669,9 +671,9 @@ var _ = Describe("Whereabouts functionality", func() {
})
})

func allocationForPodRef(podRef string, ipPool v1alpha1.IPPool) *v1alpha1.IPAllocation {
func allocationForPodName(podName string, ipPool v1alpha1.IPPool) *v1alpha1.IPAllocation {
for _, allocation := range ipPool.Spec.Allocations {
if allocation.PodRef == podRef {
if strings.Contains(allocation.PodRef, podName) {
return &allocation
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func LoadIPAMConfig(bytes []byte, envArgs string, extraConfigPaths ...string) (*
}
n.IPAM.PodName = string(args.K8S_POD_NAME)
n.IPAM.PodNamespace = string(args.K8S_POD_NAMESPACE)
n.IPAM.PodUID = string(args.K8S_POD_UID)

flatipam, foundflatfile, err := GetFlatIPAM(false, n.IPAM, extraConfigPaths...)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controlloop/entity_generators.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build test
// +build test

package controlloop

Expand All @@ -10,6 +8,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"

nad "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"

Expand Down Expand Up @@ -58,12 +57,13 @@ func nodeSpec(name string) *v1.Node {
}
}

func podSpec(name string, namespace string, nodeName string, networks ...string) *v1.Pod {
func podSpec(name string, namespace string, uid string, nodeName string, networks ...string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Annotations: podNetworkSelectionElements(networks...),
UID: types.UID(uid),
},
Spec: v1.PodSpec{
NodeName: nodeName,
Expand Down
25 changes: 11 additions & 14 deletions pkg/controlloop/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/k8snetworkplumbingwg/whereabouts/pkg/logging"
wbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/storage/kubernetes"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/types"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/reconciler"
)

const (
Expand Down Expand Up @@ -177,6 +178,7 @@ func (pc *PodController) processNextWorkItem() bool {
func (pc *PodController) garbageCollectPodIPs(pod *v1.Pod) error {
podNamespace := pod.GetNamespace()
podName := pod.GetName()
podUID := pod.GetUID()

ifaceStatuses, err := podNetworkStatus(pod)
if err != nil {
Expand All @@ -199,12 +201,12 @@ func (pc *PodController) garbageCollectPodIPs(pod *v1.Pod) error {
}

logging.Verbosef("the NAD's config: %s", nad.Spec)
ipamConfig, err := ipamConfiguration(nad, podNamespace, podName, mountPath)
ipamConfig, err := ipamConfiguration(nad, podNamespace, podName, string(podUID), mountPath)
if err != nil && isInvalidPluginType(err) {
logging.Debugf("error while computing something: %v", err)
continue
} else if err != nil {
return fmt.Errorf("failed to create an IPAM configuration for the pod %s iface %s: %+v", podID(podNamespace, podName), ifaceStatus.Name, err)
return fmt.Errorf("failed to create an IPAM configuration for the pod %s iface %s: %+v", reconciler.ComposePodRef(*pod), ifaceStatus.Name, err)
}

var pools []*whereaboutsv1alpha1.IPPool
Expand All @@ -222,7 +224,7 @@ func (pc *PodController) garbageCollectPodIPs(pod *v1.Pod) error {

for _, pool := range pools {
for allocationIndex, allocation := range pool.Spec.Allocations {
if allocation.PodRef == podID(podNamespace, podName) {
if allocation.PodRef == reconciler.ComposePodRef(*pod) {
logging.Verbosef("stale allocation to cleanup: %+v", allocation)

client := *wbclient.NewKubernetesClient(nil, pc.k8sClient, 0)
Expand Down Expand Up @@ -260,13 +262,11 @@ func (pc *PodController) handleResult(pod *v1.Pod, err error) {
return
}

podNamespace := pod.GetNamespace()
podName := pod.GetName()
currentRetries := pc.workqueue.NumRequeues(pod)
if currentRetries <= maxRetries {
logging.Verbosef(
"re-queuing IP address reconciliation request for pod %s; retry #: %d",
podID(podNamespace, podName),
reconciler.ComposePodRef(*pod),
currentRetries)
pc.workqueue.AddRateLimited(pod)
return
Expand Down Expand Up @@ -329,7 +329,7 @@ func (pc *PodController) addressGarbageCollected(pod *v1.Pod, networkName string
func (pc *PodController) addressGarbageCollectionFailed(pod *v1.Pod, err error) {
logging.Errorf(
"dropping pod [%s] deletion out of the queue - could not reconcile IP: %+v",
podID(pod.GetNamespace(), pod.GetName()),
reconciler.ComposePodRef(*pod),
err)

pc.workqueue.Forget(pod)
Expand All @@ -340,7 +340,7 @@ func (pc *PodController) addressGarbageCollectionFailed(pod *v1.Pod, err error)
v1.EventTypeWarning,
addressGarbageCollectionFailed,
"failed to garbage collect addresses for pod %s",
podID(pod.GetNamespace(), pod.GetName()))
reconciler.ComposePodRef(*pod))
}
}

Expand All @@ -351,14 +351,10 @@ func onPodDelete(queue workqueue.RateLimitingInterface, obj interface{}) {
return
}

logging.Verbosef("deleted pod [%s]", podID(pod.GetNamespace(), pod.GetName()))
logging.Verbosef("deleted pod [%s]", reconciler.ComposePodRef(*pod))
queue.Add(stripPod(pod)) // we only need the pod's metadata & its network-status annotations. Hence we strip it.
}

func podID(podNamespace string, podName string) string {
return fmt.Sprintf("%s/%s", podNamespace, podName)
}

func podNetworkStatus(pod *v1.Pod) ([]nadv1.NetworkStatus, error) {
var ifaceStatuses []nadv1.NetworkStatus
networkStatus, found := pod.Annotations[nadv1.NetworkStatusAnnot]
Expand All @@ -370,7 +366,7 @@ func podNetworkStatus(pod *v1.Pod) ([]nadv1.NetworkStatus, error) {
return ifaceStatuses, nil
}

func ipamConfiguration(nad *nadv1.NetworkAttachmentDefinition, podNamespace string, podName string, mountPath string) (*types.IPAMConfig, error) {
func ipamConfiguration(nad *nadv1.NetworkAttachmentDefinition, podNamespace string, podName string, podUID string, mountPath string) (*types.IPAMConfig, error) {
mounterWhereaboutsConfigFilePath := mountPath + whereaboutsConfigPath

ipamConfig, err := config.LoadIPAMConfiguration([]byte(nad.Spec.Config), "", mounterWhereaboutsConfigFilePath)
Expand All @@ -379,6 +375,7 @@ func ipamConfiguration(nad *nadv1.NetworkAttachmentDefinition, podNamespace stri
}
ipamConfig.PodName = podName
ipamConfig.PodNamespace = podNamespace
ipamConfig.PodUID = podUID
ipamConfig.Kubernetes.KubeConfigPath = mountPath + ipamConfig.Kubernetes.KubeConfigPath // must use the mount path

return ipamConfig, nil
Expand Down
10 changes: 4 additions & 6 deletions pkg/controlloop/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/k8snetworkplumbingwg/whereabouts/pkg/api/whereabouts.cni.cncf.io/v1alpha1"
wbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/client/clientset/versioned"
fakewbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/client/clientset/versioned/fake"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/reconciler"
"github.com/k8snetworkplumbingwg/whereabouts/pkg/storage/kubernetes"
)

Expand Down Expand Up @@ -62,6 +63,7 @@ var _ = Describe("IPControlLoop", func() {
const (
networkName = "meganet"
podName = "tiny-winy-pod"
podUID = "tiny-winy-pod-uid"
nodeName = "hypernode"
)

Expand All @@ -74,7 +76,7 @@ var _ = Describe("IPControlLoop", func() {
)

BeforeEach(func() {
pod = podSpec(podName, namespace, nodeName, networkName)
pod = podSpec(podName, namespace, nodeName, podUID, networkName)
node = nodeSpec(nodeName)
k8sClient = fakek8sclient.NewSimpleClientset(pod, node)
os.Setenv("NODENAME", nodeName)
Expand Down Expand Up @@ -120,7 +122,7 @@ var _ = Describe("IPControlLoop", func() {
)

BeforeEach(func() {
dummyNetworkPool = ipPool(kubernetes.PoolIdentifier{IpRange: dummyNetIPRange, NetworkName: kubernetes.UnnamedNetwork}, ipPoolsNamespace(), podReference(pod))
dummyNetworkPool = ipPool(kubernetes.PoolIdentifier{IpRange: dummyNetIPRange, NetworkName: kubernetes.UnnamedNetwork}, ipPoolsNamespace(), reconciler.ComposePodRef(*pod))
wbClient = fakewbclient.NewSimpleClientset(dummyNetworkPool)
})

Expand Down Expand Up @@ -288,10 +290,6 @@ func newFakeNetAttachDefClient(namespace string, networkAttachments ...nad.Netwo
return netAttachDefClient, nil
}

func podReference(pod *v1.Pod) string {
return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())
}

func dummyWhereaboutsConfig() string {
return `{
"datastore": "kubernetes",
Expand Down
Loading

0 comments on commit 9fa1805

Please sign in to comment.