[Flink] Add export to external dbs for LakeSoul's tables (#376)
* Export tables out of the lake

Signed-off-by: ChenYunHey <[email protected]>

* support Doris

Signed-off-by: ChenYunHey <[email protected]>

* add license header

Signed-off-by: ChenYunHey <[email protected]>

---------

Signed-off-by: ChenYunHey <[email protected]>
ChenYunHey authored Dec 21, 2023
1 parent a0d93af commit f5e075e
Showing 4 changed files with 414 additions and 1 deletion.
11 changes: 10 additions & 1 deletion lakesoul-flink/pom.xml
@@ -121,7 +121,6 @@ SPDX-License-Identifier: Apache-2.0
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
@@ -132,6 +131,16 @@ SPDX-License-Identifier: Apache-2.0
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>2.3.0</version>
</dependency>
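<!-- Connectors added for the new SyncDatabase export entry point: the Doris sink and the Flink JDBC connector -->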
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.17</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.1-1.17</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
@@ -1,3 +1,8 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0


package org.apache.flink.lakesoul.entry;

import com.dmetasoul.lakesoul.meta.external.oracle.OracleDBManager;
@@ -0,0 +1,316 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0


package org.apache.flink.lakesoul.entry;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.*;

import java.sql.*;

import static org.apache.flink.lakesoul.tool.LakeSoulSinkDatabasesOptions.*;

public class SyncDatabase {
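// Entry point for exporting a LakeSoul table into an external database
// (MySQL, PostgreSQL, or Apache Doris). Parameters are read from the command
// line, a local Flink environment is created with its web UI bound to ports
// 8081-8089, and the job dispatches to the matching sync routine below.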

public static void main(String[] args) throws SQLException {
ParameterTool parameter = ParameterTool.fromArgs(args);

String sourceDatabase = parameter.get(SOURCE_DB_DB_NAME.key());
String sourceTableName = parameter.get(SOURCE_DB_LAKESOUL_TABLE.key()).toLowerCase();
String targetSyncName = parameter.get(TARGET_DATABASE.key());
String targetDatabase = parameter.get(TARGET_DB_DB_NAME.key());
String targetTableName = parameter.get(TARGET_DB_TABLE_NAME.key()).toLowerCase();
String url = parameter.get(TARGET_DB_URL.key());
String username = parameter.get(TARGET_DB_USER.key());
String password = parameter.get(TARGET_DB_PASSWORD.key());
int sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue());
boolean useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue());
//int replicationNum = parameter.getInt(DORIS_REPLICATION_NUM.key(), DORIS_REPLICATION_NUM.defaultValue());

String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue());
Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT, "8081-8089");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(sinkParallelism);

switch (targetSyncName) {
case "mysql":
syncToMysql(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName);
break;
case "postgres":
syncToPg(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName);
break;
case "doris":
syncToDoris(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName, fenodes);
break;
default:
throw new RuntimeException("Unsupported target database: " + targetSyncName);
}
}

public static String dorisCreateTableSql(String[] stringFieldTypes, String[] fieldNames, String targetTableName, String pk, int replicationNum) {
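// Illustrative output (hypothetical values): for fieldNames = {"id", "name"},
// stringFieldTypes = {"VARCHAR", "TEXT"}, pk = "id", replicationNum = 1:
//   CREATE TABLE IF NOT EXISTS `t` (`id` VARCHAR NOT NULL, `name` TEXT NOT NULL)
//   DISTRIBUTED BY HASH(id) BUCKETS 32 PROPERTIES('replication_num'='1')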
StringBuilder createTableQuery = new StringBuilder("CREATE TABLE IF NOT EXISTS `")
.append(targetTableName)
.append("` (");
for (int i = 0; i < fieldNames.length; i++) {
String dataType = stringFieldTypes[i];
String nullable = stringFieldTypes[i].contains("NULL") ? "" : " NOT NULL";
createTableQuery.append("`").append(fieldNames[i]).append("` ").append(dataType).append(nullable);

if (i != fieldNames.length - 1) {
createTableQuery.append(", ");
}
}
createTableQuery.append(")").append(" DISTRIBUTED BY HASH(").append(pk).append(") BUCKETS 32")
.append(" PROPERTIES('replication_num'='").append(replicationNum).append("')");
return createTableQuery.toString();
}

public static String pgAndMysqlCreateTableSql(String[] stringFieldTypes, String[] fieldNames, String targetTableName, String pk) {
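// Illustrative output (hypothetical values): for fieldNames = {"id", "name"},
// stringFieldTypes = {"VARCHAR(100)", "TEXT"}, pk = "id":
//   CREATE TABLE IF NOT EXISTS t (id VARCHAR(100), name TEXT, PRIMARY KEY(id))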
StringBuilder createTableQuery = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
.append(targetTableName)
.append(" (");
for (int i = 0; i < fieldNames.length; i++) {
String dataType = stringFieldTypes[i];
//String nullable = stringFieldTypes[i].contains("NULL") ? "" : " NOT NULL";
createTableQuery.append(fieldNames[i]).append(" ").append(dataType);
if (i != fieldNames.length - 1) {
createTableQuery.append(", ");
}
}
createTableQuery.append(" ,PRIMARY KEY(").append(pk);
createTableQuery.append("))");
return createTableQuery.toString();
}

public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] fieldNames, String pk) {
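// Flink-to-MySQL type mapping: VARCHAR becomes TEXT, or VARCHAR(100) for
// primary-key columns (MySQL needs a bounded length on key columns);
// DECIMAL -> FLOAT, BINARY -> BINARY, TIMESTAMP/TIMESTAMP_LTZ -> TIMESTAMP,
// BOOLEAN -> BOOLEAN; any other type passes through as its Flink string form.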
String[] stringFieldTypes = new String[fieldTypes.length];

for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
String mysqlType = "TEXT";
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
}
stringFieldTypes[i] = mysqlType;
} else if (fieldTypes[i].getLogicalType() instanceof DecimalType) {
stringFieldTypes[i] = "FLOAT";
} else if (fieldTypes[i].getLogicalType() instanceof BinaryType) {
stringFieldTypes[i] = "BINARY";
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType || fieldTypes[i].getLogicalType() instanceof TimestampType) {
stringFieldTypes[i] = "TIMESTAMP";
} else if (fieldTypes[i].getLogicalType() instanceof BooleanType) {
stringFieldTypes[i] = "BOOLEAN";
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
}
}
return stringFieldTypes;
}

public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNames, String pk) {
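// Flink-to-PostgreSQL type mapping: VARCHAR becomes TEXT, or VARCHAR(100) for
// primary-key columns; DOUBLE -> FLOAT8, FLOAT -> FLOAT4, BINARY -> BYTEA,
// TIMESTAMP/TIMESTAMP_LTZ -> TIMESTAMP; other types pass through unchanged.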
String[] stringFieldTypes = new String[fieldTypes.length];

for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
String pgType = "TEXT";
if (pk.contains(fieldNames[i])) {
pgType = "VARCHAR(100)";
}
stringFieldTypes[i] = pgType;
} else if (fieldTypes[i].getLogicalType() instanceof DoubleType) {
stringFieldTypes[i] = "FLOAT8";
} else if (fieldTypes[i].getLogicalType() instanceof FloatType) {
stringFieldTypes[i] = "FLOAT4";
} else if (fieldTypes[i].getLogicalType() instanceof BinaryType) {
stringFieldTypes[i] = "BYTEA";
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType || fieldTypes[i].getLogicalType() instanceof TimestampType) {
stringFieldTypes[i] = "TIMESTAMP";
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
}
}
return stringFieldTypes;
}

public static String[] getDorisFieldTypes(DataType[] fieldTypes, String[] fieldNames, String pk) {
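// Doris mapping: primary-key string columns stay VARCHAR (Doris does not accept
// TEXT as a key column type), other strings become TEXT; the rest pass through.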
String[] stringFieldTypes = new String[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
String dorisType;
if (pk.contains(fieldNames[i])) {
dorisType = "VARCHAR";
} else {
dorisType = "TEXT";
}
stringFieldTypes[i] = dorisType;
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
}
}
return stringFieldTypes;
}

public static String getTablePk(String sourceDatabase, String sourceTableName) {
DBManager dbManager = new DBManager();
TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(sourceTableName, sourceDatabase);
String tableProperties = tableInfo.getProperties();
JSONObject jsonObject = JSON.parseObject(tableProperties);
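// LakeSoul keeps the table's hash-partition (primary key) columns in the
// table-properties JSON under "hashPartitions".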
return jsonObject.getString("hashPartitions");
}

public static String getColumns(String[] fieldNames, DataType[] fieldTypes, String pk) {
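// Builds the SELECT projection list, casting primary-key VARCHAR columns to
// CHAR(85) and aliasing them back to their original names.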
String[] names = new String[fieldNames.length];
StringBuilder columns = new StringBuilder();
for (int i = 0; i < fieldNames.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof VarCharType && pk.contains(fieldNames[i])) {
names[i] = "CAST(" + fieldNames[i] + " AS CHAR(85)) " + fieldNames[i];
} else {
names[i] = fieldNames[i];
}
columns.append(names[i]);
if (i < fieldNames.length - 1) {
columns.append(", ");
}
}
return columns.toString();
}

public static String tableFields(Connection conn, String tableName) throws SQLException {
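// Reads the target table's existing columns via JDBC DatabaseMetaData and
// renders them as a "name TYPE, ..." list for the Flink Doris table DDL, so
// the Doris table must already exist.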
DatabaseMetaData databaseMetaData = conn.getMetaData();
ResultSet specificResultSet = databaseMetaData.getColumns(null, "%", tableName, "%");
String columnName2;
String columnType2;
StringBuilder createSql = new StringBuilder();
while (specificResultSet.next()) {
columnName2 = specificResultSet.getString("COLUMN_NAME");
columnType2 = specificResultSet.getString("TYPE_NAME");
createSql.append(columnName2).append(" ").append(columnType2);
if (!specificResultSet.isLast()) {
createSql.append(",");
}
}
return createSql.toString();
}

public static void syncToPg(StreamExecutionEnvironment env, boolean batchSync, String url, String sourceDatabase, String username, String password, String targetDatabase, String sourceTableName, String targetTableName) throws SQLException {
if (batchSync) {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
} else {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
Catalog lakesoulCatalog = new LakeSoulCatalog();
tEnvs.registerCatalog("lakeSoul", lakesoulCatalog);
tEnvs.useCatalog("lakeSoul");
tEnvs.useDatabase(sourceDatabase);
String jdbcUrl = url + targetDatabase;
Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
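// Derive the source schema by running a LIMIT 1 query against the LakeSoul
// table and inspecting the result's TableSchema.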

TableResult schemaResult = tEnvs.executeSql(
"SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "` LIMIT 1");

String[] fieldNames = schemaResult.getTableSchema().getFieldNames();
DataType[] fieldTypes = schemaResult.getTableSchema().getFieldDataTypes();
String tablePk = getTablePk(sourceDatabase, sourceTableName);
String[] stringFieldsTypes = getPgFieldsTypes(fieldTypes, fieldNames, tablePk);

String createTableSql = pgAndMysqlCreateTableSql(stringFieldsTypes, fieldNames, targetTableName, tablePk);
Statement statement = conn.createStatement();
// Create the target table in PostgreSQL
statement.executeUpdate(createTableSql);
String createCatalog = String.format(
"create catalog postgres_catalog with('type'='jdbc','default-database'='%s','username'='%s','password'='%s','base-url'='%s')",
targetDatabase, username, password, url);
// Move data from LakeSoul to PostgreSQL
tEnvs.executeSql(createCatalog);
String insertQuery = "INSERT INTO postgres_catalog." + targetDatabase + "." + targetTableName +
" SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "`";

tEnvs.executeSql(insertQuery);
statement.close();
conn.close();
}

public static void syncToMysql(StreamExecutionEnvironment env, boolean batchSync, String url, String sourceDatabase, String username, String password, String targetDatabase, String sourceTableName, String targetTableName) throws SQLException {
if (batchSync) {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
} else {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
Catalog lakesoulCatalog = new LakeSoulCatalog();
tEnvs.registerCatalog("lakeSoul", lakesoulCatalog);
tEnvs.useCatalog("lakeSoul");
tEnvs.useDatabase(sourceDatabase);

TableResult schemaResult = tEnvs.executeSql(
"SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "` LIMIT 1");

String[] fieldNames = schemaResult.getTableSchema().getFieldNames();
DataType[] fieldTypes = schemaResult.getTableSchema().getFieldDataTypes();
String tablePk = getTablePk(sourceDatabase, sourceTableName);
String[] stringFieldsTypes = getMysqlFieldsTypes(fieldTypes, fieldNames, tablePk);

String createTableSql = pgAndMysqlCreateTableSql(stringFieldsTypes, fieldNames, targetTableName, tablePk);
String newUrl = url + targetDatabase;
Connection conn = DriverManager.getConnection(newUrl, username, password);
Statement statement = conn.createStatement();

// Create the target table in MySQL
statement.executeUpdate(createTableSql);
String createCatalog = String.format(
"create catalog mysql_catalog with('type'='jdbc','default-database'='%s','username'='%s','password'='%s','base-url'='%s')",
targetDatabase, username, password, url);
// Move data from LakeSoul to MySQL
tEnvs.executeSql(createCatalog);
String insertQuery = "INSERT INTO mysql_catalog." + targetDatabase + "." + targetTableName +
" SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "`";

tEnvs.executeSql(insertQuery);
// Close connections
statement.close();
conn.close();
}

public static void syncToDoris(StreamExecutionEnvironment env, boolean batchSync, String url, String sourceDatabase, String username, String password, String targetDatabase, String sourceTableName, String targetTableName, String fenodes) throws SQLException {
if (batchSync) {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
} else {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
Catalog lakesoulCatalog = new LakeSoulCatalog();
tEnvs.registerCatalog("lakeSoul", lakesoulCatalog);
String jdbcUrl = url + targetDatabase;
Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
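// Declare the target as a Flink Doris table whose column list is read from the
// existing Doris table, then copy the data with INSERT INTO ... SELECT.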
String sql = String.format(
"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s')",
targetTableName, tableFields(conn, targetTableName), "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password);
tEnvs.executeSql(sql);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul." + sourceDatabase + "." + sourceTableName);

conn.close();
}
}