fix(spark): Fine-grained lineage is emitted on the DataJob and not on the emitted Datasets. (#11956)
treff7es authored Jan 6, 2025
1 parent 91c1c6b commit 3316d40
Showing 9 changed files with 319 additions and 84 deletions.
101 changes: 58 additions & 43 deletions metadata-integration/java/acryl-spark-lineage/README.md

Large diffs are not rendered by default.
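The README diff above is presumably where the plugin's configuration table picks up the two options introduced by this commit, `log.mcps` and `legacyLineageCleanup.enabled`; both are parsed in SparkConfigParser further down. A minimal sketch of setting them on a Spark job follows, assuming the `spark.datahub.` option prefix, the DatahubSparkListener class name, and a local REST endpoint (taken from the plugin's documented usage, not from this diff):

```java
// Sketch only: option names combine the keys added in SparkConfigParser below with the
// assumed "spark.datahub." prefix; listener class and endpoint are also assumptions.
import org.apache.spark.sql.SparkSession;

public class LineageConfigExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("lineage-config-example")
            .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
            .config("spark.datahub.rest.server", "http://localhost:8080")
            // New in this commit: when false, log only aspect name and URN instead of the full serialized MCP.
            .config("spark.datahub.log.mcps", "false")
            // New in this commit: clean up legacy fine-grained lineage previously written to the datasets.
            .config("spark.datahub.legacyLineageCleanup.enabled", "true")
            .getOrCreate();

    spark.range(10).count(); // run something so lineage events are emitted
    spark.stop();
  }
}
```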

@@ -1,12 +1,18 @@
package datahub.spark;

import static com.linkedin.metadata.Constants.*;
import static datahub.spark.converter.SparkStreamingEventToDatahub.*;
import static io.datahubproject.openlineage.converter.OpenLineageToDataHub.*;
import static io.datahubproject.openlineage.utils.DatahubUtils.*;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.GlobalTags;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.data.template.StringMap;
import com.linkedin.dataprocess.DataProcessInstanceRelationships;
import com.linkedin.dataprocess.RunResultType;
@@ -62,12 +68,23 @@ public class DatahubEventEmitter extends EventEmitter {
private final Map<String, MetadataChangeProposalWrapper> schemaMap = new HashMap<>();
private SparkLineageConf datahubConf;
private static final int DEFAULT_TIMEOUT_SEC = 10;
private final ObjectMapper objectMapper;
private final JacksonDataTemplateCodec dataTemplateCodec;

private final EventFormatter eventFormatter = new EventFormatter();

public DatahubEventEmitter(SparkOpenLineageConfig config, String applicationJobName)
throws URISyntaxException {
super(config, applicationJobName);
objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
int maxSize =
Integer.parseInt(
System.getenv()
.getOrDefault(INGESTION_MAX_SERIALIZED_STRING_LENGTH, MAX_JACKSON_STRING_SIZE));
objectMapper
.getFactory()
.setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build());
dataTemplateCodec = new JacksonDataTemplateCodec(objectMapper.getFactory());
}

private Optional<Emitter> getEmitter() {
@@ -407,7 +424,14 @@ protected void emitMcps(List<MetadataChangeProposal> mcps) {
.map(
mcp -> {
try {
log.info("emitting mcpw: " + mcp);
if (this.datahubConf.isLogMcps()) {
DataMap map = mcp.data();
String serializedMCP = dataTemplateCodec.mapToString(map);
log.info("emitting mcpw: {}", serializedMCP);
} else {
log.info(
"emitting aspect: {} for urn: {}", mcp.getAspectName(), mcp.getEntityUrn());
}
return emitter.get().emit(mcp);
} catch (IOException ioException) {
log.error("Failed to emit metadata to DataHub", ioException);
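For reference, the new logging branch above serializes the proposal's backing DataMap with the JacksonDataTemplateCodec constructed in the emitter's constructor. A self-contained sketch of that path, with a hard-coded 16 MB string-read limit standing in for the INGESTION_MAX_SERIALIZED_STRING_LENGTH environment override (an assumption for brevity):

```java
// Sketch assuming Jackson 2.15+ and the DataHub client libraries on the classpath.
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.data.template.JacksonDataTemplateCodec;
import com.linkedin.mxe.MetadataChangeProposal;
import java.io.IOException;

public final class McpLoggingSketch {
  public static String toJson(MetadataChangeProposal mcp) throws IOException {
    ObjectMapper mapper =
        new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
    // Raise Jackson's string-length read constraint; the emitter sizes this from an env var.
    mapper
        .getFactory()
        .setStreamReadConstraints(
            StreamReadConstraints.builder().maxStringLength(16 * 1024 * 1024).build());
    JacksonDataTemplateCodec codec = new JacksonDataTemplateCodec(mapper.getFactory());
    // Same call the emitter uses when log.mcps is enabled.
    return codec.mapToString(mcp.data());
  }
}
```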
@@ -31,6 +31,7 @@ public class SparkConfigParser {
public static final String FILE_EMITTER_FILE_NAME = "file.filename";
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding";
public static final String CONFIG_LOG_MCPS = "log.mcps";

public static final String MAX_RETRIES = "rest.max_retries";
public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
@@ -51,6 +52,7 @@ public class SparkConfigParser {

public static final String COALESCE_KEY = "coalesce_jobs";
public static final String PATCH_ENABLED = "patch.enabled";
public static final String LEGACY_LINEAGE_CLEANUP = "legacyLineageCleanup.enabled";
public static final String DISABLE_SYMLINK_RESOLUTION = "disableSymlinkResolution";

public static final String STAGE_METADATA_COALESCING = "stage_metadata_coalescing";
@@ -158,6 +160,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf(
Config sparkConfig, SparkAppContext sparkAppContext) {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.isSpark(true);
builder.filePartitionRegexpPattern(
SparkConfigParser.getFilePartitionRegexpPattern(sparkConfig));
builder.fabricType(SparkConfigParser.getCommonFabricType(sparkConfig));
@@ -172,6 +175,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf(
builder.commonDatasetPlatformInstance(SparkConfigParser.getCommonPlatformInstance(sparkConfig));
builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig));
builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig));
builder.removeLegacyLineage(SparkConfigParser.isLegacyLineageCleanupEnabled(sparkConfig));
builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig));
builder.lowerCaseDatasetUrns(SparkConfigParser.isLowerCaseDatasetUrns(sparkConfig));
try {
@@ -311,6 +315,13 @@ public static boolean isDatasetMaterialize(Config datahubConfig) {
&& datahubConfig.getBoolean(DATASET_MATERIALIZE_KEY);
}

public static boolean isLogMcps(Config datahubConfig) {
if (datahubConfig.hasPath(CONFIG_LOG_MCPS)) {
return datahubConfig.getBoolean(CONFIG_LOG_MCPS);
}
return true;
}

public static boolean isIncludeSchemaMetadata(Config datahubConfig) {
if (datahubConfig.hasPath(DATASET_INCLUDE_SCHEMA_METADATA)) {
return datahubConfig.getBoolean(DATASET_INCLUDE_SCHEMA_METADATA);
@@ -352,6 +363,14 @@ public static boolean isPatchEnabled(Config datahubConfig) {
return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED);
}

public static boolean isLegacyLineageCleanupEnabled(Config datahubConfig) {
if (!datahubConfig.hasPath(LEGACY_LINEAGE_CLEANUP)) {
return false;
}
return datahubConfig.hasPath(LEGACY_LINEAGE_CLEANUP)
&& datahubConfig.getBoolean(LEGACY_LINEAGE_CLEANUP);
}

public static boolean isDisableSymlinkResolution(Config datahubConfig) {
if (!datahubConfig.hasPath(DISABLE_SYMLINK_RESOLUTION)) {
return false;
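Taken together, the two helpers above default MCP logging to on and legacy lineage cleanup to off. A small illustrative sketch of that behavior with Typesafe Config; the `datahub.spark.conf` package for SparkConfigParser is an assumption, since the file path is not shown in this rendering:

```java
// Illustrative only; the map keys mirror CONFIG_LOG_MCPS and LEGACY_LINEAGE_CLEANUP above.
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import datahub.spark.conf.SparkConfigParser;
import java.util.Map;

public class ParserFlagSketch {
  public static void main(String[] args) {
    Config defaults = ConfigFactory.parseMap(Map.of());
    Config overridden =
        ConfigFactory.parseMap(
            Map.of("log.mcps", false, "legacyLineageCleanup.enabled", true));

    // With no keys set, MCP logging defaults to on and legacy lineage cleanup to off.
    System.out.println(SparkConfigParser.isLogMcps(defaults)); // true
    System.out.println(SparkConfigParser.isLegacyLineageCleanupEnabled(defaults)); // false

    // Explicit overrides flip both flags.
    System.out.println(SparkConfigParser.isLogMcps(overridden)); // false
    System.out.println(SparkConfigParser.isLegacyLineageCleanupEnabled(overridden)); // true
  }
}
```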
@@ -17,6 +17,7 @@ public class SparkLineageConf {
final DatahubOpenlineageConfig openLineageConf;
@Builder.Default final boolean coalesceEnabled = true;
@Builder.Default final boolean emitCoalescePeriodically = false;
@Builder.Default final boolean logMcps = true;
final SparkAppContext sparkAppContext;
final DatahubEmitterConfig datahubEmitterConfig;
@Builder.Default final List<String> tags = new LinkedList<>();
@@ -32,6 +33,7 @@ public static SparkLineageConf toSparkLineageConf(
SparkConfigParser.sparkConfigToDatahubOpenlineageConf(sparkConfig, sparkAppContext);
builder.openLineageConf(datahubOpenlineageConfig);
builder.coalesceEnabled(SparkConfigParser.isCoalesceEnabled(sparkConfig));
builder.logMcps(SparkConfigParser.isLogMcps(sparkConfig));
if (SparkConfigParser.getTags(sparkConfig) != null) {
builder.tags(Arrays.asList(Objects.requireNonNull(SparkConfigParser.getTags(sparkConfig))));
}
@@ -814,4 +814,32 @@ public void testProcessGCSInputsOutputs() throws URISyntaxException, IOException
dataset.getUrn().toString());
}
}

public void testProcessMappartitionJob() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.lowerCaseDatasetUrns(true);
builder.materializeDataset(true);
builder.includeSchemaMetadata(true);
builder.isSpark(true);

String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/map_partition_job.json"),
StandardCharsets.UTF_8);

OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());

assertNotNull(datahubJob);

assertEquals(1, datahubJob.getInSet().size());
for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my_dir/my_file.csv,DEV)",
dataset.getUrn().toString());
}
assertEquals(0, datahubJob.getOutSet().size());
}
}
@@ -0,0 +1,66 @@
{
"eventTime": "2024-11-20T12:59:29.059Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"eventType": "START",
"run": {
"runId": "01902a1e-0b05-750e-b38d-439998f7a853",
"facets": {
"parent": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-1/ParentRunFacet.json#/$defs/ParentRunFacet",
"run": {
"runId": "01902a1e-0b05-750e-b38d-439998f7a853"
},
"job": {
"namespace": "default",
"name": "spark_context_session"
}
},
"processing_engine": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/1-1-1/ProcessingEngineRunFacet.json#/$defs/ProcessingEngineRunFacet",
"version": "3.4.2",
"name": "spark"
},
"spark_jobDetails": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"jobId": 0
},
"spark_properties": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
"properties": {
"spark.master": "yarn",
"spark.app.name": "SparkContextSession"
}
}
}
},
"job": {
"namespace": "default",
"name": "spark_context_session.map_partitions_parallel_collection",
"facets": {
"jobType": {
"_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.24.2/integration/spark",
"_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
"processingType": "BATCH",
"integration": "SPARK",
"jobType": "RDD_JOB"
}
}
},
"inputs": [
{
"namespace": "s3://my-bucket",
"name": "my_dir/my_file.csv"
}
],
"outputs": [
{
"namespace": "s3://my-bucket",
"name": "my_dir/my_file.csv"
}
]
}
@@ -16,6 +16,7 @@
@Getter
@ToString
public class DatahubOpenlineageConfig {
@Builder.Default private final boolean isSpark = false;
@Builder.Default private final boolean isStreaming = false;
@Builder.Default private final String pipelineName = null;
private final String platformInstance;
Expand All @@ -34,6 +35,7 @@ public class DatahubOpenlineageConfig {
@Builder.Default private Map<String, String> urnAliases = new HashMap<>();
@Builder.Default private final boolean disableSymlinkResolution = false;
@Builder.Default private final boolean lowerCaseDatasetUrns = false;
@Builder.Default private final boolean removeLegacyLineage = false;

public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {
@@ -675,9 +675,30 @@ private static void convertJobToDataJob(
datahubJob.setJobInfo(dji);
DataJobInputOutput inputOutput = new DataJobInputOutput();

boolean inputsEqualOutputs = false;
if ((datahubConf.isSpark())
&& ((event.getInputs() != null && event.getOutputs() != null)
&& (event.getInputs().size() == event.getOutputs().size()))) {
inputsEqualOutputs =
event.getInputs().stream()
.map(OpenLineage.Dataset::getName)
.collect(Collectors.toSet())
.equals(
event.getOutputs().stream()
.map(OpenLineage.Dataset::getName)
.collect(Collectors.toSet()));
if (inputsEqualOutputs) {
log.info(
"Inputs equals Outputs: {}. This is most probably because of an rdd map operation and we only process Inputs",
inputsEqualOutputs);
}
}

processJobInputs(datahubJob, event, datahubConf);

processJobOutputs(datahubJob, event, datahubConf);
if (!inputsEqualOutputs) {
processJobOutputs(datahubJob, event, datahubConf);
}

DataProcessInstanceRunEvent dpire = processDataProcessInstanceResult(event);
datahubJob.setDataProcessInstanceRunEvent(dpire);
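The block above is the behavioral core of the commit: for Spark RDD jobs whose input and output dataset names are identical (as in the map_partition_job.json fixture), output processing is skipped so the job does not claim to both read and write the same dataset. A standalone sketch of the same set comparison, using plain strings in place of OpenLineage.Dataset for brevity:

```java
// Illustrative only; the real code compares OpenLineage.Dataset names from the run event.
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class RddSelfLineageCheck {
  static boolean inputsEqualOutputs(List<String> inputNames, List<String> outputNames) {
    if (inputNames.size() != outputNames.size()) {
      return false;
    }
    Set<String> in = inputNames.stream().collect(Collectors.toSet());
    Set<String> out = outputNames.stream().collect(Collectors.toSet());
    return in.equals(out);
  }

  public static void main(String[] args) {
    // Mirrors map_partition_job.json above: the same file appears as input and output.
    List<String> inputs = List.of("my_dir/my_file.csv");
    List<String> outputs = List.of("my_dir/my_file.csv");
    System.out.println(inputsEqualOutputs(inputs, outputs)); // true -> outputs are skipped
  }
}
```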