Skip to content

Commit

Permalink
fixes (linters, query parameters)
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Feb 28, 2024
1 parent 3dff950 commit 37e9d8b
Show file tree
Hide file tree
Showing 29 changed files with 305 additions and 351 deletions.
45 changes: 32 additions & 13 deletions internal/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type (
queryQueryContentAllocator
queryTransactionControlAllocator
queryTransactionControlBeginTxAllocator
queryTransactionControlTxIdAllocator
queryTransactionControlTxIDAllocator
queryTransactionSettingsAllocator
queryTransactionSettingsSerializableReadWriteAllocator
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func (a *Allocator) Free() {
a.queryQueryContentAllocator.free()
a.queryTransactionControlAllocator.free()
a.queryTransactionControlBeginTxAllocator.free()
a.queryTransactionControlTxIdAllocator.free()
a.queryTransactionControlTxIDAllocator.free()
a.queryTransactionSettingsAllocator.free()
a.queryTransactionSettingsSerializableReadWriteAllocator.free()

Expand Down Expand Up @@ -925,9 +925,12 @@ type queryExecuteQueryRequestAllocator struct {
allocations []*Ydb_Query.ExecuteQueryRequest
}

func (a *queryExecuteQueryRequestAllocator) QueryExecuteQueryRequest() (v *Ydb_Query.ExecuteQueryRequest) {
func (a *queryExecuteQueryRequestAllocator) QueryExecuteQueryRequest() (
v *Ydb_Query.ExecuteQueryRequest,
) {
v = queryExecuteQueryRequestPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -943,9 +946,12 @@ type queryExecuteQueryResponsePartAllocator struct {
allocations []*Ydb_Query.ExecuteQueryResponsePart
}

func (a *queryExecuteQueryResponsePartAllocator) QueryExecuteQueryResponsePart() (v *Ydb_Query.ExecuteQueryResponsePart) {
func (a *queryExecuteQueryResponsePartAllocator) QueryExecuteQueryResponsePart() (
v *Ydb_Query.ExecuteQueryResponsePart,
) {
v = queryExecuteQueryResponsePartPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -961,9 +967,12 @@ type queryExecuteQueryRequestQueryContentAllocator struct {
allocations []*Ydb_Query.ExecuteQueryRequest_QueryContent
}

func (a *queryExecuteQueryRequestQueryContentAllocator) QueryExecuteQueryRequestQueryContent() (v *Ydb_Query.ExecuteQueryRequest_QueryContent) {
func (a *queryExecuteQueryRequestQueryContentAllocator) QueryExecuteQueryRequestQueryContent() (
v *Ydb_Query.ExecuteQueryRequest_QueryContent,
) {
v = queryExecuteQueryRequestQueryContentPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -981,6 +990,7 @@ type queryTransactionControlAllocator struct {
func (a *queryTransactionControlAllocator) QueryTransactionControl() (v *Ydb_Query.TransactionControl) {
v = queryTransactionControlPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -996,9 +1006,12 @@ type queryTransactionControlBeginTxAllocator struct {
allocations []*Ydb_Query.TransactionControl_BeginTx
}

func (a *queryTransactionControlBeginTxAllocator) QueryTransactionControlBeginTx() (v *Ydb_Query.TransactionControl_BeginTx) {
func (a *queryTransactionControlBeginTxAllocator) QueryTransactionControlBeginTx() (
v *Ydb_Query.TransactionControl_BeginTx,
) {
v = queryTransactionControlBeginTxPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -1009,19 +1022,20 @@ func (a *queryTransactionControlBeginTxAllocator) free() {
a.allocations = a.allocations[:0]
}

type queryTransactionControlTxIdAllocator struct {
type queryTransactionControlTxIDAllocator struct {
allocations []*Ydb_Query.TransactionControl_TxId
}

func (a *queryTransactionControlTxIdAllocator) QueryTransactionControlTxId() (v *Ydb_Query.TransactionControl_TxId) {
v = queryTransactionControlTxIdPool.Get()
func (a *queryTransactionControlTxIDAllocator) QueryTransactionControlTxID() (v *Ydb_Query.TransactionControl_TxId) {
v = queryTransactionControlTxIDPool.Get()
a.allocations = append(a.allocations, v)

return v
}

func (a *queryTransactionControlTxIdAllocator) free() {
func (a *queryTransactionControlTxIDAllocator) free() {
for _, v := range a.allocations {
queryTransactionControlTxIdPool.Put(v)
queryTransactionControlTxIDPool.Put(v)
}
a.allocations = a.allocations[:0]
}
Expand All @@ -1033,6 +1047,7 @@ type queryTransactionSettingsAllocator struct {
func (a *queryTransactionSettingsAllocator) QueryTransactionSettings() (v *Ydb_Query.TransactionSettings) {
v = queryTransactionSettingsPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -1048,9 +1063,12 @@ type queryTransactionSettingsSerializableReadWriteAllocator struct {
allocations []*Ydb_Query.TransactionSettings_SerializableReadWrite
}

func (a *queryTransactionSettingsSerializableReadWriteAllocator) QueryTransactionSettingsSerializableReadWrite() (v *Ydb_Query.TransactionSettings_SerializableReadWrite) {
func (a *queryTransactionSettingsSerializableReadWriteAllocator) QueryTransactionSettingsSerializableReadWrite() (
v *Ydb_Query.TransactionSettings_SerializableReadWrite,
) {
v = queryTransactionSettingsSerializableReadWritePool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -1068,6 +1086,7 @@ type queryQueryContentAllocator struct {
func (a *queryQueryContentAllocator) QueryQueryContent() (v *Ydb_Query.QueryContent) {
v = queryQueryContentPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand Down Expand Up @@ -1144,7 +1163,7 @@ var (
queryQueryContentPool Pool[Ydb_Query.QueryContent]
queryTransactionControlPool Pool[Ydb_Query.TransactionControl]
queryTransactionControlBeginTxPool Pool[Ydb_Query.TransactionControl_BeginTx]
queryTransactionControlTxIdPool Pool[Ydb_Query.TransactionControl_TxId]
queryTransactionControlTxIDPool Pool[Ydb_Query.TransactionControl_TxId]
queryTransactionSettingsPool Pool[Ydb_Query.TransactionSettings]
queryTransactionSettingsSerializableReadWritePool Pool[Ydb_Query.TransactionSettings_SerializableReadWrite]
)
33 changes: 23 additions & 10 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,24 @@ func (c Client) Close(ctx context.Context) error {
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}

func do(ctx context.Context, pool Pool, op query.Operation, opts query.DoOptions) error {
func do(ctx context.Context, pool Pool, op query.Operation, opts *query.DoOptions) error {
return retry.Retry(ctx, func(ctx context.Context) error {
err := pool.With(ctx, func(ctx context.Context, s *Session) error {
err := op(ctx, s)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
})
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}, opts.RetryOptions...)
}
Expand All @@ -63,10 +66,11 @@ func (c Client) Do(ctx context.Context, op query.Operation, opts ...query.DoOpti
if doOptions.Idempotent {
doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithIdempotent(doOptions.Idempotent))
}
return do(ctx, c.pool, op, doOptions)

return do(ctx, c.pool, op, &doOptions)
}

func doTx(ctx context.Context, pool Pool, op query.TxOperation, opts query.DoTxOptions) error {
func doTx(ctx context.Context, pool Pool, op query.TxOperation, opts *query.DoTxOptions) error {
return do(ctx, pool, func(ctx context.Context, s query.Session) error {
tx, err := s.Begin(ctx, opts.TxSettings)
if err != nil {
Expand All @@ -78,6 +82,7 @@ func doTx(ctx context.Context, pool Pool, op query.TxOperation, opts query.DoTxO
if errRollback != nil {
return xerrors.WithStackTrace(xerrors.Join(err, errRollback))
}

return xerrors.WithStackTrace(err)
}
err = tx.CommitTx(ctx)
Expand All @@ -86,10 +91,12 @@ func doTx(ctx context.Context, pool Pool, op query.TxOperation, opts query.DoTxO
if errRollback != nil {
return xerrors.WithStackTrace(xerrors.Join(err, errRollback))
}

return xerrors.WithStackTrace(err)
}

return nil
}, opts.DoOptions)
}, &opts.DoOptions)
}

func (c Client) DoTx(ctx context.Context, op query.TxOperation, opts ...query.DoTxOption) error {
Expand All @@ -100,7 +107,8 @@ func (c Client) DoTx(ctx context.Context, op query.TxOperation, opts ...query.Do
if doTxOptions.Idempotent {
doTxOptions.RetryOptions = append(doTxOptions.RetryOptions, retry.WithIdempotent(doTxOptions.Idempotent))
}
return doTx(ctx, c.pool, op, doTxOptions)

return doTx(ctx, c.pool, op, &doTxOptions)
}

func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string) error {
Expand All @@ -115,6 +123,7 @@ func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
return xerrors.WithStackTrace(xerrors.FromOperation(response))
}

return nil
}

Expand All @@ -124,7 +133,9 @@ type createSessionSettings struct {
onAttach func(id string)
}

func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, settings createSessionSettings) (_ *Session, finalErr error) {
func createSession(
ctx context.Context, client Ydb_Query_V1.QueryServiceClient, settings createSessionSettings,
) (_ *Session, finalErr error) {
var (
createSessionCtx context.Context
cancelCreateSession context.CancelFunc
Expand All @@ -148,7 +159,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
}
defer func() {
if finalErr != nil {
deleteSession(ctx, client, s.GetSessionId())
_ = deleteSession(ctx, client, s.GetSessionId())
}
}()
attachCtx, cancelAttach := xcontext.WithCancel(context.Background())
Expand All @@ -167,7 +178,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
}
defer func() {
if finalErr != nil {
attach.CloseSend()
_ = attach.CloseSend()
}
}()
state, err := attach.Recv()
Expand All @@ -190,7 +201,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
if settings.onDetach != nil {
settings.onDetach(session.id)
}
attach.CloseSend()
_ = attach.CloseSend()
cancelAttach()
atomic.StoreUint32(
(*uint32)(&session.status),
Expand Down Expand Up @@ -218,24 +229,26 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,

func New(ctx context.Context, balancer balancer, config *config.Config) (*Client, error) {
grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer)

return &Client{
grpcClient: grpcClient,
pool: newStubPool(
//config.PoolMaxSize(),
func(ctx context.Context) (_ *Session, err error) {
s, err := createSession(ctx, grpcClient, createSessionSettings{
createSessionTimeout: config.CreateSessionTimeout(),
})
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return s, nil
},
func(ctx context.Context, s *Session) error {
err := deleteSession(ctx, s.queryClient, s.id)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
},
),
Expand Down
12 changes: 7 additions & 5 deletions internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestCreateSession(t *testing.T) {
}, nil)
service.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil)
t.Log("execute")
var attached = 0
attached := 0
s, err := createSession(ctx, service, createSessionSettings{
onAttach: func(id string) {
attached++
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestDo(t *testing.T) {
return newTestSession()
}), func(ctx context.Context, s query.Session) error {
return nil
}, query.DoOptions{})
}, &query.DoOptions{})
require.NoError(t, err)
})
t.Run("RetryableError", func(t *testing.T) {
Expand All @@ -188,8 +188,9 @@ func TestDo(t *testing.T) {
if counter < 10 {
return xerrors.Retryable(errors.New(""))
}

return nil
}, query.DoOptions{})
}, &query.DoOptions{})
require.NoError(t, err)
require.Equal(t, 10, counter)
})
Expand All @@ -210,7 +211,7 @@ func TestDoTx(t *testing.T) {
return newTestSessionWithClient(client)
}), func(ctx context.Context, tx query.TxActor) error {
return nil
}, query.DoTxOptions{})
}, &query.DoTxOptions{})
require.NoError(t, err)
})
t.Run("RetryableError", func(t *testing.T) {
Expand All @@ -233,8 +234,9 @@ func TestDoTx(t *testing.T) {
if counter < 10 {
return xerrors.Retryable(errors.New(""))
}

return nil
}, query.DoTxOptions{})
}, &query.DoTxOptions{})
require.NoError(t, err)
require.Equal(t, 10, counter)
})
Expand Down
5 changes: 3 additions & 2 deletions internal/query/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var _ query.Column = (*column)(nil)

type column struct {
n string
t query.Type
t types.Type
}

func newColumn(c *Ydb.Column) *column {
Expand All @@ -26,13 +26,14 @@ func newColumns(cc []*Ydb.Column) (columns []query.Column) {
for i := range cc {
columns[i] = newColumn(cc[i])
}

return columns
}

func (c *column) Name() string {
return c.n
}

func (c *column) Type() query.Type {
func (c *column) Type() types.Type {
return c.t
}
1 change: 1 addition & 0 deletions internal/query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func New(opts ...Option) *Config {
o(c)
}
}

return c
}

Expand Down
13 changes: 6 additions & 7 deletions internal/query/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
)

var (
ErrNotImplemented = errors.New("not implemented yet")
errRedundantCallNextResultSet = errors.New("redundant call NextResultSet()")
errWrongNextResultSetIndex = errors.New("wrong result set index")
errInterruptedStream = errors.New("interrupted stream")
errClosedResult = errors.New("result closed early")
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
errWrongArgumentsCount = errors.New("wrong arguments count")
ErrNotImplemented = errors.New("not implemented yet")
errWrongNextResultSetIndex = errors.New("wrong result set index")
errInterruptedStream = errors.New("interrupted stream")
errClosedResult = errors.New("result closed early")
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
errWrongArgumentsCount = errors.New("wrong arguments count")
)
Loading

0 comments on commit 37e9d8b

Please sign in to comment.