Skip to content

Commit

Permalink
Reformar Code
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYunHey <[email protected]>
  • Loading branch information
ChenYunHey committed Jan 20, 2024
1 parent b604b0a commit 56278df
Showing 1 changed file with 16 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public static String pgAndMsqlCreateTableSql(String[] stringFieldTypes, String[]
createTableQuery.append(", ");
}
}
if (pk!=null){
if (pk != null) {
createTableQuery.append(" ,PRIMARY KEY(").append(pk);
createTableQuery.append(")");
}
Expand All @@ -91,7 +91,7 @@ public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] field
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
String mysqlType = "VARCHAR(255)";
if (pk!=null){
if (pk != null) {
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
}
Expand All @@ -103,8 +103,7 @@ public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] field
stringFieldTypes[i] = "BINARY";
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType | fieldTypes[i].getLogicalType() instanceof TimestampType) {
stringFieldTypes[i] = "TIMESTAMP";
}
else if (fieldTypes[i].getLogicalType() instanceof BooleanType) {
} else if (fieldTypes[i].getLogicalType() instanceof BooleanType) {
stringFieldTypes[i] = "BOOLEAN";
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
Expand All @@ -119,7 +118,7 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
String mysqlType = "TEXT";
if (pk!=null){
if (pk != null) {
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
}
Expand All @@ -143,13 +142,12 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam
public static String[] getDorisFieldTypes(DataType[] fieldTypes) {
String[] stringFieldTypes = new String[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof TimestampType){
if (fieldTypes[i].getLogicalType() instanceof TimestampType) {
stringFieldTypes[i] = "DATETIME";
}
else if (fieldTypes[i].getLogicalType() instanceof VarCharType){
} else if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
stringFieldTypes[i] = "VARCHAR";

} else {
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
}
}
Expand All @@ -165,7 +163,7 @@ public static String getTablePk(String sourceDataBae, String sourceTableName) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < primaryKeys.size(); i++) {
stringBuilder.append(primaryKeys.get(i));
if (i<primaryKeys.size()-1){
if (i < primaryKeys.size() - 1) {
stringBuilder.append(",");
}
}
Expand Down Expand Up @@ -204,8 +202,8 @@ public static void xsyncToPg(StreamExecutionEnvironment env, boolean bathXync, S
"'" + username + "'" + "," + "'password'=" + "'" + password + "'" + "," + "'base-url'=" + "'" + url + "'" + ")";
// Move data from LakeSoul to MySQL
tEnvs.executeSql(createCatalog);
String insertQuery = "INSERT INTO postgres_catalog.`" + targetDatabase+ "`.`" + targetTableName +
"` SELECT * FROM lakeSoul.`" +sourceDatabase + "`.`" + sourceTableName + "`";
String insertQuery = "INSERT INTO postgres_catalog.`" + targetDatabase + "`.`" + targetTableName +
"` SELECT * FROM lakeSoul.`" + sourceDatabase + "`.`" + sourceTableName + "`";

tEnvs.executeSql(insertQuery);
statement.close();
Expand Down Expand Up @@ -243,21 +241,21 @@ public static void xsyncToMysql(StreamExecutionEnvironment env, boolean batchXyn
StringBuilder coulmns = new StringBuilder();
for (int i = 0; i < mysqlFieldTypes.length; i++) {
coulmns.append("`").append(fieldNames[i]).append("` ").append(mysqlFieldTypes[i]);
if (i< fieldDataTypes.length-1){
if (i < fieldDataTypes.length - 1) {
coulmns.append(",");
}
}
String sql;
if (tablePk!=null){
if (tablePk != null) {
sql = String.format(
"create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s')",
targetTableName, coulmns, tablePk, "jdbc", jdbcUrl, targetTableName, username, password);
}else {
} else {
sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s')",
targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password);
}
tEnvs.executeSql(sql);
tEnvs.executeSql("insert into "+targetTableName+" select * from lakeSoul.`"+sourceDatabase+"`."+sourceTableName);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);

statement.close();
conn.close();
Expand All @@ -284,14 +282,14 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, boolean batchXyn
StringBuilder coulmns = new StringBuilder();
for (int i = 0; i < fieldDataTypes.length; i++) {
coulmns.append("`").append(fieldNames[i]).append("` ").append(dorisFieldTypes[i]);
if (i< fieldDataTypes.length-1){
if (i < fieldDataTypes.length - 1) {
coulmns.append(",");
}
}
String sql = String.format(
"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s')",
targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password);
tEnvs.executeSql(sql);
tEnvs.executeSql("insert into "+targetTableName+" select * from lakeSoul.`"+sourceDatabase+"`."+sourceTableName);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
}
}

0 comments on commit 56278df

Please sign in to comment.