From 45c2d7d089608d1469086ba6ad572a4bb915be3f Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 16 Jan 2024 13:03:07 +0300 Subject: [PATCH] query client interface and simple implementation --- connection.go | 4 + driver.go | 28 +++++ internal/query/client.go | 126 +++++++++++++++++++++ internal/query/config/config.go | 90 +++++++++++++++ internal/query/config/options.go | 63 +++++++++++ internal/query/pool.go | 38 +++++++ internal/query/session.go | 142 +++++++++++++++++++++++ internal/query/transaction.go | 30 +++++ internal/session/status.go | 11 ++ options.go | 20 ++-- query/client.go | 127 +++++++++++++++++++++ query/do_options.go | 22 ++++ query/do_tx_options.go | 34 ++++++ query/example_test.go | 141 +++++++++++++++++++++++ query/execute_options.go | 72 ++++++++++++ query/named.go | 32 ++++++ query/named_test.go | 72 ++++++++++++ query/options.go | 7 ++ query/parameters.go | 128 +++++++++++++++++++++ query/session_status.go | 34 ++++++ query/tx_control.go | 188 +++++++++++++++++++++++++++++++ query/type.go | 5 + query/value.go | 65 +++++++++++ table/table.go | 13 ++- trace/query.go | 11 ++ 25 files changed, 1490 insertions(+), 13 deletions(-) create mode 100644 internal/query/client.go create mode 100644 internal/query/config/config.go create mode 100644 internal/query/config/options.go create mode 100644 internal/query/pool.go create mode 100644 internal/query/session.go create mode 100644 internal/query/transaction.go create mode 100644 internal/session/status.go create mode 100644 query/client.go create mode 100644 query/do_options.go create mode 100644 query/do_tx_options.go create mode 100644 query/example_test.go create mode 100644 query/execute_options.go create mode 100644 query/named.go create mode 100644 query/named_test.go create mode 100644 query/options.go create mode 100644 query/parameters.go create mode 100644 query/session_status.go create mode 100644 query/tx_control.go create mode 100644 query/type.go create mode 100644 query/value.go create mode 100644 trace/query.go diff --git a/connection.go b/connection.go index 3e8e3d2a4..4a4c6440f 100644 --- a/connection.go +++ b/connection.go @@ -5,6 +5,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/coordination" "github.com/ydb-platform/ydb-go-sdk/v3/discovery" + "github.com/ydb-platform/ydb-go-sdk/v3/query" "github.com/ydb-platform/ydb-go-sdk/v3/ratelimiter" "github.com/ydb-platform/ydb-go-sdk/v3/scheme" "github.com/ydb-platform/ydb-go-sdk/v3/scripting" @@ -34,6 +35,9 @@ type Connection interface { // Table returns table client Table() table.Client + // Query returns query client + Query() query.Client + // Scheme returns scheme client Scheme() scheme.Client diff --git a/driver.go b/driver.go index 65eb86a81..7718b0e05 100644 --- a/driver.go +++ b/driver.go @@ -20,6 +20,8 @@ import ( discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn" "github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint" + internalQuery "github.com/ydb-platform/ydb-go-sdk/v3/internal/query" + queryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" internalRatelimiter "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter" ratelimiterConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter/config" internalScheme "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme" @@ -35,6 +37,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/log" + "github.com/ydb-platform/ydb-go-sdk/v3/query" "github.com/ydb-platform/ydb-go-sdk/v3/ratelimiter" "github.com/ydb-platform/ydb-go-sdk/v3/scheme" "github.com/ydb-platform/ydb-go-sdk/v3/scripting" @@ -68,6 +71,9 @@ type Driver struct { //nolint:maligned table *internalTable.Client tableOptions []tableConfig.Option + query *internalQuery.Client + queryOptions []queryConfig.Option + scripting *internalScripting.Client scriptingOptions []scriptingConfig.Option @@ -141,6 +147,7 @@ func (d *Driver) Close(ctx context.Context) (finalErr error) { d.scheme.Close, d.scripting.Close, d.table.Close, + d.query.Close, d.topic.Close, d.balancer.Close, d.pool.Release, @@ -180,6 +187,11 @@ func (d *Driver) Table() table.Client { return d.table } +// Query returns query client +func (d *Driver) Query() query.Client { + return nil // d.query +} + // Scheme returns scheme client func (d *Driver) Scheme() scheme.Client { return d.scheme @@ -399,6 +411,22 @@ func (d *Driver) connect(ctx context.Context) (err error) { return xerrors.WithStackTrace(err) } + d.query, err = internalQuery.New(ctx, + d.balancer, + queryConfig.New( + append( + // prepend common params from root config + []queryConfig.Option{ + queryConfig.With(d.config.Common), + }, + d.queryOptions..., + )..., + ), + ) + if err != nil { + return xerrors.WithStackTrace(err) + } + d.scheme, err = internalScheme.New(ctx, d.balancer, schemeConfig.New( diff --git a/internal/query/client.go b/internal/query/client.go new file mode 100644 index 000000000..eea1a8549 --- /dev/null +++ b/internal/query/client.go @@ -0,0 +1,126 @@ +package query + +import ( + "context" + + "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" + "google.golang.org/grpc" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/query" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" +) + +type balancer interface { + grpc.ClientConnInterface +} + +var _ query.Client = (*Client)(nil) + +type Client struct { + grpcClient Ydb_Query_V1.QueryServiceClient + pool SessionPool +} + +func (c Client) Close(ctx context.Context) error { + err := c.pool.Close(ctx) + if err != nil { + return xerrors.WithStackTrace(err) + } + return nil +} + +func (c Client) do(ctx context.Context, op query.Operation, opts query.DoOptions) error { + return retry.Retry(ctx, func(ctx context.Context) error { + err := c.pool.With(ctx, func(ctx context.Context, s *session) error { + err := op(ctx, s) + if err != nil { + return xerrors.WithStackTrace(err) + } + return nil + }) + if err != nil { + return xerrors.WithStackTrace(err) + } + return nil + }, opts.RetryOptions...) +} + +func (c Client) Do(ctx context.Context, op query.Operation, opts ...query.DoOption) error { + doOptions := query.NewDoOptions(opts...) + if doOptions.Label != "" { + doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithLabel(doOptions.Label)) + } + if doOptions.Idempotent { + doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithIdempotent(doOptions.Idempotent)) + } + return c.do(ctx, op, doOptions) +} + +func (c Client) DoTx(ctx context.Context, op query.TxOperation, opts ...query.DoTxOption) error { + doTxOptions := query.NewDoTxOptions(opts...) + return c.do(ctx, func(ctx context.Context, s query.Session) error { + tx, err := s.Begin(ctx, doTxOptions.TxSettings) + if err != nil { + return xerrors.WithStackTrace(err) + } + err = op(ctx, tx) + if err != nil { + errRollback := tx.Rollback(ctx) + if errRollback != nil { + return xerrors.WithStackTrace(xerrors.Join(err, errRollback)) + } + } + err = tx.CommitTx(ctx) + if err != nil { + errRollback := tx.Rollback(ctx) + if errRollback != nil { + return xerrors.WithStackTrace(xerrors.Join(err, errRollback)) + } + return xerrors.WithStackTrace(err) + } + return nil + }, doTxOptions.DoOptions) +} + +func New(ctx context.Context, balancer balancer, config *config.Config) (*Client, error) { + grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer) + return &Client{ + grpcClient: grpcClient, + pool: &poolMock{ + create: func(ctx context.Context) (*session, error) { + s, err := grpcClient.CreateSession(ctx, &Ydb_Query.CreateSessionRequest{}) + if err != nil { + return nil, xerrors.WithStackTrace( + xerrors.Transport(err), + ) + } + if s.GetStatus() != Ydb.StatusIds_SUCCESS { + return nil, xerrors.WithStackTrace( + xerrors.Operation(xerrors.FromOperation(s)), + ) + } + return &session{ + id: s.GetSessionId(), + nodeID: s.GetNodeId(), + queryService: grpcClient, + status: query.SessionStatusReady, + }, nil + }, + close: func(ctx context.Context, s *session) error { + _, err := grpcClient.DeleteSession(ctx, + &Ydb_Query.DeleteSessionRequest{ + SessionId: s.id, + }, + ) + if err != nil { + return xerrors.WithStackTrace(err) + } + return nil + }, + }, + }, nil +} diff --git a/internal/query/config/config.go b/internal/query/config/config.go new file mode 100644 index 000000000..309e51e5f --- /dev/null +++ b/internal/query/config/config.go @@ -0,0 +1,90 @@ +package config + +import ( + "time" + + "github.com/jonboulle/clockwork" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +const ( + DefaultSessionPoolDeleteTimeout = 500 * time.Millisecond + DefaultSessionPoolCreateSessionTimeout = 5 * time.Second + DefaultSessionPoolSizeLimit = 50 + DefaultSessionPoolIdleThreshold = 5 * time.Minute +) + +type Config struct { + config.Common + + sizeLimit int + + createSessionTimeout time.Duration + deleteTimeout time.Duration + idleThreshold time.Duration + + trace *trace.Query + + clock clockwork.Clock +} + +func New(opts ...Option) *Config { + c := defaults() + for _, o := range opts { + if o != nil { + o(c) + } + } + return c +} + +func defaults() *Config { + return &Config{ + sizeLimit: DefaultSessionPoolSizeLimit, + createSessionTimeout: DefaultSessionPoolCreateSessionTimeout, + deleteTimeout: DefaultSessionPoolDeleteTimeout, + idleThreshold: DefaultSessionPoolIdleThreshold, + clock: clockwork.NewRealClock(), + trace: &trace.Query{}, + } +} + +// Trace defines trace over table client calls +func (c *Config) Trace() *trace.Query { + return c.trace +} + +// Clock defines clock +func (c *Config) Clock() clockwork.Clock { + return c.clock +} + +// SizeLimit is an upper bound of pooled sessions. +// If SizeLimit is less than or equal to zero then the +// DefaultSessionPoolSizeLimit variable is used as a limit. +func (c *Config) SizeLimit() int { + return c.sizeLimit +} + +// IdleThreshold is a maximum duration between any activity within session. +// If this threshold reached, idle session will be closed +// +// If IdleThreshold is less than zero then there is no idle limit. +// If IdleThreshold is zero, then the DefaultSessionPoolIdleThreshold value is used. +func (c *Config) IdleThreshold() time.Duration { + return c.idleThreshold +} + +// CreateSessionTimeout limits maximum time spent on Create session request +func (c *Config) CreateSessionTimeout() time.Duration { + return c.createSessionTimeout +} + +// DeleteTimeout limits maximum time spent on Delete request +// +// If DeleteTimeout is less than or equal to zero then the DefaultSessionPoolDeleteTimeout is used. +func (c *Config) DeleteTimeout() time.Duration { + return c.deleteTimeout +} diff --git a/internal/query/config/options.go b/internal/query/config/options.go new file mode 100644 index 000000000..732cffbcf --- /dev/null +++ b/internal/query/config/options.go @@ -0,0 +1,63 @@ +package config + +import ( + "time" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" +) + +type Option func(*Config) + +// With applies common configuration params +func With(config config.Common) Option { + return func(c *Config) { + c.Common = config + } +} + +// WithSizeLimit defines upper bound of pooled sessions. +// If sizeLimit is less than or equal to zero then the +// DefaultSessionPoolSizeLimit variable is used as a limit. +func WithSizeLimit(sizeLimit int) Option { + return func(c *Config) { + if sizeLimit > 0 { + c.sizeLimit = sizeLimit + } + } +} + +// WithIdleThreshold sets maximum duration between any activity within session. +// If this threshold reached, session will be closed. +// +// If idleThreshold is less than zero then there is no idle limit. +// If idleThreshold is zero, then the DefaultSessionPoolIdleThreshold value is used. +func WithIdleThreshold(idleThreshold time.Duration) Option { + return func(c *Config) { + if idleThreshold < 0 { + idleThreshold = 0 + } + c.idleThreshold = idleThreshold + } +} + +// WithCreateSessionTimeout limits maximum time spent on Create session request +// If createSessionTimeout is less than or equal to zero then no used timeout on create session request +func WithCreateSessionTimeout(createSessionTimeout time.Duration) Option { + return func(c *Config) { + if createSessionTimeout > 0 { + c.createSessionTimeout = createSessionTimeout + } else { + c.createSessionTimeout = 0 + } + } +} + +// WithDeleteTimeout limits maximum time spent on Delete request +// If deleteTimeout is less than or equal to zero then the DefaultSessionPoolDeleteTimeout is used. +func WithDeleteTimeout(deleteTimeout time.Duration) Option { + return func(c *Config) { + if deleteTimeout > 0 { + c.deleteTimeout = deleteTimeout + } + } +} diff --git a/internal/query/pool.go b/internal/query/pool.go new file mode 100644 index 000000000..057e0c74b --- /dev/null +++ b/internal/query/pool.go @@ -0,0 +1,38 @@ +package query + +import ( + "context" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" +) + +type SessionPool interface { + With(ctx context.Context, f func(ctx context.Context, s *session) error) error + Close(ctx context.Context) error +} + +var _ SessionPool = (*poolMock)(nil) + +type poolMock struct { + create func(ctx context.Context) (*session, error) + close func(ctx context.Context, s *session) error +} + +func (pool poolMock) Close(ctx context.Context) error { + return nil +} + +func (pool poolMock) With(ctx context.Context, f func(ctx context.Context, s *session) error) error { + s, err := pool.create(ctx) + if err != nil { + return xerrors.WithStackTrace(err) + } + defer func() { + _ = pool.close(ctx, s) + }() + err = f(ctx, s) + if err != nil { + return xerrors.WithStackTrace(err) + } + return nil +} diff --git a/internal/query/session.go b/internal/query/session.go new file mode 100644 index 000000000..2084852a8 --- /dev/null +++ b/internal/query/session.go @@ -0,0 +1,142 @@ +package query + +import ( + "context" + "sync/atomic" + "time" + + "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/query" +) + +var _ query.Session = (*session)(nil) + +type session struct { + id string + nodeID int64 + queryService Ydb_Query_V1.QueryServiceClient + status query.SessionStatus + lastUsage atomic.Pointer[time.Time] +} + +func (s *session) Begin(ctx context.Context, txSettings *query.TransactionSettings) (query.Transaction, error) { + response, err := s.queryService.BeginTransaction(ctx, + &Ydb_Query.BeginTransactionRequest{ + SessionId: s.id, + TxSettings: txSettings.Desc(), + }, + ) + if err != nil { + return nil, xerrors.WithStackTrace(xerrors.Transport(err)) + } + if response.GetStatus() != Ydb.StatusIds_SUCCESS { + return nil, xerrors.WithStackTrace(xerrors.Operation(xerrors.FromOperation(response))) + } + return transaction{ + id: response.GetTxMeta().GetId(), + }, nil +} + +func (s *session) ID() string { + return s.id +} + +func (s *session) NodeID() int64 { + return s.nodeID +} + +func (s *session) Status() query.SessionStatus { + status := query.SessionStatus(atomic.LoadUint32((*uint32)(&s.status))) + return status +} + +func (s *session) LastUsage() time.Time { + return *s.lastUsage.Load() +} + +func (s *session) updateLastUsage() { + t := time.Now() + s.lastUsage.Store(&t) +} + +func queryFromText(q string, syntax Ydb_Query.Syntax) *Ydb_Query.QueryContent { + return &Ydb_Query.QueryContent{ + Syntax: syntax, + Text: q, + } +} + +func (s *session) Execute( + ctx context.Context, q string, opts ...query.ExecuteOption, +) ( + txr query.Transaction, + r query.Result, + err error, +) { + var ( + executeOpts = query.NewExecuteOptions(opts...) + a = allocator.New() + request = Ydb_Query.ExecuteQueryRequest{ + SessionId: s.id, + ExecMode: 0, + TxControl: executeOpts.TxControl().Desc(), + Query: &Ydb_Query.ExecuteQueryRequest_QueryContent{ + QueryContent: queryFromText(q, executeOpts.Syntax()), + }, + Parameters: executeOpts.Params().ToYDB(a), + StatsMode: 0, + ConcurrentResultSets: false, + } + //stream Ydb_Query_V1.QueryService_ExecuteQueryClient + ) + defer func() { + a.Free() + }() + + ctx, cancel := xcontext.WithCancel(ctx) + + _, err = s.queryService.ExecuteQuery(ctx, &request, executeOpts.CallOptions()...) + if err != nil { + cancel() + return nil, nil, xerrors.WithStackTrace(err) + } + return nil, nil, xerrors.WithStackTrace(err) + // + //return scanner.NewStream(ctx, + // func(ctx context.Context) ( + // set *Ydb.ResultSet, + // stats *Ydb_TableStats.QueryStats, + // err error, + // ) { + // defer func() { + // onIntermediate(xerrors.HideEOF(err)) + // }() + // select { + // case <-ctx.Done(): + // return nil, nil, xerrors.WithStackTrace(ctx.Err()) + // default: + // var response *Ydb_Table.ExecuteScanQueryPartialResponse + // response, err = stream.Recv() + // result := response.GetResult() + // if result == nil || err != nil { + // return nil, nil, xerrors.WithStackTrace(err) + // } + // return result.GetResultSet(), result.GetQueryStats(), nil + // } + // }, + // func(err error) error { + // cancel() + // onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err)) + // return err + // }, + // scanner.WithIgnoreTruncated(s.config.IgnoreTruncated()), + // scanner.WithMarkTruncatedAsRetryable(), + //) + // +} diff --git a/internal/query/transaction.go b/internal/query/transaction.go new file mode 100644 index 000000000..84085c71b --- /dev/null +++ b/internal/query/transaction.go @@ -0,0 +1,30 @@ +package query + +import ( + "context" + + "github.com/ydb-platform/ydb-go-sdk/v3/query" +) + +var _ query.Transaction = (*transaction)(nil) + +type transaction struct { + id string +} + +func (t transaction) Execute( + ctx context.Context, query string, opts ...query.TxExecuteOption, +) (r query.Result, err error) { + //TODO implement me + panic("implement me") +} + +func (t transaction) CommitTx(ctx context.Context) (err error) { + //TODO implement me + panic("implement me") +} + +func (t transaction) Rollback(ctx context.Context) (err error) { + //TODO implement me + panic("implement me") +} diff --git a/internal/session/status.go b/internal/session/status.go new file mode 100644 index 000000000..7807fdbd5 --- /dev/null +++ b/internal/session/status.go @@ -0,0 +1,11 @@ +package session + +type Status = string + +const ( + StatusUnknown = Status("unknown") + StatusReady = Status("ready") + StatusBusy = Status("busy") + StatusClosing = Status("closing") + StatusClosed = Status("closed") +) diff --git a/options.go b/options.go index 41c995f97..51b1733ad 100644 --- a/options.go +++ b/options.go @@ -17,6 +17,7 @@ import ( coordinationConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/coordination/config" discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config" "github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn" + queryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config" ratelimiterConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter/config" schemeConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme/config" scriptingConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting/config" @@ -352,7 +353,6 @@ func WithCertificatesFromPem(bytes []byte, opts ...certificates.FromPemOption) O for _, cert := range certs { _ = WithCertificate(cert)(ctx, c) } - return nil } } @@ -362,7 +362,15 @@ func WithCertificatesFromPem(bytes []byte, opts ...certificates.FromPemOption) O func WithTableConfigOption(option tableConfig.Option) Option { return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, option) + return nil + } +} +// WithQueryConfigOption collects additional configuration options for query.Client. +// This option does not replace collected option, instead it will appen provided options. +func WithQueryConfigOption(option queryConfig.Option) Option { + return func(ctx context.Context, c *Driver) error { + c.queryOptions = append(c.queryOptions, option) return nil } } @@ -371,7 +379,7 @@ func WithTableConfigOption(option tableConfig.Option) Option { func WithSessionPoolSizeLimit(sizeLimit int) Option { return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, tableConfig.WithSizeLimit(sizeLimit)) - + c.queryOptions = append(c.queryOptions, queryConfig.WithSizeLimit(sizeLimit)) return nil } } @@ -387,6 +395,7 @@ func WithSessionPoolKeepAliveMinSize(keepAliveMinSize int) Option { func WithSessionPoolIdleThreshold(idleThreshold time.Duration) Option { return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, tableConfig.WithIdleThreshold(idleThreshold)) + c.queryOptions = append(c.queryOptions, queryConfig.WithIdleThreshold(idleThreshold)) c.databaseSQLOptions = append( c.databaseSQLOptions, xsql.WithIdleThreshold(idleThreshold), @@ -405,7 +414,7 @@ func WithSessionPoolKeepAliveTimeout(keepAliveTimeout time.Duration) Option { func WithSessionPoolCreateSessionTimeout(createSessionTimeout time.Duration) Option { return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, tableConfig.WithCreateSessionTimeout(createSessionTimeout)) - + c.queryOptions = append(c.queryOptions, queryConfig.WithCreateSessionTimeout(createSessionTimeout)) return nil } } @@ -414,7 +423,7 @@ func WithSessionPoolCreateSessionTimeout(createSessionTimeout time.Duration) Opt func WithSessionPoolDeleteTimeout(deleteTimeout time.Duration) Option { return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, tableConfig.WithDeleteTimeout(deleteTimeout)) - + c.queryOptions = append(c.queryOptions, queryConfig.WithDeleteTimeout(deleteTimeout)) return nil } } @@ -423,7 +432,6 @@ func WithSessionPoolDeleteTimeout(deleteTimeout time.Duration) Option { func WithIgnoreTruncated() Option { return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, tableConfig.WithIgnoreTruncated()) - return nil } } @@ -436,7 +444,6 @@ func WithPanicCallback(panicCallback func(e interface{})) Option { return func(ctx context.Context, c *Driver) error { c.panicCallback = panicCallback c.options = append(c.options, config.WithPanicCallback(panicCallback)) - return nil } } @@ -456,7 +463,6 @@ func WithTraceTable(t trace.Table, opts ...trace.TableComposeOption) Option { // )..., ), ) - return nil } } diff --git a/query/client.go b/query/client.go new file mode 100644 index 000000000..c4f06c562 --- /dev/null +++ b/query/client.go @@ -0,0 +1,127 @@ +package query + +import ( + "context" + "time" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" + "github.com/ydb-platform/ydb-go-sdk/v3/retry" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +type Client interface { + // Do provide the best effort for execute operation. + // + // Do implements internal busy loop until one of the following conditions is met: + // - deadline was canceled or deadlined + // - retry operation returned nil as error + // + // Warning: if context without deadline or cancellation func than Do can run indefinitely. + Do(ctx context.Context, op Operation, opts ...DoOption) error + + // DoTx provide the best effort for execute transaction. + // + // DoTx implements internal busy loop until one of the following conditions is met: + // - deadline was canceled or deadlined + // - retry operation returned nil as error + // + // DoTx makes auto begin (with TxSettings, by default - SerializableReadWrite), commit and + // rollback (on error) of transaction. + // + // If op TxOperation returns nil - transaction will be committed + // If op TxOperation return non nil - transaction will be rollback + // Warning: if context without deadline or cancellation func than DoTx can run indefinitely + DoTx(ctx context.Context, op TxOperation, opts ...DoTxOption) error +} + +type ( + // Operation is the interface that holds an operation for retry. + // if Operation returns not nil - operation will retry + // if Operation returns nil - retry loop will break + Operation func(ctx context.Context, s Session) error + + // TxOperation is the interface that holds an operation for retry. + // if TxOperation returns not nil - operation will retry + // if TxOperation returns nil - retry loop will break + TxOperation func(ctx context.Context, tx TransactionActor) error + + ClosableSession interface { + closer.Closer + + Session + } + + DoOption interface { + applyDoOption(o *DoOptions) + } + + DoOptions struct { + Label string + Idempotent bool + RetryOptions []retry.Option + Trace *trace.Query + } + + DoTxOption interface { + applyDoTxOption(o *DoTxOptions) + } + + DoTxOptions struct { + DoOptions + + TxSettings *TransactionSettings + } + + SessionInfo interface { + ID() string + NodeID() int64 + Status() SessionStatus + LastUsage() time.Time + } + + Session interface { + SessionInfo + + // Execute executes query. + // + // Execute used by default: + // - DefaultTxControl + // - flag WithKeepInCache(true) if params is not empty. + Execute(ctx context.Context, query string, opts ...ExecuteOption) (txr Transaction, r Result, err error) + + Begin(ctx context.Context, txSettings *TransactionSettings) (Transaction, error) + } + + Result interface { + Close() error + Err() error + NextResultSet(ctx context.Context) bool + Next() bool + Scan(dst ...interface{}) error + ScanNamed(dst ...NamedDestination) error + } + + NamedDestination interface { + Name() string + Destination() interface{} + } + + TransactionIdentifier interface { + ID() string + } + + TransactionActor interface { + // Execute executes query. + // + // Execute used by default: + // - flag WithKeepInCache(true) if params is not empty. + Execute(ctx context.Context, query string, opts ...TxExecuteOption) (r Result, err error) + } + + Transaction interface { + TransactionActor + + CommitTx(ctx context.Context) (err error) + Rollback(ctx context.Context) (err error) + } +) diff --git a/query/do_options.go b/query/do_options.go new file mode 100644 index 000000000..36591176f --- /dev/null +++ b/query/do_options.go @@ -0,0 +1,22 @@ +package query + +import ( + "github.com/ydb-platform/ydb-go-sdk/v3/retry" +) + +var ( + _ DoOption = idempotentOption{} + _ DoTxOption = idempotentOption{} +) + +func (idempotentOption) applyDoOption(opts *DoOptions) { + opts.Idempotent = true + opts.RetryOptions = append(opts.RetryOptions, retry.WithIdempotent(true)) +} + +func NewDoOptions(opts ...DoOption) (doOptions DoOptions) { + for _, opt := range opts { + opt.applyDoOption(&doOptions) + } + return doOptions +} diff --git a/query/do_tx_options.go b/query/do_tx_options.go new file mode 100644 index 000000000..568135887 --- /dev/null +++ b/query/do_tx_options.go @@ -0,0 +1,34 @@ +package query + +import ( + "github.com/ydb-platform/ydb-go-sdk/v3/retry" +) + +var _ DoTxOption = idempotentOption{} + +func (idempotentOption) applyDoTxOption(opts *DoTxOptions) { + opts.Idempotent = true + opts.RetryOptions = append(opts.RetryOptions, retry.WithIdempotent(true)) +} + +var _ DoTxOption = txSettingsOption{} + +type txSettingsOption struct { + txSettings *TransactionSettings +} + +func (opt txSettingsOption) applyDoTxOption(opts *DoTxOptions) { + opts.TxSettings = opt.txSettings +} + +func WithTxSettings(txSettings *TransactionSettings) txSettingsOption { + return txSettingsOption{txSettings: txSettings} +} + +func NewDoTxOptions(opts ...DoTxOption) (doTxOptions DoTxOptions) { + doTxOptions.TxSettings = TxSettings(WithDefaultTxMode()) + for _, opt := range opts { + opt.applyDoTxOption(&doTxOptions) + } + return doTxOptions +} diff --git a/query/example_test.go b/query/example_test.go new file mode 100644 index 000000000..cc6d1f37f --- /dev/null +++ b/query/example_test.go @@ -0,0 +1,141 @@ +package query_test + +import ( + "context" + "fmt" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/query" +) + +func Example_selectWithoutParameters() { + ctx := context.TODO() + db, err := ydb.Open(ctx, "grpc://localhost:2136/local") + if err != nil { + fmt.Printf("failed connect: %v", err) + return + } + defer db.Close(ctx) // cleanup resources + var ( + id int32 // required value + myStr string // optional value + ) + // Do retry operation on errors with best effort + err = db.Query().Do(ctx, // context manage exiting from Do + func(ctx context.Context, s query.Session) (err error) { // retry operation + _, res, err := s.Execute(ctx, + `SELECT 42 as id, "my string" as myStr`, + ) + if err != nil { + return err // for auto-retry with driver + } + defer func() { _ = res.Close() }() // cleanup resources + for res.NextResultSet(ctx) { // iterate over result sets + for res.Next() { // iterate over rows + if err = res.Scan(&id, &myStr); err != nil { + return err // generally scan error not retryable, return it for driver check error + } + } + } + return res.Err() // return finally result error for auto-retry with driver + }, + query.WithIdempotent(), + ) + if err != nil { + fmt.Printf("unexpected error: %v", err) + } + fmt.Printf("id=%v, myStr='%s'\n", id, myStr) +} + +func Example_selectWithParameters() { + ctx := context.TODO() + db, err := ydb.Open(ctx, "grpc://localhost:2136/local") + if err != nil { + fmt.Printf("failed connect: %v", err) + return + } + defer db.Close(ctx) // cleanup resources + var ( + id int32 // required value + myStr string // optional value + ) + // Do retry operation on errors with best effort + err = db.Query().Do(ctx, // context manage exiting from Do + func(ctx context.Context, s query.Session) (err error) { // retry operation + _, res, err := s.Execute(ctx, + `SELECT CAST($id AS Uint64) AS id, CAST($myStr AS Text) AS myStr`, + query.WithParameters( + query.Param("$id", query.Uint64Value(123)), + query.Param("$myStr", query.TextValue("test")), + ), + ) + if err != nil { + return err // for auto-retry with driver + } + defer func() { _ = res.Close() }() // cleanup resources + for res.NextResultSet(ctx) { // iterate over result sets + for res.Next() { // iterate over rows + err = res.ScanNamed( + query.Named("id", &id), + query.Named("myStr", &myStr), + ) + if err != nil { + return err // generally scan error not retryable, return it for driver check error + } + } + } + return res.Err() // return finally result error for auto-retry with driver + }, + query.WithIdempotent(), + ) + if err != nil { + fmt.Printf("unexpected error: %v", err) + } + fmt.Printf("id=%v, myStr='%s'\n", id, myStr) +} + +func Example_txSelect() { + ctx := context.TODO() + db, err := ydb.Open(ctx, "grpc://localhost:2136/local") + if err != nil { + fmt.Printf("failed connect: %v", err) + return + } + defer db.Close(ctx) // cleanup resources + var ( + id int32 // required value + myStr string // optional value + ) + // Do retry operation on errors with best effort + err = db.Query().DoTx(ctx, // context manage exiting from Do + func(ctx context.Context, tx query.TransactionActor) (err error) { // retry operation + res, err := tx.Execute(ctx, + `SELECT 42 as id, "my string" as myStr`, + ) + if err != nil { + return err // for auto-retry with driver + } + defer func() { _ = res.Close() }() // cleanup resources + for res.NextResultSet(ctx) { // iterate over result sets + for res.Next() { // iterate over rows + err = res.ScanNamed( + query.Named("id", &id), + query.Named("myStr", &myStr), + ) + if err != nil { + return err // generally scan error not retryable, return it for driver check error + } + } + } + return res.Err() // return finally result error for auto-retry with driver + }, + query.WithIdempotent(), + query.WithTxSettings(query.TxSettings( + query.WithSnapshotReadOnly(), + )), + ) + if err != nil { + fmt.Printf("unexpected error: %v", err) + } + fmt.Printf("id=%v, myStr='%s'\n", id, myStr) +} diff --git a/query/execute_options.go b/query/execute_options.go new file mode 100644 index 000000000..7c8dc6d26 --- /dev/null +++ b/query/execute_options.go @@ -0,0 +1,72 @@ +package query + +import ( + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" + "google.golang.org/grpc" +) + +type ( + executeOptions struct { + txControl *TransactionControl + syntax syntax + params *Parameters + callOptions []grpc.CallOption + } + ExecuteOption interface { + applyExecuteOption(o *executeOptions) + } + + TxExecuteOptions struct { + params *Parameters + syntax syntax + } + + TxExecuteOption interface { + applyTxExecuteOption(o *TxExecuteOptions) + } +) + +func NewExecuteOptions(opts ...ExecuteOption) (executeOptions executeOptions) { + executeOptions.syntax = Ydb_Query.Syntax_SYNTAX_YQL_V1 + executeOptions.txControl = DefaultTxControl() + for _, opt := range opts { + opt.applyExecuteOption(&executeOptions) + } + return executeOptions +} + +func (opts executeOptions) TxControl() *TransactionControl { + return opts.txControl +} + +func (opts executeOptions) CallOptions() []grpc.CallOption { + return opts.callOptions +} + +type syntax = Ydb_Query.Syntax + +func (opts executeOptions) Syntax() syntax { + return opts.syntax +} + +func (opts executeOptions) Params() queryParams { + return opts.params.Params() +} + +func NewTxExecuteOptions(opts ...TxExecuteOption) (txExecuteOptions TxExecuteOptions) { + txExecuteOptions.syntax = Ydb_Query.Syntax_SYNTAX_YQL_V1 + for _, opt := range opts { + opt.applyTxExecuteOption(&txExecuteOptions) + } + return txExecuteOptions +} + +var _ ExecuteOption = (*Parameters)(nil) + +func WithParameters(params ...Parameter) *Parameters { + q := &Parameters{ + m: make(queryParams, len(params)), + } + q.Add(params...) + return q +} diff --git a/query/named.go b/query/named.go new file mode 100644 index 000000000..5ff7a1d66 --- /dev/null +++ b/query/named.go @@ -0,0 +1,32 @@ +package query + +import ( + "fmt" + "reflect" +) + +type namedDestination struct { + name string + ref interface{} +} + +func (dst namedDestination) Name() string { + return dst.name +} + +func (dst namedDestination) Destination() interface{} { + return dst.ref +} + +func Named(columnName string, destinationValueReference interface{}) (dst namedDestination) { + if columnName == "" { + panic("columnName must be not empty") + } + dst.name = columnName + v := reflect.TypeOf(destinationValueReference) + if v.Kind() != reflect.Ptr { + panic(fmt.Errorf("%T is not reference type", destinationValueReference)) + } + dst.ref = destinationValueReference + return dst +} diff --git a/query/named_test.go b/query/named_test.go new file mode 100644 index 000000000..7b804a2c3 --- /dev/null +++ b/query/named_test.go @@ -0,0 +1,72 @@ +package query + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNamed(t *testing.T) { + for _, tt := range []struct { + name string + ref interface{} + dst namedDestination + panic bool + }{ + { + name: "", + ref: nil, + dst: namedDestination{}, + panic: true, + }, + { + name: "nil_ref", + ref: nil, + dst: namedDestination{}, + panic: true, + }, + { + name: "not_ref", + ref: 123, + dst: namedDestination{}, + panic: true, + }, + { + name: "int_ptr", + ref: func(v int) *int { return &v }(123), + dst: namedDestination{ + name: "int_ptr", + ref: func(v int) *int { return &v }(123), + }, + panic: false, + }, + { + name: "int_dbl_ptr", + ref: func(v int) **int { + vv := &v + return &vv + }(123), + dst: namedDestination{ + name: "int_dbl_ptr", + ref: func(v int) **int { + vv := &v + return &vv + }(123), + }, + panic: false, + }, + } { + t.Run(tt.name, func(t *testing.T) { + if tt.panic { + defer func() { + require.NotNil(t, recover()) + }() + } else { + defer func() { + require.Nil(t, recover()) + }() + } + require.Equal(t, tt.dst, Named(tt.name, tt.ref)) + }) + } +} diff --git a/query/options.go b/query/options.go new file mode 100644 index 000000000..62618839b --- /dev/null +++ b/query/options.go @@ -0,0 +1,7 @@ +package query + +type idempotentOption struct{} + +func WithIdempotent() idempotentOption { + return idempotentOption{} +} diff --git a/query/parameters.go b/query/parameters.go new file mode 100644 index 000000000..763bfe5b3 --- /dev/null +++ b/query/parameters.go @@ -0,0 +1,128 @@ +package query + +import ( + "sort" + + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring" +) + +// Parameters +type ( + queryParams map[string]Value + Parameter interface { + Name() string + Value() Value + } + parameter struct { + name string + value Value + } + Parameters struct { + m queryParams + } +) + +func (params *Parameters) applyTxExecuteOption(o *TxExecuteOptions) { + o.params = params +} + +func (params *Parameters) applyExecuteOption(o *executeOptions) { + o.params = params +} + +func (p parameter) Name() string { + return p.name +} + +func (p parameter) Value() Value { + return p.value +} + +func (qp queryParams) ToYDB(a *allocator.Allocator) map[string]*Ydb.TypedValue { + if qp == nil { + return nil + } + params := make(map[string]*Ydb.TypedValue, len(qp)) + for k, v := range qp { + params[k] = value.ToYDB(v, a) + } + return params +} + +func (params *Parameters) Params() queryParams { + if params == nil { + return nil + } + return params.m +} + +func (params *Parameters) Count() int { + if params == nil { + return 0 + } + return len(params.m) +} + +func (params *Parameters) Each(it func(name string, v Value)) { + if params == nil { + return + } + for key, v := range params.m { + it(key, v) + } +} + +func (params *Parameters) names() []string { + if params == nil { + return nil + } + names := make([]string, 0, len(params.m)) + for k := range params.m { + names = append(names, k) + } + sort.Strings(names) + return names +} + +func (params *Parameters) String() string { + buffer := xstring.Buffer() + defer buffer.Free() + + buffer.WriteByte('{') + for i, name := range params.names() { + if i != 0 { + buffer.WriteByte(',') + } + buffer.WriteByte('"') + buffer.WriteString(name) + buffer.WriteString("\":") + buffer.WriteString(params.m[name].Yql()) + } + buffer.WriteByte('}') + return buffer.String() +} + +func (params *Parameters) Add(parameters ...Parameter) { + for _, param := range parameters { + params.m[param.Name()] = param.Value() + } +} + +func Param(name string, v Value) Parameter { + switch len(name) { + case 0: + panic("empty name") + default: + if name[0] != '$' { + name = "$" + name + } + } + return ¶meter{ + name: name, + value: v, + } +} diff --git a/query/session_status.go b/query/session_status.go new file mode 100644 index 000000000..7a908d0e1 --- /dev/null +++ b/query/session_status.go @@ -0,0 +1,34 @@ +package query + +import ( + "fmt" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/session" +) + +type SessionStatus uint32 + +const ( + SessionStatusUnknown = SessionStatus(iota) + SessionStatusReady + SessionStatusBusy + SessionStatusClosing + SessionStatusClosed +) + +func (s SessionStatus) String() string { + switch s { + case 0: + return session.StatusUnknown + case 1: + return session.StatusReady + case 2: + return session.StatusBusy + case 3: + return session.StatusClosing + case 4: + return session.StatusClosed + default: + return fmt.Sprintf("unknown_%d", s) + } +} diff --git a/query/tx_control.go b/query/tx_control.go new file mode 100644 index 000000000..8d9f35e97 --- /dev/null +++ b/query/tx_control.go @@ -0,0 +1,188 @@ +package query + +import ( + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" +) + +var ( + serializableReadWrite = &Ydb_Query.TransactionSettings_SerializableReadWrite{ + SerializableReadWrite: &Ydb_Query.SerializableModeSettings{}, + } + staleReadOnly = &Ydb_Query.TransactionSettings_StaleReadOnly{ + StaleReadOnly: &Ydb_Query.StaleModeSettings{}, + } + snapshotReadOnly = &Ydb_Query.TransactionSettings_SnapshotReadOnly{ + SnapshotReadOnly: &Ydb_Query.SnapshotModeSettings{}, + } +) + +// Transaction control options +type ( + TxOption func(*TransactionSettings) + TransactionSettings struct { + settings Ydb_Query.TransactionSettings + } +) + +func (txSettings *TransactionSettings) Desc() *Ydb_Query.TransactionSettings { + return &txSettings.settings +} + +// TxSettings returns transaction settings +func TxSettings(opts ...TxOption) *TransactionSettings { + s := new(TransactionSettings) + for _, opt := range opts { + if opt != nil { + opt(s) + } + } + return s +} + +// BeginTx returns begin transaction control option +func BeginTx(opts ...TxOption) TxControlOption { + return func(d *txControlDesc) { + s := TxSettings(opts...) + d.TxSelector = &Ydb_Query.TransactionControl_BeginTx{ + BeginTx: &s.settings, + } + } +} + +func WithTx(t TransactionIdentifier) TxControlOption { + return func(d *txControlDesc) { + d.TxSelector = &Ydb_Query.TransactionControl_TxId{ + TxId: t.ID(), + } + } +} + +func WithTxID(txID string) TxControlOption { + return func(d *txControlDesc) { + d.TxSelector = &Ydb_Query.TransactionControl_TxId{ + TxId: txID, + } + } +} + +// CommitTx returns commit transaction control option +func CommitTx() TxControlOption { + return func(d *txControlDesc) { + d.CommitTx = true + } +} + +func WithDefaultTxMode() TxOption { + return WithSerializableReadWrite() +} + +func WithSerializableReadWrite() TxOption { + return func(d *TransactionSettings) { + d.settings.TxMode = serializableReadWrite + } +} + +func WithSnapshotReadOnly() TxOption { + return func(d *TransactionSettings) { + d.settings.TxMode = snapshotReadOnly + } +} + +func WithStaleReadOnly() TxOption { + return func(d *TransactionSettings) { + d.settings.TxMode = staleReadOnly + } +} + +func WithOnlineReadOnly(opts ...TxOnlineReadOnlyOption) TxOption { + return func(d *TransactionSettings) { + var ro txOnlineReadOnly + for _, opt := range opts { + if opt != nil { + opt(&ro) + } + } + d.settings.TxMode = &Ydb_Query.TransactionSettings_OnlineReadOnly{ + OnlineReadOnly: (*Ydb_Query.OnlineModeSettings)(&ro), + } + } +} + +type ( + txOnlineReadOnly Ydb_Query.OnlineModeSettings + TxOnlineReadOnlyOption func(*txOnlineReadOnly) +) + +func WithInconsistentReads() TxOnlineReadOnlyOption { + return func(d *txOnlineReadOnly) { + d.AllowInconsistentReads = true + } +} + +type ( + txControlDesc Ydb_Query.TransactionControl + TxControlOption func(*txControlDesc) +) + +type TransactionControl struct { + desc Ydb_Query.TransactionControl +} + +func (t *TransactionControl) Desc() *Ydb_Query.TransactionControl { + if t == nil { + return nil + } + return &t.desc +} + +// TxControl makes transaction control from given options +func TxControl(opts ...TxControlOption) *TransactionControl { + c := new(TransactionControl) + for _, opt := range opts { + if opt != nil { + opt((*txControlDesc)(&c.desc)) + } + } + return c +} + +// DefaultTxControl returns default transaction control with serializable read-write isolation mode and auto-commit +func DefaultTxControl() *TransactionControl { + return TxControl( + BeginTx(WithSerializableReadWrite()), + CommitTx(), + ) +} + +// SerializableReadWriteTxControl returns transaction control with serializable read-write isolation mode +func SerializableReadWriteTxControl(opts ...TxControlOption) *TransactionControl { + return TxControl( + append([]TxControlOption{ + BeginTx(WithSerializableReadWrite()), + }, opts...)..., + ) +} + +// OnlineReadOnlyTxControl returns online read-only transaction control +func OnlineReadOnlyTxControl(opts ...TxOnlineReadOnlyOption) *TransactionControl { + return TxControl( + BeginTx(WithOnlineReadOnly(opts...)), + CommitTx(), // open transactions not supported for OnlineReadOnly + ) +} + +// StaleReadOnlyTxControl returns stale read-only transaction control +func StaleReadOnlyTxControl() *TransactionControl { + return TxControl( + BeginTx(WithStaleReadOnly()), + CommitTx(), // open transactions not supported for StaleReadOnly + ) +} + +// SnapshotReadOnlyTxControl returns snapshot read-only transaction control +func SnapshotReadOnlyTxControl() *TransactionControl { + return TxControl( + BeginTx(WithSnapshotReadOnly()), + CommitTx(), // open transactions not supported for StaleReadOnly + ) +} diff --git a/query/type.go b/query/type.go new file mode 100644 index 000000000..291397155 --- /dev/null +++ b/query/type.go @@ -0,0 +1,5 @@ +package query + +import "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" + +type Type = value.Type diff --git a/query/value.go b/query/value.go new file mode 100644 index 000000000..10f8c2a19 --- /dev/null +++ b/query/value.go @@ -0,0 +1,65 @@ +package query + +import ( + "time" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" +) + +type Value = value.Value + +func BoolValue(v bool) Value { + return value.BoolValue(v) +} + +func Uint64Value(v uint64) Value { + return value.Uint64Value(v) +} + +func Int64Value(v int64) Value { + return value.Int64Value(v) +} + +func Uint32Value(v uint32) Value { + return value.Uint32Value(v) +} + +func Int32Value(v int32) Value { + return value.Int32Value(v) +} + +func Uint16Value(v uint16) Value { + return value.Uint16Value(v) +} + +func Int16Value(v int16) Value { + return value.Int16Value(v) +} + +func Uint8Value(v uint8) Value { + return value.Uint8Value(v) +} + +func Int8Value(v int8) Value { + return value.Int8Value(v) +} + +func TextValue(v string) Value { + return value.TextValue(v) +} + +func BytesValue(v []byte) Value { + return value.BytesValue(v) +} + +func IntervalValue(v time.Duration) Value { + return value.IntervalValueFromDuration(v) +} + +func TimestampValue(v time.Time) Value { + return value.TimestampValueFromTime(v) +} + +func DatetimeValue(v time.Time) Value { + return value.DatetimeValueFromTime(v) +} diff --git a/table/table.go b/table/table.go index 45b81f687..51fb2d119 100644 --- a/table/table.go +++ b/table/table.go @@ -10,6 +10,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/session" "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring" "github.com/ydb-platform/ydb-go-sdk/v3/retry" @@ -70,14 +71,14 @@ type Client interface { DoTx(ctx context.Context, op TxOperation, opts ...Option) error } -type SessionStatus = string +type SessionStatus = session.Status const ( - SessionStatusUnknown = SessionStatus("unknown") - SessionReady = SessionStatus("ready") - SessionBusy = SessionStatus("busy") - SessionClosing = SessionStatus("closing") - SessionClosed = SessionStatus("closed") + SessionStatusUnknown = session.StatusUnknown + SessionReady = session.StatusReady + SessionBusy = session.StatusBusy + SessionClosing = session.StatusClosing + SessionClosed = session.StatusClosed ) type SessionInfo interface { diff --git a/trace/query.go b/trace/query.go new file mode 100644 index 000000000..c76a2d28a --- /dev/null +++ b/trace/query.go @@ -0,0 +1,11 @@ +package trace + +// tool gtrace used from ./internal/cmd/gtrace + +//go:generate gtrace + +type ( + // Query specified trace of retry call activity. + // gtrace:gen + Query struct{} +)