
Commit

add glossary term
can ingest certified metrics

refactor

Null check

only sync physical metrics
llance committed Jan 15, 2025
1 parent fc49916 commit 4967d03
Showing 1 changed file with 82 additions and 10 deletions.
92 changes: 82 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -21,9 +21,11 @@
make_data_platform_urn,
make_dataset_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_term_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
@@ -35,6 +37,7 @@
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.ingestion.source.sql.sql_types import resolve_sql_type
from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import (
get_platform_from_sqlalchemy_uri,
@@ -48,6 +51,8 @@
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.metadata.schema_classes import (
    AuditStampClass,
    GlossaryTermAssociationClass,
    GlossaryTermInfoClass,
    GlossaryTermsClass,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
@@ -80,7 +85,6 @@

PAGE_SIZE = 25

chart_type_from_viz_type = {
"line": ChartTypeClass.LINE,
"big_number": ChartTypeClass.LINE,
@@ -97,7 +101,6 @@
"box_plot": ChartTypeClass.BAR,
}

platform_without_databases = ["druid"]


@@ -231,6 +234,7 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
cached_domains=[domain_id for domain_id in self.config.domain],
graph=self.ctx.graph,
)
# Capture the sink config so the glossary-term helpers below can talk to
# the same DataHub instance (assumes a datahub-rest sink).
self.sink_config = ctx.pipeline_config.sink.config
self.session = self.login()

def login(self) -> requests.Session:
@@ -318,7 +322,7 @@ def get_dataset_info(self, dataset_id: int) -> dict:
return dataset_response.json()

def get_datasource_urn_from_id(
self, dataset_response: dict, platform_instance: str
) -> str:
schema_name = dataset_response.get("result", {}).get("schema")
table_name = dataset_response.get("result", {}).get("table_name")
@@ -351,7 +355,7 @@ def get_datasource_urn_from_id(
raise ValueError("Could not construct dataset URN")

def construct_dashboard_from_api_data(
self, dashboard_data: dict
) -> DashboardSnapshot:
dashboard_urn = make_dashboard_urn(
platform=self.platform,
@@ -560,8 +564,8 @@ def gen_schema_fields(self, column_data: List[Dict[str, str]]) -> List[SchemaFie
return schema_fields

def gen_schema_metadata(
self,
dataset_response: dict,
) -> SchemaMetadata:
dataset_response = dataset_response.get("result", {})
column_data = dataset_response.get("columns", [])
@@ -583,14 +587,75 @@ def gen_dataset_urn(self, datahub_dataset_name: str) -> str:
env=self.config.env,
)

def check_if_term_exists(self, term_urn: str) -> bool:
    graph = DataHubGraph(
        DatahubClientConfig(
            server=self.sink_config.get("server", ""),
            token=self.sink_config.get("token", ""),
        )
    )
    # Query the glossaryTermInfo aspect; it is only present when the term
    # already exists in DataHub.
    result = graph.get_entity_semityped(
        entity_urn=term_urn,
        aspects=["glossaryTermInfo"],
    )
    return bool(result.get("glossaryTermInfo"))
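
# Usage sketch (hypothetical URN): get_entity_semityped returns a dict keyed
# by aspect name, so a term that does not exist simply yields no
# "glossaryTermInfo" entry.
#
#   exists = self.check_if_term_exists("urn:li:glossaryTerm:num_orders")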

def parse_glossary_terms_from_metrics(
    self, metrics: List[dict], last_modified: AuditStampClass
) -> GlossaryTermsClass:
    glossary_term_urns = []
    for metric in metrics:
        expression = metric.get("expression", "")
        certification_details = metric.get("extra", "")
        metric_name = metric.get("metric_name", "")
        description = metric.get("description", "")
        term_urn = make_term_urn(metric_name)

        # Reuse the term if it is already in DataHub's glossary.
        if self.check_if_term_exists(term_urn):
            logger.info(f"Term {term_urn} already exists")
            glossary_term_urns.append(GlossaryTermAssociationClass(urn=term_urn))
            continue

        term_properties_aspect = GlossaryTermInfoClass(
            name=metric_name,
            definition=(
                f"Description: {description} \n"
                f"Sql Expression: {expression} \n"
                f"Certification details: {certification_details}"
            ),
            termSource="",
        )

        event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
            entityUrn=term_urn,
            aspect=term_properties_aspect,
        )

        # Emit the new glossary term through the REST endpoint configured
        # for this pipeline's sink.
        rest_emitter = DatahubRestEmitter(
            gms_server=self.sink_config.get("server", ""),
            token=self.sink_config.get("token", ""),
        )
        rest_emitter.emit(event)
        logger.info(f"Created Glossary term {term_urn}")
        glossary_term_urns.append(GlossaryTermAssociationClass(urn=term_urn))

    return GlossaryTermsClass(terms=glossary_term_urns, auditStamp=last_modified)
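
# Example (hypothetical metric name): a certified Superset metric called
# "num_orders" maps to urn:li:glossaryTerm:num_orders, and its description,
# SQL expression, and certification details are folded into the term
# definition above.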

def _is_certified_metric(self, response_result: dict) -> bool:
    # Only certified metrics on physical datasets are synced as glossary
    # terms; Superset stores certification under the "extra" field.
    metrics = response_result.get("metrics", {})
    extra = response_result.get("extra", {})
    kind = response_result.get("kind")
    return bool(metrics and extra and "certification" in extra and kind == "physical")
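
# Illustrative (hypothetical) slice of the Superset dataset API payload this
# check inspects -- the field names mirror the accesses above, the values are
# made up:
#
#   {
#       "kind": "physical",
#       "extra": '{"certification": {"certified_by": "data-team"}}',
#       "metrics": [
#           {
#               "metric_name": "num_orders",
#               "expression": "COUNT(*)",
#               "description": "Total number of orders",
#               "extra": '{"certification": {"certified_by": "data-team"}}',
#           }
#       ],
#   }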

def construct_dataset_from_dataset_data(
self, dataset_data: dict
) -> DatasetSnapshot:
dataset_response = self.get_dataset_info(dataset_data.get("id"))
dataset = SupersetDataset(**dataset_response["result"])
datasource_urn = self.get_datasource_urn_from_id(dataset_response, self.platform)
now = datetime.now().strftime("%I:%M%p on %B %d, %Y")
modified_ts = int(
dp.parse(dataset_data.get("changed_on") or now).timestamp() * 1000
)
modified_actor = f"urn:li:corpuser:{(dataset_data.get('changed_by') or {}).get('username', 'unknown')}"
last_modified = AuditStampClass(time=modified_ts, actor=modified_actor)

dataset_url = f"{self.config.display_uri}{dataset.explore_url or ''}"

@@ -602,6 +667,7 @@ def construct_dataset_from_dataset_data(
else None,
externalUrl=dataset_url,
)

aspects_items: List[Any] = []
aspects_items.extend(
[
@@ -610,6 +676,12 @@ def construct_dataset_from_dataset_data(
]
)

response_result = dataset_response.get("result", {})

if self._is_certified_metric(response_result):
    glossary_terms = self.parse_glossary_terms_from_metrics(
        response_result.get("metrics", {}), last_modified
    )
    aspects_items.append(glossary_terms)

dataset_snapshot = DatasetSnapshot(
urn=datasource_urn,
aspects=aspects_items,