Skip to content

Commit

Permalink
[Enhancement] Synchronize ddl schema change to Starrocks by parsing j…
Browse files Browse the repository at this point in the history
…son (#337)

Signed-off-by: chenhaifengkeda <[email protected]>
  • Loading branch information
chenhaifengkeda committed Feb 29, 2024
1 parent 3242e74 commit 2e730c5
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 23 deletions.
5 changes: 3 additions & 2 deletions docs/content/Realtime synchronization from MySQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ Flink has been deployed. If Flink has not been deployed, follow these steps to d
--sink-conf password= Password1 \
--sink-conf jdbc-url=jdbc:mysql://ip:9030 \
--sink-conf sink.label-prefix=superman \
--table-conf replication_num=1
--table-conf replication_num=1 \
--table-conf fast_schema_evolution=true
```

## Options
Expand All @@ -103,4 +104,4 @@ Flink has been deployed. If Flink has not been deployed, follow these steps to d
| --sink-conf password | Yes | NONE | The password of the StarRocks |
| --sink-conf sink.label-prefix | Yes | No | stream load label |
| --table-conf replication_num | Yes | 3 | table property |

| --table-conf fast_schema_evolution| Yes | TRUE | table property for fast schema evolution, add/drop column
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

public abstract class DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
private static final String FAST_SCHEMA_EVOLUTION = "fast_schema_evolution";
protected Configuration config;
protected String database;
protected TableNameConverter converter;
Expand Down Expand Up @@ -79,6 +80,10 @@ public void create(StreamExecutionEnvironment env, String database, Configuratio
this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
this.sinkConfig = sinkConfig;
this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;

if (!this.tableConfig.containsKey(FAST_SCHEMA_EVOLUTION)) {
this.tableConfig.put(FAST_SCHEMA_EVOLUTION, "true");
}
}

public void build() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@ public class StarRocksOptions implements Serializable {
private String tableIdentifier;

public StarRocksOptions(String username, String password, String tableIdentifier, String jdbcUrl) {
this.opts = new StarRocksJdbcConnectionOptions(username, password, jdbcUrl);
this.opts = new StarRocksJdbcConnectionOptions(jdbcUrl, username, password);
this.tableIdentifier = tableIdentifier;
}

public String getTableIdentifier() {
return tableIdentifier;
}

public StarRocksJdbcConnectionOptions getOpts() {
return opts;
}

public String save() throws IllegalArgumentException {
Properties copy = new Properties();
return IOUtils.propsToString(copy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.starrocks.connector.flink.catalog.StarRocksCatalog;
import com.starrocks.connector.flink.catalog.StarRocksColumn;
import com.starrocks.connector.flink.cdc.StarRocksOptions;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
Expand All @@ -33,9 +35,12 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -59,6 +64,7 @@ public class DebeziumJsonSerializer implements Serializable {
private String table;
//table name of the cdc upstream, format is db.tbl
private String sourceTableName;
private StarRocksCatalog starRocksCatalog;

public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern, String sourceTableName) {
this.starRocksOptions = starRocksOptions;
Expand All @@ -71,6 +77,9 @@ public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
this.starRocksCatalog = new StarRocksCatalog(starRocksOptions.getOpts().getDbURL(),
starRocksOptions.getOpts().getUsername().get(), starRocksOptions.getOpts().getPassword().get());
this.starRocksCatalog.open();
}

public String process(String record) throws IOException {
Expand All @@ -79,8 +88,7 @@ public String process(String record) throws IOException {
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
// starrocks 存算分离版本目前不支持schemaChange, 先注释掉
// schemaChange(recordRoot);
schemaChange(recordRoot);
return INVALID_RESULT;
}
Map<String, String> valueMap;
Expand All @@ -107,22 +115,17 @@ public String process(String record) throws IOException {

@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;

try{
if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){
return false;
}
String ddl = extractDDL(recordRoot);
if(StringUtils.isNullOrWhitespaceOnly(ddl)){
LOG.info("ddl can not do schema change:{}", recordRoot);
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
// TODO Exec schema change
LOG.info("schema change status:{}", status);

extractDDLAndExecute(recordRoot);
}catch (Exception ex){
LOG.warn("schema change error :", ex);
}
return status;
return true;
}

/**
Expand Down Expand Up @@ -174,27 +177,47 @@ private Map<String, String> extractRow(JsonNode recordRow) {
return recordMap != null ? recordMap : new HashMap<>();
}

public String extractDDL(JsonNode record) throws JsonProcessingException {
private void extractDDLAndExecute(JsonNode record) throws JsonProcessingException {
String historyRecord = extractJsonNode(record, "historyRecord");
if (Objects.isNull(historyRecord)) {
return null;
return;
}
String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
LOG.debug("received debezium ddl :{}", ddl);
if (!Objects.isNull(ddl)) {
//filter add/drop operation
Matcher matcher = addDropDDLPattern.matcher(ddl);
if(matcher.find()){
if (matcher.find()) {
String op = matcher.group(1);
String col = matcher.group(3);

if (op.equalsIgnoreCase("drop")) {
execDropDDL(col);
return;
}

String type = matcher.group(5);
type = handleType(type);
ddl = String.format(EXECUTE_DDL, starRocksOptions.getTableIdentifier(), op, col, type);
LOG.info("parse ddl:{}", ddl);
return ddl;
execAddDDL(col, type);
}
}
return null;
}

private void execAddDDL(String col, String type) {
List<StarRocksColumn> toAddColumns = new ArrayList<>();
StarRocksColumn.Builder builder = new StarRocksColumn.Builder()
.setColumnName(col)
.setDataType(type);

toAddColumns.add(builder.build());

starRocksCatalog.alterAddColumns(database, table, toAddColumns, 30);

}

private void execDropDDL(String col) {
List<String> cols = Arrays.asList(col);
starRocksCatalog.alterDropColumns(database, table, cols, 30);
}

public static DebeziumJsonSerializer.Builder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public static void main(String[] args) throws Exception{

Map<String,String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
tableConfig.put("fast_schema_evolution", "true");

String includingTables = "tbl1|tbl2|tbl3";
String excludingTables = "";
Expand Down

0 comments on commit 2e730c5

Please sign in to comment.