Skip to content

Commit

Permalink
feat(ingest): add looker meta extractor support in sql parsing (data…
Browse files Browse the repository at this point in the history
…hub-project#12062)

Co-authored-by: Mayuri N <[email protected]>
Co-authored-by: Mayuri Nehate <[email protected]>
  • Loading branch information
3 people authored Dec 19, 2024
1 parent 953893c commit 2e54461
Show file tree
Hide file tree
Showing 26 changed files with 1,026 additions and 79 deletions.
13 changes: 13 additions & 0 deletions metadata-ingestion/src/datahub/configuration/source_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,16 @@ class DatasetLineageProviderConfigBase(EnvConfigMixin):
default=None,
description="A holder for platform -> platform_instance mappings to generate correct dataset urns",
)


class PlatformDetail(ConfigModel):
platform_instance: Optional[str] = Field(
default=None,
description="DataHub platform instance name. To generate correct urn for upstream dataset, this should match "
"with platform instance name used in ingestion "
"recipe of other datahub sources.",
)
env: str = Field(
default=DEFAULT_ENV,
description="The environment that all assets produced by DataHub platform ingestion source belong to",
)
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
from pydantic.class_validators import validator

import datahub.emitter.mce_builder as builder
from datahub.api.entities.platformresource.platform_resource import (
PlatformResource,
PlatformResourceKey,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import ContainerKey, create_embed_mcp
from datahub.ingestion.api.report import Report
Expand Down Expand Up @@ -106,7 +110,7 @@
from datahub.utilities.url_util import remove_port_from_url

CORPUSER_DATAHUB = "urn:li:corpuser:datahub"

LOOKER = "looker"
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -1411,6 +1415,7 @@ class LookerDashboardSourceReport(StaleEntityRemovalSourceReport):

resolved_user_ids: int = 0
email_ids_missing: int = 0 # resolved users with missing email addresses
looker_user_count: int = 0

_looker_api: Optional[LookerAPI] = None
query_latency: Dict[str, datetime.timedelta] = dataclasses_field(
Expand Down Expand Up @@ -1614,16 +1619,31 @@ def get_urn_dashboard_id(self):
class LookerUserRegistry:
looker_api_wrapper: LookerAPI
fields: str = ",".join(["id", "email", "display_name", "first_name", "last_name"])
_user_cache: Dict[str, LookerUser] = {}

def __init__(self, looker_api: LookerAPI):
def __init__(self, looker_api: LookerAPI, report: LookerDashboardSourceReport):
self.looker_api_wrapper = looker_api
self.report = report
self._initialize_user_cache()

def _initialize_user_cache(self) -> None:
raw_users: Sequence[User] = self.looker_api_wrapper.all_users(
user_fields=self.fields
)

for raw_user in raw_users:
looker_user = LookerUser.create_looker_user(raw_user)
self._user_cache[str(looker_user.id)] = looker_user

def get_by_id(self, id_: str) -> Optional[LookerUser]:
if not id_:
return None

logger.debug(f"Will get user {id_}")

if str(id_) in self._user_cache:
return self._user_cache.get(str(id_))

raw_user: Optional[User] = self.looker_api_wrapper.get_user(
str(id_), user_fields=self.fields
)
Expand All @@ -1632,3 +1652,35 @@ def get_by_id(self, id_: str) -> Optional[LookerUser]:

looker_user = LookerUser.create_looker_user(raw_user)
return looker_user

def to_platform_resource(
self, platform_instance: Optional[str]
) -> Iterable[MetadataChangeProposalWrapper]:
try:
platform_resource_key = PlatformResourceKey(
platform=LOOKER,
resource_type="USER_ID_MAPPING",
platform_instance=platform_instance,
primary_key="",
)

# Extract user email mappings
user_email_cache = {
user_id: user.email
for user_id, user in self._user_cache.items()
if user.email
}

platform_resource = PlatformResource.create(
key=platform_resource_key,
value=user_email_cache,
)

self.report.looker_user_count = len(user_email_cache)
yield from platform_resource.to_mcps()

except Exception as exc:
self.report.warning(
message="Failed to generate platform resource for looker id mappings",
exc=exc,
)
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class LookerAPIStats(BaseModel):
get_look_calls: int = 0
search_looks_calls: int = 0
search_dashboards_calls: int = 0
all_user_calls: int = 0


class LookerAPI:
Expand Down Expand Up @@ -135,7 +136,7 @@ def get_available_permissions(self) -> Set[str]:

return permissions

@lru_cache(maxsize=1000)
@lru_cache(maxsize=5000)
def get_user(self, id_: str, user_fields: str) -> Optional[User]:
self.client_stats.user_calls += 1
try:
Expand All @@ -154,6 +155,17 @@ def get_user(self, id_: str, user_fields: str) -> Optional[User]:
# User not found
return None

def all_users(self, user_fields: str) -> Sequence[User]:
self.client_stats.all_user_calls += 1
try:
return self.client.all_users(
fields=cast(str, user_fields),
transport_options=self.transport_options,
)
except SDKError as e:
logger.warning(f"Failure was {e}")
return []

def execute_query(self, write_query: WriteQuery) -> List[Dict]:
logger.debug(f"Executing query {write_query}")
self.client_stats.query_calls += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
self.source_config: LookerDashboardSourceConfig = config
self.reporter: LookerDashboardSourceReport = LookerDashboardSourceReport()
self.looker_api: LookerAPI = LookerAPI(self.source_config)
self.user_registry: LookerUserRegistry = LookerUserRegistry(self.looker_api)
self.user_registry: LookerUserRegistry = LookerUserRegistry(
self.looker_api, self.reporter
)
self.explore_registry: LookerExploreRegistry = LookerExploreRegistry(
self.looker_api, self.reporter, self.source_config
)
Expand Down Expand Up @@ -1673,5 +1675,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
yield usage_mcp.as_workunit()
self.reporter.report_stage_end("usage_extraction")

# Dump looker user resource mappings.
logger.info("Ingesting looker user resource mapping workunits")
self.reporter.report_stage_start("user_resource_extraction")
yield from auto_workunit(
self.user_registry.to_platform_resource(
self.source_config.platform_instance
)
)

def get_report(self) -> SourceReport:
return self.reporter
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import DatasetSourceConfigMixin, PlatformDetail
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
from datahub.ingestion.source.common.subtypes import BIAssetSubTypes
from datahub.ingestion.source.state.stale_entity_removal_handler import (
Expand Down Expand Up @@ -232,19 +232,6 @@ def default_for_dataset_type_mapping() -> Dict[str, str]:
return dict_


class PlatformDetail(ConfigModel):
platform_instance: Optional[str] = pydantic.Field(
default=None,
description="DataHub platform instance name. To generate correct urn for upstream dataset, this should match "
"with platform instance name used in ingestion "
"recipe of other datahub sources.",
)
env: str = pydantic.Field(
default=builder.DEFAULT_ENV,
description="The environment that all assets produced by DataHub platform ingestion source belong to",
)


class DataBricksPlatformDetail(PlatformDetail):
"""
metastore is an additional field used in Databricks connector to generate the dataset urn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from abc import ABC, abstractmethod
from typing import Union

from datahub.configuration.source_common import PlatformDetail
from datahub.ingestion.source.powerbi.config import (
PlatformDetail,
PowerBiDashboardSourceConfig,
PowerBIPlatformDetail,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

from lark import Tree

from datahub.configuration.source_common import PlatformDetail
from datahub.emitter import mce_builder as builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.powerbi.config import (
Constant,
DataBricksPlatformDetail,
DataPlatformPair,
PlatformDetail,
PowerBiDashboardSourceConfig,
PowerBiDashboardSourceReport,
PowerBIPlatformDetail,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
identifiers=self.identifiers,
schema_resolver=schema_resolver,
discovered_tables=discovered_datasets,
graph=self.ctx.graph,
)

# TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ def __init__(
self._exit_stack.push(self._query_usage_counts)

# Tool Extractor
self._tool_meta_extractor = ToolMetaExtractor()
self._tool_meta_extractor = ToolMetaExtractor.create(graph)
self.report.tool_meta_report = self._tool_meta_extractor.report

def close(self) -> None:
Expand Down
Loading

0 comments on commit 2e54461

Please sign in to comment.