Fix rolling update for StatefulSet integration (#3684)
* Fixed unit test.

Finalize only Succeeded and Failed pods.

* Add e2e test.

* Add kueue.x-k8s.io/pod-group-serving annotation.

* Review remarks.

* Update test/e2e/singlecluster/statefulset_test.go

Co-authored-by: Michał Woźniak <[email protected]>

* Update test/e2e/singlecluster/statefulset_test.go

Co-authored-by: Michał Woźniak <[email protected]>

* Update test/integration/controller/jobs/pod/pod_controller_test.go

Co-authored-by: Michał Woźniak <[email protected]>

* Update pkg/controller/jobs/pod/pod_controller_test.go

Co-authored-by: Michał Woźniak <[email protected]>

---------

Co-authored-by: Michał Woźniak <[email protected]>
mbobrovskyi and mimowo authored Dec 4, 2024
1 parent b5800f5 commit 18874ed
Showing 14 changed files with 332 additions and 39 deletions.
5 changes: 5 additions & 0 deletions hack/e2e-common.sh
@@ -41,6 +41,8 @@ if [[ -n ${KUBEFLOW_MPI_VERSION:-} ]]; then
fi

# sleep image to use for testing.
export E2E_TEST_IMAGE_OLD=gcr.io/k8s-staging-perf-tests/sleep:v0.0.3@sha256:00ae8e01dd4439edfb7eb9f1960ac28eba16e952956320cce7f2ac08e3446e6b
E2E_TEST_IMAGE_OLD_WITHOUT_SHA=${E2E_TEST_IMAGE_OLD%%@*}
export E2E_TEST_IMAGE=gcr.io/k8s-staging-perf-tests/sleep:v0.1.0@sha256:8d91ddf9f145b66475efda1a1b52269be542292891b5de2a7fad944052bab6ea
E2E_TEST_IMAGE_WITHOUT_SHA=${E2E_TEST_IMAGE%%@*}

@@ -64,11 +66,13 @@ function cluster_create {
}

function prepare_docker_images {
docker pull "$E2E_TEST_IMAGE_OLD"
docker pull "$E2E_TEST_IMAGE"

# We can load an image by digest, but we cannot reference it by the digest that we pulled.
# For more information, see https://github.com/kubernetes-sigs/kind/issues/2394#issuecomment-888713831.
# Manually create a tag for the already-pulled image referenced by digest.
docker tag $E2E_TEST_IMAGE_OLD "$E2E_TEST_IMAGE_OLD_WITHOUT_SHA"
docker tag $E2E_TEST_IMAGE "$E2E_TEST_IMAGE_WITHOUT_SHA"

if [[ -n ${JOBSET_VERSION:-} ]]; then
@@ -84,6 +88,7 @@ function prepare_docker_images {

# $1 cluster
function cluster_kind_load {
cluster_kind_load_image "$1" "${E2E_TEST_IMAGE_OLD_WITHOUT_SHA}"
cluster_kind_load_image "$1" "${E2E_TEST_IMAGE_WITHOUT_SHA}"
cluster_kind_load_image "$1" "$IMAGE_TAG"
}
10 changes: 9 additions & 1 deletion pkg/controller/jobs/pod/pod_controller.go
@@ -1222,8 +1222,16 @@ func (p *Pod) equivalentToWorkload(wl *kueue.Workload, jobPodSets []kueue.PodSet
return true
}

func (p *Pod) isServing() bool {
return p.isGroup && p.pod.Annotations[GroupServingAnnotation] == "true"
}

func (p *Pod) isReclaimable() bool {
return p.isGroup && !p.isServing()
}

func (p *Pod) ReclaimablePods() ([]kueue.ReclaimablePod, error) {
if !p.isGroup {
if !p.isReclaimable() {
return []kueue.ReclaimablePod{}, nil
}

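
For context, a minimal sketch of how the new serving check gates reclaimable-pod accounting. The annotation key matches the diff above; the Pod struct and main function are hypothetical scaffolding, not the controller's real types:

package main

import "fmt"

// GroupServingAnnotation mirrors the constant added in pod_webhook.go.
const GroupServingAnnotation = "kueue.x-k8s.io/pod-group-serving"

// Pod is a simplified stand-in for the controller's pod-group adapter.
type Pod struct {
	isGroup     bool
	annotations map[string]string
}

// isServing reports whether the pod group is marked as serving.
func (p *Pod) isServing() bool {
	return p.isGroup && p.annotations[GroupServingAnnotation] == "true"
}

// isReclaimable: serving groups keep their full quota even when some pods
// finish, because finished serving pods are replaced during rolling updates.
func (p *Pod) isReclaimable() bool {
	return p.isGroup && !p.isServing()
}

func main() {
	serving := &Pod{isGroup: true, annotations: map[string]string{GroupServingAnnotation: "true"}}
	batch := &Pod{isGroup: true, annotations: map[string]string{}}
	fmt.Println(serving.isReclaimable(), batch.isReclaimable()) // false true
}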
117 changes: 117 additions & 0 deletions pkg/controller/jobs/pod/pod_controller_test.go
@@ -2643,6 +2643,123 @@ func TestReconciler(t *testing.T) {
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"reclaimablePods field is not updated for a serving pod group": {
pods: []corev1.Pod{
*basePodWrapper.
Clone().
Label(constants.ManagedByKueueLabel, "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("3").
PodGroupServingAnnotation(true).
StatusPhase(corev1.PodFailed).
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label(constants.ManagedByKueueLabel, "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("3").
PodGroupServingAnnotation(true).
StatusPhase(corev1.PodRunning).
Obj(),
*basePodWrapper.
Clone().
Name("pod3").
Label(constants.ManagedByKueueLabel, "true").
KueueFinalizer().
Group("test-group").
Request(corev1.ResourceMemory, "1Gi").
GroupTotalCount("3").
PodGroupServingAnnotation(true).
StatusPhase(corev1.PodSucceeded).
Obj(),
},
wantPods: []corev1.Pod{
*basePodWrapper.
Clone().
Label(constants.ManagedByKueueLabel, "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("3").
PodGroupServingAnnotation(true).
StatusPhase(corev1.PodFailed).
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label(constants.ManagedByKueueLabel, "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("3").
PodGroupServingAnnotation(true).
StatusPhase(corev1.PodRunning).
Obj(),
*basePodWrapper.
Clone().
Name("pod3").
Label(constants.ManagedByKueueLabel, "true").
KueueFinalizer().
Group("test-group").
Request(corev1.ResourceMemory, "1Gi").
GroupTotalCount("3").
PodGroupServingAnnotation(true).
StatusPhase(corev1.PodSucceeded).
Obj(),
},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").
PodSets(
*utiltesting.MakePodSet("4ebdd4a6", 1).
Request(corev1.ResourceCPU, "1").
Request(corev1.ResourceMemory, "1Gi").
Obj(),
*utiltesting.MakePodSet(podUID, 2).
Request(corev1.ResourceCPU, "1").
Obj(),
).
Queue("user-queue").
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "test-uid").
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod2", "test-uid").
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod3", "test-uid").
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(true).
Condition(metav1.Condition{
Type: WorkloadWaitingForReplacementPods,
Status: metav1.ConditionTrue,
Reason: WorkloadPodsFailed,
Message: "Some Failed pods need replacement",
}).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").
PodSets(
*utiltesting.MakePodSet("4ebdd4a6", 1).
Request(corev1.ResourceCPU, "1").
Request(corev1.ResourceMemory, "1Gi").
Obj(),
*utiltesting.MakePodSet(podUID, 2).
Request(corev1.ResourceCPU, "1").
Obj(),
).
Queue("user-queue").
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(true).
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "test-uid").
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod2", "test-uid").
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod3", "test-uid").
Condition(metav1.Condition{
Type: WorkloadWaitingForReplacementPods,
Status: metav1.ConditionTrue,
Reason: WorkloadPodsFailed,
Message: "Some Failed pods need replacement",
}).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"excess pods before wl creation, youngest pods are deleted": {
pods: []corev1.Pod{
*basePodWrapper.
1 change: 1 addition & 0 deletions pkg/controller/jobs/pod/pod_webhook.go
@@ -49,6 +49,7 @@ const (
GroupNameLabel = "kueue.x-k8s.io/pod-group-name"
GroupTotalCountAnnotation = "kueue.x-k8s.io/pod-group-total-count"
GroupFastAdmissionAnnotation = "kueue.x-k8s.io/pod-group-fast-admission"
GroupServingAnnotation = "kueue.x-k8s.io/pod-group-serving"
RoleHashAnnotation = "kueue.x-k8s.io/role-hash"
RetriableInGroupAnnotation = "kueue.x-k8s.io/retriable-in-group"
)
9 changes: 3 additions & 6 deletions pkg/controller/jobs/statefulset/statefulset_reconciler.go
@@ -23,7 +23,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -57,11 +56,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling StatefulSet")

// For now, handle only scaling down to zero.
if ptr.Deref(sts.Spec.Replicas, 1) != 0 {
return ctrl.Result{}, nil
}

err = r.fetchAndFinalizePods(ctx, req.Namespace, req.Name)
if err != nil {
return ctrl.Result{}, err
@@ -84,6 +78,9 @@ func (r *Reconciler) finalizePods(ctx context.Context, pods []corev1.Pod) error
log := ctrl.LoggerFrom(ctx)
return parallelize.Until(ctx, len(pods), func(i int) error {
p := &pods[i]
if p.Status.Phase != corev1.PodSucceeded && p.Status.Phase != corev1.PodFailed {
return nil
}
err := clientutil.Patch(ctx, r.client, p, true, func() (bool, error) {
removed := controllerutil.RemoveFinalizer(p, pod.PodFinalizer)
if removed {
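
A minimal sketch of the finalizer rule this hunk introduces, assuming only the k8s.io/api types; the helper name shouldFinalize is hypothetical. Only Succeeded or Failed pods lose the Kueue finalizer, so pods that are Pending or Running during a rolling update are left intact:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// shouldFinalize mirrors the phase check added to finalizePods: pods that
// have not reached a terminal phase are skipped entirely.
func shouldFinalize(p *corev1.Pod) bool {
	return p.Status.Phase == corev1.PodSucceeded || p.Status.Phase == corev1.PodFailed
}

func main() {
	running := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodRunning}}
	failed := &corev1.Pod{Status: corev1.PodStatus{Phase: corev1.PodFailed}}
	fmt.Println(shouldFinalize(running), shouldFinalize(failed)) // false true
}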
53 changes: 21 additions & 32 deletions pkg/controller/jobs/statefulset/statefulset_reconciler_test.go
@@ -48,57 +48,46 @@ func TestReconciler(t *testing.T) {
wantPods []corev1.Pod
wantErr error
}{
"statefulset with replicas != zero": {
"statefulset with finished pods": {
statefulSet: *statefulsettesting.MakeStatefulSet("sts", "ns").
Replicas(1).
Replicas(0).
Queue("lq").
DeepCopy(),
wantStatefulSet: *statefulsettesting.MakeStatefulSet("sts", "ns").
Replicas(1).
Replicas(0).
Queue("lq").
DeepCopy(),
pods: []corev1.Pod{
*testingjobspod.MakePod("pod", "ns").
*testingjobspod.MakePod("pod1", "ns").
Label(pod.GroupNameLabel, GetWorkloadName("sts")).
Finalizer(pod.PodFinalizer).
KueueFinalizer().
StatusPhase(corev1.PodSucceeded).
Obj(),
},
wantPods: []corev1.Pod{
*testingjobspod.MakePod("pod", "ns").
*testingjobspod.MakePod("pod2", "ns").
Label(pod.GroupNameLabel, GetWorkloadName("sts")).
Finalizer(pod.PodFinalizer).
KueueFinalizer().
StatusPhase(corev1.PodFailed).
Obj(),
},
},
"statefulset with replicas = 0": {
statefulSet: *statefulsettesting.MakeStatefulSet("sts", "ns").
Replicas(0).
Queue("lq").
DeepCopy(),
wantStatefulSet: *statefulsettesting.MakeStatefulSet("sts", "ns").
Replicas(0).
Queue("lq").
DeepCopy(),
pods: []corev1.Pod{
*testingjobspod.MakePod("pod", "ns").
*testingjobspod.MakePod("pod3", "ns").
Label(pod.GroupNameLabel, GetWorkloadName("sts")).
Finalizer(pod.PodFinalizer).
KueueFinalizer().
Obj(),
},
wantPods: []corev1.Pod{
*testingjobspod.MakePod("pod", "ns").
*testingjobspod.MakePod("pod1", "ns").
Label(pod.GroupNameLabel, GetWorkloadName("sts")).
StatusPhase(corev1.PodSucceeded).
Obj(),
*testingjobspod.MakePod("pod2", "ns").
Label(pod.GroupNameLabel, GetWorkloadName("sts")).
StatusPhase(corev1.PodFailed).
Obj(),
*testingjobspod.MakePod("pod3", "ns").
Label(pod.GroupNameLabel, GetWorkloadName("sts")).
KueueFinalizer().
Obj(),
},
},
"statefulset with no pods": {
statefulSet: *statefulsettesting.MakeStatefulSet("sts", "ns").
Replicas(0).
Queue("lq").DeepCopy(),
wantStatefulSet: *statefulsettesting.MakeStatefulSet("sts", "ns").
Replicas(0).
Queue("lq").DeepCopy(),
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
1 change: 1 addition & 0 deletions pkg/controller/jobs/statefulset/statefulset_webhook.go
@@ -79,6 +79,7 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error {
}
ss.Spec.Template.Annotations[pod.GroupTotalCountAnnotation] = fmt.Sprint(ptr.Deref(ss.Spec.Replicas, 1))
ss.Spec.Template.Annotations[pod.GroupFastAdmissionAnnotation] = "true"
ss.Spec.Template.Annotations[pod.GroupServingAnnotation] = "true"

return nil
}
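
Roughly, the defaulting now stamps three pod-group annotations onto the StatefulSet pod template. A standalone sketch under assumptions — the annotation keys are from this commit, while the helper and its map-based signature are hypothetical:

package main

import "fmt"

const (
	groupTotalCountAnnotation    = "kueue.x-k8s.io/pod-group-total-count"
	groupFastAdmissionAnnotation = "kueue.x-k8s.io/pod-group-fast-admission"
	groupServingAnnotation       = "kueue.x-k8s.io/pod-group-serving"
)

// defaultPodGroupAnnotations mimics the Default() additions: the group size
// comes from spec.replicas (nil means 1), and serving mode is always set for
// StatefulSets so finished pods are treated as replaceable, not reclaimable.
func defaultPodGroupAnnotations(annotations map[string]string, replicas *int32) map[string]string {
	if annotations == nil {
		annotations = map[string]string{}
	}
	n := int32(1)
	if replicas != nil {
		n = *replicas
	}
	annotations[groupTotalCountAnnotation] = fmt.Sprint(n)
	annotations[groupFastAdmissionAnnotation] = "true"
	annotations[groupServingAnnotation] = "true"
	return annotations
}

func main() {
	r := int32(3)
	fmt.Println(defaultPodGroupAnnotations(nil, &r))
}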
2 changes: 2 additions & 0 deletions pkg/controller/jobs/statefulset/statefulset_webhook_test.go
@@ -56,6 +56,7 @@ func TestDefault(t *testing.T) {
PodTemplateSpecPodGroupNameLabel("test-pod", "", gvk).
PodTemplateSpecPodGroupTotalCountAnnotation(10).
PodTemplateSpecPodGroupFastAdmissionAnnotation(true).
PodTemplateSpecPodGroupServingAnnotation(true).
Obj(),
},
"statefulset without replicas": {
@@ -69,6 +70,7 @@
PodTemplateSpecPodGroupTotalCountAnnotation(1).
PodTemplateSpecQueue("test-queue").
PodTemplateSpecPodGroupFastAdmissionAnnotation(true).
PodTemplateSpecPodGroupServingAnnotation(true).
Obj(),
},
}
4 changes: 4 additions & 0 deletions pkg/util/testingjobs/pod/wrappers.go
@@ -142,6 +142,10 @@ func (p *PodWrapper) Annotation(key, content string) *PodWrapper {
return p
}

func (p *PodWrapper) PodGroupServingAnnotation(enabled bool) *PodWrapper {
return p.Annotation("kueue.x-k8s.io/pod-group-serving", strconv.FormatBool(enabled))
}

// RoleHash updates the pod.RoleHashAnnotation of the pod
func (p *PodWrapper) RoleHash(h string) *PodWrapper {
return p.Annotation("kueue.x-k8s.io/role-hash", h)
4 changes: 4 additions & 0 deletions pkg/util/testingjobs/statefulset/wrappers.go
@@ -139,6 +139,10 @@ func (ss *StatefulSetWrapper) PodTemplateSpecPodGroupFastAdmissionAnnotation(ena
return ss.PodTemplateSpecAnnotation(pod.GroupFastAdmissionAnnotation, strconv.FormatBool(enabled))
}

func (ss *StatefulSetWrapper) PodTemplateSpecPodGroupServingAnnotation(enabled bool) *StatefulSetWrapper {
return ss.PodTemplateSpecAnnotation(pod.GroupServingAnnotation, strconv.FormatBool(enabled))
}

func (ss *StatefulSetWrapper) Image(image string, args []string) *StatefulSetWrapper {
ss.Spec.Template.Spec.Containers[0].Image = image
ss.Spec.Template.Spec.Containers[0].Args = args
11 changes: 11 additions & 0 deletions site/content/en/docs/reference/labels-and-annotations.md
@@ -106,6 +106,17 @@ Used on: [Plain Pods](/docs/tasks/run/plain_pods/).
The label key indicates the name of the group of Pods that should be admitted together.


### kueue.x-k8s.io/pod-group-serving

Type: Annotation

Example: `kueue.x-k8s.io/pod-group-serving: "true"`

Used on: [Plain Pods](/docs/tasks/run/plain_pods/).

The annotation key indicates whether the pod group is used as a serving workload.
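
For illustration, a hypothetical member of a three-pod serving group might carry the following metadata (only the label and annotation keys come from this reference; the names and values are made up):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A hypothetical member of a three-pod serving group.
	p := corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "web-0",
			Namespace: "default",
			Labels: map[string]string{
				"kueue.x-k8s.io/pod-group-name": "web",
			},
			Annotations: map[string]string{
				"kueue.x-k8s.io/pod-group-total-count": "3",
				"kueue.x-k8s.io/pod-group-serving":     "true",
			},
		},
	}
	fmt.Println(p.Annotations["kueue.x-k8s.io/pod-group-serving"])
}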


### kueue.x-k8s.io/pod-group-total-count

Type: Annotation