Skip to content

Commit

Permalink
YQ Connector: YQ-2745: integration tests with Arrow data checks for C…
Browse files Browse the repository at this point in the history
…lickHouse
  • Loading branch information
vitalyisaev2 committed Jan 9, 2024
1 parent d30131f commit 0d39d6c
Show file tree
Hide file tree
Showing 22 changed files with 631 additions and 150 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ integration_test: integration_test_env
./fq-connector-go-tests -projectPath=$(PROJECT_PATH)

integration_test_env:
docker-compose -f ./tests/infra/datasource/docker-compose.yaml stop
docker-compose -f ./tests/infra/datasource/docker-compose.yaml rm -f -v
docker-compose -f ./tests/infra/datasource/docker-compose.yaml up -d

Expand Down
47 changes: 19 additions & 28 deletions app/client/app.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package client

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"

"github.com/apache/arrow/go/v13/arrow/ipc"
"github.com/apache/arrow/go/v13/arrow"
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/protobuf/encoding/prototext"
Expand Down Expand Up @@ -67,7 +66,11 @@ func callServer(logger *zap.Logger, cfg *config.TClientConfig) error {

switch cfg.DataSourceInstance.Kind {
case api_common.EDataSourceKind_CLICKHOUSE, api_common.EDataSourceKind_POSTGRESQL, api_common.EDataSourceKind_YDB:
splits, err = prepareSplits(cl, tableName, cfg.DataSourceInstance)
typeMappingSettings := &api_service_protos.TTypeMappingSettings{
DateTimeFormat: api_service_protos.EDateTimeFormat_YQL_FORMAT,

Check warning on line 70 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

splits, err = prepareSplits(cl, cfg.DataSourceInstance, typeMappingSettings, tableName)

Check warning on line 73 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L73

Added line #L73 was not covered by tests

if err != nil {
return fmt.Errorf("prepare splits: %w", err)
Expand All @@ -86,11 +89,12 @@ func callServer(logger *zap.Logger, cfg *config.TClientConfig) error {

func prepareSplits(
cl Client,
tableName string,
dsi *api_common.TDataSourceInstance,
typeMappingSettings *api_service_protos.TTypeMappingSettings,
tableName string,
) ([]*api_service_protos.TSplit, error) {
// DescribeTable
describeTableResponse, err := cl.DescribeTable(context.TODO(), dsi, tableName)
describeTableResponse, err := cl.DescribeTable(context.TODO(), dsi, typeMappingSettings, tableName)

Check warning on line 97 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L97

Added line #L97 was not covered by tests
if err != nil {
return nil, fmt.Errorf("describe table: %w", err)
}
Expand Down Expand Up @@ -126,38 +130,25 @@ func readSplits(
return fmt.Errorf("read splits: %w", err)
}

if err := dumpReadResponses(logger, readSplitsResponses); err != nil {
return fmt.Errorf("dump read responses: %w", err)
records, err := common.ReadResponsesToArrowRecords(readSplitsResponses)
if err != nil {
return fmt.Errorf("read responses to arrow records: %w", err)

Check warning on line 135 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L133-L135

Added lines #L133 - L135 were not covered by tests
}

dumpReadResponses(logger, records)

Check warning on line 138 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L138

Added line #L138 was not covered by tests

return nil
}

func dumpReadResponses(
logger *zap.Logger,
responses []*api_service_protos.TReadSplitsResponse,
) error {
for _, resp := range responses {
buf := bytes.NewBuffer(resp.GetArrowIpcStreaming())

reader, err := ipc.NewReader(buf)
if err != nil {
return fmt.Errorf("new reader: %w", err)
}

for reader.Next() {
record := reader.Record()
logger.Debug("schema", zap.String("schema", record.Schema().String()))

for i, column := range record.Columns() {
logger.Debug("column", zap.Int("id", i), zap.String("data", column.String()))
}
records []arrow.Record,
) {
for _, record := range records {
for i, column := range record.Columns() {
logger.Debug("column", zap.Int("id", i), zap.String("data", column.String()))

Check warning on line 149 in app/client/app.go

View check run for this annotation

Codecov / codecov/patch

app/client/app.go#L146-L149

Added lines #L146 - L149 were not covered by tests
}

reader.Release()
}

return nil
}

var Cmd = &cobra.Command{
Expand Down
7 changes: 5 additions & 2 deletions app/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Client interface {
DescribeTable(
ctx context.Context,
dsi *api_common.TDataSourceInstance,
typeMappingSettings *api_service_protos.TTypeMappingSettings,
tableName string,
) (*api_service_protos.TDescribeTableResponse, error)

Expand All @@ -49,11 +50,13 @@ type clientImpl struct {
func (c *clientImpl) DescribeTable(
ctx context.Context,
dsi *api_common.TDataSourceInstance,
typeMappingSettings *api_service_protos.TTypeMappingSettings,
tableName string,
) (*api_service_protos.TDescribeTableResponse, error) {
request := &api_service_protos.TDescribeTableRequest{
DataSourceInstance: dsi,
Table: tableName,
DataSourceInstance: dsi,
Table: tableName,
TypeMappingSettings: typeMappingSettings,
}

return c.client.DescribeTable(ctx, request)
Expand Down
30 changes: 30 additions & 0 deletions app/common/api_helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package common

import (
"bytes"
"fmt"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/ipc"

api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
)

Expand Down Expand Up @@ -42,3 +48,27 @@ func SchemaToSelectWhatItems(

return out
}

func ReadResponsesToArrowRecords(responses []*api_service_protos.TReadSplitsResponse) ([]arrow.Record, error) {
var out []arrow.Record

for _, resp := range responses {
buf := bytes.NewBuffer(resp.GetArrowIpcStreaming())

reader, err := ipc.NewReader(buf)
if err != nil {
return nil, fmt.Errorf("new reader: %w", err)

Check warning on line 60 in app/common/api_helpers.go

View check run for this annotation

Codecov / codecov/patch

app/common/api_helpers.go#L60

Added line #L60 was not covered by tests
}

for reader.Next() {
record := reader.Record()

record.Retain()
out = append(out, record)
}

reader.Release()
}

return out, nil
}
33 changes: 27 additions & 6 deletions app/server/utils/arrow_helpers.go → app/common/arrow_helpers.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
package utils
package common

import (
"fmt"
"time"

"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/common"
)

type ValueType interface {
bool |
int8 | int16 | int32 | int64 |
uint8 | uint16 | uint32 | uint64 |
float32 | float64 |
string | []byte |
time.Time
}

type ArrowArrayType[VT ValueType] interface {
*array.Boolean |
*array.Int8 | *array.Int16 | *array.Int32 | *array.Int64 |
*array.Uint8 | *array.Uint16 | *array.Uint32 | *array.Uint64 |
*array.Float32 | *array.Float64 |
*array.String | *array.Binary

Len() int
Value(int) VT
IsNull(int) bool
}

type ArrowBuilder[VT ValueType] interface {
AppendNull()
Append(value VT)
Expand Down Expand Up @@ -49,7 +70,7 @@ func SelectWhatToArrowSchema(selectWhat *api_service_protos.TSelect_TWhat) (*arr
default:
err := fmt.Errorf(
"only primitive and optional types are supported, got '%T' instead: %w",
t, common.ErrDataTypeNotSupported,
t, ErrDataTypeNotSupported,

Check warning on line 73 in app/common/arrow_helpers.go

View check run for this annotation

Codecov / codecov/patch

app/common/arrow_helpers.go#L73

Added line #L73 was not covered by tests
)

return nil, err
Expand Down Expand Up @@ -86,7 +107,7 @@ func YdbTypesToArrowBuilders(ydbTypes []*Ydb.Type, arrowAllocator memory.Allocat
default:
err := fmt.Errorf(
"only primitive and optional types are supported, got '%T' instead: %w",
t, common.ErrDataTypeNotSupported,
t, ErrDataTypeNotSupported,

Check warning on line 110 in app/common/arrow_helpers.go

View check run for this annotation

Codecov / codecov/patch

app/common/arrow_helpers.go#L110

Added line #L110 was not covered by tests
)

return nil, err
Expand Down Expand Up @@ -140,7 +161,7 @@ func ydbTypeToArrowBuilder(typeID Ydb.Type_PrimitiveTypeId, arrowAllocator memor
case Ydb.Type_TIMESTAMP:
builder = array.NewUint64Builder(arrowAllocator)
default:
return nil, fmt.Errorf("register type '%v': %w", typeID, common.ErrDataTypeNotSupported)
return nil, fmt.Errorf("register type '%v': %w", typeID, ErrDataTypeNotSupported)

Check warning on line 164 in app/common/arrow_helpers.go

View check run for this annotation

Codecov / codecov/patch

app/common/arrow_helpers.go#L164

Added line #L164 was not covered by tests
}

return builder, nil
Expand Down Expand Up @@ -188,7 +209,7 @@ func ydbTypeToArrowField(typeID Ydb.Type_PrimitiveTypeId, column *Ydb.Column) (a
case Ydb.Type_TIMESTAMP:
field = arrow.Field{Name: column.Name, Type: arrow.PrimitiveTypes.Uint64}
default:
return arrow.Field{}, fmt.Errorf("register type '%v': %w", typeID, common.ErrDataTypeNotSupported)
return arrow.Field{}, fmt.Errorf("register type '%v': %w", typeID, ErrDataTypeNotSupported)

Check warning on line 212 in app/common/arrow_helpers.go

View check run for this annotation

Codecov / codecov/patch

app/common/arrow_helpers.go#L212

Added line #L212 was not covered by tests
}

return field, nil
Expand Down
23 changes: 17 additions & 6 deletions app/server/utils/time.go → app/common/time.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package utils
package common

import (
"fmt"
"time"

"github.com/ydb-platform/fq-connector-go/app/common"
)

var (
Expand All @@ -15,7 +13,7 @@ var (

func TimeToYDBDate(t *time.Time) (uint16, error) {
if t.Before(minYDBTime) || t.After(maxYDBTime) {
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, common.ErrValueOutOfTypeBounds)
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, ErrValueOutOfTypeBounds)
}

days := t.Sub(minYDBTime).Hours() / 24
Expand All @@ -25,7 +23,7 @@ func TimeToYDBDate(t *time.Time) (uint16, error) {

func TimeToYDBDatetime(t *time.Time) (uint32, error) {
if t.Before(minYDBTime) || t.After(maxYDBTime) {
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, common.ErrValueOutOfTypeBounds)
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, ErrValueOutOfTypeBounds)
}

seconds := t.Unix()
Expand All @@ -35,10 +33,23 @@ func TimeToYDBDatetime(t *time.Time) (uint32, error) {

func TimeToYDBTimestamp(t *time.Time) (uint64, error) {
if t.Before(minYDBTime) || t.After(maxYDBTime) {
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, common.ErrValueOutOfTypeBounds)
return 0, fmt.Errorf("convert '%v' to YDB Date: %w", t, ErrValueOutOfTypeBounds)
}

seconds := t.UnixMicro()

return uint64(seconds), nil
}

type ydbTime interface {
uint16 | uint32 | uint64
}

func MustTimeToYDBType[OUT ydbTime](f func(t *time.Time) (OUT, error), t time.Time) OUT {
res, err := f(&t)
if err != nil {
panic(err)

Check warning on line 51 in app/common/time.go

View check run for this annotation

Codecov / codecov/patch

app/common/time.go#L51

Added line #L51 was not covered by tests
}

return res
}
22 changes: 10 additions & 12 deletions app/server/utils/time_test.go → app/common/time_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package utils
package common

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/ydb-platform/fq-connector-go/app/common"
)

func TestTimeToYDBDate(t *testing.T) {
Expand All @@ -31,12 +29,12 @@ func TestTimeToYDBDate(t *testing.T) {
{
input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
{
input: time.Date(9999, 01, 01, 00, 00, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
}

Expand All @@ -48,7 +46,7 @@ func TestTimeToYDBDate(t *testing.T) {
require.Equal(t, tc.output, output)

if tc.err != nil {
require.True(t, errors.Is(tc.err, common.ErrValueOutOfTypeBounds))
require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds))
} else {
require.NoError(t, err)
}
Expand Down Expand Up @@ -77,12 +75,12 @@ func TestTimeToYDBDatetime(t *testing.T) {
{
input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
{
input: time.Date(9999, 01, 01, 00, 00, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
}

Expand All @@ -94,7 +92,7 @@ func TestTimeToYDBDatetime(t *testing.T) {
require.Equal(t, tc.output, output)

if tc.err != nil {
require.True(t, errors.Is(tc.err, common.ErrValueOutOfTypeBounds))
require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds))
} else {
require.NoError(t, err)
}
Expand Down Expand Up @@ -123,12 +121,12 @@ func TestTimeToYDBTimestamp(t *testing.T) {
{
input: time.Date(1969, 12, 31, 23, 59, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
{
input: time.Date(29427, 01, 01, 00, 00, 00, 00, time.UTC),
output: 0,
err: common.ErrValueOutOfTypeBounds,
err: ErrValueOutOfTypeBounds,
},
}

Expand All @@ -140,7 +138,7 @@ func TestTimeToYDBTimestamp(t *testing.T) {
require.Equal(t, tc.output, output)

if tc.err != nil {
require.True(t, errors.Is(tc.err, common.ErrValueOutOfTypeBounds))
require.True(t, errors.Is(tc.err, ErrValueOutOfTypeBounds))
} else {
require.NoError(t, err)
}
Expand Down
Loading

0 comments on commit 0d39d6c

Please sign in to comment.