Skip to content

Commit

Permalink
Upgrade dependencies (#4189)
Browse files Browse the repository at this point in the history
* upgrade to latest dependencies

bumping knative.dev/eventing ef6b31a...7c97e6f:
  > 7c97e6f Schduler: MAXFILLUP strategy will spread vreplicas across multiple pods (# 8263)

Signed-off-by: Knative Automation <[email protected]>

* Fix consumer group build error and remove unused configs

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Additional logging for sacura tests

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Increase nodes

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Additional logging

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Test knative/eventing#8388

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Apply patch

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Increase sacura receiver timeout

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Point to real eventing dep

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Tmp: Shorter processing time

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Fix data race in watch and log resources

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Knative Automation <[email protected]>
Signed-off-by: Pierangelo Di Pilato <[email protected]>
Co-authored-by: Knative Automation <[email protected]>
  • Loading branch information
pierDipi and knative-automation authored Jan 7, 2025
1 parent db85e46 commit 2cde7b9
Show file tree
Hide file tree
Showing 174 changed files with 6,932 additions and 3,266 deletions.

This file was deleted.

This file was deleted.

86 changes: 8 additions & 78 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ package consumergroup

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

v1 "k8s.io/client-go/informers/core/v1"

"github.com/kelseyhightower/envconfig"
"go.uber.org/multierr"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -45,7 +43,6 @@ import (
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/configmap"
Expand Down Expand Up @@ -93,11 +90,9 @@ type envConfig struct {
}

type SchedulerConfig struct {
StatefulSetName string
RefreshPeriod time.Duration
Capacity int32
SchedulerPolicy *scheduler.SchedulerPolicy
DeSchedulerPolicy *scheduler.SchedulerPolicy
StatefulSetName string
RefreshPeriod time.Duration
Capacity int32
}

func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
Expand All @@ -109,10 +104,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
}

c := SchedulerConfig{
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
SchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.SchedulerPolicyConfigMap),
DeSchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.DeSchedulerPolicyConfigMap),
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
}

dispatcherPodInformer := podinformer.Get(ctx, internalsapi.DispatcherLabelSelectorStr)
Expand Down Expand Up @@ -332,11 +325,9 @@ func createKafkaScheduler(ctx context.Context, c SchedulerConfig, ssName string,
return createStatefulSetScheduler(
ctx,
SchedulerConfig{
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Capacity: c.Capacity,
SchedulerPolicy: c.SchedulerPolicy,
DeSchedulerPolicy: c.DeSchedulerPolicy,
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Capacity: c.Capacity,
},
func() ([]scheduler.VPod, error) {
consumerGroups, err := lister.List(labels.SelectorFromSet(getSelectorLabel(ssName)))
Expand Down Expand Up @@ -380,12 +371,8 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
ScaleCacheConfig: scheduler.ScaleCacheConfig{RefreshPeriod: statefulSetScaleCacheRefreshPeriod},
PodCapacity: c.Capacity,
RefreshPeriod: c.RefreshPeriod,
SchedulerPolicy: scheduler.MAXFILLUP,
SchedPolicy: c.SchedulerPolicy,
DeschedPolicy: c.DeSchedulerPolicy,
Evictor: newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict,
VPodLister: lister,
NodeLister: nodeinformer.Get(ctx).Lister(),
PodLister: dispatcherPodInformer.Lister().Pods(system.Namespace()),
})

Expand All @@ -394,60 +381,3 @@ func createStatefulSetScheduler(ctx context.Context, c SchedulerConfig, lister s
SchedulerConfig: c,
}
}

// schedulerPolicyFromConfigMapOrFail reads predicates and priorities data from configMap
func schedulerPolicyFromConfigMapOrFail(ctx context.Context, configMapName string) *scheduler.SchedulerPolicy {
p, err := schedulerPolicyFromConfigMap(ctx, configMapName)
if err != nil {
logging.FromContext(ctx).Fatal(zap.Error(err))
}
return p
}

// schedulerPolicyFromConfigMap reads predicates and priorities data from configMap
func schedulerPolicyFromConfigMap(ctx context.Context, configMapName string) (*scheduler.SchedulerPolicy, error) {
policyConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, configMapName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("couldn't get scheduler policy config map %s/%s: %v", system.Namespace(), configMapName, err)
}

logger := logging.FromContext(ctx).
Desugar().
With(zap.String("configmap", configMapName))
policy := &scheduler.SchedulerPolicy{}

preds, found := policyConfigMap.Data["predicates"]
if !found {
return nil, fmt.Errorf("missing policy config map %s/%s value at key predicates", system.Namespace(), configMapName)
}
if err := json.NewDecoder(strings.NewReader(preds)).Decode(&policy.Predicates); err != nil {
return nil, fmt.Errorf("invalid policy %v: %v", preds, err)
}

priors, found := policyConfigMap.Data["priorities"]
if !found {
return nil, fmt.Errorf("missing policy config map value at key priorities")
}
if err := json.NewDecoder(strings.NewReader(priors)).Decode(&policy.Priorities); err != nil {
return nil, fmt.Errorf("invalid policy %v: %v", preds, err)
}

if errs := validatePolicy(policy); errs != nil {
return nil, multierr.Combine(err)
}

logger.Info("Schedulers policy registration", zap.Any("policy", policy))

return policy, nil
}

func validatePolicy(policy *scheduler.SchedulerPolicy) []error {
var validationErrors []error

for _, priority := range policy.Priorities {
if priority.Weight < scheduler.MinWeight || priority.Weight > scheduler.MaxWeight {
validationErrors = append(validationErrors, fmt.Errorf("priority %s should have a positive weight applied to it or it has overflown", priority.Name))
}
}
return validationErrors
}
26 changes: 13 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module knative.dev/eventing-kafka-broker

go 1.22.0
go 1.22.7

require (
github.com/IBM/sarama v1.43.3
Expand Down Expand Up @@ -28,17 +28,17 @@ require (
go.uber.org/atomic v1.10.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.35.1
google.golang.org/protobuf v1.35.2
k8s.io/api v0.30.3
k8s.io/apiextensions-apiserver v0.30.3
k8s.io/apimachinery v0.30.3
k8s.io/apiserver v0.30.3
k8s.io/client-go v0.30.3
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
knative.dev/eventing v0.43.1-0.20241028083747-ef6b31a697e7
knative.dev/hack v0.0.0-20241025103803-ef6e7e983a60
knative.dev/pkg v0.0.0-20241026180704-25f6002b00f3
knative.dev/reconciler-test v0.0.0-20241024141702-aae114c1c0e3
knative.dev/eventing v0.43.1-0.20241223131119-c9047a198255
knative.dev/hack v0.0.0-20241227080210-e92a16ae0893
knative.dev/pkg v0.0.0-20241223131119-4c901591eb4a
knative.dev/reconciler-test v0.0.0-20241223131247-96258bea6ce4
sigs.k8s.io/controller-runtime v0.12.3
sigs.k8s.io/yaml v1.4.0
)
Expand Down Expand Up @@ -115,20 +115,20 @@ require (
github.com/xdg-go/stringprep v1.0.4 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.26.0 // indirect
golang.org/x/tools v0.27.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/api v0.183.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.68.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit 2cde7b9

Please sign in to comment.