Skip to content

Commit

Permalink
fixes (linters, query parameters)
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Feb 28, 2024
1 parent 3dff950 commit ef1b438
Show file tree
Hide file tree
Showing 28 changed files with 280 additions and 328 deletions.
45 changes: 32 additions & 13 deletions internal/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type (
queryQueryContentAllocator
queryTransactionControlAllocator
queryTransactionControlBeginTxAllocator
queryTransactionControlTxIdAllocator
queryTransactionControlTxIDAllocator
queryTransactionSettingsAllocator
queryTransactionSettingsSerializableReadWriteAllocator
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func (a *Allocator) Free() {
a.queryQueryContentAllocator.free()
a.queryTransactionControlAllocator.free()
a.queryTransactionControlBeginTxAllocator.free()
a.queryTransactionControlTxIdAllocator.free()
a.queryTransactionControlTxIDAllocator.free()
a.queryTransactionSettingsAllocator.free()
a.queryTransactionSettingsSerializableReadWriteAllocator.free()

Expand Down Expand Up @@ -925,9 +925,12 @@ type queryExecuteQueryRequestAllocator struct {
allocations []*Ydb_Query.ExecuteQueryRequest
}

func (a *queryExecuteQueryRequestAllocator) QueryExecuteQueryRequest() (v *Ydb_Query.ExecuteQueryRequest) {
func (a *queryExecuteQueryRequestAllocator) QueryExecuteQueryRequest() (
v *Ydb_Query.ExecuteQueryRequest,
) {
v = queryExecuteQueryRequestPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -943,9 +946,12 @@ type queryExecuteQueryResponsePartAllocator struct {
allocations []*Ydb_Query.ExecuteQueryResponsePart
}

func (a *queryExecuteQueryResponsePartAllocator) QueryExecuteQueryResponsePart() (v *Ydb_Query.ExecuteQueryResponsePart) {
func (a *queryExecuteQueryResponsePartAllocator) QueryExecuteQueryResponsePart() (
v *Ydb_Query.ExecuteQueryResponsePart,
) {
v = queryExecuteQueryResponsePartPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -961,9 +967,12 @@ type queryExecuteQueryRequestQueryContentAllocator struct {
allocations []*Ydb_Query.ExecuteQueryRequest_QueryContent
}

func (a *queryExecuteQueryRequestQueryContentAllocator) QueryExecuteQueryRequestQueryContent() (v *Ydb_Query.ExecuteQueryRequest_QueryContent) {
func (a *queryExecuteQueryRequestQueryContentAllocator) QueryExecuteQueryRequestQueryContent() (
v *Ydb_Query.ExecuteQueryRequest_QueryContent,
) {
v = queryExecuteQueryRequestQueryContentPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -981,6 +990,7 @@ type queryTransactionControlAllocator struct {
func (a *queryTransactionControlAllocator) QueryTransactionControl() (v *Ydb_Query.TransactionControl) {
v = queryTransactionControlPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -996,9 +1006,12 @@ type queryTransactionControlBeginTxAllocator struct {
allocations []*Ydb_Query.TransactionControl_BeginTx
}

func (a *queryTransactionControlBeginTxAllocator) QueryTransactionControlBeginTx() (v *Ydb_Query.TransactionControl_BeginTx) {
func (a *queryTransactionControlBeginTxAllocator) QueryTransactionControlBeginTx() (
v *Ydb_Query.TransactionControl_BeginTx,
) {
v = queryTransactionControlBeginTxPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -1009,19 +1022,20 @@ func (a *queryTransactionControlBeginTxAllocator) free() {
a.allocations = a.allocations[:0]
}

type queryTransactionControlTxIdAllocator struct {
type queryTransactionControlTxIDAllocator struct {
allocations []*Ydb_Query.TransactionControl_TxId
}

func (a *queryTransactionControlTxIdAllocator) QueryTransactionControlTxId() (v *Ydb_Query.TransactionControl_TxId) {
v = queryTransactionControlTxIdPool.Get()
func (a *queryTransactionControlTxIDAllocator) QueryTransactionControlTxID() (v *Ydb_Query.TransactionControl_TxId) {
v = queryTransactionControlTxIDPool.Get()
a.allocations = append(a.allocations, v)

return v
}

func (a *queryTransactionControlTxIdAllocator) free() {
func (a *queryTransactionControlTxIDAllocator) free() {
for _, v := range a.allocations {
queryTransactionControlTxIdPool.Put(v)
queryTransactionControlTxIDPool.Put(v)
}
a.allocations = a.allocations[:0]
}
Expand All @@ -1033,6 +1047,7 @@ type queryTransactionSettingsAllocator struct {
func (a *queryTransactionSettingsAllocator) QueryTransactionSettings() (v *Ydb_Query.TransactionSettings) {
v = queryTransactionSettingsPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -1048,9 +1063,12 @@ type queryTransactionSettingsSerializableReadWriteAllocator struct {
allocations []*Ydb_Query.TransactionSettings_SerializableReadWrite
}

func (a *queryTransactionSettingsSerializableReadWriteAllocator) QueryTransactionSettingsSerializableReadWrite() (v *Ydb_Query.TransactionSettings_SerializableReadWrite) {
func (a *queryTransactionSettingsSerializableReadWriteAllocator) QueryTransactionSettingsSerializableReadWrite() (
v *Ydb_Query.TransactionSettings_SerializableReadWrite,
) {
v = queryTransactionSettingsSerializableReadWritePool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand All @@ -1068,6 +1086,7 @@ type queryQueryContentAllocator struct {
func (a *queryQueryContentAllocator) QueryQueryContent() (v *Ydb_Query.QueryContent) {
v = queryQueryContentPool.Get()
a.allocations = append(a.allocations, v)

return v
}

Expand Down Expand Up @@ -1144,7 +1163,7 @@ var (
queryQueryContentPool Pool[Ydb_Query.QueryContent]
queryTransactionControlPool Pool[Ydb_Query.TransactionControl]
queryTransactionControlBeginTxPool Pool[Ydb_Query.TransactionControl_BeginTx]
queryTransactionControlTxIdPool Pool[Ydb_Query.TransactionControl_TxId]
queryTransactionControlTxIDPool Pool[Ydb_Query.TransactionControl_TxId]
queryTransactionSettingsPool Pool[Ydb_Query.TransactionSettings]
queryTransactionSettingsSerializableReadWritePool Pool[Ydb_Query.TransactionSettings_SerializableReadWrite]
)
22 changes: 18 additions & 4 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (c Client) Close(ctx context.Context) error {
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}

Expand All @@ -46,11 +47,13 @@ func do(ctx context.Context, pool Pool, op query.Operation, opts query.DoOptions
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
})
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
}, opts.RetryOptions...)
}
Expand All @@ -63,6 +66,7 @@ func (c Client) Do(ctx context.Context, op query.Operation, opts ...query.DoOpti
if doOptions.Idempotent {
doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithIdempotent(doOptions.Idempotent))
}

return do(ctx, c.pool, op, doOptions)
}

Expand All @@ -78,6 +82,7 @@ func doTx(ctx context.Context, pool Pool, op query.TxOperation, opts query.DoTxO
if errRollback != nil {
return xerrors.WithStackTrace(xerrors.Join(err, errRollback))
}

return xerrors.WithStackTrace(err)
}
err = tx.CommitTx(ctx)
Expand All @@ -86,8 +91,10 @@ func doTx(ctx context.Context, pool Pool, op query.TxOperation, opts query.DoTxO
if errRollback != nil {
return xerrors.WithStackTrace(xerrors.Join(err, errRollback))
}

return xerrors.WithStackTrace(err)
}

return nil
}, opts.DoOptions)
}
Expand All @@ -100,6 +107,7 @@ func (c Client) DoTx(ctx context.Context, op query.TxOperation, opts ...query.Do
if doTxOptions.Idempotent {
doTxOptions.RetryOptions = append(doTxOptions.RetryOptions, retry.WithIdempotent(doTxOptions.Idempotent))
}

return doTx(ctx, c.pool, op, doTxOptions)
}

Expand All @@ -115,6 +123,7 @@ func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
return xerrors.WithStackTrace(xerrors.FromOperation(response))
}

return nil
}

Expand All @@ -124,7 +133,9 @@ type createSessionSettings struct {
onAttach func(id string)
}

func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, settings createSessionSettings) (_ *Session, finalErr error) {
func createSession(
ctx context.Context, client Ydb_Query_V1.QueryServiceClient, settings createSessionSettings,
) (_ *Session, finalErr error) {
var (
createSessionCtx context.Context
cancelCreateSession context.CancelFunc
Expand All @@ -148,7 +159,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
}
defer func() {
if finalErr != nil {
deleteSession(ctx, client, s.GetSessionId())
_ = deleteSession(ctx, client, s.GetSessionId())
}
}()
attachCtx, cancelAttach := xcontext.WithCancel(context.Background())
Expand All @@ -167,7 +178,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
}
defer func() {
if finalErr != nil {
attach.CloseSend()
_ = attach.CloseSend()
}
}()
state, err := attach.Recv()
Expand All @@ -190,7 +201,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
if settings.onDetach != nil {
settings.onDetach(session.id)
}
attach.CloseSend()
_ = attach.CloseSend()
cancelAttach()
atomic.StoreUint32(
(*uint32)(&session.status),
Expand Down Expand Up @@ -218,6 +229,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,

func New(ctx context.Context, balancer balancer, config *config.Config) (*Client, error) {
grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer)

return &Client{
grpcClient: grpcClient,
pool: newStubPool(
Expand All @@ -229,13 +241,15 @@ func New(ctx context.Context, balancer balancer, config *config.Config) (*Client
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

return s, nil
},
func(ctx context.Context, s *Session) error {
err := deleteSession(ctx, s.queryClient, s.id)
if err != nil {
return xerrors.WithStackTrace(err)
}

return nil
},
),
Expand Down
4 changes: 3 additions & 1 deletion internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestCreateSession(t *testing.T) {
}, nil)
service.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil)
t.Log("execute")
var attached = 0
attached := 0
s, err := createSession(ctx, service, createSessionSettings{
onAttach: func(id string) {
attached++
Expand Down Expand Up @@ -188,6 +188,7 @@ func TestDo(t *testing.T) {
if counter < 10 {
return xerrors.Retryable(errors.New(""))
}

return nil
}, query.DoOptions{})
require.NoError(t, err)
Expand Down Expand Up @@ -233,6 +234,7 @@ func TestDoTx(t *testing.T) {
if counter < 10 {
return xerrors.Retryable(errors.New(""))
}

return nil
}, query.DoTxOptions{})
require.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions internal/query/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var _ query.Column = (*column)(nil)

type column struct {
n string
t query.Type
t types.Type
}

func newColumn(c *Ydb.Column) *column {
Expand All @@ -26,13 +26,14 @@ func newColumns(cc []*Ydb.Column) (columns []query.Column) {
for i := range cc {
columns[i] = newColumn(cc[i])
}

return columns
}

func (c *column) Name() string {
return c.n
}

func (c *column) Type() query.Type {
func (c *column) Type() types.Type {
return c.t
}
13 changes: 6 additions & 7 deletions internal/query/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
)

var (
ErrNotImplemented = errors.New("not implemented yet")
errRedundantCallNextResultSet = errors.New("redundant call NextResultSet()")
errWrongNextResultSetIndex = errors.New("wrong result set index")
errInterruptedStream = errors.New("interrupted stream")
errClosedResult = errors.New("result closed early")
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
errWrongArgumentsCount = errors.New("wrong arguments count")
ErrNotImplemented = errors.New("not implemented yet")
errWrongNextResultSetIndex = errors.New("wrong result set index")
errInterruptedStream = errors.New("interrupted stream")
errClosedResult = errors.New("result closed early")
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
errWrongArgumentsCount = errors.New("wrong arguments count")
)
11 changes: 9 additions & 2 deletions internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
Expand All @@ -18,7 +19,7 @@ type executeSettings interface {
StatsMode() query.StatsMode
TxControl() *query.TransactionControl
Syntax() query.Syntax
Params() *query.Parameters
Params() *params.Parameters
CallOptions() []grpc.CallOption
}

Expand All @@ -39,11 +40,14 @@ func executeQueryRequest(a *allocator.Allocator, sessionID string, q string, set
return request, settings.CallOptions()
}

func queryFromText(a *allocator.Allocator, q string, syntax Ydb_Query.Syntax) *Ydb_Query.ExecuteQueryRequest_QueryContent {
func queryFromText(
a *allocator.Allocator, q string, syntax Ydb_Query.Syntax,
) *Ydb_Query.ExecuteQueryRequest_QueryContent {
content := a.QueryExecuteQueryRequestQueryContent()
content.QueryContent = a.QueryQueryContent()
content.QueryContent.Syntax = syntax
content.QueryContent.Text = q

return content
}

Expand All @@ -61,13 +65,16 @@ func execute(
stream, err := client.ExecuteQuery(ctx, request, callOptions...)
if err != nil {
cancel()

return nil, nil, xerrors.WithStackTrace(err)
}
r, txID, err := newResult(ctx, stream, cancel)
if err != nil {
cancel()

return nil, nil, xerrors.WithStackTrace(err)
}

return &transaction{
id: txID,
s: session,
Expand Down
Loading

0 comments on commit ef1b438

Please sign in to comment.