diff --git a/internal/allocator/allocator.go b/internal/allocator/allocator.go index 31fae60ef..378e8d244 100644 --- a/internal/allocator/allocator.go +++ b/internal/allocator/allocator.go @@ -57,7 +57,7 @@ type ( queryQueryContentAllocator queryTransactionControlAllocator queryTransactionControlBeginTxAllocator - queryTransactionControlTxIdAllocator + queryTransactionControlTxIDAllocator queryTransactionSettingsAllocator queryTransactionSettingsSerializableReadWriteAllocator } @@ -115,7 +115,7 @@ func (a *Allocator) Free() { a.queryQueryContentAllocator.free() a.queryTransactionControlAllocator.free() a.queryTransactionControlBeginTxAllocator.free() - a.queryTransactionControlTxIdAllocator.free() + a.queryTransactionControlTxIDAllocator.free() a.queryTransactionSettingsAllocator.free() a.queryTransactionSettingsSerializableReadWriteAllocator.free() @@ -925,9 +925,12 @@ type queryExecuteQueryRequestAllocator struct { allocations []*Ydb_Query.ExecuteQueryRequest } -func (a *queryExecuteQueryRequestAllocator) QueryExecuteQueryRequest() (v *Ydb_Query.ExecuteQueryRequest) { +func (a *queryExecuteQueryRequestAllocator) QueryExecuteQueryRequest() ( + v *Ydb_Query.ExecuteQueryRequest, +) { v = queryExecuteQueryRequestPool.Get() a.allocations = append(a.allocations, v) + return v } @@ -943,9 +946,12 @@ type queryExecuteQueryResponsePartAllocator struct { allocations []*Ydb_Query.ExecuteQueryResponsePart } -func (a *queryExecuteQueryResponsePartAllocator) QueryExecuteQueryResponsePart() (v *Ydb_Query.ExecuteQueryResponsePart) { +func (a *queryExecuteQueryResponsePartAllocator) QueryExecuteQueryResponsePart() ( + v *Ydb_Query.ExecuteQueryResponsePart, +) { v = queryExecuteQueryResponsePartPool.Get() a.allocations = append(a.allocations, v) + return v } @@ -961,9 +967,12 @@ type queryExecuteQueryRequestQueryContentAllocator struct { allocations []*Ydb_Query.ExecuteQueryRequest_QueryContent } -func (a *queryExecuteQueryRequestQueryContentAllocator) QueryExecuteQueryRequestQueryContent() (v *Ydb_Query.ExecuteQueryRequest_QueryContent) { +func (a *queryExecuteQueryRequestQueryContentAllocator) QueryExecuteQueryRequestQueryContent() ( + v *Ydb_Query.ExecuteQueryRequest_QueryContent, +) { v = queryExecuteQueryRequestQueryContentPool.Get() a.allocations = append(a.allocations, v) + return v } @@ -981,6 +990,7 @@ type queryTransactionControlAllocator struct { func (a *queryTransactionControlAllocator) QueryTransactionControl() (v *Ydb_Query.TransactionControl) { v = queryTransactionControlPool.Get() a.allocations = append(a.allocations, v) + return v } @@ -996,9 +1006,12 @@ type queryTransactionControlBeginTxAllocator struct { allocations []*Ydb_Query.TransactionControl_BeginTx } -func (a *queryTransactionControlBeginTxAllocator) QueryTransactionControlBeginTx() (v *Ydb_Query.TransactionControl_BeginTx) { +func (a *queryTransactionControlBeginTxAllocator) QueryTransactionControlBeginTx() ( + v *Ydb_Query.TransactionControl_BeginTx, +) { v = queryTransactionControlBeginTxPool.Get() a.allocations = append(a.allocations, v) + return v } @@ -1009,19 +1022,20 @@ func (a *queryTransactionControlBeginTxAllocator) free() { a.allocations = a.allocations[:0] } -type queryTransactionControlTxIdAllocator struct { +type queryTransactionControlTxIDAllocator struct { allocations []*Ydb_Query.TransactionControl_TxId } -func (a *queryTransactionControlTxIdAllocator) QueryTransactionControlTxId() (v *Ydb_Query.TransactionControl_TxId) { - v = queryTransactionControlTxIdPool.Get() +func (a *queryTransactionControlTxIDAllocator) QueryTransactionControlTxID() (v *Ydb_Query.TransactionControl_TxId) { + v = queryTransactionControlTxIDPool.Get() a.allocations = append(a.allocations, v) + return v } -func (a *queryTransactionControlTxIdAllocator) free() { +func (a *queryTransactionControlTxIDAllocator) free() { for _, v := range a.allocations { - queryTransactionControlTxIdPool.Put(v) + queryTransactionControlTxIDPool.Put(v) } a.allocations = a.allocations[:0] } @@ -1033,6 +1047,7 @@ type queryTransactionSettingsAllocator struct { func (a *queryTransactionSettingsAllocator) QueryTransactionSettings() (v *Ydb_Query.TransactionSettings) { v = queryTransactionSettingsPool.Get() a.allocations = append(a.allocations, v) + return v } @@ -1048,9 +1063,12 @@ type queryTransactionSettingsSerializableReadWriteAllocator struct { allocations []*Ydb_Query.TransactionSettings_SerializableReadWrite } -func (a *queryTransactionSettingsSerializableReadWriteAllocator) QueryTransactionSettingsSerializableReadWrite() (v *Ydb_Query.TransactionSettings_SerializableReadWrite) { +func (a *queryTransactionSettingsSerializableReadWriteAllocator) QueryTransactionSettingsSerializableReadWrite() ( + v *Ydb_Query.TransactionSettings_SerializableReadWrite, +) { v = queryTransactionSettingsSerializableReadWritePool.Get() a.allocations = append(a.allocations, v) + return v } @@ -1068,6 +1086,7 @@ type queryQueryContentAllocator struct { func (a *queryQueryContentAllocator) QueryQueryContent() (v *Ydb_Query.QueryContent) { v = queryQueryContentPool.Get() a.allocations = append(a.allocations, v) + return v } @@ -1144,7 +1163,7 @@ var ( queryQueryContentPool Pool[Ydb_Query.QueryContent] queryTransactionControlPool Pool[Ydb_Query.TransactionControl] queryTransactionControlBeginTxPool Pool[Ydb_Query.TransactionControl_BeginTx] - queryTransactionControlTxIdPool Pool[Ydb_Query.TransactionControl_TxId] + queryTransactionControlTxIDPool Pool[Ydb_Query.TransactionControl_TxId] queryTransactionSettingsPool Pool[Ydb_Query.TransactionSettings] queryTransactionSettingsSerializableReadWritePool Pool[Ydb_Query.TransactionSettings_SerializableReadWrite] ) diff --git a/internal/query/client.go b/internal/query/client.go index 6eaa0bc1c..9d4235420 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -36,6 +36,7 @@ func (c Client) Close(ctx context.Context) error { if err != nil { return xerrors.WithStackTrace(err) } + return nil } @@ -46,11 +47,13 @@ func do(ctx context.Context, pool Pool, op query.Operation, opts query.DoOptions if err != nil { return xerrors.WithStackTrace(err) } + return nil }) if err != nil { return xerrors.WithStackTrace(err) } + return nil }, opts.RetryOptions...) } @@ -63,6 +66,7 @@ func (c Client) Do(ctx context.Context, op query.Operation, opts ...query.DoOpti if doOptions.Idempotent { doOptions.RetryOptions = append(doOptions.RetryOptions, retry.WithIdempotent(doOptions.Idempotent)) } + return do(ctx, c.pool, op, doOptions) } @@ -78,6 +82,7 @@ func doTx(ctx context.Context, pool Pool, op query.TxOperation, opts query.DoTxO if errRollback != nil { return xerrors.WithStackTrace(xerrors.Join(err, errRollback)) } + return xerrors.WithStackTrace(err) } err = tx.CommitTx(ctx) @@ -86,8 +91,10 @@ func doTx(ctx context.Context, pool Pool, op query.TxOperation, opts query.DoTxO if errRollback != nil { return xerrors.WithStackTrace(xerrors.Join(err, errRollback)) } + return xerrors.WithStackTrace(err) } + return nil }, opts.DoOptions) } @@ -100,6 +107,7 @@ func (c Client) DoTx(ctx context.Context, op query.TxOperation, opts ...query.Do if doTxOptions.Idempotent { doTxOptions.RetryOptions = append(doTxOptions.RetryOptions, retry.WithIdempotent(doTxOptions.Idempotent)) } + return doTx(ctx, c.pool, op, doTxOptions) } @@ -115,6 +123,7 @@ func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, if response.GetStatus() != Ydb.StatusIds_SUCCESS { return xerrors.WithStackTrace(xerrors.FromOperation(response)) } + return nil } @@ -124,7 +133,9 @@ type createSessionSettings struct { onAttach func(id string) } -func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, settings createSessionSettings) (_ *Session, finalErr error) { +func createSession( + ctx context.Context, client Ydb_Query_V1.QueryServiceClient, settings createSessionSettings, +) (_ *Session, finalErr error) { var ( createSessionCtx context.Context cancelCreateSession context.CancelFunc @@ -148,7 +159,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, } defer func() { if finalErr != nil { - deleteSession(ctx, client, s.GetSessionId()) + _ = deleteSession(ctx, client, s.GetSessionId()) } }() attachCtx, cancelAttach := xcontext.WithCancel(context.Background()) @@ -167,7 +178,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, } defer func() { if finalErr != nil { - attach.CloseSend() + _ = attach.CloseSend() } }() state, err := attach.Recv() @@ -190,7 +201,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, if settings.onDetach != nil { settings.onDetach(session.id) } - attach.CloseSend() + _ = attach.CloseSend() cancelAttach() atomic.StoreUint32( (*uint32)(&session.status), @@ -218,6 +229,7 @@ func createSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, func New(ctx context.Context, balancer balancer, config *config.Config) (*Client, error) { grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer) + return &Client{ grpcClient: grpcClient, pool: newStubPool( @@ -229,6 +241,7 @@ func New(ctx context.Context, balancer balancer, config *config.Config) (*Client if err != nil { return nil, xerrors.WithStackTrace(err) } + return s, nil }, func(ctx context.Context, s *Session) error { @@ -236,6 +249,7 @@ func New(ctx context.Context, balancer balancer, config *config.Config) (*Client if err != nil { return xerrors.WithStackTrace(err) } + return nil }, ), diff --git a/internal/query/client_test.go b/internal/query/client_test.go index 9b154317f..1385bddb3 100644 --- a/internal/query/client_test.go +++ b/internal/query/client_test.go @@ -36,7 +36,7 @@ func TestCreateSession(t *testing.T) { }, nil) service.EXPECT().AttachSession(gomock.Any(), gomock.Any()).Return(attachStream, nil) t.Log("execute") - var attached = 0 + attached := 0 s, err := createSession(ctx, service, createSessionSettings{ onAttach: func(id string) { attached++ @@ -188,6 +188,7 @@ func TestDo(t *testing.T) { if counter < 10 { return xerrors.Retryable(errors.New("")) } + return nil }, query.DoOptions{}) require.NoError(t, err) @@ -233,6 +234,7 @@ func TestDoTx(t *testing.T) { if counter < 10 { return xerrors.Retryable(errors.New("")) } + return nil }, query.DoTxOptions{}) require.NoError(t, err) diff --git a/internal/query/column.go b/internal/query/column.go index f320e06a5..4532b24ad 100644 --- a/internal/query/column.go +++ b/internal/query/column.go @@ -11,7 +11,7 @@ var _ query.Column = (*column)(nil) type column struct { n string - t query.Type + t types.Type } func newColumn(c *Ydb.Column) *column { @@ -26,6 +26,7 @@ func newColumns(cc []*Ydb.Column) (columns []query.Column) { for i := range cc { columns[i] = newColumn(cc[i]) } + return columns } @@ -33,6 +34,6 @@ func (c *column) Name() string { return c.n } -func (c *column) Type() query.Type { +func (c *column) Type() types.Type { return c.t } diff --git a/internal/query/errors.go b/internal/query/errors.go index ffb9b5272..7b866668d 100644 --- a/internal/query/errors.go +++ b/internal/query/errors.go @@ -5,11 +5,10 @@ import ( ) var ( - ErrNotImplemented = errors.New("not implemented yet") - errRedundantCallNextResultSet = errors.New("redundant call NextResultSet()") - errWrongNextResultSetIndex = errors.New("wrong result set index") - errInterruptedStream = errors.New("interrupted stream") - errClosedResult = errors.New("result closed early") - errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index") - errWrongArgumentsCount = errors.New("wrong arguments count") + ErrNotImplemented = errors.New("not implemented yet") + errWrongNextResultSetIndex = errors.New("wrong result set index") + errInterruptedStream = errors.New("interrupted stream") + errClosedResult = errors.New("result closed early") + errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index") + errWrongArgumentsCount = errors.New("wrong arguments count") ) diff --git a/internal/query/execute_query.go b/internal/query/execute_query.go index e9bd8406f..397c1200b 100644 --- a/internal/query/execute_query.go +++ b/internal/query/execute_query.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc" "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/query" @@ -18,7 +19,7 @@ type executeSettings interface { StatsMode() query.StatsMode TxControl() *query.TransactionControl Syntax() query.Syntax - Params() *query.Parameters + Params() *params.Parameters CallOptions() []grpc.CallOption } @@ -39,11 +40,14 @@ func executeQueryRequest(a *allocator.Allocator, sessionID string, q string, set return request, settings.CallOptions() } -func queryFromText(a *allocator.Allocator, q string, syntax Ydb_Query.Syntax) *Ydb_Query.ExecuteQueryRequest_QueryContent { +func queryFromText( + a *allocator.Allocator, q string, syntax Ydb_Query.Syntax, +) *Ydb_Query.ExecuteQueryRequest_QueryContent { content := a.QueryExecuteQueryRequestQueryContent() content.QueryContent = a.QueryQueryContent() content.QueryContent.Syntax = syntax content.QueryContent.Text = q + return content } @@ -61,13 +65,16 @@ func execute( stream, err := client.ExecuteQuery(ctx, request, callOptions...) if err != nil { cancel() + return nil, nil, xerrors.WithStackTrace(err) } r, txID, err := newResult(ctx, stream, cancel) if err != nil { cancel() + return nil, nil, xerrors.WithStackTrace(err) } + return &transaction{ id: txID, s: session, diff --git a/internal/query/execute_query_test.go b/internal/query/execute_query_test.go index c68c3739e..1803b0b5b 100644 --- a/internal/query/execute_query_test.go +++ b/internal/query/execute_query_test.go @@ -15,6 +15,7 @@ import ( grpcStatus "google.golang.org/grpc/status" "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/query" @@ -792,9 +793,11 @@ func TestExecuteQueryRequest(t *testing.T) { name: "WithParams", opts: []query.ExecuteOption{ query.WithParameters( - query.Param("$a", query.TextValue("A")), - query.Param("$b", query.TextValue("B")), - query.Param("$c", query.TextValue("C")), + params.Builder{}. + Param("$a").Text("A"). + Param("$b").Text("B"). + Param("$c").Text("C"). + Build(), ), }, request: &Ydb_Query.ExecuteQueryRequest{ diff --git a/internal/query/pool.go b/internal/query/pool.go index 3753c5ead..0e633c408 100644 --- a/internal/query/pool.go +++ b/internal/query/pool.go @@ -16,17 +16,17 @@ type Pool interface { var _ Pool = (*stubPool)(nil) type stubPool struct { - create func(ctx context.Context) (*Session, error) - close func(ctx context.Context, s *Session) error + createFunc func(ctx context.Context) (*Session, error) + closeFunc func(ctx context.Context, s *Session) error } func newStubPool( - create func(ctx context.Context) (*Session, error), - close func(ctx context.Context, s *Session) error, + createFunc func(ctx context.Context) (*Session, error), + closeFunc func(ctx context.Context, s *Session) error, ) *stubPool { return &stubPool{ - create: create, - close: close, + createFunc: createFunc, + closeFunc: closeFunc, } } @@ -39,16 +39,17 @@ func (pool *stubPool) get(ctx context.Context) (*Session, error) { case <-ctx.Done(): return nil, xerrors.WithStackTrace(ctx.Err()) default: - s, err := pool.create(ctx) + s, err := pool.createFunc(ctx) if err != nil { return nil, xerrors.WithStackTrace(err) } + return s, nil } } func (pool *stubPool) put(ctx context.Context, s *Session) { - pool.close(ctx, s) + _ = pool.closeFunc(ctx, s) } func (pool *stubPool) With(ctx context.Context, f func(ctx context.Context, s *Session) error) error { @@ -63,5 +64,6 @@ func (pool *stubPool) With(ctx context.Context, f func(ctx context.Context, s *S if err != nil { return xerrors.WithStackTrace(err) } + return nil } diff --git a/internal/query/result.go b/internal/query/result.go index 75cd8f888..ffaaa2b2a 100644 --- a/internal/query/result.go +++ b/internal/query/result.go @@ -56,6 +56,7 @@ func newResult( r.interrupt() close(r.closed) }) + return &r, part.GetTxMeta().GetId(), nil } } @@ -66,6 +67,7 @@ func nextPart(stream Ydb_Query_V1.QueryService_ExecuteQueryClient) (*Ydb_Query.E if xerrors.Is(err, io.EOF) { return nil, xerrors.WithStackTrace(err) } + return nil, xerrors.WithStackTrace(xerrors.Transport(err)) } if status := part.GetStatus(); status != Ydb.StatusIds_SUCCESS { @@ -73,11 +75,13 @@ func nextPart(stream Ydb_Query_V1.QueryService_ExecuteQueryClient) (*Ydb_Query.E xerrors.FromOperation(part), ) } + return part, nil } func (r *result) Close(ctx context.Context) error { r.close() + return nil } @@ -103,6 +107,7 @@ func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) { default: if r.lastPart.GetResultSetIndex() >= nextResultSetIndex { r.resultSetIndex = r.lastPart.GetResultSetIndex() + return newResultSet(func() (_ *Ydb_Query.ExecuteQueryResponsePart, err error) { defer func() { if err != nil && !xerrors.Is(err, @@ -122,15 +127,17 @@ func (r *result) nextResultSet(ctx context.Context) (_ *resultSet, err error) { if xerrors.Is(err, io.EOF) { r.close() } + return nil, xerrors.WithStackTrace(err) } r.lastPart = part if part.GetResultSetIndex() > nextResultSetIndex { return nil, xerrors.WithStackTrace(fmt.Errorf( - "result set (index=%d) recieve part (index=%d) for next result set: %w", + "result set (index=%d) receive part (index=%d) for next result set: %w", nextResultSetIndex, part.GetResultSetIndex(), io.EOF, )) } + return part, nil } }, r.lastPart), nil diff --git a/internal/query/result_set.go b/internal/query/result_set.go index f4f62bb5e..aeb1d6db6 100644 --- a/internal/query/result_set.go +++ b/internal/query/result_set.go @@ -22,9 +22,14 @@ type resultSet struct { done chan struct{} } -func newResultSet(recv func() (*Ydb_Query.ExecuteQueryResponsePart, error), part *Ydb_Query.ExecuteQueryResponsePart) *resultSet { +func newResultSet( + recv func() ( + *Ydb_Query.ExecuteQueryResponsePart, error, + ), + part *Ydb_Query.ExecuteQueryResponsePart, +) *resultSet { return &resultSet{ - index: part.ResultSetIndex, + index: part.GetResultSetIndex(), recv: recv, currentPart: part, rowIndex: -1, @@ -47,6 +52,7 @@ func (rs *resultSet) next(ctx context.Context) (*row, error) { if xerrors.Is(err, io.EOF) { close(rs.done) } + return nil, xerrors.WithStackTrace(err) } rs.rowIndex = 0 @@ -54,11 +60,13 @@ func (rs *resultSet) next(ctx context.Context) (*row, error) { } if rs.index != rs.currentPart.GetResultSetIndex() { close(rs.done) + return nil, xerrors.WithStackTrace(fmt.Errorf( "received part with result set index = %d, current result set index = %d: %w", rs.index, rs.currentPart.GetResultSetIndex(), errWrongResultSetIndex, )) } + return newRow(rs.columns, rs.currentPart.GetResultSet().GetRows()[rs.rowIndex]) } } diff --git a/internal/query/result_set_test.go b/internal/query/result_set_test.go index 4be49016b..a5d090310 100644 --- a/internal/query/result_set_test.go +++ b/internal/query/result_set_test.go @@ -112,15 +112,16 @@ func TestResultSetNext(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, io.EOF) - part, err := stream.Recv() + recv, err := stream.Recv() require.NoError(t, err) rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { part, err := stream.Recv() if err != nil { return nil, xerrors.WithStackTrace(err) } + return part, nil - }, part) + }, recv) require.EqualValues(t, 0, rs.index) { _, err := rs.next(ctx) @@ -215,15 +216,16 @@ func TestResultSetNext(t *testing.T) { }, }, }, nil) - part, err := stream.Recv() + recv, err := stream.Recv() require.NoError(t, err) rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { part, err := stream.Recv() if err != nil { return nil, xerrors.WithStackTrace(err) } + return part, nil - }, part) + }, recv) require.EqualValues(t, 0, rs.index) { _, err := rs.next(ctx) @@ -303,21 +305,22 @@ func TestResultSetNext(t *testing.T) { Status: Ydb.StatusIds_OVERLOADED, ResultSetIndex: 0, }, nil) - part, err := stream.Recv() + recv, err := stream.Recv() require.NoError(t, err) rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { part, err := nextPart(stream) if err != nil { return nil, xerrors.WithStackTrace(err) } - if part.ResultSetIndex != 0 { + if resultSetIndex := part.GetResultSetIndex(); resultSetIndex != 0 { return nil, xerrors.WithStackTrace(fmt.Errorf( "critical violation of the logic: wrong result set index: %d != %d", - part.ResultSetIndex, 0, + resultSetIndex, 0, )) } + return part, nil - }, part) + }, recv) require.EqualValues(t, 0, rs.index) { _, err := rs.next(ctx) @@ -403,21 +406,22 @@ func TestResultSetNext(t *testing.T) { }, }, nil) stream.EXPECT().Recv().Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) - part, err := stream.Recv() + recv, err := stream.Recv() require.NoError(t, err) rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { part, err := nextPart(stream) if err != nil { return nil, xerrors.WithStackTrace(err) } - if part.ResultSetIndex != 0 { + if resultSetIndex := part.GetResultSetIndex(); resultSetIndex != 0 { return nil, xerrors.WithStackTrace(fmt.Errorf( "critical violation of the logic: wrong result set index: %d != %d", - part.ResultSetIndex, 0, + resultSetIndex, 0, )) } + return part, nil - }, part) + }, recv) require.EqualValues(t, 0, rs.index) { _, err := rs.next(ctx) @@ -561,15 +565,16 @@ func TestResultSetNext(t *testing.T) { }, }, }, nil) - part, err := stream.Recv() + recv, err := stream.Recv() require.NoError(t, err) rs := newResultSet(func() (*Ydb_Query.ExecuteQueryResponsePart, error) { part, err := nextPart(stream) if err != nil { return nil, xerrors.WithStackTrace(err) } + return part, nil - }, part) + }, recv) require.EqualValues(t, 0, rs.index) { _, err := rs.next(ctx) diff --git a/internal/query/row.go b/internal/query/row.go index 7f3e347b8..cd456bcf6 100644 --- a/internal/query/row.go +++ b/internal/query/row.go @@ -16,6 +16,7 @@ type row struct { func newRow(columns []query.Column, v *Ydb.Value) (*row, error) { data := newScannerData(columns, v.GetItems()) + return &row{ newScannerIndexed(data), newScannerNamed(data), diff --git a/internal/query/session.go b/internal/query/session.go index 02bdbbd6b..130d4d5f0 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -25,10 +25,13 @@ type Session struct { func (s *Session) Close(ctx context.Context) error { s.close() + return nil } -func begin(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string, txSettings query.TransactionSettings) (*transaction, error) { +func begin( + ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string, txSettings query.TransactionSettings, +) (*transaction, error) { a := allocator.New() defer a.Free() response, err := client.BeginTransaction(ctx, @@ -43,6 +46,7 @@ func begin(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionI if response.GetStatus() != Ydb.StatusIds_SUCCESS { return nil, xerrors.WithStackTrace(xerrors.FromOperation(response)) } + return &transaction{ id: response.GetTxMeta().GetId(), }, nil @@ -54,6 +58,7 @@ func (s *Session) Begin(ctx context.Context, txSettings query.TransactionSetting return nil, xerrors.WithStackTrace(err) } tx.s = s + return tx, nil } @@ -66,8 +71,7 @@ func (s *Session) NodeID() int64 { } func (s *Session) Status() query.SessionStatus { - status := query.SessionStatus(atomic.LoadUint32((*uint32)(&s.status))) - return status + return query.SessionStatus(atomic.LoadUint32((*uint32)(&s.status))) } func (s *Session) Execute( diff --git a/internal/query/transaction.go b/internal/query/transaction.go index b971261f2..ccdfd31c5 100644 --- a/internal/query/transaction.go +++ b/internal/query/transaction.go @@ -30,14 +30,18 @@ func fromTxOptions(txID string, txOpts ...query.TxExecuteOption) executeSettings } } opts = append(opts, query.WithTxControl(query.TxControl(query.WithTxID(txID)))) + return query.ExecuteSettings(opts...) } -func (tx transaction) Execute(ctx context.Context, q string, opts ...query.TxExecuteOption) (r query.Result, err error) { +func (tx transaction) Execute(ctx context.Context, q string, opts ...query.TxExecuteOption) ( + r query.Result, err error, +) { _, res, err := execute(ctx, tx.s, tx.s.queryClient, q, fromTxOptions(tx.id, opts...)) if err != nil { return nil, xerrors.WithStackTrace(err) } + return res, nil } @@ -52,6 +56,7 @@ func commitTx(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi if response.GetStatus() != Ydb.StatusIds_SUCCESS { return xerrors.WithStackTrace(xerrors.FromOperation(response)) } + return nil } @@ -70,6 +75,7 @@ func rollback(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi if response.GetStatus() != Ydb.StatusIds_SUCCESS { return xerrors.WithStackTrace(xerrors.FromOperation(response)) } + return nil } diff --git a/internal/query/transaction_test.go b/internal/query/transaction_test.go index 09c77021d..f9fc9c73c 100644 --- a/internal/query/transaction_test.go +++ b/internal/query/transaction_test.go @@ -12,6 +12,7 @@ import ( grpcStatus "google.golang.org/grpc/status" "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" "github.com/ydb-platform/ydb-go-sdk/v3/query" @@ -22,9 +23,11 @@ func TestCommitTx(t *testing.T) { ctx := xtest.Context(t) ctrl := gomock.NewController(t) service := NewMockQueryServiceClient(ctrl) - service.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CommitTransactionResponse{ - Status: Ydb.StatusIds_SUCCESS, - }, nil) + service.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return( + &Ydb_Query.CommitTransactionResponse{ + Status: Ydb.StatusIds_SUCCESS, + }, nil, + ) t.Log("commit") err := commitTx(ctx, service, "123", "456") require.NoError(t, err) @@ -33,7 +36,9 @@ func TestCommitTx(t *testing.T) { ctx := xtest.Context(t) ctrl := gomock.NewController(t) service := NewMockQueryServiceClient(ctrl) - service.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) + service.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return( + nil, grpcStatus.Error(grpcCodes.Unavailable, ""), + ) t.Log("commit") err := commitTx(ctx, service, "123", "456") require.Error(t, err) @@ -43,9 +48,11 @@ func TestCommitTx(t *testing.T) { ctx := xtest.Context(t) ctrl := gomock.NewController(t) service := NewMockQueryServiceClient(ctrl) - service.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CommitTransactionResponse{ - Status: Ydb.StatusIds_UNAVAILABLE, - }, nil) + service.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return( + &Ydb_Query.CommitTransactionResponse{ + Status: Ydb.StatusIds_UNAVAILABLE, + }, nil, + ) t.Log("commit") err := commitTx(ctx, service, "123", "456") require.Error(t, err) @@ -58,9 +65,11 @@ func TestRollback(t *testing.T) { ctx := xtest.Context(t) ctrl := gomock.NewController(t) service := NewMockQueryServiceClient(ctrl) - service.EXPECT().RollbackTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.RollbackTransactionResponse{ - Status: Ydb.StatusIds_SUCCESS, - }, nil) + service.EXPECT().RollbackTransaction(gomock.Any(), gomock.Any()).Return( + &Ydb_Query.RollbackTransactionResponse{ + Status: Ydb.StatusIds_SUCCESS, + }, nil, + ) t.Log("rollback") err := rollback(ctx, service, "123", "456") require.NoError(t, err) @@ -69,7 +78,9 @@ func TestRollback(t *testing.T) { ctx := xtest.Context(t) ctrl := gomock.NewController(t) service := NewMockQueryServiceClient(ctrl) - service.EXPECT().RollbackTransaction(gomock.Any(), gomock.Any()).Return(nil, grpcStatus.Error(grpcCodes.Unavailable, "")) + service.EXPECT().RollbackTransaction(gomock.Any(), gomock.Any()).Return( + nil, grpcStatus.Error(grpcCodes.Unavailable, ""), + ) t.Log("rollback") err := rollback(ctx, service, "123", "456") require.Error(t, err) @@ -79,9 +90,11 @@ func TestRollback(t *testing.T) { ctx := xtest.Context(t) ctrl := gomock.NewController(t) service := NewMockQueryServiceClient(ctrl) - service.EXPECT().RollbackTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.RollbackTransactionResponse{ - Status: Ydb.StatusIds_UNAVAILABLE, - }, nil) + service.EXPECT().RollbackTransaction(gomock.Any(), gomock.Any()).Return( + &Ydb_Query.RollbackTransactionResponse{ + Status: Ydb.StatusIds_UNAVAILABLE, + }, nil, + ) t.Log("rollback") err := rollback(ctx, service, "123", "456") require.Error(t, err) @@ -94,7 +107,7 @@ type testExecuteSettings struct { statsMode query.StatsMode txControl *query.TransactionControl syntax query.Syntax - params *query.Parameters + params *params.Parameters callOptions []grpc.CallOption } @@ -114,7 +127,7 @@ func (s testExecuteSettings) Syntax() query.Syntax { return s.syntax } -func (s testExecuteSettings) Params() *query.Parameters { +func (s testExecuteSettings) Params() *params.Parameters { return s.params } @@ -185,7 +198,7 @@ func TestFromTxOptions(t *testing.T) { name: "WithParams", txOpts: []query.TxExecuteOption{ query.WithParameters( - query.Param("$a", query.TextValue("A")), + params.Builder{}.Param("$a").Text("A").Build(), ), }, settings: testExecuteSettings{ @@ -193,9 +206,7 @@ func TestFromTxOptions(t *testing.T) { statsMode: query.StatsModeNone, txControl: query.TxControl(query.WithTxID("")), syntax: Ydb_Query.Syntax_SYNTAX_YQL_V1, - params: query.Params( - query.Param("$a", query.TextValue("A")), - ), + params: params.Builder{}.Param("$a").Text("A").Build(), }, }, } { diff --git a/options.go b/options.go index 6af346db3..6cd920d2f 100644 --- a/options.go +++ b/options.go @@ -353,6 +353,7 @@ func WithCertificatesFromPem(bytes []byte, opts ...certificates.FromPemOption) O for _, cert := range certs { _ = WithCertificate(cert)(ctx, c) } + return nil } } @@ -362,6 +363,7 @@ func WithCertificatesFromPem(bytes []byte, opts ...certificates.FromPemOption) O func WithTableConfigOption(option tableConfig.Option) Option { return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, option) + return nil } } @@ -371,6 +373,7 @@ func WithTableConfigOption(option tableConfig.Option) Option { func WithQueryConfigOption(option queryConfig.Option) Option { return func(ctx context.Context, c *Driver) error { c.queryOptions = append(c.queryOptions, option) + return nil } } @@ -380,6 +383,7 @@ func WithSessionPoolSizeLimit(sizeLimit int) Option { return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, tableConfig.WithSizeLimit(sizeLimit)) c.queryOptions = append(c.queryOptions, queryConfig.WithSizeLimit(sizeLimit)) + return nil } } @@ -414,6 +418,7 @@ func WithSessionPoolCreateSessionTimeout(createSessionTimeout time.Duration) Opt return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, tableConfig.WithCreateSessionTimeout(createSessionTimeout)) c.queryOptions = append(c.queryOptions, queryConfig.WithCreateSessionTimeout(createSessionTimeout)) + return nil } } @@ -423,6 +428,7 @@ func WithSessionPoolDeleteTimeout(deleteTimeout time.Duration) Option { return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, tableConfig.WithDeleteTimeout(deleteTimeout)) c.queryOptions = append(c.queryOptions, queryConfig.WithDeleteTimeout(deleteTimeout)) + return nil } } @@ -431,6 +437,7 @@ func WithSessionPoolDeleteTimeout(deleteTimeout time.Duration) Option { func WithIgnoreTruncated() Option { return func(ctx context.Context, c *Driver) error { c.tableOptions = append(c.tableOptions, tableConfig.WithIgnoreTruncated()) + return nil } } @@ -443,6 +450,7 @@ func WithPanicCallback(panicCallback func(e interface{})) Option { return func(ctx context.Context, c *Driver) error { c.panicCallback = panicCallback c.options = append(c.options, config.WithPanicCallback(panicCallback)) + return nil } } @@ -462,6 +470,7 @@ func WithTraceTable(t trace.Table, opts ...trace.TableComposeOption) Option { // )..., ), ) + return nil } } diff --git a/query/column.go b/query/column.go index 49defc291..d8c689aca 100644 --- a/query/column.go +++ b/query/column.go @@ -1,6 +1,8 @@ package query +import "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + type Column interface { Name() string - Type() Type + Type() types.Type } diff --git a/query/do_options.go b/query/do_options.go index 36591176f..ee6429dbd 100644 --- a/query/do_options.go +++ b/query/do_options.go @@ -18,5 +18,6 @@ func NewDoOptions(opts ...DoOption) (doOptions DoOptions) { for _, opt := range opts { opt.applyDoOption(&doOptions) } + return doOptions } diff --git a/query/do_tx_options.go b/query/do_tx_options.go index 064de0794..f04fc3c0f 100644 --- a/query/do_tx_options.go +++ b/query/do_tx_options.go @@ -30,5 +30,6 @@ func NewDoTxOptions(opts ...DoTxOption) (doTxOptions DoTxOptions) { for _, opt := range opts { opt.applyDoTxOption(&doTxOptions) } + return doTxOptions } diff --git a/query/example_test.go b/query/example_test.go index 31a2af40c..8f926018f 100644 --- a/query/example_test.go +++ b/query/example_test.go @@ -2,7 +2,9 @@ package query_test import ( "context" + "errors" "fmt" + "io" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/query" @@ -13,6 +15,7 @@ func Example_selectWithoutParameters() { db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) + return } defer db.Close(ctx) // cleanup resources @@ -33,11 +36,19 @@ func Example_selectWithoutParameters() { for { // iterate over result sets rs, err := res.NextResultSet(ctx) if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err } for { // iterate over rows row, err := rs.NextRow(ctx) if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err } if err = row.Scan(&id, &myStr); err != nil { @@ -45,6 +56,7 @@ func Example_selectWithoutParameters() { } } } + return res.Err() // return finally result error for auto-retry with driver }, query.WithIdempotent(), @@ -60,6 +72,7 @@ func Example_selectWithParameters() { db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) + return } defer db.Close(ctx) // cleanup resources @@ -73,8 +86,10 @@ func Example_selectWithParameters() { _, res, err := s.Execute(ctx, `SELECT CAST($id AS Uint64) AS id, CAST($myStr AS Text) AS myStr`, query.WithParameters( - query.Param("$id", query.Uint64Value(123)), - query.Param("$myStr", query.TextValue("test")), + ydb.ParamsBuilder(). + Param("$id").Uint64(123). + Param("$myStr").Text("123"). + Build(), ), ) if err != nil { @@ -84,11 +99,19 @@ func Example_selectWithParameters() { for { // iterate over result sets rs, err := res.NextResultSet(ctx) if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err } for { // iterate over rows row, err := rs.NextRow(ctx) if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err } if err = row.ScanNamed( @@ -99,6 +122,7 @@ func Example_selectWithParameters() { } } } + return res.Err() // return finally result error for auto-retry with driver }, query.WithIdempotent(), @@ -114,6 +138,7 @@ func Example_txSelect() { db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) + return } defer db.Close(ctx) // cleanup resources @@ -134,11 +159,19 @@ func Example_txSelect() { for { // iterate over result sets rs, err := res.NextResultSet(ctx) if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err } for { // iterate over rows row, err := rs.NextRow(ctx) if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err } if err = row.ScanNamed( @@ -149,6 +182,7 @@ func Example_txSelect() { } } } + return res.Err() // return finally result error for auto-retry with driver }, query.WithIdempotent(), diff --git a/query/execute_options.go b/query/execute_options.go index 640c6e7b3..1cf782a5f 100644 --- a/query/execute_options.go +++ b/query/execute_options.go @@ -3,6 +3,8 @@ package query import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "google.golang.org/grpc" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" ) type ( @@ -12,14 +14,15 @@ type ( callOptions []grpc.CallOption commonExecuteSettings struct { syntax Syntax - params *Parameters + params params.Parameters execMode ExecMode statsMode StatsMode callOptions []grpc.CallOption - txControl *TransactionControl } executeSettings struct { commonExecuteSettings + + txControl *TransactionControl } ExecuteOption interface { applyExecuteOption(s *executeSettings) @@ -30,8 +33,17 @@ type ( TxExecuteOption interface { applyTxExecuteOption(s *txExecuteSettings) } + parametersOption params.Parameters ) +func (params parametersOption) applyTxExecuteOption(s *txExecuteSettings) { + s.params = append(s.params, params...) +} + +func (params parametersOption) applyExecuteOption(s *executeSettings) { + s.params = append(s.params, params...) +} + func (opts callOptions) applyExecuteOption(s *executeSettings) { s.callOptions = append(s.callOptions, opts...) } @@ -78,12 +90,16 @@ func defaultCommonExecuteSettings() commonExecuteSettings { } } -func ExecuteSettings(opts ...ExecuteOption) (settings executeSettings) { +func ExecuteSettings(opts ...ExecuteOption) (settings *executeSettings) { + settings = &executeSettings{ + commonExecuteSettings: defaultCommonExecuteSettings(), + } settings.commonExecuteSettings = defaultCommonExecuteSettings() settings.txControl = DefaultTxControl() for _, opt := range opts { - opt.applyExecuteOption(&settings) + opt.applyExecuteOption(settings) } + return settings } @@ -107,8 +123,12 @@ func (s executeSettings) StatsMode() StatsMode { return s.statsMode } -func (s executeSettings) Params() *Parameters { - return s.params +func (s executeSettings) Params() *params.Parameters { + if len(s.params) == 0 { + return nil + } + + return &s.params } func TxExecuteSettings(opts ...TxExecuteOption) (settings txExecuteSettings) { @@ -116,27 +136,16 @@ func TxExecuteSettings(opts ...TxExecuteOption) (settings txExecuteSettings) { for _, opt := range opts { opt.applyTxExecuteOption(&settings) } + return settings } var _ ExecuteOption = (*parametersOption)(nil) -func WithParameters(params ...Parameter) *parametersOption { - opt := ¶metersOption{ - params: &Parameters{ - m: make(queryParams, len(params)), - }, - } - opt.params.Add(params...) - return opt -} +func WithParameters(parameters *params.Parameters) *parametersOption { + params := parametersOption(*parameters) -func Params(params ...Parameter) *Parameters { - p := &Parameters{ - m: make(queryParams, len(params)), - } - p.Add(params...) - return p + return ¶ms } var ( diff --git a/query/named.go b/query/named.go index 5ff7a1d66..e25a3ef3a 100644 --- a/query/named.go +++ b/query/named.go @@ -28,5 +28,6 @@ func Named(columnName string, destinationValueReference interface{}) (dst namedD panic(fmt.Errorf("%T is not reference type", destinationValueReference)) } dst.ref = destinationValueReference + return dst } diff --git a/query/named_test.go b/query/named_test.go index 7b804a2c3..febbeb3f3 100644 --- a/query/named_test.go +++ b/query/named_test.go @@ -44,12 +44,14 @@ func TestNamed(t *testing.T) { name: "int_dbl_ptr", ref: func(v int) **int { vv := &v + return &vv }(123), dst: namedDestination{ name: "int_dbl_ptr", ref: func(v int) **int { vv := &v + return &vv }(123), }, diff --git a/query/parameters.go b/query/parameters.go deleted file mode 100644 index f0bced090..000000000 --- a/query/parameters.go +++ /dev/null @@ -1,135 +0,0 @@ -package query - -import ( - "sort" - - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xstring" -) - -// Parameters -type ( - queryParams map[string]Value - Parameter interface { - Name() string - Value() Value - } - parameter struct { - name string - value Value - } - Parameters struct { - m queryParams - } - parametersOption struct { - params *Parameters - } -) - -func (opt *parametersOption) applyTxExecuteOption(o *txExecuteSettings) { - o.params = opt.params -} - -func (opt *parametersOption) applyExecuteOption(o *executeSettings) { - o.params = opt.params -} - -func (p parameter) Name() string { - return p.name -} - -func (p parameter) Value() Value { - return p.value -} - -func (params *Parameters) ToYDB(a *allocator.Allocator) map[string]*Ydb.TypedValue { - if params == nil || params.m == nil { - return nil - } - return params.m.toYDB(a) -} - -func (qp queryParams) toYDB(a *allocator.Allocator) map[string]*Ydb.TypedValue { - params := make(map[string]*Ydb.TypedValue, len(qp)) - for k, v := range qp { - params[k] = value.ToYDB(v, a) - } - return params -} - -func (params *Parameters) Params() queryParams { - if params == nil { - return nil - } - return params.m -} - -func (params *Parameters) Count() int { - if params == nil { - return 0 - } - return len(params.m) -} - -func (params *Parameters) Each(it func(name string, v Value)) { - if params == nil { - return - } - for key, v := range params.m { - it(key, v) - } -} - -func (params *Parameters) names() []string { - if params == nil { - return nil - } - names := make([]string, 0, len(params.m)) - for k := range params.m { - names = append(names, k) - } - sort.Strings(names) - return names -} - -func (params *Parameters) String() string { - buffer := xstring.Buffer() - defer buffer.Free() - - buffer.WriteByte('{') - for i, name := range params.names() { - if i != 0 { - buffer.WriteByte(',') - } - buffer.WriteByte('"') - buffer.WriteString(name) - buffer.WriteString("\":") - buffer.WriteString(params.m[name].Yql()) - } - buffer.WriteByte('}') - return buffer.String() -} - -func (params *Parameters) Add(parameters ...Parameter) { - for _, param := range parameters { - params.m[param.Name()] = param.Value() - } -} - -func Param(name string, v Value) Parameter { - switch len(name) { - case 0: - panic("empty name") - default: - if name[0] != '$' { - panic("parameter name must be started from $") - } - } - return ¶meter{ - name: name, - value: v, - } -} diff --git a/query/transaction_control.go b/query/transaction_control.go index 653768725..d3ba2b591 100644 --- a/query/transaction_control.go +++ b/query/transaction_control.go @@ -37,6 +37,7 @@ func (ctrl *TransactionControl) ToYDB(a *allocator.Allocator) *Ydb_Query.Transac txControl := a.QueryTransactionControl() ctrl.selector.applyTxSelector(a, txControl) txControl.CommitTx = ctrl.commit + return txControl } @@ -66,28 +67,28 @@ func BeginTx(opts ...txSettingsOption) beginTxOptions { } var ( - _ txControlOption = txIdTxControlOption("") - _ txSelector = txIdTxControlOption("") + _ txControlOption = txIDTxControlOption("") + _ txSelector = txIDTxControlOption("") ) -type txIdTxControlOption string +type txIDTxControlOption string -func (id txIdTxControlOption) applyTxControlOption(txControl *TransactionControl) { +func (id txIDTxControlOption) applyTxControlOption(txControl *TransactionControl) { txControl.selector = id } -func (id txIdTxControlOption) applyTxSelector(a *allocator.Allocator, txControl *Ydb_Query.TransactionControl) { - selector := a.QueryTransactionControlTxId() +func (id txIDTxControlOption) applyTxSelector(a *allocator.Allocator, txControl *Ydb_Query.TransactionControl) { + selector := a.QueryTransactionControlTxID() selector.TxId = string(id) txControl.TxSelector = selector } -func WithTx(t TxIdentifier) txIdTxControlOption { - return txIdTxControlOption(t.ID()) +func WithTx(t TxIdentifier) txIDTxControlOption { + return txIDTxControlOption(t.ID()) } -func WithTxID(txID string) txIdTxControlOption { - return txIdTxControlOption(txID) +func WithTxID(txID string) txIDTxControlOption { + return txIDTxControlOption(txID) } type commitTxOption struct{} @@ -112,6 +113,7 @@ func TxControl(opts ...txControlOption) *TransactionControl { opt.applyTxControlOption(txControl) } } + return txControl } diff --git a/query/transaction_settings.go b/query/transaction_settings.go index 45673017b..163166bc5 100644 --- a/query/transaction_settings.go +++ b/query/transaction_settings.go @@ -27,7 +27,7 @@ var ( // Transaction settings options type ( txSettingsOption interface { - applyTxSettingsOption(*allocator.Allocator, *Ydb_Query.TransactionSettings) + applyTxSettingsOption(a *allocator.Allocator, txSettings *Ydb_Query.TransactionSettings) } TransactionSettings []txSettingsOption ) @@ -46,6 +46,7 @@ func (opts TransactionSettings) ToYDB(a *allocator.Allocator) *Ydb_Query.Transac for _, opt := range opts { opt.applyTxSettingsOption(a, txSettings) } + return txSettings } @@ -62,7 +63,9 @@ var _ txSettingsOption = serializableReadWriteTxSettingsOption{} type serializableReadWriteTxSettingsOption struct{} -func (o serializableReadWriteTxSettingsOption) applyTxSettingsOption(a *allocator.Allocator, txSettings *Ydb_Query.TransactionSettings) { +func (o serializableReadWriteTxSettingsOption) applyTxSettingsOption( + a *allocator.Allocator, txSettings *Ydb_Query.TransactionSettings, +) { txSettings.TxMode = serializableReadWrite } @@ -74,7 +77,9 @@ var _ txSettingsOption = snapshotReadOnlyTxSettingsOption{} type snapshotReadOnlyTxSettingsOption struct{} -func (snapshotReadOnlyTxSettingsOption) applyTxSettingsOption(a *allocator.Allocator, settings *Ydb_Query.TransactionSettings) { +func (snapshotReadOnlyTxSettingsOption) applyTxSettingsOption( + a *allocator.Allocator, settings *Ydb_Query.TransactionSettings, +) { settings.TxMode = snapshotReadOnly } @@ -86,7 +91,9 @@ var _ txSettingsOption = staleReadOnlySettingsOption{} type staleReadOnlySettingsOption struct{} -func (staleReadOnlySettingsOption) applyTxSettingsOption(a *allocator.Allocator, settings *Ydb_Query.TransactionSettings) { +func (staleReadOnlySettingsOption) applyTxSettingsOption( + a *allocator.Allocator, settings *Ydb_Query.TransactionSettings, +) { settings.TxMode = staleReadOnly } @@ -97,7 +104,7 @@ func WithStaleReadOnly() txSettingsOption { type ( txOnlineReadOnly bool TxOnlineReadOnlyOption interface { - applyTxOnlineReadOnlyOption(*txOnlineReadOnly) + applyTxOnlineReadOnlyOption(opt *txOnlineReadOnly) } ) @@ -117,7 +124,9 @@ var _ txSettingsOption = onlineReadOnlySettingsOption{} type onlineReadOnlySettingsOption []TxOnlineReadOnlyOption -func (opts onlineReadOnlySettingsOption) applyTxSettingsOption(a *allocator.Allocator, settings *Ydb_Query.TransactionSettings) { +func (opts onlineReadOnlySettingsOption) applyTxSettingsOption( + a *allocator.Allocator, settings *Ydb_Query.TransactionSettings, +) { var ro txOnlineReadOnly for _, opt := range opts { if opt != nil { diff --git a/query/type.go b/query/type.go deleted file mode 100644 index 092fc2ff0..000000000 --- a/query/type.go +++ /dev/null @@ -1,7 +0,0 @@ -package query - -import ( - "github.com/ydb-platform/ydb-go-sdk/v3/internal/types" -) - -type Type = types.Type diff --git a/query/value.go b/query/value.go deleted file mode 100644 index 10f8c2a19..000000000 --- a/query/value.go +++ /dev/null @@ -1,65 +0,0 @@ -package query - -import ( - "time" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" -) - -type Value = value.Value - -func BoolValue(v bool) Value { - return value.BoolValue(v) -} - -func Uint64Value(v uint64) Value { - return value.Uint64Value(v) -} - -func Int64Value(v int64) Value { - return value.Int64Value(v) -} - -func Uint32Value(v uint32) Value { - return value.Uint32Value(v) -} - -func Int32Value(v int32) Value { - return value.Int32Value(v) -} - -func Uint16Value(v uint16) Value { - return value.Uint16Value(v) -} - -func Int16Value(v int16) Value { - return value.Int16Value(v) -} - -func Uint8Value(v uint8) Value { - return value.Uint8Value(v) -} - -func Int8Value(v int8) Value { - return value.Int8Value(v) -} - -func TextValue(v string) Value { - return value.TextValue(v) -} - -func BytesValue(v []byte) Value { - return value.BytesValue(v) -} - -func IntervalValue(v time.Duration) Value { - return value.IntervalValueFromDuration(v) -} - -func TimestampValue(v time.Time) Value { - return value.TimestampValueFromTime(v) -} - -func DatetimeValue(v time.Time) Value { - return value.DatetimeValueFromTime(v) -}