feat(ingest-limits): Add GRPC service to read per tenant stream limits from kafka #15668
Conversation
pkg/kafka/limits/ingest_limits.go (Outdated)
```go
		metadata: make(map[string]map[uint64]int64),
	}

	if cfg.IngestLimits.Enabled {
```
Would it be better to wrap the call to `NewIngestLimits` with the `cfg.IngestLimits.Enabled` check instead of putting it inside `NewIngestLimits`? It feels weird to return a partially initialized struct. Is that a pattern we use elsewhere?
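Something like the following is what I have in mind (a rough sketch only; the config struct, fields, and constructor here are stand-ins, not the PR's actual types):

```go
package limits

import "errors"

// Hypothetical stand-ins to illustrate the caller-side guard; the real
// types live under pkg/kafka in this PR.
type IngestLimitsConfig struct {
	Enabled    bool
	WindowSize int64
}

type IngestLimits struct {
	cfg      IngestLimitsConfig
	metadata map[string]map[uint64]int64 // tenantID -> streamHash -> timestamp
}

// NewIngestLimits can then assume the feature is enabled and always return a
// fully initialized struct.
func NewIngestLimits(cfg IngestLimitsConfig) (*IngestLimits, error) {
	if cfg.WindowSize <= 0 {
		return nil, errors.New("window size must be positive")
	}
	return &IngestLimits{
		cfg:      cfg,
		metadata: make(map[string]map[uint64]int64),
	}, nil
}

// newIngestLimitsIfEnabled keeps the Enabled check at the call site.
func newIngestLimitsIfEnabled(cfg IngestLimitsConfig) (*IngestLimits, error) {
	if !cfg.Enabled {
		return nil, nil // feature disabled, nothing to build
	}
	return NewIngestLimits(cfg)
}
```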
pkg/kafka/limits/ingest_limits.go (Outdated)
```go
// the metadata map. If Kafka is not enabled, it simply waits for context cancellation.
// The method also starts a goroutine to periodically evict old streams from the metadata map.
func (s *IngestLimits) running(ctx context.Context) error {
	if !s.cfg.Enabled {
```
I suppose this is also kind of odd?
```diff
@@ -358,6 +359,7 @@ type Loki struct {
 	tenantConfigs *runtime.TenantConfigs
 	TenantLimits  validation.TenantLimits
 	distributor   *distributor.Distributor
+	ingestLimits  *limits.IngestLimits
```
Oh I see, it's because it's registered as a module that we have that weird initialization issue?
```diff
@@ -382,6 +384,29 @@ func (t *Loki) initDistributor() (services.Service, error) {
 	return t.distributor, nil
 }
 
+func (t *Loki) initIngestLimits() (services.Service, error) {
+	if !t.Cfg.KafkaConfig.IngestLimits.Enabled {
```
Hmm, it seems not, we have a second guard here?
Answering here for the above: the guards above are obsolete and predate `initIngestLimits`, so keep this one here and remove the ones above.
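For reference, a sketch of how the single module-level guard could look once the other guards are removed. The constructor call and the assumption that `IngestLimits` embeds a dskit `BasicService` (as the `running()` method above suggests) are illustrative, not necessarily the PR's final code:

```go
func (t *Loki) initIngestLimits() (services.Service, error) {
	// The only enabled-check: when ingest limits are off, this module simply
	// contributes no service, so the guards inside the service can go away.
	if !t.Cfg.KafkaConfig.IngestLimits.Enabled {
		return nil, nil
	}

	// Hypothetical constructor call; the real signature may differ.
	ingestLimits, err := limits.NewIngestLimits(t.Cfg.KafkaConfig, util_log.Logger)
	if err != nil {
		return nil, err
	}

	t.ingestLimits = ingestLimits
	// Satisfies services.Service if IngestLimits embeds services.BasicService.
	return t.ingestLimits, nil
}
```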
What this PR does / why we need it:
Adds a basic service that reads per-stream metadata from the separate topic (introduced by #15648) and serves it per tenant to other services (e.g. distributors, ingest-limits-frontend). The present implementation returns the count of recorded streams from the metadata topic over the configured window size (defaults to `1m`). In addition, pods of this service use round-robin balancing to read from partitions, which effectively means duplicate data is possible during rebalancing events.
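Roughly, the per-tenant count behaves like the sketch below (the `streamUsage` type and its fields are illustrative stand-ins, not the actual code in this PR):

```go
package limits

import (
	"sync"
	"time"
)

// streamUsage is an illustrative stand-in for the service's in-memory state:
// tenantID -> streamHash -> timestamp of the last record read from the topic.
type streamUsage struct {
	mtx        sync.RWMutex
	metadata   map[string]map[uint64]int64 // timestamps in Unix nanoseconds
	windowSize time.Duration
}

// activeStreams counts the tenant's streams whose last record falls inside
// the configured window (this PR defaults the window to 1m).
func (s *streamUsage) activeStreams(tenant string, now time.Time) int {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	cutoff := now.Add(-s.windowSize).UnixNano()
	count := 0
	for _, ts := range s.metadata[tenant] {
		if ts >= cutoff {
			count++
		}
	}
	return count
}
```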
Which issue(s) this PR fixes:

Fixes #
Special notes for your reviewer:
- Instead of the `lastSeenAt` field introduced in feat(distributor): Add stream metadata writes to separate topic #15648, the record's timestamp is used.
- The metadata is kept in memory as `tenantID -> streamHash -> record.Timestamp` and protected via a single RW mutex (a sketch of this follows the list). If we see contention later on, we could consider striping the locks over tenants. AFAICS we would fetch the limits per distributor request, or introduce a periodic recheck to have less contention.
- The `ServeHTTP` handler is temporarily introduced to inspect the service while developing. It can be removed once we have the `ingest-limits-frontend` calling the service and the distributor enforcing limits.
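To make the map layout and locking concrete, here is a minimal sketch of the write and eviction side, reusing the illustrative `streamUsage` type from the sketch above (the names and the eviction interval are assumptions, not the actual code):

```go
// record updates the tenantID -> streamHash -> record.Timestamp map under
// the single write lock.
func (s *streamUsage) record(tenant string, streamHash uint64, ts time.Time) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if s.metadata[tenant] == nil {
		s.metadata[tenant] = make(map[uint64]int64)
	}
	s.metadata[tenant][streamHash] = ts.UnixNano()
}

// evictLoop periodically drops streams whose last record is older than the
// window, so the map does not grow without bound.
func (s *streamUsage) evictLoop(stop <-chan struct{}, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case now := <-ticker.C:
			cutoff := now.Add(-s.windowSize).UnixNano()
			s.mtx.Lock()
			for tenant, streams := range s.metadata {
				for hash, ts := range streams {
					if ts < cutoff {
						delete(streams, hash)
					}
				}
				if len(streams) == 0 {
					delete(s.metadata, tenant)
				}
			}
			s.mtx.Unlock()
		}
	}
}
```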
Checklist

- Reviewed the `CONTRIBUTING.md` guide (required)
- `feat` PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
- Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md`
- If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. Example PR