From 93a4a53e3490a0bc763ab739fde4a4c92dea7987 Mon Sep 17 00:00:00 2001
From: Enos
Date: Wed, 27 Nov 2024 16:35:53 -0500
Subject: [PATCH 1/8] split ownership pr

---
 .../src/datahub/ingestion/source/superset.py | 98 +++++++++++++++++++
 1 file changed, 98 insertions(+)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py
index 1da233bf0b22ab..6ac93fe82e3add 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/superset.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -22,6 +22,7 @@
     make_dataset_urn,
     make_dataset_urn_with_platform_instance,
     make_domain_urn,
+    make_user_urn,
 )
 from datahub.emitter.mcp_builder import add_domain_to_entity_wu
 from datahub.ingestion.api.common import PipelineContext
@@ -72,6 +73,9 @@
     ChartTypeClass,
     DashboardInfoClass,
     DatasetPropertiesClass,
+    OwnershipClass,
+    OwnerClass,
+    OwnershipTypeClass,
 )
 from datahub.utilities import config_clean
 from datahub.utilities.registries.domain_registry import DomainRegistry
@@ -232,6 +236,7 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
             graph=self.ctx.graph,
         )
         self.session = self.login()
+        self.owners_dict = self.build_preset_owner_dict()
 
     def login(self) -> requests.Session:
         login_response = requests.post(
@@ -354,6 +359,58 @@ def get_datasource_urn_from_id(
                 env=self.config.env,
             )
         raise ValueError("Could not construct dataset URN")
+
+    def parse_owner_payload(self, payload: dict, owners_dict: dict) -> None:
+        for owner_data in payload.get("result", []):
+            email = owner_data.get("extra", {}).get("email")
+            value = owner_data.get("value")
+
+            if value and email and value not in owners_dict:
+                owners_dict[value] = email
+
+    def build_preset_owner_dict(self) -> Dict[str, str]:
+        owners_dict = {}
+        dataset_payload = self.get_all_entity_owners("dataset")
+        chart_payload = self.get_all_entity_owners("chart")
+        dashboard_payload = self.get_all_entity_owners("dashboard")
+
+        owners_dict.update(self.parse_owner_payload(dataset_payload))
+        owners_dict.update(self.parse_owner_payload(chart_payload))
+        owners_dict.update(self.parse_owner_payload(dashboard_payload))
+
+        return owners_dict
+
+    def build_owners_urn_list(self, data: dict) -> List[str]:
+        owners_urn_list = []
+        for owner in data.get("owners", []):
+            owner_id = owner.get("id")
+            owner_email = self.owners_dict.get(owner_id)
+            if owner_email is not None:
+                owners_urn = make_user_urn(owner_email)
+                owners_urn_list.append(owners_urn)
+        return owners_urn_list
+
+    def get_all_entity_owners(self, entity: str) -> Iterable[Dict]:
+        current_page = 1
+        total_owners = PAGE_SIZE
+        all_owners = []
+
+        while (current_page - 1) * PAGE_SIZE <= total_owners:
+            full_owners_response = self.session.get(
+                f"{self.config.connect_uri}/api/v1/{entity}/related/owners",
+                params=f"q=(page:{current_page},page_size:{PAGE_SIZE})",
+            )
+            if full_owners_response.status_code != 200:
+                logger.warning(f"Failed to get {entity} data: {full_owners_response.text}")
+                full_owners_response.raise_for_status()
+
+            payload = full_owners_response.json()
+            total_owners = payload.get("count", total_owners)
+            all_owners.extend(payload.get("result", []))
+            current_page += 1
+
+        #return combined payload
+        return {"result": all_owners, "count": total_owners}
 
     def construct_dashboard_from_api_data(
         self, dashboard_data: dict
@@ -427,6 +484,20 @@ def construct_dashboard_from_api_data(
             customProperties=custom_properties,
         )
         dashboard_snapshot.aspects.append(dashboard_info)
+
+        dashboard_owners_list = self.build_owners_urn_list(dashboard_data)
+        owners_info = OwnershipClass(
+            owners=[
+                OwnerClass(
+                    owner=urn,
+                    #default as Technical Owners from Preset
+                    type=OwnershipTypeClass.TECHNICAL_OWNER,
+                )
+                for urn in (dashboard_owners_list or [])
+            ],
+        )
+        dashboard_snapshot.aspects.append(owners_info)
+
         return dashboard_snapshot
 
     def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
@@ -526,6 +597,20 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
             customProperties=custom_properties,
         )
         chart_snapshot.aspects.append(chart_info)
+
+        chart_owners_list = self.build_owners_urn_list(chart_data)
+        owners_info = OwnershipClass(
+            owners=[
+                OwnerClass(
+                    owner=urn,
+                    #default as Technical Owners from Preset
+                    type=OwnershipTypeClass.TECHNICAL_OWNER,
+                )
+                for urn in (chart_owners_list or [])
+            ],
+        )
+        chart_snapshot.aspects.append(owners_info)
+
         return chart_snapshot
 
     def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
@@ -615,6 +700,19 @@ def construct_dataset_from_dataset_data(
             ]
         )
 
+        dataset_owners_list = self.build_owners_urn_list(dataset_data)
+        owners_info = OwnershipClass(
+            owners=[
+                OwnerClass(
+                    owner=urn,
+                    #default as Technical Owners from Preset
+                    type=OwnershipTypeClass.TECHNICAL_OWNER,
+                )
+                for urn in (dataset_owners_list or [])
+            ],
+        )
+        aspects_items.append(owners_info)
+
         dataset_snapshot = DatasetSnapshot(
             urn=datasource_urn,
             aspects=aspects_items,

From 66b22579ee8b88e4bca667d4a0226579712c31e5 Mon Sep 17 00:00:00 2001
From: Enos
Date: Fri, 13 Dec 2024 16:02:05 -0500
Subject: [PATCH 2/8] parseowner fix - not working

---
 .../src/datahub/ingestion/source/superset.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py
index 6ac93fe82e3add..5dc6d67d539f5c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/superset.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -360,14 +360,16 @@ def get_datasource_urn_from_id(
         )
         raise ValueError("Could not construct dataset URN")
 
-    def parse_owner_payload(self, payload: dict, owners_dict: dict) -> None:
+    def parse_owner_payload(self, payload: dict) -> dict:
+        owners_dict = {}
         for owner_data in payload.get("result", []):
             email = owner_data.get("extra", {}).get("email")
             value = owner_data.get("value")
 
-            if value and email and value not in owners_dict:
+            if value and email:
                 owners_dict[value] = email
-
+        return owners_dict
+
     def build_preset_owner_dict(self) -> Dict[str, str]:
         owners_dict = {}
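
For readers tracing these first two patches: the owner-ingestion flow boils down to building an id-to-email map from the paginated `related/owners` payloads, then resolving each entity's owner ids into corpuser URNs. Below is a minimal standalone sketch of that flow, assuming the payload shape shown in the hunks above; the ids and emails are invented for illustration, and only `make_user_urn` is a real DataHub helper.

    from datahub.emitter.mce_builder import make_user_urn

    # Shape of one /api/v1/{entity}/related/owners page, per the hunks above
    # (sample ids and emails are made up).
    payload = {
        "count": 2,
        "result": [
            {"value": 1, "extra": {"email": "alice@example.com"}},
            {"value": 2, "extra": {"email": "bob@example.com"}},
        ],
    }

    # The id -> email map that parse_owner_payload builds.
    owners_dict = {
        owner["value"]: owner["extra"]["email"]
        for owner in payload["result"]
        if owner.get("value") and owner.get("extra", {}).get("email")
    }

    # A dashboard/chart/dataset payload only carries owner ids; resolve them.
    entity = {"owners": [{"id": 1}, {"id": 2}]}
    owner_urns = [
        make_user_urn(owners_dict[o["id"]])
        for o in entity["owners"]
        if o["id"] in owners_dict
    ]
    print(owner_urns)
    # ['urn:li:corpuser:alice@example.com', 'urn:li:corpuser:bob@example.com']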
From b084b7b079c5efbd2306008482b9dc6682a0ab70 Mon Sep 17 00:00:00 2001
From: Shuixi Li
Date: Mon, 16 Dec 2024 14:45:46 -0500
Subject: [PATCH 3/8] skip fetching for dataset when datasource_id is null

---
 .../src/datahub/ingestion/source/superset.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py
index 5dc6d67d539f5c..7a5099125bbac4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/superset.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -547,10 +547,13 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
         chart_url = f"{self.config.display_uri}{chart_data.get('url', '')}"
 
         datasource_id = chart_data.get("datasource_id")
-        dataset_response = self.get_dataset_info(datasource_id)
-        datasource_urn = self.get_datasource_urn_from_id(
-            dataset_response, self.platform
-        )
+        if not datasource_id:
+            logger.warning(f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info')
+        else:
+            dataset_response = self.get_dataset_info(datasource_id)
+            datasource_urn = self.get_datasource_urn_from_id(
+                dataset_response, self.platform
+            )
 
         params = json.loads(chart_data.get("params", "{}"))
         metrics = [

From aca7830023ac76bf298633546ff7edb30b52ddb7 Mon Sep 17 00:00:00 2001
From: Shuixi Li
Date: Mon, 16 Dec 2024 14:54:31 -0500
Subject: [PATCH 4/8] fix for local variable 'datasource_urn' referenced
 before assignment

---
 .../src/datahub/ingestion/source/superset.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py
index 7a5099125bbac4..14e7f7657bb678 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/superset.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -548,12 +548,13 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
 
         datasource_id = chart_data.get("datasource_id")
         if not datasource_id:
-            logger.warning(f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info')
+            logger.debug(f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info')
+            datasource_urn = None
         else:
             dataset_response = self.get_dataset_info(datasource_id)
-            datasource_urn = self.get_datasource_urn_from_id(
+            datasource_urn = list(self.get_datasource_urn_from_id(
                 dataset_response, self.platform
-            )
+            ))
 
         params = json.loads(chart_data.get("params", "{}"))
         metrics = [
@@ -598,7 +599,7 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
             title=title,
             lastModified=last_modified,
             chartUrl=chart_url,
-            inputs=[datasource_urn] if datasource_urn else None,
+            inputs=datasource_urn,
             customProperties=custom_properties,
         )
         chart_snapshot.aspects.append(chart_info)
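
A note on the `list(...)` wrapping that PATCH 4 introduces and PATCH 5 immediately replaces: calling `list()` on a URN string splits it into individual characters rather than producing a one-element list. A quick illustration, using a made-up URN:

    urn = "urn:li:dataset:(urn:li:dataPlatform:preset,example_table,PROD)"

    print(list(urn)[:4])   # ['u', 'r', 'n', ':'] -- characters, not [urn]

    # The pattern PATCH 5 settles on instead:
    datasource_urn = [urn] if not isinstance(urn, list) else urn
    print(datasource_urn)  # the intended one-element list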
not construct dataset URN") - - def parse_owner_payload(self, payload: dict) -> dict: + + def parse_owner_payload(self, payload: Dict[str, Any]) -> Dict[str, str]: owners_dict = {} - for owner_data in payload.get("result", []): - email = owner_data.get("extra", {}).get("email") - value = owner_data.get("value") + for owner_data in payload["result"]: + email = owner_data["extra"]["email"] + value = owner_data["value"] if value and email: owners_dict[value] = email return owners_dict - - def build_preset_owner_dict(self) -> Dict[str, str]: + + def build_preset_owner_dict(self) -> Dict[str, str]: owners_dict = {} dataset_payload = self.get_all_entity_owners("dataset") chart_payload = self.get_all_entity_owners("chart") @@ -379,10 +379,9 @@ def build_preset_owner_dict(self) -> Dict[str, str]: owners_dict.update(self.parse_owner_payload(dataset_payload)) owners_dict.update(self.parse_owner_payload(chart_payload)) owners_dict.update(self.parse_owner_payload(dashboard_payload)) - return owners_dict - - def build_owners_urn_list(self, data: dict) -> List[str]: + + def build_owners_urn_list(self, data: Dict[str, Any]) -> List[str]: owners_urn_list = [] for owner in data.get("owners", []): owner_id = owner.get("id") @@ -391,8 +390,8 @@ def build_owners_urn_list(self, data: dict) -> List[str]: owners_urn = make_user_urn(owner_email) owners_urn_list.append(owners_urn) return owners_urn_list - - def get_all_entity_owners(self, entity: str) -> Iterable[Dict]: + + def get_all_entity_owners(self, entity: str) -> Dict[str, Any]: current_page = 1 total_owners = PAGE_SIZE all_owners = [] @@ -403,15 +402,16 @@ def get_all_entity_owners(self, entity: str) -> Iterable[Dict]: params=f"q=(page:{current_page},page_size:{PAGE_SIZE})", ) if full_owners_response.status_code != 200: - logger.warning(f"Failed to get {entity} data: {full_owners_response.text}") + logger.warning( + f"Failed to get {entity} data: {full_owners_response.text}" + ) full_owners_response.raise_for_status() payload = full_owners_response.json() total_owners = payload.get("count", total_owners) all_owners.extend(payload.get("result", [])) current_page += 1 - - #return combined payload + # return combined payload return {"result": all_owners, "count": total_owners} def construct_dashboard_from_api_data( @@ -492,7 +492,7 @@ def construct_dashboard_from_api_data( owners=[ OwnerClass( owner=urn, - #default as Technical Owners from Preset + # default as Technical Owners from Preset type=OwnershipTypeClass.TECHNICAL_OWNER, ) for urn in (dashboard_owners_list or []) @@ -548,13 +548,14 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: datasource_id = chart_data.get("datasource_id") if not datasource_id: - logger.debug(f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info') + logger.debug( + f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info' + ) datasource_urn = None else: dataset_response = self.get_dataset_info(datasource_id) - datasource_urn = list(self.get_datasource_urn_from_id( - dataset_response, self.platform - )) + ds_urn = self.get_datasource_urn_from_id(dataset_response, self.platform) + datasource_urn = [ds_urn] if not isinstance(ds_urn, list) else ds_urn params = json.loads(chart_data.get("params", "{}")) metrics = [ @@ -609,7 +610,7 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: owners=[ OwnerClass( owner=urn, - #default as Technical Owners from Preset + # default as Technical Owners from Preset 
@@ -548,13 +548,14 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
 
         datasource_id = chart_data.get("datasource_id")
         if not datasource_id:
-            logger.debug(f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info')
+            logger.debug(
+                f'chart {chart_data["id"]} has no datasource_id, skipping fetching dataset info'
+            )
             datasource_urn = None
         else:
             dataset_response = self.get_dataset_info(datasource_id)
-            datasource_urn = list(self.get_datasource_urn_from_id(
-                dataset_response, self.platform
-            ))
+            ds_urn = self.get_datasource_urn_from_id(dataset_response, self.platform)
+            datasource_urn = [ds_urn] if not isinstance(ds_urn, list) else ds_urn
 
         params = json.loads(chart_data.get("params", "{}"))
         metrics = [
@@ -609,7 +610,7 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
             owners=[
                 OwnerClass(
                     owner=urn,
-                    #default as Technical Owners from Preset
+                    # default as Technical Owners from Preset
                     type=OwnershipTypeClass.TECHNICAL_OWNER,
                 )
                 for urn in (chart_owners_list or [])
@@ -711,14 +712,13 @@ def construct_dataset_from_dataset_data(
             owners=[
                 OwnerClass(
                     owner=urn,
-                    #default as Technical Owners from Preset
+                    # default as Technical Owners from Preset
                     type=OwnershipTypeClass.TECHNICAL_OWNER,
                 )
                 for urn in (dataset_owners_list or [])
             ],
         )
         aspects_items.append(owners_info)
-
         dataset_snapshot = DatasetSnapshot(
             urn=datasource_urn,
             aspects=aspects_items,
From 44199965db3014a0c7ca2872bd26e2daa8c36bb6 Mon Sep 17 00:00:00 2001
From: Enos
Date: Wed, 18 Dec 2024 15:06:36 -0500
Subject: [PATCH 6/8] review comments

---
 .../src/datahub/ingestion/source/superset.py | 39 ++++++++++---------
 1 file changed, 20 insertions(+), 19 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py
index d3457863fc89d3..17b95fc11b5b29 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/superset.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -236,7 +236,7 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
             graph=self.ctx.graph,
         )
         self.session = self.login()
-        self.owners_dict = self.build_preset_owner_dict()
+        self.owners_id_to_email_dict = self.build_preset_owner_dict()
 
     def login(self) -> requests.Session:
         login_response = requests.post(
@@ -360,38 +360,38 @@ def get_datasource_urn_from_id(
         )
         raise ValueError("Could not construct dataset URN")
 
-    def parse_owner_payload(self, payload: Dict[str, Any]) -> Dict[str, str]:
-        owners_dict = {}
+    def _parse_owner_payload(self, payload: Dict[str, Any]) -> Dict[str, str]:
+        owners_id_to_email_dict = {}
         for owner_data in payload["result"]:
-            email = owner_data["extra"]["email"]
-            value = owner_data["value"]
+            owner_email = owner_data.get("extra", {}).get("email", None)
+            owner_id = owner_data.get("value", None)
 
-            if value and email:
-                owners_dict[value] = email
-        return owners_dict
+            if owner_id and owner_email:
+                owners_id_to_email_dict[owner_id] = owner_email
+        return owners_id_to_email_dict
 
     def build_preset_owner_dict(self) -> Dict[str, str]:
-        owners_dict = {}
-        dataset_payload = self.get_all_entity_owners("dataset")
-        chart_payload = self.get_all_entity_owners("chart")
-        dashboard_payload = self.get_all_entity_owners("dashboard")
+        owners_id_to_email_dict = {}
+        dataset_payload = self._get_all_entity_owners("dataset")
+        chart_payload = self._get_all_entity_owners("chart")
+        dashboard_payload = self._get_all_entity_owners("dashboard")
 
-        owners_dict.update(self.parse_owner_payload(dataset_payload))
-        owners_dict.update(self.parse_owner_payload(chart_payload))
-        owners_dict.update(self.parse_owner_payload(dashboard_payload))
-        return owners_dict
+        owners_id_to_email_dict.update(self._parse_owner_payload(dataset_payload))
+        owners_id_to_email_dict.update(self._parse_owner_payload(chart_payload))
+        owners_id_to_email_dict.update(self._parse_owner_payload(dashboard_payload))
+        return owners_id_to_email_dict
 
     def build_owners_urn_list(self, data: Dict[str, Any]) -> List[str]:
         owners_urn_list = []
         for owner in data.get("owners", []):
             owner_id = owner.get("id")
-            owner_email = self.owners_dict.get(owner_id)
+            owner_email = self.owners_id_to_email_dict.get(owner_id)
             if owner_email is not None:
                 owners_urn = make_user_urn(owner_email)
                 owners_urn_list.append(owners_urn)
         return owners_urn_list
 
-    def get_all_entity_owners(self, entity: str) -> Dict[str, Any]:
+    def _get_all_entity_owners(self, entity: str) -> Dict[str, Any]:
         current_page = 1
         total_owners = PAGE_SIZE
         all_owners = []
@@ -405,7 +405,8 @@ def _get_all_entity_owners(self, entity: str) -> Dict[str, Any]:
             logger.warning(
                 f"Failed to get {entity} data: {full_owners_response.text}"
             )
-            full_owners_response.raise_for_status()
+            current_page += 1
+            continue
 
             payload = full_owners_response.json()
             total_owners = payload.get("count", total_owners)
             all_owners.extend(payload.get("result", []))
             current_page += 1
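
After this review round, the pagination contract of `_get_all_entity_owners` reads as follows — a standalone sketch, assuming a Superset-style REST API; `base_url` and the `PAGE_SIZE` value are illustrative, not taken from the source:

    import requests

    PAGE_SIZE = 25  # assumed; the source defines its own constant

    def fetch_all_owners(session: requests.Session, base_url: str, entity: str) -> dict:
        current_page = 1
        total_owners = PAGE_SIZE  # optimistic seed, corrected by the first response
        all_owners = []
        while (current_page - 1) * PAGE_SIZE <= total_owners:
            resp = session.get(
                f"{base_url}/api/v1/{entity}/related/owners",
                # Superset expects a Rison-encoded query argument
                params=f"q=(page:{current_page},page_size:{PAGE_SIZE})",
            )
            if resp.status_code != 200:
                # per PATCH 6: skip the failing page instead of raising
                current_page += 1
                continue
            payload = resp.json()
            total_owners = payload.get("count", total_owners)
            all_owners.extend(payload.get("result", []))
            current_page += 1
        return {"result": all_owners, "count": total_owners}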
logger.info(f"Term {term_urn} already exists") + glossary_term_urns.append(GlossaryTermAssociationClass(urn=term_urn)) + continue + + term_properties_aspect = GlossaryTermInfoClass( + definition=f"Description: {description} \nSql Expression: {expression} \nCertification details: {certification_details}", + termSource="", + ) + + event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( + entityUrn=term_urn, + aspect=term_properties_aspect, + ) + + # Create rest emitter + rest_emitter = DatahubRestEmitter(gms_server=self.sink_config.get("server", ""), + token=self.sink_config.get("token", "")) + rest_emitter.emit(event) + logger.info(f"Created Glossary term {term_urn}") + glossary_term_urns.append(GlossaryTermAssociationClass(urn=term_urn)) + + return GlossaryTermsClass(terms=glossary_term_urns, auditStamp=last_modified) + def construct_dataset_from_dataset_data( self, dataset_data: dict ) -> DatasetSnapshot: @@ -689,7 +744,11 @@ def construct_dataset_from_dataset_data( datasource_urn = self.get_datasource_urn_from_id( dataset_response, self.platform ) - + modified_ts = int( + dp.parse(dataset_data.get("changed_on_utc", "now")).timestamp() * 1000 + ) + modified_actor = f"urn:li:corpuser:{(dataset_data.get('changed_by') or {}).get('username', 'unknown')}" + last_modified = AuditStampClass(time=modified_ts, actor=modified_actor) dataset_url = f"{self.config.display_uri}{dataset.explore_url or ''}" dataset_info = DatasetPropertiesClass( @@ -720,6 +779,13 @@ def construct_dataset_from_dataset_data( ], ) aspects_items.append(owners_info) + + metrics = dataset_response.get("result", {}).get("metrics", []) + + if metrics: + glossary_terms = self.parse_glossary_terms_from_metrics(metrics, last_modified) + aspects_items.append(glossary_terms) + dataset_snapshot = DatasetSnapshot( urn=datasource_urn, aspects=aspects_items, @@ -732,6 +798,7 @@ def emit_dataset_mces(self) -> Iterable[MetadataWorkUnit]: dataset_snapshot = self.construct_dataset_from_dataset_data( dataset_data ) + mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) except Exception as e: self.report.warning( From 862a5ceef0638973816a854bdbd155ce532af656 Mon Sep 17 00:00:00 2001 From: Shuixi Li Date: Mon, 13 Jan 2025 14:34:55 -0500 Subject: [PATCH 8/8] refactor preset & superset into own directory --- .../ingestion/source/superset/__init__.py | 0 .../ingestion/source/superset/constant.py | 66 +++++++++++++++++++ .../ingestion/source/{ => superset}/preset.py | 2 +- .../source/{ => superset}/superset.py | 2 +- .../tests/unit/test_preset_source.py | 2 +- .../tests/unit/test_superset_source.py | 2 +- 6 files changed, 70 insertions(+), 4 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/superset/__init__.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source/superset/constant.py rename metadata-ingestion/src/datahub/ingestion/source/{ => superset}/preset.py (97%) rename metadata-ingestion/src/datahub/ingestion/source/{ => superset}/superset.py (99%) diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/superset/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset/constant.py b/metadata-ingestion/src/datahub/ingestion/source/superset/constant.py new file mode 100644 index 00000000000000..a69a4a3f0dc20c --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/superset/constant.py @@ -0,0 +1,66 @@ +from 
From 862a5ceef0638973816a854bdbd155ce532af656 Mon Sep 17 00:00:00 2001
From: Shuixi Li
Date: Mon, 13 Jan 2025 14:34:55 -0500
Subject: [PATCH 8/8] refactor preset & superset into own directory

---
 .../ingestion/source/superset/__init__.py     |  0
 .../ingestion/source/superset/constant.py     | 66 +++++++++++++++++++
 .../ingestion/source/{ => superset}/preset.py |  2 +-
 .../source/{ => superset}/superset.py         |  2 +-
 .../tests/unit/test_preset_source.py          |  2 +-
 .../tests/unit/test_superset_source.py        |  2 +-
 6 files changed, 70 insertions(+), 4 deletions(-)
 create mode 100644 metadata-ingestion/src/datahub/ingestion/source/superset/__init__.py
 create mode 100644 metadata-ingestion/src/datahub/ingestion/source/superset/constant.py
 rename metadata-ingestion/src/datahub/ingestion/source/{ => superset}/preset.py (97%)
 rename metadata-ingestion/src/datahub/ingestion/source/{ => superset}/superset.py (99%)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/superset/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset/constant.py b/metadata-ingestion/src/datahub/ingestion/source/superset/constant.py
new file mode 100644
index 00000000000000..a69a4a3f0dc20c
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset/constant.py
@@ -0,0 +1,66 @@
+from typing import Dict, Type, Union
+from datahub.metadata.com.linkedin.pegasus2avro.schema import (
+    ArrayType,
+    BooleanType,
+    BytesType,
+    NullType,
+    NumberType,
+    RecordType,
+    StringType,
+    TimeType,
+)
+
+SUPERSET_FIELD_TYPE_MAPPINGS: Dict[
+    str,
+    Type[
+        Union[
+            ArrayType,
+            BytesType,
+            BooleanType,
+            NumberType,
+            RecordType,
+            StringType,
+            TimeType,
+            NullType,
+        ]
+    ],
+] = {
+    "BYTES": BytesType,
+    "BOOL": BooleanType,
+    "BOOLEAN": BooleanType,
+    "DOUBLE": NumberType,
+    "DOUBLE PRECISION": NumberType,
+    "DECIMAL": NumberType,
+    "NUMERIC": NumberType,
+    "BIGNUMERIC": NumberType,
+    "BIGDECIMAL": NumberType,
+    "FLOAT64": NumberType,
+    "INT": NumberType,
+    "INT64": NumberType,
+    "SMALLINT": NumberType,
+    "INTEGER": NumberType,
+    "BIGINT": NumberType,
+    "TINYINT": NumberType,
+    "BYTEINT": NumberType,
+    "STRING": StringType,
+    "TIME": TimeType,
+    "TIMESTAMP": TimeType,
+    "DATE": TimeType,
+    "DATETIME": TimeType,
+    "GEOGRAPHY": NullType,
+    "JSON": NullType,
+    "INTERVAL": NullType,
+    "ARRAY": ArrayType,
+    "STRUCT": RecordType,
+    "CHARACTER VARYING": StringType,
+    "CHARACTER": StringType,
+    "CHAR": StringType,
+    "TIMESTAMP WITHOUT TIME ZONE": TimeType,
+    "REAL": NumberType,
+    "VARCHAR": StringType,
+    "TIMESTAMPTZ": TimeType,
+    "GEOMETRY": NullType,
+    "HLLSKETCH": NullType,
+    "TIMETZ": TimeType,
+    "VARBYTE": StringType,
+}
\ No newline at end of file
diff --git a/metadata-ingestion/src/datahub/ingestion/source/preset.py b/metadata-ingestion/src/datahub/ingestion/source/superset/preset.py
similarity index 97%
rename from metadata-ingestion/src/datahub/ingestion/source/preset.py
rename to metadata-ingestion/src/datahub/ingestion/source/superset/preset.py
index 322a3b42339d1c..dae6c148d52a9a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/preset.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset/preset.py
@@ -19,7 +19,7 @@
     StaleEntityRemovalSourceReport,
     StatefulStaleMetadataRemovalConfig,
 )
-from datahub.ingestion.source.superset import SupersetConfig, SupersetSource
+from datahub.ingestion.source.superset.superset import SupersetConfig, SupersetSource
 from datahub.utilities import config_clean
 
 logger = logging.getLogger(__name__)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset/superset.py
similarity index 99%
rename from metadata-ingestion/src/datahub/ingestion/source/superset.py
rename to metadata-ingestion/src/datahub/ingestion/source/superset/superset.py
index 5906213bea27df..1b3d6d9237d6bc 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/superset.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset/superset.py
@@ -337,7 +337,7 @@ def get_dataset_info(self, dataset_id: int) -> dict:
     def get_datasource_urn_from_id(
         self, dataset_response: dict, platform_instance: str
     ) -> str:
-        schema_name = dataset_response.get("result", {}).get("../schema")
+        schema_name = dataset_response.get("result", {}).get("schema")
         table_name = dataset_response.get("result", {}).get("table_name")
         database_id = dataset_response.get("result", {}).get("database", {}).get("id")
         platform = self.get_platform_from_database_id(database_id)
diff --git a/metadata-ingestion/tests/unit/test_preset_source.py b/metadata-ingestion/tests/unit/test_preset_source.py
index dc81f4c8284d50..4dfead2ad1ed46 100644
--- a/metadata-ingestion/tests/unit/test_preset_source.py
+++ b/metadata-ingestion/tests/unit/test_preset_source.py
@@ -1,4 +1,4 @@
-from datahub.ingestion.source.preset import PresetConfig
+from datahub.ingestion.source.superset.preset import PresetConfig
 
 
 def test_default_values():
diff --git a/metadata-ingestion/tests/unit/test_superset_source.py b/metadata-ingestion/tests/unit/test_superset_source.py
index 912bfa3511421c..ddb4eef77813c7 100644
--- a/metadata-ingestion/tests/unit/test_superset_source.py
+++ b/metadata-ingestion/tests/unit/test_superset_source.py
@@ -1,4 +1,4 @@
-from datahub.ingestion.source.superset import SupersetConfig
+from datahub.ingestion.source.superset.superset import SupersetConfig
 
 
 def test_default_values():
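
With the final patch applied, the Superset and Preset sources live in their own package, so downstream imports move with them, mirroring the updated unit tests:

    # new paths after PATCH 8
    from datahub.ingestion.source.superset.superset import SupersetConfig, SupersetSource
    from datahub.ingestion.source.superset.preset import PresetConfig

    # old paths, for comparison
    # from datahub.ingestion.source.superset import SupersetConfig, SupersetSource
    # from datahub.ingestion.source.preset import PresetConfig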