From 348d449d8aec40c977e19cec830efdb48ee5f85d Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware <159135491+sagar-salvi-apptware@users.noreply.github.com> Date: Fri, 19 Jul 2024 14:39:19 +0530 Subject: [PATCH 1/2] fix(ingest/Glue): column upstream lineage between S3 and Glue (#10895) --- .../src/datahub/ingestion/source/aws/glue.py | 89 +- ...glue_mces_golden_table_column_lineage.json | 373 +++++ .../glue/glue_mces_golden_table_lineage.json | 1402 +++++++++++++++++ .../tests/unit/test_glue_source.py | 222 ++- .../tests/unit/test_glue_source_stubs.py | 92 ++ 5 files changed, 2172 insertions(+), 6 deletions(-) create mode 100644 metadata-ingestion/tests/unit/glue/glue_mces_golden_table_column_lineage.json create mode 100644 metadata-ingestion/tests/unit/glue/glue_mces_golden_table_lineage.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index 3b77d58a8711b..f7e1e2610e7e2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -24,6 +24,7 @@ from pydantic import validator from pydantic.fields import Field +from datahub.api.entities.dataset.dataset import Dataset from datahub.configuration.common import AllowDenyPattern from datahub.configuration.source_common import DatasetSourceConfigMixin from datahub.emitter import mce_builder @@ -55,7 +56,11 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.aws import s3_util from datahub.ingestion.source.aws.aws_common import AwsSourceConfig -from datahub.ingestion.source.aws.s3_util import is_s3_uri, make_s3_urn +from datahub.ingestion.source.aws.s3_util import ( + is_s3_uri, + make_s3_urn, + make_s3_urn_for_lineage, +) from datahub.ingestion.source.common.subtypes import ( DatasetContainerSubTypes, DatasetSubTypes, @@ -90,6 +95,9 @@ DatasetLineageTypeClass, DatasetProfileClass, DatasetPropertiesClass, + 
FineGrainedLineageClass, + FineGrainedLineageDownstreamTypeClass, + FineGrainedLineageUpstreamTypeClass, GlobalTagsClass, MetadataChangeEventClass, OwnerClass, @@ -97,6 +105,7 @@ OwnershipTypeClass, PartitionSpecClass, PartitionTypeClass, + SchemaMetadataClass, TagAssociationClass, UpstreamClass, UpstreamLineageClass, @@ -171,6 +180,11 @@ class GlueSourceConfig( description="If enabled, delta schemas can be alternatively fetched from table parameters.", ) + include_column_lineage: bool = Field( + default=True, + description="When enabled, column-level lineage will be extracted from the s3.", + ) + def is_profiling_enabled(self) -> bool: return self.profiling is not None and is_profiling_enabled( self.profiling.operation_config @@ -283,6 +297,7 @@ class GlueSource(StatefulIngestionSourceBase): def __init__(self, config: GlueSourceConfig, ctx: PipelineContext): super().__init__(config, ctx) + self.ctx = ctx self.extract_owners = config.extract_owners self.source_config = config self.report = GlueSourceReport() @@ -714,18 +729,43 @@ def get_lineage_if_enabled( dataset_properties: Optional[ DatasetPropertiesClass ] = mce_builder.get_aspect_if_available(mce, DatasetPropertiesClass) + # extract dataset schema aspect + schema_metadata: Optional[ + SchemaMetadataClass + ] = mce_builder.get_aspect_if_available(mce, SchemaMetadataClass) + if dataset_properties and "Location" in dataset_properties.customProperties: location = dataset_properties.customProperties["Location"] if is_s3_uri(location): - s3_dataset_urn = make_s3_urn(location, self.source_config.env) + s3_dataset_urn = make_s3_urn_for_lineage( + location, self.source_config.env + ) + assert self.ctx.graph + schema_metadata_for_s3: Optional[ + SchemaMetadataClass + ] = self.ctx.graph.get_schema_metadata(s3_dataset_urn) + if self.source_config.glue_s3_lineage_direction == "upstream": + fine_grained_lineages = None + if ( + self.source_config.include_column_lineage + and schema_metadata + and schema_metadata_for_s3 + 
): + fine_grained_lineages = self.get_fine_grained_lineages( + mce.proposedSnapshot.urn, + s3_dataset_urn, + schema_metadata, + schema_metadata_for_s3, + ) upstream_lineage = UpstreamLineageClass( upstreams=[ UpstreamClass( dataset=s3_dataset_urn, type=DatasetLineageTypeClass.COPY, ) - ] + ], + fineGrainedLineages=fine_grained_lineages or None, ) return MetadataChangeProposalWrapper( entityUrn=mce.proposedSnapshot.urn, @@ -747,6 +787,49 @@ def get_lineage_if_enabled( ).as_workunit() return None + def get_fine_grained_lineages( + self, + dataset_urn: str, + s3_dataset_urn: str, + schema_metadata: SchemaMetadata, + schema_metadata_for_s3: SchemaMetadata, + ) -> Optional[List[FineGrainedLineageClass]]: + def simplify_field_path(field_path): + return Dataset._simplify_field_path(field_path) + + if schema_metadata and schema_metadata_for_s3: + fine_grained_lineages: List[FineGrainedLineageClass] = [] + for field in schema_metadata.fields: + field_path_v1 = simplify_field_path(field.fieldPath) + matching_s3_field = next( + ( + f + for f in schema_metadata_for_s3.fields + if simplify_field_path(f.fieldPath) == field_path_v1 + ), + None, + ) + if matching_s3_field: + fine_grained_lineages.append( + FineGrainedLineageClass( + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD, + downstreams=[ + mce_builder.make_schema_field_urn( + dataset_urn, field_path_v1 + ) + ], + upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET, + upstreams=[ + mce_builder.make_schema_field_urn( + s3_dataset_urn, + simplify_field_path(matching_s3_field.fieldPath), + ) + ], + ) + ) + return fine_grained_lineages + return None + def _create_profile_mcp( self, mce: MetadataChangeEventClass, diff --git a/metadata-ingestion/tests/unit/glue/glue_mces_golden_table_column_lineage.json b/metadata-ingestion/tests/unit/glue/glue_mces_golden_table_column_lineage.json new file mode 100644 index 0000000000000..fd4109b0f93c9 --- /dev/null +++ 
b/metadata-ingestion/tests/unit/glue/glue_mces_golden_table_column_lineage.json @@ -0,0 +1,373 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "glue", + "env": "PROD", + "database": "flights-database-lineage", + "param1": "value1", + "param2": "value2", + "LocationUri": "s3://test-bucket/test-prefix", + "CreateTime": "June 09, 2021 at 14:14:19" + }, + "name": "flights-database-lineage", + "qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/flights-database-lineage" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:glue" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Database" + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "flights-crawler", + "averageRecordSize": "55", + "avro.schema.literal": 
"{\"type\":\"record\",\"name\":\"flights_avro_subset\",\"namespace\":\"default\",\"fields\":[{\"name\":\"yr\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"flightdate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"uniquecarrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"airlineid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flightnum\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"origin\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dest\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"depdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrierdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"weatherdelay\",\"type\":[\"null\",\"int\"],\"default\":null}]}", + "classification": "avro", + "compressionType": "none", + "objectCount": "30", + "recordCount": "169222196", + "sizeKey": "9503351413", + "typeOfData": "file", + "Location": "s3://crawler-public-us-west-2/flight/avro/", + "InputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat", + "Compressed": "False", + "NumberOfBuckets": "-1", + "SerdeInfo": "{'SerializationLibrary': 'org.apache.hadoop.hive.serde2.avro.AvroSerDe', 'Parameters': {'avro.schema.literal': 
'{\"type\":\"record\",\"name\":\"flights_avro_subset\",\"namespace\":\"default\",\"fields\":[{\"name\":\"yr\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"flightdate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"uniquecarrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"airlineid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flightnum\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"origin\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dest\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"depdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrierdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"weatherdelay\",\"type\":[\"null\",\"int\"],\"default\":null}]}', 'serialization.format': '1'}}", + "BucketColumns": "[]", + "SortColumns": "[]", + "StoredAsSubDirectories": "False" + }, + "name": "avro", + "qualifiedName": "arn:aws:glue:us-west-2:123412341234:table/flights-database-lineage/avro", + "tags": [] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "flights-database-lineage.avro", + "platform": "urn:li:dataPlatform:glue", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[type=int].yr", + "nullable": true, + "description": "test comment", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].flightdate", + "nullable": true, + "type": 
{ + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].uniquecarrier", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=int].airlineid", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].carrier", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].flightnum", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].origin", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].year", + "nullable": true, + "description": "partition test comment", + "type": { + "type": { + 
"com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.DataPlatformInstance": { + "platform": "urn:li:dataPlatform:glue" + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:owner", + "type": "DATAOWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD)", + "type": "COPY" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),yr)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),yr)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),flightdate)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),flightdate)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),uniquecarrier)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),uniquecarrier)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),airlineid)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),airlineid)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),carrier)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),carrier)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),flightnum)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),flightnum)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),origin)" + ], + "downstreamType": "FIELD", + 
"downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),origin)" + ], + "confidenceScore": 1.0 + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/glue/glue_mces_golden_table_lineage.json b/metadata-ingestion/tests/unit/glue/glue_mces_golden_table_lineage.json new file mode 100644 index 0000000000000..873776c5777bc --- /dev/null +++ b/metadata-ingestion/tests/unit/glue/glue_mces_golden_table_lineage.json @@ -0,0 +1,1402 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "glue", + "env": "PROD", + "database": "flights-database", + "param1": "value1", + "param2": "value2", + "LocationUri": "s3://test-bucket/test-prefix", + "CreateTime": "June 09, 2021 at 14:14:19" + }, + "name": "flights-database", + "qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/flights-database" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:glue" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Database" + ] + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": 
"glue", + "env": "PROD", + "database": "test-database", + "CreateTime": "June 01, 2021 at 14:55:02" + }, + "name": "test-database", + "qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/test-database" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:glue" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Database" + ] + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:110bc08849d1c1bde5fc345dab5c3ae7", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "glue", + "env": "PROD", + "database": "empty-database", + "CreateTime": "June 01, 2021 at 14:55:13" + }, + "name": "empty-database", + "qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:110bc08849d1c1bde5fc345dab5c3ae7", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:110bc08849d1c1bde5fc345dab5c3ae7", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:glue" + } + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:110bc08849d1c1bde5fc345dab5c3ae7", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + 
"typeNames": [ + "Database" + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "flights-crawler", + "averageRecordSize": "55", + "avro.schema.literal": "{\"type\":\"record\",\"name\":\"flights_avro_subset\",\"namespace\":\"default\",\"fields\":[{\"name\":\"yr\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"flightdate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"uniquecarrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"airlineid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flightnum\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"origin\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dest\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"depdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrierdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"weatherdelay\",\"type\":[\"null\",\"int\"],\"default\":null}]}", + "classification": "avro", + "compressionType": "none", + "objectCount": "30", + "recordCount": "169222196", + "sizeKey": "9503351413", + "typeOfData": "file", + "Location": "s3://crawler-public-us-west-2/flight/avro/", + "InputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat", + "Compressed": "False", + "NumberOfBuckets": "-1", + "SerdeInfo": "{'SerializationLibrary': 'org.apache.hadoop.hive.serde2.avro.AvroSerDe', 'Parameters': 
{'avro.schema.literal': '{\"type\":\"record\",\"name\":\"flights_avro_subset\",\"namespace\":\"default\",\"fields\":[{\"name\":\"yr\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"flightdate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"uniquecarrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"airlineid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flightnum\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"origin\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dest\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"depdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrierdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"weatherdelay\",\"type\":[\"null\",\"int\"],\"default\":null}]}', 'serialization.format': '1'}}", + "BucketColumns": "[]", + "SortColumns": "[]", + "StoredAsSubDirectories": "False" + }, + "name": "avro", + "qualifiedName": "arn:aws:glue:us-west-2:123412341234:table/flights-database/avro", + "tags": [] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "flights-database.avro", + "platform": "urn:li:dataPlatform:glue", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[type=int].yr", + "nullable": true, + "description": "test comment", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].flightdate", + "nullable": true, + 
"type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].uniquecarrier", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=int].airlineid", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].carrier", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].flightnum", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].origin", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].year", + "nullable": true, + "description": "partition test comment", + "type": { + "type": { + 
"com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.DataPlatformInstance": { + "platform": "urn:li:dataPlatform:glue" + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:owner", + "type": "DATAOWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:baz:bob" + }, + { + "tag": "urn:li:tag:foo:bar" + } + ] + } + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD)", + "type": "COPY" + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + 
"com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "test-jsons", + "averageRecordSize": "273", + "classification": "json", + "compressionType": "none", + "objectCount": "1", + "recordCount": "1", + "sizeKey": "273", + "typeOfData": "file", + "Location": "s3://test-glue-jsons/markers/", + "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + "Compressed": "False", + "NumberOfBuckets": "-1", + "SerdeInfo": "{'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe', 'Parameters': {'paths': 'markers'}}", + "BucketColumns": "[]", + "SortColumns": "[]", + "StoredAsSubDirectories": "False" + }, + "name": "test_jsons_markers", + "qualifiedName": "arn:aws:glue:us-west-2:795586375822:table/test-database/test_jsons_markers", + "tags": [] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "test-database.test_jsons_markers", + "platform": "urn:li:dataPlatform:glue", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[type=struct].[type=array].[type=struct].markers", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.ArrayType": { + "nestedType": [ + "record" + ] + } + } + }, + "nativeDataType": "array,location:array>>", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"array,location:array>>\"}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=array].[type=struct].markers.[type=string].name", + "nullable": true, + "type": { + "type": { + 
"com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=array].[type=struct].markers.[type=array].[type=double].position", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.ArrayType": { + "nestedType": [ + "double" + ] + } + } + }, + "nativeDataType": "array", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"array\"}" + }, + { + "fieldPath": "[version=2.0].[type=struct].[type=array].[type=struct].markers.[type=array].[type=double].location", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.ArrayType": { + "nestedType": [ + "double" + ] + } + } + }, + "nativeDataType": "array", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"array\"}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.DataPlatformInstance": { + "platform": "urn:li:dataPlatform:glue" + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:owner", + "type": "DATAOWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:baz:bob" + }, + { + "tag": "urn:li:tag:foo:bar" + } + ] + } + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + 
"container": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,test-glue-jsons/markers,PROD)", + "type": "COPY" + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "test", + "averageRecordSize": "19", + "classification": "parquet", + "compressionType": "none", + "objectCount": "60", + "recordCount": "167497743", + "sizeKey": "4463574900", + "typeOfData": "file", + "Location": "s3://crawler-public-us-west-2/flight/parquet/", + "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", + "Compressed": "False", + "NumberOfBuckets": "-1", + "SerdeInfo": "{'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe', 'Parameters': {'serialization.format': '1'}}", + "BucketColumns": "[]", + "SortColumns": "[]", + "StoredAsSubDirectories": "False" + }, + "name": "test_parquet", + "qualifiedName": "arn:aws:glue:us-west-2:795586375822:table/test-database/test_parquet", + "tags": [] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "test-database.test_parquet", + "platform": "urn:li:dataPlatform:glue", + 
"version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "[version=2.0].[type=int].yr", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=int].quarter", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=int].month", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=int].dayofmonth", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "int", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}" + }, + { + "fieldPath": "[version=2.0].[type=string].year", + "nullable": true, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "isPartOfKey": false, + "jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.DataPlatformInstance": { + "platform": "urn:li:dataPlatform:glue" + } + }, + { + 
"com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:owner", + "type": "DATAOWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:baz:bob" + }, + { + "tag": "urn:li:tag:foo:bar" + } + ] + } + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7" + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/parquet,PROD)", + "type": "COPY" + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": { + "urn": "urn:li:dataFlow:(glue,test-job-1,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataFlowInfo": { + "customProperties": { + "role": "arn:aws:iam::123412341234:role/service-role/AWSGlueServiceRole-glue-crawler", + "created": "2021-06-10 16:51:25.690000", + "modified": "2021-06-10 16:55:35.307000", + "command": "s3://aws-glue-assets-123412341234-us-west-2/scripts/job-1.py" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-1/graph", 
+ "name": "test-job-1", + "description": "The first test job" + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": { + "urn": "urn:li:dataFlow:(glue,test-job-2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataFlowInfo": { + "customProperties": { + "role": "arn:aws:iam::123412341234:role/service-role/AWSGlueServiceRole-glue-crawler", + "created": "2021-06-10 16:58:32.469000", + "modified": "2021-06-10 16:58:32.469000", + "command": "s3://aws-glue-assets-123412341234-us-west-2/scripts/job-2.py" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-2/graph", + "name": "test-job-2", + "description": "The second test job" + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),Filter-Transform0_job1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "f": "lambda row : ()", + "transformation_ctx": "\"Transform0\"", + "transformType": "Filter", + "nodeId": "Transform0_job1" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-1/graph", + "name": "test-job-1:Filter-Transform0_job1", + "type": { + "string": "GLUE" + } + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform2_job1)" + ] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform1_job1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "mappings": "[(\"yr\", \"int\", 
\"yr\", \"int\"), (\"flightdate\", \"string\", \"flightdate\", \"string\"), (\"uniquecarrier\", \"string\", \"uniquecarrier\", \"string\"), (\"airlineid\", \"int\", \"airlineid\", \"int\"), (\"carrier\", \"string\", \"carrier\", \"string\"), (\"flightnum\", \"string\", \"flightnum\", \"string\"), (\"origin\", \"string\", \"origin\", \"string\"), (\"dest\", \"string\", \"dest\", \"string\"), (\"depdelay\", \"int\", \"depdelay\", \"int\"), (\"carrierdelay\", \"int\", \"carrierdelay\", \"int\"), (\"weatherdelay\", \"int\", \"weatherdelay\", \"int\"), (\"year\", \"string\", \"year\", \"string\")]", + "transformation_ctx": "\"Transform1\"", + "transformType": "ApplyMapping", + "nodeId": "Transform1_job1" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-1/graph", + "name": "test-job-1:ApplyMapping-Transform1_job1", + "type": { + "string": "GLUE" + } + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),Filter-Transform0_job1)" + ] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform2_job1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "mappings": "[(\"yr\", \"int\", \"yr\", \"int\"), (\"flightdate\", \"string\", \"flightdate\", \"string\"), (\"uniquecarrier\", \"string\", \"uniquecarrier\", \"string\"), (\"airlineid\", \"int\", \"airlineid\", \"int\"), (\"carrier\", \"string\", \"carrier\", \"string\"), (\"flightnum\", \"string\", \"flightnum\", \"string\"), (\"origin\", \"string\", \"origin\", \"string\"), (\"dest\", \"string\", \"dest\", \"string\"), (\"depdelay\", \"int\", \"depdelay\", \"int\"), (\"carrierdelay\", \"int\", \"carrierdelay\", \"int\"), 
(\"weatherdelay\", \"int\", \"weatherdelay\", \"int\"), (\"year\", \"string\", \"year\", \"string\")]", + "transformation_ctx": "\"Transform2\"", + "transformType": "ApplyMapping", + "nodeId": "Transform2_job1" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-1/graph", + "name": "test-job-1:ApplyMapping-Transform2_job1", + "type": { + "string": "GLUE" + } + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)" + ], + "outputDatasets": [], + "inputDatajobs": [] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),Join-Transform3_job1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "keys2": "[\"(right) flightdate\"]", + "transformation_ctx": "\"Transform3\"", + "keys1": "[\"yr\"]", + "transformType": "Join", + "nodeId": "Transform3_job1" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-1/graph", + "name": "test-job-1:Join-Transform3_job1", + "type": { + "string": "GLUE" + } + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform4_job1)" + ] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform4_job1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "mappings": "[(\"yr\", \"int\", \"yr\", \"int\"), (\"flightdate\", \"string\", \"flightdate\", \"string\"), (\"uniquecarrier\", 
\"string\", \"uniquecarrier\", \"string\"), (\"airlineid\", \"int\", \"airlineid\", \"int\"), (\"carrier\", \"string\", \"carrier\", \"string\"), (\"flightnum\", \"string\", \"flightnum\", \"string\"), (\"origin\", \"string\", \"origin\", \"string\"), (\"dest\", \"string\", \"dest\", \"string\"), (\"depdelay\", \"int\", \"depdelay\", \"int\"), (\"carrierdelay\", \"int\", \"carrierdelay\", \"int\"), (\"weatherdelay\", \"int\", \"weatherdelay\", \"int\"), (\"year\", \"string\", \"year\", \"string\")]", + "transformation_ctx": "\"Transform4\"", + "transformType": "ApplyMapping", + "nodeId": "Transform4_job1" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-1/graph", + "name": "test-job-1:ApplyMapping-Transform4_job1", + "type": { + "string": "GLUE" + } + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform5_job1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "mappings": "[(\"yr\", \"int\", \"(right) yr\", \"int\"), (\"flightdate\", \"string\", \"(right) flightdate\", \"string\"), (\"uniquecarrier\", \"string\", \"(right) uniquecarrier\", \"string\"), (\"airlineid\", \"int\", \"(right) airlineid\", \"int\"), (\"carrier\", \"string\", \"(right) carrier\", \"string\"), (\"flightnum\", \"string\", \"(right) flightnum\", \"string\"), (\"origin\", \"string\", \"(right) origin\", \"string\"), (\"dest\", \"string\", \"(right) dest\", \"string\"), (\"depdelay\", \"int\", \"(right) depdelay\", \"int\"), (\"carrierdelay\", \"int\", \"(right) carrierdelay\", \"int\"), (\"weatherdelay\", \"int\", \"(right) weatherdelay\", \"int\"), (\"year\", \"string\", \"(right) year\", 
\"string\")]", + "transformation_ctx": "\"Transform5\"", + "transformType": "ApplyMapping", + "nodeId": "Transform5_job1" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-1/graph", + "name": "test-job-1:ApplyMapping-Transform5_job1", + "type": { + "string": "GLUE" + } + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-glue-jsons,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "connection_type": "s3", + "format": "json", + "connection_options": "{'path': 's3://test-glue-jsons/', 'partitionKeys': []}", + "transformation_ctx": "DataSink1" + }, + "tags": [] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),SplitFields-Transform0_job2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "paths": "[\"yr\", \"quarter\", \"month\", \"dayofmonth\", \"dayofweek\", \"flightdate\", \"uniquecarrier\"]", + "name2": "\"Transform0Output1\"", + "name1": "\"Transform0Output0\"", + "transformation_ctx": "\"Transform0\"", + "transformType": "SplitFields", + "nodeId": "Transform0_job2" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-2/graph", + "name": "test-job-2:SplitFields-Transform0_job2", + "type": { + "string": "GLUE" + } + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + 
"urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),ApplyMapping-Transform1_job2)" + ] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),ApplyMapping-Transform1_job2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "mappings": "[(\"yr\", \"int\", \"yr\", \"int\"), (\"quarter\", \"int\", \"quarter\", \"int\"), (\"month\", \"int\", \"month\", \"int\"), (\"dayofmonth\", \"int\", \"dayofmonth\", \"int\"), (\"dayofweek\", \"int\", \"dayofweek\", \"int\"), (\"flightdate\", \"string\", \"flightdate\", \"string\"), (\"uniquecarrier\", \"string\", \"uniquecarrier\", \"string\"), (\"airlineid\", \"int\", \"airlineid\", \"int\"), (\"carrier\", \"string\", \"carrier\", \"string\")]", + "transformation_ctx": "\"Transform1\"", + "transformType": "ApplyMapping", + "nodeId": "Transform1_job2" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-2/graph", + "name": "test-job-2:ApplyMapping-Transform1_job2", + "type": { + "string": "GLUE" + } + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)" + ], + "outputDatasets": [], + "inputDatajobs": [] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),FillMissingValues-Transform2_job2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "missing_values_column": "\"dayofmonth\"", + "transformation_ctx": "\"Transform2\"", + "transformType": "FillMissingValues", + "nodeId": "Transform2_job2" + }, + "externalUrl": 
"https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-2/graph", + "name": "test-job-2:FillMissingValues-Transform2_job2", + "type": { + "string": "GLUE" + } + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),ApplyMapping-Transform1_job2)" + ] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot": { + "urn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),SelectFields-Transform3_job2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.datajob.DataJobInfo": { + "customProperties": { + "paths": "[]", + "transformation_ctx": "\"Transform3\"", + "transformType": "SelectFields", + "nodeId": "Transform3_job2" + }, + "externalUrl": "https://us-west-2.console.aws.amazon.com/gluestudio/home?region=us-west-2#/editor/job/test-job-2/graph", + "name": "test-job-2:SelectFields-Transform3_job2", + "type": { + "string": "GLUE" + } + } + }, + { + "com.linkedin.pegasus2avro.datajob.DataJobInputOutput": { + "inputDatasets": [], + "outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:s3,test-glue-jsons,PROD)" + ], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),FillMissingValues-Transform2_job2)" + ] + } + } + ] + } + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:s3,test-glue-jsons,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "connection_type": "s3", + "format": "json", + "connection_options": "{'path': 's3://test-glue-jsons/', 'partitionKeys': []}", + "transformation_ctx": "DataSink0" + }, + "tags": [] + } + } + ] + } + } +}, +{ + "entityType": 
"dataFlow", + "entityUrn": "urn:li:dataFlow:(glue,test-job-1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(glue,test-job-2,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform1_job1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform2_job1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform4_job1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),ApplyMapping-Transform5_job1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),Filter-Transform0_job1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-1,PROD),Join-Transform3_job1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),ApplyMapping-Transform1_job2)", + "changeType": "UPSERT", + "aspectName": "status", + 
"aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),FillMissingValues-Transform2_job2)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),SelectFields-Transform3_job2)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(glue,test-job-2,PROD),SplitFields-Transform0_job2)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:baz:bob", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "baz:bob" + } + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:foo:bar", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "foo:bar" + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_glue_source.py b/metadata-ingestion/tests/unit/test_glue_source.py index c8b7e021cf5a0..b43db47ae0071 100644 --- a/metadata-ingestion/tests/unit/test_glue_source.py +++ b/metadata-ingestion/tests/unit/test_glue_source.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import Any, Dict, Optional, Tuple, Type, cast +from typing import Any, Callable, Dict, Optional, Tuple, Type, cast from unittest.mock import patch import pydantic @@ -8,8 +8,10 @@ from botocore.stub import Stubber from freezegun import freeze_time +import datahub.metadata.schema_classes as models from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields +from datahub.ingestion.graph.client import DatahubClientConfig, 
DataHubGraph from datahub.ingestion.sink.file import write_metadata_file from datahub.ingestion.source.aws.glue import GlueSource, GlueSourceConfig from datahub.ingestion.source.state.sql_common_state import ( @@ -35,6 +37,7 @@ get_bucket_tagging, get_databases_delta_response, get_databases_response, + get_databases_response_for_lineage, get_databases_response_with_resource_link, get_dataflow_graph_response_1, get_dataflow_graph_response_2, @@ -47,6 +50,7 @@ get_object_response_1, get_object_response_2, get_object_tagging, + get_tables_lineage_response_1, get_tables_response_1, get_tables_response_2, get_tables_response_for_target_database, @@ -63,19 +67,28 @@ def glue_source( platform_instance: Optional[str] = None, + mock_datahub_graph: Optional[Callable[[DatahubClientConfig], DataHubGraph]] = None, use_s3_bucket_tags: bool = True, use_s3_object_tags: bool = True, extract_delta_schema_from_parameters: bool = False, + emit_s3_lineage: bool = False, + include_column_lineage: bool = False, + extract_transforms: bool = True, ) -> GlueSource: + pipeline_context = PipelineContext(run_id="glue-source-tes") + if mock_datahub_graph: + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) return GlueSource( - ctx=PipelineContext(run_id="glue-source-test"), + ctx=pipeline_context, config=GlueSourceConfig( aws_region="us-west-2", - extract_transforms=True, + extract_transforms=extract_transforms, platform_instance=platform_instance, use_s3_bucket_tags=use_s3_bucket_tags, use_s3_object_tags=use_s3_object_tags, extract_delta_schema_from_parameters=extract_delta_schema_from_parameters, + emit_s3_lineage=emit_s3_lineage, + include_column_lineage=include_column_lineage, ), ) @@ -425,3 +438,206 @@ def test_glue_with_malformed_delta_schema_ingest( output_path=tmp_path / "glue_malformed_delta_mces.json", golden_path=test_resources_dir / "glue_malformed_delta_mces_golden.json", ) + + +@pytest.mark.parametrize( + "platform_instance, mce_file, mce_golden_file", + [ + 
(None, "glue_mces.json", "glue_mces_golden_table_lineage.json"), + ], +) +@freeze_time(FROZEN_TIME) +def test_glue_ingest_include_table_lineage( + tmp_path: Path, + pytestconfig: PytestConfig, + mock_datahub_graph: Callable[[DatahubClientConfig], DataHubGraph], + platform_instance: str, + mce_file: str, + mce_golden_file: str, +) -> None: + glue_source_instance = glue_source( + platform_instance=platform_instance, + mock_datahub_graph=mock_datahub_graph, + emit_s3_lineage=True, + ) + + with Stubber(glue_source_instance.glue_client) as glue_stubber: + glue_stubber.add_response("get_databases", get_databases_response, {}) + glue_stubber.add_response( + "get_tables", + get_tables_response_1, + {"DatabaseName": "flights-database"}, + ) + glue_stubber.add_response( + "get_tables", + get_tables_response_2, + {"DatabaseName": "test-database"}, + ) + glue_stubber.add_response( + "get_tables", + {"TableList": []}, + {"DatabaseName": "empty-database"}, + ) + glue_stubber.add_response("get_jobs", get_jobs_response, {}) + glue_stubber.add_response( + "get_dataflow_graph", + get_dataflow_graph_response_1, + {"PythonScript": get_object_body_1}, + ) + glue_stubber.add_response( + "get_dataflow_graph", + get_dataflow_graph_response_2, + {"PythonScript": get_object_body_2}, + ) + + with Stubber(glue_source_instance.s3_client) as s3_stubber: + for _ in range( + len(get_tables_response_1["TableList"]) + + len(get_tables_response_2["TableList"]) + ): + s3_stubber.add_response( + "get_bucket_tagging", + get_bucket_tagging(), + ) + s3_stubber.add_response( + "get_object_tagging", + get_object_tagging(), + ) + + s3_stubber.add_response( + "get_object", + get_object_response_1(), + { + "Bucket": "aws-glue-assets-123412341234-us-west-2", + "Key": "scripts/job-1.py", + }, + ) + s3_stubber.add_response( + "get_object", + get_object_response_2(), + { + "Bucket": "aws-glue-assets-123412341234-us-west-2", + "Key": "scripts/job-2.py", + }, + ) + + mce_objects = [wu.metadata for wu in 
glue_source_instance.get_workunits()] + glue_stubber.assert_no_pending_responses() + s3_stubber.assert_no_pending_responses() + + write_metadata_file(tmp_path / mce_file, mce_objects) + + # Verify the output. + test_resources_dir = pytestconfig.rootpath / "tests/unit/glue" + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / mce_file, + golden_path=test_resources_dir / mce_golden_file, + ) + + +@pytest.mark.parametrize( + "platform_instance, mce_file, mce_golden_file", + [ + (None, "glue_mces.json", "glue_mces_golden_table_column_lineage.json"), + ], +) +@freeze_time(FROZEN_TIME) +def test_glue_ingest_include_column_lineage( + tmp_path: Path, + pytestconfig: PytestConfig, + mock_datahub_graph: Callable[[DatahubClientConfig], DataHubGraph], + platform_instance: str, + mce_file: str, + mce_golden_file: str, +) -> None: + glue_source_instance = glue_source( + platform_instance=platform_instance, + mock_datahub_graph=mock_datahub_graph, + emit_s3_lineage=True, + include_column_lineage=True, + use_s3_bucket_tags=False, + use_s3_object_tags=False, + extract_transforms=False, + ) + + # fake the server response + def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass: + return models.SchemaMetadataClass( + schemaName="crawler-public-us-west-2/flight/avro", + platform="urn:li:dataPlatform:s3", # important <- platform must be an urn + version=0, + hash="", + platformSchema=models.OtherSchemaClass( + rawSchema="__insert raw schema here__" + ), + fields=[ + models.SchemaFieldClass( + fieldPath="yr", + type=models.SchemaFieldDataTypeClass(type=models.NumberTypeClass()), + nativeDataType="int", + # use this to provide the type of the field in the source system's vernacular + ), + models.SchemaFieldClass( + fieldPath="flightdate", + type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), + nativeDataType="VARCHAR(100)", + # use this to provide the type of the field in the source system's vernacular + ), + models.SchemaFieldClass( 
+ fieldPath="uniquecarrier", + type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), + nativeDataType="VARCHAR(100)", + # use this to provide the type of the field in the source system's vernacular + ), + models.SchemaFieldClass( + fieldPath="airlineid", + type=models.SchemaFieldDataTypeClass(type=models.NumberTypeClass()), + nativeDataType="int", + # use this to provide the type of the field in the source system's vernacular + ), + models.SchemaFieldClass( + fieldPath="carrier", + type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), + nativeDataType="VARCHAR(100)", + # use this to provide the type of the field in the source system's vernacular + ), + models.SchemaFieldClass( + fieldPath="flightnum", + type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), + nativeDataType="VARCHAR(100)", + # use this to provide the type of the field in the source system's vernacular + ), + models.SchemaFieldClass( + fieldPath="origin", + type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), + nativeDataType="VARCHAR(100)", + # use this to provide the type of the field in the source system's vernacular + ), + ], + ) + + glue_source_instance.ctx.graph.get_schema_metadata = fake_schema_metadata # type: ignore + + with Stubber(glue_source_instance.glue_client) as glue_stubber: + glue_stubber.add_response( + "get_databases", get_databases_response_for_lineage, {} + ) + glue_stubber.add_response( + "get_tables", + get_tables_lineage_response_1, + {"DatabaseName": "flights-database-lineage"}, + ) + + mce_objects = [wu.metadata for wu in glue_source_instance.get_workunits()] + glue_stubber.assert_no_pending_responses() + + write_metadata_file(tmp_path / mce_file, mce_objects) + + # Verify the output. 
+ test_resources_dir = pytestconfig.rootpath / "tests/unit/glue" + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / mce_file, + golden_path=test_resources_dir / mce_golden_file, + ) diff --git a/metadata-ingestion/tests/unit/test_glue_source_stubs.py b/metadata-ingestion/tests/unit/test_glue_source_stubs.py index fc4c9e91410e0..f44a384a02c4a 100644 --- a/metadata-ingestion/tests/unit/test_glue_source_stubs.py +++ b/metadata-ingestion/tests/unit/test_glue_source_stubs.py @@ -880,6 +880,98 @@ ] get_delta_tables_response_2 = {"TableList": delta_tables_2} +get_databases_response_for_lineage = { + "DatabaseList": [ + { + "Name": "flights-database-lineage", + "CreateTime": datetime.datetime(2021, 6, 9, 14, 14, 19), + "CreateTableDefaultPermissions": [ + { + "Principal": { + "DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS" + }, + "Permissions": ["ALL"], + } + ], + "CatalogId": "123412341234", + "LocationUri": "s3://test-bucket/test-prefix", + "Parameters": {"param1": "value1", "param2": "value2"}, + }, + ] +} + +tables_lineage_1 = [ + { + "Name": "avro", + "DatabaseName": "flights-database-lineage", + "Owner": "owner", + "CreateTime": datetime.datetime(2021, 6, 9, 14, 17, 35), + "UpdateTime": datetime.datetime(2021, 6, 9, 14, 17, 35), + "LastAccessTime": datetime.datetime(2021, 6, 9, 14, 17, 35), + "Retention": 0, + "StorageDescriptor": { + "Columns": [ + {"Name": "yr", "Type": "int", "Comment": "test comment"}, + {"Name": "flightdate", "Type": "string"}, + {"Name": "uniquecarrier", "Type": "string"}, + {"Name": "airlineid", "Type": "int"}, + {"Name": "carrier", "Type": "string"}, + {"Name": "flightnum", "Type": "string"}, + {"Name": "origin", "Type": "string"}, + ], + "Location": "s3://crawler-public-us-west-2/flight/avro/", + "InputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat", + "Compressed": False, + "NumberOfBuckets": -1, + 
"SerdeInfo": { + "SerializationLibrary": "org.apache.hadoop.hive.serde2.avro.AvroSerDe", + "Parameters": { + "avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}', + "serialization.format": "1", + }, + }, + "BucketColumns": [], + "SortColumns": [], + "Parameters": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "flights-crawler", + "averageRecordSize": "55", + "avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}', + "classification": "avro", + "compressionType": "none", + "objectCount": "30", + "recordCount": "169222196", + 
"sizeKey": "9503351413", + "typeOfData": "file", + }, + "StoredAsSubDirectories": False, + }, + "PartitionKeys": [ + {"Name": "year", "Type": "string", "Comment": "partition test comment"} + ], + "TableType": "EXTERNAL_TABLE", + "Parameters": { + "CrawlerSchemaDeserializerVersion": "1.0", + "CrawlerSchemaSerializerVersion": "1.0", + "UPDATED_BY_CRAWLER": "flights-crawler", + "averageRecordSize": "55", + "avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}', + "classification": "avro", + "compressionType": "none", + "objectCount": "30", + "recordCount": "169222196", + "sizeKey": "9503351413", + "typeOfData": "file", + }, + "CreatedBy": "arn:aws:sts::123412341234:assumed-role/AWSGlueServiceRole-flights-crawler/AWS-Crawler", + "IsRegisteredWithLakeFormation": False, + "CatalogId": "123412341234", + } +] +get_tables_lineage_response_1 = {"TableList": tables_lineage_1} + def mock_get_object_response(raw_body: str) -> Dict[str, Any]: """ From 65ef8585bc5c842ef144efa356262ed6ffadfee6 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Fri, 19 Jul 2024 12:09:23 +0200 Subject: [PATCH 2/2] fix(ingest/abs): split abs utils into multiple files (#10945) --- .../datahub/ingestion/source/abs/source.py | 6 +- .../{abs_util.py => abs_folder_utils.py} | 63 ------------------ 
.../ingestion/source/azure/abs_utils.py | 66 +++++++++++++++++++ .../data_lake_common/data_lake_utils.py | 2 +- .../source/data_lake_common/path_spec.py | 2 +- 5 files changed, 72 insertions(+), 67 deletions(-) rename metadata-ingestion/src/datahub/ingestion/source/azure/{abs_util.py => abs_folder_utils.py} (79%) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/azure/abs_utils.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/abs/source.py b/metadata-ingestion/src/datahub/ingestion/source/abs/source.py index 07cc694e1b162..c9833f6982599 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/abs/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/abs/source.py @@ -52,13 +52,15 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec from datahub.ingestion.source.abs.report import DataLakeSourceReport -from datahub.ingestion.source.azure.abs_util import ( +from datahub.ingestion.source.azure.abs_folder_utils import ( get_abs_properties, get_abs_tags, + list_folders, +) +from datahub.ingestion.source.azure.abs_utils import ( get_container_name, get_container_relative_path, get_key_prefix, - list_folders, strip_abs_prefix, ) from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator diff --git a/metadata-ingestion/src/datahub/ingestion/source/azure/abs_util.py b/metadata-ingestion/src/datahub/ingestion/source/azure/abs_folder_utils.py similarity index 79% rename from metadata-ingestion/src/datahub/ingestion/source/azure/abs_util.py rename to metadata-ingestion/src/datahub/ingestion/source/azure/abs_folder_utils.py index 34faa0f0979ef..ce166f2942dac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/azure/abs_util.py +++ b/metadata-ingestion/src/datahub/ingestion/source/azure/abs_folder_utils.py @@ -1,6 +1,4 @@ import logging -import os -import re from typing import Dict, Iterable, List, Optional 
from azure.storage.blob import BlobProperties @@ -10,67 +8,10 @@ from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass -ABS_PREFIXES_REGEX = re.compile( - r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)" -) - logging.getLogger("py4j").setLevel(logging.ERROR) logger: logging.Logger = logging.getLogger(__name__) -def is_abs_uri(uri: str) -> bool: - return bool(ABS_PREFIXES_REGEX.match(uri)) - - -def get_abs_prefix(abs_uri: str) -> Optional[str]: - result = re.search(ABS_PREFIXES_REGEX, abs_uri) - if result and result.groups(): - return result.group(1) - return None - - -def strip_abs_prefix(abs_uri: str) -> str: - # remove abs prefix https://.blob.core.windows.net - abs_prefix = get_abs_prefix(abs_uri) - if not abs_prefix: - raise ValueError( - f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" - ) - length_abs_prefix = len(abs_prefix) - return abs_uri[length_abs_prefix:] - - -def make_abs_urn(abs_uri: str, env: str) -> str: - abs_name = strip_abs_prefix(abs_uri) - - if abs_name.endswith("/"): - abs_name = abs_name[:-1] - - name, extension = os.path.splitext(abs_name) - - if extension != "": - extension = extension[1:] # remove the dot - return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})" - - return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})" - - -def get_container_name(abs_uri: str) -> str: - if not is_abs_uri(abs_uri): - raise ValueError( - f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" - ) - return strip_abs_prefix(abs_uri).split("/")[0] - - -def get_key_prefix(abs_uri: str) -> str: - if not is_abs_uri(abs_uri): - raise ValueError( - f"Not an Azure Blob Storage URI. 
Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" - ) - return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1] - - def get_abs_properties( container_name: str, blob_name: Optional[str], @@ -280,7 +221,3 @@ def list_folders( this_dict[folder_name] = folder_name yield f"{folder_name}" - - -def get_container_relative_path(abs_uri: str) -> str: - return "/".join(strip_abs_prefix(abs_uri).split("/")[1:]) diff --git a/metadata-ingestion/src/datahub/ingestion/source/azure/abs_utils.py b/metadata-ingestion/src/datahub/ingestion/source/azure/abs_utils.py new file mode 100644 index 0000000000000..042e1b4ef921f --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/azure/abs_utils.py @@ -0,0 +1,66 @@ +import os +import re +from typing import Optional + +# This file should not import any abs-specific modules as we import it in path_spec.py in data_lake_common + +ABS_PREFIXES_REGEX = re.compile( + r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)" +) + + +def is_abs_uri(uri: str) -> bool: + return bool(ABS_PREFIXES_REGEX.match(uri)) + + +def get_abs_prefix(abs_uri: str) -> Optional[str]: + result = re.search(ABS_PREFIXES_REGEX, abs_uri) + if result and result.groups(): + return result.group(1) + return None + + +def strip_abs_prefix(abs_uri: str) -> str: + # remove abs prefix https://.blob.core.windows.net + abs_prefix = get_abs_prefix(abs_uri) + if not abs_prefix: + raise ValueError( + f"Not an Azure Blob Storage URI. 
Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" + ) + length_abs_prefix = len(abs_prefix) + return abs_uri[length_abs_prefix:] + + +def make_abs_urn(abs_uri: str, env: str) -> str: + abs_name = strip_abs_prefix(abs_uri) + + if abs_name.endswith("/"): + abs_name = abs_name[:-1] + + name, extension = os.path.splitext(abs_name) + + if extension != "": + extension = extension[1:] # remove the dot + return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})" + + return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})" + + +def get_container_name(abs_uri: str) -> str: + if not is_abs_uri(abs_uri): + raise ValueError( + f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" + ) + return strip_abs_prefix(abs_uri).split("/")[0] + + +def get_key_prefix(abs_uri: str) -> str: + if not is_abs_uri(abs_uri): + raise ValueError( + f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}" + ) + return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1] + + +def get_container_relative_path(abs_uri: str) -> str: + return "/".join(strip_abs_prefix(abs_uri).split("/")[1:]) diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py index 2ebdd2b4126bb..f594c61f4e5ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/data_lake_utils.py @@ -16,7 +16,7 @@ get_s3_prefix, is_s3_uri, ) -from datahub.ingestion.source.azure.abs_util import ( +from datahub.ingestion.source.azure.abs_utils import ( get_abs_prefix, get_container_name, get_container_relative_path, diff --git a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py 
b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py index e21cdac1edf75..71765f9be5e32 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py +++ b/metadata-ingestion/src/datahub/ingestion/source/data_lake_common/path_spec.py @@ -11,7 +11,7 @@ from datahub.configuration.common import ConfigModel from datahub.ingestion.source.aws.s3_util import is_s3_uri -from datahub.ingestion.source.azure.abs_util import is_abs_uri +from datahub.ingestion.source.azure.abs_utils import is_abs_uri from datahub.ingestion.source.gcs.gcs_utils import is_gcs_uri # hide annoying debug errors from py4j