Skip to content

Commit

Permalink
query client interface and simple implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Jan 18, 2024
1 parent 847aa72 commit 45c2d7d
Show file tree
Hide file tree
Showing 25 changed files with 1,490 additions and 13 deletions.
4 changes: 4 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/ydb-platform/ydb-go-sdk/v3/coordination"
"github.com/ydb-platform/ydb-go-sdk/v3/discovery"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
"github.com/ydb-platform/ydb-go-sdk/v3/ratelimiter"
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
"github.com/ydb-platform/ydb-go-sdk/v3/scripting"
Expand Down Expand Up @@ -34,6 +35,9 @@ type Connection interface {
// Table returns table client
Table() table.Client

// Query returns query client
Query() query.Client

// Scheme returns scheme client
Scheme() scheme.Client

Expand Down
28 changes: 28 additions & 0 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
discoveryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/discovery/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/dsn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
internalQuery "github.com/ydb-platform/ydb-go-sdk/v3/internal/query"
queryConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
internalRatelimiter "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter"
ratelimiterConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/ratelimiter/config"
internalScheme "github.com/ydb-platform/ydb-go-sdk/v3/internal/scheme"
Expand All @@ -35,6 +37,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/log"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
"github.com/ydb-platform/ydb-go-sdk/v3/ratelimiter"
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
"github.com/ydb-platform/ydb-go-sdk/v3/scripting"
Expand Down Expand Up @@ -68,6 +71,9 @@ type Driver struct { //nolint:maligned
table *internalTable.Client
tableOptions []tableConfig.Option

query *internalQuery.Client
queryOptions []queryConfig.Option

scripting *internalScripting.Client
scriptingOptions []scriptingConfig.Option

Expand Down Expand Up @@ -141,6 +147,7 @@ func (d *Driver) Close(ctx context.Context) (finalErr error) {
d.scheme.Close,
d.scripting.Close,
d.table.Close,
d.query.Close,
d.topic.Close,
d.balancer.Close,
d.pool.Release,
Expand Down Expand Up @@ -180,6 +187,11 @@ func (d *Driver) Table() table.Client {
return d.table
}

// Query returns query client
func (d *Driver) Query() query.Client {
return nil // d.query
}

// Scheme returns scheme client
func (d *Driver) Scheme() scheme.Client {
return d.scheme
Expand Down Expand Up @@ -399,6 +411,22 @@ func (d *Driver) connect(ctx context.Context) (err error) {
return xerrors.WithStackTrace(err)
}

d.query, err = internalQuery.New(ctx,
d.balancer,
queryConfig.New(
append(
// prepend common params from root config
[]queryConfig.Option{
queryConfig.With(d.config.Common),
},
d.queryOptions...,
)...,
),
)
if err != nil {
return xerrors.WithStackTrace(err)
}

d.scheme, err = internalScheme.New(ctx,
d.balancer,
schemeConfig.New(
Expand Down
126 changes: 126 additions & 0 deletions internal/query/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package query

import (
"context"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
)

type balancer interface {
grpc.ClientConnInterface
}

var _ query.Client = (*Client)(nil)

type Client struct {
grpcClient Ydb_Query_V1.QueryServiceClient
pool SessionPool
}

func (c Client) Close(ctx context.Context) error {
err := c.pool.Close(ctx)
if err != nil {
return xerrors.WithStackTrace(err)
}
return nil
}

func (c Client) do(ctx context.Context, op query.Operation, opts query.DoOptions) error {
return retry.Retry(ctx, func(ctx context.Context) error {
err := c.pool.With(ctx, func(ctx context.Context, s *session) error {
err := op(ctx, s)
if err != nil {
return xerrors.WithStackTrace(err)
}
return nil
})
if err != nil {
return xerrors.WithStackTrace(err)
}
return nil
}, opts.RetryOptions...)
}

func (c Client) Do(ctx context.Context, op query.Operation, opts ...query.DoOption) error {
doOptions := query.NewDoOptions(opts...)
if doOptions.Label != "" {
doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithLabel(doOptions.Label))
}
if doOptions.Idempotent {
doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithIdempotent(doOptions.Idempotent))
}
return c.do(ctx, op, doOptions)
}

func (c Client) DoTx(ctx context.Context, op query.TxOperation, opts ...query.DoTxOption) error {
doTxOptions := query.NewDoTxOptions(opts...)
return c.do(ctx, func(ctx context.Context, s query.Session) error {
tx, err := s.Begin(ctx, doTxOptions.TxSettings)
if err != nil {
return xerrors.WithStackTrace(err)
}
err = op(ctx, tx)
if err != nil {
errRollback := tx.Rollback(ctx)
if errRollback != nil {
return xerrors.WithStackTrace(xerrors.Join(err, errRollback))
}
}
err = tx.CommitTx(ctx)
if err != nil {
errRollback := tx.Rollback(ctx)
if errRollback != nil {
return xerrors.WithStackTrace(xerrors.Join(err, errRollback))
}
return xerrors.WithStackTrace(err)
}
return nil
}, doTxOptions.DoOptions)
}

func New(ctx context.Context, balancer balancer, config *config.Config) (*Client, error) {
grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer)
return &Client{
grpcClient: grpcClient,
pool: &poolMock{
create: func(ctx context.Context) (*session, error) {
s, err := grpcClient.CreateSession(ctx, &Ydb_Query.CreateSessionRequest{})
if err != nil {
return nil, xerrors.WithStackTrace(
xerrors.Transport(err),
)
}
if s.GetStatus() != Ydb.StatusIds_SUCCESS {
return nil, xerrors.WithStackTrace(
xerrors.Operation(xerrors.FromOperation(s)),
)
}
return &session{
id: s.GetSessionId(),
nodeID: s.GetNodeId(),
queryService: grpcClient,
status: query.SessionStatusReady,
}, nil
},
close: func(ctx context.Context, s *session) error {
_, err := grpcClient.DeleteSession(ctx,
&Ydb_Query.DeleteSessionRequest{
SessionId: s.id,
},
)
if err != nil {
return xerrors.WithStackTrace(err)
}
return nil
},
},
}, nil
}
90 changes: 90 additions & 0 deletions internal/query/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package config

import (
"time"

"github.com/jonboulle/clockwork"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

const (
DefaultSessionPoolDeleteTimeout = 500 * time.Millisecond
DefaultSessionPoolCreateSessionTimeout = 5 * time.Second
DefaultSessionPoolSizeLimit = 50
DefaultSessionPoolIdleThreshold = 5 * time.Minute
)

type Config struct {
config.Common

sizeLimit int

createSessionTimeout time.Duration
deleteTimeout time.Duration
idleThreshold time.Duration

trace *trace.Query

clock clockwork.Clock
}

func New(opts ...Option) *Config {
c := defaults()
for _, o := range opts {
if o != nil {
o(c)
}
}
return c
}

func defaults() *Config {
return &Config{
sizeLimit: DefaultSessionPoolSizeLimit,
createSessionTimeout: DefaultSessionPoolCreateSessionTimeout,
deleteTimeout: DefaultSessionPoolDeleteTimeout,
idleThreshold: DefaultSessionPoolIdleThreshold,
clock: clockwork.NewRealClock(),
trace: &trace.Query{},
}
}

// Trace defines trace over table client calls
func (c *Config) Trace() *trace.Query {
return c.trace
}

// Clock defines clock
func (c *Config) Clock() clockwork.Clock {
return c.clock
}

// SizeLimit is an upper bound of pooled sessions.
// If SizeLimit is less than or equal to zero then the
// DefaultSessionPoolSizeLimit variable is used as a limit.
func (c *Config) SizeLimit() int {
return c.sizeLimit
}

// IdleThreshold is a maximum duration between any activity within session.
// If this threshold reached, idle session will be closed
//
// If IdleThreshold is less than zero then there is no idle limit.
// If IdleThreshold is zero, then the DefaultSessionPoolIdleThreshold value is used.
func (c *Config) IdleThreshold() time.Duration {
return c.idleThreshold
}

// CreateSessionTimeout limits maximum time spent on Create session request
func (c *Config) CreateSessionTimeout() time.Duration {
return c.createSessionTimeout
}

// DeleteTimeout limits maximum time spent on Delete request
//
// If DeleteTimeout is less than or equal to zero then the DefaultSessionPoolDeleteTimeout is used.
func (c *Config) DeleteTimeout() time.Duration {
return c.deleteTimeout
}
63 changes: 63 additions & 0 deletions internal/query/config/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package config

import (
"time"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
)

type Option func(*Config)

// With applies common configuration params
func With(config config.Common) Option {
return func(c *Config) {
c.Common = config
}
}

// WithSizeLimit defines upper bound of pooled sessions.
// If sizeLimit is less than or equal to zero then the
// DefaultSessionPoolSizeLimit variable is used as a limit.
func WithSizeLimit(sizeLimit int) Option {
return func(c *Config) {
if sizeLimit > 0 {
c.sizeLimit = sizeLimit
}
}
}

// WithIdleThreshold sets maximum duration between any activity within session.
// If this threshold reached, session will be closed.
//
// If idleThreshold is less than zero then there is no idle limit.
// If idleThreshold is zero, then the DefaultSessionPoolIdleThreshold value is used.
func WithIdleThreshold(idleThreshold time.Duration) Option {
return func(c *Config) {
if idleThreshold < 0 {
idleThreshold = 0
}
c.idleThreshold = idleThreshold
}
}

// WithCreateSessionTimeout limits maximum time spent on Create session request
// If createSessionTimeout is less than or equal to zero then no used timeout on create session request
func WithCreateSessionTimeout(createSessionTimeout time.Duration) Option {
return func(c *Config) {
if createSessionTimeout > 0 {
c.createSessionTimeout = createSessionTimeout
} else {
c.createSessionTimeout = 0
}
}
}

// WithDeleteTimeout limits maximum time spent on Delete request
// If deleteTimeout is less than or equal to zero then the DefaultSessionPoolDeleteTimeout is used.
func WithDeleteTimeout(deleteTimeout time.Duration) Option {
return func(c *Config) {
if deleteTimeout > 0 {
c.deleteTimeout = deleteTimeout
}
}
}
Loading

0 comments on commit 45c2d7d

Please sign in to comment.