diff --git a/docs/content/Realtime synchronization from MySQL.md b/docs/content/Realtime synchronization from MySQL.md index abbf142e..70bcd15a 100755 --- a/docs/content/Realtime synchronization from MySQL.md +++ b/docs/content/Realtime synchronization from MySQL.md @@ -84,7 +84,8 @@ Flink has been deployed. If Flink has not been deployed, follow these steps to d --sink-conf password= Password1 \ --sink-conf jdbc-url=jdbc:mysql://ip:9030 \ --sink-conf sink.label-prefix=superman \ - --table-conf replication_num=1 + --table-conf replication_num=1 \ + --table-conf fast_schema_evolution=true ``` ## Options @@ -103,4 +104,4 @@ Flink has been deployed. If Flink has not been deployed, follow these steps to d | --sink-conf password | Yes | NONE | The password of the StarRocks | | --sink-conf sink.label-prefix | Yes | No | stream load label | | --table-conf replication_num | Yes | 3 | table property | - +| --table-conf fast_schema_evolution| Yes | TRUE | table property for fast schema evolution, add/drop column diff --git a/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java b/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java index 73b29978..cc882051 100755 --- a/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java +++ b/src/main/java/com/starrocks/connector/flink/cdc/DatabaseSync.java @@ -51,6 +51,7 @@ public abstract class DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); + private static final String FAST_SCHEMA_EVOLUTION = "fast_schema_evolution"; protected Configuration config; protected String database; protected TableNameConverter converter; @@ -79,6 +80,10 @@ public void create(StreamExecutionEnvironment env, String database, Configuratio this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); this.sinkConfig = sinkConfig; this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig; + + if (!this.tableConfig.containsKey(FAST_SCHEMA_EVOLUTION)) { + this.tableConfig.put(FAST_SCHEMA_EVOLUTION, "true"); + } } public void build() throws Exception { diff --git a/src/main/java/com/starrocks/connector/flink/cdc/StarRocksOptions.java b/src/main/java/com/starrocks/connector/flink/cdc/StarRocksOptions.java index f2b02e39..436afb25 100755 --- a/src/main/java/com/starrocks/connector/flink/cdc/StarRocksOptions.java +++ b/src/main/java/com/starrocks/connector/flink/cdc/StarRocksOptions.java @@ -32,7 +32,7 @@ public class StarRocksOptions implements Serializable { private String tableIdentifier; public StarRocksOptions(String username, String password, String tableIdentifier, String jdbcUrl) { - this.opts = new StarRocksJdbcConnectionOptions(username, password, jdbcUrl); + this.opts = new StarRocksJdbcConnectionOptions(jdbcUrl, username, password); this.tableIdentifier = tableIdentifier; } @@ -40,6 +40,10 @@ public String getTableIdentifier() { return tableIdentifier; } + public StarRocksJdbcConnectionOptions getOpts() { + return opts; + } + public String save() throws IllegalArgumentException { Properties copy = new Properties(); return IOUtils.propsToString(copy); diff --git a/src/main/java/com/starrocks/connector/flink/cdc/json/DebeziumJsonSerializer.java b/src/main/java/com/starrocks/connector/flink/cdc/json/DebeziumJsonSerializer.java index 740cbc65..6bf9754f 100755 --- a/src/main/java/com/starrocks/connector/flink/cdc/json/DebeziumJsonSerializer.java +++ b/src/main/java/com/starrocks/connector/flink/cdc/json/DebeziumJsonSerializer.java @@ -25,6 +25,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.starrocks.connector.flink.catalog.StarRocksCatalog; +import com.starrocks.connector.flink.catalog.StarRocksColumn; import com.starrocks.connector.flink.cdc.StarRocksOptions; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.StringUtils; @@ -33,9 +35,12 @@ import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; import java.util.HashMap; import java.util.Map; -import java.util.Objects; +import java.util.ArrayList; +import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -59,6 +64,7 @@ public class DebeziumJsonSerializer implements Serializable { private String table; //table name of the cdc upstream, format is db.tbl private String sourceTableName; + private StarRocksCatalog starRocksCatalog; public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern, String sourceTableName) { this.starRocksOptions = starRocksOptions; @@ -71,6 +77,9 @@ public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true); this.objectMapper.setNodeFactory(jsonNodeFactory); + this.starRocksCatalog = new StarRocksCatalog(starRocksOptions.getOpts().getDbURL(), + starRocksOptions.getOpts().getUsername().get(), starRocksOptions.getOpts().getPassword().get()); + this.starRocksCatalog.open(); } public String process(String record) throws IOException { @@ -79,8 +88,7 @@ public String process(String record) throws IOException { String op = extractJsonNode(recordRoot, "op"); if (Objects.isNull(op)) { // schema change ddl - // starrocks 存算分离版本目前不支持schemaChange, 先注释掉 - // schemaChange(recordRoot); + schemaChange(recordRoot); return INVALID_RESULT; } Map valueMap; @@ -107,22 +115,17 @@ public String process(String record) throws IOException { @VisibleForTesting public boolean schemaChange(JsonNode recordRoot) { - boolean status = false; + try{ - if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){ - return false; - } - String ddl = extractDDL(recordRoot); - if(StringUtils.isNullOrWhitespaceOnly(ddl)){ - LOG.info("ddl can not do schema change:{}", recordRoot); + if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) { return false; } - // TODO Exec schema change - LOG.info("schema change status:{}", status); + + extractDDLAndExecute(recordRoot); }catch (Exception ex){ LOG.warn("schema change error :", ex); } - return status; + return true; } /** @@ -174,27 +177,47 @@ private Map extractRow(JsonNode recordRow) { return recordMap != null ? recordMap : new HashMap<>(); } - public String extractDDL(JsonNode record) throws JsonProcessingException { + private void extractDDLAndExecute(JsonNode record) throws JsonProcessingException { String historyRecord = extractJsonNode(record, "historyRecord"); if (Objects.isNull(historyRecord)) { - return null; + return; } String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl"); LOG.debug("received debezium ddl :{}", ddl); if (!Objects.isNull(ddl)) { //filter add/drop operation Matcher matcher = addDropDDLPattern.matcher(ddl); - if(matcher.find()){ + if (matcher.find()) { String op = matcher.group(1); String col = matcher.group(3); + + if (op.equalsIgnoreCase("drop")) { + execDropDDL(col); + return; + } + String type = matcher.group(5); type = handleType(type); - ddl = String.format(EXECUTE_DDL, starRocksOptions.getTableIdentifier(), op, col, type); - LOG.info("parse ddl:{}", ddl); - return ddl; + execAddDDL(col, type); } } - return null; + } + + private void execAddDDL(String col, String type) { + List toAddColumns = new ArrayList<>(); + StarRocksColumn.Builder builder = new StarRocksColumn.Builder() + .setColumnName(col) + .setDataType(type); + + toAddColumns.add(builder.build()); + + starRocksCatalog.alterAddColumns(database, table, toAddColumns, 30); + + } + + private void execDropDDL(String col) { + List cols = Arrays.asList(col); + starRocksCatalog.alterDropColumns(database, table, cols, 30); } public static DebeziumJsonSerializer.Builder builder() { diff --git a/src/test/java/com/starrocks/connector/flink/cdc/StarRocksCdcToolsTest.java b/src/test/java/com/starrocks/connector/flink/cdc/StarRocksCdcToolsTest.java index f80d7051..46056587 100755 --- a/src/test/java/com/starrocks/connector/flink/cdc/StarRocksCdcToolsTest.java +++ b/src/test/java/com/starrocks/connector/flink/cdc/StarRocksCdcToolsTest.java @@ -61,6 +61,7 @@ public static void main(String[] args) throws Exception{ Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); + tableConfig.put("fast_schema_evolution", "true"); String includingTables = "tbl1|tbl2|tbl3"; String excludingTables = "";