feat(ingest/databricks): view upstream lineage for hive metastore
mayurinehate committed Jan 19, 2024
1 parent 4138b2f commit 15d88e8
Showing 3 changed files with 158 additions and 2 deletions.
@@ -55,7 +55,6 @@
 
 
 class HiveMetastoreProxy(Closeable):
-    # TODO: Support for view lineage using SQL parsing
     # Why not use hive ingestion source directly here?
     # 1. hive ingestion source assumes 2-level namespace hierarchy and currently
     #    there is no other intermediate interface except sqlalchemy inspector
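As an aside, the 2-level vs. 3-level namespace point in the comment above is easiest to see in the dataset names this connector produces. A tiny illustrative sketch; the helper is hypothetical, but the resulting name matches the URNs in the golden file at the end of this diff:

```python
# Hypothetical helper, for illustration only. Databricks exposes the legacy
# Hive metastore as a catalog, so every dataset gets a 3-level name, whereas
# the standalone hive source assumes a 2-level database.table namespace.
def three_level_name(catalog: str, schema: str, table: str) -> str:
    return f"{catalog}.{schema}.{table}"

print(three_level_name("hive_metastore", "bronze_kambi", "bet"))
# -> hive_metastore.bronze_kambi.bet, as in the lineage URNs below
```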
metadata-ingestion/src/datahub/ingestion/source/unity/source.py (99 changes: 98 additions & 1 deletion)
@@ -24,6 +24,7 @@
     add_dataset_to_container,
     gen_containers,
 )
+from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SupportStatus,
@@ -67,6 +68,7 @@
     DATA_TYPE_REGISTRY,
     Catalog,
     Column,
+    CustomCatalogType,
     Metastore,
     Notebook,
     NotebookId,
@@ -104,6 +106,12 @@
 from datahub.utilities.file_backed_collections import FileBackedDict
 from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
 from datahub.utilities.registries.domain_registry import DomainRegistry
+from datahub.utilities.sqlglot_lineage import (
+    SchemaResolver,
+    SqlParsingResult,
+    sqlglot_lineage,
+    view_definition_lineage_helper,
+)
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -137,6 +145,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
     unity_catalog_api_proxy: UnityCatalogApiProxy
     platform: str = "databricks"
     platform_instance_name: Optional[str]
+    sql_parser_schema_resolver: Optional[SchemaResolver] = None
 
     def get_report(self) -> UnityCatalogReport:
         return self.report
@@ -179,6 +188,9 @@ def __init__(self, ctx: PipelineContext, config: UnityCatalogSourceConfig):
         self.table_refs: Set[TableReference] = set()
         self.view_refs: Set[TableReference] = set()
         self.notebooks: FileBackedDict[Notebook] = FileBackedDict()
+        self.view_definitions: FileBackedDict[
+            tuple[TableReference, str]
+        ] = FileBackedDict()
 
         # Global map of tables, for profiling
         self.tables: FileBackedDict[Table] = FileBackedDict()
@@ -191,6 +203,13 @@ def init_hive_metastore_proxy(self):
                     self.config.get_sql_alchemy_url(HIVE_METASTORE), self.config.options
                 )
                 self.report.hive_metastore_catalog_found = True
+
+                if self.config.include_table_lineage:
+                    self.sql_parser_schema_resolver = SchemaResolver(
+                        platform=self.platform,
+                        platform_instance=self.config.platform_instance,
+                        env=self.config.env,
+                    )
             except Exception as e:
                 logger.debug("Exception", exc_info=True)
                 self.warn(
@@ -243,6 +262,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
 
         yield from self.process_metastores()
 
+        yield from self.get_view_lineage()
+
         if self.config.include_notebooks:
             self.report.report_ingestion_stage_start("Notebook lineage")
             for notebook in self.notebooks.values():
@@ -304,7 +325,6 @@ def process_notebooks(self) -> Iterable[MetadataWorkUnit]:
             yield from self._gen_notebook_workunits(notebook)
 
     def _gen_notebook_workunits(self, notebook: Notebook) -> Iterable[MetadataWorkUnit]:
-
         properties = {"path": notebook.path}
         if notebook.language:
             properties["language"] = notebook.language.value
@@ -449,6 +469,17 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUnit]:
                         table.ref, self.notebooks[str(notebook_id)]
                     )
 
+        # Sql parsing is required only for hive metastore view lineage
+        if (
+            self.sql_parser_schema_resolver
+            and table.schema.catalog.type == CustomCatalogType.HIVE_METASTORE_CATALOG
+        ):
+            self.sql_parser_schema_resolver.add_schema_metadata(
+                dataset_urn, schema_metadata
+            )
+            if table.view_definition:
+                self.view_definitions[dataset_urn] = (table.ref, table.view_definition)
+
         yield from [
             mcp.as_workunit()
             for mcp in MetadataChangeProposalWrapper.construct_many(
@@ -828,8 +859,74 @@ def _create_schema_field(column: Column) -> List[SchemaFieldClass]:
             )
         ]
 
+    def _run_sql_parser(
+        self, view_ref: TableReference, query: str, schema_resolver: SchemaResolver
+    ) -> Optional[SqlParsingResult]:
+        raw_lineage = sqlglot_lineage(
+            query,
+            schema_resolver=schema_resolver,
+            default_db=view_ref.catalog,
+            default_schema=view_ref.schema,
+        )
+        view_urn = self.gen_dataset_urn(view_ref)
+
+        if raw_lineage.debug_info.table_error:
+            logger.debug(
+                f"Failed to parse lineage for view {view_ref}: "
+                f"{raw_lineage.debug_info.table_error}"
+            )
+            self.report.num_view_definitions_failed_parsing += 1
+            self.report.view_definitions_parsing_failures.append(
+                f"Table-level sql parsing error for view {view_ref}: {raw_lineage.debug_info.table_error}"
+            )
+            return None
+
+        elif raw_lineage.debug_info.column_error:
+            self.report.num_view_definitions_failed_column_parsing += 1
+            self.report.view_definitions_parsing_failures.append(
+                f"Column-level sql parsing error for view {view_ref}: {raw_lineage.debug_info.column_error}"
+            )
+        else:
+            self.report.num_view_definitions_parsed += 1
+        return view_definition_lineage_helper(raw_lineage, view_urn)
+
+    def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
+        if not (
+            self.config.include_hive_metastore
+            and self.config.include_table_lineage
+            and self.sql_parser_schema_resolver
+        ):
+            return
+        # This is only used for parsing view lineage; usage and operations are emitted elsewhere
+        builder = SqlParsingBuilder(
+            generate_lineage=True,
+            generate_usage_statistics=False,
+            generate_operations=False,
+        )
+        for dataset_name in self.view_definitions.keys():
+            view_ref, view_definition = self.view_definitions[dataset_name]
+            result = self._run_sql_parser(
+                view_ref,
+                view_definition,
+                self.sql_parser_schema_resolver,
+            )
+            if result and result.out_tables:
+                # This does not yield any workunits, but we still need the
+                # yield from so that this builder method is executed
+                yield from builder.process_sql_parsing_result(
+                    result=result,
+                    query=view_definition,
+                    is_view_ddl=True,
+                    include_column_lineage=self.config.include_view_column_lineage,
+                )
+        yield from builder.gen_workunits()
+
     def close(self):
         if self.hive_metastore_proxy:
             self.hive_metastore_proxy.close()
+        if self.view_definitions:
+            self.view_definitions.close()
+        if self.sql_parser_schema_resolver:
+            self.sql_parser_schema_resolver.close()
 
         super().close()
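Taken together, the new methods form a small pipeline: register hive metastore table schemas with a SchemaResolver, parse each stored view definition with sqlglot_lineage, rewrite the result so the view itself is the downstream, and feed everything through a SqlParsingBuilder. Below is a minimal standalone sketch of that flow using the same helpers and keyword arguments the diff imports; the SQL text and URN literals are illustrative stand-ins for what _run_sql_parser receives via TableReference:

```python
# A sketch of the parsing path added above, run outside the source class.
# The helpers mirror _run_sql_parser; the SQL and URN are made up.
from datahub.utilities.sqlglot_lineage import (
    SchemaResolver,
    sqlglot_lineage,
    view_definition_lineage_helper,
)

# In the source, add_schema_metadata() registers each table's columns on the
# resolver, which is what makes column-level lineage resolvable.
resolver = SchemaResolver(platform="databricks", env="PROD")

view_urn = (
    "urn:li:dataset:(urn:li:dataPlatform:databricks,"
    "hive_metastore.bronze_kambi.view1,PROD)"
)
view_sql = "SELECT betStatusId, channelId FROM bet"

raw_lineage = sqlglot_lineage(
    view_sql,
    schema_resolver=resolver,
    default_db="hive_metastore",    # view_ref.catalog
    default_schema="bronze_kambi",  # view_ref.schema
)

if raw_lineage.debug_info.table_error:
    print(f"failed to parse: {raw_lineage.debug_info.table_error}")
else:
    # Re-point the parsed output table at the view itself, so the
    # upstreamLineage aspect lands on the view's urn.
    result = view_definition_lineage_helper(raw_lineage, view_urn)
    print(result.in_tables)  # e.g. [...hive_metastore.bronze_kambi.bet,PROD)]
```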
@@ -3463,6 +3463,66 @@
         "lastRunId": "no-run-id-provided"
     }
 },
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.view1,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "upstreamLineage",
+    "aspect": {
+        "json": {
+            "upstreams": [
+                {
+                    "auditStamp": {
+                        "time": 0,
+                        "actor": "urn:li:corpuser:unknown"
+                    },
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.bet,PROD)",
+                    "type": "VIEW"
+                }
+            ],
+            "fineGrainedLineages": [
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.bet,PROD),betStatusId)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.view1,PROD),betStatusId)"
+                    ],
+                    "confidenceScore": 1.0
+                },
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.bet,PROD),channelId)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.view1,PROD),channelId)"
+                    ],
+                    "confidenceScore": 1.0
+                },
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.bet,PROD),combination)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.view1,PROD),combination)"
+                    ],
+                    "confidenceScore": 1.0
+                }
+            ]
+        }
+    },
+    "systemMetadata": {
+        "lastObserved": 1638860400000,
+        "runId": "unity-catalog-test",
+        "lastRunId": "no-run-id-provided"
+    }
+},
 {
     "entityType": "dataset",
     "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,system.quickstart_schema.quickstart_table,PROD)",
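The aspect above is what SqlParsingBuilder.gen_workunits() ultimately emits for the parsed view. For reference, here is a hedged sketch of building the same upstreamLineage payload by hand, assuming the usual datahub.metadata.schema_classes names (the URNs are copied from the JSON above):

```python
# A sketch reproducing the golden upstreamLineage aspect programmatically.
# Assumes the standard datahub Python SDK classes; in this commit the aspect
# is actually produced by SqlParsingBuilder, not constructed by hand.
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    FineGrainedLineageClass,
    FineGrainedLineageDownstreamTypeClass,
    FineGrainedLineageUpstreamTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

VIEW = "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.view1,PROD)"
TABLE = "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.bet,PROD)"

aspect = UpstreamLineageClass(
    upstreams=[UpstreamClass(dataset=TABLE, type=DatasetLineageTypeClass.VIEW)],
    fineGrainedLineages=[
        FineGrainedLineageClass(
            upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
            upstreams=[f"urn:li:schemaField:({TABLE},{col})"],
            downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
            downstreams=[f"urn:li:schemaField:({VIEW},{col})"],
            confidenceScore=1.0,
        )
        for col in ("betStatusId", "channelId", "combination")
    ],
)

# Wrapped in an MCP, this is the same UPSERT the golden file records.
mcp = MetadataChangeProposalWrapper(entityUrn=VIEW, aspect=aspect)
```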
