Skip to content

Commit

Permalink
[Flink] Support SQLServer CDC Sink (#421)
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYunHey <[email protected]>
  • Loading branch information
ChenYunHey authored Jan 17, 2024
1 parent b8e8b21 commit 0977cf8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
5 changes: 5 additions & 0 deletions lakesoul-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ SPDX-License-Identifier: Apache-2.0
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@

import com.dmetasoul.lakesoul.meta.external.NameSpaceManager;
import com.dmetasoul.lakesoul.meta.external.mysql.MysqlDBManager;
import com.dmetasoul.lakesoul.meta.external.oracle.OracleDBManager;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
//import com.ververica.cdc.connectors.mysql.source.MySqlSource;
//import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.lakesoul.sink.LakeSoulMultiTableSinkStreamBuilder;
Expand Down Expand Up @@ -64,7 +62,7 @@ public static void main(String[] args) throws Exception {
host = parameter.get(SOURCE_DB_HOST.key());
port = parameter.getInt(SOURCE_DB_PORT.key(), MysqlDBManager.DEFAULT_MYSQL_PORT);
//Postgres Oracle
if (!dbType.equals("mysql")) {
if (dbType.equals("orcale") || dbType.equalsIgnoreCase("postgresql")) {
schemaList = parameter.get(SOURCE_DB_SCHEMA_LIST.key()).split(",");
String[] tables = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
tableList = new String[tables.length];
Expand All @@ -73,6 +71,9 @@ public static void main(String[] args) throws Exception {
}
splitSize = parameter.getInt(SOURCE_DB_SPLIT_SIZE.key(), SOURCE_DB_SPLIT_SIZE.defaultValue());
}
if (dbType.equalsIgnoreCase("sqlserver")){
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
pluginName = parameter.get(PLUGIN_NAME.key(), PLUGIN_NAME.defaultValue());
//flink
String databasePrefixPath = parameter.get(WAREHOUSE_PATH.key());
Expand Down Expand Up @@ -135,6 +136,9 @@ public static void main(String[] args) throws Exception {
if (dbType.equalsIgnoreCase("oracle")) {
oracleCdc(lakeSoulRecordConvert, conf, env);
}
if (dbType.equalsIgnoreCase("sqlserver")){
sqlserverCdc(lakeSoulRecordConvert, conf, env);
}

}

Expand Down Expand Up @@ -244,4 +248,32 @@ private static void oracleCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Confi
DataStreamSink<BinarySourceRecord> dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From Oracle Database " + dbName);
}

public static void sqlserverCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Configuration conf, StreamExecutionEnvironment env) throws Exception {
SqlServerSourceBuilder.SqlServerIncrementalSource<String> sqlServerSource =
new SqlServerSourceBuilder()
.hostname(host)
.port(port)
.databaseList(dbName)
.tableList(tableList)
.username(userName)
.password(passWord)
.deserializer(new BinaryDebeziumDeserializationSchema(lakeSoulRecordConvert, conf.getString(WAREHOUSE_PATH)))
.startupOptions(StartupOptions.initial())
.build();
NameSpaceManager manager = new NameSpaceManager();
manager.importOrSyncLakeSoulNamespace(dbName);
LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
context.env = env;
context.conf = conf;
LakeSoulMultiTableSinkStreamBuilder
builder =
new LakeSoulMultiTableSinkStreamBuilder(sqlServerSource, context, lakeSoulRecordConvert);

DataStreamSource<BinarySourceRecord> source = builder.buildMultiTableSource("Sqlserver Source");

DataStream<BinarySourceRecord> stream = builder.buildHashPartitionedCDCStream(source);
DataStreamSink<BinarySourceRecord> dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From sqlserver Database " + dbName);
}
}

0 comments on commit 0977cf8

Please sign in to comment.