Skip to content

Commit

Permalink
[Flink] Fix LakeSoul table export with timestamp local timezone type (#…
Browse files Browse the repository at this point in the history
…427)

* fix getMysqlFieldsTypes bug

Signed-off-by: ChenYunHey <[email protected]>

* Reformar Code

Signed-off-by: ChenYunHey <[email protected]>

* support BINARY type

Signed-off-by: ChenYunHey <[email protected]>

* rebuild branch

Signed-off-by: ChenYunHey <[email protected]>

---------

Signed-off-by: ChenYunHey <[email protected]>
  • Loading branch information
ChenYunHey authored and dmetasoul01 committed Mar 28, 2024
1 parent ae23229 commit 505f754
Showing 1 changed file with 32 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@
//
// SPDX-License-Identifier: Apache-2.0


package org.apache.flink.lakesoul.entry;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.amazonaws.services.dynamodbv2.xspec.S;
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
Expand Down Expand Up @@ -45,8 +41,6 @@ public static void main(String[] args) throws SQLException {
String password = parameter.get(TARGET_DB_PASSWORD.key());
int sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue());
boolean useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue());
//int replicationNum = parameter.getInt(DORIS_REPLICATION_NUM.key(), DORIS_REPLICATION_NUM.defaultValue());

String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue());
Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT, "8081-8089");
Expand Down Expand Up @@ -81,7 +75,7 @@ public static String pgAndMsqlCreateTableSql(String[] stringFieldTypes, String[]
createTableQuery.append(", ");
}
}
if (pk!=null){
if (pk != null) {
createTableQuery.append(" ,PRIMARY KEY(").append(pk);
createTableQuery.append(")");
}
Expand All @@ -91,11 +85,11 @@ public static String pgAndMsqlCreateTableSql(String[] stringFieldTypes, String[]

public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] fieldNames, String pk) {
String[] stringFieldTypes = new String[fieldTypes.length];

for (int i = 0; i < fieldTypes.length; i++) {
String typeName = fieldTypes[i].getLogicalType().toString();
if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
String mysqlType = "TEXT";
if (pk!=null){
String mysqlType = "VARCHAR(255)";
if (pk != null) {
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
}
Expand All @@ -105,10 +99,14 @@ public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] field
stringFieldTypes[i] = "FLOAT";
} else if (fieldTypes[i].getLogicalType() instanceof BinaryType) {
stringFieldTypes[i] = "BINARY";
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType | fieldTypes[i].getLogicalType() instanceof TimestampType) {
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType || fieldTypes[i].getLogicalType() instanceof TimestampType) {
stringFieldTypes[i] = "TIMESTAMP";
} else if (fieldTypes[i].getLogicalType().toString().equals("TIMESTAMP_LTZ(6)")) {
stringFieldTypes[i] = "TIMESTAMP";
} else if (fieldTypes[i].getLogicalType() instanceof BooleanType) {
stringFieldTypes[i] = "BOOLEAN";
} else if (fieldTypes[i].getLogicalType().toString().equals("BYTES")) {
stringFieldTypes[i] = "VARBINARY(40)";
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
}
Expand All @@ -122,7 +120,7 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
String mysqlType = "TEXT";
if (pk!=null){
if (pk != null) {
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
}
Expand All @@ -146,13 +144,12 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam
public static String[] getDorisFieldTypes(DataType[] fieldTypes) {
String[] stringFieldTypes = new String[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof TimestampType){
if (fieldTypes[i].getLogicalType() instanceof TimestampType) {
stringFieldTypes[i] = "DATETIME";
}
else if (fieldTypes[i].getLogicalType() instanceof VarCharType){
} else if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
stringFieldTypes[i] = "VARCHAR";

} else {
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
}
}
Expand All @@ -168,7 +165,7 @@ public static String getTablePk(String sourceDataBae, String sourceTableName) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < primaryKeys.size(); i++) {
stringBuilder.append(primaryKeys.get(i));
if (i<primaryKeys.size()-1){
if (i < primaryKeys.size() - 1) {
stringBuilder.append(",");
}
}
Expand Down Expand Up @@ -207,8 +204,8 @@ public static void xsyncToPg(StreamExecutionEnvironment env, boolean bathXync, S
"'" + username + "'" + "," + "'password'=" + "'" + password + "'" + "," + "'base-url'=" + "'" + url + "'" + ")";
// Move data from LakeSoul to MySQL
tEnvs.executeSql(createCatalog);
String insertQuery = "INSERT INTO postgres_catalog.`" + targetDatabase+ "`.`" + targetTableName +
"` SELECT * FROM lakeSoul.`" +sourceDatabase + "`.`" + sourceTableName + "`";
String insertQuery = "INSERT INTO postgres_catalog.`" + targetDatabase + "`.`" + targetTableName +
"` SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "`";

tEnvs.executeSql(insertQuery);
statement.close();
Expand All @@ -231,35 +228,40 @@ public static void xsyncToMysql(StreamExecutionEnvironment env, boolean batchXyn
TableResult schemaResult = tEnvs.executeSql(
"SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "` LIMIT 1");
DataType[] fieldDataTypes = schemaResult.getTableSchema().getFieldDataTypes();
String[] mysqlFieldTypes = getDorisFieldTypes(fieldDataTypes);

String[] fieldNames = schemaResult.getTableSchema().getFieldNames();
String tablePk = getTablePk(sourceDatabase, sourceTableName);
String[] stringFieldsTypes = getMysqlFieldsTypes(fieldDataTypes, fieldNames, tablePk);
String createTableSql = pgAndMsqlCreateTableSql(stringFieldsTypes, fieldNames, targetTableName, tablePk);
String[] mysqlFieldTypes = getMysqlFieldsTypes(fieldDataTypes, fieldNames, tablePk);
//String[] stringFieldsTypes = getMysqlFieldsTypes(fieldDataTypes, fieldNames, tablePk);
String createTableSql = pgAndMsqlCreateTableSql(mysqlFieldTypes, fieldNames, targetTableName, tablePk);

Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
Statement statement = conn.createStatement();
// Create the target table in MySQL
statement.executeUpdate(createTableSql.toString());

StringBuilder coulmns = new StringBuilder();
for (int i = 0; i < fieldDataTypes.length; i++) {
coulmns.append("`").append(fieldNames[i]).append("` ").append(mysqlFieldTypes[i]);
if (i< fieldDataTypes.length-1){
for (int i = 0; i < mysqlFieldTypes.length; i++) {
if (mysqlFieldTypes[i].equals("VARBINARY(40)")) {
coulmns.append("`").append(fieldNames[i]).append("` ").append("BYTES");
} else {
coulmns.append("`").append(fieldNames[i]).append("` ").append(mysqlFieldTypes[i]);
}
if (i < fieldDataTypes.length - 1) {
coulmns.append(",");
}
}
String sql;
if (tablePk!=null){
if (tablePk != null) {
sql = String.format(
"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s')",
targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password);
}else {
} else {
sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s')",
targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password);
}
tEnvs.executeSql(sql);
tEnvs.executeSql("insert into "+targetTableName+" select * from lakeSoul.`"+sourceDatabase+"`."+sourceTableName);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);

statement.close();
conn.close();
Expand All @@ -286,14 +288,14 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, boolean batchXyn
StringBuilder coulmns = new StringBuilder();
for (int i = 0; i < fieldDataTypes.length; i++) {
coulmns.append("`").append(fieldNames[i]).append("` ").append(dorisFieldTypes[i]);
if (i< fieldDataTypes.length-1){
if (i < fieldDataTypes.length - 1) {
coulmns.append(",");
}
}
String sql = String.format(
"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s')",
targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password);
tEnvs.executeSql(sql);
tEnvs.executeSql("insert into "+targetTableName+" select * from lakeSoul.`"+sourceDatabase+"`."+sourceTableName);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
}
}

0 comments on commit 505f754

Please sign in to comment.