Skip to content

Commit

Permalink
Merge branch 'master' into pool-for-encoders
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisEvd authored Jan 17, 2025
2 parents 7864cdf + 0dc77f5 commit 34ce284
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 45 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
* Added the use of sync.Pool in EncoderMap
* Added the use of sync.Pool in EncoderMap and renamed it to MultiEncoder

## v3.97.0
* Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation

## v3.96.2
* Fixed broken metric `ydb_go_sdk_ydb_database_sql_conns`
Expand Down
93 changes: 65 additions & 28 deletions internal/stats/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"time"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
)

type (
Expand All @@ -19,12 +21,17 @@ type (
// NextPhase returns next execution phase within query.
// If ok flag is false, then there are no more phases and p is invalid.
NextPhase() (p QueryPhase, ok bool)

// QueryPhases is a range iterator over query phases.
QueryPhases() xiter.Seq[QueryPhase]
}
// QueryPhase holds query execution phase statistics.
QueryPhase interface {
// NextTableAccess returns next accessed table within query execution phase.
// If ok flag is false, then there are no more accessed tables and t is invalid.
NextTableAccess() (t *TableAccess, ok bool)
// TableAccess is a range iterator over query execution phase's accessed tables.
TableAccess() xiter.Seq[*TableAccess]
Duration() time.Duration
CPUTime() time.Duration
AffectedShards() uint64
Expand Down Expand Up @@ -86,57 +93,70 @@ func fromOperationStats(pb *Ydb_TableStats.OperationStats) OperationStats {
}
}

func (s *queryStats) ProcessCPUTime() time.Duration {
return fromUs(s.pb.GetProcessCpuTimeUs())
func (stats *queryStats) ProcessCPUTime() time.Duration {
return fromUs(stats.pb.GetProcessCpuTimeUs())
}

func (s *queryStats) Compilation() (c *CompilationStats) {
return fromCompilationStats(s.pb.GetCompilation())
func (stats *queryStats) Compilation() (c *CompilationStats) {
return fromCompilationStats(stats.pb.GetCompilation())
}

func (s *queryStats) QueryPlan() string {
return s.pb.GetQueryPlan()
func (stats *queryStats) QueryPlan() string {
return stats.pb.GetQueryPlan()
}

func (s *queryStats) QueryAST() string {
return s.pb.GetQueryAst()
func (stats *queryStats) QueryAST() string {
return stats.pb.GetQueryAst()
}

func (s *queryStats) TotalCPUTime() time.Duration {
return fromUs(s.pb.GetTotalCpuTimeUs())
func (stats *queryStats) TotalCPUTime() time.Duration {
return fromUs(stats.pb.GetTotalCpuTimeUs())
}

func (s *queryStats) TotalDuration() time.Duration {
return fromUs(s.pb.GetTotalDurationUs())
func (stats *queryStats) TotalDuration() time.Duration {
return fromUs(stats.pb.GetTotalDurationUs())
}

// NextPhase returns next execution phase within query.
// If ok flag is false, then there are no more phases and p is invalid.
func (s *queryStats) NextPhase() (p QueryPhase, ok bool) {
if s.pos >= len(s.pb.GetQueryPhases()) {
func (stats *queryStats) NextPhase() (p QueryPhase, ok bool) {
if stats.pos >= len(stats.pb.GetQueryPhases()) {
return
}
pb := s.pb.GetQueryPhases()[s.pos]
pb := stats.pb.GetQueryPhases()[stats.pos]
if pb == nil {
return
}
s.pos++
stats.pos++

return &queryPhase{
pb: pb,
}, true
}

func (stats *queryStats) QueryPhases() xiter.Seq[QueryPhase] {
return func(yield func(p QueryPhase) bool) {
for _, pb := range stats.pb.GetQueryPhases() {
cont := yield(&queryPhase{
pb: pb,
})
if !cont {
return
}
}
}
}

// NextTableAccess returns next accessed table within query execution phase.
//
// If ok flag is false, then there are no more accessed tables and t is
// invalid.
func (queryPhase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) {
if queryPhase.pos >= len(queryPhase.pb.GetTableAccess()) {
func (phase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) {
if phase.pos >= len(phase.pb.GetTableAccess()) {
return
}
pb := queryPhase.pb.GetTableAccess()[queryPhase.pos]
queryPhase.pos++
pb := phase.pb.GetTableAccess()[phase.pos]
phase.pos++

return &TableAccess{
Name: pb.GetName(),
Expand All @@ -147,20 +167,37 @@ func (queryPhase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) {
}, true
}

func (queryPhase *queryPhase) Duration() time.Duration {
return fromUs(queryPhase.pb.GetDurationUs())
func (phase *queryPhase) TableAccess() xiter.Seq[*TableAccess] {
return func(yield func(access *TableAccess) bool) {
for _, pb := range phase.pb.GetTableAccess() {
cont := yield(&TableAccess{
Name: pb.GetName(),
Reads: fromOperationStats(pb.GetReads()),
Updates: fromOperationStats(pb.GetUpdates()),
Deletes: fromOperationStats(pb.GetDeletes()),
PartitionsCount: pb.GetPartitionsCount(),
})
if !cont {
return
}
}
}
}

func (phase *queryPhase) Duration() time.Duration {
return fromUs(phase.pb.GetDurationUs())
}

func (queryPhase *queryPhase) CPUTime() time.Duration {
return fromUs(queryPhase.pb.GetCpuTimeUs())
func (phase *queryPhase) CPUTime() time.Duration {
return fromUs(phase.pb.GetCpuTimeUs())
}

func (queryPhase *queryPhase) AffectedShards() uint64 {
return queryPhase.pb.GetAffectedShards()
func (phase *queryPhase) AffectedShards() uint64 {
return phase.pb.GetAffectedShards()
}

func (queryPhase *queryPhase) IsLiteralPhase() bool {
return queryPhase.pb.GetLiteralPhase()
func (phase *queryPhase) IsLiteralPhase() bool {
return phase.pb.GetLiteralPhase()
}

func FromQueryStats(pb *Ydb_TableStats.QueryStats) QueryStats {
Expand Down
100 changes: 100 additions & 0 deletions internal/stats/query_go1.23_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//go:build go1.23

package stats

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"
)

func TestIterateOverQueryPhases(t *testing.T) {
s := FromQueryStats(&Ydb_TableStats.QueryStats{
QueryPhases: []*Ydb_TableStats.QueryPhaseStats{
{
DurationUs: 1,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "a",
},
{
Name: "b",
},
{
Name: "c",
},
},
},
{
DurationUs: 2,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "d",
},
{
Name: "e",
},
{
Name: "f",
},
},
},
{
DurationUs: 3,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "g",
},
{
Name: "h",
},
{
Name: "i",
},
},
},
},
})
t.Run("ImmutableIteration", func(t *testing.T) {
for i := range make([]struct{}, 3) {
t.Run(fmt.Sprintf("Pass#%d", i), func(t *testing.T) {
durations := make([]time.Duration, 0, 3)
tables := make([]string, 0, 9)
for phase := range s.QueryPhases() {
durations = append(durations, phase.Duration())
for access := range phase.TableAccess() {
tables = append(tables, access.Name)
}
}
require.Equal(t, []time.Duration{1000, 2000, 3000}, durations)
require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables)
})
}
})
t.Run("MutableIteration", func(t *testing.T) {
durations := make([]time.Duration, 0, 3)
tables := make([]string, 0, 9)
for {
phase, ok := s.NextPhase()
if !ok {
break
}
durations = append(durations, phase.Duration())
for {
access, ok := phase.NextTableAccess()
if !ok {
break
}
tables = append(tables, access.Name)
}
}
require.Equal(t, []time.Duration{1000, 2000, 3000}, durations)
require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables)

_, ok := s.NextPhase()
require.False(t, ok)
})
}
4 changes: 2 additions & 2 deletions internal/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package version

const (
Major = "3"
Minor = "96"
Patch = "2"
Minor = "97"
Patch = "0"

Package = "ydb-go-sdk"
)
Expand Down
5 changes: 4 additions & 1 deletion internal/xiter/xiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@

package xiter

type Seq2[K, V any] func(yield func(K, V) bool)
type (
Seq[T any] func(yield func(T) bool)
Seq2[K, V any] func(yield func(K, V) bool)
)
5 changes: 4 additions & 1 deletion internal/xiter/xiter_go1.23.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ import (
"iter"
)

type Seq2[K, V any] iter.Seq2[K, V]
type (
Seq[T any] iter.Seq[T]
Seq2[K, V any] iter.Seq2[K, V]
)
14 changes: 2 additions & 12 deletions retry/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func DoTx(ctx context.Context, db *sql.DB, op func(context.Context, *sql.Tx) err
// DoTxWithResult is a retryer of database/sql transactions with fallbacks on errors
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func DoTxWithResult[T any](ctx context.Context, db *sql.DB, //nolint:funlen
func DoTxWithResult[T any](ctx context.Context, db *sql.DB,
op func(context.Context, *sql.Tx) (T, error),
opts ...doTxOption,
) (T, error) {
Expand Down Expand Up @@ -211,17 +211,7 @@ func DoTxWithResult[T any](ctx context.Context, db *sql.DB, //nolint:funlen
return zeroValue, unwrapErrBadConn(xerrors.WithStackTrace(err))
}
defer func() {
if finalErr == nil {
return
}
errRollback := tx.Rollback()
if errRollback == nil {
return
}
finalErr = xerrors.NewWithIssues("",
xerrors.WithStackTrace(finalErr),
xerrors.WithStackTrace(fmt.Errorf("rollback failed: %w", errRollback)),
)
_ = tx.Rollback()
}()
v, err := op(xcontext.MarkRetryCall(ctx), tx)
if err != nil {
Expand Down
Loading

0 comments on commit 34ce284

Please sign in to comment.