Skip to content

Commit

Permalink
split app/server/utils package
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalyisaev2 committed Dec 29, 2023
1 parent 3dc52d5 commit f9c0cba
Show file tree
Hide file tree
Showing 48 changed files with 510 additions and 520 deletions.
20 changes: 10 additions & 10 deletions app/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
api_common "github.com/ydb-platform/fq-connector-go/api/common"
api_service "github.com/ydb-platform/fq-connector-go/api/service"
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/common"
"github.com/ydb-platform/fq-connector-go/app/config"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
)

const (
Expand Down Expand Up @@ -53,7 +53,7 @@ func runClient(_ *cobra.Command, args []string) error {
return fmt.Errorf("unknown instance: %w", err)
}

logger := utils.NewDefaultLogger()
logger := common.NewDefaultLogger()

if err := callServer(logger, cfg); err != nil {
return fmt.Errorf("call server: %w", err)
Expand Down Expand Up @@ -89,7 +89,7 @@ func makeConnection(logger *zap.Logger, cfg *config.ClientConfig) (*grpc.ClientC
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn, err := grpc.Dial(utils.EndpointToString(cfg.Endpoint), opts...)
conn, err := grpc.Dial(common.EndpointToString(cfg.Endpoint), opts...)
if err != nil {
return nil, fmt.Errorf("grpc dial: %w", err)
}
Expand All @@ -103,7 +103,7 @@ func callServer(logger *zap.Logger, cfg *config.ClientConfig) error {
return fmt.Errorf("grpc dial: %w", err)
}

defer utils.LogCloserError(logger, conn, "connection close")
defer common.LogCloserError(logger, conn, "connection close")

connectorClient := api_service.NewConnectorClient(conn)

Expand Down Expand Up @@ -150,15 +150,15 @@ func describeTable(
return nil, fmt.Errorf("describe table: %w", err)
}

if utils.IsSuccess(resp.Error) {
if common.IsSuccess(resp.Error) {
logger.Debug("DescribeTable", zap.String("response", resp.String()))

return resp.Schema, nil
}

logger.Error("DescribeTable", zap.String("response", resp.String()))

return nil, utils.NewSTDErrorFromAPIError(resp.Error)
return nil, common.NewSTDErrorFromAPIError(resp.Error)
}

func listSplits(
Expand Down Expand Up @@ -203,10 +203,10 @@ func listSplits(
return nil, fmt.Errorf("stream list splits: %w", err)
}

if !utils.IsSuccess(resp.Error) {
if !common.IsSuccess(resp.Error) {
logger.Error("ListSplits", zap.String("response", resp.String()))

return splits, utils.NewSTDErrorFromAPIError(resp.Error)
return splits, common.NewSTDErrorFromAPIError(resp.Error)
}

logger.Debug("ListSplits", zap.String("response", resp.String()))
Expand Down Expand Up @@ -247,8 +247,8 @@ func readSplits(
return fmt.Errorf("stream list splits: %w", err)
}

if !utils.IsSuccess(resp.Error) {
return utils.NewSTDErrorFromAPIError(resp.Error)
if !common.IsSuccess(resp.Error) {
return common.NewSTDErrorFromAPIError(resp.Error)
}

responses = append(responses, resp)
Expand Down
2 changes: 1 addition & 1 deletion app/server/utils/endpoint.go → app/common/endpoint.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package common

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion app/server/utils/errors.go → app/common/errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package common

import (
"errors"
Expand Down
20 changes: 1 addition & 19 deletions app/server/utils/logger.go → app/common/logger.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package common

import (
"fmt"
Expand Down Expand Up @@ -87,24 +87,6 @@ func newDefaultLoggerConfig() zap.Config {

func NewTestLogger(t *testing.T) *zap.Logger { return zaptest.NewLogger(t) }

func DumpReadSplitsResponse(logger *zap.Logger, resp *api_service_protos.TReadSplitsResponse) {
switch t := resp.GetPayload().(type) {
case *api_service_protos.TReadSplitsResponse_ArrowIpcStreaming:
if dump := resp.GetArrowIpcStreaming(); dump != nil {
logger.Debug("response", zap.Int("arrow_blob_length", len(dump)))
}
case *api_service_protos.TReadSplitsResponse_ColumnSet:
for i := range t.ColumnSet.Data {
data := t.ColumnSet.Data[i]
meta := t.ColumnSet.Meta[i]

logger.Debug("response", zap.Int("column_id", i), zap.String("meta", meta.String()), zap.String("data", data.String()))
}
default:
panic(fmt.Sprintf("unexpected message type %v", t))
}
}

func SelectToFields(slct *api_service_protos.TSelect) []zap.Field {
result := []zap.Field{
zap.Any("from", slct.From),
Expand Down
4 changes: 2 additions & 2 deletions app/server/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/spf13/cobra"

"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/app/common"
)

var Cmd = &cobra.Command{
Expand Down Expand Up @@ -42,7 +42,7 @@ func runFromCLI(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("new config: %w", err)
}

logger, err := utils.NewLoggerFromConfig(cfg.Logger)
logger, err := common.NewLoggerFromConfig(cfg.Logger)

Check warning on line 45 in app/server/cmd.go

View check run for this annotation

Codecov / codecov/patch

app/server/cmd.go#L45

Added line #L45 was not covered by tests
if err != nil {
return fmt.Errorf("new logger from config: %w", err)
}
Expand Down
9 changes: 5 additions & 4 deletions app/server/data_source_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
api_common "github.com/ydb-platform/fq-connector-go/api/common"
api_service "github.com/ydb-platform/fq-connector-go/api/service"
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/common"
"github.com/ydb-platform/fq-connector-go/app/config"
"github.com/ydb-platform/fq-connector-go/app/server/datasource"
"github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms"
Expand Down Expand Up @@ -44,7 +45,7 @@ func (dsc *DataSourceCollection) DescribeTable(

return ds.DescribeTable(ctx, logger, request)
default:
return nil, fmt.Errorf("unsupported data source type '%v': %w", kind, utils.ErrDataSourceNotSupported)
return nil, fmt.Errorf("unsupported data source type '%v': %w", kind, common.ErrDataSourceNotSupported)

Check warning on line 48 in app/server/data_source_collection.go

View check run for this annotation

Codecov / codecov/patch

app/server/data_source_collection.go#L48

Added line #L48 was not covered by tests
}
}

Expand All @@ -67,7 +68,7 @@ func (dsc *DataSourceCollection) DoReadSplit(

return readSplit[string](logger, stream, request.GetFormat(), split, ds, dsc.memoryAllocator, dsc.readLimiterFactory, dsc.cfg)

Check warning on line 69 in app/server/data_source_collection.go

View check run for this annotation

Codecov / codecov/patch

app/server/data_source_collection.go#L69

Added line #L69 was not covered by tests
default:
return fmt.Errorf("unsupported data source type '%v': %w", kind, utils.ErrDataSourceNotSupported)
return fmt.Errorf("unsupported data source type '%v': %w", kind, common.ErrDataSourceNotSupported)

Check warning on line 71 in app/server/data_source_collection.go

View check run for this annotation

Codecov / codecov/patch

app/server/data_source_collection.go#L71

Added line #L71 was not covered by tests
}
}

Expand All @@ -81,7 +82,7 @@ func readSplit[T utils.Acceptor](
readLimiterFactory *paging.ReadLimiterFactory,
cfg *config.TServerConfig,
) error {
logger.Debug("split reading started", utils.SelectToFields(split.Select)...)
logger.Debug("split reading started", common.SelectToFields(split.Select)...)

columnarBufferFactory, err := paging.NewColumnarBufferFactory[T](
logger,
Expand Down Expand Up @@ -130,7 +131,7 @@ func readSplit[T utils.Acceptor](
}

func NewDataSourceCollection(
queryLoggerFactory utils.QueryLoggerFactory,
queryLoggerFactory common.QueryLoggerFactory,
memoryAllocator memory.Allocator,
readLimiterFactory *paging.ReadLimiterFactory,
cfg *config.TServerConfig,
Expand Down
6 changes: 6 additions & 0 deletions app/server/datasource/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"go.uber.org/zap"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

api_common "github.com/ydb-platform/fq-connector-go/api/common"
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/server/paging"
Expand Down Expand Up @@ -38,3 +40,7 @@ type DataSource[T utils.Acceptor] interface {
sink paging.Sink[T],
)
}

type TypeMapper interface {
SQLTypeToYDBColumn(columnName, typeName string, rules *api_service_protos.TTypeMappingSettings) (*Ydb.Column, error)
}
4 changes: 0 additions & 4 deletions app/server/datasource/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,3 @@ func (m *DataSourceMock[T]) ReadSplit(
) {
m.Called(split, pagingWriter)
}

func (*DataSourceMock[T]) TypeMapper() utils.TypeMapper {
panic("not implemented") // TODO: Implement
}
7 changes: 4 additions & 3 deletions app/server/datasource/rdbms/clickhouse/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/zap"

api_common "github.com/ydb-platform/fq-connector-go/api/common"
"github.com/ydb-platform/fq-connector-go/app/common"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
)
Expand All @@ -20,7 +21,7 @@ var _ rdbms_utils.Connection = (*Connection)(nil)

type Connection struct {
*sql.DB
logger utils.QueryLogger
logger common.QueryLogger
}

type rows struct {
Expand Down Expand Up @@ -95,7 +96,7 @@ func (c *connectionManager) Make(
}

opts := &clickhouse.Options{
Addr: []string{utils.EndpointToString(dsi.GetEndpoint())},
Addr: []string{common.EndpointToString(dsi.GetEndpoint())},
Auth: clickhouse.Auth{
Database: dsi.Database,
Username: dsi.Credentials.GetBasic().Username,
Expand Down Expand Up @@ -140,7 +141,7 @@ func (c *connectionManager) Make(
}

func (*connectionManager) Release(logger *zap.Logger, conn rdbms_utils.Connection) {
utils.LogCloserError(logger, conn, "close clickhouse connection")
common.LogCloserError(logger, conn, "close clickhouse connection")
}

func NewConnectionManager(cfg rdbms_utils.ConnectionManagerBase) rdbms_utils.ConnectionManager {
Expand Down
Loading

0 comments on commit f9c0cba

Please sign in to comment.