From 505f754080a9ad03af94dec6a28695e78c21fe5d Mon Sep 17 00:00:00 2001 From: ChenYunHey <90120383+ChenYunHey@users.noreply.github.com> Date: Fri, 22 Mar 2024 11:22:35 +0800 Subject: [PATCH] [Flink] Fix LakeSoul table export with timestamp local timezone type (#427) * fix getMysqlFieldsTypes bug Signed-off-by: ChenYunHey <1908166778@qq.com> * Reformar Code Signed-off-by: ChenYunHey <1908166778@qq.com> * support BINARY type Signed-off-by: ChenYunHey <1908166778@qq.com> * rebuild branch Signed-off-by: ChenYunHey <1908166778@qq.com> --------- Signed-off-by: ChenYunHey <1908166778@qq.com> --- .../flink/lakesoul/entry/SyncDatabase.java | 62 ++++++++++--------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java index aaf13d0c3..91ff488ae 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java @@ -2,12 +2,8 @@ // // SPDX-License-Identifier: Apache-2.0 - package org.apache.flink.lakesoul.entry; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.amazonaws.services.dynamodbv2.xspec.S; import com.dmetasoul.lakesoul.meta.DBManager; import com.dmetasoul.lakesoul.meta.DBUtil; import com.dmetasoul.lakesoul.meta.entity.TableInfo; @@ -45,8 +41,6 @@ public static void main(String[] args) throws SQLException { String password = parameter.get(TARGET_DB_PASSWORD.key()); int sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue()); boolean useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue()); - //int replicationNum = parameter.getInt(DORIS_REPLICATION_NUM.key(), DORIS_REPLICATION_NUM.defaultValue()); - String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue()); Configuration conf = new Configuration(); conf.setString(RestOptions.BIND_PORT, "8081-8089"); @@ -81,7 +75,7 @@ public static String pgAndMsqlCreateTableSql(String[] stringFieldTypes, String[] createTableQuery.append(", "); } } - if (pk!=null){ + if (pk != null) { createTableQuery.append(" ,PRIMARY KEY(").append(pk); createTableQuery.append(")"); } @@ -91,11 +85,11 @@ public static String pgAndMsqlCreateTableSql(String[] stringFieldTypes, String[] public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] fieldNames, String pk) { String[] stringFieldTypes = new String[fieldTypes.length]; - for (int i = 0; i < fieldTypes.length; i++) { + String typeName = fieldTypes[i].getLogicalType().toString(); if (fieldTypes[i].getLogicalType() instanceof VarCharType) { - String mysqlType = "TEXT"; - if (pk!=null){ + String mysqlType = "VARCHAR(255)"; + if (pk != null) { if (pk.contains(fieldNames[i])) { mysqlType = "VARCHAR(100)"; } @@ -105,10 +99,14 @@ public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] field stringFieldTypes[i] = "FLOAT"; } else if (fieldTypes[i].getLogicalType() instanceof BinaryType) { stringFieldTypes[i] = "BINARY"; - } else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType | fieldTypes[i].getLogicalType() instanceof TimestampType) { + } else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType || fieldTypes[i].getLogicalType() instanceof TimestampType) { + stringFieldTypes[i] = "TIMESTAMP"; + } else if (fieldTypes[i].getLogicalType().toString().equals("TIMESTAMP_LTZ(6)")) { stringFieldTypes[i] = "TIMESTAMP"; } else if (fieldTypes[i].getLogicalType() instanceof BooleanType) { stringFieldTypes[i] = "BOOLEAN"; + } else if (fieldTypes[i].getLogicalType().toString().equals("BYTES")) { + stringFieldTypes[i] = "VARBINARY(40)"; } else { stringFieldTypes[i] = fieldTypes[i].toString(); } @@ -122,7 +120,7 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam for (int i = 0; i < fieldTypes.length; i++) { if (fieldTypes[i].getLogicalType() instanceof VarCharType) { String mysqlType = "TEXT"; - if (pk!=null){ + if (pk != null) { if (pk.contains(fieldNames[i])) { mysqlType = "VARCHAR(100)"; } @@ -146,13 +144,12 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam public static String[] getDorisFieldTypes(DataType[] fieldTypes) { String[] stringFieldTypes = new String[fieldTypes.length]; for (int i = 0; i < fieldTypes.length; i++) { - if (fieldTypes[i].getLogicalType() instanceof TimestampType){ + if (fieldTypes[i].getLogicalType() instanceof TimestampType) { stringFieldTypes[i] = "DATETIME"; - } - else if (fieldTypes[i].getLogicalType() instanceof VarCharType){ + } else if (fieldTypes[i].getLogicalType() instanceof VarCharType) { stringFieldTypes[i] = "VARCHAR"; - } else { + } else { stringFieldTypes[i] = fieldTypes[i].toString(); } } @@ -168,7 +165,7 @@ public static String getTablePk(String sourceDataBae, String sourceTableName) { StringBuilder stringBuilder = new StringBuilder(); for (int i = 0; i < primaryKeys.size(); i++) { stringBuilder.append(primaryKeys.get(i)); - if (i