Preset dataset lineage #12331

Closed · wants to merge 9 commits
Empty file.
@@ -0,0 +1,66 @@
from typing import Dict, Type, Union
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayType,
BooleanType,
BytesType,
NullType,
NumberType,
RecordType,
StringType,
TimeType,
)

SUPERSET_FIELD_TYPE_MAPPINGS: Dict[
str,
Type[
Union[
ArrayType,
BytesType,
BooleanType,
NumberType,
RecordType,
StringType,
TimeType,
NullType,
]
],
] = {
"BYTES": BytesType,
"BOOL": BooleanType,
"BOOLEAN": BooleanType,
"DOUBLE": NumberType,
"DOUBLE PRECISION": NumberType,
"DECIMAL": NumberType,
"NUMERIC": NumberType,
"BIGNUMERIC": NumberType,
"BIGDECIMAL": NumberType,
"FLOAT64": NumberType,
"INT": NumberType,
"INT64": NumberType,
"SMALLINT": NumberType,
"INTEGER": NumberType,
"BIGINT": NumberType,
"TINYINT": NumberType,
"BYTEINT": NumberType,
"STRING": StringType,
"TIME": TimeType,
"TIMESTAMP": TimeType,
"DATE": TimeType,
"DATETIME": TimeType,
"GEOGRAPHY": NullType,
"JSON": NullType,
"INTERVAL": NullType,
"ARRAY": ArrayType,
"STRUCT": RecordType,
"CHARACTER VARYING": StringType,
"CHARACTER": StringType,
"CHAR": StringType,
"TIMESTAMP WITHOUT TIME ZONE": TimeType,
"REAL": NumberType,
"VARCHAR": StringType,
"TIMESTAMPTZ": TimeType,
"GEOMETRY": NullType,
"HLLSKETCH": NullType,
"TIMETZ": TimeType,
"VARBYTE": StringType,
}
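
To make the intent of this table concrete, here is a minimal sketch of how such a mapping is typically consulted when converting a native Superset column type into a DataHub schema field type. `resolve_superset_type` and the `NullType` fallback are illustrative only, not part of this PR:

# Illustrative helper (not in this PR): look up a native column type,
# falling back to NullType for anything the table above does not cover.
def resolve_superset_type(native_type: str) -> type:
    return SUPERSET_FIELD_TYPE_MAPPINGS.get(native_type.strip().upper(), NullType)

assert resolve_superset_type("varchar") is StringType
assert resolve_superset_type("TIMESTAMPTZ") is TimeType
assert resolve_superset_type("SUPER") is NullType  # unmapped, invented input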
@@ -19,7 +19,7 @@
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.superset import SupersetConfig, SupersetSource
from datahub.ingestion.source.superset.superset import SupersetConfig, SupersetSource
from datahub.utilities import config_clean

logger = logging.getLogger(__name__)
@@ -80,8 +80,6 @@ class PresetSource(SupersetSource):
platform = "preset"

def __init__(self, ctx: PipelineContext, config: PresetConfig):
logger.info(f"ctx is {ctx}")

super().__init__(ctx, config)
self.config = config
self.report = StaleEntityRemovalSourceReport()
@@ -22,8 +22,12 @@
make_dataset_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_term_urn,
make_user_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
@@ -35,6 +39,7 @@
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.ingestion.source.sql.sql_types import resolve_sql_type
from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import (
get_platform_from_sqlalchemy_uri,
@@ -48,6 +53,7 @@
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.metadata.schema_classes import AuditStampClass, GlossaryTermAssociationClass, GlossaryTermInfoClass
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
@@ -72,6 +78,10 @@
ChartTypeClass,
DashboardInfoClass,
DatasetPropertiesClass,
GlossaryTermsClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)
from datahub.utilities import config_clean
from datahub.utilities.registries.domain_registry import DomainRegistry
@@ -231,7 +241,9 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
cached_domains=[domain_id for domain_id in self.config.domain],
graph=self.ctx.graph,
)
self.sink_config = ctx.pipeline_config.sink.config
self.session = self.login()
self.owners_id_to_email_dict = self.build_preset_owner_dict()
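
One reviewer note on the constructor above: reading `ctx.pipeline_config.sink.config` couples the source to its pipeline's sink and raises AttributeError when `pipeline_config` or `sink` is unset (e.g. when the source is constructed directly in unit tests). A defensive sketch for `__init__`, assuming the PipelineContext attribute names used above:

# Hedged sketch (assumption: ctx may lack a pipeline/sink config in tests).
sink_config: dict = {}
if ctx.pipeline_config is not None and ctx.pipeline_config.sink is not None:
    sink_config = ctx.pipeline_config.sink.config or {}
self.sink_config = sink_config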

def login(self) -> requests.Session:
login_response = requests.post(
@@ -325,7 +337,7 @@ def get_dataset_info(self, dataset_id: int) -> dict:
def get_datasource_urn_from_id(
self, dataset_response: dict, platform_instance: str
) -> str:
schema_name = dataset_response.get("result", {}).get("schema")
table_name = dataset_response.get("result", {}).get("table_name")
database_id = dataset_response.get("result", {}).get("database", {}).get("id")
platform = self.get_platform_from_database_id(database_id)
@@ -355,6 +367,61 @@ def get_datasource_urn_from_id(
)
raise ValueError("Could not construct dataset URN")

def _parse_owner_payload(self, payload: Dict[str, Any]) -> Dict[str, str]:
owners_id_to_email_dict = {}
for owner_data in payload["result"]:
owner_email = owner_data.get("extra", {}).get("email", None)
owner_id = owner_data.get("value", None)

if owner_id and owner_email:
owners_id_to_email_dict[owner_id] = owner_email
return owners_id_to_email_dict

def build_preset_owner_dict(self) -> Dict[str, str]:
owners_id_to_email_dict = {}
dataset_payload = self._get_all_entity_owners("dataset")
chart_payload = self._get_all_entity_owners("chart")
dashboard_payload = self._get_all_entity_owners("dashboard")

owners_id_to_email_dict.update(self._parse_owner_payload(dataset_payload))
owners_id_to_email_dict.update(self._parse_owner_payload(chart_payload))
owners_id_to_email_dict.update(self._parse_owner_payload(dashboard_payload))
return owners_id_to_email_dict

def build_owners_urn_list(self, data: Dict[str, Any]) -> List[str]:
owners_urn_list = []
for owner in data.get("owners", []):
owner_id = owner.get("id")
owner_email = self.owners_id_to_email_dict.get(owner_id)
if owner_email is not None:
owners_urn = make_user_urn(owner_email)
owners_urn_list.append(owners_urn)
return owners_urn_list
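
A worked example of the id-to-email-to-URN flow implemented by the two helpers above (payload shapes assumed from the Superset `/related/owners` response; the ids and email are invented):

from datahub.emitter.mce_builder import make_user_urn

# Invented data shaped like the structures the helpers above produce/consume.
owners_id_to_email_dict = {7: "alice@example.com"}
dashboard_data = {"owners": [{"id": 7}, {"id": 99}]}  # id 99 has no known email

owners_urn_list = [
    make_user_urn(owners_id_to_email_dict[owner["id"]])
    for owner in dashboard_data["owners"]
    if owner["id"] in owners_id_to_email_dict
]
assert owners_urn_list == ["urn:li:corpuser:alice@example.com"]  # id 99 skipped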

def _get_all_entity_owners(self, entity: str) -> Dict[str, Any]:
current_page = 1
total_owners = PAGE_SIZE
all_owners = []

while (current_page - 1) * PAGE_SIZE <= total_owners:
full_owners_response = self.session.get(
f"{self.config.connect_uri}/api/v1/{entity}/related/owners",
params=f"q=(page:{current_page},page_size:{PAGE_SIZE})",
)
if full_owners_response.status_code != 200:
logger.warning(
f"Failed to get {entity} data: {full_owners_response.text}"
)
current_page += 1
continue

payload = full_owners_response.json()
total_owners = payload.get("count", total_owners)
all_owners.extend(payload.get("result", []))
current_page += 1
# Return the combined payload across all fetched pages
return {"result": all_owners, "count": total_owners}

def construct_dashboard_from_api_data(
self, dashboard_data: dict
) -> DashboardSnapshot:
@@ -427,6 +494,20 @@ def construct_dashboard_from_api_data(
customProperties=custom_properties,
)
dashboard_snapshot.aspects.append(dashboard_info)

dashboard_owners_list = self.build_owners_urn_list(dashboard_data)
owners_info = OwnershipClass(
owners=[
OwnerClass(
owner=urn,
# Owners ingested from Preset default to TECHNICAL_OWNER
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
for urn in (dashboard_owners_list or [])
],
)
dashboard_snapshot.aspects.append(owners_info)

return dashboard_snapshot

def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
@@ -474,10 +555,15 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
chart_url = f"{self.config.display_uri}{chart_data.get('url', '')}"

datasource_id = chart_data.get("datasource_id")
dataset_response = self.get_dataset_info(datasource_id)
datasource_urn = self.get_datasource_urn_from_id(
dataset_response, self.platform
)
if not datasource_id:
logger.debug(
f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info'
)
datasource_urn = None
else:
dataset_response = self.get_dataset_info(datasource_id)
ds_urn = self.get_datasource_urn_from_id(dataset_response, self.platform)
datasource_urn = [ds_urn] if not isinstance(ds_urn, list) else ds_urn
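# Note: get_datasource_urn_from_id is annotated to return a single str, so the
# isinstance check above effectively always wraps it into a one-element list,
# e.g. ds_urn = "urn:li:dataset:(...)" (invented) -> datasource_urn == [ds_urn],
# the list shape that ChartInfoClass.inputs expects below.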

params = json.loads(chart_data.get("params", "{}"))
metrics = [
@@ -522,10 +608,24 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
title=title,
lastModified=last_modified,
chartUrl=chart_url,
inputs=[datasource_urn] if datasource_urn else None,
inputs=datasource_urn,
customProperties=custom_properties,
)
chart_snapshot.aspects.append(chart_info)

chart_owners_list = self.build_owners_urn_list(chart_data)
owners_info = OwnershipClass(
owners=[
OwnerClass(
owner=urn,
# Owners ingested from Preset default to TECHNICAL_OWNER
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
for urn in (chart_owners_list or [])
],
)
chart_snapshot.aspects.append(owners_info)

return chart_snapshot

def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
@@ -588,6 +688,54 @@ def gen_dataset_urn(self, datahub_dataset_name: str) -> str:
env=self.config.env,
)

def check_if_term_exists(self, term_urn: str) -> bool:
graph = DataHubGraph(
    DatahubClientConfig(
        server=self.sink_config.get("server", ""),
        token=self.sink_config.get("token", ""),
    )
)
# Fetch only the glossaryTermInfo aspect for this term URN
result = graph.get_entity_semityped(
    entity_urn=term_urn,
    aspects=["glossaryTermInfo"],
)

if result.get("glossaryTermInfo"):
return True
return False
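
check_if_term_exists builds a fresh DataHubGraph per call, i.e. once per certified metric. A possible refinement, sketched under the assumption that the same sink_config keys ("server", "token") are available, is to lazily cache a single client:

# Illustrative caching sketch (not part of this PR): reuse one graph client
# across all glossary-term existence checks instead of one per metric.
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

def _get_graph(self) -> DataHubGraph:
    if getattr(self, "_graph", None) is None:
        self._graph = DataHubGraph(
            DatahubClientConfig(
                server=self.sink_config.get("server", ""),
                token=self.sink_config.get("token", ""),
            )
        )
    return self._graph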

def parse_glossary_terms_from_metrics(
    self, metrics: List[Dict[str, Any]], last_modified: AuditStampClass
) -> GlossaryTermsClass:
glossary_term_urns = []
for metric in metrics:
# Only sync certified metrics
if "certified_by" in metric.get("extra", {}):
expression = metric.get("expression", "")
certification_details = metric.get("extra", "")
metric_name = metric.get("metric_name", "")
description = metric.get("description", "")
term_urn = make_term_urn(metric_name)

if self.check_if_term_exists(term_urn):
logger.info(f"Term {term_urn} already exists")
glossary_term_urns.append(GlossaryTermAssociationClass(urn=term_urn))
continue

term_properties_aspect = GlossaryTermInfoClass(
definition=f"Description: {description} \nSql Expression: {expression} \nCertification details: {certification_details}",
termSource="",
)

event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
entityUrn=term_urn,
aspect=term_properties_aspect,
)

# Create rest emitter
rest_emitter = DatahubRestEmitter(
    gms_server=self.sink_config.get("server", ""),
    token=self.sink_config.get("token", ""),
)
rest_emitter.emit(event)
logger.info(f"Created Glossary term {term_urn}")
glossary_term_urns.append(GlossaryTermAssociationClass(urn=term_urn))

return GlossaryTermsClass(terms=glossary_term_urns, auditStamp=last_modified)
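
For reference, an invented sample of the metrics payload this method consumes (field names mirror the Superset dataset `metrics` entries used above) and what it would produce:

# Invented sample metric; only entries whose "extra" carries "certified_by"
# are turned into glossary terms by the method above.
sample_metric = {
    "metric_name": "total_revenue",
    "expression": "SUM(revenue)",
    "description": "Total revenue across all orders",
    "extra": {"certified_by": "data-platform-team"},
}
# parse_glossary_terms_from_metrics([sample_metric], last_modified) would
# 1) build make_term_urn("total_revenue"),
# 2) create the term over REST if it does not already exist, and
# 3) return GlossaryTermsClass(terms=[GlossaryTermAssociationClass(urn=...)], ...).

Since the emitter is reconstructed on every loop iteration, hoisting the DatahubRestEmitter out of the for loop would be a cheap follow-up.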

def construct_dataset_from_dataset_data(
self, dataset_data: dict
) -> DatasetSnapshot:
@@ -596,7 +744,11 @@ def construct_dataset_from_dataset_data(
datasource_urn = self.get_datasource_urn_from_id(
dataset_response, self.platform
)

modified_ts = int(
dp.parse(dataset_data.get("changed_on_utc", "now")).timestamp() * 1000
)
modified_actor = f"urn:li:corpuser:{(dataset_data.get('changed_by') or {}).get('username', 'unknown')}"
last_modified = AuditStampClass(time=modified_ts, actor=modified_actor)
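# Note: dp is dateutil.parser (imported at the top of this module); multiplying
# the parsed "changed_on_utc" timestamp by 1000 yields the epoch-millisecond
# value AuditStampClass expects, e.g.
#   int(dp.parse("2024-11-05T10:15:30+00:00").timestamp() * 1000)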
dataset_url = f"{self.config.display_uri}{dataset.explore_url or ''}"

dataset_info = DatasetPropertiesClass(
@@ -615,6 +767,25 @@
]
)

dataset_owners_list = self.build_owners_urn_list(dataset_data)
owners_info = OwnershipClass(
owners=[
OwnerClass(
owner=urn,
# Owners ingested from Preset default to TECHNICAL_OWNER
type=OwnershipTypeClass.TECHNICAL_OWNER,
)
for urn in (dataset_owners_list or [])
],
)
aspects_items.append(owners_info)

metrics = dataset_response.get("result", {}).get("metrics", [])

if metrics:
glossary_terms = self.parse_glossary_terms_from_metrics(metrics, last_modified)
aspects_items.append(glossary_terms)

dataset_snapshot = DatasetSnapshot(
urn=datasource_urn,
aspects=aspects_items,
@@ -627,6 +798,7 @@ def emit_dataset_mces(self) -> Iterable[MetadataWorkUnit]:
dataset_snapshot = self.construct_dataset_from_dataset_data(
dataset_data
)

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
except Exception as e:
self.report.warning(
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_preset_source.py
@@ -1,4 +1,4 @@
from datahub.ingestion.source.preset import PresetConfig
from datahub.ingestion.source.superset.preset import PresetConfig


def test_default_values():
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_superset_source.py
@@ -1,4 +1,4 @@
from datahub.ingestion.source.superset import SupersetConfig
from datahub.ingestion.source.superset.superset import SupersetConfig


def test_default_values():