Skip to content

Commit

Permalink
Record expected and ready replicas for consumer group resource
Browse files Browse the repository at this point in the history
Understanding how KEDA scales up and down Knative Kafka
resources is critical to debugging any performance problem
that the autoscaler might introduce, so we need to expose new
metrics that record mutations to Knative resources done by KEDA.

`kafka_controller_consumer_group_expected_replicas` records number
of expected replicas for a given Kafka consumer group resource.

`kafka_controller_consumer_group_ready_replicas` records number
of ready replicas for a given Kafka consumer group resource.

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi committed Oct 10, 2023
1 parent 13ce94b commit 9d1c673
Showing 1 changed file with 74 additions and 23 deletions.
97 changes: 74 additions & 23 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,45 @@ var (
// initializeOffsetsDistribution defines the bucket boundaries for the histogram of initialize offsets latency metric.
// Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s.
initializeOffsetsDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000)

expectedReplicasNum = stats.Int64("consumer_group_expected_replicas", "Number of expected consumer group replicas", stats.UnitDimensionless)
expectedReplicasGauge = view.LastValue()

readyReplicasNum = stats.Int64("consumer_group_ready_replicas", "Number of ready consumer group replicas", stats.UnitDimensionless)
readyReplicasGauge = view.LastValue()
)

var (
ConsumerNameTagKey = tag.MustNewKey("consumer_name")
ConsumerKindTagKey = tag.MustNewKey("consumer_kind")
)

func init() {
views := []*view.View{
{
Description: "Latency of consumer group schedule operations",
TagKeys: []tag.Key{controller.NamespaceTagKey},
TagKeys: []tag.Key{controller.NamespaceTagKey, ConsumerNameTagKey, ConsumerKindTagKey},
Measure: scheduleLatencyStat,
Aggregation: scheduleDistribution,
},
{
Description: "Latency of consumer group offsets initialization operations",
TagKeys: []tag.Key{controller.NamespaceTagKey},
TagKeys: []tag.Key{controller.NamespaceTagKey, ConsumerNameTagKey, ConsumerKindTagKey},
Measure: initializeOffsetsLatencyStat,
Aggregation: initializeOffsetsDistribution,
},
{
Description: "Number of expected consumer group replicas",
TagKeys: []tag.Key{controller.NamespaceTagKey, ConsumerNameTagKey, ConsumerKindTagKey},
Measure: expectedReplicasNum,
Aggregation: expectedReplicasGauge,
},
{
Description: "Number of expected consumer group replicas",
TagKeys: []tag.Key{controller.NamespaceTagKey, ConsumerNameTagKey, ConsumerKindTagKey},
Measure: readyReplicasNum,
Aggregation: readyReplicasGauge,
},
}
if err := view.Register(views...); err != nil {
panic(err)
Expand Down Expand Up @@ -153,6 +176,8 @@ type Reconciler struct {
}

func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
recordExpectedReplicasMetric(ctx, cg)

if err := r.reconcileInitialOffset(ctx, cg); err != nil {
return cg.MarkInitializeOffsetFailed("InitializeOffset", err)
}
Expand Down Expand Up @@ -486,6 +511,8 @@ func (r *Reconciler) propagateStatus(ctx context.Context, cg *kafkainternals.Con
}
cg.Status.Replicas = pointer.Int32(count)

recordReadyReplicasMetric(ctx, cg)

if cg.Spec.Replicas != nil && *cg.Spec.Replicas == 0 {
subscriber, err := r.Resolver.URIFromDestinationV1(ctx, cg.Spec.Template.Spec.Subscriber, cg)
if err != nil {
Expand Down Expand Up @@ -759,31 +786,55 @@ var (
)

func recordScheduleLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) {
func() {
ctx, err := tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
)
if err != nil {
return
}

metrics.Record(ctx, scheduleLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
ctx, err := metricTagsOf(ctx, cg)
if err != nil {
return
}
metrics.Record(ctx, scheduleLatencyStat.M(time.Since(startTime).Milliseconds()))
}

func recordInitializeOffsetsLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) {
func() {
ctx, err := tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
)
if err != nil {
return
}
ctx, err := metricTagsOf(ctx, cg)
if err != nil {
return
}
metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds()))
}

func recordExpectedReplicasMetric(ctx context.Context, cg *kafkainternals.ConsumerGroup) {
ctx, err := metricTagsOf(ctx, cg)
if err != nil {
return
}

r := int32(0)
if cg.Spec.Replicas != nil {
r = *cg.Spec.Replicas
}
metrics.Record(ctx, expectedReplicasNum.M(int64(r)))
}

func recordReadyReplicasMetric(ctx context.Context, cg *kafkainternals.ConsumerGroup) {
ctx, err := metricTagsOf(ctx, cg)
if err != nil {
return
}

r := int32(0)
if cg.Status.Replicas != nil {
r = *cg.Status.Replicas
}
metrics.Record(ctx, readyReplicasNum.M(int64(r)))
}

metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
func metricTagsOf(ctx context.Context, cg *kafkainternals.ConsumerGroup) (context.Context, error) {
uf := cg.GetUserFacingResourceRef()
return tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
tag.Insert(ConsumerNameTagKey, uf.Name),
tag.Insert(ConsumerKindTagKey, uf.Kind),
)
}

func keyOf(cg metav1.Object) string {
Expand Down

0 comments on commit 9d1c673

Please sign in to comment.