fix schema test case. fix buffer ffi alignment
Signed-off-by: chenxu <[email protected]>
dmetasoul01 committed Jan 10, 2025
1 parent d57618b commit 811b553
Showing 5 changed files with 671 additions and 376 deletions.
@@ -4,6 +4,7 @@

package org.apache.flink.lakesoul.test.connector;

import com.dmetasoul.lakesoul.lakesoul.local.arrow.writers.DecimalWriter;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import org.apache.arrow.memory.BufferAllocator;
@@ -35,6 +36,8 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.Test;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -47,6 +50,34 @@
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.MAX_ROW_GROUP_SIZE;

public class LakeSoulArrowConnectorCase extends AbstractTestBase {
public static void main(String[] args) throws Exception {

int parallelism = 2;

StreamExecutionEnvironment
execEnv =
LakeSoulTestUtils.createStreamExecutionEnvironment(parallelism, 2000L, 2000L);

Configuration conf = new Configuration();
conf.set(INFERRING_SCHEMA, true);
DataStreamSource<LakeSoulArrowWrapper> source = execEnv.fromSource(
LakeSoulArrowSource.create(
"default",
MockLakeSoulArrowSource.MockSourceFunction.tableName,
conf
),
WatermarkStrategy.noWatermarks(),
"LakeSoul Arrow Source"
);

String name = "Print Sink";
PrintSinkFunction<LakeSoulArrowWrapper> printFunction = new PrintSinkFunction<>(name, false);

DataStreamSink<LakeSoulArrowWrapper> sink = source.addSink(printFunction).name(name);
execEnv.execute("Test MockLakeSoulArrowSource.MockSourceFunction");

}

// @Test
public void test() throws Exception {
int parallelism = 2;
@@ -207,8 +238,11 @@ public void testSchemaChange() throws Exception {
// float 32 column
new Field("field_float32",
FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null),
// date partition column
new Field("timestamp", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), null)
// timestamp column
new Field("timestamp", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")),
null)
// decimal column
, new Field("decimal", FieldType.nullable(new ArrowType.Decimal(20, 2)), null)
));

// TableInfo object can be reused
@@ -266,6 +300,14 @@ public void testSchemaChange() throws Exception {
}
timestampVector.setValueCount(batchSize);

// create decimal vector
DecimalVector decimalVector = (DecimalVector) arrowBatch.getVector("decimal");
decimalVector.allocateNew(batchSize);
for (int i = 0; i < batchSize; i++) {
decimalVector.set(i, i * 1000);
}
decimalVector.setValueCount(batchSize);

arrowBatch.setRowCount(batchSize);

arrowBatches.add(new LakeSoulArrowWrapper(sinkTableInfoEncoded, arrowBatch));
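The decimal vectors above are populated with decimalVector.set(i, i * 1000), i.e. raw longs rather than BigDecimal objects. Below is a minimal standalone sketch of that Arrow API, assuming the usual DecimalVector semantics (set(int, long) writes the unscaled value and getObject(int) returns a BigDecimal carrying the vector's scale); the class name is made up for illustration.

    import java.math.BigDecimal;

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.DecimalVector;

    public class DecimalVectorSketch {
        public static void main(String[] args) {
            int batchSize = 3;
            // Decimal(20, 2), matching the sink schema used in the test above
            try (BufferAllocator allocator = new RootAllocator();
                 DecimalVector vector = new DecimalVector("decimal", allocator, 20, 2)) {
                vector.allocateNew(batchSize);
                for (int i = 0; i < batchSize; i++) {
                    // same call as the test: the long is taken as the unscaled value,
                    // so i * 1000 with scale 2 should round-trip as i * 10.00
                    vector.set(i, i * 1000L);
                }
                vector.setValueCount(batchSize);
                for (int i = 0; i < batchSize; i++) {
                    System.out.println(vector.getObject(i)); // BigDecimal with scale 2
                }
            }
        }
    }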
@@ -300,7 +342,10 @@ public void testSchemaChange() throws Exception {
new Field("field_float32",
FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null),
// date partition column
new Field("timestamp", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)), null)
new Field("timestamp", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)),
null)
// decimal column
, new Field("decimal", FieldType.nullable(new ArrowType.Decimal(25, 2)), null)
));

// TableInfo object can be reused
@@ -358,6 +403,14 @@ public void testSchemaChange() throws Exception {
}
timestampVector.setValueCount(batchSize);

// create decimal vector
DecimalVector decimalVector = (DecimalVector) arrowBatch.getVector("decimal");
decimalVector.allocateNew(batchSize);
for (int i = 0; i < batchSize; i++) {
decimalVector.set(i, i * 1000);
}
decimalVector.setValueCount(batchSize);

arrowBatch.setRowCount(batchSize);

arrowBatches.add(new LakeSoulArrowWrapper(sinkTableInfoEncoded, arrowBatch));
@@ -380,32 +433,4 @@ public void testSchemaChange() throws Exception {
tEnv.executeSql("select * from `default`.`qar_table`").print();
}
}

public static void main(String[] args) throws Exception {

int parallelism = 2;

StreamExecutionEnvironment
execEnv =
LakeSoulTestUtils.createStreamExecutionEnvironment(parallelism, 2000L, 2000L);

Configuration conf = new Configuration();
conf.set(INFERRING_SCHEMA, true);
DataStreamSource<LakeSoulArrowWrapper> source = execEnv.fromSource(
LakeSoulArrowSource.create(
"default",
MockLakeSoulArrowSource.MockSourceFunction.tableName,
conf
),
WatermarkStrategy.noWatermarks(),
"LakeSoul Arrow Source"
);

String name = "Print Sink";
PrintSinkFunction<LakeSoulArrowWrapper> printFunction = new PrintSinkFunction<>(name, false);

DataStreamSink<LakeSoulArrowWrapper> sink = source.addSink(printFunction).name(name);
execEnv.execute("Test MockLakeSoulArrowSource.MockSourceFunction");

}
}
@@ -21,6 +21,9 @@
import org.apache.flink.table.types.logical.*;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.spark.sql.arrow.DataTypeCastUtils$;
import org.apache.spark.sql.catalyst.expressions.Cast$;
import org.apache.spark.sql.types.DataTypes;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -76,15 +79,14 @@ public void testFromIntToBigInt() throws IOException, ExecutionException, Interr
);
}

@Test(expected = ExecutionException.class)
@Test()
public void testFromIntToBigIntWithoutAllowance() throws IOException, ExecutionException, InterruptedException {
Map<String, String> options = new HashMap<>();
options.put(CATALOG_PATH.key(), tempFolder.newFolder("test_sink").getAbsolutePath());
options.put(LakeSoulSinkOptions.AUTO_SCHEMA_CHANGE.key(), "true");
RowType.RowField beforeField = new RowType.RowField("a", new IntType());
RowType.RowField afterField = new RowType.RowField("a", new BigIntType());

System.setProperty("datatype.cast.allow_precision_inc", "false");
testSchemaMigration(
CatalogTable.of(Schema.newBuilder().column(beforeField.getName(), beforeField.getType().asSerializableString()).build(), "", Collections.emptyList(), options),
CatalogTable.of(Schema.newBuilder().column(afterField.getName(), afterField.getType().asSerializableString()).build(), "", Collections.emptyList(), options),
@@ -97,24 +99,23 @@ public void testFromIntToBigIntWithoutAllowance() throws IOException, ExecutionE
);
}

@Test(expected = ExecutionException.class)
@Test
public void testFromBigIntToInt() throws IOException, ExecutionException, InterruptedException {
Map<String, String> options = new HashMap<>();
options.put(CATALOG_PATH.key(), tempFolder.newFolder("test_sink").getAbsolutePath());
options.put(LakeSoulSinkOptions.AUTO_SCHEMA_CHANGE.key(), "true");
RowType.RowField beforeField = new RowType.RowField("a", new BigIntType());
RowType.RowField afterField = new RowType.RowField("a", new IntType());

System.clearProperty("datatype.cast.allow_precision_loss");
testSchemaMigration(
CatalogTable.of(Schema.newBuilder().column(beforeField.getName(), beforeField.getType().asSerializableString()).build(), "", Collections.emptyList(), options),
CatalogTable.of(Schema.newBuilder().column(afterField.getName(), afterField.getType().asSerializableString()).build(), "", Collections.emptyList(), options),
"insert into test_sink values (10000000000), (20000000000)",
"insert into test_sink values (3), (4)",
"[+I[a, BIGINT, true, null, null, null]]",
"",
"[+I[a, INT, true, null, null, null]]",
"[+I[10000000000], +I[20000000000]]",
""
"[+I[3], +I[4], +I[null], +I[null]]"
);
}

@@ -160,15 +161,14 @@ public void testFromFloatToDouble() throws IOException, ExecutionException, Inte
);
}

@Test(expected = ExecutionException.class)
@Test()
public void testFromFloatToDoubleWithoutAllowance() throws IOException, ExecutionException, InterruptedException {
Map<String, String> options = new HashMap<>();
options.put(CATALOG_PATH.key(), tempFolder.newFolder("test_sink").getAbsolutePath());
options.put(LakeSoulSinkOptions.AUTO_SCHEMA_CHANGE.key(), "true");
RowType.RowField beforeField = new RowType.RowField("a", new FloatType());
RowType.RowField afterField = new RowType.RowField("a", new DoubleType());

System.setProperty("datatype.cast.allow_precision_inc", "false");
testSchemaMigration(
CatalogTable.of(Schema.newBuilder().column(beforeField.getName(), beforeField.getType().asSerializableString()).build(), "", Collections.emptyList(), options),
CatalogTable.of(Schema.newBuilder().column(afterField.getName(), afterField.getType().asSerializableString()).build(), "", Collections.emptyList(), options),
@@ -274,24 +274,32 @@ public void testDropColumnLogically() throws IOException, ExecutionException, In
assertThat(Objects.requireNonNull(properties).toString()).contains("b");
}

@Test(expected = ExecutionException.class)
@Test
public void testCanCast() {
assertThat(DataTypeCastUtils$.MODULE$.checkDataTypeEqualOrCanCast(DataTypes.DoubleType, DataTypes.FloatType))
.isEqualTo(DataTypeCastUtils$.MODULE$.CAN_CAST());
assertThat(DataTypeCastUtils$.MODULE$.checkDataTypeEqualOrCanCast(DataTypes.createDecimalType(20, 2),
DataTypes.createDecimalType(25, 2)))
.isEqualTo(DataTypeCastUtils$.MODULE$.CAN_CAST());
}

@Test
public void testFromDoubleToFloat() throws IOException, ExecutionException, InterruptedException {
Map<String, String> options = new HashMap<>();
options.put(CATALOG_PATH.key(), tempFolder.newFolder("test_sink").getAbsolutePath());
options.put(LakeSoulSinkOptions.AUTO_SCHEMA_CHANGE.key(), "true");
RowType.RowField beforeField = new RowType.RowField("a", new DoubleType());
RowType.RowField afterField = new RowType.RowField("a", new FloatType());

System.clearProperty("datatype.cast.allow_precision_loss");
testSchemaMigration(
CatalogTable.of(Schema.newBuilder().column(beforeField.getName(), beforeField.getType().asSerializableString()).build(), "", Collections.emptyList(), options),
CatalogTable.of(Schema.newBuilder().column(afterField.getName(), afterField.getType().asSerializableString()).build(), "", Collections.emptyList(), options),
"insert into test_sink values (1.11111111111), (2.22222222222)",
"insert into test_sink values (3.3333333333), (4.44444444444)",
"[+I[a, DOUBLE, true, null, null, null]]",
"",
"[+I[a, FLOAT, true, null, null, null]]",
"[+I[1.11111111111], +I[2.22222222222]]",
""
"[+I[1.1111112], +I[2.2222223], +I[3.3333333], +I[4.4444447]]"
);
}
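The two relaxed tests above (testFromBigIntToInt and testFromDoubleToFloat) now expect the narrowing casts to succeed, with nulls for out-of-range BIGINT rows and reduced precision for DOUBLE rows, instead of an ExecutionException. For comparison, a small standalone sketch of plain JVM narrowing behaviour; note that the null handling in the expected output is the sink's cast semantics, not something a raw Java cast produces.

    public class NarrowingCastSketch {
        public static void main(String[] args) {
            // double -> float keeps roughly 7 significant decimal digits,
            // which is where expected rows such as 1.1111112 and 2.2222223 come from
            System.out.println((float) 1.11111111111);
            System.out.println((float) 2.22222222222);

            // long -> int cannot represent values beyond Integer.MAX_VALUE;
            // a raw Java cast wraps around (1410065408 here), whereas the test
            // expects the sink to emit null for such out-of-range rows
            System.out.println((int) 10000000000L);
            System.out.println(Integer.MAX_VALUE); // 2147483647
        }
    }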

@@ -60,12 +60,11 @@ object DataTypeCastUtils {
*
* @param source
* @param target
* @return 0 if two StructType is equal, 1 if two StructType is not equal but Struct source can be cast to target, -1 if Struct source can not be cast to target
*/
def checkDataTypeEqualOrCanCast(source: DataType, target: DataType): String = {
if (source == target)
if (source == target) {
IS_EQUAL
else if (Cast.canCast(source, target)) {
} else if (Cast.canCast(source, target)) {
CAN_CAST
} else {
s"$source is not allowed to cast to $target"
