Skip to content

Commit

Permalink
fix TestRerun flakiness
Browse files Browse the repository at this point in the history
  • Loading branch information
xrstf committed Oct 12, 2024
1 parent 4a1667b commit 9102251
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 106 deletions.
188 changes: 95 additions & 93 deletions test/integration/test/deck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"
"net/http"
"sort"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -358,7 +357,10 @@ func TestDeckTenantIDs(t *testing.T) {
func TestRerun(t *testing.T) {
t.Parallel()
const rerunJobConfigFile = "rerun-test.yaml"

jobName := "rerun-test-job-" + RandomString(t)
prowJobSelector := labels.SelectorFromSet(map[string]string{kube.ProwJobAnnotation: jobName})

var rerunJobConfigTemplate = `periodics:
- interval: 1h
name: %s
Expand All @@ -379,137 +381,137 @@ func TestRerun(t *testing.T) {
t.Fatalf("Failed creating clients for cluster %q: %v", clusterContext, err)
}

rerunJobConfig := fmt.Sprintf(rerunJobConfigTemplate, jobName, "foo")
if err := updateJobConfig(context.Background(), kubeClient, rerunJobConfigFile, []byte(rerunJobConfig)); err != nil {
t.Fatalf("Failed update job config: %v", err)
}
ctx := context.Background()

// Now we are waiting on Horologium to create the first prow job so that we
// can rerun from.
// Horologium itself is pretty good at handling the configmap update, but
// not kubelet, according to
// https://github.com/kubernetes/kubernetes/issues/30189 kubelet syncs
// configmap updates on existing pods every minute, which is a long wait.
// The proposed fix in the issue was updating the deployment, which imo
// should be better handled by just refreshing pods.
// So here comes forcing restart of horologium pods.
if err := refreshProwPods(kubeClient, context.Background(), "horologium"); err != nil {
t.Fatalf("Failed refreshing horologium pods: %v", err)
}
// Same with deck
if err := refreshProwPods(kubeClient, context.Background(), "deck"); err != nil {
t.Fatalf("Failed refreshing deck pods: %v", err)
redeployJobConfig := func(jobConfig string) {
if err := updateJobConfig(ctx, kubeClient, rerunJobConfigFile, []byte(jobConfig)); err != nil {
t.Fatalf("Failed update job config: %v", err)
}

// Now we are waiting on Horologium to create the first prow job so that we
// can rerun from.
// Horologium itself is pretty good at handling the configmap update, but
// not kubelet, according to
// https://github.com/kubernetes/kubernetes/issues/30189 kubelet syncs
// configmap updates on existing pods every minute, which is a long wait.
// It's quicker to rollout the affected Deployments.
if err := rolloutDeployment(t, ctx, kubeClient, "horologium"); err != nil {
t.Fatalf("Failed rolling out Horologium: %v", err)
}
// Same with deck
if err := rolloutDeployment(t, ctx, kubeClient, "deck"); err != nil {
t.Fatalf("Failed rolling out Deck: %v", err)
}
}

// Deploy the initial template with "foo" as the label.
redeployJobConfig(fmt.Sprintf(rerunJobConfigTemplate, jobName, "foo"))

t.Cleanup(func() {
if err := updateJobConfig(context.Background(), kubeClient, rerunJobConfigFile, []byte{}); err != nil {
if err := updateJobConfig(ctx, kubeClient, rerunJobConfigFile, []byte{}); err != nil {
t.Logf("ERROR CLEANUP: %v", err)
}
labels, err := labels.Parse("prow.k8s.io/job = " + jobName)
if err != nil {
t.Logf("Skip cleaning up jobs, as failed parsing label: %v", err)
return
// Prevent horologium from immediately creating the "missing" ProwJob after the
// DeleteAll call further down, because horologium still runs with the last
// non-empty configuration (foo=bar).
if err := rolloutDeployment(t, ctx, kubeClient, "horologium"); err != nil {
t.Logf("Failed rolling out Horologium: %v", err)
}
if err := kubeClient.DeleteAllOf(context.Background(), &prowjobv1.ProwJob{}, &ctrlruntimeclient.DeleteAllOfOptions{
ListOptions: ctrlruntimeclient.ListOptions{LabelSelector: labels},
if err := kubeClient.DeleteAllOf(ctx, &prowjobv1.ProwJob{}, &ctrlruntimeclient.DeleteAllOfOptions{
ListOptions: ctrlruntimeclient.ListOptions{
Namespace: defaultNamespace,
LabelSelector: prowJobSelector,
},
}); err != nil {
t.Logf("ERROR CLEANUP: %v", err)
}
})
ctx := context.Background()

getLatestJob := func(t *testing.T, jobName string, lastRun *v1.Time) *prowjobv1.ProwJob {
var res *prowjobv1.ProwJob
if err := wait.PollUntilContextTimeout(ctx, time.Second, 90*time.Second, true, func(ctx context.Context) (bool, error) {
pjs := &prowjobv1.ProwJobList{}
err = kubeClient.List(ctx, pjs, &ctrlruntimeclient.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{kube.ProwJobAnnotation: jobName}),
err := kubeClient.List(ctx, pjs, &ctrlruntimeclient.ListOptions{
LabelSelector: prowJobSelector,
Namespace: defaultNamespace,
})
if err != nil {
return false, fmt.Errorf("failed listing prow jobs: %w", err)
}

sort.Slice(pjs.Items, func(i, j int) bool {
revi, _ := strconv.Atoi(pjs.Items[i].ResourceVersion)
revj, _ := strconv.Atoi(pjs.Items[j].ResourceVersion)
return revi > revj
createdi := pjs.Items[i].CreationTimestamp
createdj := pjs.Items[j].CreationTimestamp
return createdj.Before(&createdi)
})

if len(pjs.Items) > 0 {
if lastRun != nil && pjs.Items[0].CreationTimestamp.Before(lastRun) {
return false, nil
}
res = &pjs.Items[0]
}

return res != nil, nil
}); err != nil {
t.Fatalf("Failed waiting for job %q: %v", jobName, err)
}
return res
}
rerun := func(t *testing.T, jobName string, mode string) {
req, err := http.NewRequest("POST", fmt.Sprintf("http://localhost/rerun?mode=%v&prowjob=%v", mode, jobName), nil)
if err != nil {
t.Fatalf("Could not generate a request %v", err)
}

// Deck might not have been informed about the job config update, retry
// for this case.
waitDur := time.Second * 5
var lastErr error
for i := 0; i < 3; i++ {
lastErr = nil
res, err := http.DefaultClient.Do(req)
if err != nil {
lastErr = fmt.Errorf("could not make post request %v", err)
res.Body.Close()
break
}
// The only retry condition is status not ok
if res.StatusCode != http.StatusOK {
lastErr = fmt.Errorf("status not expected: %d", res.StatusCode)
res.Body.Close()
waitDur *= 2
time.Sleep(waitDur)
continue
}
body, err := io.ReadAll(res.Body)
if err != nil {
lastErr = fmt.Errorf("could not read body response %v", err)
res.Body.Close()
break
}
t.Logf("Response body: %s", string(body))
break
}
if lastErr != nil {
t.Fatalf("Failed trigger rerun: %v", lastErr)
}
}
jobToRerun := getLatestJob(t, jobName, nil)
rerunJobConfig = fmt.Sprintf(rerunJobConfigTemplate, jobName, "bar")
if err := updateJobConfig(context.Background(), kubeClient, rerunJobConfigFile, []byte(rerunJobConfig)); err != nil {
t.Fatalf("Failed update job config: %v", err)
}
var passed bool
// It may take some time for the new ProwJob to show up, so we will
// check every 30s interval three times for it to appear
latestRun := jobToRerun
for i := 0; i < 3; i++ {
time.Sleep(30 * time.Second)
rerun(t, jobToRerun.Name, "latest")
if latestRun = getLatestJob(t, jobName, &latestRun.CreationTimestamp); latestRun.Labels["foo"] == "bar" {
passed = true
break
}
}
if !passed {
t.Fatal("Expected updated job.")
// Wait for the first job to be created by horologium.
initialJob := getLatestJob(t, jobName, nil)

// Update the job configuration with a new label.
redeployJobConfig(fmt.Sprintf(rerunJobConfigTemplate, jobName, "bar"))

// Rerun the job using the latest config.
rerunJob(t, ctx, initialJob.Name, "latest")

// Wait until the desired ProwJob shows up.
latestJob := getLatestJob(t, jobName, &initialJob.CreationTimestamp)
if latestJob.Labels["foo"] != "bar" {
t.Fatalf("Failed waiting for ProwJob %q using latest config with foo=bar.", jobName)
}

// Prevent Deck from being too fast and recreating the new job in the same second
// as the previous one.
time.Sleep(1 * time.Second)

// Deck scheduled job from latest configuration, rerun with "original"
// should still go with original configuration.
rerun(t, jobToRerun.Name, "original")
if latestRun := getLatestJob(t, jobName, &latestRun.CreationTimestamp); latestRun.Labels["foo"] != "foo" {
t.Fatalf("Job label mismatch. Want: 'foo', got: '%s'", latestRun.Labels["foo"])
rerunJob(t, ctx, initialJob.Name, "original")

originalJob := getLatestJob(t, jobName, &latestJob.CreationTimestamp)
if originalJob.Labels["foo"] != "foo" {
t.Fatalf("Failed waiting for ProwJob %q using original config with foo=foo.", jobName)
}
}

func rerunJob(t *testing.T, ctx context.Context, jobName string, mode string) {
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://localhost/rerun?mode=%v&prowjob=%v", mode, jobName), nil)
if err != nil {
t.Fatalf("Could not generate a request %v", err)
}

// Deck might not be fully ready yet, so we must retry.
if err := wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, true, func(ctx context.Context) (bool, error) {
res, err := http.DefaultClient.Do(req)
if err != nil {
return false, fmt.Errorf("could not make post request: %w", err)
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
t.Logf("Failed to read response body: %v", err)
return false, nil
}
t.Logf("Response body: %s", string(body))

return res.StatusCode == http.StatusOK, nil
}); err != nil {
t.Fatalf("Failed to rerun job %q with %s config: %v", jobName, mode, err)
}
}

Expand Down
6 changes: 2 additions & 4 deletions test/integration/test/horologium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,8 @@ func TestLaunchProwJob(t *testing.T) {
// not kubelet, according to
// https://github.com/kubernetes/kubernetes/issues/30189 kubelet syncs
// configmap updates on existing pods every minute, which is a long wait.
// The proposed fix in the issue was updating the deployment, which imo
// should be better handled by just refreshing pods.
// So here comes forcing restart of horologium pods.
if err := refreshProwPods(kubeClient, context.Background(), "horologium"); err != nil {
// It's quicker to rollout the affected Deployments.
if err := rolloutDeployment(t, context.Background(), kubeClient, "horologium"); err != nil {
t.Fatalf("Failed refreshing horologium pods: %v", err)
}

Expand Down
57 changes: 48 additions & 9 deletions test/integration/test/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ import (
"context"
"crypto/rand"
"crypto/sha256"
"errors"
"flag"
"fmt"
"io"
"sync"
"testing"
"time"

appsv1 "k8s.io/api/apps/v1"
coreapi "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
Expand Down Expand Up @@ -106,20 +110,55 @@ func getPodLogs(clientset *kubernetes.Clientset, namespace, podName string, opts
return str, nil
}

func refreshProwPods(client ctrlruntimeclient.Client, ctx context.Context, name string) error {
func rolloutDeployment(t *testing.T, ctx context.Context, client ctrlruntimeclient.Client, name string) error {
prowComponentsMux.Lock()
defer prowComponentsMux.Unlock()

var pods coreapi.PodList
labels, _ := labels.Parse("app = " + name)
if err := client.List(ctx, &pods, &ctrlruntimeclient.ListOptions{LabelSelector: labels}); err != nil {
return err
var depl appsv1.Deployment
if err := client.Get(ctx, types.NamespacedName{Name: name, Namespace: defaultNamespace}, &depl); err != nil {
return fmt.Errorf("failed to get Deployment: %w", err)
}

if replicas := depl.Spec.Replicas; replicas == nil || *replicas < 1 {
return errors.New("cannot restart a Deployment with zero replicas.")
}

labels := depl.Spec.Template.Labels
if labels == nil {
// This should never happen.
labels = map[string]string{}
}
labels["restart"] = RandomString(t)

t.Logf("Restarting %s...", name)
if err := client.Update(ctx, &depl); err != nil {
return fmt.Errorf("failed to update Deployment: %w", err)
}
for _, pod := range pods.Items {
if err := client.Delete(ctx, &pod); err != nil {
return err

timeout := 30 * time.Second
if err := wait.PollUntilContextTimeout(ctx, time.Second, timeout, false, func(ctx context.Context) (bool, error) {
var current appsv1.Deployment
if err := client.Get(ctx, ctrlruntimeclient.ObjectKeyFromObject(&depl), &current); err != nil {
return false, fmt.Errorf("failed to get current Deployment: %w", err)
}

replicas := current.Spec.Replicas
if replicas == nil || *replicas < 1 {
// This should never happen.
return false, errors.New("Deployment has no replicas defined")
}

ready := true &&
current.Status.AvailableReplicas == *replicas &&
current.Status.ReadyReplicas == *replicas &&
current.Status.UpdatedReplicas == *replicas &&
current.Status.UnavailableReplicas == 0

return ready, nil
}); err != nil {
return fmt.Errorf("Deployment did not fully roll out after %v: %w", timeout, err)
}

return nil
}

Expand Down

0 comments on commit 9102251

Please sign in to comment.