-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: RangeQuery goroutine leak #15665
fix: RangeQuery goroutine leak #15665
Conversation
Yes that's correct what do you think of doing instead this
Technically the context will be done because the main function returns. |
IMO there are two problems:
diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go
index c6fba0fbf4..490a63150d 100644
--- a/pkg/querier/queryrange/downstreamer.go
+++ b/pkg/querier/queryrange/downstreamer.go
@@ -170,18 +170,19 @@ func (in instance) For(
go func() {
err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error {
res, err := fn(queries[i])
+ if err != nil {
+ return err
+ }
response := logql.Resp{
I: i,
Res: res,
- Err: err,
}
-
// Feed the result into the channel unless the work has completed.
select {
case <-ctx.Done():
case ch <- response:
}
- return err
+ return nil
})
if err != nil {
ch <- logql.Resp{
@@ -192,15 +193,15 @@ func (in instance) For(
close(ch)
}()
+ var err error
for resp := range ch {
- if resp.Err != nil {
- return nil, resp.Err
- }
- if err := acc.Accumulate(ctx, resp.Res, resp.I); err != nil {
- return nil, err
+ if err != nil || resp.Err != nil {
+ err = resp.Err
+ continue
}
+ err = acc.Accumulate(ctx, resp.Res, resp.I)
}
- return acc.Result(), nil
+ return acc.Result(), err
}
// convert to matrix |
@cyriltovena @chaudum thanks for your review. i modified these code. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
You're right. Both errors need to be checked separately. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGMT. Thanks!
every time https://github.com/grafana/loki/blob/main/pkg/querier/queryrange/downstreamer.go#L197 this function return. there is no receiver for this channel.
https://github.com/grafana/loki/blob/main/pkg/querier/queryrange/downstreamer.go#L187. but there still have some error send to this channel. which result in goroutine leakage.
we can get this leakage through pprof/goroutine