Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactored query stats: implemetation instead interface #1610

Merged
merged 11 commits into from
Jan 16, 2025
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation

## v3.96.2
* Fixed broken metric `ydb_go_sdk_ydb_database_sql_conns`

Expand Down
93 changes: 65 additions & 28 deletions internal/stats/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"time"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"
asmyasnikov marked this conversation as resolved.
Show resolved Hide resolved

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
)

type (
Expand All @@ -19,12 +21,17 @@ type (
// NextPhase returns next execution phase within query.
// If ok flag is false, then there are no more phases and p is invalid.
NextPhase() (p QueryPhase, ok bool)

// QueryPhases is a range iterator over query phases.
QueryPhases() xiter.Seq[QueryPhase]
}
// QueryPhase holds query execution phase statistics.
QueryPhase interface {
// NextTableAccess returns next accessed table within query execution phase.
// If ok flag is false, then there are no more accessed tables and t is invalid.
NextTableAccess() (t *TableAccess, ok bool)
// TableAccess is a range iterator over query execution phase's accessed tables.
TableAccess() xiter.Seq[*TableAccess]
Duration() time.Duration
CPUTime() time.Duration
AffectedShards() uint64
Expand Down Expand Up @@ -86,57 +93,70 @@ func fromOperationStats(pb *Ydb_TableStats.OperationStats) OperationStats {
}
}

func (s *queryStats) ProcessCPUTime() time.Duration {
return fromUs(s.pb.GetProcessCpuTimeUs())
func (stats *queryStats) ProcessCPUTime() time.Duration {
return fromUs(stats.pb.GetProcessCpuTimeUs())
}

func (s *queryStats) Compilation() (c *CompilationStats) {
return fromCompilationStats(s.pb.GetCompilation())
func (stats *queryStats) Compilation() (c *CompilationStats) {
return fromCompilationStats(stats.pb.GetCompilation())
}

func (s *queryStats) QueryPlan() string {
return s.pb.GetQueryPlan()
func (stats *queryStats) QueryPlan() string {
return stats.pb.GetQueryPlan()
}

func (s *queryStats) QueryAST() string {
return s.pb.GetQueryAst()
func (stats *queryStats) QueryAST() string {
return stats.pb.GetQueryAst()
}

func (s *queryStats) TotalCPUTime() time.Duration {
return fromUs(s.pb.GetTotalCpuTimeUs())
func (stats *queryStats) TotalCPUTime() time.Duration {
return fromUs(stats.pb.GetTotalCpuTimeUs())
}

func (s *queryStats) TotalDuration() time.Duration {
return fromUs(s.pb.GetTotalDurationUs())
func (stats *queryStats) TotalDuration() time.Duration {
return fromUs(stats.pb.GetTotalDurationUs())
}

// NextPhase returns next execution phase within query.
// If ok flag is false, then there are no more phases and p is invalid.
func (s *queryStats) NextPhase() (p QueryPhase, ok bool) {
if s.pos >= len(s.pb.GetQueryPhases()) {
func (stats *queryStats) NextPhase() (p QueryPhase, ok bool) {
if stats.pos >= len(stats.pb.GetQueryPhases()) {
return
}
pb := s.pb.GetQueryPhases()[s.pos]
pb := stats.pb.GetQueryPhases()[stats.pos]
if pb == nil {
return
}
s.pos++
stats.pos++

return &queryPhase{
pb: pb,
}, true
}

func (stats *queryStats) QueryPhases() xiter.Seq[QueryPhase] {
return func(yield func(p QueryPhase) bool) {
for _, pb := range stats.pb.GetQueryPhases() {
cont := yield(&queryPhase{
pb: pb,
})
if !cont {
return
}
}
}
}

// NextTableAccess returns next accessed table within query execution phase.
//
// If ok flag is false, then there are no more accessed tables and t is
// invalid.
func (queryPhase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) {
if queryPhase.pos >= len(queryPhase.pb.GetTableAccess()) {
func (phase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) {
if phase.pos >= len(phase.pb.GetTableAccess()) {
return
}
pb := queryPhase.pb.GetTableAccess()[queryPhase.pos]
queryPhase.pos++
pb := phase.pb.GetTableAccess()[phase.pos]
phase.pos++

return &TableAccess{
Name: pb.GetName(),
Expand All @@ -147,20 +167,37 @@ func (queryPhase *queryPhase) NextTableAccess() (t *TableAccess, ok bool) {
}, true
}

func (queryPhase *queryPhase) Duration() time.Duration {
return fromUs(queryPhase.pb.GetDurationUs())
func (phase *queryPhase) TableAccess() xiter.Seq[*TableAccess] {
return func(yield func(access *TableAccess) bool) {
for _, pb := range phase.pb.GetTableAccess() {
cont := yield(&TableAccess{
Name: pb.GetName(),
Reads: fromOperationStats(pb.GetReads()),
Updates: fromOperationStats(pb.GetUpdates()),
Deletes: fromOperationStats(pb.GetDeletes()),
PartitionsCount: pb.GetPartitionsCount(),
})
if !cont {
return
}
}
}
}

func (phase *queryPhase) Duration() time.Duration {
return fromUs(phase.pb.GetDurationUs())
}

func (queryPhase *queryPhase) CPUTime() time.Duration {
return fromUs(queryPhase.pb.GetCpuTimeUs())
func (phase *queryPhase) CPUTime() time.Duration {
return fromUs(phase.pb.GetCpuTimeUs())
}

func (queryPhase *queryPhase) AffectedShards() uint64 {
return queryPhase.pb.GetAffectedShards()
func (phase *queryPhase) AffectedShards() uint64 {
return phase.pb.GetAffectedShards()
}

func (queryPhase *queryPhase) IsLiteralPhase() bool {
return queryPhase.pb.GetLiteralPhase()
func (phase *queryPhase) IsLiteralPhase() bool {
return phase.pb.GetLiteralPhase()
}

func FromQueryStats(pb *Ydb_TableStats.QueryStats) QueryStats {
Expand Down
100 changes: 100 additions & 0 deletions internal/stats/query_go1.23_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//go:build go1.23

package stats

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_TableStats"
)

func TestIterateOverQueryPhases(t *testing.T) {
s := FromQueryStats(&Ydb_TableStats.QueryStats{
QueryPhases: []*Ydb_TableStats.QueryPhaseStats{
{
DurationUs: 1,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "a",
},
{
Name: "b",
},
{
Name: "c",
},
},
},
{
DurationUs: 2,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "d",
},
{
Name: "e",
},
{
Name: "f",
},
},
},
{
DurationUs: 3,
TableAccess: []*Ydb_TableStats.TableAccessStats{
{
Name: "g",
},
{
Name: "h",
},
{
Name: "i",
},
},
},
},
})
t.Run("ImmutableIteration", func(t *testing.T) {
for i := range make([]struct{}, 3) {
t.Run(fmt.Sprintf("Pass#%d", i), func(t *testing.T) {
durations := make([]time.Duration, 0, 3)
tables := make([]string, 0, 9)
for phase := range s.QueryPhases() {
durations = append(durations, phase.Duration())
for access := range phase.TableAccess() {
tables = append(tables, access.Name)
}
}
require.Equal(t, []time.Duration{1000, 2000, 3000}, durations)
require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables)
})
}
})
t.Run("MutableIteration", func(t *testing.T) {
durations := make([]time.Duration, 0, 3)
tables := make([]string, 0, 9)
for {
phase, ok := s.NextPhase()
if !ok {
break
}
durations = append(durations, phase.Duration())
for {
access, ok := phase.NextTableAccess()
if !ok {
break
}
tables = append(tables, access.Name)
}
}
require.Equal(t, []time.Duration{1000, 2000, 3000}, durations)
require.Equal(t, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i"}, tables)

_, ok := s.NextPhase()
require.False(t, ok)
})
}
5 changes: 4 additions & 1 deletion internal/xiter/xiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@

package xiter

type Seq2[K, V any] func(yield func(K, V) bool)
type (
Seq[T any] func(yield func(T) bool)
Seq2[K, V any] func(yield func(K, V) bool)
)
5 changes: 4 additions & 1 deletion internal/xiter/xiter_go1.23.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ import (
"iter"
)

type Seq2[K, V any] iter.Seq2[K, V]
type (
Seq[T any] iter.Seq[T]
Seq2[K, V any] iter.Seq2[K, V]
)
Loading