From 6787db7124861b2fdd35d073bf046bd5786425c0 Mon Sep 17 00:00:00 2001 From: Semion Sidorenko Date: Wed, 23 Oct 2024 23:42:27 +0200 Subject: [PATCH] fix(ingest/superset): Don't set schema/db for druid (#11682) --- .../src/datahub/ingestion/source/superset.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 4e40407fba9086..5ce33da5c55fac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -82,6 +82,8 @@ "box_plot": ChartTypeClass.BAR, } +platform_without_databases = ["druid"] + class SupersetConfig( StatefulIngestionConfigBase, EnvConfigMixin, PlatformInstanceConfigMixin @@ -252,17 +254,30 @@ def get_datasource_urn_from_id(self, datasource_id): dataset_response = self.session.get( f"{self.config.connect_uri}/api/v1/dataset/{datasource_id}" ).json() + schema_name = dataset_response.get("result", {}).get("schema") table_name = dataset_response.get("result", {}).get("table_name") database_id = dataset_response.get("result", {}).get("database", {}).get("id") + platform = self.get_platform_from_database_id(database_id) + database_name = ( dataset_response.get("result", {}).get("database", {}).get("database_name") ) database_name = self.config.database_alias.get(database_name, database_name) + # Druid do not have a database concept and has a limited schema concept, but they are nonetheless reported + # from superset. There is only one database per platform instance, and one schema named druid, so it would be + # redundant to systemically store them both in the URN. + if platform in platform_without_databases: + database_name = None + + if platform == "druid" and schema_name == "druid": + # Follow DataHub's druid source convention. + schema_name = None + if database_id and table_name: return make_dataset_urn( - platform=self.get_platform_from_database_id(database_id), + platform=platform, name=".".join( name for name in [database_name, schema_name, table_name] if name ),