Skip to content

Commit

Permalink
YDB: support JsonDocument (#209)
Browse files Browse the repository at this point in the history
* Filter test suites

* Suite filtering

* YDB: Test JsonDocument
  • Loading branch information
vitalyisaev2 authored Nov 8, 2024
1 parent 200e297 commit 88c9a7f
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 57 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PROJECT_PATH = $(shell pwd)
projectPath = $(shell pwd)

build:
go build -o fq-connector-go ./app
Expand All @@ -13,7 +13,7 @@ unit_test:
go test ./app/... ./common/... ./tests/utils/...

integration_test: integration_test_build
./fq-connector-go-tests -projectPath=$(PROJECT_PATH) -test.failfast
./fq-connector-go-tests -projectPath="$(projectPath)" -suiteName="$(suiteName)" -test.failfast

integration_test_build:
go test -c -o fq-connector-go-tests ./tests
Expand All @@ -29,7 +29,7 @@ test_coverage: integration_test_env_run
go test -coverpkg=./... -coverprofile=coverage_unit_tests.out -covermode=atomic ./app/... ./common/... ./tests/utils/...
sleep 10
go test -c -o fq-connector-go-tests -coverpkg=./... -covermode=atomic ./tests
./fq-connector-go-tests -projectPath=$(PROJECT_PATH) -test.coverprofile=coverage_integration_tests.out
./fq-connector-go-tests -projectPath="$(projectPath)" -test.coverprofile=coverage_integration_tests.out
cat coverage_unit_tests.out | grep -v 'pb.go\|mock.go\|library' > coverage.out
cat coverage_integration_tests.out | grep -v 'atomic\|pb.go\|mock.go\|library' >> coverage.out
go tool cover -func=sudo apt install cloc
Expand Down
5 changes: 2 additions & 3 deletions app/server/datasource/rdbms/ydb/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type sqlFormatter struct {
mode config.TYdbConfig_Mode
}

func (sqlFormatter) supportsType(typeID Ydb.Type_PrimitiveTypeId) bool {
func (sqlFormatter) supportsTypeForPushdown(typeID Ydb.Type_PrimitiveTypeId) bool {
switch typeID {
case Ydb.Type_BOOL:
return true
Expand Down Expand Up @@ -52,11 +52,10 @@ func (sqlFormatter) supportsType(typeID Ydb.Type_PrimitiveTypeId) bool {
}
}

// TODO: pushdown support
func (f *sqlFormatter) supportsConstantValueExpression(t *Ydb.Type) bool {
switch v := t.Type.(type) {
case *Ydb.Type_TypeId:
return f.supportsType(v.TypeId)
return f.supportsTypeForPushdown(v.TypeId)
case *Ydb.Type_OptionalType:
return f.supportsConstantValueExpression(v.OptionalType.Item)
case *Ydb.Type_NullType:
Expand Down
39 changes: 21 additions & 18 deletions app/server/datasource/rdbms/ydb/type_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,24 @@ type typeMapper struct {
var isOptional = regexp.MustCompile(`Optional<(\w+)>$`)

const (
typeBool = "Bool"
typeInt8 = "Int8"
typeUint8 = "Uint8"
typeInt16 = "Int16"
typeUint16 = "Uint16"
typeInt32 = "Int32"
typeUint32 = "Uint32"
typeInt64 = "Int64"
typeUint64 = "Uint64"
typeFloat = "Float"
typeDouble = "Double"
typeString = "String"
typeUtf8 = "Utf8"
typeJSON = "Json"
typeDate = "Date"
typeDatetime = "Datetime"
typeTimestamp = "Timestamp"
typeBool = "Bool"
typeInt8 = "Int8"
typeUint8 = "Uint8"
typeInt16 = "Int16"
typeUint16 = "Uint16"
typeInt32 = "Int32"
typeUint32 = "Uint32"
typeInt64 = "Int64"
typeUint64 = "Uint64"
typeFloat = "Float"
typeDouble = "Double"
typeString = "String"
typeUtf8 = "Utf8"
typeJSON = "Json"
typeDate = "Date"
typeDatetime = "Datetime"
typeTimestamp = "Timestamp"
typeJSONDocument = "JsonDocument"
)

func primitiveYqlTypeName(typeId Ydb.Type_PrimitiveTypeId) (string, error) {
Expand Down Expand Up @@ -141,6 +142,8 @@ func makePrimitiveTypeFromString(typeName string) (*Ydb.Type, error) {
return common.MakePrimitiveType(Ydb.Type_DATETIME), nil
case typeTimestamp:
return common.MakePrimitiveType(Ydb.Type_TIMESTAMP), nil
case typeJSONDocument:
return common.MakePrimitiveType(Ydb.Type_JSON_DOCUMENT), nil
default:
return nil, fmt.Errorf("convert type '%s': %w", typeName, common.ErrDataTypeNotSupported)
}
Expand Down Expand Up @@ -255,7 +258,7 @@ func makeAcceptorAppender(
return makeAcceptorAppenderCheckOptional[float32, float32, *array.Float32Builder](optional, cc.Float32())
case typeDouble:
return makeAcceptorAppenderCheckOptional[float64, float64, *array.Float64Builder](optional, cc.Float64())
case typeString:
case typeString, typeJSONDocument:
return makeAcceptorAppenderCheckOptional[[]byte, []byte, *array.BinaryBuilder](optional, cc.Bytes())
case typeUtf8, typeJSON:
return makeAcceptorAppenderCheckOptional[string, string, *array.StringBuilder](optional, cc.String())
Expand Down
4 changes: 4 additions & 0 deletions common/arrow_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ func ydbTypeToArrowBuilder(typeID Ydb.Type_PrimitiveTypeId, arrowAllocator memor
builder = array.NewUint32Builder(arrowAllocator)
case Ydb.Type_TIMESTAMP:
builder = array.NewUint64Builder(arrowAllocator)
case Ydb.Type_JSON_DOCUMENT:
builder = array.NewBinaryBuilder(arrowAllocator, arrow.BinaryTypes.Binary)
default:
return nil, fmt.Errorf("register type '%v': %w", typeID, ErrDataTypeNotSupported)
}
Expand Down Expand Up @@ -223,6 +225,8 @@ func ydbTypeToArrowField(typeID Ydb.Type_PrimitiveTypeId, column *Ydb.Column) (a
field = arrow.Field{Name: column.Name, Type: arrow.PrimitiveTypes.Uint32}
case Ydb.Type_TIMESTAMP:
field = arrow.Field{Name: column.Name, Type: arrow.PrimitiveTypes.Uint64}
case Ydb.Type_JSON_DOCUMENT:
field = arrow.Field{Name: column.Name, Type: arrow.BinaryTypes.Binary}
default:
return arrow.Field{}, fmt.Errorf("register type '%v': %w", typeID, ErrDataTypeNotSupported)
}
Expand Down
29 changes: 29 additions & 0 deletions common/call_stack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package common

import "runtime"

func GetCallStackFunctionNames() []string {
var functionNames []string

pc := make([]uintptr, 20)

n := runtime.Callers(2, pc)
if n == 0 {
return functionNames
}

pc = pc[:n]
frames := runtime.CallersFrames(pc)

for {
frame, more := frames.Next()

functionNames = append(functionNames, frame.Function)

if !more {
break
}
}

return functionNames
}
19 changes: 16 additions & 3 deletions tests/infra/datasource/ydb/init/01_basic.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#!/bin/bash

/ydb -p tests-ydb-client yql -s '
set -x

/ydb -p tests-ydb-client yql -s '
CREATE TABLE simple (id Int32 NOT NULL, col1 String NOT NULL, col2 Int32 NOT NULL, PRIMARY KEY (id));
COMMIT;
INSERT INTO simple (id, col1, col2) VALUES
Expand All @@ -12,7 +13,6 @@
(5, "ydb_e", 50);
COMMIT;
CREATE TABLE primitives (
id Int32 NOT NULL,
col_01_bool Bool NOT NULL,
Expand Down Expand Up @@ -138,4 +138,17 @@
(5, "e");
COMMIT;
'


# YQ-3494
/ydb -p tests-ydb-client yql -s "
CREATE TABLE json_document (
id INT32 NOT NULL,
data JsonDocument NOT NULL,
PRIMARY KEY (id)
);
COMMIT;
INSERT INTO json_document (id, data) VALUES
(1, JsonDocument('{\"key1\": \"value1\"}')),
(2, JsonDocument('{\"key2\": \"value2\"}'));
COMMIT;
"
10 changes: 10 additions & 0 deletions tests/infra/datasource/ydb/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,20 @@ func (s *Suite) TestPushdownStringsStringOptional() {
)
}

// YQ-3949
func (s *Suite) TestJSONDocument() {
s.ValidateTable(s.dataSource, tables["json_document"])
}

func (s *Suite) TestLargeTable() {
// For tables larger than 1000 rows, scan queries must be used,
// otherwise output will be truncated.
// https://ydb.tech/docs/en/concepts/scan_query
// This test makes sense only for Table Service.
if s.connectorMode == config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE {
s.T().Skip("Skipping test in QUERY_SERVICE_NATIVE mode")
}

s.ValidateTable(
s.dataSource,
tables["large"],
Expand Down
22 changes: 22 additions & 0 deletions tests/infra/datasource/ydb/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,28 @@ var tables = map[string]*test_utils.Table[int32, *array.Int32Builder]{
},
},
},
// YQ-3949
"json_document": {
Name: "json_document",
IDArrayBuilderFactory: newInt32IDArrayBuilder(memPool),
Schema: &test_utils.TableSchema{
Columns: map[string]*Ydb.Type{
"id": common.MakePrimitiveType(Ydb.Type_INT32),
"data": common.MakePrimitiveType(Ydb.Type_JSON_DOCUMENT),
},
},
Records: []*test_utils.Record[int32, *array.Int32Builder]{
{
Columns: map[string]any{
"id": []int32{1, 2},
"data": [][]byte{
[]byte("{\"key1\":\"value1\"}"),
[]byte("{\"key2\":\"value2\"}"),
},
},
},
},
},
}

func pushdownSchemaYdb() *test_utils.TableSchema {
Expand Down
15 changes: 14 additions & 1 deletion tests/main_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tests

import (
"flag"
"fmt"
"log"
"testing"
Expand All @@ -21,9 +22,13 @@ import (
)

// TODO: find the way of passing this object into suites as a parameter instead of global var
var state *suite.State
var (
state *suite.State
)

func TestMain(m *testing.M) {
flag.Parse()

var err error

state, err = suite.NewState()
Expand All @@ -35,14 +40,18 @@ func TestMain(m *testing.M) {
}

func TestClickHouse(t *testing.T) {
state.SkipSuiteIfNotEnabled(t)
testify_suite.Run(t, clickhouse.NewSuite(suite.NewBase[int32, *array.Int32Builder](t, state, "ClickHouse")))
}

func TestPostgreSQL(t *testing.T) {
state.SkipSuiteIfNotEnabled(t)
testify_suite.Run(t, postgresql.NewSuite(suite.NewBase[int32, *array.Int32Builder](t, state, "PostgreSQL")))
}

func TestYDB(t *testing.T) {
state.SkipSuiteIfNotEnabled(t)

modes := []config.TYdbConfig_Mode{
config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES,
config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE,
Expand All @@ -60,17 +69,21 @@ func TestYDB(t *testing.T) {
}

func TestGreenplum(t *testing.T) {
state.SkipSuiteIfNotEnabled(t)
testify_suite.Run(t, greenplum.NewSuite(suite.NewBase[int32, *array.Int32Builder](t, state, "Greenplum")))
}

func TestMySQL(t *testing.T) {
state.SkipSuiteIfNotEnabled(t)
testify_suite.Run(t, mysql.NewSuite(suite.NewBase[int32, *array.Int32Builder](t, state, "MySQL")))
}

func TestOracle(t *testing.T) {
state.SkipSuiteIfNotEnabled(t)
testify_suite.Run(t, oracle.NewSuite(suite.NewBase[int64, *array.Int64Builder](t, state, "Oracle")))
}

func TestMsSqlServer(t *testing.T) {
state.SkipSuiteIfNotEnabled(t)
testify_suite.Run(t, ms_sql_server.NewSuite(suite.NewBase[int32, *array.Int32Builder](t, state, "MS SQL Server")))
}
38 changes: 37 additions & 1 deletion tests/suite/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,53 @@ package suite
import (
"flag"
"fmt"
"log"
"os"
"strings"
"testing"

"github.com/ydb-platform/fq-connector-go/common"
"github.com/ydb-platform/fq-connector-go/tests/infra/docker_compose"
)

// CLI parameters
var projectPath = flag.String("projectPath", "", "path to fq-connector-go source dir")
var (
projectPath = flag.String("projectPath", "", "path to fq-connector-go source dir")
suiteName = flag.String("suiteName", "", "specifies the test suite one wants to run")
)

// Global-scope services and objects that will be accessible from every test suite
// during the testing lifecycle
type State struct {
EndpointDeterminer *docker_compose.EndpointDeterminer
suiteName string
}

func (s *State) SkipSuiteIfNotEnabled(t *testing.T) {
if s.suiteName == "" {
// if no suite specified, run all suites
return
}

functionNames := common.GetCallStackFunctionNames()
if len(functionNames) == 0 {
t.FailNow()
return
}

for _, functionName := range functionNames {
if strings.Contains(functionName, "Test") {
actualSuiteName := strings.TrimLeft(strings.Split(functionName, ".")[2], "Test")
if actualSuiteName == s.suiteName {
return
}

log.Printf("Suite '%s' skipped as it doesn't match flag value '%s'\n", actualSuiteName, s.suiteName)
t.SkipNow()

return
}
}
}

func NewState() (*State, error) {
Expand All @@ -40,6 +75,7 @@ func NewState() (*State, error) {

result := &State{
EndpointDeterminer: ed,
suiteName: *suiteName,
}

return result, nil
Expand Down
Loading

0 comments on commit 88c9a7f

Please sign in to comment.