From f5e075e9ce481f8c1fe3a8ee1ca37354953371c2 Mon Sep 17 00:00:00 2001
From: ChenYunHey <90120383+ChenYunHey@users.noreply.github.com>
Date: Thu, 21 Dec 2023 18:08:15 +0800
Subject: [PATCH] [Flink] Add export to external dbs for LakeSoul's tables
(#376)
* Export tables out of the lake
Signed-off-by: ChenYunHey <1908166778@qq.com>
* support doris
Signed-off-by: ChenYunHey <1908166778@qq.com>
* add head license
Signed-off-by: ChenYunHey <1908166778@qq.com>
---------
Signed-off-by: ChenYunHey <1908166778@qq.com>
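
Example launch (jar path and connection values are illustrative):

    ./bin/flink run -c org.apache.flink.lakesoul.entry.SyncDatabase \
        lakesoul-flink.jar \
        --source_db.db_name default \
        --source_db.table orders \
        --target_db.db_sync mysql \
        --target_db.url jdbc:mysql://localhost:3306/ \
        --target_db.db_name sink_db \
        --target_db.table orders \
        --target_db.user root \
        --target_db.password root \
        --use_batch true \
        --sink_parallelism 2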
---
lakesoul-flink/pom.xml | 11 +-
.../flink/lakesoul/entry/OracleCdc.java | 5 +
 .../flink/lakesoul/entry/SyncDatabase.java | 340 ++++++++++++++++++
 .../tool/LakeSoulSinkDatabasesOptions.java |  84 +++++
 4 files changed, 439 insertions(+), 1 deletion(-)
create mode 100644 lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
create mode 100644 lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java
diff --git a/lakesoul-flink/pom.xml b/lakesoul-flink/pom.xml
index 801f35adb..22d1aa406 100644
--- a/lakesoul-flink/pom.xml
+++ b/lakesoul-flink/pom.xml
@@ -121,7 +121,6 @@ SPDX-License-Identifier: Apache-2.0
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
-
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>flink-sql-connector-mysql-cdc</artifactId>
@@ -132,6 +131,16 @@ SPDX-License-Identifier: Apache-2.0
             <artifactId>flink-sql-connector-oracle-cdc</artifactId>
             <version>2.3.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>flink-doris-connector-1.17</artifactId>
+            <version>1.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc</artifactId>
+            <version>3.1.1-1.17</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table</artifactId>
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/OracleCdc.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/OracleCdc.java
index bc6d29dd3..7547c36a9 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/OracleCdc.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/OracleCdc.java
@@ -1,3 +1,8 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+
package org.apache.flink.lakesoul.entry;
import com.dmetasoul.lakesoul.meta.external.oracle.OracleDBManager;
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
new file mode 100644
index 000000000..1b686d3a9
--- /dev/null
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
@@ -0,0 +1,340 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+
+package org.apache.flink.lakesoul.entry;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.dmetasoul.lakesoul.meta.DBManager;
+import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.*;
+
+import java.sql.*;
+
+import static org.apache.flink.lakesoul.tool.LakeSoulSinkDatabasesOptions.*;
+
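+/**
+ * Exports a LakeSoul table to an external database (MySQL, PostgreSQL or
+ * Doris) as a Flink batch or streaming job.
+ */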
+public class SyncDatabase {
+
+ public static void main(String[] args) throws SQLException {
+ ParameterTool parameter = ParameterTool.fromArgs(args);
+
+ String sourceDatabase = parameter.get(SOURCE_DB_DB_NAME.key());
+ String sourceTableName = parameter.get(SOURCE_DB_LAKESOUL_TABLE.key()).toLowerCase();
+        String targetSyncName = parameter.get(TARGET_DATABASE.key());
+        String targetDatabase = parameter.get(TARGET_DB_DB_NAME.key());
+        String targetTableName = parameter.get(TARGET_DB_TABLE_NAME.key()).toLowerCase();
+        String url = parameter.get(TARGET_DB_URL.key());
+        String username = parameter.get(TARGET_DB_USER.key());
+        String password = parameter.get(TARGET_DB_PASSWORD.key());
+        int sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue());
+        boolean useBatch = parameter.getBoolean(BATCH_STREAM_SINK.key(), BATCH_STREAM_SINK.defaultValue());
+ //int replicationNum = parameter.getInt(DORIS_REPLICATION_NUM.key(), DORIS_REPLICATION_NUM.defaultValue());
+
+ String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue());
+ Configuration conf = new Configuration();
+ conf.setString(RestOptions.BIND_PORT, "8081-8089");
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+ env.setParallelism(sinkParallelism);
+
+        switch (targetSyncName) {
+            case "mysql":
+                syncToMysql(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName);
+                break;
+            case "postgres":
+                syncToPg(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName);
+                break;
+            case "doris":
+                syncToDoris(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName, fenodes);
+                break;
+            default:
+                throw new RuntimeException("Unsupported target database: " + targetSyncName);
+        }
+ }
+
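+    // Builds a Doris CREATE TABLE statement, hash-distributed on the primary
+    // key into 32 buckets; not yet wired into the Doris sync path below.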
+ public static String dorisCreateTableSql(String[] stringFieldTypes, String[] fieldNames, String targetTableName, String pk, int replicationNum) {
+ StringBuilder createTableQuery = new StringBuilder("CREATE TABLE IF NOT EXISTS `")
+ .append(targetTableName)
+ .append("` (");
+ for (int i = 0; i < fieldNames.length; i++) {
+ String dataType = stringFieldTypes[i];
+ String nullable = stringFieldTypes[i].contains("NULL") ? "" : " NOT NULL";
+ createTableQuery.append("`").append(fieldNames[i]).append("` ").append(dataType).append(nullable);
+
+ if (i != fieldNames.length - 1) {
+ createTableQuery.append(", ");
+ }
+ }
+ createTableQuery.append(")").append(" DISTRIBUTED BY HASH(").append(pk).append(") BUCKETS 32")
+ .append(" PROPERTIES('replication_num'='").append(replicationNum).append("')");
+ return createTableQuery.toString();
+ }
+
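+    // Builds a CREATE TABLE IF NOT EXISTS statement with a PRIMARY KEY clause,
+    // shared by the MySQL and PostgreSQL sync paths.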
+ public static String pgAndMsqlCreateTableSql(String[] stringFieldTypes, String[] fieldNames, String targetTableName, String pk) {
+ StringBuilder createTableQuery = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
+ .append(targetTableName)
+ .append(" (");
+ for (int i = 0; i < fieldNames.length; i++) {
+ String dataType = stringFieldTypes[i];
+ //String nullable = stringFieldTypes[i].contains("NULL") ? "" : " NOT NULL";
+ createTableQuery.append(fieldNames[i]).append(" ").append(dataType);
+ if (i != fieldNames.length - 1) {
+ createTableQuery.append(", ");
+ }
+ }
+ createTableQuery.append(" ,PRIMARY KEY(").append(pk);
+ createTableQuery.append("))");
+ return createTableQuery.toString();
+ }
+
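+    // Maps Flink logical types to MySQL column types; primary-key string
+    // columns become VARCHAR(100) so they can be used in the primary key.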
+ public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] fieldNames, String pk) {
+ String[] stringFieldTypes = new String[fieldTypes.length];
+
+ for (int i = 0; i < fieldTypes.length; i++) {
+ if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
+ String mysqlType = "TEXT";
+ if (pk.contains(fieldNames[i])) {
+ mysqlType = "VARCHAR(100)";
+ }
+ stringFieldTypes[i] = mysqlType;
+ } else if (fieldTypes[i].getLogicalType() instanceof DecimalType) {
+ stringFieldTypes[i] = "FLOAT";
+ } else if (fieldTypes[i].getLogicalType() instanceof BinaryType) {
+ stringFieldTypes[i] = "BINARY";
+            } else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType || fieldTypes[i].getLogicalType() instanceof TimestampType) {
+ stringFieldTypes[i] = "TIMESTAMP";
+ } else if (fieldTypes[i].getLogicalType() instanceof BooleanType) {
+ stringFieldTypes[i] = "BOOLEAN";
+ } else {
+ stringFieldTypes[i] = fieldTypes[i].toString();
+ }
+ }
+ return stringFieldTypes;
+ }
+
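+    // Maps Flink logical types to PostgreSQL column types.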
+ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNames, String pk) {
+ String[] stringFieldTypes = new String[fieldTypes.length];
+
+ for (int i = 0; i < fieldTypes.length; i++) {
+            if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
+                String pgType = "TEXT";
+                if (pk.contains(fieldNames[i])) {
+                    pgType = "VARCHAR(100)";
+                }
+                stringFieldTypes[i] = pgType;
+ } else if (fieldTypes[i].getLogicalType() instanceof DoubleType) {
+ stringFieldTypes[i] = "FLOAT8";
+ } else if (fieldTypes[i].getLogicalType() instanceof FloatType) {
+ stringFieldTypes[i] = "FLOAT4";
+ } else if (fieldTypes[i].getLogicalType() instanceof BinaryType) {
+ stringFieldTypes[i] = "BYTEA";
+            } else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType || fieldTypes[i].getLogicalType() instanceof TimestampType) {
+ stringFieldTypes[i] = "TIMESTAMP";
+ } else {
+ stringFieldTypes[i] = fieldTypes[i].toString();
+ }
+ }
+ return stringFieldTypes;
+ }
+
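+    // Maps Flink logical types to Doris column types.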
+ public static String[] getDorisFieldTypes(DataType[] fieldTypes, String[] fieldNames, String pk) {
+ String[] stringFieldTypes = new String[fieldTypes.length];
+ for (int i = 0; i < fieldTypes.length; i++) {
+            if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
+                String dorisType;
+                if (pk.contains(fieldNames[i])) {
+                    dorisType = "VARCHAR";
+                } else {
+                    dorisType = "TEXT";
+                }
+                stringFieldTypes[i] = dorisType;
+ } else {
+ stringFieldTypes[i] = fieldTypes[i].toString();
+ }
+ }
+ return stringFieldTypes;
+ }
+
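+    // Reads the primary key columns of a LakeSoul table from the
+    // "hashPartitions" entry in its table properties.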
+    public static String getTablePk(String sourceDatabase, String sourceTableName) {
+        DBManager dbManager = new DBManager();
+        TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(sourceTableName, sourceDatabase);
+ String tableProperties = tableInfo.getProperties();
+ JSONObject jsonObject = JSON.parseObject(tableProperties);
+ return jsonObject.getString("hashPartitions");
+ }
+
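+    // Builds the SELECT column list, casting primary-key string columns
+    // to CHAR(85).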
+ public static String getColumns(String[] fieldNames, DataType[] fieldTypes, String pk) {
+ String[] names = new String[fieldNames.length];
+ StringBuilder columns = new StringBuilder();
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (fieldTypes[i].getLogicalType() instanceof VarCharType && pk.contains(fieldNames[i])) {
+ names[i] = "CAST(" + fieldNames[i] + " AS CHAR(85)) " + fieldNames[i];
+ } else {
+ names[i] = fieldNames[i];
+ }
+ columns.append(names[i]);
+ if (i < fieldNames.length - 1) {
+ columns.append(", ");
+ }
+ }
+ return columns.toString();
+ }
+
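+    // Reads the target table's column names and types via JDBC metadata,
+    // returning a comma-separated "name TYPE" list.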
+ public static String tableFields(Connection conn, String tableName) throws SQLException {
+ DatabaseMetaData databaseMetaData = conn.getMetaData();
+ ResultSet specificResultSet = databaseMetaData.getColumns(null, "%", tableName, "%");
+ String columnName2;
+ String columnType2;
+ StringBuilder createSql = new StringBuilder();
+ while (specificResultSet.next()) {
+ columnName2 = specificResultSet.getString("COLUMN_NAME");
+ columnType2 = specificResultSet.getString("TYPE_NAME");
+ createSql.append(columnName2).append(" ").append(columnType2);
+ if (!specificResultSet.isLast()) {
+ createSql.append(",");
+ }
+ }
+ return createSql.toString();
+ }
+
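+    // Creates the target PostgreSQL table if needed, then copies the LakeSoul
+    // table into it through a Flink JDBC catalog.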
+    public static void syncToPg(StreamExecutionEnvironment env, boolean batchSync, String url, String sourceDatabase, String username, String password, String targetDatabase, String sourceTableName, String targetTableName) throws SQLException {
+        if (batchSync) {
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ } else {
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
+ env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ }
+ StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
+ Catalog lakesoulCatalog = new LakeSoulCatalog();
+ tEnvs.registerCatalog("lakeSoul", lakesoulCatalog);
+ tEnvs.useCatalog("lakeSoul");
+ tEnvs.useDatabase(sourceDatabase);
+ String jdbcUrl = url + targetDatabase;
+ Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
+
+ TableResult schemaResult = tEnvs.executeSql(
+ "SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "` LIMIT 1");
+
+ String[] fieldNames = schemaResult.getTableSchema().getFieldNames();
+ DataType[] fieldTypes = schemaResult.getTableSchema().getFieldDataTypes();
+ String tablePk = getTablePk(sourceDatabase, sourceTableName);
+ String[] stringFieldsTypes = getPgFieldsTypes(fieldTypes, fieldNames, tablePk);
+
+ String createTableSql = pgAndMsqlCreateTableSql(stringFieldsTypes, fieldNames, targetTableName, tablePk);
+ Statement statement = conn.createStatement();
+        // Create the target table in PostgreSQL
+        statement.executeUpdate(createTableSql);
+        String createCatalog = String.format("create catalog postgres_catalog with('type'='jdbc','default-database'='%s','username'='%s','password'='%s','base-url'='%s')",
+                targetDatabase, username, password, url);
+        // Move data from LakeSoul to PostgreSQL
+        tEnvs.executeSql(createCatalog);
+ String insertQuery = "INSERT INTO postgres_catalog." + targetDatabase + "." + targetTableName +
+ " SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "`";
+
+ tEnvs.executeSql(insertQuery);
+ statement.close();
+ conn.close();
+ }
+
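+    // Creates the target MySQL table if needed, then copies the LakeSoul
+    // table into it through a Flink JDBC catalog.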
+    public static void syncToMysql(StreamExecutionEnvironment env, boolean batchSync, String url, String sourceDatabase, String username, String password, String targetDatabase, String sourceTableName, String targetTableName) throws SQLException {
+        if (batchSync) {
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ } else {
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
+ env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ }
+ StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
+ Catalog lakesoulCatalog = new LakeSoulCatalog();
+ tEnvs.registerCatalog("lakeSoul", lakesoulCatalog);
+ tEnvs.useCatalog("lakeSoul");
+ tEnvs.useDatabase(sourceDatabase);
+
+ TableResult schemaResult = tEnvs.executeSql(
+ "SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "` LIMIT 1");
+
+ String[] fieldNames = schemaResult.getTableSchema().getFieldNames();
+ DataType[] fieldTypes = schemaResult.getTableSchema().getFieldDataTypes();
+ String tablePk = getTablePk(sourceDatabase, sourceTableName);
+ String[] stringFieldsTypes = getMysqlFieldsTypes(fieldTypes, fieldNames, tablePk);
+
+ String createTableSql = pgAndMsqlCreateTableSql(stringFieldsTypes, fieldNames, targetTableName, tablePk);
+ String newUrl = url + targetDatabase;
+ Connection conn = DriverManager.getConnection(newUrl, username, password);
+ Statement statement = conn.createStatement();
+
+ // Create the target table in MySQL
+        statement.executeUpdate(createTableSql);
+        String createCatalog = String.format("create catalog mysql_catalog with('type'='jdbc','default-database'='%s','username'='%s','password'='%s','base-url'='%s')",
+                targetDatabase, username, password, url);
+ // Move data from LakeSoul to MySQL
+ tEnvs.executeSql(createCatalog);
+ String insertQuery = "INSERT INTO mysql_catalog." + targetDatabase + "." + targetTableName +
+ " SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "`";
+
+ tEnvs.executeSql(insertQuery);
+ // Close connections
+ statement.close();
+ conn.close();
+ }
+
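+    // Copies the LakeSoul table into Doris via the doris connector; assumes
+    // the target table already exists in Doris.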
+    public static void syncToDoris(StreamExecutionEnvironment env, boolean batchSync, String url, String sourceDatabase, String username, String password, String targetDatabase, String sourceTableName, String targetTableName, String fenodes) throws SQLException {
+        if (batchSync) {
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ } else {
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
+ env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ }
+ StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
+ Catalog lakesoulCatalog = new LakeSoulCatalog();
+ tEnvs.registerCatalog("lakeSoul", lakesoulCatalog);
+ String jdbcUrl = url + targetDatabase;
+ Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
+ String sql = String.format(
+ "create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s')",
+ targetTableName, tableFields(conn, targetTableName), "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password);
+ tEnvs.executeSql(sql);
+ tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul." + sourceDatabase + "." + sourceTableName);
+
+ conn.close();
+ }
+}
\ No newline at end of file
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java
new file mode 100644
index 000000000..16c3a9579
--- /dev/null
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java
@@ -0,0 +1,84 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+
+package org.apache.flink.lakesoul.tool;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class LakeSoulSinkDatabasesOptions extends LakeSoulSinkOptions {
+
+    public static final ConfigOption<String> TARGET_DB_URL = ConfigOptions
+            .key("target_db.url")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("target database JDBC url prefix; the target database name is appended");
+
+    public static final ConfigOption<String> SOURCE_DB_DB_NAME = ConfigOptions
+ .key("source_db.db_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("source database name");
+
+    public static final ConfigOption<String> TARGET_DB_USER = ConfigOptions
+            .key("target_db.user")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("target database username");
+
+    public static final ConfigOption<String> TARGET_DB_PASSWORD = ConfigOptions
+            .key("target_db.password")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("target database password");
+
+    public static final ConfigOption<String> TARGET_DATABASE = ConfigOptions
+            .key("target_db.db_sync")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("target database type: mysql, postgres or doris");
+
+    public static final ConfigOption<String> TARGET_DB_DB_NAME = ConfigOptions
+            .key("target_db.db_name")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("target database name");
+
+    public static final ConfigOption<String> SOURCE_DB_LAKESOUL_TABLE = ConfigOptions
+            .key("source_db.table")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("source LakeSoul table name");
+
+    public static final ConfigOption<String> TARGET_DB_TABLE_NAME = ConfigOptions
+            .key("target_db.table")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("target database table name");
+
+    public static final ConfigOption<Integer> DORIS_REPLICATION_NUM = ConfigOptions
+ .key("doris_replication.num")
+ .intType()
+ .defaultValue(1)
+ .withDescription("doris table replication num");
+
+    public static final ConfigOption<Integer> SINK_PARALLELISM = ConfigOptions
+            .key("sink_parallelism")
+            .intType()
+            .defaultValue(1)
+            .withDescription("parallelism of the export job");
+
+    public static final ConfigOption<Boolean> BATCH_STREAM_SINK = ConfigOptions
+            .key("use_batch")
+            .booleanType()
+            .defaultValue(true)
+            .withDescription("whether to export in batch mode (true) or streaming mode (false)");
+
+    public static final ConfigOption<String> DORIS_FENODES = ConfigOptions
+            .key("doris.fenodes")
+            .stringType()
+            .defaultValue("127.0.0.1:8030")
+            .withDescription("doris FE nodes address");
+}