Skip to content

Commit

Permalink
rewrite flaky broker_deleted_recreated (#4175)
Browse files Browse the repository at this point in the history
* rewrite flaky broker_deleted_recreated

* style

* style

* style

* lint

* lint
  • Loading branch information
maschmid authored Jan 9, 2025
1 parent 9e811ba commit d90c382
Show file tree
Hide file tree
Showing 2 changed files with 336 additions and 2 deletions.
87 changes: 85 additions & 2 deletions test/rekt/features/broker_deleted_recreated.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@
package features

import (
"context"
"math"
"strconv"

cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
kafkabroker "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker"
"knative.dev/eventing-kafka-broker/test/e2e_new/bogus_config"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub/assert"

"knative.dev/pkg/system"

Expand All @@ -36,15 +46,88 @@ import (
brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker"
)

func compose(steps ...feature.StepFn) feature.StepFn {
return func(ctx context.Context, t feature.T) {
for _, s := range steps {
s(ctx, t)
}
}
}

// BrokerDeletedRecreated tests that when a broker and trigger is deleted and re-created, the original sink will eventually stop receiving events
func BrokerDeletedRecreated() *feature.Feature {
f := feature.NewFeatureNamed("broker deleted and recreated")

brokerName := feature.MakeRandomK8sName("broker")
triggerName := feature.MakeRandomK8sName("trigger")

f.Setup("test broker", featuressteps.BrokerSmokeTest(brokerName, triggerName))
sink1 := feature.MakeRandomK8sName("asink")
sink2 := feature.MakeRandomK8sName("bsink")

event := cetest.FullEvent()
event.SetID(uuid.New().String())

eventMatchers := []cetest.EventMatcher{
cetest.HasId(event.ID()),
cetest.HasSource(event.Source()),
cetest.HasType(event.Type()),
cetest.HasSubject(event.Subject()),
}

backoffPolicy := eventingduck.BackoffPolicyLinear

f.Setup("test broker", compose(
eventshub.Install(sink1, eventshub.StartReceiver),
broker.Install(brokerName, broker.WithEnvConfig()...),
broker.IsReady(brokerName),
trigger.Install(
triggerName,
trigger.WithBrokerName(brokerName),
trigger.WithRetry(3, &backoffPolicy, ptr.To("PT1S")),
trigger.WithSubscriber(service.AsKReference(sink1), ""),
),
trigger.IsReady(triggerName),
eventshub.Install(
feature.MakeRandomK8sName("source"),
eventshub.StartSenderToResource(broker.GVR(), brokerName),
eventshub.AddSequence,
eventshub.InputEvent(event),
// We want to send 1 event/s until the timeout
func(ctx context.Context, envs map[string]string) error {
_, timeout := environment.PollTimingsFromContext(ctx)
envs["PERIOD"] = "1" // in seconds
envs["MAX_MESSAGES"] = strconv.Itoa(int(math.Ceil(timeout.Seconds())))
return nil
},
),
assert.OnStore(sink1).MatchEvent(eventMatchers...).AtLeast(1),
))

f.Requirement("delete broker", featuressteps.DeleteBroker(brokerName))
f.Assert("test broker after deletion", featuressteps.BrokerSmokeTest(brokerName, triggerName))
f.Assert("test broker after deletion", compose(
eventshub.Install(sink2, eventshub.StartReceiver),
broker.Install(brokerName, broker.WithEnvConfig()...),
broker.IsReady(brokerName),
trigger.Install(
triggerName,
trigger.WithBrokerName(brokerName),
trigger.WithRetry(3, &backoffPolicy, ptr.To("PT1S")),
trigger.WithSubscriber(service.AsKReference(sink2), ""),
),
trigger.IsReady(triggerName),
// We need to check both that
// 1. sink1 eventually stops receiving new events
// 2. sink2 eventually starts receiving all events
// therefore, we check that eventually, the last few events sent (16 for no particular reason) are all received by the sink2 only
// and contain an uninterrupted (without any missing sequence numbers) source sequence as sent by the source with eventshub.AddSequence
EventSequenceOnStores(sink1, sink2).
MatchingReceived(eventMatchers...). // ... when ...
OrderedBySourceSequence(). // ..., and taken the ...
LastN(16). // ... events, the sequence...
ContainsOnlyEventsObservedBy(sink2). // ...and...
IsAnUninterruptedSourceSequence().
Eventually(),
))

return f
}
Expand Down
251 changes: 251 additions & 0 deletions test/rekt/features/sequence_assertions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* Copyright 2024 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package features

import (
"cmp"
"context"
"fmt"

"knative.dev/reconciler-test/pkg/eventshub/assert"

"slices"

cetest "github.com/cloudevents/sdk-go/v2/test"
types2 "github.com/cloudevents/sdk-go/v2/types"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/feature"
)

type sequenceTransformationOrAssertion func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error)

type SequenceAssertionBuilder struct {
storeNames []string
matchers []eventshub.EventInfoMatcherCtx
transformsOrAssertions []sequenceTransformationOrAssertion
}

func getEventInfoSourceSequenceNumber(eventInfo eventshub.EventInfo) (int32, error) {
sequenceExtension, ok := eventInfo.Event.Extensions()["sequence"]
if !ok {
return 0, fmt.Errorf("event does not contain a sequence extension: %s", eventInfo.String())
}

sequenceNumber, err := types2.ToInteger(sequenceExtension)
if err != nil {
return 0, fmt.Errorf("event \"sequence\" extension value %q is not a number: %s", sequenceExtension, eventInfo.String())
}

return sequenceNumber, nil
}

func orderBySourceSequence(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
sorted := slices.Clone(events)

var conversionError error

slices.SortFunc(sorted, func(a, b eventshub.EventInfo) int {
var err error
var an, bn int32
an, err = getEventInfoSourceSequenceNumber(a)
if err != nil {
conversionError = err
return 0
}

bn, err = getEventInfoSourceSequenceNumber(b)
if err != nil {
conversionError = err
return 0
}

return cmp.Compare(an, bn)
})

if conversionError != nil {
return nil, conversionError
}

return sorted, nil
}

func reverse(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
reversed := slices.Clone(events)
slices.Reverse(reversed)
return reversed, nil
}

func firstN(n int, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
if len(events) < n {
return nil, fmt.Errorf("expected at least %d events, got %d", n, len(events))
}

return events[:n], nil
}

func lastN(n int, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
if len(events) < n {
return nil, fmt.Errorf("expected at least %d events, got %d", n, len(events))
}

return events[len(events)-n:], nil
}

// EventSequenceOnStores starts an assertion about sequence of events received by the given named stores
// The assertions are specially designed for checking sequences as generated by sources with eventshub.AddSequence
func EventSequenceOnStores(names ...string) SequenceAssertionBuilder {
return SequenceAssertionBuilder{
storeNames: names,
}
}

func (b SequenceAssertionBuilder) MatchingReceived(matchers ...cetest.EventMatcher) SequenceAssertionBuilder {
b.matchers = append(b.matchers, assert.MatchKind(eventshub.EventReceived).WithContext())
b.matchers = append(b.matchers, assert.MatchEvent(matchers...).WithContext())
return b
}

func (b SequenceAssertionBuilder) Reversed() SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, reverse)
return b
}

func (b SequenceAssertionBuilder) OrderedBySourceSequence() SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, orderBySourceSequence)
return b
}

func (b SequenceAssertionBuilder) FirstN(n int) SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
return firstN(n, events)
})
return b
}

func (b SequenceAssertionBuilder) LastN(n int) SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
return lastN(n, events)
})
return b
}

func (b SequenceAssertionBuilder) ContainsOnly(matcher eventshub.EventInfoMatcher) SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
for _, event := range events {
err := matcher(event)
if err != nil {
return nil, err
}
}

return events, nil
})
return b
}

func (b SequenceAssertionBuilder) ContainsOnlyEventsObservedBy(observerName string) SequenceAssertionBuilder {
return b.ContainsOnly(func(info eventshub.EventInfo) error {
if info.Observer != observerName {
return fmt.Errorf("expected observer to be %s, got %s", observerName, info.Observer)
}
return nil
})
}

func (b SequenceAssertionBuilder) IsAnUninterruptedSourceSequence() SequenceAssertionBuilder {
b.transformsOrAssertions = append(b.transformsOrAssertions, func(ctx context.Context, events []eventshub.EventInfo) ([]eventshub.EventInfo, error) {
if len(events) == 0 {
return nil, fmt.Errorf("no events received")
}

firstSequenceNumber, err := getEventInfoSourceSequenceNumber(events[0])
if err != nil {
return nil, err
}

expectedSequenceNumber := firstSequenceNumber - 1
for _, event := range events {
expectedSequenceNumber++

sequenceNumber, err := getEventInfoSourceSequenceNumber(event)
if err != nil {
return nil, err
}

if sequenceNumber != expectedSequenceNumber {
return nil, fmt.Errorf("expected sequence number %d, got %d", expectedSequenceNumber, sequenceNumber)
}
}

return events, nil
})
return b
}

func (b SequenceAssertionBuilder) Eventually() feature.StepFn {
return func(ctx context.Context, t feature.T) {
retryInterval, retryTimeout := environment.PollTimingsFromContext(ctx)

var internalErr error

err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(ctx context.Context) (bool, error) {
events := make([]eventshub.EventInfo, 0)
for _, storeName := range b.storeNames {
store := eventshub.StoreFromContext(ctx, storeName)

storeEvents, _, _, err := store.Find(func(info eventshub.EventInfo) error {
for _, matcher := range b.matchers {
err := matcher.WithContext(ctx)(info)
if err != nil {
return err
}
}
return nil
})

if err != nil {
internalErr = err
return false, nil
}

events = append(events, storeEvents...)
}

for _, transformOrAssertion := range b.transformsOrAssertions {
var err error
events, err = transformOrAssertion(ctx, events)
if err != nil {
internalErr = err
return false, nil
}
}

internalErr = nil
return true, nil
})

if internalErr != nil {
t.Fatal(internalErr)
}

if err != nil {
t.Fatal(err)
}
}
}

0 comments on commit d90c382

Please sign in to comment.