Skip to content

Commit

Permalink
Merge pull request #13 from ydb-platform/YQ-2741.logging
Browse files Browse the repository at this point in the history
YQ-2371: use pure zap logger
  • Loading branch information
vitalyisaev2 authored Dec 25, 2023
2 parents 8342379 + 9700d25 commit 658501b
Show file tree
Hide file tree
Showing 71 changed files with 185 additions and 5,142 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
fq-connector-go
coverage.out
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
build:
go build -o fq-connector-go ./app

run: build
./fq-connector-go server -c ./example.conf

unit_test:
go test -v ./app/...

Expand Down
32 changes: 16 additions & 16 deletions app/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/config"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/library/go/core/log"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -64,7 +64,7 @@ func runClient(_ *cobra.Command, args []string) error {
return nil
}

func makeConnection(logger log.Logger, cfg *config.ClientConfig) (*grpc.ClientConn, error) {
func makeConnection(logger *zap.Logger, cfg *config.ClientConfig) (*grpc.ClientConn, error) {
var opts []grpc.DialOption

if cfg.Tls != nil {
Expand Down Expand Up @@ -99,7 +99,7 @@ func makeConnection(logger log.Logger, cfg *config.ClientConfig) (*grpc.ClientCo
return conn, nil
}

func callServer(logger log.Logger, cfg *config.ClientConfig) error {
func callServer(logger *zap.Logger, cfg *config.ClientConfig) error {
conn, err := makeConnection(logger, cfg)
if err != nil {
return fmt.Errorf("grpc dial: %w", err)
Expand Down Expand Up @@ -140,31 +140,31 @@ func callServer(logger log.Logger, cfg *config.ClientConfig) error {
}

func describeTable(
logger log.Logger,
logger *zap.Logger,
connectorClient api_service.ConnectorClient,
dsi *api_common.TDataSourceInstance,
) (*api_service_protos.TSchema, error) {
req := &api_service_protos.TDescribeTableRequest{Table: tableName, DataSourceInstance: dsi}
logger.Debug("DescribeTable", log.String("request", req.String()))
logger.Debug("DescribeTable", zap.String("request", req.String()))

resp, err := connectorClient.DescribeTable(context.TODO(), req)
if err != nil {
return nil, fmt.Errorf("describe table: %w", err)
}

if utils.IsSuccess(resp.Error) {
logger.Debug("DescribeTable", log.String("response", resp.String()))
logger.Debug("DescribeTable", zap.String("response", resp.String()))

return resp.Schema, nil
}

logger.Error("DescribeTable", log.String("response", resp.String()))
logger.Error("DescribeTable", zap.String("response", resp.String()))

return nil, utils.NewSTDErrorFromAPIError(resp.Error)
}

func listSplits(
logger log.Logger,
logger *zap.Logger,
schema *api_service_protos.TSchema,
connectorClient api_service.ConnectorClient,
dsi *api_common.TDataSourceInstance,
Expand All @@ -186,7 +186,7 @@ func listSplits(
},
},
}
logger.Debug("ListSplits", log.String("request", req.String()))
logger.Debug("ListSplits", zap.String("request", req.String()))

streamListSplits, err := connectorClient.ListSplits(context.TODO(), req)
if err != nil {
Expand All @@ -206,12 +206,12 @@ func listSplits(
}

if !utils.IsSuccess(resp.Error) {
logger.Error("ListSplits", log.String("response", resp.String()))
logger.Error("ListSplits", zap.String("response", resp.String()))

return splits, utils.NewSTDErrorFromAPIError(resp.Error)
}

logger.Debug("ListSplits", log.String("response", resp.String()))
logger.Debug("ListSplits", zap.String("response", resp.String()))
splits = append(splits, resp.Splits...)
}

Expand All @@ -223,14 +223,14 @@ func listSplits(
}

func readSplits(
logger log.Logger,
logger *zap.Logger,
splits []*api_service_protos.TSplit,
format api_service_protos.TReadSplitsRequest_EFormat,
connectorClient api_service.ConnectorClient,
dsi *api_common.TDataSourceInstance,
) error {
req := &api_service_protos.TReadSplitsRequest{Splits: splits, Format: format, DataSourceInstance: dsi}
logger.Debug("ReadSplits", log.String("request", req.String()))
logger.Debug("ReadSplits", zap.String("request", req.String()))

streamReadSplits, err := connectorClient.ReadSplits(context.Background(), req)
if err != nil {
Expand Down Expand Up @@ -264,7 +264,7 @@ func readSplits(
}

func dumpReadResponses(
logger log.Logger,
logger *zap.Logger,
format api_service_protos.TReadSplitsRequest_EFormat,
responses []*api_service_protos.TReadSplitsResponse,
) error {
Expand All @@ -279,10 +279,10 @@ func dumpReadResponses(

for reader.Next() {
record := reader.Record()
logger.Debug("schema", log.String("schema", record.Schema().String()))
logger.Debug("schema", zap.String("schema", record.Schema().String()))

for i, column := range record.Columns() {
logger.Debug("column", log.Int("id", i), log.String("data", column.String()))
logger.Debug("column", zap.Int("id", i), zap.String("data", column.String()))
}
}

Expand Down
4 changes: 2 additions & 2 deletions app/server/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package server

import (
"fmt"
"log"
"os"

"github.com/spf13/cobra"
Expand All @@ -25,6 +24,7 @@ func init() {
Cmd.Flags().StringP(configFlag, "c", "", "path to server config file")

if err := Cmd.MarkFlagRequired(configFlag); err != nil {
log.Fatal(err)
fmt.Println(err)
os.Exit(1)
}
}
12 changes: 6 additions & 6 deletions app/server/data_source_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/ydb-platform/fq-connector-go/app/server/paging"
"github.com/ydb-platform/fq-connector-go/app/server/streaming"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/library/go/core/log"
"go.uber.org/zap"
)

type DataSourceCollection struct {
Expand All @@ -26,7 +26,7 @@ type DataSourceCollection struct {
}

func (dsc *DataSourceCollection) DescribeTable(
ctx context.Context, logger log.Logger, request *api_service_protos.TDescribeTableRequest,
ctx context.Context, logger *zap.Logger, request *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TDescribeTableResponse, error) {
kind := request.GetDataSourceInstance().GetKind()

Expand All @@ -48,7 +48,7 @@ func (dsc *DataSourceCollection) DescribeTable(
}

func (dsc *DataSourceCollection) DoReadSplit(
logger log.Logger,
logger *zap.Logger,
stream api_service.Connector_ReadSplitsServer,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
Expand All @@ -71,7 +71,7 @@ func (dsc *DataSourceCollection) DoReadSplit(
}

func readSplit[T utils.Acceptor](
logger log.Logger,
logger *zap.Logger,
stream api_service.Connector_ReadSplitsServer,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
Expand Down Expand Up @@ -122,8 +122,8 @@ func readSplit[T utils.Acceptor](

logger.Debug(
"split reading finished",
log.UInt64("total_bytes", readStats.GetBytes()),
log.UInt64("total_rows", readStats.GetRows()),
zap.Uint64("total_bytes", readStats.GetBytes()),
zap.Uint64("total_rows", readStats.GetRows()),
)

return nil
Expand Down
8 changes: 4 additions & 4 deletions app/server/datasource/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/server/paging"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/library/go/core/log"
"go.uber.org/zap"
)

type DataSourceFactory[T utils.Acceptor] interface {
Make(
logger log.Logger,
logger *zap.Logger,
dataSourceType api_common.EDataSourceKind,
) (DataSource[T], error)
}
Expand All @@ -25,14 +25,14 @@ type DataSource[T utils.Acceptor] interface {
// located within a particular database in a data source cluster.
DescribeTable(
ctx context.Context,
logger log.Logger,
logger *zap.Logger,
request *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TDescribeTableResponse, error)

// ReadSplit is a main method for reading data from the table.
ReadSplit(
ctx context.Context,
logger log.Logger,
logger *zap.Logger,
split *api_service_protos.TSplit,
sink paging.Sink[T],
)
Expand Down
6 changes: 3 additions & 3 deletions app/server/datasource/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/server/paging"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/library/go/core/log"
"go.uber.org/zap"
)

var _ DataSource[any] = (*DataSourceMock[any])(nil)
Expand All @@ -18,15 +18,15 @@ type DataSourceMock[T utils.Acceptor] struct {

func (m *DataSourceMock[T]) DescribeTable(
_ context.Context,
_ log.Logger,
_ *zap.Logger,
_ *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TDescribeTableResponse, error) {
panic("not implemented") // TODO: Implement
}

func (m *DataSourceMock[T]) ReadSplit(
_ context.Context,
_ log.Logger,
_ *zap.Logger,
split *api_service_protos.TSplit,
pagingWriter paging.Sink[T],
) {
Expand Down
10 changes: 5 additions & 5 deletions app/server/datasource/rdbms/clickhouse/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
api_common "github.com/ydb-platform/fq-connector-go/api/common"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/library/go/core/log"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"go.uber.org/zap"
)

var _ rdbms_utils.Connection = (*Connection)(nil)
Expand Down Expand Up @@ -56,7 +56,7 @@ func (c Connection) Query(ctx context.Context, query string, args ...any) (rdbms
if err := out.Err(); err != nil {
defer func() {
if closeErr := out.Close(); closeErr != nil {
c.logger.Error("close rows", log.Error(closeErr))
c.logger.Error("close rows", zap.Error(closeErr))
}
}()

Expand All @@ -75,7 +75,7 @@ type connectionManager struct {

func (c *connectionManager) Make(
_ context.Context,
logger log.Logger,
logger *zap.Logger,
dsi *api_common.TDataSourceInstance,
) (rdbms_utils.Connection, error) {
if dsi.GetCredentials().GetBasic() == nil {
Expand Down Expand Up @@ -103,7 +103,7 @@ func (c *connectionManager) Make(
// Set this field to true if you want to see ClickHouse driver's debug output
Debug: false,
Debugf: func(format string, v ...any) {
logger.Debugf(format, v...)
logger.Debug(format, zap.Any("args", v))
},
// TODO: make it configurable via Connector API
Compression: &clickhouse.Compression{
Expand Down Expand Up @@ -138,7 +138,7 @@ func (c *connectionManager) Make(
return &Connection{DB: conn, logger: queryLogger}, nil
}

func (c *connectionManager) Release(logger log.Logger, conn rdbms_utils.Connection) {
func (c *connectionManager) Release(logger *zap.Logger, conn rdbms_utils.Connection) {
utils.LogCloserError(logger, conn, "close clickhouse connection")
}

Expand Down
12 changes: 6 additions & 6 deletions app/server/datasource/rdbms/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/app/server/paging"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/library/go/core/log"
"go.uber.org/zap"
)

type Preset struct {
Expand All @@ -26,12 +26,12 @@ type dataSourceImpl struct {
sqlFormatter rdbms_utils.SQLFormatter
connectionManager rdbms_utils.ConnectionManager
schemaProvider rdbms_utils.SchemaProvider
logger log.Logger
logger *zap.Logger
}

func (ds *dataSourceImpl) DescribeTable(
ctx context.Context,
logger log.Logger,
logger *zap.Logger,
request *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TDescribeTableResponse, error) {
conn, err := ds.connectionManager.Make(ctx, logger, request.DataSourceInstance)
Expand All @@ -51,7 +51,7 @@ func (ds *dataSourceImpl) DescribeTable(

func (ds *dataSourceImpl) doReadSplit(
ctx context.Context,
logger log.Logger,
logger *zap.Logger,
split *api_service_protos.TSplit,
sink paging.Sink[any],
) error {
Expand Down Expand Up @@ -105,7 +105,7 @@ func (ds *dataSourceImpl) doReadSplit(

func (ds *dataSourceImpl) ReadSplit(
ctx context.Context,
logger log.Logger,
logger *zap.Logger,
split *api_service_protos.TSplit,
sink paging.Sink[any],
) {
Expand All @@ -118,7 +118,7 @@ func (ds *dataSourceImpl) ReadSplit(
}

func NewDataSource(
logger log.Logger,
logger *zap.Logger,
preset *Preset,
) datasource.DataSource[any] {
return &dataSourceImpl{
Expand Down
4 changes: 2 additions & 2 deletions app/server/datasource/rdbms/data_source_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/ydb"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
"github.com/ydb-platform/fq-connector-go/library/go/core/log"
"go.uber.org/zap"
)

var _ datasource.DataSourceFactory[any] = (*dataSourceFactory)(nil)
Expand All @@ -22,7 +22,7 @@ type dataSourceFactory struct {
}

func (dsf *dataSourceFactory) Make(
logger log.Logger,
logger *zap.Logger,
dataSourceType api_common.EDataSourceKind,
) (datasource.DataSource[any], error) {
switch dataSourceType {
Expand Down
Loading

0 comments on commit 658501b

Please sign in to comment.