From 5d1304845307085b14ee92d74648a35e7ae1e3ea Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 15:52:32 +0300 Subject: [PATCH 01/14] processing of panics in retriers --- retry/sql.go | 12 +--------- retry/sql_test.go | 59 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/retry/sql.go b/retry/sql.go index 29ddeb6b2..9b6aab209 100644 --- a/retry/sql.go +++ b/retry/sql.go @@ -211,17 +211,7 @@ func DoTxWithResult[T any](ctx context.Context, db *sql.DB, //nolint:funlen return zeroValue, unwrapErrBadConn(xerrors.WithStackTrace(err)) } defer func() { - if finalErr == nil { - return - } - errRollback := tx.Rollback() - if errRollback == nil { - return - } - finalErr = xerrors.NewWithIssues("", - xerrors.WithStackTrace(finalErr), - xerrors.WithStackTrace(fmt.Errorf("rollback failed: %w", errRollback)), - ) + _ = tx.Rollback() }() v, err := op(xcontext.MarkRetryCall(ctx), tx) if err != nil { diff --git a/retry/sql_test.go b/retry/sql_test.go index 4f42d06dd..432e4f167 100644 --- a/retry/sql_test.go +++ b/retry/sql_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "database/sql/driver" + "errors" "strconv" "testing" "time" @@ -233,3 +234,61 @@ func TestDoTx(t *testing.T) { }) } } + +func TestCleanUpResourcesOnPanicInRetryOperation(t *testing.T) { + panicErr := errors.New("test") + t.Run("Do", func(t *testing.T) { + m := &mockConnector{ + t: t, + } + db := sql.OpenDB(m) + defer func() { + require.NoError(t, db.Close()) + }() + require.Panics(t, func() { + require.Equal(t, 0, db.Stats().OpenConnections) + require.Equal(t, 0, db.Stats().Idle) + require.Equal(t, 0, db.Stats().InUse) + defer func() { + require.Equal(t, 1, db.Stats().OpenConnections) + require.Equal(t, 1, db.Stats().Idle) + require.Equal(t, 0, db.Stats().InUse) + }() + _ = Do(context.Background(), db, + func(ctx context.Context, cc *sql.Conn) error { + require.Equal(t, 1, db.Stats().OpenConnections) + require.Equal(t, 0, db.Stats().Idle) + require.Equal(t, 1, db.Stats().InUse) + panic(panicErr) + }, + ) + }) + }) + t.Run("DoTx", func(t *testing.T) { + m := &mockConnector{ + t: t, + } + db := sql.OpenDB(m) + defer func() { + require.NoError(t, db.Close()) + }() + require.Panics(t, func() { + require.Equal(t, 0, db.Stats().OpenConnections) + require.Equal(t, 0, db.Stats().Idle) + require.Equal(t, 0, db.Stats().InUse) + defer func() { + require.Equal(t, 1, db.Stats().OpenConnections) + require.Equal(t, 1, db.Stats().Idle) + require.Equal(t, 0, db.Stats().InUse) + }() + _ = DoTx(context.Background(), db, + func(ctx context.Context, tx *sql.Tx) error { + require.Equal(t, 1, db.Stats().OpenConnections) + require.Equal(t, 0, db.Stats().Idle) + require.Equal(t, 1, db.Stats().InUse) + panic(panicErr) + }, + ) + }) + }) +} From 9b813be0599aa17d1fe4a3948ae1c88d1d9b2c62 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 16:16:54 +0300 Subject: [PATCH 02/14] fix linter issue --- retry/sql.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/retry/sql.go b/retry/sql.go index 9b6aab209..c9a357f54 100644 --- a/retry/sql.go +++ b/retry/sql.go @@ -173,7 +173,7 @@ func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) err // DoTxWithResult is a retryer of database/sql transactions with fallbacks on errors // // Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental -func DoTxWithResult[T any](ctx context.Context, db *sql.DB, //nolint:funlen +func DoTxWithResult[T any](ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) (T, error), opts ...doTxOption, ) (T, error) { From 1d838c9b7259af3d1003a2e89dd4ad2ba16087cf Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 16:37:09 +0300 Subject: [PATCH 03/14] refactored query stats: implemetation instead interface --- internal/query/execute_query.go | 2 +- internal/query/options/execute.go | 8 +-- internal/query/options/execute_script.go | 2 +- internal/query/result.go | 4 +- internal/query/result_test.go | 32 +++++----- internal/stats/query.go | 75 ++++++++---------------- internal/table/scanner/result.go | 2 +- internal/xsql/propose/conn.go | 2 +- query/example_test.go | 18 +++--- query/session.go | 2 +- table/result/result.go | 4 +- 11 files changed, 65 insertions(+), 86 deletions(-) diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index 577d7f8d7..58fe1d61a 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -24,7 +24,7 @@ import ( type executeSettings interface { ExecMode() options.ExecMode StatsMode() options.StatsMode - StatsCallback() func(stats stats.QueryStats) + StatsCallback() func(stats *stats.QueryStats) TxControl() *query.TransactionControl Syntax() options.Syntax Params() params.Parameters diff --git a/internal/query/options/execute.go b/internal/query/options/execute.go index c066e701c..cce65f4af 100644 --- a/internal/query/options/execute.go +++ b/internal/query/options/execute.go @@ -32,7 +32,7 @@ type ( execMode ExecMode statsMode StatsMode resourcePool string - statsCallback func(queryStats stats.QueryStats) + statsCallback func(queryStats *stats.QueryStats) callOptions []grpc.CallOption txControl *tx.Control retryOptions []retry.Option @@ -59,7 +59,7 @@ type ( syntaxOption = Syntax statsModeOption struct { mode StatsMode - callback func(stats.QueryStats) + callback func(*stats.QueryStats) } execModeOption = ExecMode responsePartLimitBytes int64 @@ -73,7 +73,7 @@ func (s *executeSettings) RetryOpts() []retry.Option { return s.retryOptions } -func (s *executeSettings) StatsCallback() func(stats.QueryStats) { +func (s *executeSettings) StatsCallback() func(*stats.QueryStats) { return s.statsCallback } @@ -225,7 +225,7 @@ func (opt statsModeOption) applyExecuteOption(s *executeSettings) { s.statsCallback = opt.callback } -func WithStatsMode(mode StatsMode, callback func(stats.QueryStats)) statsModeOption { +func WithStatsMode(mode StatsMode, callback func(*stats.QueryStats)) statsModeOption { return statsModeOption{ mode: mode, callback: callback, diff --git a/internal/query/options/execute_script.go b/internal/query/options/execute_script.go index 4ca9a6dd3..86fc3af0c 100644 --- a/internal/query/options/execute_script.go +++ b/internal/query/options/execute_script.go @@ -34,7 +34,7 @@ type ( Query string } Mode ExecMode - Stats stats.QueryStats + Stats *stats.QueryStats ResultSetsMeta []struct { Columns []struct { Name string diff --git a/internal/query/result.go b/internal/query/result.go index c361484c0..895d56d04 100644 --- a/internal/query/result.go +++ b/internal/query/result.go @@ -38,7 +38,7 @@ type ( resultSetIndex int64 closed chan struct{} trace *trace.Query - statsCallback func(queryStats stats.QueryStats) + statsCallback func(queryStats *stats.QueryStats) onClose []func() onNextPartErr []func(err error) onTxMeta []func(txMeta *Ydb_Query.TransactionMeta) @@ -93,7 +93,7 @@ func withTrace(t *trace.Query) resultOption { } } -func withStatsCallback(callback func(queryStats stats.QueryStats)) resultOption { +func withStatsCallback(callback func(queryStats *stats.QueryStats)) resultOption { return func(s *streamResult) { s.statsCallback = callback } diff --git a/internal/query/result_test.go b/internal/query/result_test.go index 98bfb41da..af27a0737 100644 --- a/internal/query/result_test.go +++ b/internal/query/result_test.go @@ -1926,8 +1926,8 @@ func TestResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + var s *stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -2287,8 +2287,8 @@ func TestResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + var s *stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -2649,8 +2649,8 @@ func TestResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + var s *stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -2986,8 +2986,8 @@ func TestResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + var s *stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -3358,8 +3358,8 @@ func TestMaterializedResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + var s *stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -3719,8 +3719,8 @@ func TestMaterializedResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + var s *stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -4081,8 +4081,8 @@ func TestMaterializedResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + var s *stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -4418,8 +4418,8 @@ func TestMaterializedResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { + var s *stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { s = queryStats })) require.NoError(t, err) diff --git a/internal/stats/query.go b/internal/stats/query.go index 66964d5b2..3b35beefe 100644 --- a/internal/stats/query.go +++ b/internal/stats/query.go @@ -7,29 +7,6 @@ import ( ) type ( - // QueryStats holds query execution statistics. - QueryStats interface { - ProcessCPUTime() time.Duration - Compilation() (c *CompilationStats) - QueryPlan() string - QueryAST() string - TotalCPUTime() time.Duration - TotalDuration() time.Duration - - // NextPhase returns next execution phase within query. - // If ok flag is false, then there are no more phases and p is invalid. - NextPhase() (p QueryPhase, ok bool) - } - // QueryPhase holds query execution phase statistics. - QueryPhase interface { - // NextTableAccess returns next accessed table within query execution phase. - // If ok flag is false, then there are no more accessed tables and t is invalid. - NextTableAccess() (t *TableAccess, ok bool) - Duration() time.Duration - CPUTime() time.Duration - AffectedShards() uint64 - IsLiteralPhase() bool - } OperationStats struct { Rows uint64 Bytes uint64 @@ -55,13 +32,13 @@ type ( Duration time.Duration CPUTime time.Duration } - // queryStats holds query execution statistics. - queryStats struct { + // QueryStats holds query execution statistics. + QueryStats struct { pb *Ydb_TableStats.QueryStats pos int } - // queryPhase holds query execution phase statistics. - queryPhase struct { + // QueryPhase holds query execution phase statistics. + QueryPhase struct { pb *Ydb_TableStats.QueryPhaseStats pos int } @@ -86,33 +63,33 @@ func fromOperationStats(pb *Ydb_TableStats.OperationStats) OperationStats { } } -func (s *queryStats) ProcessCPUTime() time.Duration { +func (s *QueryStats) ProcessCPUTime() time.Duration { return fromUs(s.pb.GetProcessCpuTimeUs()) } -func (s *queryStats) Compilation() (c *CompilationStats) { +func (s *QueryStats) Compilation() (c *CompilationStats) { return fromCompilationStats(s.pb.GetCompilation()) } -func (s *queryStats) QueryPlan() string { +func (s *QueryStats) QueryPlan() string { return s.pb.GetQueryPlan() } -func (s *queryStats) QueryAST() string { +func (s *QueryStats) QueryAST() string { return s.pb.GetQueryAst() } -func (s *queryStats) TotalCPUTime() time.Duration { +func (s *QueryStats) TotalCPUTime() time.Duration { return fromUs(s.pb.GetTotalCpuTimeUs()) } -func (s *queryStats) TotalDuration() time.Duration { +func (s *QueryStats) TotalDuration() time.Duration { return fromUs(s.pb.GetTotalDurationUs()) } // NextPhase returns next execution phase within query. // If ok flag is false, then there are no more phases and p is invalid. -func (s *queryStats) NextPhase() (p QueryPhase, ok bool) { +func (s *QueryStats) NextPhase() (p QueryPhase, ok bool) { if s.pos >= len(s.pb.GetQueryPhases()) { return } @@ -122,7 +99,7 @@ func (s *queryStats) NextPhase() (p QueryPhase, ok bool) { } s.pos++ - return &queryPhase{ + return QueryPhase{ pb: pb, }, true } @@ -131,12 +108,12 @@ func (s *queryStats) NextPhase() (p QueryPhase, ok bool) { // // If ok flag is false, then there are no more accessed tables and t is // invalid. -func (queryPhase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) { - if queryPhase.pos >= len(queryPhase.pb.GetTableAccess()) { +func (QueryPhase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) { + if QueryPhase.pos >= len(QueryPhase.pb.GetTableAccess()) { return } - pb := queryPhase.pb.GetTableAccess()[queryPhase.pos] - queryPhase.pos++ + pb := QueryPhase.pb.GetTableAccess()[QueryPhase.pos] + QueryPhase.pos++ return &TableAccess{ Name: pb.GetName(), @@ -147,28 +124,28 @@ func (queryPhase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) { }, true } -func (queryPhase *queryPhase) Duration() time.Duration { - return fromUs(queryPhase.pb.GetDurationUs()) +func (QueryPhase *QueryPhase) Duration() time.Duration { + return fromUs(QueryPhase.pb.GetDurationUs()) } -func (queryPhase *queryPhase) CPUTime() time.Duration { - return fromUs(queryPhase.pb.GetCpuTimeUs()) +func (QueryPhase *QueryPhase) CPUTime() time.Duration { + return fromUs(QueryPhase.pb.GetCpuTimeUs()) } -func (queryPhase *queryPhase) AffectedShards() uint64 { - return queryPhase.pb.GetAffectedShards() +func (QueryPhase *QueryPhase) AffectedShards() uint64 { + return QueryPhase.pb.GetAffectedShards() } -func (queryPhase *queryPhase) IsLiteralPhase() bool { - return queryPhase.pb.GetLiteralPhase() +func (QueryPhase *QueryPhase) IsLiteralPhase() bool { + return QueryPhase.pb.GetLiteralPhase() } -func FromQueryStats(pb *Ydb_TableStats.QueryStats) QueryStats { +func FromQueryStats(pb *Ydb_TableStats.QueryStats) *QueryStats { if pb == nil { return nil } - return &queryStats{ + return &QueryStats{ pb: pb, } } diff --git a/internal/table/scanner/result.go b/internal/table/scanner/result.go index e2368b91f..97aacb69a 100644 --- a/internal/table/scanner/result.go +++ b/internal/table/scanner/result.go @@ -220,7 +220,7 @@ func (r *baseResult) CurrentResultSet() result.Set { } // Stats returns query execution queryStats. -func (r *baseResult) Stats() stats.QueryStats { +func (r *baseResult) Stats() *stats.QueryStats { r.statsMtx.RLock() defer r.statsMtx.RUnlock() diff --git a/internal/xsql/propose/conn.go b/internal/xsql/propose/conn.go index cc98f1052..5be558428 100644 --- a/internal/xsql/propose/conn.go +++ b/internal/xsql/propose/conn.go @@ -82,7 +82,7 @@ func (c *Conn) Explain(ctx context.Context, sql string, _ *params.Params) (ast s _, err := c.session.Query( ctx, sql, options.WithExecMode(options.ExecModeExplain), - options.WithStatsMode(options.StatsModeNone, func(stats stats.QueryStats) { + options.WithStatsMode(options.StatsModeNone, func(stats *stats.QueryStats) { ast = stats.QueryAST() plan = stats.QueryPlan() }), diff --git a/query/example_test.go b/query/example_test.go index ad58dfe6f..3bb1699cf 100644 --- a/query/example_test.go +++ b/query/example_test.go @@ -246,7 +246,7 @@ func Example_resultStats() { id int32 // required value myStr string // required value ) - var stats query.Stats + var s query.Stats // Do retry operation on errors with best effort row, err := db.Query().QueryRow(ctx, // context manage exiting from Do `SELECT CAST($id AS Uint64) AS id, CAST($myStr AS Text) AS myStr`, @@ -256,8 +256,8 @@ func Example_resultStats() { Param("$myStr").Text("123"). Build(), ), - query.WithStatsMode(query.StatsModeFull, func(s query.Stats) { - stats = s + query.WithStatsMode(query.StatsModeFull, func(stats query.Stats) { + s = stats }), query.WithIdempotent(), ) @@ -275,14 +275,14 @@ func Example_resultStats() { fmt.Printf("id=%v, myStr='%s'\n", id, myStr) fmt.Println("Stats:") - fmt.Printf("- Compilation='%v'\n", stats.Compilation()) - fmt.Printf("- TotalCPUTime='%v'\n", stats.TotalCPUTime()) - fmt.Printf("- ProcessCPUTime='%v'\n", stats.ProcessCPUTime()) - fmt.Printf("- QueryAST='%v'\n", stats.QueryAST()) - fmt.Printf("- QueryPlan='%v'\n", stats.QueryPlan()) + fmt.Printf("- Compilation='%v'\n", s.Compilation()) + fmt.Printf("- TotalCPUTime='%v'\n", s.TotalCPUTime()) + fmt.Printf("- ProcessCPUTime='%v'\n", s.ProcessCPUTime()) + fmt.Printf("- QueryAST='%v'\n", s.QueryAST()) + fmt.Printf("- QueryPlan='%v'\n", s.QueryPlan()) fmt.Println("- Phases:") for { - phase, ok := stats.NextPhase() + phase, ok := s.NextPhase() if !ok { break } diff --git a/query/session.go b/query/session.go index 1891c159c..fe4e9b872 100644 --- a/query/session.go +++ b/query/session.go @@ -18,5 +18,5 @@ type ( Begin(ctx context.Context, txSettings TransactionSettings) (Transaction, error) } - Stats = stats.QueryStats + Stats = *stats.QueryStats ) diff --git a/table/result/result.go b/table/result/result.go index e82860f3d..96c464178 100644 --- a/table/result/result.go +++ b/table/result/result.go @@ -108,7 +108,7 @@ type BaseResult interface { // Stats returns query execution QueryStats. // // If query result have no stats - returns nil - Stats() (s stats.QueryStats) + Stats() (s Stats) // Err return scanner error // To handle errors, do not need to check after scanning each row @@ -130,3 +130,5 @@ type Result interface { type StreamResult interface { BaseResult } + +type Stats = *stats.QueryStats From 0d3f84a48e69bb7764067a7b3275250e57439eab Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 16:39:02 +0300 Subject: [PATCH 04/14] CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b52f2f157..aaa040bf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Refactored query stats for table-service result and query-service result: implementation instead interface + ## v3.96.2 * Fixed broken metric `ydb_go_sdk_ydb_database_sql_conns` From 7347733767875c8c0bae86d2a4afae64c889a197 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 17:51:02 +0300 Subject: [PATCH 05/14] added iterator API over query phases --- internal/stats/query.go | 15 ++++++++ internal/stats/query_go1.23_test.go | 55 +++++++++++++++++++++++++++++ internal/xiter/xiter.go | 5 ++- internal/xiter/xiter_go1.23.go | 5 ++- 4 files changed, 78 insertions(+), 2 deletions(-) create mode 100644 internal/stats/query_go1.23_test.go diff --git a/internal/stats/query.go b/internal/stats/query.go index 3b35beefe..90070a40a 100644 --- a/internal/stats/query.go +++ b/internal/stats/query.go @@ -4,6 +4,8 @@ import ( "time" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" ) type ( @@ -104,6 +106,19 @@ func (s *QueryStats) NextPhase() (p QueryPhase, ok bool) { }, true } +func (s *QueryStats) QueryPhases() xiter.Seq[QueryPhase] { + return func(yield func(p QueryPhase) bool) { + for _, pb := range s.pb.GetQueryPhases() { + cont := yield(QueryPhase{ + pb: pb, + }) + if !cont { + return + } + } + } +} + // NextTableAccess returns next accessed table within query execution phase. // // If ok flag is false, then there are no more accessed tables and t is diff --git a/internal/stats/query_go1.23_test.go b/internal/stats/query_go1.23_test.go new file mode 100644 index 000000000..bc00a8011 --- /dev/null +++ b/internal/stats/query_go1.23_test.go @@ -0,0 +1,55 @@ +//go:build go1.23 + +package stats + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" +) + +func TestIterateOverQueryPhases(t *testing.T) { + s := &QueryStats{ + pb: &Ydb_TableStats.QueryStats{ + QueryPhases: []*Ydb_TableStats.QueryPhaseStats{ + { + DurationUs: 1, + }, + { + DurationUs: 2, + }, + { + DurationUs: 3, + }, + }, + }, + } + t.Run("ImmutableIteration", func(t *testing.T) { + for i := range make([]struct{}, 3) { + t.Run(fmt.Sprintf("Pass#%d", i), func(t *testing.T) { + durations := make([]uint64, 0, 3) + for phase := range s.QueryPhases() { + durations = append(durations, phase.pb.GetDurationUs()) + } + require.Equal(t, []uint64{1, 2, 3}, durations) + }) + } + }) + t.Run("MutableIteration", func(t *testing.T) { + durations := make([]uint64, 0, 3) + for { + phase, ok := s.NextPhase() + if !ok { + break + } + durations = append(durations, phase.pb.GetDurationUs()) + } + require.Equal(t, []uint64{1, 2, 3}, durations) + require.Equal(t, 3, s.pos) + + _, ok := s.NextPhase() + require.False(t, ok) + }) +} diff --git a/internal/xiter/xiter.go b/internal/xiter/xiter.go index d2fc0ab5d..1a8d5d521 100644 --- a/internal/xiter/xiter.go +++ b/internal/xiter/xiter.go @@ -2,4 +2,7 @@ package xiter -type Seq2[K, V any] func(yield func(K, V) bool) +type ( + Seq[T any] func(yield func(T) bool) + Seq2[K, V any] func(yield func(K, V) bool) +) diff --git a/internal/xiter/xiter_go1.23.go b/internal/xiter/xiter_go1.23.go index 77a2b2adc..12253f9cf 100644 --- a/internal/xiter/xiter_go1.23.go +++ b/internal/xiter/xiter_go1.23.go @@ -6,4 +6,7 @@ import ( "iter" ) -type Seq2[K, V any] iter.Seq2[K, V] +type ( + Seq[T any] iter.Seq[T] + Seq2[K, V any] iter.Seq2[K, V] +) From b8f25371f4594d0d530f7d88679ba84039874fc5 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 18:06:07 +0300 Subject: [PATCH 06/14] Added into query stats the range iterators for iterate over query phases and table access --- internal/stats/query.go | 43 +++++++++++++++++-------- internal/stats/query_go1.23_test.go | 49 ++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 14 deletions(-) diff --git a/internal/stats/query.go b/internal/stats/query.go index 90070a40a..185df614d 100644 --- a/internal/stats/query.go +++ b/internal/stats/query.go @@ -106,7 +106,7 @@ func (s *QueryStats) NextPhase() (p QueryPhase, ok bool) { }, true } -func (s *QueryStats) QueryPhases() xiter.Seq[QueryPhase] { +func (s *QueryStats) RangeQueryPhases() xiter.Seq[QueryPhase] { return func(yield func(p QueryPhase) bool) { for _, pb := range s.pb.GetQueryPhases() { cont := yield(QueryPhase{ @@ -123,12 +123,12 @@ func (s *QueryStats) QueryPhases() xiter.Seq[QueryPhase] { // // If ok flag is false, then there are no more accessed tables and t is // invalid. -func (QueryPhase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) { - if QueryPhase.pos >= len(QueryPhase.pb.GetTableAccess()) { +func (phase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) { + if phase.pos >= len(phase.pb.GetTableAccess()) { return } - pb := QueryPhase.pb.GetTableAccess()[QueryPhase.pos] - QueryPhase.pos++ + pb := phase.pb.GetTableAccess()[phase.pos] + phase.pos++ return &TableAccess{ Name: pb.GetName(), @@ -139,20 +139,37 @@ func (QueryPhase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) { }, true } -func (QueryPhase *QueryPhase) Duration() time.Duration { - return fromUs(QueryPhase.pb.GetDurationUs()) +func (phase *QueryPhase) RangeTableAccess() xiter.Seq[*TableAccess] { + return func(yield func(access *TableAccess) bool) { + for _, pb := range phase.pb.GetTableAccess() { + cont := yield(&TableAccess{ + Name: pb.GetName(), + Reads: fromOperationStats(pb.GetReads()), + Updates: fromOperationStats(pb.GetUpdates()), + Deletes: fromOperationStats(pb.GetDeletes()), + PartitionsCount: pb.GetPartitionsCount(), + }) + if !cont { + return + } + } + } +} + +func (phase *QueryPhase) Duration() time.Duration { + return fromUs(phase.pb.GetDurationUs()) } -func (QueryPhase *QueryPhase) CPUTime() time.Duration { - return fromUs(QueryPhase.pb.GetCpuTimeUs()) +func (phase *QueryPhase) CPUTime() time.Duration { + return fromUs(phase.pb.GetCpuTimeUs()) } -func (QueryPhase *QueryPhase) AffectedShards() uint64 { - return QueryPhase.pb.GetAffectedShards() +func (phase *QueryPhase) AffectedShards() uint64 { + return phase.pb.GetAffectedShards() } -func (QueryPhase *QueryPhase) IsLiteralPhase() bool { - return QueryPhase.pb.GetLiteralPhase() +func (phase *QueryPhase) IsLiteralPhase() bool { + return phase.pb.GetLiteralPhase() } func FromQueryStats(pb *Ydb_TableStats.QueryStats) *QueryStats { diff --git a/internal/stats/query_go1.23_test.go b/internal/stats/query_go1.23_test.go index bc00a8011..1e7036f1d 100644 --- a/internal/stats/query_go1.23_test.go +++ b/internal/stats/query_go1.23_test.go @@ -16,12 +16,45 @@ func TestIterateOverQueryPhases(t *testing.T) { QueryPhases: []*Ydb_TableStats.QueryPhaseStats{ { DurationUs: 1, + TableAccess: []*Ydb_TableStats.TableAccessStats{ + { + Name: "a", + }, + { + Name: "b", + }, + { + Name: "c", + }, + }, }, { DurationUs: 2, + TableAccess: []*Ydb_TableStats.TableAccessStats{ + { + Name: "d", + }, + { + Name: "e", + }, + { + Name: "f", + }, + }, }, { DurationUs: 3, + TableAccess: []*Ydb_TableStats.TableAccessStats{ + { + Name: "g", + }, + { + Name: "h", + }, + { + Name: "i", + }, + }, }, }, }, @@ -30,23 +63,37 @@ func TestIterateOverQueryPhases(t *testing.T) { for i := range make([]struct{}, 3) { t.Run(fmt.Sprintf("Pass#%d", i), func(t *testing.T) { durations := make([]uint64, 0, 3) - for phase := range s.QueryPhases() { + tables := make([]string, 0, 9) + for phase := range s.RangeQueryPhases() { durations = append(durations, phase.pb.GetDurationUs()) + for access := range phase.RangeTableAccess() { + tables = append(tables, access.Name) + } } require.Equal(t, []uint64{1, 2, 3}, durations) + require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables) }) } }) t.Run("MutableIteration", func(t *testing.T) { durations := make([]uint64, 0, 3) + tables := make([]string, 0, 9) for { phase, ok := s.NextPhase() if !ok { break } durations = append(durations, phase.pb.GetDurationUs()) + for { + access, ok := phase.NextTableAccess() + if !ok { + break + } + tables = append(tables, access.Name) + } } require.Equal(t, []uint64{1, 2, 3}, durations) + require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables) require.Equal(t, 3, s.pos) _, ok := s.NextPhase() From c8f737c27686ce39b2bcb119853d4b8bd37df734 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 18:07:46 +0300 Subject: [PATCH 07/14] CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index aaa040bf2..1214b6c4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ * Refactored query stats for table-service result and query-service result: implementation instead interface +* Added into query stats the range iterators for iterate over query phases and table access ## v3.96.2 * Fixed broken metric `ydb_go_sdk_ydb_database_sql_conns` From 62cd2d62e95d6c4783a11072833120948c49847a Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 18:26:38 +0300 Subject: [PATCH 08/14] rolled back changes --- CHANGELOG.md | 1 - internal/stats/query.go | 32 -------------------------------- table/result/result.go | 4 +--- 3 files changed, 1 insertion(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1214b6c4e..3cd42509a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,3 @@ -* Refactored query stats for table-service result and query-service result: implementation instead interface * Added into query stats the range iterators for iterate over query phases and table access ## v3.96.2 diff --git a/internal/stats/query.go b/internal/stats/query.go index 185df614d..fc9eb4f0f 100644 --- a/internal/stats/query.go +++ b/internal/stats/query.go @@ -4,8 +4,6 @@ import ( "time" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" ) type ( @@ -106,19 +104,6 @@ func (s *QueryStats) NextPhase() (p QueryPhase, ok bool) { }, true } -func (s *QueryStats) RangeQueryPhases() xiter.Seq[QueryPhase] { - return func(yield func(p QueryPhase) bool) { - for _, pb := range s.pb.GetQueryPhases() { - cont := yield(QueryPhase{ - pb: pb, - }) - if !cont { - return - } - } - } -} - // NextTableAccess returns next accessed table within query execution phase. // // If ok flag is false, then there are no more accessed tables and t is @@ -139,23 +124,6 @@ func (phase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) { }, true } -func (phase *QueryPhase) RangeTableAccess() xiter.Seq[*TableAccess] { - return func(yield func(access *TableAccess) bool) { - for _, pb := range phase.pb.GetTableAccess() { - cont := yield(&TableAccess{ - Name: pb.GetName(), - Reads: fromOperationStats(pb.GetReads()), - Updates: fromOperationStats(pb.GetUpdates()), - Deletes: fromOperationStats(pb.GetDeletes()), - PartitionsCount: pb.GetPartitionsCount(), - }) - if !cont { - return - } - } - } -} - func (phase *QueryPhase) Duration() time.Duration { return fromUs(phase.pb.GetDurationUs()) } diff --git a/table/result/result.go b/table/result/result.go index 96c464178..e82860f3d 100644 --- a/table/result/result.go +++ b/table/result/result.go @@ -108,7 +108,7 @@ type BaseResult interface { // Stats returns query execution QueryStats. // // If query result have no stats - returns nil - Stats() (s Stats) + Stats() (s stats.QueryStats) // Err return scanner error // To handle errors, do not need to check after scanning each row @@ -130,5 +130,3 @@ type Result interface { type StreamResult interface { BaseResult } - -type Stats = *stats.QueryStats From bb89f1e2dcad1c944ecd1555398e12d136df954f Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 18:37:51 +0300 Subject: [PATCH 09/14] rolled back changes --- internal/stats/query.go | 115 +++++++++++++++++++++------- internal/stats/query_go1.23_test.go | 96 ++++++++++++----------- query/session.go | 2 +- 3 files changed, 135 insertions(+), 78 deletions(-) diff --git a/internal/stats/query.go b/internal/stats/query.go index fc9eb4f0f..8292cbf57 100644 --- a/internal/stats/query.go +++ b/internal/stats/query.go @@ -1,12 +1,41 @@ package stats import ( + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" "time" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" ) type ( + // QueryStats holds query execution statistics. + QueryStats interface { + ProcessCPUTime() time.Duration + Compilation() (c *CompilationStats) + QueryPlan() string + QueryAST() string + TotalCPUTime() time.Duration + TotalDuration() time.Duration + + // NextPhase returns next execution phase within query. + // If ok flag is false, then there are no more phases and p is invalid. + NextPhase() (p QueryPhase, ok bool) + + // QueryPhases is a range iterator over query phases. + QueryPhases() xiter.Seq[QueryPhase] + } + // QueryPhase holds query execution phase statistics. + QueryPhase interface { + // NextTableAccess returns next accessed table within query execution phase. + // If ok flag is false, then there are no more accessed tables and t is invalid. + NextTableAccess() (t *TableAccess, ok bool) + // TableAccess is a range iterator over query execution phase's accessed tables. + TableAccess() xiter.Seq[*TableAccess] + Duration() time.Duration + CPUTime() time.Duration + AffectedShards() uint64 + IsLiteralPhase() bool + } OperationStats struct { Rows uint64 Bytes uint64 @@ -32,13 +61,13 @@ type ( Duration time.Duration CPUTime time.Duration } - // QueryStats holds query execution statistics. - QueryStats struct { + // queryStats holds query execution statistics. + queryStats struct { pb *Ydb_TableStats.QueryStats pos int } - // QueryPhase holds query execution phase statistics. - QueryPhase struct { + // queryPhase holds query execution phase statistics. + queryPhase struct { pb *Ydb_TableStats.QueryPhaseStats pos int } @@ -63,52 +92,65 @@ func fromOperationStats(pb *Ydb_TableStats.OperationStats) OperationStats { } } -func (s *QueryStats) ProcessCPUTime() time.Duration { - return fromUs(s.pb.GetProcessCpuTimeUs()) +func (stats *queryStats) ProcessCPUTime() time.Duration { + return fromUs(stats.pb.GetProcessCpuTimeUs()) } -func (s *QueryStats) Compilation() (c *CompilationStats) { - return fromCompilationStats(s.pb.GetCompilation()) +func (stats *queryStats) Compilation() (c *CompilationStats) { + return fromCompilationStats(stats.pb.GetCompilation()) } -func (s *QueryStats) QueryPlan() string { - return s.pb.GetQueryPlan() +func (stats *queryStats) QueryPlan() string { + return stats.pb.GetQueryPlan() } -func (s *QueryStats) QueryAST() string { - return s.pb.GetQueryAst() +func (stats *queryStats) QueryAST() string { + return stats.pb.GetQueryAst() } -func (s *QueryStats) TotalCPUTime() time.Duration { - return fromUs(s.pb.GetTotalCpuTimeUs()) +func (stats *queryStats) TotalCPUTime() time.Duration { + return fromUs(stats.pb.GetTotalCpuTimeUs()) } -func (s *QueryStats) TotalDuration() time.Duration { - return fromUs(s.pb.GetTotalDurationUs()) +func (stats *queryStats) TotalDuration() time.Duration { + return fromUs(stats.pb.GetTotalDurationUs()) } // NextPhase returns next execution phase within query. // If ok flag is false, then there are no more phases and p is invalid. -func (s *QueryStats) NextPhase() (p QueryPhase, ok bool) { - if s.pos >= len(s.pb.GetQueryPhases()) { +func (stats *queryStats) NextPhase() (p QueryPhase, ok bool) { + if stats.pos >= len(stats.pb.GetQueryPhases()) { return } - pb := s.pb.GetQueryPhases()[s.pos] + pb := stats.pb.GetQueryPhases()[stats.pos] if pb == nil { return } - s.pos++ + stats.pos++ - return QueryPhase{ + return &queryPhase{ pb: pb, }, true } +func (stats *queryStats) QueryPhases() xiter.Seq[QueryPhase] { + return func(yield func(p QueryPhase) bool) { + for _, pb := range stats.pb.GetQueryPhases() { + cont := yield(&queryPhase{ + pb: pb, + }) + if !cont { + return + } + } + } +} + // NextTableAccess returns next accessed table within query execution phase. // // If ok flag is false, then there are no more accessed tables and t is // invalid. -func (phase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) { +func (phase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) { if phase.pos >= len(phase.pb.GetTableAccess()) { return } @@ -124,28 +166,45 @@ func (phase *QueryPhase) NextTableAccess() (t *TableAccess, ok bool) { }, true } -func (phase *QueryPhase) Duration() time.Duration { +func (phase *queryPhase) TableAccess() xiter.Seq[*TableAccess] { + return func(yield func(access *TableAccess) bool) { + for _, pb := range phase.pb.GetTableAccess() { + cont := yield(&TableAccess{ + Name: pb.GetName(), + Reads: fromOperationStats(pb.GetReads()), + Updates: fromOperationStats(pb.GetUpdates()), + Deletes: fromOperationStats(pb.GetDeletes()), + PartitionsCount: pb.GetPartitionsCount(), + }) + if !cont { + return + } + } + } +} + +func (phase *queryPhase) Duration() time.Duration { return fromUs(phase.pb.GetDurationUs()) } -func (phase *QueryPhase) CPUTime() time.Duration { +func (phase *queryPhase) CPUTime() time.Duration { return fromUs(phase.pb.GetCpuTimeUs()) } -func (phase *QueryPhase) AffectedShards() uint64 { +func (phase *queryPhase) AffectedShards() uint64 { return phase.pb.GetAffectedShards() } -func (phase *QueryPhase) IsLiteralPhase() bool { +func (phase *queryPhase) IsLiteralPhase() bool { return phase.pb.GetLiteralPhase() } -func FromQueryStats(pb *Ydb_TableStats.QueryStats) *QueryStats { +func FromQueryStats(pb *Ydb_TableStats.QueryStats) QueryStats { if pb == nil { return nil } - return &QueryStats{ + return &queryStats{ pb: pb, } } diff --git a/internal/stats/query_go1.23_test.go b/internal/stats/query_go1.23_test.go index 1e7036f1d..ba13045e4 100644 --- a/internal/stats/query_go1.23_test.go +++ b/internal/stats/query_go1.23_test.go @@ -5,85 +5,84 @@ package stats import ( "fmt" "testing" + "time" "github.com/stretchr/testify/require" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" ) func TestIterateOverQueryPhases(t *testing.T) { - s := &QueryStats{ - pb: &Ydb_TableStats.QueryStats{ - QueryPhases: []*Ydb_TableStats.QueryPhaseStats{ - { - DurationUs: 1, - TableAccess: []*Ydb_TableStats.TableAccessStats{ - { - Name: "a", - }, - { - Name: "b", - }, - { - Name: "c", - }, + s := FromQueryStats(&Ydb_TableStats.QueryStats{ + QueryPhases: []*Ydb_TableStats.QueryPhaseStats{ + { + DurationUs: 1, + TableAccess: []*Ydb_TableStats.TableAccessStats{ + { + Name: "a", + }, + { + Name: "b", + }, + { + Name: "c", }, }, - { - DurationUs: 2, - TableAccess: []*Ydb_TableStats.TableAccessStats{ - { - Name: "d", - }, - { - Name: "e", - }, - { - Name: "f", - }, + }, + { + DurationUs: 2, + TableAccess: []*Ydb_TableStats.TableAccessStats{ + { + Name: "d", + }, + { + Name: "e", + }, + { + Name: "f", }, }, - { - DurationUs: 3, - TableAccess: []*Ydb_TableStats.TableAccessStats{ - { - Name: "g", - }, - { - Name: "h", - }, - { - Name: "i", - }, + }, + { + DurationUs: 3, + TableAccess: []*Ydb_TableStats.TableAccessStats{ + { + Name: "g", + }, + { + Name: "h", + }, + { + Name: "i", }, }, }, }, - } + }) t.Run("ImmutableIteration", func(t *testing.T) { for i := range make([]struct{}, 3) { t.Run(fmt.Sprintf("Pass#%d", i), func(t *testing.T) { - durations := make([]uint64, 0, 3) + durations := make([]time.Duration, 0, 3) tables := make([]string, 0, 9) - for phase := range s.RangeQueryPhases() { - durations = append(durations, phase.pb.GetDurationUs()) - for access := range phase.RangeTableAccess() { + for phase := range s.QueryPhases() { + durations = append(durations, phase.Duration()) + for access := range phase.TableAccess() { tables = append(tables, access.Name) } } - require.Equal(t, []uint64{1, 2, 3}, durations) + require.Equal(t, []time.Duration{1000, 2000, 3000}, durations) require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables) }) } }) t.Run("MutableIteration", func(t *testing.T) { - durations := make([]uint64, 0, 3) + durations := make([]time.Duration, 0, 3) tables := make([]string, 0, 9) for { phase, ok := s.NextPhase() if !ok { break } - durations = append(durations, phase.pb.GetDurationUs()) + durations = append(durations, phase.Duration()) for { access, ok := phase.NextTableAccess() if !ok { @@ -92,9 +91,8 @@ func TestIterateOverQueryPhases(t *testing.T) { tables = append(tables, access.Name) } } - require.Equal(t, []uint64{1, 2, 3}, durations) + require.Equal(t, []time.Duration{1000, 2000, 3000}, durations) require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables) - require.Equal(t, 3, s.pos) _, ok := s.NextPhase() require.False(t, ok) diff --git a/query/session.go b/query/session.go index fe4e9b872..1891c159c 100644 --- a/query/session.go +++ b/query/session.go @@ -18,5 +18,5 @@ type ( Begin(ctx context.Context, txSettings TransactionSettings) (Transaction, error) } - Stats = *stats.QueryStats + Stats = stats.QueryStats ) From 013bf31df57ec4a765b32817f5d1b6a0efcea0a3 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 18:44:09 +0300 Subject: [PATCH 10/14] Apply suggestions from code review --- internal/query/execute_query.go | 2 +- internal/query/options/execute.go | 8 +++--- internal/query/options/execute_script.go | 2 +- internal/query/result.go | 4 +-- internal/query/result_test.go | 32 ++++++++++++------------ internal/table/scanner/result.go | 2 +- internal/xsql/propose/conn.go | 2 +- query/example_test.go | 18 ++++++------- 8 files changed, 35 insertions(+), 35 deletions(-) diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index 58fe1d61a..577d7f8d7 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -24,7 +24,7 @@ import ( type executeSettings interface { ExecMode() options.ExecMode StatsMode() options.StatsMode - StatsCallback() func(stats *stats.QueryStats) + StatsCallback() func(stats stats.QueryStats) TxControl() *query.TransactionControl Syntax() options.Syntax Params() params.Parameters diff --git a/internal/query/options/execute.go b/internal/query/options/execute.go index cce65f4af..c066e701c 100644 --- a/internal/query/options/execute.go +++ b/internal/query/options/execute.go @@ -32,7 +32,7 @@ type ( execMode ExecMode statsMode StatsMode resourcePool string - statsCallback func(queryStats *stats.QueryStats) + statsCallback func(queryStats stats.QueryStats) callOptions []grpc.CallOption txControl *tx.Control retryOptions []retry.Option @@ -59,7 +59,7 @@ type ( syntaxOption = Syntax statsModeOption struct { mode StatsMode - callback func(*stats.QueryStats) + callback func(stats.QueryStats) } execModeOption = ExecMode responsePartLimitBytes int64 @@ -73,7 +73,7 @@ func (s *executeSettings) RetryOpts() []retry.Option { return s.retryOptions } -func (s *executeSettings) StatsCallback() func(*stats.QueryStats) { +func (s *executeSettings) StatsCallback() func(stats.QueryStats) { return s.statsCallback } @@ -225,7 +225,7 @@ func (opt statsModeOption) applyExecuteOption(s *executeSettings) { s.statsCallback = opt.callback } -func WithStatsMode(mode StatsMode, callback func(*stats.QueryStats)) statsModeOption { +func WithStatsMode(mode StatsMode, callback func(stats.QueryStats)) statsModeOption { return statsModeOption{ mode: mode, callback: callback, diff --git a/internal/query/options/execute_script.go b/internal/query/options/execute_script.go index 86fc3af0c..4ca9a6dd3 100644 --- a/internal/query/options/execute_script.go +++ b/internal/query/options/execute_script.go @@ -34,7 +34,7 @@ type ( Query string } Mode ExecMode - Stats *stats.QueryStats + Stats stats.QueryStats ResultSetsMeta []struct { Columns []struct { Name string diff --git a/internal/query/result.go b/internal/query/result.go index 895d56d04..c361484c0 100644 --- a/internal/query/result.go +++ b/internal/query/result.go @@ -38,7 +38,7 @@ type ( resultSetIndex int64 closed chan struct{} trace *trace.Query - statsCallback func(queryStats *stats.QueryStats) + statsCallback func(queryStats stats.QueryStats) onClose []func() onNextPartErr []func(err error) onTxMeta []func(txMeta *Ydb_Query.TransactionMeta) @@ -93,7 +93,7 @@ func withTrace(t *trace.Query) resultOption { } } -func withStatsCallback(callback func(queryStats *stats.QueryStats)) resultOption { +func withStatsCallback(callback func(queryStats stats.QueryStats)) resultOption { return func(s *streamResult) { s.statsCallback = callback } diff --git a/internal/query/result_test.go b/internal/query/result_test.go index af27a0737..98bfb41da 100644 --- a/internal/query/result_test.go +++ b/internal/query/result_test.go @@ -1926,8 +1926,8 @@ func TestResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s *stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { + var s stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -2287,8 +2287,8 @@ func TestResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s *stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { + var s stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -2649,8 +2649,8 @@ func TestResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s *stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { + var s stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -2986,8 +2986,8 @@ func TestResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s *stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { + var s stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -3358,8 +3358,8 @@ func TestMaterializedResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s *stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { + var s stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -3719,8 +3719,8 @@ func TestMaterializedResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s *stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { + var s stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -4081,8 +4081,8 @@ func TestMaterializedResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s *stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { + var s stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) @@ -4418,8 +4418,8 @@ func TestMaterializedResultStats(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - var s *stats.QueryStats - result, err := newResult(ctx, stream, withStatsCallback(func(queryStats *stats.QueryStats) { + var s stats.QueryStats + result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) { s = queryStats })) require.NoError(t, err) diff --git a/internal/table/scanner/result.go b/internal/table/scanner/result.go index 97aacb69a..e2368b91f 100644 --- a/internal/table/scanner/result.go +++ b/internal/table/scanner/result.go @@ -220,7 +220,7 @@ func (r *baseResult) CurrentResultSet() result.Set { } // Stats returns query execution queryStats. -func (r *baseResult) Stats() *stats.QueryStats { +func (r *baseResult) Stats() stats.QueryStats { r.statsMtx.RLock() defer r.statsMtx.RUnlock() diff --git a/internal/xsql/propose/conn.go b/internal/xsql/propose/conn.go index 5be558428..cc98f1052 100644 --- a/internal/xsql/propose/conn.go +++ b/internal/xsql/propose/conn.go @@ -82,7 +82,7 @@ func (c *Conn) Explain(ctx context.Context, sql string, _ *params.Params) (ast s _, err := c.session.Query( ctx, sql, options.WithExecMode(options.ExecModeExplain), - options.WithStatsMode(options.StatsModeNone, func(stats *stats.QueryStats) { + options.WithStatsMode(options.StatsModeNone, func(stats stats.QueryStats) { ast = stats.QueryAST() plan = stats.QueryPlan() }), diff --git a/query/example_test.go b/query/example_test.go index 3bb1699cf..ad58dfe6f 100644 --- a/query/example_test.go +++ b/query/example_test.go @@ -246,7 +246,7 @@ func Example_resultStats() { id int32 // required value myStr string // required value ) - var s query.Stats + var stats query.Stats // Do retry operation on errors with best effort row, err := db.Query().QueryRow(ctx, // context manage exiting from Do `SELECT CAST($id AS Uint64) AS id, CAST($myStr AS Text) AS myStr`, @@ -256,8 +256,8 @@ func Example_resultStats() { Param("$myStr").Text("123"). Build(), ), - query.WithStatsMode(query.StatsModeFull, func(stats query.Stats) { - s = stats + query.WithStatsMode(query.StatsModeFull, func(s query.Stats) { + stats = s }), query.WithIdempotent(), ) @@ -275,14 +275,14 @@ func Example_resultStats() { fmt.Printf("id=%v, myStr='%s'\n", id, myStr) fmt.Println("Stats:") - fmt.Printf("- Compilation='%v'\n", s.Compilation()) - fmt.Printf("- TotalCPUTime='%v'\n", s.TotalCPUTime()) - fmt.Printf("- ProcessCPUTime='%v'\n", s.ProcessCPUTime()) - fmt.Printf("- QueryAST='%v'\n", s.QueryAST()) - fmt.Printf("- QueryPlan='%v'\n", s.QueryPlan()) + fmt.Printf("- Compilation='%v'\n", stats.Compilation()) + fmt.Printf("- TotalCPUTime='%v'\n", stats.TotalCPUTime()) + fmt.Printf("- ProcessCPUTime='%v'\n", stats.ProcessCPUTime()) + fmt.Printf("- QueryAST='%v'\n", stats.QueryAST()) + fmt.Printf("- QueryPlan='%v'\n", stats.QueryPlan()) fmt.Println("- Phases:") for { - phase, ok := s.NextPhase() + phase, ok := stats.NextPhase() if !ok { break } From 0460525f1a5a5114d034d70a27dcf0eb270403bd Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 18:45:00 +0300 Subject: [PATCH 11/14] Apply suggestions from code review --- internal/stats/query.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/stats/query.go b/internal/stats/query.go index 8292cbf57..c3908b89e 100644 --- a/internal/stats/query.go +++ b/internal/stats/query.go @@ -1,10 +1,11 @@ package stats import ( - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" "time" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" ) type ( From a3735fd8f2974906d62f1db942d92edf02dfdd4d Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 18:55:09 +0300 Subject: [PATCH 12/14] minifix --- CHANGELOG.md | 2 +- internal/stats/query.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cd42509a..c64e52d00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -* Added into query stats the range iterators for iterate over query phases and table access +* Added into query stats the immutable range iterators from go1.23 for iterate over query phases and accessed tables ## v3.96.2 * Fixed broken metric `ydb_go_sdk_ydb_database_sql_conns` diff --git a/internal/stats/query.go b/internal/stats/query.go index c3908b89e..f9fae2f25 100644 --- a/internal/stats/query.go +++ b/internal/stats/query.go @@ -4,7 +4,7 @@ import ( "time" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats" - + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter" ) From 812ea6705245a2cae43d5b68a780ac4e7c6458f7 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Thu, 16 Jan 2025 18:59:06 +0300 Subject: [PATCH 13/14] Apply suggestions from code review --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c64e52d00..29a2f6402 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -* Added into query stats the immutable range iterators from go1.23 for iterate over query phases and accessed tables +* Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation ## v3.96.2 * Fixed broken metric `ydb_go_sdk_ydb_database_sql_conns` From 0dc77f56a14b938be1c04fe1e646bd4183d26a1b Mon Sep 17 00:00:00 2001 From: robot Date: Thu, 16 Jan 2025 16:15:05 +0000 Subject: [PATCH 14/14] Release v3.97.0 --- CHANGELOG.md | 1 + internal/version/version.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 29a2f6402..24dcf08e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +## v3.97.0 * Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation ## v3.96.2 diff --git a/internal/version/version.go b/internal/version/version.go index 64a526540..88c107b96 100644 --- a/internal/version/version.go +++ b/internal/version/version.go @@ -2,8 +2,8 @@ package version const ( Major = "3" - Minor = "96" - Patch = "2" + Minor = "97" + Patch = "0" Package = "ydb-go-sdk" )