Skip to content

Commit

Permalink
* Fixed goroutine leak on failed execute call in query client
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Dec 23, 2024
1 parent 2855451 commit 698aab1
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Fixed goroutine leak on failed execute call in query client

## v3.95.4
* Fixed connections pool leak on closing sessions
* Fixed an error in logging session deletion events
Expand Down
12 changes: 10 additions & 2 deletions internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,22 @@ func execute(
return nil, xerrors.WithStackTrace(err)
}

executeCtx := xcontext.ValueOnly(ctx)
executeCtx, executeCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
defer func() {
if finalErr != nil {
executeCancel()
}
}()

stream, err := c.ExecuteQuery(executeCtx, request, callOptions...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

r, err := newResult(ctx, stream, append(opts, withStatsCallback(settings.StatsCallback()))...)
r, err := newResult(ctx, stream, append(opts,
withStatsCallback(settings.StatsCallback()),
withOnClose(executeCancel),
)...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand Down
44 changes: 31 additions & 13 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ type (
}
streamResult struct {
stream Ydb_Query_V1.QueryService_ExecuteQueryClient
closeOnce func()
close func()
lastPart *Ydb_Query.ExecuteQueryResponsePart
resultSetIndex int64
closed chan struct{}
trace *trace.Query
statsCallback func(queryStats stats.QueryStats)
onClose []func()
onNextPartErr []func(err error)
onTxMeta []func(txMeta *Ydb_Query.TransactionMeta)
}
Expand Down Expand Up @@ -98,6 +99,12 @@ func withStatsCallback(callback func(queryStats stats.QueryStats)) resultOption
}
}

func withOnClose(onClose func()) resultOption {
return func(s *streamResult) {
s.onClose = append(s.onClose, onClose)
}
}

func onNextPartErr(callback func(err error)) resultOption {
return func(s *streamResult) {
s.onNextPartErr = append(s.onNextPartErr, callback)
Expand All @@ -115,22 +122,33 @@ func newResult(
stream Ydb_Query_V1.QueryService_ExecuteQueryClient,
opts ...resultOption,
) (_ *streamResult, finalErr error) {
r := streamResult{
stream: stream,
closed: make(chan struct{}),
resultSetIndex: -1,
}
r.closeOnce = sync.OnceFunc(func() {
close(r.closed)
r.stream = nil
})
var (
closed = make(chan struct{})
r = streamResult{
stream: stream,
onClose: []func(){
func() {
close(closed)
},
},
closed: closed,
resultSetIndex: -1,
}
)

for _, opt := range opts {
if opt != nil {
opt(&r)
}
}

r.close = sync.OnceFunc(func() {
for _, onClose := range r.onClose {
onClose()
}
r.stream = nil
})

if r.trace != nil {
onDone := trace.QueryOnResultNew(r.trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.newResult"),
Expand Down Expand Up @@ -177,7 +195,7 @@ func (r *streamResult) nextPart(ctx context.Context) (
default:
part, err = nextPart(r.stream)
if err != nil {
r.closeOnce()
r.close()

for _, callback := range r.onNextPartErr {
callback(err)
Expand Down Expand Up @@ -208,7 +226,7 @@ func nextPart(stream Ydb_Query_V1.QueryService_ExecuteQueryClient) (
}

func (r *streamResult) Close(ctx context.Context) (finalErr error) {
defer r.closeOnce()
defer r.close()

if r.trace != nil {
onDone := trace.QueryOnResultClose(r.trace, &ctx,
Expand Down Expand Up @@ -261,7 +279,7 @@ func (r *streamResult) nextResultSet(ctx context.Context) (_ *resultSet, err err
r.statsCallback(stats.FromQueryStats(part.GetExecStats()))
}
if part.GetResultSetIndex() < r.resultSetIndex {
r.closeOnce()
r.close()
if part.GetResultSetIndex() <= 0 && r.resultSetIndex > 0 {
return nil, xerrors.WithStackTrace(io.EOF)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/query/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func TestResultNextResultSet(t *testing.T) {
require.EqualValues(t, 1, rs.rowIndex)
}
t.Log("explicit interrupt stream")
r.closeOnce()
r.close()
{
t.Log("next (row=3)")
_, err := rs.nextRow(context.Background())
Expand Down

0 comments on commit 698aab1

Please sign in to comment.