diff --git a/.github/workflows/consistency-ci.yml b/.github/workflows/consistency-ci.yml index f95902f97..253f5d2d9 100644 --- a/.github/workflows/consistency-ci.yml +++ b/.github/workflows/consistency-ci.yml @@ -56,10 +56,10 @@ jobs: - name: Init PG run: | ./script/meta_init_for_local_test.sh -j 2 - - name: Set up JDK 8 + - name: Set up JDK 11 uses: actions/setup-java@v4 with: - java-version: '8' + java-version: '11' distribution: 'temurin' cache: maven - name: Set up Python 3.9 diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml index 5b26dfa81..8d9ed61a1 100644 --- a/.github/workflows/deployment.yml +++ b/.github/workflows/deployment.yml @@ -15,10 +15,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Set up JDK 8 + - name: Set up JDK 11 uses: actions/setup-java@v4 with: - java-version: '8' + java-version: '11' distribution: 'temurin' cache: maven - uses: actions-rs/toolchain@v1 @@ -51,10 +51,10 @@ jobs: runs-on: windows-latest steps: - uses: actions/checkout@v4 - - name: Set up JDK 8 + - name: Set up JDK 11 uses: actions/setup-java@v4 with: - java-version: '8' + java-version: '11' distribution: 'temurin' cache: maven - name: Install Protoc @@ -151,10 +151,10 @@ jobs: with: name: lakesoul-nativemetadata-x86_64-pc-windows-msvc path: ./rust/target/release/ - - name: Set up JDK 8 + - name: Set up JDK 11 uses: actions/setup-java@v4 with: - java-version: '8' + java-version: '11' distribution: 'temurin' cache: maven - name: Install Protoc diff --git a/.github/workflows/native-build.yml b/.github/workflows/native-build.yml index d64de6f66..bed3450f7 100644 --- a/.github/workflows/native-build.yml +++ b/.github/workflows/native-build.yml @@ -27,10 +27,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Set up JDK 8 + - name: Set up JDK 11 uses: actions/setup-java@v4 with: - java-version: '8' + java-version: '11' distribution: 'temurin' cache: maven - uses: actions-rs/toolchain@v1 @@ -64,10 +64,10 @@ jobs: runs-on: windows-latest steps: - uses: actions/checkout@v4 - - name: Set up JDK 8 + - name: Set up JDK 11 uses: actions/setup-java@v4 with: - java-version: '8' + java-version: '11' distribution: 'temurin' cache: maven - name: Install Protoc diff --git a/lakesoul-flink/pom.xml b/lakesoul-flink/pom.xml index 605b8e9a6..2ab1b17c8 100644 --- a/lakesoul-flink/pom.xml +++ b/lakesoul-flink/pom.xml @@ -92,6 +92,21 @@ SPDX-License-Identifier: Apache-2.0 ${flink.version} ${local.scope} + + + + io.openlineage + openlineage-flink + 1.19.0 + ${local.scope} + + + org.apache.flink + flink-connector-kafka + ${flink.version} + ${local.scope} + + org.apache.flink flink-core @@ -526,7 +541,6 @@ SPDX-License-Identifier: Apache-2.0 org.furyio:fury-core com.google.guava:guava com.google.guava:failureaccess - org.casbin:jdbc-adapter org.aspectj:aspectjrt diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/KafkaCdc.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/KafkaCdc.java index 1fb03b086..8f9c81847 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/KafkaCdc.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/KafkaCdc.java @@ -26,7 +26,10 @@ import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.lakesoul.entry.sql.flink.LakeSoulInAndOutputJobListener; +import org.apache.flink.lakesoul.entry.sql.utils.FileUtil; import org.apache.flink.lakesoul.sink.LakeSoulMultiTableSinkStreamBuilder; +import org.apache.flink.lakesoul.tool.JobOptions; import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions; import org.apache.flink.lakesoul.types.*; import org.apache.flink.streaming.api.CheckpointingMode; @@ -43,6 +46,7 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Arrays; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -53,11 +57,10 @@ public class KafkaCdc { /** - * @param args - * --bootstrap_servers localhost:9092 --topic t_test --auto_offset_reset earliest --group_id test - * --source.parallelism 4 --sink.parallelism 4 --job.checkpoint_interval 5000 - * --warehouse_path /tmp/lakesoul/kafka - * --flink.checkpoint /tmp/lakesoul/chk + * @param args --bootstrap_servers localhost:9092 --topic t_test --auto_offset_reset earliest --group_id test + * --source.parallelism 4 --sink.parallelism 4 --job.checkpoint_interval 5000 + * --warehouse_path /tmp/lakesoul/kafka + * --flink.checkpoint /tmp/lakesoul/chk * @throws Exception */ public static void main(String[] args) throws Exception { @@ -164,7 +167,30 @@ public static void main(String[] args) throws Exception { conf.set(LakeSoulSinkOptions.LOGICALLY_DROP_COLUM, logicallyDropColumn); conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + String lineageUrl = System.getenv("LINEAGE_URL"); + LakeSoulInAndOutputJobListener listener = null; + StreamExecutionEnvironment env; + String appName = null; + String namespace = null; + if (lineageUrl != null) { + conf.set(JobOptions.transportTypeOption, "http"); + conf.set(JobOptions.urlOption, lineageUrl); + conf.set(JobOptions.execAttach, false); + conf.set(lineageOption, true); + env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + appName = FileUtil.getSubNameFromBatch(env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID)); + namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN"); + if (namespace == null) { + namespace = "public"; + } + listener = new LakeSoulInAndOutputJobListener(lineageUrl); + listener.jobName(appName, namespace); + listener.inputFacets("kafka." + kafkaTopic, "kafka-public", null, null); + env.registerJobListener(listener); + } else { + env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + } + env.getConfig().registerTypeWithKryoSerializer(BinarySourceRecord.class, BinarySourceRecordSerializer.class); ParameterTool pt = ParameterTool.fromMap(conf.toMap()); env.getConfig().setGlobalJobParameters(pt); @@ -207,7 +233,16 @@ public static void main(String[] args) throws Exception { LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context(); context.env = env; - context.conf = (Configuration) env.getConfiguration(); + if (lineageUrl != null) { + Map confs = ((Configuration) env.getConfiguration()).toMap(); + confs.put(linageJobName.key(), appName); + confs.put(linageJobNamespace.key(), namespace); + confs.put(lineageJobUUID.key(), listener.getRunId()); + confs.put(lineageOption.key(), "true"); + context.conf = Configuration.fromMap(confs); + } else { + context.conf = (Configuration) env.getConfiguration(); + } LakeSoulMultiTableSinkStreamBuilder builder = new LakeSoulMultiTableSinkStreamBuilder(kafkaSource, context, lakeSoulRecordConvert); DataStreamSource source = builder.buildMultiTableSource("Kafka Source"); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/LakeSoulKafkaAvroSink.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/LakeSoulKafkaAvroSink.java index 2c9f9b55d..a15c9949d 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/LakeSoulKafkaAvroSink.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/LakeSoulKafkaAvroSink.java @@ -7,6 +7,7 @@ import com.dmetasoul.lakesoul.meta.DBManager; import com.dmetasoul.lakesoul.meta.DBUtil; import com.dmetasoul.lakesoul.meta.entity.TableInfo; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -23,7 +24,10 @@ import org.apache.flink.formats.avro.RowDataToAvroConverters.RowDataToAvroConverter; import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema; import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.lakesoul.entry.sql.flink.LakeSoulInAndOutputJobListener; +import org.apache.flink.lakesoul.entry.sql.utils.FileUtil; import org.apache.flink.lakesoul.source.arrow.LakeSoulArrowSource; +import org.apache.flink.lakesoul.tool.JobOptions; import org.apache.flink.lakesoul.tool.LakeSoulKeyGen; import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions; import org.apache.flink.lakesoul.types.LakSoulKafkaPartitioner; @@ -137,7 +141,42 @@ public static void main(String[] args) throws Exception { conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, sinkParallelism); conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + String lineageUrl = System.getenv("LINEAGE_URL"); + LakeSoulInAndOutputJobListener listener; + StreamExecutionEnvironment env; + if (lineageUrl != null) { + conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); + conf.set(JobOptions.transportTypeOption, "http"); + conf.set(JobOptions.urlOption, lineageUrl); + conf.set(JobOptions.execAttach, false); + conf.set(lineageOption, true); + env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + String appName = FileUtil.getSubNameFromBatch(env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID)); + String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN"); + if (namespace == null) { + namespace = "public"; + } + listener = new LakeSoulInAndOutputJobListener(lineageUrl); + listener.jobName(appName, namespace); + listener.outputFacets("Kafka." + kafkaTopic, "kafka-public", null, null); + DBManager lakesoulDBManager = new DBManager(); + TableInfo tableInfo = lakesoulDBManager.getTableInfoByNameAndNamespace(lakeSoulTableName, lakeSoulDBName); + String tableSchema = tableInfo.getTableSchema(); + Schema schema = Schema.fromJSON(tableSchema); + int size = schema.getFields().size(); + String[] colNames = new String[size]; + String[] colTypes = new String[size]; + for (int i = 0; i < size; i++) { + Field field = schema.getFields().get(i); + colNames[i] = field.getName(); + colTypes[i] = field.getType().toString(); + } + listener.inputFacets("lakesoul." + lakeSoulDBName + "." + lakeSoulTableName, tableInfo.getDomain(), colNames, colTypes); + env.registerJobListener(listener); + } else { + env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + } + ParameterTool pt = ParameterTool.fromMap(conf.toMap()); env.getConfig().setGlobalJobParameters(pt); env.enableCheckpointing(checkpointInterval); @@ -183,7 +222,7 @@ public static void main(String[] args) throws Exception { .build(); - Tuple4 keyInfo = getKeyInfo(lakeSoulDBName, + Tuple4 keyInfo = getKeyInfo(lakeSoulDBName, lakeSoulTableName, kafkaTopic, schemaRegistryUrl, props); ConfluentRegistryAvroSerializationSchema keySerialization; RowDataToAvroConverter keyRowDataToAvroConverter; @@ -191,7 +230,7 @@ public static void main(String[] args) throws Exception { FieldGetter[] keyFieldGetters; if (keyInfo != null) { keySerialization = keyInfo.f0; - keyRowDataToAvroConverter = keyInfo.f1 ; + keyRowDataToAvroConverter = keyInfo.f1; keyRowType = keyInfo.f2; keyFieldGetters = keyInfo.f3; } else { @@ -221,7 +260,7 @@ public void flatMap(LakeSoulArrowWrapper lakeSoulArrowWrapper, Collector 0) { try { diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java index 952b3e2db..3382df6f9 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java @@ -14,12 +14,17 @@ import org.apache.flink.connector.mongodb.sink.MongoSink; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.lakesoul.entry.sql.flink.LakeSoulInAndOutputJobListener; +import org.apache.flink.lakesoul.entry.sql.utils.FileUtil; import org.apache.flink.lakesoul.metadata.LakeSoulCatalog; +import org.apache.flink.lakesoul.tool.JobOptions; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.table.api.Table; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamStatementSet; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.types.DataType; @@ -30,9 +35,9 @@ import java.sql.*; import java.util.*; +import java.util.stream.Collectors; import static org.apache.flink.lakesoul.entry.MongoSinkUtils.*; -import static org.apache.flink.lakesoul.tool.JobOptions.FLINK_CHECKPOINT; import static org.apache.flink.lakesoul.tool.JobOptions.JOB_CHECKPOINT_INTERVAL; import static org.apache.flink.lakesoul.tool.LakeSoulSinkDatabasesOptions.*; @@ -52,6 +57,8 @@ public class SyncDatabase { static int sinkParallelism; static String jdbcOrDorisOptions; static int checkpointInterval; + static LakeSoulInAndOutputJobListener listener; + static String lineageUrl = null; public static void main(String[] args) throws Exception { StringBuilder connectorOptions = new StringBuilder(); @@ -63,20 +70,20 @@ public static void main(String[] args) throws Exception { targetTableName = parameter.get(TARGET_DB_TABLE_NAME.key()).toLowerCase(); url = parameter.get(TARGET_DB_URL.key()); checkpointInterval = parameter.getInt(JOB_CHECKPOINT_INTERVAL.key(), JOB_CHECKPOINT_INTERVAL.defaultValue()); - if (dbType.equals("mysql") || dbType.equals("postgresql") || dbType.equals("doris")){ + if (dbType.equals("mysql") || dbType.equals("postgresql") || dbType.equals("doris")) { for (int i = 0; i < args.length; i++) { - if ( args[i].startsWith("--D")){ + if (args[i].startsWith("--D")) { connectorOptions.append("'") .append(args[i].substring(3)) .append("'") .append("=") .append("'") - .append(args[i+1]) + .append(args[i + 1]) .append("'") .append(","); } } - if (connectorOptions.length()>0){ + if (connectorOptions.length() > 0) { jdbcOrDorisOptions = connectorOptions.substring(0, connectorOptions.length() - 1); } } @@ -88,9 +95,30 @@ public static void main(String[] args) throws Exception { useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue()); Configuration conf = new Configuration(); conf.setString(RestOptions.BIND_PORT, "8081-8089"); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + StreamExecutionEnvironment env = null; + lineageUrl = System.getenv("LINEAGE_URL"); + if (lineageUrl != null) { + conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); + conf.set(JobOptions.transportTypeOption, "http"); + conf.set(JobOptions.urlOption, lineageUrl); + conf.set(JobOptions.execAttach, false); + env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + String appName = FileUtil.getSubNameFromBatch(env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID)); + String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN"); + if (namespace == null) { + namespace = "public"; + } + if (useBatch) { + listener = new LakeSoulInAndOutputJobListener(lineageUrl, "BATCH"); + } else { + listener = new LakeSoulInAndOutputJobListener(lineageUrl); + } + listener.jobName(appName, namespace); + env.registerJobListener(listener); + } else { + env = StreamExecutionEnvironment.getExecutionEnvironment(conf); + } env.setParallelism(sinkParallelism); - switch (dbType) { case "mysql": xsyncToMysql(env); @@ -197,7 +225,7 @@ public static String[] getDorisFieldTypes(DataType[] fieldTypes) { stringFieldTypes[i] = "TIMESTAMP"; } else if (fieldTypes[i].getLogicalType() instanceof VarCharType) { stringFieldTypes[i] = "VARCHAR"; - } else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType ) { + } else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType) { stringFieldTypes[i] = "TIMESTAMP"; } else { stringFieldTypes[i] = fieldTypes[i].toString(); @@ -223,6 +251,12 @@ public static String getTablePk(String sourceDataBae, String sourceTableName) { return primaryKeys.size() == 0 ? null : stringBuilder.toString(); } + public static String getTableDomain(String sourceDataBae, String sourceTableName) { + DBManager dbManager = new DBManager(); + TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(sourceTableName, sourceDataBae); + return tableInfo.getDomain(); + } + public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException { if (useBatch) { env.setRuntimeMode(RuntimeExecutionMode.BATCH); @@ -267,7 +301,7 @@ public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException } } String sql; - if (jdbcOrDorisOptions==null){ + if (jdbcOrDorisOptions == null) { if (tablePk != null) { sql = String.format( "create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')", @@ -276,7 +310,7 @@ public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')", targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism); } - }else { + } else { if (tablePk != null) { sql = String.format( "create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s', %s)", @@ -331,7 +365,7 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept } } String sql; - if (jdbcOrDorisOptions==null){ + if (jdbcOrDorisOptions == null) { if (tablePk != null) { sql = String.format( "create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')", @@ -340,7 +374,7 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s')", targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism); } - }else { + } else { if (tablePk != null) { sql = String.format( "create table %s(%s ,PRIMARY KEY (%s) NOT ENFORCED) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s', 'sink.parallelism' = '%s', %s)", @@ -358,7 +392,7 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept conn.close(); } - public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) throws SQLException { + public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) throws Exception { if (useBatch) { env.setRuntimeMode(RuntimeExecutionMode.BATCH); } else { @@ -374,6 +408,14 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) DataType[] fieldDataTypes = lakesoulTable.getSchema().getFieldDataTypes(); String[] fieldNames = lakesoulTable.getSchema().getFieldNames(); String[] dorisFieldTypes = getDorisFieldTypes(fieldDataTypes); + if (lineageUrl != null) { + String inputName = "lakeSoul." + sourceDatabase + "." + sourceTableName; + String inputnNamespace = getTableDomain(sourceDatabase, sourceTableName); + String[] inputTypes = Arrays.stream(fieldDataTypes).map(type -> type.toString()).collect(Collectors.toList()).toArray(new String[0]); + listener.inputFacets(inputName, inputnNamespace, fieldNames, inputTypes); + String targetName = "doris." + targetDatabase + "." + targetTableName; + listener.outputFacets(targetName, "lake-public", fieldNames, dorisFieldTypes); + } StringBuilder coulmns = new StringBuilder(); for (int i = 0; i < fieldDataTypes.length; i++) { coulmns.append("`").append(fieldNames[i]).append("` ").append(dorisFieldTypes[i]); @@ -382,7 +424,7 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) } } String sql; - if (jdbcOrDorisOptions == null){ + if (jdbcOrDorisOptions == null) { sql = String.format( "create table %s(%s) with ('connector' = '%s'," + " 'jdbc-url' = '%s'," + @@ -393,7 +435,7 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) " 'sink.properties.format' = 'json'," + " 'sink.properties.read_json_by_line' = 'true')", targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password); - }else { + } else { sql = String.format( "create table %s(%s) with ('connector' = '%s'," + " 'jdbc-url' = '%s'," + @@ -408,7 +450,16 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) } tEnvs.executeSql(sql); - tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName); + if (lineageUrl != null) { + String insertsql = "insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName; + StreamStatementSet statements = tEnvs.createStatementSet(); + statements.addInsertSql(insertsql); + statements.attachAsDataStream(); + env.execute(); + } else { + tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName); + + } } public static void xsyncToMongodb(StreamExecutionEnvironment env, @@ -443,4 +494,4 @@ public static void xsyncToMongodb(StreamExecutionEnvironment env, rowDataStream.sinkTo(sink).setParallelism(sinkParallelism); env.execute(); } -} \ No newline at end of file +} diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/ExecuteSql.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/ExecuteSql.java index f1b9e5172..1cddecf98 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/ExecuteSql.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/ExecuteSql.java @@ -42,7 +42,7 @@ public static void executeSqlFileContent(String script, StreamTableEnvironment t Parser parser = ((TableEnvironmentInternal) tableEnv).getParser(); StreamStatementSet statementSet = tableEnv.createStatementSet(); - + Boolean hasModifiedOp = false; for (String statement : statements) { Operation operation; try { @@ -75,6 +75,7 @@ public static void executeSqlFileContent(String script, StreamTableEnvironment t } else if (operation instanceof ModifyOperation) { System.out.println(MessageFormatter.format("\n======Executing insertion:\n{}", statement).getMessage()); // add insertion to statement set + hasModifiedOp = true; statementSet.addInsertSql(statement); } else if ((operation instanceof QueryOperation) || (operation instanceof AddJarOperation)) { LOG.warn("SQL Statement {} is ignored", statement); @@ -85,8 +86,16 @@ public static void executeSqlFileContent(String script, StreamTableEnvironment t tableEnv.executeSql(statement).print(); } } - statementSet.attachAsDataStream(); - Configuration conf = (Configuration) env.getConfiguration(); + if (hasModifiedOp) { + statementSet.attachAsDataStream(); + Configuration conf = (Configuration) env.getConfiguration(); + + // try get k8s cluster name + String k8sClusterID = conf.getString("kubernetes.cluster-id", ""); + env.execute(k8sClusterID.isEmpty() ? null : k8sClusterID); + } else { + System.out.println("There's no INSERT INTO statement, the program will terminate"); + } // try get k8s cluster name String k8sClusterID = conf.getString("kubernetes.cluster-id", ""); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/FlinkSqlSubmitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/FlinkSqlSubmitter.java index 847358b7b..0ac4c3bdc 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/FlinkSqlSubmitter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/FlinkSqlSubmitter.java @@ -4,13 +4,17 @@ package org.apache.flink.lakesoul.entry.sql.flink; +import io.openlineage.flink.OpenLineageFlinkJobListener; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobListener; import org.apache.flink.lakesoul.entry.sql.Submitter; import org.apache.flink.lakesoul.entry.sql.common.FlinkOption; import org.apache.flink.lakesoul.entry.sql.common.JobType; import org.apache.flink.lakesoul.entry.sql.common.SubmitOption; import org.apache.flink.lakesoul.entry.sql.utils.FileUtil; +import org.apache.flink.lakesoul.tool.JobOptions; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -33,6 +37,13 @@ public FlinkSqlSubmitter(SubmitOption submitOption) { @Override public void submit() throws Exception { + String lineageUrl = System.getenv("LINEAGE_URL"); + Configuration conf = new Configuration(); + if (lineageUrl != null) { + conf.set(JobOptions.transportTypeOption, "http"); + conf.set(JobOptions.urlOption, lineageUrl); + conf.set(JobOptions.execAttach, false); + } EnvironmentSettings settings = null; StreamTableEnvironment tEnv = null; if (submitOption.getJobType().equals(JobType.STREAM.getType())) { @@ -48,10 +59,24 @@ public void submit() throws Exception { } else { throw new RuntimeException("jobType is not supported: " + submitOption.getJobType()); } - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); if (submitOption.getJobType().equals(JobType.STREAM.getType())) { this.setCheckpoint(env); } + if (lineageUrl != null) { + String appName = FileUtil.getSubNameFromBatch(env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID)); + String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN"); + if (namespace == null) { + namespace = "lake-public"; + } + LOG.info("----namespace:table----{}:{}", appName, namespace); + JobListener listener = OpenLineageFlinkJobListener.builder() + .executionEnvironment(env) + .jobName(appName) + .jobNamespace(namespace) + .build(); + env.registerJobListener(listener); + } tEnv = StreamTableEnvironment.create(env, settings); String sql = FileUtil.readHDFSFile(submitOption.getSqlFilePath()); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/LakeSoulInAndOutputJobListener.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/LakeSoulInAndOutputJobListener.java new file mode 100644 index 000000000..d984d47cc --- /dev/null +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/LakeSoulInAndOutputJobListener.java @@ -0,0 +1,185 @@ +// SPDX-FileCopyrightText: 2023 LakeSoul Contributors +// +// SPDX-License-Identifier: Apache-2.0 +package org.apache.flink.lakesoul.entry.sql.flink; + + +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineageClient; +import io.openlineage.client.transports.HttpConfig; +import io.openlineage.client.transports.HttpTransport; +import io.openlineage.client.transports.Transport; +import io.openlineage.flink.client.EventEmitter; +import io.openlineage.flink.shaded.org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.core.execution.DetachedJobExecutionResult; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.JobListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.net.URI; +import java.time.ZonedDateTime; +import java.util.*; + +public class LakeSoulInAndOutputJobListener implements JobListener { + private static final Logger log = LoggerFactory.getLogger(LakeSoulInAndOutputJobListener.class); + private OpenLineageClient client; + private OpenLineage openLineage; + private List inputDatasets; + private List outputDatasets; + private String executeMode = "STREAM"; + + public String getRunId() { + return runId.toString(); + } + + private UUID runId; + OpenLineage.Run run; + private OpenLineage.Job job; + + public LakeSoulInAndOutputJobListener(String url) { + HttpConfig config = new HttpConfig(); + config.setUrl(URI.create(url)); + Transport transport = new HttpTransport(config); + client = new OpenLineageClient(transport); + openLineage = new OpenLineage(EventEmitter.OPEN_LINEAGE_CLIENT_URI); + } + + public LakeSoulInAndOutputJobListener(String url, String executeMode) { + this(url); + this.executeMode = executeMode; + } + + public LakeSoulInAndOutputJobListener jobName(String name, String namespace,String uuid) { + this.runId = UUID.fromString(uuid); + this.run = openLineage.newRunBuilder().runId(this.runId).build(); + OpenLineage.JobFacets jobFacets = openLineage.newJobFacetsBuilder().jobType(openLineage.newJobTypeJobFacetBuilder().jobType("Flink Job").integration("Flink").processingType(this.executeMode).build()).build(); + this.job = openLineage.newJobBuilder().name(name).namespace(namespace).facets(jobFacets).build(); + return this; + } + + public LakeSoulInAndOutputJobListener jobName(String name, String namespace) { + this.runId = UUID.randomUUID(); + this.run = openLineage.newRunBuilder().runId(this.runId).build(); + OpenLineage.JobFacets jobFacets = openLineage.newJobFacetsBuilder().jobType(openLineage.newJobTypeJobFacetBuilder().jobType("Flink Job").integration("Flink").processingType(this.executeMode).build()).build(); + this.job = openLineage.newJobBuilder().name(name).namespace(namespace).facets(jobFacets).build(); + return this; + } + + public LakeSoulInAndOutputJobListener inputFacets(String inputName, String inputNamespace, String[] inputSchemaNames, String[] inputSchemaTypes) { + List schemaFields = new ArrayList<>(); + if (inputSchemaNames != null && inputSchemaTypes != null && inputSchemaTypes.length == inputSchemaTypes.length) { + for (int i = 0; i < inputSchemaNames.length; i++) { + schemaFields.add(openLineage.newSchemaDatasetFacetFieldsBuilder().name(inputSchemaNames[i]).type(inputSchemaTypes[i]).build()); + } + } + if (inputSchemaNames != null && inputSchemaTypes == null) { + for (int i = 0; i < inputSchemaNames.length; i++) { + schemaFields.add(openLineage.newSchemaDatasetFacetFieldsBuilder().name(inputSchemaNames[i]).build()); + } + } + + OpenLineage.SchemaDatasetFacet schemaFacet = openLineage.newSchemaDatasetFacetBuilder().fields(schemaFields).build(); + this.inputDatasets = Arrays.asList( + openLineage.newInputDatasetBuilder().name(inputName).namespace(inputNamespace) + .facets( + openLineage.newDatasetFacetsBuilder().schema(schemaFacet).build() + ).build() + ); + return this; + } + public void emit(){ + OpenLineage.RunEvent runStateUpdate = + openLineage.newRunEventBuilder() + .eventType(OpenLineage.RunEvent.EventType.COMPLETE) + .eventTime(ZonedDateTime.now()) + .run(this.run) + .job(this.job) + .inputs(this.inputDatasets) + .outputs(this.outputDatasets) + .build(); + if(this.inputDatasets != null || this.outputDatasets != null){ + this.client.emit(runStateUpdate); + } + } + + public LakeSoulInAndOutputJobListener outputFacets(String outputName, String outputNamespace, String[] outputSchemaNames, String[] outputSchemaTypes) { + + List schemaFields = new ArrayList<>(); + if (outputSchemaNames != null && outputSchemaTypes != null && outputSchemaTypes.length == outputSchemaTypes.length) { + for (int i = 0; i < outputSchemaNames.length; i++) { + schemaFields.add(openLineage.newSchemaDatasetFacetFieldsBuilder().name(outputSchemaNames[i]).type(outputSchemaTypes[i]).build()); + } + } + if (outputSchemaNames != null && outputSchemaTypes == null) { + for (int i = 0; i < outputSchemaNames.length; i++) { + schemaFields.add(openLineage.newSchemaDatasetFacetFieldsBuilder().name(outputSchemaNames[i]).build()); + } + } + + OpenLineage.SchemaDatasetFacet schemaFacet = openLineage.newSchemaDatasetFacetBuilder().fields(schemaFields).build(); + this.outputDatasets = Arrays.asList( + openLineage.newOutputDatasetBuilder().name(outputName).namespace(outputNamespace) + .facets( + openLineage.newDatasetFacetsBuilder().schema(schemaFacet).build() + ).build() + ); + return this; + } + + + @Override + public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) { + log.info("------lake onjobsubmit----jobid:{}",jobClient.getJobID()); + + OpenLineage.RunEvent runStateUpdate = + openLineage.newRunEventBuilder() + .eventType(OpenLineage.RunEvent.EventType.COMPLETE) + .eventTime(ZonedDateTime.now()) + .run(this.run) + .job(this.job) + .inputs(this.inputDatasets) + .outputs(this.outputDatasets) + .build(); + if(this.inputDatasets != null || this.outputDatasets != null) { + this.client.emit(runStateUpdate); + } + } + + @Override + public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) { + if (jobExecutionResult instanceof DetachedJobExecutionResult) { + log.warn("Job running in detached mode. Set execution.attached to true if you want to emit completed events."); + } else { + OpenLineage.RunEvent runStateUpdate = null; + if (jobExecutionResult != null) { + log.info("------onjobexecuted----jobresult:{}",jobExecutionResult.getJobExecutionResult().toString()); + runStateUpdate = + openLineage.newRunEventBuilder() + .eventType(OpenLineage.RunEvent.EventType.COMPLETE) + .eventTime(ZonedDateTime.now()) + .run(this.run) + .job(this.job) + .inputs(this.inputDatasets) + .outputs(this.outputDatasets) + .build(); + } else { + log.info("------onjobexecuted----jobresult:{null}"); + OpenLineage.Run failRun = openLineage.newRunBuilder().runId(this.runId).facets(openLineage.newRunFacetsBuilder().errorMessage(openLineage.newErrorMessageRunFacet(throwable.getMessage(), "JAVA", ExceptionUtils.getStackTrace(throwable))).build()).build(); + runStateUpdate = + openLineage.newRunEventBuilder() + .eventType(OpenLineage.RunEvent.EventType.FAIL) + .eventTime(ZonedDateTime.now()) + .run(failRun) + .job(this.job) + .inputs(this.inputDatasets) + .outputs(this.outputDatasets) + .build(); + } + this.client.emit(runStateUpdate); + } + } + +} diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/utils/FileUtil.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/utils/FileUtil.java index 0aa5dde7c..ce60ad505 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/utils/FileUtil.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/utils/FileUtil.java @@ -52,4 +52,12 @@ public static String replaceDefaultKeywordFromZeppelin(String text) { String replacedStr = pc.matcher(text).replaceAll(replaceText); return replacedStr; } + public static String getSubNameFromBatch(String batchName) { + if (batchName.matches(".*-(\\d+)-(batch$)")) { + int index = batchName.lastIndexOf("-",batchName.lastIndexOf("-")-1); + return batchName.substring(0,index); + }else{ + return batchName; + } + } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java index f561bc49b..8a8984000 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java @@ -133,4 +133,7 @@ public Collection getCompatibleStateNames() { // StreamingFileSink return Collections.singleton("lakesoul-cdc-multitable-bucket-states"); } + public BucketsBuilder getBucketsBuilder(){ + return this.bucketsBuilder; + } } diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java index 5f932f525..489e65a77 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java @@ -33,6 +33,9 @@ public DefaultOneTableBulkFormatBuilder( super(basePath, conf, new DefaultLakeSoulWriterBucketFactory(conf)); this.identity = identity; } + public TableSchemaIdentity getIdentity(){ + return this.identity; + } @Override public AbstractLakeSoulMultiTableSinkWriter createWriter(Sink.InitContext context, int subTaskId) throws diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java index e368847bf..64ddb80f4 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java @@ -11,17 +11,20 @@ import com.dmetasoul.lakesoul.meta.DBUtil; import com.dmetasoul.lakesoul.meta.dao.TableInfoDao; import com.dmetasoul.lakesoul.meta.entity.TableInfo; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.flink.api.connector.sink.GlobalCommitter; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.lakesoul.entry.sql.flink.LakeSoulInAndOutputJobListener; import org.apache.flink.lakesoul.sink.LakeSoulMultiTablesSink; import org.apache.flink.lakesoul.sink.state.LakeSoulMultiTableSinkCommittable; import org.apache.flink.lakesoul.sink.state.LakeSoulMultiTableSinkGlobalCommittable; import org.apache.flink.lakesoul.sink.writer.AbstractLakeSoulMultiTableSinkWriter; import org.apache.flink.lakesoul.tool.FlinkUtil; +import org.apache.flink.lakesoul.tool.JobOptions; import org.apache.flink.lakesoul.types.TableSchemaIdentity; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.table.catalog.ObjectPath; @@ -141,6 +144,14 @@ public List commit( StructType sparkSchema = ArrowUtils.fromArrowSchema(msgSchema); TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(tableName, tableNamespace); + LakeSoulInAndOutputJobListener listener = null; + if (this.conf.getBoolean(JobOptions.lineageOption)) { + listener = new LakeSoulInAndOutputJobListener(this.conf.getString(JobOptions.urlOption)); + String uuid = this.conf.getString(JobOptions.lineageJobUUID); + String jobName = this.conf.getString(JobOptions.linageJobName); + String namespace = this.conf.getString(JobOptions.linageJobNamespace); + listener.jobName(jobName, namespace, uuid); + } if (tableInfo == null) { if (!conf.getBoolean(AUTO_SCHEMA_CHANGE)) { throw new SuppressRestartsException( @@ -171,6 +182,20 @@ public List commit( dbManager.createNewTable(tableId, tableNamespace, tableName, identity.tableLocation, msgSchema.toJson(), properties, partition); + if (this.conf.getBoolean(JobOptions.lineageOption)) { + String domain = dbManager.getNamespaceByNamespace(tableNamespace).getDomain(); + int size = msgSchema.getFields().size(); + String[] colNames = new String[size]; + String[] colTypes = new String[size]; + for (int i = 0; i < size; i++) { + Field field = msgSchema.getFields().get(i); + colNames[i] = field.getName(); + colTypes[i] = field.getType().toString(); + } + listener.outputFacets("lakesoul." + tableNamespace + "." + tableName, domain, colNames,colTypes); + listener.emit(); + } + } else { if (conf.getBoolean(AUTO_SCHEMA_CHANGE)) { DBUtil.TablePartitionKeys diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java index 9fbb63336..a4552a5a1 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java @@ -67,7 +67,9 @@ public LakeSoulLookupTableSource(TableId tableId, catalogTable.getOptions().forEach(configuration::setString); validateLookupConfigurations(); } - + public TableId getTableId(){ + return this.tableId; + } private void validateLookupConfigurations() { String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE); diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java index 05af12a12..12736ebde 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java @@ -80,6 +80,9 @@ public LakeSoulSource(TableId tableId, this.pushedFilter = pushedFilter; this.partitionFilters = partitionFilters; } + public TableId getTableId(){ + return this.tableId; + } @Override public Boundedness getBoundedness() { diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java index 18b4c04f6..033c0f52b 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java @@ -90,7 +90,9 @@ public LakeSoulTableSource(TableId tableId, this.optionParams = optionParams; this.modificationContext = null; } - + public TableId getTableId(){ + return this.tableId; + } @Override public DynamicTableSource copy() { LakeSoulTableSource newInstance = new LakeSoulTableSource(this.tableId, diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java index a86d3b980..f0ab3c42a 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java @@ -11,6 +11,7 @@ import java.time.Duration; import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.configuration.description.TextElement.code; import static org.apache.flink.configuration.description.TextElement.text; public class JobOptions { @@ -120,4 +121,33 @@ public class JobOptions { .defaultValue("file:///") .withDescription("Option to set fs default scheme"); + public static final ConfigOption transportTypeOption = + ConfigOptions.key("openlineage.transport.type").stringType().defaultValue("http"); + public static final ConfigOption urlOption = + ConfigOptions.key("openlineage.transport.url").stringType().noDefaultValue(); + public static final ConfigOption execAttach = + ConfigOptions.key("execution.attached").booleanType().defaultValue(false); + public static final ConfigOption lineageOption = + ConfigOptions.key("openlineage.executed").booleanType().defaultValue(false); + public static final ConfigOption lineageJobUUID = + ConfigOptions.key("openlineage.uuid").stringType().noDefaultValue(); + public static final ConfigOption linageJobName = + ConfigOptions.key("openlineage.jobname").stringType().noDefaultValue(); + public static final ConfigOption linageJobNamespace = + ConfigOptions.key("openlineage.jobnamespace").stringType().noDefaultValue(); + + public static final ConfigOption KUBE_CLUSTER_ID = + key("kubernetes.cluster-id") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "The cluster-id, which should be no more than 45 characters, is used for identifying a unique Flink cluster. " + + "The id must only contain lowercase alphanumeric characters and \"-\". " + + "The required format is %s. " + + "If not set, the client will automatically generate it with a random ID.", + code("[a-z]([-a-z0-9]*[a-z0-9])")) + .build()); + }