diff --git a/metadata-integration/java/acryl-spark-lineage/README.md b/metadata-integration/java/acryl-spark-lineage/README.md index 97851e90e860ed..e51c884c297d7e 100644 --- a/metadata-integration/java/acryl-spark-lineage/README.md +++ b/metadata-integration/java/acryl-spark-lineage/README.md @@ -24,7 +24,7 @@ When running jobs using spark-submit, the agent needs to be configured in the co ```text #Configuring DataHub spark agent jar -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.16 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.17 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server http://localhost:8080 ``` @@ -32,7 +32,7 @@ spark.datahub.rest.server http://localhost:8080 ## spark-submit command line ```sh -spark-submit --packages io.acryl:acryl-spark-lineage:0.2.16 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py +spark-submit --packages io.acryl:acryl-spark-lineage:0.2.17 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py ``` ### Configuration Instructions: Amazon EMR @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html) ```text -spark.jars.packages io.acryl:acryl-spark-lineage:0.2.16 +spark.jars.packages io.acryl:acryl-spark-lineage:0.2.17 spark.extraListeners datahub.spark.DatahubSparkListener spark.datahub.rest.server https://your_datahub_host/gms #If you have authentication set up then you also need to specify the Datahub access token @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh spark = SparkSession.builder .master("spark://spark-master:7077") .appName("test-application") -.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.16") +.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.17") .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") .config("spark.datahub.rest.server", "http://localhost:8080") .enableHiveSupport() @@ -79,7 +79,7 @@ appName("test-application") config("spark.master","spark://spark-master:7077") . -config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.16") +config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.17") . config("spark.extraListeners","datahub.spark.DatahubSparkListener") @@ -158,45 +158,47 @@ information like tokens. ## Configuration Options -| Field | Required | Default | Description | -|--------------------------------------------------------|----------|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.15 | -| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener | -| spark.datahub.emitter | | rest | Specify the ways to emit metadata. By default it sends to DataHub using REST emitter. Valid options are rest, kafka or file | -| spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg: | -| spark.datahub.rest.token | | | Authentication token. | -| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! | -| spark.datahub.rest.disable_chunked_encoding | | false | Disable Chunked Transfer Encoding. 
In some environment chunked encoding causes issues. With this config option it can be disabled. || -| spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed | -| spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries | -| spark.datahub.file.filename | | | The file where metadata will be written if file emitter is set | -| spark.datahub.kafka.bootstrap | | | The Kafka bootstrap server url to use if the Kafka emitter is set | -| spark.datahub.kafka.schema_registry_url | | | The Schema registry url to use if the Kafka emitter is set | -| spark.datahub.kafka.schema_registry_config. | | | Additional config to pass in to the Schema Registry Client | -| spark.datahub.kafka.producer_config. | | | Additional config to pass in to the Kafka producer. For example: `--conf "spark.datahub.kafka.producer_config.client.id=my_client_id"` | -| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance | -| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance (it is usefult to set if you have it in your glue ingestion) | -| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD | -| spark.datahub.metadata.dataset.hivePlatformAlias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` | +| Field | Required | Default | Description | +|--------------------------------------------------------|----------|-----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:acryl-spark-lineage:0.2.15 | +| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener | +| spark.datahub.emitter | | rest | Specify the ways to emit metadata. By default it sends to DataHub using REST emitter. Valid options are rest, kafka or file | +| spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg: | +| spark.datahub.rest.token | | | Authentication token. | +| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! | +| spark.datahub.rest.disable_chunked_encoding | | false | Disable Chunked Transfer Encoding. In some environment chunked encoding causes issues. With this config option it can be disabled. || +| spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed | +| spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries | +| spark.datahub.file.filename | | | The file where metadata will be written if file emitter is set | +| spark.datahub.kafka.bootstrap | | | The Kafka bootstrap server url to use if the Kafka emitter is set | +| spark.datahub.kafka.schema_registry_url | | | The Schema registry url to use if the Kafka emitter is set | +| spark.datahub.kafka.schema_registry_config. | | | Additional config to pass in to the Schema Registry Client | +| spark.datahub.kafka.producer_config. | | | Additional config to pass in to the Kafka producer. 
For example: `--conf "spark.datahub.kafka.producer_config.client.id=my_client_id"` | +| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance | +| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance (it is usefult to set if you have it in your glue ingestion) | +| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD | +| spark.datahub.metadata.dataset.hivePlatformAlias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` | | spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions | -| spark.datahub.metadata.remove_partition_pattern | | | Remove partition pattern. (e.g. /partition=\d+) It change database/table/partition=123 to database/table | -| spark.datahub.coalesce_jobs | | true | Only one datajob(task) will be emitted containing all input and output datasets for the spark application | -| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true | -| spark.datahub.metadata.dataset.materialize | | false | Materialize Datasets in DataHub | -| spark.datahub.platform.s3.path_spec_list | | | List of pathspec per platform | -| spark.datahub.metadata.dataset.include_schema_metadata | false | | Emit dataset schema metadata based on the spark execution. It is recommended to get schema information from platform specific DataHub sources as this is less reliable | -| spark.datahub.flow_name | | | If it is set it will be used as the DataFlow name otherwise it uses spark app name as flow_name | -| spark.datahub.file_partition_regexp | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` | -| spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow | -| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow | -| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event which is never called on Databricks or on Glue. You should enable this on Databricks if you want coalesced run. | -| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled. | -| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset urns. By default, it is disabled. | -| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the s3 location instead of the Hive table. By default, it is disabled. | -| spark.datahub.s3.bucket | | | The name of the bucket where metadata will be written if s3 emitter is set | -| spark.datahub.s3.prefix | | | The prefix for the file where metadata will be written on s3 if s3 emitter is set | -| spark.datahub.s3.filename | | | The name of the file where metadata will be written if it is not set random filename will be used on s3 if s3 emitter is set | - +| spark.datahub.metadata.remove_partition_pattern | | | Remove partition pattern. (e.g. 
/partition=\d+) It changes database/table/partition=123 to database/table                                                                                                  |
+| spark.datahub.coalesce_jobs                             |          | true                  | Only one DataJob (task) will be emitted, containing all input and output datasets of the Spark application                                                                                |
+| spark.datahub.parent.datajob_urn                        |          |                       | The specified dataset will be set as an upstream dataset for the created datajob. Effective only when spark.datahub.coalesce_jobs is set to true                                          |
+| spark.datahub.metadata.dataset.materialize              |          | false                 | Materialize Datasets in DataHub                                                                                                                                                            |
+| spark.datahub.platform.s3.path_spec_list                |          |                       | List of path specs per platform                                                                                                                                                            |
+| spark.datahub.metadata.dataset.include_schema_metadata  |          | false                 | Emit dataset schema metadata based on the Spark execution. It is recommended to get schema information from platform-specific DataHub sources, as this is less reliable                   |
+| spark.datahub.flow_name                                 |          |                       | If set, it is used as the DataFlow name; otherwise the Spark app name is used as the flow name                                                                                            |
+| spark.datahub.file_partition_regexp                     |          |                       | Strip the partition part from the path if the end of the path matches the specified regexp. Example `year=.*/month=.*/day=.*`                                                             |
+| spark.datahub.tags                                      |          |                       | Comma-separated list of tags to attach to the DataFlow                                                                                                                                     |
+| spark.datahub.domains                                   |          |                       | Comma-separated list of domain urns to attach to the DataFlow                                                                                                                              |
+| spark.datahub.stage_metadata_coalescing                 |          |                       | Normally metadata is coalesced and sent at the onApplicationEnd event, which is never called on Databricks or on Glue. Enable this on Databricks if you want a coalesced run.             |
+| spark.datahub.patch.enabled                             |          | false                 | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default, it is disabled.                                             |
+| spark.datahub.metadata.dataset.lowerCaseUrns            |          | false                 | Set this to true to lowercase dataset urns. By default, it is disabled.                                                                                                                    |
+| spark.datahub.disableSymlinkResolution                  |          | false                 | Set this to true if you prefer using the s3 location instead of the Hive table. By default, it is disabled.                                                                                |
+| spark.datahub.s3.bucket                                 |          |                       | The name of the bucket where metadata will be written if the s3 emitter is set                                                                                                             |
+| spark.datahub.s3.prefix                                 |          |                       | The prefix for the file where metadata will be written on s3 if the s3 emitter is set                                                                                                      |
+| spark.datahub.s3.filename                               |          |                       | The name of the file where metadata will be written if the s3 emitter is set. If it is not set, a random filename is used                                                                  |
+| spark.datahub.log.mcps                                  |          | true                  | Set this to true to log the emitted MCPs (MetadataChangeProposals) in the application log. By default, it is enabled.                                                                      |
+| spark.datahub.legacyLineageCleanup.enabled              |          | false                 | Set this to true to remove legacy lineage emitted by older Spark plugin runs, which attached lineage to the Datasets instead of the DataJob. By default, it is disabled.                   |

 ## What to Expect: The Metadata Model

@@ -358,6 +360,19 @@ Use Java 8 to build the project. The project uses Gradle as the build tool. To b

+
 ## Changelog

+### Version 0.2.17
+- *Major changes*:
+  - Fine-grained (column-level) lineage is now emitted on the DataJob rather than on the emitted Datasets; this is the correct behaviour, which earlier versions did not follow. As a consequence, fine-grained lineage emitted by earlier versions is not overwritten by the new lineage.
+    You can remove the old lineage by setting `spark.datahub.legacyLineageCleanup.enabled=true`. Make sure you run the latest DataHub server if you enable this together with patch support. 
(this was introduced since 0.2.17-rc5) + +- *Changes*: + - OpenLineage 1.25.0 upgrade + - Add option to disable chunked encoding in the datahub rest sink -> `spark.datahub.rest.disable_chunked_encoding` + - Add option to specify the mcp kafka topic for the datahub kafka sink -> `spark.datahub.kafka.mcp_topic` + - Add option to remove legacy lineages from older Spark Plugin runs. This will remove those lineages from the Datasets which it adds to DataJob -> `spark.datahub.legacyLineageCleanup.enabled` +- *Fixes*: + - Fix handling map transformation in the lineage. Earlier it generated wrong lineage for map transformation. + ### Version 0.2.16 - Remove logging DataHub config into logs diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java index 0bcc7db9e87408..84f397226ce912 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java @@ -1,12 +1,18 @@ package datahub.spark; +import static com.linkedin.metadata.Constants.*; import static datahub.spark.converter.SparkStreamingEventToDatahub.*; import static io.datahubproject.openlineage.converter.OpenLineageToDataHub.*; import static io.datahubproject.openlineage.utils.DatahubUtils.*; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.StreamReadConstraints; +import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.common.GlobalTags; import com.linkedin.common.UrnArray; import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.data.DataMap; +import com.linkedin.data.template.JacksonDataTemplateCodec; import com.linkedin.data.template.StringMap; import com.linkedin.dataprocess.DataProcessInstanceRelationships; import com.linkedin.dataprocess.RunResultType; @@ -62,12 +68,23 @@ public class DatahubEventEmitter extends EventEmitter { private final Map schemaMap = new HashMap<>(); private SparkLineageConf datahubConf; private static final int DEFAULT_TIMEOUT_SEC = 10; + private final ObjectMapper objectMapper; + private final JacksonDataTemplateCodec dataTemplateCodec; private final EventFormatter eventFormatter = new EventFormatter(); public DatahubEventEmitter(SparkOpenLineageConfig config, String applicationJobName) throws URISyntaxException { super(config, applicationJobName); + objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); + int maxSize = + Integer.parseInt( + System.getenv() + .getOrDefault(INGESTION_MAX_SERIALIZED_STRING_LENGTH, MAX_JACKSON_STRING_SIZE)); + objectMapper + .getFactory() + .setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build()); + dataTemplateCodec = new JacksonDataTemplateCodec(objectMapper.getFactory()); } private Optional getEmitter() { @@ -407,7 +424,14 @@ protected void emitMcps(List mcps) { .map( mcp -> { try { - log.info("emitting mcpw: " + mcp); + if (this.datahubConf.isLogMcps()) { + DataMap map = mcp.data(); + String serializedMCP = dataTemplateCodec.mapToString(map); + log.info("emitting mcpw: {}", serializedMCP); + } else { + log.info( + "emitting aspect: {} for urn: {}", mcp.getAspectName(), mcp.getEntityUrn()); + } return emitter.get().emit(mcp); } catch (IOException ioException) { log.error("Failed to emit metadata to DataHub", ioException); diff --git 
a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java index 3860285083c4bb..824cd1a687b264 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -31,6 +31,7 @@ public class SparkConfigParser { public static final String FILE_EMITTER_FILE_NAME = "file.filename"; public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification"; public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding"; + public static final String CONFIG_LOG_MCPS = "log.mcps"; public static final String MAX_RETRIES = "rest.max_retries"; public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec"; @@ -51,6 +52,7 @@ public class SparkConfigParser { public static final String COALESCE_KEY = "coalesce_jobs"; public static final String PATCH_ENABLED = "patch.enabled"; + public static final String LEGACY_LINEAGE_CLEANUP = "legacyLineageCleanup.enabled"; public static final String DISABLE_SYMLINK_RESOLUTION = "disableSymlinkResolution"; public static final String STAGE_METADATA_COALESCING = "stage_metadata_coalescing"; @@ -158,6 +160,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf( Config sparkConfig, SparkAppContext sparkAppContext) { DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = DatahubOpenlineageConfig.builder(); + builder.isSpark(true); builder.filePartitionRegexpPattern( SparkConfigParser.getFilePartitionRegexpPattern(sparkConfig)); builder.fabricType(SparkConfigParser.getCommonFabricType(sparkConfig)); @@ -172,6 +175,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf( builder.commonDatasetPlatformInstance(SparkConfigParser.getCommonPlatformInstance(sparkConfig)); builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig)); builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig)); + builder.removeLegacyLineage(SparkConfigParser.isLegacyLineageCleanupEnabled(sparkConfig)); builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig)); builder.lowerCaseDatasetUrns(SparkConfigParser.isLowerCaseDatasetUrns(sparkConfig)); try { @@ -311,6 +315,13 @@ public static boolean isDatasetMaterialize(Config datahubConfig) { && datahubConfig.getBoolean(DATASET_MATERIALIZE_KEY); } + public static boolean isLogMcps(Config datahubConfig) { + if (datahubConfig.hasPath(CONFIG_LOG_MCPS)) { + return datahubConfig.getBoolean(CONFIG_LOG_MCPS); + } + return true; + } + public static boolean isIncludeSchemaMetadata(Config datahubConfig) { if (datahubConfig.hasPath(DATASET_INCLUDE_SCHEMA_METADATA)) { return datahubConfig.getBoolean(DATASET_INCLUDE_SCHEMA_METADATA); @@ -352,6 +363,14 @@ public static boolean isPatchEnabled(Config datahubConfig) { return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED); } + public static boolean isLegacyLineageCleanupEnabled(Config datahubConfig) { + if (!datahubConfig.hasPath(LEGACY_LINEAGE_CLEANUP)) { + return false; + } + return datahubConfig.hasPath(LEGACY_LINEAGE_CLEANUP) + && datahubConfig.getBoolean(LEGACY_LINEAGE_CLEANUP); + } + public static boolean isDisableSymlinkResolution(Config datahubConfig) { if 
(!datahubConfig.hasPath(DISABLE_SYMLINK_RESOLUTION)) { return false; diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java index 014cff873bbde9..96afe729b82c00 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkLineageConf.java @@ -17,6 +17,7 @@ public class SparkLineageConf { final DatahubOpenlineageConfig openLineageConf; @Builder.Default final boolean coalesceEnabled = true; @Builder.Default final boolean emitCoalescePeriodically = false; + @Builder.Default final boolean logMcps = true; final SparkAppContext sparkAppContext; final DatahubEmitterConfig datahubEmitterConfig; @Builder.Default final List tags = new LinkedList<>(); @@ -32,6 +33,7 @@ public static SparkLineageConf toSparkLineageConf( SparkConfigParser.sparkConfigToDatahubOpenlineageConf(sparkConfig, sparkAppContext); builder.openLineageConf(datahubOpenlineageConfig); builder.coalesceEnabled(SparkConfigParser.isCoalesceEnabled(sparkConfig)); + builder.logMcps(SparkConfigParser.isLogMcps(sparkConfig)); if (SparkConfigParser.getTags(sparkConfig) != null) { builder.tags(Arrays.asList(Objects.requireNonNull(SparkConfigParser.getTags(sparkConfig)))); } diff --git a/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java b/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java index ef2b17e9932f2f..b9a142364d4e89 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java +++ b/metadata-integration/java/acryl-spark-lineage/src/test/java/datahub/spark/OpenLineageEventToDatahubTest.java @@ -814,4 +814,32 @@ public void testProcessGCSInputsOutputs() throws URISyntaxException, IOException dataset.getUrn().toString()); } } + + public void testProcessMappartitionJob() throws URISyntaxException, IOException { + DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder = + DatahubOpenlineageConfig.builder(); + builder.fabricType(FabricType.DEV); + builder.lowerCaseDatasetUrns(true); + builder.materializeDataset(true); + builder.includeSchemaMetadata(true); + builder.isSpark(true); + + String olEvent = + IOUtils.toString( + this.getClass().getResourceAsStream("/ol_events/map_partition_job.json"), + StandardCharsets.UTF_8); + + OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent); + DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build()); + + assertNotNull(datahubJob); + + assertEquals(1, datahubJob.getInSet().size()); + for (DatahubDataset dataset : datahubJob.getInSet()) { + assertEquals( + "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my_dir/my_file.csv,DEV)", + dataset.getUrn().toString()); + } + assertEquals(0, datahubJob.getOutSet().size()); + } } diff --git a/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/map_partition_job.json b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/map_partition_job.json new file mode 100644 index 00000000000000..39560a782840ce --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/src/test/resources/ol_events/map_partition_job.json @@ -0,0 +1,66 @@ +{ + "eventTime": 
"2024-11-20T12:59:29.059Z", + "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent", + "eventType": "START", + "run": { + "runId": "01902a1e-0b05-750e-b38d-439998f7a853", + "facets": { + "parent": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet", + "run": { + "runId": "01902a1e-0b05-750e-b38d-439998f7a853" + }, + "job": { + "namespace": "default", + "name": "spark_context_session" + } + }, + "processing_engine": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet", + "version": "3.4.2", + "name": "spark" + }, + "spark_jobDetails": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "jobId": 0 + }, + "spark_properties": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet", + "properties": { + "spark.master": "yarn", + "spark.app.name": "SparkContextSession" + } + } + } + }, + "job": { + "namespace": "default", + "name": "spark_context_session.map_partitions_parallel_collection", + "facets": { + "jobType": { + "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark", + "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet", + "processingType": "BATCH", + "integration": "SPARK", + "jobType": "RDD_JOB" + } + } + }, + "inputs": [ + { + "namespace": "s3://my-bucket", + "name": "my_dir/my_file.csv" + } + ], + "outputs": [ + { + "namespace": "s3://my-bucket", + "name": "my_dir/my_file.csv" + } + ] +} \ No newline at end of file diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java index 5abb3c90d232bd..c725673eae47b5 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java @@ -16,6 +16,7 @@ @Getter @ToString public class DatahubOpenlineageConfig { + @Builder.Default private final boolean isSpark = false; @Builder.Default private final boolean isStreaming = false; @Builder.Default private final String pipelineName = null; private final String platformInstance; @@ -34,6 +35,7 @@ public class DatahubOpenlineageConfig { @Builder.Default private Map urnAliases = new HashMap<>(); @Builder.Default private final boolean disableSymlinkResolution = false; @Builder.Default private final boolean lowerCaseDatasetUrns = false; + @Builder.Default private final boolean removeLegacyLineage = false; public List getPathSpecsForPlatform(String platform) { if ((pathSpecs == null) || (pathSpecs.isEmpty())) { diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java 
b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java index 9237ee60f473b4..9fcfc68bd03f55 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java @@ -675,9 +675,30 @@ private static void convertJobToDataJob( datahubJob.setJobInfo(dji); DataJobInputOutput inputOutput = new DataJobInputOutput(); + boolean inputsEqualOutputs = false; + if ((datahubConf.isSpark()) + && ((event.getInputs() != null && event.getOutputs() != null) + && (event.getInputs().size() == event.getOutputs().size()))) { + inputsEqualOutputs = + event.getInputs().stream() + .map(OpenLineage.Dataset::getName) + .collect(Collectors.toSet()) + .equals( + event.getOutputs().stream() + .map(OpenLineage.Dataset::getName) + .collect(Collectors.toSet())); + if (inputsEqualOutputs) { + log.info( + "Inputs equals Outputs: {}. This is most probably because of an rdd map operation and we only process Inputs", + inputsEqualOutputs); + } + } + processJobInputs(datahubJob, event, datahubConf); - processJobOutputs(datahubJob, event, datahubConf); + if (!inputsEqualOutputs) { + processJobOutputs(datahubJob, event, datahubConf); + } DataProcessInstanceRunEvent dpire = processDataProcessInstanceResult(event); datahubJob.setDataProcessInstanceRunEvent(dpire); diff --git a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java index 60caaae359677f..e2aa2c3a04c406 100644 --- a/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java +++ b/metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java @@ -28,7 +28,10 @@ import com.linkedin.dataprocess.DataProcessInstanceRelationships; import com.linkedin.dataprocess.DataProcessInstanceRunEvent; import com.linkedin.dataset.FineGrainedLineage; +import com.linkedin.dataset.FineGrainedLineageArray; import com.linkedin.dataset.Upstream; +import com.linkedin.dataset.UpstreamArray; +import com.linkedin.dataset.UpstreamLineage; import com.linkedin.domain.Domains; import com.linkedin.metadata.aspect.patch.builder.DataJobInputOutputPatchBuilder; import com.linkedin.metadata.aspect.patch.builder.GlobalTagsPatchBuilder; @@ -167,11 +170,34 @@ public List toMcps(DatahubOpenlineageConfig config) thro return mcps; } + private FineGrainedLineageArray mergeFinegrainedLineages() { + FineGrainedLineageArray fgls = new FineGrainedLineageArray(); + + for (DatahubDataset dataset : inSet) { + if (dataset.lineage != null && dataset.lineage.getFineGrainedLineages() != null) { + dataset.lineage.getFineGrainedLineages().stream() + .filter(Objects::nonNull) + .forEach(fgls::add); + } + } + + for (DatahubDataset dataset : outSet) { + if (dataset.lineage != null && dataset.lineage.getFineGrainedLineages() != null) { + dataset.lineage.getFineGrainedLineages().stream() + .filter(Objects::nonNull) + .forEach(fgls::add); + } + } + + return fgls; + } + private void generateDataJobInputOutputMcp( EdgeArray inputEdges, EdgeArray outputEdges, DatahubOpenlineageConfig config, List mcps) { + DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(); 
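+    // Fine-grained (column-level) lineage collected from the input and output datasets is merged
+    // below and attached to the DataJob: via the patch builder when patch support is enabled,
+    // otherwise directly on this DataJobInputOutput aspect. It is no longer written onto the
+    // datasets themselves.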
log.info("Adding DataJob edges to {}", jobUrn); if (config.isUsePatch() && (!parentJobs.isEmpty() || !inSet.isEmpty() || !outSet.isEmpty())) { @@ -186,6 +212,27 @@ private void generateDataJobInputOutputMcp( for (DataJobUrn parentJob : parentJobs) { dataJobInputOutputPatchBuilder.addInputDatajobEdge(parentJob); } + + FineGrainedLineageArray fgls = mergeFinegrainedLineages(); + fgls.forEach( + fgl -> { + Objects.requireNonNull(fgl.getUpstreams()) + .forEach( + upstream -> { + Objects.requireNonNull(fgl.getDownstreams()) + .forEach( + downstream -> { + dataJobInputOutputPatchBuilder.addFineGrainedUpstreamField( + upstream, + fgl.getConfidenceScore(), + StringUtils.defaultIfEmpty( + fgl.getTransformOperation(), "TRANSFORM"), + downstream, + fgl.getQuery()); + }); + }); + }); + MetadataChangeProposal dataJobInputOutputMcp = dataJobInputOutputPatchBuilder.build(); log.info( "dataJobInputOutputMcp: {}", @@ -195,6 +242,8 @@ private void generateDataJobInputOutputMcp( mcps.add(dataJobInputOutputPatchBuilder.build()); } else { + FineGrainedLineageArray fgls = mergeFinegrainedLineages(); + dataJobInputOutput.setFineGrainedLineages(fgls); dataJobInputOutput.setInputDatasetEdges(inputEdges); dataJobInputOutput.setInputDatasets(new DatasetUrnArray()); dataJobInputOutput.setOutputDatasetEdges(outputEdges); @@ -235,6 +284,49 @@ private void generateDataProcessInstanceMcp( generateDataProcessInstanceRelationship(mcps); } + private void deleteOldDatasetLineage( + DatahubDataset dataset, DatahubOpenlineageConfig config, List mcps) { + if (dataset.getLineage() != null) { + if (config.isUsePatch()) { + if (!dataset.getLineage().getUpstreams().isEmpty()) { + UpstreamLineagePatchBuilder upstreamLineagePatchBuilder = + new UpstreamLineagePatchBuilder().urn(dataset.getUrn()); + for (Upstream upstream : dataset.getLineage().getUpstreams()) { + upstreamLineagePatchBuilder.removeUpstream(upstream.getDataset()); + } + + log.info("Removing FineGrainedLineage to {}", dataset.getUrn()); + for (FineGrainedLineage fineGrainedLineage : + Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) { + for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) { + for (Urn downstream : Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { + upstreamLineagePatchBuilder.removeFineGrainedUpstreamField( + upstream, + StringUtils.defaultIfEmpty( + fineGrainedLineage.getTransformOperation(), "TRANSFORM"), + downstream, + null); + } + } + } + MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build(); + log.info( + "upstreamLineagePatch: {}", + mcp.getAspect().getValue().asString(Charset.defaultCharset())); + mcps.add(mcp); + } + } else { + if (!dataset.getLineage().getUpstreams().isEmpty()) { + // Remove earlier created UpstreamLineage which most probably was created by the plugin. 
+ UpstreamLineage upstreamLineage = new UpstreamLineage(); + upstreamLineage.setUpstreams(new UpstreamArray()); + upstreamLineage.setFineGrainedLineages(new FineGrainedLineageArray()); + addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, upstreamLineage, mcps); + } + } + } + } + private Pair processDownstreams( DatahubOpenlineageConfig config, List mcps) { UrnArray outputUrnArray = new UrnArray(); @@ -263,43 +355,13 @@ private Pair processDownstreams( dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getSchemaMetadata(), mcps); } - if (dataset.getLineage() != null) { - if (config.isUsePatch()) { - if (!dataset.getLineage().getUpstreams().isEmpty()) { - UpstreamLineagePatchBuilder upstreamLineagePatchBuilder = - new UpstreamLineagePatchBuilder().urn(dataset.getUrn()); - for (Upstream upstream : dataset.getLineage().getUpstreams()) { - upstreamLineagePatchBuilder.addUpstream( - upstream.getDataset(), upstream.getType()); - } - - log.info("Adding FineGrainedLineage to {}", dataset.getUrn()); - for (FineGrainedLineage fineGrainedLineage : - Objects.requireNonNull(dataset.getLineage().getFineGrainedLineages())) { - for (Urn upstream : Objects.requireNonNull(fineGrainedLineage.getUpstreams())) { - for (Urn downstream : - Objects.requireNonNull(fineGrainedLineage.getDownstreams())) { - upstreamLineagePatchBuilder.addFineGrainedUpstreamField( - upstream, - fineGrainedLineage.getConfidenceScore(), - StringUtils.defaultIfEmpty( - fineGrainedLineage.getTransformOperation(), "TRANSFORM"), - downstream, - null); - } - } - } - MetadataChangeProposal mcp = upstreamLineagePatchBuilder.build(); - log.info( - "upstreamLineagePatch: {}", - mcp.getAspect().getValue().asString(Charset.defaultCharset())); - mcps.add(mcp); - } - } else { - addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps); - } + // Remove lineage which was added by older plugin that set lineage on Datasets and not on + // DataJobs + if (config.isRemoveLegacyLineage()) { + deleteOldDatasetLineage(dataset, config, mcps); } }); + return Pair.of(outputUrnArray, outputEdges); } @@ -330,10 +392,6 @@ private Pair processUpstreams( addAspectToMcps( dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getSchemaMetadata(), mcps); } - - if (dataset.getLineage() != null) { - addAspectToMcps(dataset.getUrn(), DATASET_ENTITY_TYPE, dataset.getLineage(), mcps); - } }); return Pair.of(inputUrnArray, inputEdges); }
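As a quick way to try this change end to end, here is a minimal sketch of enabling the options introduced in this patch from a Java Spark job. The application name, class name, and GMS URL below are illustrative placeholders, not part of this change:

```java
import org.apache.spark.sql.SparkSession;

public class LineageEnabledJob {
  public static void main(String[] args) {
    // Illustrative only: attach the DataHub listener and enable the options added in 0.2.17.
    SparkSession spark =
        SparkSession.builder()
            .appName("test-application") // placeholder app name
            .config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.17")
            .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
            .config("spark.datahub.rest.server", "http://localhost:8080") // placeholder GMS URL
            // New in 0.2.17: turn off verbose MCP logging if the default (true) is too noisy.
            .config("spark.datahub.log.mcps", "false")
            // New in 0.2.17: clean up lineage written to Datasets by older plugin versions.
            .config("spark.datahub.legacyLineageCleanup.enabled", "true")
            // Patch-based lineage; needs an up-to-date DataHub server together with the cleanup flag.
            .config("spark.datahub.patch.enabled", "true")
            .getOrCreate();

    spark.range(10).count(); // trivial action so the listener has something to report

    spark.stop();
  }
}
```

With `spark.datahub.patch.enabled=true`, the cleanup flag removes the old upstream edges from the datasets via patches; without it, an empty UpstreamLineage aspect is emitted to overwrite them.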