From f9c0cbac1262cfe4328ad048c273cb349698f7d8 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Fri, 29 Dec 2023 14:08:53 +0300 Subject: [PATCH] split app/server/utils package --- app/client/client.go | 20 +- app/{server/utils => common}/endpoint.go | 2 +- app/{server/utils => common}/errors.go | 2 +- app/{server/utils => common}/logger.go | 20 +- app/server/cmd.go | 4 +- app/server/data_source_collection.go | 9 +- app/server/datasource/interface.go | 6 + app/server/datasource/mock.go | 4 - .../rdbms/clickhouse/connection_manager.go | 7 +- .../rdbms/clickhouse/sql_formatter_test.go | 290 +++++++++--------- .../rdbms/clickhouse/type_mapper.go | 20 +- app/server/datasource/rdbms/data_source.go | 7 +- .../datasource/rdbms/data_source_factory.go | 6 +- .../datasource/rdbms/data_source_test.go | 14 +- .../rdbms/postgresql/connection_manager.go | 5 +- .../rdbms/postgresql/sql_formatter_test.go | 290 +++++++++--------- .../rdbms/postgresql/type_mapper.go | 20 +- .../datasource/rdbms/schema_builder_test.go | 13 +- .../rdbms/utils/default_schema_provider.go | 9 +- .../rdbms/utils/predicate_builder.go | 16 +- .../datasource/rdbms/utils/schema_builder.go | 11 +- .../datasource/rdbms/utils/select_helpers.go | 4 +- app/server/datasource/rdbms/utils/sql.go | 3 +- .../rdbms}/utils/unit_test_helpers.go | 0 .../rdbms}/utils/unit_test_helpers_test.go | 0 .../rdbms/ydb/connection_manager.go | 7 +- .../datasource/rdbms/ydb/schema_provider.go | 6 +- .../datasource/rdbms/ydb/type_mapper.go | 10 +- app/server/datasource/s3/data_source.go | 11 +- app/server/paging/read_limiter.go | 4 +- app/server/paging/sink.go | 3 +- app/server/paging/size.go | 3 +- app/server/paging/traffic_tracker.go | 3 +- app/server/paging/traffic_tracker_test.go | 4 +- app/server/service_connector.go | 36 +-- app/server/service_metrics.go | 4 +- app/server/service_pprof.go | 4 +- app/server/streaming/streamer.go | 20 +- app/server/streaming/streamer_test.go | 16 +- app/server/utils/arrow_helpers.go | 9 +- app/server/utils/protobuf.go | 26 -- app/server/utils/time.go | 8 +- app/server/utils/time_test.go | 20 +- app/server/utils/type_mapper.go | 11 - ...{select_helpers.go => ydb_type_helpers.go} | 5 +- app/server/validate.go | 28 +- tests/infra/connector/client.go | 6 +- tests/infra/connector/server.go | 4 +- 48 files changed, 510 insertions(+), 520 deletions(-) rename app/{server/utils => common}/endpoint.go (93%) rename app/{server/utils => common}/errors.go (99%) rename app/{server/utils => common}/logger.go (84%) rename app/server/{ => datasource/rdbms}/utils/unit_test_helpers.go (100%) rename app/server/{ => datasource/rdbms}/utils/unit_test_helpers_test.go (100%) delete mode 100644 app/server/utils/protobuf.go delete mode 100644 app/server/utils/type_mapper.go rename app/server/utils/{select_helpers.go => ydb_type_helpers.go} (82%) diff --git a/app/client/client.go b/app/client/client.go index 4f9b6918..ba22af18 100644 --- a/app/client/client.go +++ b/app/client/client.go @@ -21,8 +21,8 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service "github.com/ydb-platform/fq-connector-go/api/service" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/config" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) const ( @@ -53,7 +53,7 @@ func runClient(_ *cobra.Command, args []string) error { return fmt.Errorf("unknown instance: %w", err) } - logger := utils.NewDefaultLogger() + logger := common.NewDefaultLogger() if err := callServer(logger, cfg); err != nil { return fmt.Errorf("call server: %w", err) @@ -89,7 +89,7 @@ func makeConnection(logger *zap.Logger, cfg *config.ClientConfig) (*grpc.ClientC opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } - conn, err := grpc.Dial(utils.EndpointToString(cfg.Endpoint), opts...) + conn, err := grpc.Dial(common.EndpointToString(cfg.Endpoint), opts...) if err != nil { return nil, fmt.Errorf("grpc dial: %w", err) } @@ -103,7 +103,7 @@ func callServer(logger *zap.Logger, cfg *config.ClientConfig) error { return fmt.Errorf("grpc dial: %w", err) } - defer utils.LogCloserError(logger, conn, "connection close") + defer common.LogCloserError(logger, conn, "connection close") connectorClient := api_service.NewConnectorClient(conn) @@ -150,7 +150,7 @@ func describeTable( return nil, fmt.Errorf("describe table: %w", err) } - if utils.IsSuccess(resp.Error) { + if common.IsSuccess(resp.Error) { logger.Debug("DescribeTable", zap.String("response", resp.String())) return resp.Schema, nil @@ -158,7 +158,7 @@ func describeTable( logger.Error("DescribeTable", zap.String("response", resp.String())) - return nil, utils.NewSTDErrorFromAPIError(resp.Error) + return nil, common.NewSTDErrorFromAPIError(resp.Error) } func listSplits( @@ -203,10 +203,10 @@ func listSplits( return nil, fmt.Errorf("stream list splits: %w", err) } - if !utils.IsSuccess(resp.Error) { + if !common.IsSuccess(resp.Error) { logger.Error("ListSplits", zap.String("response", resp.String())) - return splits, utils.NewSTDErrorFromAPIError(resp.Error) + return splits, common.NewSTDErrorFromAPIError(resp.Error) } logger.Debug("ListSplits", zap.String("response", resp.String())) @@ -247,8 +247,8 @@ func readSplits( return fmt.Errorf("stream list splits: %w", err) } - if !utils.IsSuccess(resp.Error) { - return utils.NewSTDErrorFromAPIError(resp.Error) + if !common.IsSuccess(resp.Error) { + return common.NewSTDErrorFromAPIError(resp.Error) } responses = append(responses, resp) diff --git a/app/server/utils/endpoint.go b/app/common/endpoint.go similarity index 93% rename from app/server/utils/endpoint.go rename to app/common/endpoint.go index 08540725..fdebe153 100644 --- a/app/server/utils/endpoint.go +++ b/app/common/endpoint.go @@ -1,4 +1,4 @@ -package utils +package common import ( "fmt" diff --git a/app/server/utils/errors.go b/app/common/errors.go similarity index 99% rename from app/server/utils/errors.go rename to app/common/errors.go index be5a9af7..cdfb5e7f 100644 --- a/app/server/utils/errors.go +++ b/app/common/errors.go @@ -1,4 +1,4 @@ -package utils +package common import ( "errors" diff --git a/app/server/utils/logger.go b/app/common/logger.go similarity index 84% rename from app/server/utils/logger.go rename to app/common/logger.go index 26ad4f3b..19ceb7ef 100644 --- a/app/server/utils/logger.go +++ b/app/common/logger.go @@ -1,4 +1,4 @@ -package utils +package common import ( "fmt" @@ -87,24 +87,6 @@ func newDefaultLoggerConfig() zap.Config { func NewTestLogger(t *testing.T) *zap.Logger { return zaptest.NewLogger(t) } -func DumpReadSplitsResponse(logger *zap.Logger, resp *api_service_protos.TReadSplitsResponse) { - switch t := resp.GetPayload().(type) { - case *api_service_protos.TReadSplitsResponse_ArrowIpcStreaming: - if dump := resp.GetArrowIpcStreaming(); dump != nil { - logger.Debug("response", zap.Int("arrow_blob_length", len(dump))) - } - case *api_service_protos.TReadSplitsResponse_ColumnSet: - for i := range t.ColumnSet.Data { - data := t.ColumnSet.Data[i] - meta := t.ColumnSet.Meta[i] - - logger.Debug("response", zap.Int("column_id", i), zap.String("meta", meta.String()), zap.String("data", data.String())) - } - default: - panic(fmt.Sprintf("unexpected message type %v", t)) - } -} - func SelectToFields(slct *api_service_protos.TSelect) []zap.Field { result := []zap.Field{ zap.Any("from", slct.From), diff --git a/app/server/cmd.go b/app/server/cmd.go index 1ad70c4b..3a625470 100644 --- a/app/server/cmd.go +++ b/app/server/cmd.go @@ -6,7 +6,7 @@ import ( "github.com/spf13/cobra" - "github.com/ydb-platform/fq-connector-go/app/server/utils" + "github.com/ydb-platform/fq-connector-go/app/common" ) var Cmd = &cobra.Command{ @@ -42,7 +42,7 @@ func runFromCLI(cmd *cobra.Command, _ []string) error { return fmt.Errorf("new config: %w", err) } - logger, err := utils.NewLoggerFromConfig(cfg.Logger) + logger, err := common.NewLoggerFromConfig(cfg.Logger) if err != nil { return fmt.Errorf("new logger from config: %w", err) } diff --git a/app/server/data_source_collection.go b/app/server/data_source_collection.go index 6dff5be6..579a77d8 100644 --- a/app/server/data_source_collection.go +++ b/app/server/data_source_collection.go @@ -10,6 +10,7 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service "github.com/ydb-platform/fq-connector-go/api/service" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/config" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms" @@ -44,7 +45,7 @@ func (dsc *DataSourceCollection) DescribeTable( return ds.DescribeTable(ctx, logger, request) default: - return nil, fmt.Errorf("unsupported data source type '%v': %w", kind, utils.ErrDataSourceNotSupported) + return nil, fmt.Errorf("unsupported data source type '%v': %w", kind, common.ErrDataSourceNotSupported) } } @@ -67,7 +68,7 @@ func (dsc *DataSourceCollection) DoReadSplit( return readSplit[string](logger, stream, request.GetFormat(), split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg) default: - return fmt.Errorf("unsupported data source type '%v': %w", kind, utils.ErrDataSourceNotSupported) + return fmt.Errorf("unsupported data source type '%v': %w", kind, common.ErrDataSourceNotSupported) } } @@ -81,7 +82,7 @@ func readSplit[T utils.Acceptor]( readLimiterFactory *paging.ReadLimiterFactory, cfg *config.TServerConfig, ) error { - logger.Debug("split reading started", utils.SelectToFields(split.Select)...) + logger.Debug("split reading started", common.SelectToFields(split.Select)...) columnarBufferFactory, err := paging.NewColumnarBufferFactory[T]( logger, @@ -130,7 +131,7 @@ func readSplit[T utils.Acceptor]( } func NewDataSourceCollection( - queryLoggerFactory utils.QueryLoggerFactory, + queryLoggerFactory common.QueryLoggerFactory, memoryAllocator memory.Allocator, readLimiterFactory *paging.ReadLimiterFactory, cfg *config.TServerConfig, diff --git a/app/server/datasource/interface.go b/app/server/datasource/interface.go index d712f823..bdcdde14 100644 --- a/app/server/datasource/interface.go +++ b/app/server/datasource/interface.go @@ -5,6 +5,8 @@ import ( "go.uber.org/zap" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" "github.com/ydb-platform/fq-connector-go/app/server/paging" @@ -38,3 +40,7 @@ type DataSource[T utils.Acceptor] interface { sink paging.Sink[T], ) } + +type TypeMapper interface { + SQLTypeToYDBColumn(columnName, typeName string, rules *api_service_protos.TTypeMappingSettings) (*Ydb.Column, error) +} diff --git a/app/server/datasource/mock.go b/app/server/datasource/mock.go index 0a0e2c93..cca0f8b8 100644 --- a/app/server/datasource/mock.go +++ b/app/server/datasource/mock.go @@ -34,7 +34,3 @@ func (m *DataSourceMock[T]) ReadSplit( ) { m.Called(split, pagingWriter) } - -func (*DataSourceMock[T]) TypeMapper() utils.TypeMapper { - panic("not implemented") // TODO: Implement -} diff --git a/app/server/datasource/rdbms/clickhouse/connection_manager.go b/app/server/datasource/rdbms/clickhouse/connection_manager.go index 1269312f..5db6151d 100644 --- a/app/server/datasource/rdbms/clickhouse/connection_manager.go +++ b/app/server/datasource/rdbms/clickhouse/connection_manager.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/common" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/utils" ) @@ -20,7 +21,7 @@ var _ rdbms_utils.Connection = (*Connection)(nil) type Connection struct { *sql.DB - logger utils.QueryLogger + logger common.QueryLogger } type rows struct { @@ -95,7 +96,7 @@ func (c *connectionManager) Make( } opts := &clickhouse.Options{ - Addr: []string{utils.EndpointToString(dsi.GetEndpoint())}, + Addr: []string{common.EndpointToString(dsi.GetEndpoint())}, Auth: clickhouse.Auth{ Database: dsi.Database, Username: dsi.Credentials.GetBasic().Username, @@ -140,7 +141,7 @@ func (c *connectionManager) Make( } func (*connectionManager) Release(logger *zap.Logger, conn rdbms_utils.Connection) { - utils.LogCloserError(logger, conn, "close clickhouse connection") + common.LogCloserError(logger, conn, "close clickhouse connection") } func NewConnectionManager(cfg rdbms_utils.ConnectionManagerBase) rdbms_utils.ConnectionManager { diff --git a/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go b/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go index 4909f1bc..61f86f42 100644 --- a/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go +++ b/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go @@ -7,43 +7,43 @@ import ( "github.com/stretchr/testify/require" ydb "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - api "github.com/ydb-platform/fq-connector-go/api/service/protos" + api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) func TestMakeSQLFormatterQuery(t *testing.T) { type testCase struct { testName string - selectReq *api.TSelect + selectReq *api_service_protos.TSelect outputQuery string outputArgs []any err error } - logger := utils.NewTestLogger(t) + logger := common.NewTestLogger(t) formatter := NewSQLFormatter() tcs := []testCase{ { testName: "empty_table_name", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "", }, - What: &api.TSelect_TWhat{}, + What: &api_service_protos.TSelect_TWhat{}, }, outputQuery: "", outputArgs: nil, - err: utils.ErrEmptyTableName, + err: common.ErrEmptyTableName, }, { testName: "empty_no columns", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: &api.TSelect_TWhat{}, + What: &api_service_protos.TSelect_TWhat{}, }, outputQuery: `SELECT 0 FROM "tab"`, outputArgs: []any{}, @@ -51,17 +51,17 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "select_col", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: &api.TSelect_TWhat{ - Items: []*api.TSelect_TWhat_TItem{ + What: &api_service_protos.TSelect_TWhat{ + Items: []*api_service_protos.TSelect_TWhat_TItem{ { - Payload: &api.TSelect_TWhat_TItem_Column{ + Payload: &api_service_protos.TSelect_TWhat_TItem_Column{ Column: &ydb.Column{ Name: "col", - Type: utils.NewPrimitiveType(ydb.Type_INT32), + Type: rdbms_utils.NewPrimitiveType(ydb.Type_INT32), }, }, }, @@ -74,16 +74,16 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "is_null", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_IsNull{ - IsNull: &api.TPredicate_TIsNull{ - Value: utils.NewColumnExpression("col1"), + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_IsNull{ + IsNull: &api_service_protos.TPredicate_TIsNull{ + Value: rdbms_utils.NewColumnExpression("col1"), }, }, }, @@ -95,16 +95,16 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "is_not_null", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_IsNotNull{ - IsNotNull: &api.TPredicate_TIsNotNull{ - Value: utils.NewColumnExpression("col2"), + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_IsNotNull{ + IsNotNull: &api_service_protos.TPredicate_TIsNotNull{ + Value: rdbms_utils.NewColumnExpression("col2"), }, }, }, @@ -116,16 +116,16 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "bool_column", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_BoolExpression{ - BoolExpression: &api.TPredicate_TBoolExpression{ - Value: utils.NewColumnExpression("col2"), + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_BoolExpression{ + BoolExpression: &api_service_protos.TPredicate_TBoolExpression{ + Value: rdbms_utils.NewColumnExpression("col2"), }, }, }, @@ -137,25 +137,25 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "complex_filter", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_Disjunction{ - Disjunction: &api.TPredicate_TDisjunction{ - Operands: []*api.TPredicate{ + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Disjunction{ + Disjunction: &api_service_protos.TPredicate_TDisjunction{ + Operands: []*api_service_protos.TPredicate{ { - Payload: &api.TPredicate_Negation{ - Negation: &api.TPredicate_TNegation{ - Operand: &api.TPredicate{ - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_LE, - LeftValue: utils.NewColumnExpression("col2"), - RightValue: utils.NewInt32ValueExpression(42), + Payload: &api_service_protos.TPredicate_Negation{ + Negation: &api_service_protos.TPredicate_TNegation{ + Operand: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_LE, + LeftValue: rdbms_utils.NewColumnExpression("col2"), + RightValue: rdbms_utils.NewInt32ValueExpression(42), }, }, }, @@ -163,22 +163,22 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, }, { - Payload: &api.TPredicate_Conjunction{ - Conjunction: &api.TPredicate_TConjunction{ - Operands: []*api.TPredicate{ + Payload: &api_service_protos.TPredicate_Conjunction{ + Conjunction: &api_service_protos.TPredicate_TConjunction{ + Operands: []*api_service_protos.TPredicate{ { - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_NE, - LeftValue: utils.NewColumnExpression("col1"), - RightValue: utils.NewUint64ValueExpression(0), + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_NE, + LeftValue: rdbms_utils.NewColumnExpression("col1"), + RightValue: rdbms_utils.NewUint64ValueExpression(0), }, }, }, { - Payload: &api.TPredicate_IsNull{ - IsNull: &api.TPredicate_TIsNull{ - Value: utils.NewColumnExpression("col3"), + Payload: &api_service_protos.TPredicate_IsNull{ + IsNull: &api_service_protos.TPredicate_TIsNull{ + Value: rdbms_utils.NewColumnExpression("col3"), }, }, }, @@ -198,18 +198,18 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "unsupported_predicate", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_Between{ - Between: &api.TPredicate_TBetween{ - Value: utils.NewColumnExpression("col2"), - Least: utils.NewColumnExpression("col1"), - Greatest: utils.NewColumnExpression("col3"), + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Between{ + Between: &api_service_protos.TPredicate_TBetween{ + Value: rdbms_utils.NewColumnExpression("col2"), + Least: rdbms_utils.NewColumnExpression("col1"), + Greatest: rdbms_utils.NewColumnExpression("col3"), }, }, }, @@ -221,18 +221,18 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "unsupported_type", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_EQ, - LeftValue: utils.NewColumnExpression("col2"), - RightValue: utils.NewTextValueExpression("text"), + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_EQ, + LeftValue: rdbms_utils.NewColumnExpression("col2"), + RightValue: rdbms_utils.NewTextValueExpression("text"), }, }, }, @@ -244,32 +244,32 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "partial_filter_removes_and", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_Conjunction{ - Conjunction: &api.TPredicate_TConjunction{ - Operands: []*api.TPredicate{ + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Conjunction{ + Conjunction: &api_service_protos.TPredicate_TConjunction{ + Operands: []*api_service_protos.TPredicate{ { - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_EQ, - LeftValue: utils.NewColumnExpression("col1"), - RightValue: utils.NewInt32ValueExpression(32), + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_EQ, + LeftValue: rdbms_utils.NewColumnExpression("col1"), + RightValue: rdbms_utils.NewInt32ValueExpression(32), }, }, }, { // Not supported - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_EQ, - LeftValue: utils.NewColumnExpression("col2"), - RightValue: utils.NewTextValueExpression("text"), + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_EQ, + LeftValue: rdbms_utils.NewColumnExpression("col2"), + RightValue: rdbms_utils.NewTextValueExpression("text"), }, }, }, @@ -285,46 +285,46 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "partial_filter", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_Conjunction{ - Conjunction: &api.TPredicate_TConjunction{ - Operands: []*api.TPredicate{ + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Conjunction{ + Conjunction: &api_service_protos.TPredicate_TConjunction{ + Operands: []*api_service_protos.TPredicate{ { - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_EQ, - LeftValue: utils.NewColumnExpression("col1"), - RightValue: utils.NewInt32ValueExpression(32), + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_EQ, + LeftValue: rdbms_utils.NewColumnExpression("col1"), + RightValue: rdbms_utils.NewInt32ValueExpression(32), }, }, }, { // Not supported - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_EQ, - LeftValue: utils.NewColumnExpression("col2"), - RightValue: utils.NewTextValueExpression("text"), + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_EQ, + LeftValue: rdbms_utils.NewColumnExpression("col2"), + RightValue: rdbms_utils.NewTextValueExpression("text"), }, }, }, { - Payload: &api.TPredicate_IsNull{ - IsNull: &api.TPredicate_TIsNull{ - Value: utils.NewColumnExpression("col3"), + Payload: &api_service_protos.TPredicate_IsNull{ + IsNull: &api_service_protos.TPredicate_TIsNull{ + Value: rdbms_utils.NewColumnExpression("col3"), }, }, }, { - Payload: &api.TPredicate_IsNotNull{ - IsNotNull: &api.TPredicate_TIsNotNull{ - Value: utils.NewColumnExpression("col4"), + Payload: &api_service_protos.TPredicate_IsNotNull{ + IsNotNull: &api_service_protos.TPredicate_TIsNotNull{ + Value: rdbms_utils.NewColumnExpression("col4"), }, }, }, @@ -340,11 +340,11 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "negative_sql_injection_by_table", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: `information_schema.columns; DROP TABLE information_schema.columns`, }, - What: &api.TSelect_TWhat{}, + What: &api_service_protos.TSelect_TWhat{}, }, outputQuery: `SELECT 0 FROM "information_schema.columns; DROP TABLE information_schema.columns"`, outputArgs: []any{}, @@ -352,17 +352,17 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "negative_sql_injection_by_col", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: &api.TSelect_TWhat{ - Items: []*api.TSelect_TWhat_TItem{ + What: &api_service_protos.TSelect_TWhat{ + Items: []*api_service_protos.TSelect_TWhat_TItem{ { - Payload: &api.TSelect_TWhat_TItem_Column{ + Payload: &api_service_protos.TSelect_TWhat_TItem_Column{ Column: &ydb.Column{ Name: `0; DROP TABLE information_schema.columns`, - Type: utils.NewPrimitiveType(ydb.Type_INT32), + Type: rdbms_utils.NewPrimitiveType(ydb.Type_INT32), }, }, }, @@ -375,17 +375,17 @@ func TestMakeSQLFormatterQuery(t *testing.T) { }, { testName: "negative_sql_injection_fake_quotes", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: &api.TSelect_TWhat{ - Items: []*api.TSelect_TWhat_TItem{ + What: &api_service_protos.TSelect_TWhat{ + Items: []*api_service_protos.TSelect_TWhat_TItem{ { - Payload: &api.TSelect_TWhat_TItem_Column{ + Payload: &api_service_protos.TSelect_TWhat_TItem_Column{ Column: &ydb.Column{ Name: `0"; DROP TABLE information_schema.columns;`, - Type: utils.NewPrimitiveType(ydb.Type_INT32), + Type: rdbms_utils.NewPrimitiveType(ydb.Type_INT32), }, }, }, diff --git a/app/server/datasource/rdbms/clickhouse/type_mapper.go b/app/server/datasource/rdbms/clickhouse/type_mapper.go index 03c17ec4..3069d1b5 100644 --- a/app/server/datasource/rdbms/clickhouse/type_mapper.go +++ b/app/server/datasource/rdbms/clickhouse/type_mapper.go @@ -10,10 +10,12 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" + "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/utils" ) -var _ utils.TypeMapper = typeMapper{} +var _ datasource.TypeMapper = typeMapper{} type typeMapper struct { isFixedString *regexp.Regexp @@ -95,7 +97,7 @@ func (tm typeMapper) SQLTypeToYDBColumn( ydbType, overflow, err = makeYdbDateTimeType(Ydb.Type_DATETIME, rules.GetDateTimeFormat()) nullable = overflow || nullable default: - err = fmt.Errorf("convert type '%s': %w", typeName, utils.ErrDataTypeNotSupported) + err = fmt.Errorf("convert type '%s': %w", typeName, common.ErrDataTypeNotSupported) } if err != nil { @@ -121,7 +123,7 @@ func makeYdbDateTimeType(ydbTypeID Ydb.Type_PrimitiveTypeId, format api_service_ case api_service_protos.EDateTimeFormat_STRING_FORMAT: return &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_UTF8}}, false, nil default: - return nil, false, fmt.Errorf("unexpected datetime format '%s': %w", format, utils.ErrDataTypeNotSupported) + return nil, false, fmt.Errorf("unexpected datetime format '%s': %w", format, common.ErrDataTypeNotSupported) } } @@ -191,7 +193,7 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (utils.Ro case Ydb.Type_DATE: appenders = append(appenders, appendValueToArrowBuilder[time.Time, uint16, *array.Uint16Builder, utils.DateConverter]) default: - return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, common.ErrDataTypeNotSupported) } case typeName == "Date32": acceptors = append(acceptors, new(*time.Time)) @@ -207,7 +209,7 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (utils.Ro case Ydb.Type_DATE: appenders = append(appenders, appendValueToArrowBuilder[time.Time, uint16, *array.Uint16Builder, utils.DateConverter]) default: - return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, common.ErrDataTypeNotSupported) } case isDateTime64.MatchString(typeName): acceptors = append(acceptors, new(*time.Time)) @@ -225,7 +227,7 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (utils.Ro appenders = append(appenders, appendValueToArrowBuilder[time.Time, uint64, *array.Uint64Builder, utils.TimestampConverter]) default: - return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, common.ErrDataTypeNotSupported) } case isDateTime.MatchString(typeName): acceptors = append(acceptors, new(*time.Time)) @@ -241,7 +243,7 @@ func transformerFromSQLTypes(typeNames []string, ydbTypes []*Ydb.Type) (utils.Ro case Ydb.Type_DATETIME: appenders = append(appenders, appendValueToArrowBuilder[time.Time, uint32, *array.Uint32Builder, utils.DatetimeConverter]) default: - return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("unexpected ydb type %v with sql type %s: %w", ydbTypes[i], typeName, common.ErrDataTypeNotSupported) } default: return nil, fmt.Errorf("unknown type '%v'", typeName) @@ -269,7 +271,7 @@ func appendValueToArrowBuilder[IN utils.ValueType, OUT utils.ValueType, AB utils out, err := converter.Convert(value) if err != nil { - if errors.Is(err, utils.ErrValueOutOfTypeBounds) { + if errors.Is(err, common.ErrValueOutOfTypeBounds) { // TODO: write warning to logger builder.AppendNull() @@ -335,7 +337,7 @@ func (dateTime64ToStringConverter) Convert(in time.Time) (string, error) { return utils.TimestampToStringConverter{}.Convert(saturateDateTime(in, minClickHouseDatetime64, maxClickHouseDatetime64)) } -func NewTypeMapper() utils.TypeMapper { +func NewTypeMapper() datasource.TypeMapper { return typeMapper{ isFixedString: regexp.MustCompile(`FixedString\([0-9]+\)`), isDateTime: regexp.MustCompile(`DateTime(\('[\w,/]+'\))?`), diff --git a/app/server/datasource/rdbms/data_source.go b/app/server/datasource/rdbms/data_source.go index 849f4776..ac529d2d 100644 --- a/app/server/datasource/rdbms/data_source.go +++ b/app/server/datasource/rdbms/data_source.go @@ -7,6 +7,7 @@ import ( "go.uber.org/zap" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/server/datasource" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/paging" @@ -16,14 +17,14 @@ import ( type Preset struct { SQLFormatter rdbms_utils.SQLFormatter ConnectionManager rdbms_utils.ConnectionManager - TypeMapper utils.TypeMapper + TypeMapper datasource.TypeMapper SchemaProvider rdbms_utils.SchemaProvider } var _ datasource.DataSource[any] = (*dataSourceImpl)(nil) type dataSourceImpl struct { - typeMapper utils.TypeMapper + typeMapper datasource.TypeMapper sqlFormatter rdbms_utils.SQLFormatter connectionManager rdbms_utils.ConnectionManager schemaProvider rdbms_utils.SchemaProvider @@ -73,7 +74,7 @@ func (ds *dataSourceImpl) doReadSplit( return fmt.Errorf("query '%s' error: %w", query, err) } - defer func() { utils.LogCloserError(logger, rows, "close rows") }() + defer func() { common.LogCloserError(logger, rows, "close rows") }() ydbTypes, err := utils.SelectWhatToYDBTypes(split.Select.What) if err != nil { diff --git a/app/server/datasource/rdbms/data_source_factory.go b/app/server/datasource/rdbms/data_source_factory.go index e3d96242..832fe4be 100644 --- a/app/server/datasource/rdbms/data_source_factory.go +++ b/app/server/datasource/rdbms/data_source_factory.go @@ -6,12 +6,12 @@ import ( "go.uber.org/zap" api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/clickhouse" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/postgresql" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/ydb" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) var _ datasource.Factory[any] = (*dataSourceFactory)(nil) @@ -34,10 +34,10 @@ func (dsf *dataSourceFactory) Make( case api_common.EDataSourceKind_YDB: return NewDataSource(logger, &dsf.ydb), nil default: - return nil, fmt.Errorf("pick handler for data source type '%v': %w", dataSourceType, utils.ErrDataSourceNotSupported) + return nil, fmt.Errorf("pick handler for data source type '%v': %w", dataSourceType, common.ErrDataSourceNotSupported) } } -func NewDataSourceFactory(qlf utils.QueryLoggerFactory) datasource.Factory[any] { +func NewDataSourceFactory(qlf common.QueryLoggerFactory) datasource.Factory[any] { connManagerCfg := rdbms_utils.ConnectionManagerBase{ QueryLoggerFactory: qlf, } diff --git a/app/server/datasource/rdbms/data_source_test.go b/app/server/datasource/rdbms/data_source_test.go index 4a1f3077..f3637175 100644 --- a/app/server/datasource/rdbms/data_source_test.go +++ b/app/server/datasource/rdbms/data_source_test.go @@ -11,10 +11,11 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/postgresql" + "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/paging" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) func TestReadSplit(t *testing.T) { @@ -49,7 +50,7 @@ func TestReadSplit(t *testing.T) { } t.Run("positive", func(t *testing.T) { - logger := utils.NewTestLogger(t) + logger := common.NewTestLogger(t) connectionManager := &rdbms_utils.ConnectionManagerMock{} @@ -78,7 +79,7 @@ func TestReadSplit(t *testing.T) { } rows.On("MakeTransformer", - []*Ydb.Type{utils.NewPrimitiveType(Ydb.Type_INT32), utils.NewPrimitiveType(Ydb.Type_UTF8)}, + []*Ydb.Type{rdbms_utils.NewPrimitiveType(Ydb.Type_INT32), utils.NewPrimitiveType(Ydb.Type_UTF8)}, ).Return(transformer, nil).Once() rows.On("Next").Return(true).Times(2) rows.On("Next").Return(false).Once() @@ -97,8 +98,7 @@ func TestReadSplit(t *testing.T) { }) t.Run("scan error", func(t *testing.T) { - logger := utils.NewTestLogger(t) - + logger := common.NewTestLogger(t) connectionManager := &rdbms_utils.ConnectionManagerMock{} preset := &Preset{ @@ -129,8 +129,8 @@ func TestReadSplit(t *testing.T) { rows.On("MakeTransformer", []*Ydb.Type{ - utils.NewPrimitiveType(Ydb.Type_INT32), - utils.NewPrimitiveType(Ydb.Type_UTF8), + rdbms_utils.NewPrimitiveType(Ydb.Type_INT32), + rdbms_utils.NewPrimitiveType(Ydb.Type_UTF8), }, ).Return(transformer, nil).Once() rows.On("Next").Return(true).Times(2) diff --git a/app/server/datasource/rdbms/postgresql/connection_manager.go b/app/server/datasource/rdbms/postgresql/connection_manager.go index 2e32d5f4..f8608771 100644 --- a/app/server/datasource/rdbms/postgresql/connection_manager.go +++ b/app/server/datasource/rdbms/postgresql/connection_manager.go @@ -11,6 +11,7 @@ import ( "go.uber.org/zap" api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/common" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/utils" ) @@ -40,7 +41,7 @@ func (r rows) MakeTransformer(ydbTypes []*Ydb.Type) (utils.RowTransformer[any], type Connection struct { *pgx.Conn - logger utils.QueryLogger + logger common.QueryLogger } func (c Connection) Close() error { @@ -118,7 +119,7 @@ func (c *connectionManager) Make( } func (*connectionManager) Release(logger *zap.Logger, conn rdbms_utils.Connection) { - utils.LogCloserError(logger, conn, "close posgresql connection") + common.LogCloserError(logger, conn, "close posgresql connection") } func NewConnectionManager(cfg rdbms_utils.ConnectionManagerBase) rdbms_utils.ConnectionManager { diff --git a/app/server/datasource/rdbms/postgresql/sql_formatter_test.go b/app/server/datasource/rdbms/postgresql/sql_formatter_test.go index 0a55a495..977f0640 100644 --- a/app/server/datasource/rdbms/postgresql/sql_formatter_test.go +++ b/app/server/datasource/rdbms/postgresql/sql_formatter_test.go @@ -7,43 +7,43 @@ import ( "github.com/stretchr/testify/require" ydb "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - api "github.com/ydb-platform/fq-connector-go/api/service/protos" + api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) func TestMakeReadSplitQuery(t *testing.T) { type testCase struct { testName string - selectReq *api.TSelect + selectReq *api_service_protos.TSelect outputQuery string outputArgs []any err error } - logger := utils.NewTestLogger(t) + logger := common.NewTestLogger(t) formatter := NewSQLFormatter() tcs := []testCase{ { testName: "empty_table_name", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "", }, - What: &api.TSelect_TWhat{}, + What: &api_service_protos.TSelect_TWhat{}, }, outputQuery: "", outputArgs: nil, - err: utils.ErrEmptyTableName, + err: common.ErrEmptyTableName, }, { testName: "empty_no columns", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: &api.TSelect_TWhat{}, + What: &api_service_protos.TSelect_TWhat{}, }, outputQuery: `SELECT 0 FROM "tab"`, outputArgs: []any{}, @@ -51,17 +51,17 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "select_col", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: &api.TSelect_TWhat{ - Items: []*api.TSelect_TWhat_TItem{ + What: &api_service_protos.TSelect_TWhat{ + Items: []*api_service_protos.TSelect_TWhat_TItem{ { - Payload: &api.TSelect_TWhat_TItem_Column{ + Payload: &api_service_protos.TSelect_TWhat_TItem_Column{ Column: &ydb.Column{ Name: "col", - Type: utils.NewPrimitiveType(ydb.Type_INT32), + Type: rdbms_utils.NewPrimitiveType(ydb.Type_INT32), }, }, }, @@ -74,16 +74,16 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "is_null", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_IsNull{ - IsNull: &api.TPredicate_TIsNull{ - Value: utils.NewColumnExpression("col1"), + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_IsNull{ + IsNull: &api_service_protos.TPredicate_TIsNull{ + Value: rdbms_utils.NewColumnExpression("col1"), }, }, }, @@ -95,16 +95,16 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "is_not_null", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_IsNotNull{ - IsNotNull: &api.TPredicate_TIsNotNull{ - Value: utils.NewColumnExpression("col2"), + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_IsNotNull{ + IsNotNull: &api_service_protos.TPredicate_TIsNotNull{ + Value: rdbms_utils.NewColumnExpression("col2"), }, }, }, @@ -116,16 +116,16 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "bool_column", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_BoolExpression{ - BoolExpression: &api.TPredicate_TBoolExpression{ - Value: utils.NewColumnExpression("col2"), + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_BoolExpression{ + BoolExpression: &api_service_protos.TPredicate_TBoolExpression{ + Value: rdbms_utils.NewColumnExpression("col2"), }, }, }, @@ -137,25 +137,25 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "complex_filter", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_Disjunction{ - Disjunction: &api.TPredicate_TDisjunction{ - Operands: []*api.TPredicate{ + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Disjunction{ + Disjunction: &api_service_protos.TPredicate_TDisjunction{ + Operands: []*api_service_protos.TPredicate{ { - Payload: &api.TPredicate_Negation{ - Negation: &api.TPredicate_TNegation{ - Operand: &api.TPredicate{ - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_LE, - LeftValue: utils.NewColumnExpression("col2"), - RightValue: utils.NewInt32ValueExpression(42), + Payload: &api_service_protos.TPredicate_Negation{ + Negation: &api_service_protos.TPredicate_TNegation{ + Operand: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_LE, + LeftValue: rdbms_utils.NewColumnExpression("col2"), + RightValue: rdbms_utils.NewInt32ValueExpression(42), }, }, }, @@ -163,22 +163,22 @@ func TestMakeReadSplitQuery(t *testing.T) { }, }, { - Payload: &api.TPredicate_Conjunction{ - Conjunction: &api.TPredicate_TConjunction{ - Operands: []*api.TPredicate{ + Payload: &api_service_protos.TPredicate_Conjunction{ + Conjunction: &api_service_protos.TPredicate_TConjunction{ + Operands: []*api_service_protos.TPredicate{ { - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_NE, - LeftValue: utils.NewColumnExpression("col1"), - RightValue: utils.NewUint64ValueExpression(0), + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_NE, + LeftValue: rdbms_utils.NewColumnExpression("col1"), + RightValue: rdbms_utils.NewUint64ValueExpression(0), }, }, }, { - Payload: &api.TPredicate_IsNull{ - IsNull: &api.TPredicate_TIsNull{ - Value: utils.NewColumnExpression("col3"), + Payload: &api_service_protos.TPredicate_IsNull{ + IsNull: &api_service_protos.TPredicate_TIsNull{ + Value: rdbms_utils.NewColumnExpression("col3"), }, }, }, @@ -198,18 +198,18 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "unsupported_predicate", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_Between{ - Between: &api.TPredicate_TBetween{ - Value: utils.NewColumnExpression("col2"), - Least: utils.NewColumnExpression("col1"), - Greatest: utils.NewColumnExpression("col3"), + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Between{ + Between: &api_service_protos.TPredicate_TBetween{ + Value: rdbms_utils.NewColumnExpression("col2"), + Least: rdbms_utils.NewColumnExpression("col1"), + Greatest: rdbms_utils.NewColumnExpression("col3"), }, }, }, @@ -221,18 +221,18 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "unsupported_type", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_EQ, - LeftValue: utils.NewColumnExpression("col2"), - RightValue: utils.NewTextValueExpression("text"), + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_EQ, + LeftValue: rdbms_utils.NewColumnExpression("col2"), + RightValue: rdbms_utils.NewTextValueExpression("text"), }, }, }, @@ -244,32 +244,32 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "partial_filter_removes_and", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_Conjunction{ - Conjunction: &api.TPredicate_TConjunction{ - Operands: []*api.TPredicate{ + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Conjunction{ + Conjunction: &api_service_protos.TPredicate_TConjunction{ + Operands: []*api_service_protos.TPredicate{ { - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_EQ, - LeftValue: utils.NewColumnExpression("col1"), - RightValue: utils.NewInt32ValueExpression(32), + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_EQ, + LeftValue: rdbms_utils.NewColumnExpression("col1"), + RightValue: rdbms_utils.NewInt32ValueExpression(32), }, }, }, { // Not supported - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_EQ, - LeftValue: utils.NewColumnExpression("col2"), - RightValue: utils.NewTextValueExpression("text"), + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_EQ, + LeftValue: rdbms_utils.NewColumnExpression("col2"), + RightValue: rdbms_utils.NewTextValueExpression("text"), }, }, }, @@ -285,46 +285,46 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "partial_filter", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: utils.NewDefaultWhat(), - Where: &api.TSelect_TWhere{ - FilterTyped: &api.TPredicate{ - Payload: &api.TPredicate_Conjunction{ - Conjunction: &api.TPredicate_TConjunction{ - Operands: []*api.TPredicate{ + What: rdbms_utils.NewDefaultWhat(), + Where: &api_service_protos.TSelect_TWhere{ + FilterTyped: &api_service_protos.TPredicate{ + Payload: &api_service_protos.TPredicate_Conjunction{ + Conjunction: &api_service_protos.TPredicate_TConjunction{ + Operands: []*api_service_protos.TPredicate{ { - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_EQ, - LeftValue: utils.NewColumnExpression("col1"), - RightValue: utils.NewInt32ValueExpression(32), + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_EQ, + LeftValue: rdbms_utils.NewColumnExpression("col1"), + RightValue: rdbms_utils.NewInt32ValueExpression(32), }, }, }, { // Not supported - Payload: &api.TPredicate_Comparison{ - Comparison: &api.TPredicate_TComparison{ - Operation: api.TPredicate_TComparison_EQ, - LeftValue: utils.NewColumnExpression("col2"), - RightValue: utils.NewTextValueExpression("text"), + Payload: &api_service_protos.TPredicate_Comparison{ + Comparison: &api_service_protos.TPredicate_TComparison{ + Operation: api_service_protos.TPredicate_TComparison_EQ, + LeftValue: rdbms_utils.NewColumnExpression("col2"), + RightValue: rdbms_utils.NewTextValueExpression("text"), }, }, }, { - Payload: &api.TPredicate_IsNull{ - IsNull: &api.TPredicate_TIsNull{ - Value: utils.NewColumnExpression("col3"), + Payload: &api_service_protos.TPredicate_IsNull{ + IsNull: &api_service_protos.TPredicate_TIsNull{ + Value: rdbms_utils.NewColumnExpression("col3"), }, }, }, { - Payload: &api.TPredicate_IsNotNull{ - IsNotNull: &api.TPredicate_TIsNotNull{ - Value: utils.NewColumnExpression("col4"), + Payload: &api_service_protos.TPredicate_IsNotNull{ + IsNotNull: &api_service_protos.TPredicate_TIsNotNull{ + Value: rdbms_utils.NewColumnExpression("col4"), }, }, }, @@ -340,11 +340,11 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "negative_sql_injection_by_table", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: `information_schema.columns; DROP TABLE information_schema.columns`, }, - What: &api.TSelect_TWhat{}, + What: &api_service_protos.TSelect_TWhat{}, }, outputQuery: `SELECT 0 FROM "information_schema.columns; DROP TABLE information_schema.columns"`, outputArgs: []any{}, @@ -352,17 +352,17 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "negative_sql_injection_by_col", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: &api.TSelect_TWhat{ - Items: []*api.TSelect_TWhat_TItem{ + What: &api_service_protos.TSelect_TWhat{ + Items: []*api_service_protos.TSelect_TWhat_TItem{ { - Payload: &api.TSelect_TWhat_TItem_Column{ + Payload: &api_service_protos.TSelect_TWhat_TItem_Column{ Column: &ydb.Column{ Name: `0; DROP TABLE information_schema.columns`, - Type: utils.NewPrimitiveType(ydb.Type_INT32), + Type: rdbms_utils.NewPrimitiveType(ydb.Type_INT32), }, }, }, @@ -375,17 +375,17 @@ func TestMakeReadSplitQuery(t *testing.T) { }, { testName: "negative_sql_injection_fake_quotes", - selectReq: &api.TSelect{ - From: &api.TSelect_TFrom{ + selectReq: &api_service_protos.TSelect{ + From: &api_service_protos.TSelect_TFrom{ Table: "tab", }, - What: &api.TSelect_TWhat{ - Items: []*api.TSelect_TWhat_TItem{ + What: &api_service_protos.TSelect_TWhat{ + Items: []*api_service_protos.TSelect_TWhat_TItem{ { - Payload: &api.TSelect_TWhat_TItem_Column{ + Payload: &api_service_protos.TSelect_TWhat_TItem_Column{ Column: &ydb.Column{ Name: `0"; DROP TABLE information_schema.columns;`, - Type: utils.NewPrimitiveType(ydb.Type_INT32), + Type: rdbms_utils.NewPrimitiveType(ydb.Type_INT32), }, }, }, diff --git a/app/server/datasource/rdbms/postgresql/type_mapper.go b/app/server/datasource/rdbms/postgresql/type_mapper.go index 6a0482c7..2bbb816a 100644 --- a/app/server/datasource/rdbms/postgresql/type_mapper.go +++ b/app/server/datasource/rdbms/postgresql/type_mapper.go @@ -10,10 +10,12 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" + "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/utils" ) -var _ utils.TypeMapper = typeMapper{} +var _ datasource.TypeMapper = typeMapper{} type typeMapper struct{} @@ -45,7 +47,7 @@ func (typeMapper) SQLTypeToYDBColumn(columnName, typeName string, rules *api_ser case api_service_protos.EDateTimeFormat_YQL_FORMAT: ydbType = &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_DATE}} default: - return nil, fmt.Errorf("unexpected date format '%s': %w", rules.GetDateTimeFormat(), utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("unexpected date format '%s': %w", rules.GetDateTimeFormat(), common.ErrDataTypeNotSupported) } // TODO: PostgreSQL `time` data type has no direct counterparts in the YDB's type system; // but it can be supported when the PG-compatible types is added to YDB: @@ -58,10 +60,10 @@ func (typeMapper) SQLTypeToYDBColumn(columnName, typeName string, rules *api_ser case api_service_protos.EDateTimeFormat_YQL_FORMAT: ydbType = &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_TIMESTAMP}} default: - return nil, fmt.Errorf("unexpected timestamp format '%s': %w", rules.GetDateTimeFormat(), utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("unexpected timestamp format '%s': %w", rules.GetDateTimeFormat(), common.ErrDataTypeNotSupported) } default: - return nil, fmt.Errorf("convert type '%s': %w", typeName, utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("convert type '%s': %w", typeName, common.ErrDataTypeNotSupported) } // In PostgreSQL all columns are actually nullable, hence we wrap every T in Optional. @@ -173,7 +175,7 @@ func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type) (utils.RowTransfor cast.Time, builder, cast.Valid) }) default: - return nil, fmt.Errorf("unexpected ydb type %v with type oid %d: %w", ydbTypes[i], oid, utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("unexpected ydb type %v with type oid %d: %w", ydbTypes[i], oid, common.ErrDataTypeNotSupported) } case pgtype.TimestampOID: acceptors = append(acceptors, new(pgtype.Timestamp)) @@ -202,10 +204,10 @@ func transformerFromOIDs(oids []uint32, ydbTypes []*Ydb.Type) (utils.RowTransfor cast.Time, builder, cast.Valid) }) default: - return nil, fmt.Errorf("unexpected ydb type %v with type oid %d: %w", ydbTypes[i], oid, utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("unexpected ydb type %v with type oid %d: %w", ydbTypes[i], oid, common.ErrDataTypeNotSupported) } default: - return nil, fmt.Errorf("convert type OID %d: %w", oid, utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("convert type OID %d: %w", oid, common.ErrDataTypeNotSupported) } } @@ -229,7 +231,7 @@ func appendValueToArrowBuilder[IN utils.ValueType, OUT utils.ValueType, AB utils out, err := converter.Convert(cast) if err != nil { - if errors.Is(err, utils.ErrValueOutOfTypeBounds) { + if errors.Is(err, common.ErrValueOutOfTypeBounds) { // TODO: logger ? builder.AppendNull() @@ -244,4 +246,4 @@ func appendValueToArrowBuilder[IN utils.ValueType, OUT utils.ValueType, AB utils return nil } -func NewTypeMapper() utils.TypeMapper { return typeMapper{} } +func NewTypeMapper() datasource.TypeMapper { return typeMapper{} } diff --git a/app/server/datasource/rdbms/schema_builder_test.go b/app/server/datasource/rdbms/schema_builder_test.go index f783d5d0..721e0a46 100644 --- a/app/server/datasource/rdbms/schema_builder_test.go +++ b/app/server/datasource/rdbms/schema_builder_test.go @@ -9,10 +9,11 @@ import ( "google.golang.org/protobuf/proto" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" + "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/clickhouse" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/postgresql" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) func TestSchemaBuilder(t *testing.T) { @@ -23,7 +24,7 @@ func TestSchemaBuilder(t *testing.T) { type testCase struct { name string - typeMapper utils.TypeMapper + typeMapper datasource.TypeMapper supportedTypesMatch []nameToType unsupportedTypes []nameToType } @@ -90,7 +91,7 @@ func TestSchemaBuilder(t *testing.T) { unsuppType.name)) // yet unsupported } - logger := utils.NewTestLogger(t) + logger := common.NewTestLogger(t) schema, err := sb.Build(logger) require.NoError(t, err) require.NotNil(t, schema) @@ -121,7 +122,7 @@ func TestSchemaBuilder(t *testing.T) { unsuppType.name)) // yet unsupported } - schema, err := sb.Build(utils.NewTestLogger(t)) + schema, err := sb.Build(common.NewTestLogger(t)) require.NoError(t, err) require.NotNil(t, schema) require.Len(t, schema.Columns, 0) @@ -130,8 +131,8 @@ func TestSchemaBuilder(t *testing.T) { t.Run("NonExistingTable", func(t *testing.T) { sb := &rdbms_utils.SchemaBuilder{} - schema, err := sb.Build(utils.NewTestLogger(t)) - require.ErrorIs(t, err, utils.ErrTableDoesNotExist) + schema, err := sb.Build(common.NewTestLogger(t)) + require.ErrorIs(t, err, common.ErrTableDoesNotExist) require.Nil(t, schema) }) } diff --git a/app/server/datasource/rdbms/utils/default_schema_provider.go b/app/server/datasource/rdbms/utils/default_schema_provider.go index aa3655f0..25a8b94f 100644 --- a/app/server/datasource/rdbms/utils/default_schema_provider.go +++ b/app/server/datasource/rdbms/utils/default_schema_provider.go @@ -7,11 +7,12 @@ import ( "go.uber.org/zap" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" - "github.com/ydb-platform/fq-connector-go/app/server/utils" + "github.com/ydb-platform/fq-connector-go/app/common" + "github.com/ydb-platform/fq-connector-go/app/server/datasource" ) type DefaultSchemaProvider struct { - typeMapper utils.TypeMapper + typeMapper datasource.TypeMapper getArgsAndQuery func(request *api_service_protos.TDescribeTableRequest) (string, []any) } @@ -30,7 +31,7 @@ func (f *DefaultSchemaProvider) GetSchema( return nil, fmt.Errorf("query builder error: %w", err) } - defer func() { utils.LogCloserError(logger, rows, "close rows") }() + defer func() { common.LogCloserError(logger, rows, "close rows") }() var ( columnName string @@ -62,7 +63,7 @@ func (f *DefaultSchemaProvider) GetSchema( } func NewDefaultSchemaProvider( - typeMapper utils.TypeMapper, + typeMapper datasource.TypeMapper, getArgsAndQueryFunc func(request *api_service_protos.TDescribeTableRequest) (string, []any), ) SchemaProvider { return &DefaultSchemaProvider{ diff --git a/app/server/datasource/rdbms/utils/predicate_builder.go b/app/server/datasource/rdbms/utils/predicate_builder.go index 9d238b57..70f3715d 100644 --- a/app/server/datasource/rdbms/utils/predicate_builder.go +++ b/app/server/datasource/rdbms/utils/predicate_builder.go @@ -7,7 +7,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" - "github.com/ydb-platform/fq-connector-go/app/server/utils" + "github.com/ydb-platform/fq-connector-go/app/common" ) func formatValue(formatter SQLFormatter, args []any, value *Ydb.TypedValue) (string, []any, error) { @@ -27,7 +27,7 @@ func formatValue(formatter SQLFormatter, args []any, value *Ydb.TypedValue) (str case *Ydb.Value_DoubleValue: return formatter.GetPlaceholder(len(args)), append(args, v.DoubleValue), nil default: - return "", args, fmt.Errorf("%w, type: %T", utils.ErrUnimplementedTypedValue, v) + return "", args, fmt.Errorf("%w, type: %T", common.ErrUnimplementedTypedValue, v) } } @@ -60,7 +60,7 @@ func formatArithmeticalExpression( case api_service_protos.TExpression_TArithmeticalExpression_BIT_XOR: operation = " ^ " default: - return "", args, fmt.Errorf("%w, op: %d", utils.ErrUnimplementedArithmeticalExpression, op) + return "", args, fmt.Errorf("%w, op: %d", common.ErrUnimplementedArithmeticalExpression, op) } left, args, err := formatExpression(formatter, args, expression.LeftValue) @@ -78,7 +78,7 @@ func formatArithmeticalExpression( func formatExpression(formatter SQLFormatter, args []any, expression *api_service_protos.TExpression) (string, []any, error) { if !formatter.SupportsPushdownExpression(expression) { - return "", args, utils.ErrUnsupportedExpression + return "", args, common.ErrUnsupportedExpression } switch e := expression.Payload.(type) { @@ -91,7 +91,7 @@ func formatExpression(formatter SQLFormatter, args []any, expression *api_servic case *api_service_protos.TExpression_Null: return formatNull(formatter, args, e.Null) default: - return "", args, fmt.Errorf("%w, type: %T", utils.ErrUnimplementedExpression, e) + return "", args, fmt.Errorf("%w, type: %T", common.ErrUnimplementedExpression, e) } } @@ -112,7 +112,7 @@ func formatComparison(formatter SQLFormatter, args []any, comparison *api_servic case api_service_protos.TPredicate_TComparison_G: operation = " > " default: - return "", args, fmt.Errorf("%w, op: %d", utils.ErrUnimplementedOperation, op) + return "", args, fmt.Errorf("%w, op: %d", common.ErrUnimplementedOperation, op) } left, args, err := formatExpression(formatter, args, comparison.LeftValue) @@ -269,13 +269,13 @@ func formatPredicate(formatter SQLFormatter, args []any, predicate *api_service_ case *api_service_protos.TPredicate_BoolExpression: return formatExpression(formatter, args, p.BoolExpression.Value) default: - return "", args, fmt.Errorf("%w, type: %T", utils.ErrUnimplementedPredicateType, p) + return "", args, fmt.Errorf("%w, type: %T", common.ErrUnimplementedPredicateType, p) } } func formatWhereClause(formatter SQLFormatter, where *api_service_protos.TSelect_TWhere) (string, []any, error) { if where.FilterTyped == nil { - return "", nil, utils.ErrUnimplemented + return "", nil, common.ErrUnimplemented } args := make([]any, 0) diff --git a/app/server/datasource/rdbms/utils/schema_builder.go b/app/server/datasource/rdbms/utils/schema_builder.go index a1519e9b..0d8753ec 100644 --- a/app/server/datasource/rdbms/utils/schema_builder.go +++ b/app/server/datasource/rdbms/utils/schema_builder.go @@ -8,7 +8,8 @@ import ( "go.uber.org/zap" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" - "github.com/ydb-platform/fq-connector-go/app/server/utils" + "github.com/ydb-platform/fq-connector-go/app/common" + "github.com/ydb-platform/fq-connector-go/app/server/datasource" ) type schemaItem struct { @@ -18,7 +19,7 @@ type schemaItem struct { } type SchemaBuilder struct { - typeMapper utils.TypeMapper + typeMapper datasource.TypeMapper typeMappingSettings *api_service_protos.TTypeMappingSettings items []*schemaItem } @@ -32,7 +33,7 @@ func (sb *SchemaBuilder) AddColumn(columnName, columnType string) error { var err error item.ydbColumn, err = sb.typeMapper.SQLTypeToYDBColumn(columnName, columnType, sb.typeMappingSettings) - if err != nil && !errors.Is(err, utils.ErrDataTypeNotSupported) { + if err != nil && !errors.Is(err, common.ErrDataTypeNotSupported) { return fmt.Errorf("sql type to ydb column (%s, %s): %w", columnName, columnType, err) } @@ -43,7 +44,7 @@ func (sb *SchemaBuilder) AddColumn(columnName, columnType string) error { func (sb *SchemaBuilder) Build(logger *zap.Logger) (*api_service_protos.TSchema, error) { if len(sb.items) == 0 { - return nil, utils.ErrTableDoesNotExist + return nil, common.ErrTableDoesNotExist } var ( @@ -70,7 +71,7 @@ func (sb *SchemaBuilder) Build(logger *zap.Logger) (*api_service_protos.TSchema, } func NewSchemaBuilder( - typeMapper utils.TypeMapper, + typeMapper datasource.TypeMapper, typeMappingSettings *api_service_protos.TTypeMappingSettings, ) *SchemaBuilder { return &SchemaBuilder{ diff --git a/app/server/datasource/rdbms/utils/select_helpers.go b/app/server/datasource/rdbms/utils/select_helpers.go index 753e997f..a22356f9 100644 --- a/app/server/datasource/rdbms/utils/select_helpers.go +++ b/app/server/datasource/rdbms/utils/select_helpers.go @@ -7,7 +7,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" - "github.com/ydb-platform/fq-connector-go/app/server/utils" + "github.com/ydb-platform/fq-connector-go/app/common" ) func selectWhatToYDBColumns(selectWhat *api_service_protos.TSelect_TWhat) ([]*Ydb.Column, error) { @@ -33,7 +33,7 @@ func formatSelectColumns( ) (string, error) { // SELECT $columns FROM $from if tableName == "" { - return "", utils.ErrEmptyTableName + return "", common.ErrEmptyTableName } var sb strings.Builder diff --git a/app/server/datasource/rdbms/utils/sql.go b/app/server/datasource/rdbms/utils/sql.go index fa5cb208..b35c8aba 100644 --- a/app/server/datasource/rdbms/utils/sql.go +++ b/app/server/datasource/rdbms/utils/sql.go @@ -8,6 +8,7 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/server/utils" ) @@ -30,7 +31,7 @@ type ConnectionManager interface { } type ConnectionManagerBase struct { - QueryLoggerFactory utils.QueryLoggerFactory + QueryLoggerFactory common.QueryLoggerFactory } type SQLFormatter interface { diff --git a/app/server/utils/unit_test_helpers.go b/app/server/datasource/rdbms/utils/unit_test_helpers.go similarity index 100% rename from app/server/utils/unit_test_helpers.go rename to app/server/datasource/rdbms/utils/unit_test_helpers.go diff --git a/app/server/utils/unit_test_helpers_test.go b/app/server/datasource/rdbms/utils/unit_test_helpers_test.go similarity index 100% rename from app/server/utils/unit_test_helpers_test.go rename to app/server/datasource/rdbms/utils/unit_test_helpers_test.go diff --git a/app/server/datasource/rdbms/ydb/connection_manager.go b/app/server/datasource/rdbms/ydb/connection_manager.go index 0b90e615..52a2d9f6 100644 --- a/app/server/datasource/rdbms/ydb/connection_manager.go +++ b/app/server/datasource/rdbms/ydb/connection_manager.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" api_common "github.com/ydb-platform/fq-connector-go/api/common" + "github.com/ydb-platform/fq-connector-go/app/common" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/utils" ) @@ -20,7 +21,7 @@ var _ rdbms_utils.Connection = (*Connection)(nil) type Connection struct { *sql.DB - logger utils.QueryLogger + logger common.QueryLogger } type rows struct { @@ -80,7 +81,7 @@ func (c *connectionManager) Make( dsi *api_common.TDataSourceInstance, ) (rdbms_utils.Connection, error) { // TODO: add credentials (iam and basic) support - endpoint := utils.EndpointToString(dsi.Endpoint) + endpoint := common.EndpointToString(dsi.Endpoint) dsn := sugar.DSN(endpoint, dsi.Database, dsi.UseTls) ydbDriver, err := ydb_sdk.Open(ctx, dsn) @@ -111,7 +112,7 @@ func (c *connectionManager) Make( } func (*connectionManager) Release(logger *zap.Logger, conn rdbms_utils.Connection) { - utils.LogCloserError(logger, conn, "close clickhouse connection") + common.LogCloserError(logger, conn, "close clickhouse connection") } func NewConnectionManager(cfg rdbms_utils.ConnectionManagerBase) rdbms_utils.ConnectionManager { diff --git a/app/server/datasource/rdbms/ydb/schema_provider.go b/app/server/datasource/rdbms/ydb/schema_provider.go index 30e40aee..b03b7f2f 100644 --- a/app/server/datasource/rdbms/ydb/schema_provider.go +++ b/app/server/datasource/rdbms/ydb/schema_provider.go @@ -11,12 +11,12 @@ import ( "go.uber.org/zap" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/server/datasource" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) type schemaProvider struct { - typeMapper utils.TypeMapper + typeMapper datasource.TypeMapper } var _ rdbms_utils.SchemaProvider = (*schemaProvider)(nil) @@ -68,7 +68,7 @@ func (f *schemaProvider) GetSchema( } func NewSchemaProvider( - typeMapper utils.TypeMapper, + typeMapper datasource.TypeMapper, ) rdbms_utils.SchemaProvider { return &schemaProvider{ typeMapper: typeMapper, diff --git a/app/server/datasource/rdbms/ydb/type_mapper.go b/app/server/datasource/rdbms/ydb/type_mapper.go index 24f87b1b..e3139ffb 100644 --- a/app/server/datasource/rdbms/ydb/type_mapper.go +++ b/app/server/datasource/rdbms/ydb/type_mapper.go @@ -9,10 +9,12 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" + "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/utils" ) -var _ utils.TypeMapper = typeMapper{} +var _ datasource.TypeMapper = typeMapper{} type typeMapper struct { } @@ -90,7 +92,7 @@ func makePrimitiveType(typeName string) (*Ydb.Type, error) { case typeName == Utf8Type: return &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_UTF8}}, nil default: - return nil, fmt.Errorf("convert type '%s': %w", typeName, utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("convert type '%s': %w", typeName, common.ErrDataTypeNotSupported) } } @@ -116,7 +118,7 @@ func appendValueToArrowBuilder[IN utils.ValueType, OUT utils.ValueType, AB utils out, err := converter.Convert(value) if err != nil { - if errors.Is(err, utils.ErrValueOutOfTypeBounds) { + if errors.Is(err, common.ErrValueOutOfTypeBounds) { // TODO: write warning to logger builder.AppendNull() @@ -232,6 +234,6 @@ func makeOptionalTransformers(typeName string) (any, func(acceptor any, builder } } -func NewTypeMapper() utils.TypeMapper { +func NewTypeMapper() datasource.TypeMapper { return typeMapper{} } diff --git a/app/server/datasource/s3/data_source.go b/app/server/datasource/s3/data_source.go index 3c1552cc..5751332d 100644 --- a/app/server/datasource/s3/data_source.go +++ b/app/server/datasource/s3/data_source.go @@ -15,6 +15,7 @@ import ( "go.uber.org/zap" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/server/datasource" "github.com/ydb-platform/fq-connector-go/app/server/paging" "github.com/ydb-platform/fq-connector-go/app/server/utils" @@ -30,7 +31,7 @@ func (*dataSource) DescribeTable( _ *zap.Logger, _ *api_service_protos.TDescribeTableRequest, ) (*api_service_protos.TDescribeTableResponse, error) { - return nil, fmt.Errorf("table description is not implemented for schemaless data sources: %w", utils.ErrMethodNotSupported) + return nil, fmt.Errorf("table description is not implemented for schemaless data sources: %w", common.ErrMethodNotSupported) } func (ds *dataSource) ReadSplit(ctx context.Context, logger *zap.Logger, split *api_service_protos.TSplit, sink paging.Sink[string]) { @@ -54,11 +55,11 @@ func (*dataSource) doReadSplit( ) if bucket = split.Select.DataSourceInstance.GetS3Options().GetBucket(); bucket == "" { - return fmt.Errorf("empty field `bucket`: %w", utils.ErrInvalidRequest) + return fmt.Errorf("empty field `bucket`: %w", common.ErrInvalidRequest) } if key = split.Select.From.GetObjectKey(); key == "" { - return fmt.Errorf("empty field `key`: %w", utils.ErrInvalidRequest) + return fmt.Errorf("empty field `key`: %w", common.ErrInvalidRequest) } params := &s3.GetObjectInput{ @@ -105,7 +106,7 @@ func makeAppender(ydbType *Ydb.Type) (func(acceptor string, builder array.Builde return nil } default: - return nil, fmt.Errorf("unexpected type %v: %w", typeID, utils.ErrDataTypeNotSupported) + return nil, fmt.Errorf("unexpected type %v: %w", typeID, common.ErrDataTypeNotSupported) } return appender, nil @@ -136,7 +137,7 @@ func prepareReading( if len(result) != len(selectWhat.Items) { return nil, nil, fmt.Errorf( "requested column with schema mismatch (wanted %d columns, found only %d): %w", - len(selectWhat.Items), len(result), utils.ErrInvalidRequest, + len(selectWhat.Items), len(result), common.ErrInvalidRequest, ) } diff --git a/app/server/paging/read_limiter.go b/app/server/paging/read_limiter.go index 07ff8967..a444a8d0 100644 --- a/app/server/paging/read_limiter.go +++ b/app/server/paging/read_limiter.go @@ -5,8 +5,8 @@ import ( "go.uber.org/zap" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/config" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) // ReadLimiter helps to limitate amount of data returned by Connector server in every read request. @@ -29,7 +29,7 @@ func (rl *readLimiterRows) addRow() error { if rl.rowsRead >= rl.rowsLimit { return fmt.Errorf("can read only %d line(s) from data source per request: %w", rl.rowsLimit, - utils.ErrReadLimitExceeded) + common.ErrReadLimitExceeded) } rl.rowsRead++ diff --git a/app/server/paging/sink.go b/app/server/paging/sink.go index 64f6d791..0d572df4 100644 --- a/app/server/paging/sink.go +++ b/app/server/paging/sink.go @@ -8,6 +8,7 @@ import ( "go.uber.org/zap" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/server/utils" ) @@ -142,7 +143,7 @@ func (s *sinkImpl[T]) respondWith( func (s *sinkImpl[T]) unexpectedState(expected ...sinkState) error { return fmt.Errorf( "unexpected state '%v' (expected are '%v'): %w", - s.state, expected, utils.ErrInvariantViolation) + s.state, expected, common.ErrInvariantViolation) } func NewSink[T utils.Acceptor]( diff --git a/app/server/paging/size.go b/app/server/paging/size.go index 9c65c695..2a9079f9 100644 --- a/app/server/paging/size.go +++ b/app/server/paging/size.go @@ -7,6 +7,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/server/utils" ) @@ -125,6 +126,6 @@ func sizeOfValue(v any) (uint64, acceptorKind, error) { case pgtype.Timestamp: return 16, fixedSize, nil default: - return 0, 0, fmt.Errorf("value %v of unexpected data type %T: %w", t, t, utils.ErrDataTypeNotSupported) + return 0, 0, fmt.Errorf("value %v of unexpected data type %T: %w", t, t, common.ErrDataTypeNotSupported) } } diff --git a/app/server/paging/traffic_tracker.go b/app/server/paging/traffic_tracker.go index 5ad2c710..17e59b5e 100644 --- a/app/server/paging/traffic_tracker.go +++ b/app/server/paging/traffic_tracker.go @@ -4,6 +4,7 @@ import ( "fmt" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/config" "github.com/ydb-platform/fq-connector-go/app/server/utils" ) @@ -72,7 +73,7 @@ func (tt *TrafficTracker[T]) checkPageSizeLimit(bytesDelta, rowsDelta uint64) (b "single row size exceeds page size limit (%d > %d bytes): %w", bytesDelta, tt.pagination.BytesPerPage, - utils.ErrPageSizeExceeded) + common.ErrPageSizeExceeded) return true, err } diff --git a/app/server/paging/traffic_tracker_test.go b/app/server/paging/traffic_tracker_test.go index 86b4a613..f59f5b89 100644 --- a/app/server/paging/traffic_tracker_test.go +++ b/app/server/paging/traffic_tracker_test.go @@ -8,8 +8,8 @@ import ( "github.com/stretchr/testify/require" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/config" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) func TestTrafficTracker(t *testing.T) { @@ -135,7 +135,7 @@ func TestTrafficTracker(t *testing.T) { *col1Acceptor = 1 // 4 bytes > 1 byte ok, err := tt.tryAddRow(acceptors) - require.True(t, errors.Is(err, utils.ErrPageSizeExceeded)) + require.True(t, errors.Is(err, common.ErrPageSizeExceeded)) require.False(t, ok) }) } diff --git a/app/server/service_connector.go b/app/server/service_connector.go index 9f8ac247..9c8fb5e1 100644 --- a/app/server/service_connector.go +++ b/app/server/service_connector.go @@ -15,9 +15,9 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service "github.com/ydb-platform/fq-connector-go/api/service" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/config" "github.com/ydb-platform/fq-connector-go/app/server/paging" - "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/library/go/core/metrics/solomon" ) @@ -38,14 +38,14 @@ func (s *serviceConnector) DescribeTable( ctx context.Context, request *api_service_protos.TDescribeTableRequest, ) (*api_service_protos.TDescribeTableResponse, error) { - logger := utils.AnnotateLoggerForUnaryCall(s.logger, "DescribeTable", request.DataSourceInstance) + logger := common.AnnotateLoggerForUnaryCall(s.logger, "DescribeTable", request.DataSourceInstance) logger.Info("request handling started", zap.String("table", request.GetTable())) if err := ValidateDescribeTableRequest(logger, request); err != nil { logger.Error("request handling failed", zap.Error(err)) return &api_service_protos.TDescribeTableResponse{ - Error: utils.NewAPIErrorFromStdError(err), + Error: common.NewAPIErrorFromStdError(err), }, nil } @@ -53,24 +53,24 @@ func (s *serviceConnector) DescribeTable( if err != nil { logger.Error("request handling failed", zap.Error(err)) - out = &api_service_protos.TDescribeTableResponse{Error: utils.NewAPIErrorFromStdError(err)} + out = &api_service_protos.TDescribeTableResponse{Error: common.NewAPIErrorFromStdError(err)} return out, nil } - out.Error = utils.NewSuccess() + out.Error = common.NewSuccess() logger.Info("request handling finished", zap.String("response", out.String())) return out, nil } func (s *serviceConnector) ListSplits(request *api_service_protos.TListSplitsRequest, stream api_service.Connector_ListSplitsServer) error { - logger := utils.AnnotateLoggerWithMethod(s.logger, "ListSplits") + logger := common.AnnotateLoggerWithMethod(s.logger, "ListSplits") logger.Info("request handling started", zap.Int("total selects", len(request.Selects))) if err := ValidateListSplitsRequest(logger, request); err != nil { return s.doListSplitsResponse(logger, stream, - &api_service_protos.TListSplitsResponse{Error: utils.NewAPIErrorFromStdError(err)}) + &api_service_protos.TListSplitsResponse{Error: common.NewAPIErrorFromStdError(err)}) } // Make a trivial copy of requested selects @@ -95,17 +95,17 @@ func (s *serviceConnector) doListSplitsHandleSelect( slct *api_service_protos.TSelect, totalSplits *int, ) error { - logger = utils.AnnotateLoggerWithDataSourceInstance(logger, slct.DataSourceInstance) + logger = common.AnnotateLoggerWithDataSourceInstance(logger, slct.DataSourceInstance) args := []zap.Field{ zap.Int("split_id", *totalSplits), } - args = append(args, utils.SelectToFields(slct)...) + args = append(args, common.SelectToFields(slct)...) logger.Debug("responding selects", args...) resp := &api_service_protos.TListSplitsResponse{ - Error: utils.NewSuccess(), + Error: common.NewSuccess(), Splits: []*api_service_protos.TSplit{{Select: slct}}, } @@ -113,7 +113,7 @@ func (s *serviceConnector) doListSplitsHandleSelect( args := []zap.Field{ zap.Int("split_id", *totalSplits), } - args = append(args, utils.SelectToFields(split.Select)...) + args = append(args, common.SelectToFields(split.Select)...) logger.Debug("responding split", args...) @@ -132,8 +132,8 @@ func (*serviceConnector) doListSplitsResponse( stream api_service.Connector_ListSplitsServer, response *api_service_protos.TListSplitsResponse, ) error { - if !utils.IsSuccess(response.Error) { - logger.Error("request handling failed", utils.APIErrorToLogFields(response.Error)...) + if !common.IsSuccess(response.Error) { + logger.Error("request handling failed", common.APIErrorToLogFields(response.Error)...) } if err := stream.Send(response); err != nil { @@ -149,14 +149,14 @@ func (s *serviceConnector) ReadSplits( request *api_service_protos.TReadSplitsRequest, stream api_service.Connector_ReadSplitsServer, ) error { - logger := utils.AnnotateLoggerWithMethod(s.logger, "ReadSplits") + logger := common.AnnotateLoggerWithMethod(s.logger, "ReadSplits") logger.Info("request handling started", zap.Int("total_splits", len(request.Splits))) err := s.doReadSplits(logger, request, stream) if err != nil { logger.Error("request handling failed", zap.Error(err)) - response := &api_service_protos.TReadSplitsResponse{Error: utils.NewAPIErrorFromStdError(err)} + response := &api_service_protos.TReadSplitsResponse{Error: common.NewAPIErrorFromStdError(err)} if err := stream.Send(response); err != nil { return fmt.Errorf("stream send: %w", err) @@ -178,7 +178,7 @@ func (s *serviceConnector) doReadSplits( } for i, split := range request.Splits { - splitLogger := utils. + splitLogger := common. AnnotateLoggerWithDataSourceInstance(logger, split.Select.DataSourceInstance). With(zap.Int("split_id", i)) @@ -256,7 +256,7 @@ func newServiceConnector( cfg *config.TServerConfig, registry *solomon.Registry, ) (service, error) { - queryLoggerFactory := utils.NewQueryLoggerFactory(cfg.Logger) + queryLoggerFactory := common.NewQueryLoggerFactory(cfg.Logger) // TODO: drop deprecated fields after YQ-2057 var endpoint *api_common.TEndpoint @@ -272,7 +272,7 @@ func newServiceConnector( return nil, fmt.Errorf("invalid config: no endpoint") } - listener, err := net.Listen("tcp", utils.EndpointToString(endpoint)) + listener, err := net.Listen("tcp", common.EndpointToString(endpoint)) if err != nil { return nil, fmt.Errorf("net listen: %w", err) } diff --git a/app/server/service_metrics.go b/app/server/service_metrics.go index a4ed9168..ae6684bb 100644 --- a/app/server/service_metrics.go +++ b/app/server/service_metrics.go @@ -7,8 +7,8 @@ import ( "go.uber.org/zap" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/config" - "github.com/ydb-platform/fq-connector-go/app/server/utils" "github.com/ydb-platform/fq-connector-go/library/go/core/metrics/solomon" ) @@ -43,7 +43,7 @@ func newServiceMetrics(logger *zap.Logger, cfg *config.TMetricsServerConfig, reg mux.Handle("/metrics", NewHTTPPullerHandler(logger, registry, WithSpack())) httpServer := &http.Server{ - Addr: utils.EndpointToString(cfg.Endpoint), + Addr: common.EndpointToString(cfg.Endpoint), Handler: mux, } diff --git a/app/server/service_pprof.go b/app/server/service_pprof.go index e7b7beb7..9570ea8c 100644 --- a/app/server/service_pprof.go +++ b/app/server/service_pprof.go @@ -9,8 +9,8 @@ import ( "go.uber.org/zap" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/config" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) type servicePprof struct { @@ -49,7 +49,7 @@ func newServicePprof(logger *zap.Logger, cfg *config.TPprofServerConfig) service mux.HandleFunc("/debug/pprof/trace", pprof.Trace) httpServer := &http.Server{ - Addr: utils.EndpointToString(cfg.Endpoint), + Addr: common.EndpointToString(cfg.Endpoint), Handler: mux, } diff --git a/app/server/streaming/streamer.go b/app/server/streaming/streamer.go index 7aa802e7..0b573758 100644 --- a/app/server/streaming/streamer.go +++ b/app/server/streaming/streamer.go @@ -63,7 +63,7 @@ func (s *Streamer[T]) sendResultToStream(result *paging.ReadResult[T]) error { resp.Stats = result.Stats - utils.DumpReadSplitsResponse(s.logger, resp) + dumpReadSplitsResponse(s.logger, resp) if err := s.stream.Send(resp); err != nil { return fmt.Errorf("stream send: %w", err) @@ -72,6 +72,24 @@ func (s *Streamer[T]) sendResultToStream(result *paging.ReadResult[T]) error { return nil } +func dumpReadSplitsResponse(logger *zap.Logger, resp *api_service_protos.TReadSplitsResponse) { + switch t := resp.GetPayload().(type) { + case *api_service_protos.TReadSplitsResponse_ArrowIpcStreaming: + if dump := resp.GetArrowIpcStreaming(); dump != nil { + logger.Debug("response", zap.Int("arrow_blob_length", len(dump))) + } + case *api_service_protos.TReadSplitsResponse_ColumnSet: + for i := range t.ColumnSet.Data { + data := t.ColumnSet.Data[i] + meta := t.ColumnSet.Meta[i] + + logger.Debug("response", zap.Int("column_id", i), zap.String("meta", meta.String()), zap.String("data", data.String())) + } + default: + panic(fmt.Sprintf("unexpected message type %v", t)) + } +} + func (s *Streamer[T]) Run() error { wg := &sync.WaitGroup{} wg.Add(1) diff --git a/app/server/streaming/streamer_test.go b/app/server/streaming/streamer_test.go index 84a2ce5d..4c448f9a 100644 --- a/app/server/streaming/streamer_test.go +++ b/app/server/streaming/streamer_test.go @@ -18,12 +18,12 @@ import ( api_service "github.com/ydb-platform/fq-connector-go/api/service" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/config" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/clickhouse" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/paging" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) var _ api_service.Connector_ReadSplitsServer = (*streamMock)(nil) @@ -142,8 +142,8 @@ func (tc testCaseStreaming) messageParams() (sentMessages, rowsInLastMessage int } func (tc testCaseStreaming) execute(t *testing.T) { - logger := utils.NewTestLogger(t) - split := utils.MakeTestSplit() + logger := common.NewTestLogger(t) + split := rdbms_utils.MakeTestSplit() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -179,8 +179,8 @@ func (tc testCaseStreaming) execute(t *testing.T) { rows.On( "MakeTransformer", []*Ydb.Type{ - utils.NewPrimitiveType(Ydb.Type_INT32), - utils.NewPrimitiveType(Ydb.Type_STRING), + rdbms_utils.NewPrimitiveType(Ydb.Type_INT32), + rdbms_utils.NewPrimitiveType(Ydb.Type_STRING), }).Return(transformer, nil).Once() rows.On("Next").Return(true).Times(len(rows.PredefinedData)) rows.On("Next").Return(false).Once() @@ -190,8 +190,8 @@ func (tc testCaseStreaming) execute(t *testing.T) { } else { rows.On("MakeTransformer", []*Ydb.Type{ - utils.NewPrimitiveType(Ydb.Type_INT32), - utils.NewPrimitiveType(Ydb.Type_STRING), + rdbms_utils.NewPrimitiveType(Ydb.Type_INT32), + rdbms_utils.NewPrimitiveType(Ydb.Type_STRING), }).Return(transformer, nil).Once() rows.On("Next").Return(true).Times(len(rows.PredefinedData) + 1) rows.On("Scan", transformer.GetAcceptors()...).Return(nil).Times(len(rows.PredefinedData)) @@ -203,7 +203,7 @@ func (tc testCaseStreaming) execute(t *testing.T) { totalMessages, rowsInLastMessage := tc.messageParams() - expectedColumnarBlocks := utils.DataConverter{}.RowsToColumnBlocks(rows.PredefinedData, tc.rowsPerPage) + expectedColumnarBlocks := rdbms_utils.DataConverter{}.RowsToColumnBlocks(rows.PredefinedData, tc.rowsPerPage) if tc.sendErr == nil { for sendCallID := 0; sendCallID < totalMessages; sendCallID++ { diff --git a/app/server/utils/arrow_helpers.go b/app/server/utils/arrow_helpers.go index b76e8bbf..7dd74578 100644 --- a/app/server/utils/arrow_helpers.go +++ b/app/server/utils/arrow_helpers.go @@ -9,6 +9,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" ) type ArrowBuilder[VT ValueType] interface { @@ -46,7 +47,7 @@ func SelectWhatToArrowSchema(selectWhat *api_service_protos.TSelect_TWhat) (*arr return nil, fmt.Errorf("optional YDB type to arrow field: %w", err) } default: - return nil, fmt.Errorf("only primitive and optional types are supported, got '%T' instead: %w", t, ErrDataTypeNotSupported) + return nil, fmt.Errorf("only primitive and optional types are supported, got '%T' instead: %w", t, common.ErrDataTypeNotSupported) } fields = append(fields, field) @@ -78,7 +79,7 @@ func YdbTypesToArrowBuilders(ydbTypes []*Ydb.Type, arrowAllocator memory.Allocat return nil, fmt.Errorf("optional YDB type to Arrow builder: %w", err) } default: - return nil, fmt.Errorf("only primitive and optional types are supported, got '%T' instead: %w", t, ErrDataTypeNotSupported) + return nil, fmt.Errorf("only primitive and optional types are supported, got '%T' instead: %w", t, common.ErrDataTypeNotSupported) } builders = append(builders, builder) @@ -129,7 +130,7 @@ func ydbTypeToArrowBuilder(typeID Ydb.Type_PrimitiveTypeId, arrowAllocator memor case Ydb.Type_TIMESTAMP: builder = array.NewUint64Builder(arrowAllocator) default: - return nil, fmt.Errorf("register type '%v': %w", typeID, ErrDataTypeNotSupported) + return nil, fmt.Errorf("register type '%v': %w", typeID, common.ErrDataTypeNotSupported) } return builder, nil @@ -177,7 +178,7 @@ func ydbTypeToArrowField(typeID Ydb.Type_PrimitiveTypeId, column *Ydb.Column) (a case Ydb.Type_TIMESTAMP: field = arrow.Field{Name: column.Name, Type: arrow.PrimitiveTypes.Uint64} default: - return arrow.Field{}, fmt.Errorf("register type '%v': %w", typeID, ErrDataTypeNotSupported) + return arrow.Field{}, fmt.Errorf("register type '%v': %w", typeID, common.ErrDataTypeNotSupported) } return field, nil diff --git a/app/server/utils/protobuf.go b/app/server/utils/protobuf.go deleted file mode 100644 index b6528a96..00000000 --- a/app/server/utils/protobuf.go +++ /dev/null @@ -1,26 +0,0 @@ -package utils - -import ( - "fmt" - "io" - - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" -) - -func DumpProtoMessageToJSON(msg proto.Message, stream io.Writer) error { - opts := protojson.MarshalOptions{ - Indent: " ", - } - - data, err := opts.Marshal(msg) - if err != nil { - return fmt.Errorf("protojson marshal: %w", err) - } - - if _, err := stream.Write(data); err != nil { - return fmt.Errorf("stream write: %w", err) - } - - return nil -} diff --git a/app/server/utils/time.go b/app/server/utils/time.go index d2131026..01702f65 100644 --- a/app/server/utils/time.go +++ b/app/server/utils/time.go @@ -3,6 +3,8 @@ package utils import ( "fmt" "time" + + "github.com/ydb-platform/fq-connector-go/app/common" ) var ( @@ -13,7 +15,7 @@ var ( func TimeToYDBDate(t *time.Time) (uint16, error) { if t.Before(minYDBTime) || t.After(maxYDBTime) { - return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, ErrValueOutOfTypeBounds) + return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, common.ErrValueOutOfTypeBounds) } days := t.Sub(minYDBTime).Hours() / 24 @@ -23,7 +25,7 @@ func TimeToYDBDate(t *time.Time) (uint16, error) { func TimeToYDBDatetime(t *time.Time) (uint32, error) { if t.Before(minYDBTime) || t.After(maxYDBTime) { - return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, ErrValueOutOfTypeBounds) + return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, common.ErrValueOutOfTypeBounds) } seconds := t.Unix() @@ -33,7 +35,7 @@ func TimeToYDBDatetime(t *time.Time) (uint32, error) { func TimeToYDBTimestamp(t *time.Time) (uint64, error) { if t.Before(minYDBTime) || t.After(maxYDBTime) { - return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, ErrValueOutOfTypeBounds) + return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, common.ErrValueOutOfTypeBounds) } seconds := t.UnixMicro() diff --git a/app/server/utils/time_test.go b/app/server/utils/time_test.go index aac6cb8c..4d655169 100644 --- a/app/server/utils/time_test.go +++ b/app/server/utils/time_test.go @@ -6,6 +6,8 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/ydb-platform/fq-connector-go/app/common" ) func TestTimeToYDBDate(t *testing.T) { @@ -29,12 +31,12 @@ func TestTimeToYDBDate(t *testing.T) { { input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC), output: 0, - err: ErrValueOutOfTypeBounds, + err: common.ErrValueOutOfTypeBounds, }, { input: time.Date(9999, 01, 01, 00, 00, 00, 00, time.UTC), output: 0, - err: ErrValueOutOfTypeBounds, + err: common.ErrValueOutOfTypeBounds, }, } @@ -46,7 +48,7 @@ func TestTimeToYDBDate(t *testing.T) { require.Equal(t, tc.output, output) if tc.err != nil { - require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds)) + require.True(t, errors.Is(tc.err, common.ErrValueOutOfTypeBounds)) } else { require.NoError(t, err) } @@ -75,12 +77,12 @@ func TestTimeToYDBDatetime(t *testing.T) { { input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC), output: 0, - err: ErrValueOutOfTypeBounds, + err: common.ErrValueOutOfTypeBounds, }, { input: time.Date(9999, 01, 01, 00, 00, 00, 00, time.UTC), output: 0, - err: ErrValueOutOfTypeBounds, + err: common.ErrValueOutOfTypeBounds, }, } @@ -92,7 +94,7 @@ func TestTimeToYDBDatetime(t *testing.T) { require.Equal(t, tc.output, output) if tc.err != nil { - require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds)) + require.True(t, errors.Is(tc.err, common.ErrValueOutOfTypeBounds)) } else { require.NoError(t, err) } @@ -121,12 +123,12 @@ func TestTimeToYDBTimestamp(t *testing.T) { { input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC), output: 0, - err: ErrValueOutOfTypeBounds, + err: common.ErrValueOutOfTypeBounds, }, { input: time.Date(29427, 01, 01, 00, 00, 00, 00, time.UTC), output: 0, - err: ErrValueOutOfTypeBounds, + err: common.ErrValueOutOfTypeBounds, }, } @@ -138,7 +140,7 @@ func TestTimeToYDBTimestamp(t *testing.T) { require.Equal(t, tc.output, output) if tc.err != nil { - require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds)) + require.True(t, errors.Is(tc.err, common.ErrValueOutOfTypeBounds)) } else { require.NoError(t, err) } diff --git a/app/server/utils/type_mapper.go b/app/server/utils/type_mapper.go deleted file mode 100644 index d36732e6..00000000 --- a/app/server/utils/type_mapper.go +++ /dev/null @@ -1,11 +0,0 @@ -package utils - -import ( - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - - api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" -) - -type TypeMapper interface { - SQLTypeToYDBColumn(columnName, typeName string, rules *api_service_protos.TTypeMappingSettings) (*Ydb.Column, error) -} diff --git a/app/server/utils/select_helpers.go b/app/server/utils/ydb_type_helpers.go similarity index 82% rename from app/server/utils/select_helpers.go rename to app/server/utils/ydb_type_helpers.go index e4a065ea..cd526d82 100644 --- a/app/server/utils/select_helpers.go +++ b/app/server/utils/ydb_type_helpers.go @@ -6,6 +6,7 @@ import ( "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" ) func SelectWhatToYDBTypes(selectWhat *api_service_protos.TSelect_TWhat) ([]*Ydb.Type, error) { @@ -33,9 +34,9 @@ func YdbTypeToYdbPrimitiveTypeID(ydbType *Ydb.Type) (Ydb.Type_PrimitiveTypeId, e return t.OptionalType.Item.GetTypeId(), nil default: return Ydb.Type_PRIMITIVE_TYPE_ID_UNSPECIFIED, - fmt.Errorf("unexpected type %v: %w", t.OptionalType.Item, ErrDataTypeNotSupported) + fmt.Errorf("unexpected type %v: %w", t.OptionalType.Item, common.ErrDataTypeNotSupported) } default: - return Ydb.Type_PRIMITIVE_TYPE_ID_UNSPECIFIED, fmt.Errorf("unexpected type %v: %w", t, ErrDataTypeNotSupported) + return Ydb.Type_PRIMITIVE_TYPE_ID_UNSPECIFIED, fmt.Errorf("unexpected type %v: %w", t, common.ErrDataTypeNotSupported) } } diff --git a/app/server/validate.go b/app/server/validate.go index a1e6e6af..f1873a8a 100644 --- a/app/server/validate.go +++ b/app/server/validate.go @@ -7,7 +7,7 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" - "github.com/ydb-platform/fq-connector-go/app/server/utils" + "github.com/ydb-platform/fq-connector-go/app/common" ) func ValidateDescribeTableRequest(logger *zap.Logger, request *api_service_protos.TDescribeTableRequest) error { @@ -16,7 +16,7 @@ func ValidateDescribeTableRequest(logger *zap.Logger, request *api_service_proto } if request.GetTable() == "" { - return fmt.Errorf("empty table: %w", utils.ErrInvalidRequest) + return fmt.Errorf("empty table: %w", common.ErrInvalidRequest) } return nil @@ -24,7 +24,7 @@ func ValidateDescribeTableRequest(logger *zap.Logger, request *api_service_proto func ValidateListSplitsRequest(logger *zap.Logger, request *api_service_protos.TListSplitsRequest) error { if len(request.Selects) == 0 { - return fmt.Errorf("empty select list: %w", utils.ErrInvalidRequest) + return fmt.Errorf("empty select list: %w", common.ErrInvalidRequest) } for i, slct := range request.Selects { @@ -38,7 +38,7 @@ func ValidateListSplitsRequest(logger *zap.Logger, request *api_service_protos.T func ValidateReadSplitsRequest(logger *zap.Logger, request *api_service_protos.TReadSplitsRequest) error { if len(request.Splits) == 0 { - return fmt.Errorf("splits are empty: %w", utils.ErrInvalidRequest) + return fmt.Errorf("splits are empty: %w", common.ErrInvalidRequest) } for i, split := range request.Splits { @@ -60,7 +60,7 @@ func validateSplit(logger *zap.Logger, split *api_service_protos.TSplit) error { func validateSelect(logger *zap.Logger, slct *api_service_protos.TSelect) error { if slct == nil { - return fmt.Errorf("select is empty: %w", utils.ErrInvalidRequest) + return fmt.Errorf("select is empty: %w", common.ErrInvalidRequest) } if err := validateDataSourceInstance(logger, slct.GetDataSourceInstance()); err != nil { @@ -72,27 +72,27 @@ func validateSelect(logger *zap.Logger, slct *api_service_protos.TSelect) error func validateDataSourceInstance(logger *zap.Logger, dsi *api_common.TDataSourceInstance) error { if dsi == nil { - return fmt.Errorf("empty data source instance: %w", utils.ErrInvalidRequest) + return fmt.Errorf("empty data source instance: %w", common.ErrInvalidRequest) } if dsi.GetKind() == api_common.EDataSourceKind_DATA_SOURCE_KIND_UNSPECIFIED { - return fmt.Errorf("empty kind: %w", utils.ErrInvalidRequest) + return fmt.Errorf("empty kind: %w", common.ErrInvalidRequest) } if dsi.Endpoint == nil { - return fmt.Errorf("endpoint is empty: %w", utils.ErrInvalidRequest) + return fmt.Errorf("endpoint is empty: %w", common.ErrInvalidRequest) } if dsi.Endpoint.Host == "" { - return fmt.Errorf("endpoint.host is empty: %w", utils.ErrInvalidRequest) + return fmt.Errorf("endpoint.host is empty: %w", common.ErrInvalidRequest) } if dsi.Endpoint.Port == 0 { - return fmt.Errorf("endpoint.port is empty: %w", utils.ErrInvalidRequest) + return fmt.Errorf("endpoint.port is empty: %w", common.ErrInvalidRequest) } if dsi.Database == "" { - return fmt.Errorf("database field is empty: %w", utils.ErrInvalidRequest) + return fmt.Errorf("database field is empty: %w", common.ErrInvalidRequest) } if dsi.UseTls { @@ -102,18 +102,18 @@ func validateDataSourceInstance(logger *zap.Logger, dsi *api_common.TDataSourceI } if dsi.Protocol == api_common.EProtocol_PROTOCOL_UNSPECIFIED { - return fmt.Errorf("protocol field is empty: %w", utils.ErrInvalidRequest) + return fmt.Errorf("protocol field is empty: %w", common.ErrInvalidRequest) } switch dsi.GetKind() { case api_common.EDataSourceKind_POSTGRESQL: if dsi.GetPgOptions().Schema == "" { - return fmt.Errorf("schema field is empty: %w", utils.ErrInvalidRequest) + return fmt.Errorf("schema field is empty: %w", common.ErrInvalidRequest) } case api_common.EDataSourceKind_CLICKHOUSE, api_common.EDataSourceKind_S3, api_common.EDataSourceKind_YDB: default: - return fmt.Errorf("unsupported data source: %w", utils.ErrInvalidRequest) + return fmt.Errorf("unsupported data source: %w", common.ErrInvalidRequest) } return nil diff --git a/tests/infra/connector/client.go b/tests/infra/connector/client.go index 353fd6a9..9e56090d 100644 --- a/tests/infra/connector/client.go +++ b/tests/infra/connector/client.go @@ -12,8 +12,8 @@ import ( api_common "github.com/ydb-platform/fq-connector-go/api/common" api_service "github.com/ydb-platform/fq-connector-go/api/service" api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/config" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) type Client interface { @@ -118,12 +118,12 @@ func dumpStream[T StreamResponse](rcvStream stream[T]) ([]T, error) { } func (c *clientImpl) stop() { - utils.LogCloserError(c.logger, c.conn, "client GRPC connection") + common.LogCloserError(c.logger, c.conn, "client GRPC connection") } func newClient(logger *zap.Logger, cfg *config.TServerConfig) (Client, error) { conn, err := grpc.Dial( - utils.EndpointToString(cfg.ConnectorServer.Endpoint), + common.EndpointToString(cfg.ConnectorServer.Endpoint), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { diff --git a/tests/infra/connector/server.go b/tests/infra/connector/server.go index 919d0aec..2a8636b8 100644 --- a/tests/infra/connector/server.go +++ b/tests/infra/connector/server.go @@ -9,8 +9,8 @@ import ( "go.uber.org/zap" + "github.com/ydb-platform/fq-connector-go/app/common" "github.com/ydb-platform/fq-connector-go/app/server" - "github.com/ydb-platform/fq-connector-go/app/server/utils" ) type Server struct { @@ -71,7 +71,7 @@ func (s *Server) Stop() { func NewServer() (*Server, error) { cfg := server.NewDefaultConfig() - logger := utils.NewDefaultLogger() + logger := common.NewDefaultLogger() launcher, err := server.NewLauncher(logger, cfg) if err != nil {