Skip to content

Commit

Permalink
replace table_info.table_schema with arrow kind schema (#354)
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>
  • Loading branch information
Ceng23333 and zenghua authored Oct 19, 2023
1 parent 9c2e093 commit 6bf075f
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public TableInfo selectByTableId(String tableId) {
String sql = String.format("select * from table_info where table_id = '%s'", tableId);
return getTableInfo(sql);
}
public List<TableInfo> selectByNamespace(String namespace){

public List<TableInfo> selectByNamespace(String namespace) {
String sql = String.format("select * from table_info where table_namespace='%s'", namespace);
return getTableInfos(sql);
}
Expand Down Expand Up @@ -105,6 +106,7 @@ private TableInfo getTableInfo(String sql) {
}
return tableInfo;
}

private List<TableInfo> getTableInfos(String sql) {
Connection conn = null;
PreparedStatement pstmt = null;
Expand Down Expand Up @@ -284,4 +286,12 @@ public static TableInfo tableInfoFromResultSet(ResultSet rs) throws SQLException
.setDomain(rs.getString("domain"))
.build();
}

public static boolean isArrowKindSchema(String schema) {
return schema.charAt(schema.indexOf('"') + 1) == 'f';
}

public static boolean isSparkKindSchema(String schema) {
return schema.charAt(schema.indexOf('"') + 1) == 't';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean
List<String> tables = listTables(databaseName);
if (!tables.isEmpty()) {
if (cascade) {
for (String table: tables) {
for (String table : tables) {
try {
dropTable(new ObjectPath(databaseName, table), true);
} catch (TableNotExistException e) {
Expand Down Expand Up @@ -207,7 +207,7 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
dbManager.deleteShortTableName(tableInfo.getTableName(), tableName, tablePath.getDatabaseName());
dbManager.deleteDataCommitInfo(tableId);
dbManager.deletePartitionInfoByTableId(tableId);
if(FlinkUtil.isTable(tableInfo)){
if (FlinkUtil.isTable(tableInfo)) {
Path path = new Path(tableInfo.getTablePath());
try {
path.getFileSystem().delete(path, true);
Expand Down Expand Up @@ -274,7 +274,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
String tableId = TABLE_ID_PREFIX + UUID.randomUUID();
String qualifiedPath = "";
String sparkSchema = FlinkUtil.toSparkSchema(schema, cdcColumn).json();
String sparkSchema = FlinkUtil.toArrowSchema(schema, cdcColumn).toJson();
List<String> partitionKeys = Collections.emptyList();
if (table instanceof ResolvedCatalogTable) {
partitionKeys = ((ResolvedCatalogTable) table).getPartitionKeys();
Expand All @@ -284,7 +284,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
} else {
String flinkWarehouseDir = GlobalConfiguration.loadConfiguration().get(FLINK_WAREHOUSE_DIR);
if (null != flinkWarehouseDir) {
path = String.join("/", flinkWarehouseDir, tablePath.getDatabaseName(), tablePath.getObjectName());
path = String.join("/", flinkWarehouseDir, tablePath.getDatabaseName(), tablePath.getObjectName());
}
}
try {
Expand All @@ -298,9 +298,9 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
}
if (table instanceof ResolvedCatalogView) {
tableOptions.put(LAKESOUL_VIEW.key(), "true");
tableOptions.put(LAKESOUL_VIEW_TYPE.key(),LAKESOUL_VIEW_TYPE.defaultValue());
tableOptions.put(VIEW_ORIGINAL_QUERY,((ResolvedCatalogView) table).getOriginalQuery());
tableOptions.put(VIEW_EXPANDED_QUERY,((ResolvedCatalogView) table).getExpandedQuery());
tableOptions.put(LAKESOUL_VIEW_TYPE.key(), LAKESOUL_VIEW_TYPE.defaultValue());
tableOptions.put(VIEW_ORIGINAL_QUERY, ((ResolvedCatalogView) table).getOriginalQuery());
tableOptions.put(VIEW_EXPANDED_QUERY, ((ResolvedCatalogView) table).getExpandedQuery());
}
String json = JSON.toJSONString(tableOptions);
JSONObject properties = JSON.parseObject(json);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBConfig;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.dao.TableInfoDao;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
Expand All @@ -19,6 +21,7 @@
import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter;
import org.apache.flink.lakesoul.tool.FlinkUtil;
import org.apache.flink.lakesoul.types.TableSchemaIdentity;
import org.apache.spark.sql.arrow.ArrowUtils;
import org.apache.spark.sql.arrow.DataTypeCastUtils;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
Expand Down Expand Up @@ -116,9 +119,11 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
String tableName = identity.tableId.table();
String tableNamespace = identity.tableId.schema();
boolean isCdc = identity.useCDC;
StructType msgSchema = FlinkUtil.toSparkSchema(identity.rowType, isCdc ? Optional.of(
Schema msgSchema = FlinkUtil.toArrowSchema(identity.rowType, isCdc ? Optional.of(
identity.cdcColumn) :
Optional.empty());
StructType sparkSchema = ArrowUtils.fromArrowSchema(msgSchema);

TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(tableName, tableNamespace);
LOG.info("Committing: {}, {}, {}, {} {}", tableNamespace, tableName, isCdc, msgSchema, tableInfo);
if (tableInfo == null) {
Expand All @@ -137,7 +142,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
properties.put(CDC_CHANGE_COLUMN, CDC_CHANGE_COLUMN_DEFAULT);
}
}
dbManager.createNewTable(tableId, tableNamespace, tableName, identity.tableLocation, msgSchema.json(),
dbManager.createNewTable(tableId, tableNamespace, tableName, identity.tableLocation, msgSchema.toJson(),
properties, partition);
} else {
DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
Expand All @@ -149,11 +154,17 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
!new HashSet<>(partitionKeys.rangeKeys).containsAll(identity.partitionKeyList)) {
throw new IOException("Change of partition key column of table " + tableName + " is forbidden");
}
StructType origSchema = (StructType) StructType.fromJson(tableInfo.getTableSchema());
StructType origSchema = null;
if (TableInfoDao.isArrowKindSchema(tableInfo.getTableSchema())) {
Schema arrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
origSchema = ArrowUtils.fromArrowSchema(arrowSchema);
} else {
origSchema = (StructType) StructType.fromJson(tableInfo.getTableSchema());
}
scala.Tuple3<String, Object, StructType>
equalOrCanCastTuple3 =
DataTypeCastUtils.checkSchemaEqualOrCanCast(origSchema,
msgSchema,
ArrowUtils.fromArrowSchema(msgSchema),
identity.partitionKeyList,
identity.primaryKeys);
String equalOrCanCast = equalOrCanCastTuple3._1();
Expand All @@ -162,17 +173,17 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
if (equalOrCanCast.equals(DataTypeCastUtils.CAN_CAST())) {
LOG.warn("Schema change found, origin schema = {}, changed schema = {}",
origSchema.json(),
msgSchema.json());
msgSchema.toJson());
if (logicallyDropColumn) {
List<String> droppedColumn = DataTypeCastUtils.getDroppedColumn(origSchema, msgSchema);
List<String> droppedColumn = DataTypeCastUtils.getDroppedColumn(origSchema, sparkSchema);
if (droppedColumn.size() > 0) {
LOG.warn("Dropping Column {} Logically", droppedColumn);
dbManager.logicallyDropColumn(tableInfo.getTableId(), droppedColumn);
if (schemaChanged) {
dbManager.updateTableSchema(tableInfo.getTableId(), mergeStructType.json());
}
} else {
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.json());
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
}
} else {
LOG.info("Changing table schema: {}, {}, {}, {}, {}, {}",
Expand All @@ -182,7 +193,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
msgSchema,
identity.useCDC,
identity.cdcColumn);
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.json());
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
if (JSONObject.parseObject(tableInfo.getProperties()).containsKey(DBConfig.TableInfoProperty.DROPPED_COLUMN)) {
dbManager.removeLogicallyDropColumn(tableInfo.getTableId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import com.alibaba.fastjson.JSONObject;
import com.dmetasoul.lakesoul.lakesoul.io.NativeIOBase;
import com.dmetasoul.lakesoul.meta.*;
import com.dmetasoul.lakesoul.meta.dao.TableInfoDao;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
Expand All @@ -30,6 +33,7 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.fs.permission.FsAction;
Expand Down Expand Up @@ -72,6 +76,75 @@ public static String getRangeValue(CatalogPartitionSpec cps) {
return "Null";
}

public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(RowType rowType, Optional<String> cdcColumn) throws CatalogException {
List<Field> fields = new ArrayList<>();
String cdcColName = null;
if (cdcColumn.isPresent()) {
cdcColName = cdcColumn.get();
Field cdcField = ArrowUtils.toArrowField(cdcColName, new VarCharType(false, 16));
fields.add(cdcField);
}

for (RowType.RowField field : rowType.getFields()) {
String name = field.getName();
if (name.equals(SORT_FIELD)) continue;

LogicalType logicalType = field.getType();
Field arrowField = ArrowUtils.toArrowField(name, logicalType);
if (name.equals(cdcColName)) {
if (!arrowField.toString().equals(fields.get(0).toString())) {
throw new CatalogException(CDC_CHANGE_COLUMN +
"=" +
cdcColName +
"has an invalid field of" +
field +
"," +
CDC_CHANGE_COLUMN +
" require field of " +
fields.get(0).toString());
}
} else {
fields.add(arrowField);
}
}
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}

public static org.apache.arrow.vector.types.pojo.Schema toArrowSchema(TableSchema tsc, Optional<String> cdcColumn) throws CatalogException {
List<Field> fields = new ArrayList<>();
String cdcColName = null;
if (cdcColumn.isPresent()) {
cdcColName = cdcColumn.get();
Field cdcField = ArrowUtils.toArrowField(cdcColName, new VarCharType(false, 16));
fields.add(cdcField);
}

for (int i = 0; i < tsc.getFieldCount(); i++) {
String name = tsc.getFieldName(i).get();
DataType dt = tsc.getFieldDataType(i).get();
if (name.equals(SORT_FIELD)) continue;

LogicalType logicalType = dt.getLogicalType();
Field arrowField = ArrowUtils.toArrowField(name, logicalType);
if (name.equals(cdcColName)) {
if (!arrowField.toString().equals(fields.get(0).toString())) {
throw new CatalogException(CDC_CHANGE_COLUMN +
"=" +
cdcColName +
"has an invalid field of" +
arrowField +
"," +
CDC_CHANGE_COLUMN +
" require field of " +
fields.get(0).toString());
}
} else {
fields.add(arrowField);
}
}
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}

public static StructType toSparkSchema(RowType rowType, Optional<String> cdcColumn) throws CatalogException {
StructType stNew = new StructType();

Expand Down Expand Up @@ -220,10 +293,18 @@ public static CatalogBaseTable toFlinkCatalog(TableInfo tableInfo) {
String tableSchema = tableInfo.getTableSchema();
JSONObject properties = JSON.parseObject(tableInfo.getProperties());

StructType struct = (StructType) org.apache.spark.sql.types.DataType.fromJson(tableSchema);
org.apache.arrow.vector.types.pojo.Schema
arrowSchema =
org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
org.apache.arrow.vector.types.pojo.Schema arrowSchema = null;
System.out.println(tableSchema);
if (TableInfoDao.isArrowKindSchema(tableSchema)) {
try {
arrowSchema = org.apache.arrow.vector.types.pojo.Schema.fromJSON(tableSchema);
} catch (IOException e) {
throw new CatalogException(e);
}
} else {
StructType struct = (StructType) org.apache.spark.sql.types.DataType.fromJson(tableSchema);
arrowSchema = org.apache.spark.sql.arrow.ArrowUtils.toArrowSchema(struct, ZoneId.of("UTC").toString());
}
RowType rowType = ArrowUtils.fromArrowSchema(arrowSchema);
Builder bd = Schema.newBuilder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.spark.sql.arrow.ArrowUtils;
import org.apache.spark.sql.types.StructType;
import org.assertj.core.api.Assertions;
import org.junit.Before;
Expand Down Expand Up @@ -82,8 +83,7 @@ public void createTable() {
tEnvs.executeSql("show tables").print();
TableInfo info = DbManage.getTableInfoByNameAndNamespace("user_behaviorgg", "test_lakesoul_meta");
assertEquals(info.getTableSchema(),
new StructType().add("user_id", LongType, false).add("dt", StringType).add("name", StringType)
.json());
ArrowUtils.toArrowSchema(new StructType().add("user_id", LongType, false).add("dt", StringType).add("name", StringType), "UTC").toJson());
tEnvs.executeSql("DROP TABLE user_behaviorgg");
}

Expand All @@ -94,11 +94,11 @@ public void createTableWithLike() {
"'lakesoul_meta_host_port'='9043', 'path'='/tmp/user_behaviorgg', 'use_cdc'='true')");

TableInfo info = DbManage.getTableInfoByNameAndNamespace("user_behaviorgg", "test_lakesoul_meta");
Assertions.assertThat(info.getTableSchema()).isEqualTo(new StructType().add("user_id", LongType, false).add("dt", StringType).add("name", StringType, false).json());
Assertions.assertThat(info.getTableSchema()).isEqualTo(ArrowUtils.toArrowSchema(new StructType().add("name", StringType, false).add("user_id", LongType, false).add("dt", StringType), "UTC").toJson());

tEnvs.executeSql("CREATE TABLE if not exists like_table with ('path'='/tmp/like_table') like user_behaviorgg");
TableInfo info2 = DbManage.getTableInfoByNameAndNamespace("like_table", "test_lakesoul_meta");
Assertions.assertThat(info2.getTableSchema()).isEqualTo(new StructType().add("user_id", LongType, false).add("dt", StringType).add("name", StringType, false).json());
Assertions.assertThat(info2.getTableSchema()).isEqualTo(ArrowUtils.toArrowSchema(new StructType().add("name", StringType, false).add("user_id", LongType, false).add("dt", StringType), "UTC").toJson());
Assertions.assertThat(JSON.parseObject(info.getProperties()).get("lakesoul_cdc_change_column")).isEqualTo(JSON.parseObject(info2.getProperties()).get("lakesoul_cdc_change_column"));
Assertions.assertThat(JSON.parseObject(info.getProperties()).get("path")).isEqualTo("/tmp/user_behaviorgg");
Assertions.assertThat(JSON.parseObject(info2.getProperties()).get("path")).isEqualTo("/tmp/like_table");
Expand Down
Loading

0 comments on commit 6bf075f

Please sign in to comment.