Skip to content

Commit

Permalink
feat(ingest/powerbi): add timeouts for m-query parsing (datahub-proje…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Oct 30, 2024
1 parent 799c452 commit 143fc01
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 56 deletions.
13 changes: 12 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@
*path_spec_common,
}

threading_timeout_common = {
"stopit==1.1.2",
}

abs_base = {
"azure-core==1.29.4",
"azure-identity>=1.17.1",
Expand Down Expand Up @@ -492,7 +496,14 @@
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging", "requests-gssapi"},
"powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib,
"powerbi": (
(
microsoft_common
| {"lark[regex]==1.1.4", "sqlparse", "more-itertools"}
| sqlglot_lib
| threading_timeout_common
)
),
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
"unity-catalog": databricks | sql_common | sqllineage_lib,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def __init__(

def get_query_result(self, query: str) -> RowIterator:
def _should_retry(exc: BaseException) -> bool:
logger.debug(f"Exception occured for job query. Reason: {exc}")
logger.debug(f"Exception occurred for job query. Reason: {exc}")
# Jobs sometimes fail with transient errors.
# This is not currently handled by the python-bigquery client.
# https://github.com/googleapis/python-bigquery/issues/23
Expand All @@ -197,7 +197,7 @@ def _should_retry(exc: BaseException) -> bool:
def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]:
def _should_retry(exc: BaseException) -> bool:
logger.debug(
f"Exception occured for project.list api. Reason: {exc}. Retrying api request..."
f"Exception occurred for project.list api. Reason: {exc}. Retrying api request..."
)
self.report.num_list_projects_retry_request += 1
return True
Expand Down
19 changes: 12 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.utilities.lossy_collections import LossyList

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -176,11 +177,18 @@ class SupportedDataPlatform(Enum):

@dataclass
class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
all_workspace_count: int = 0
filtered_workspace_names: LossyList[str] = dataclass_field(
default_factory=LossyList
)
filtered_workspace_types: LossyList[str] = dataclass_field(
default_factory=LossyList
)

dashboards_scanned: int = 0
charts_scanned: int = 0
filtered_dashboards: List[str] = dataclass_field(default_factory=list)
filtered_charts: List[str] = dataclass_field(default_factory=list)
number_of_workspaces: int = 0

def report_dashboards_scanned(self, count: int = 1) -> None:
self.dashboards_scanned += count
Expand All @@ -194,9 +202,6 @@ def report_dashboards_dropped(self, model: str) -> None:
def report_charts_dropped(self, view: str) -> None:
self.filtered_charts.append(view)

def report_number_of_workspaces(self, number_of_workspaces: int) -> None:
self.number_of_workspaces = number_of_workspaces


def default_for_dataset_type_mapping() -> Dict[str, str]:
dict_: dict = {}
Expand Down Expand Up @@ -331,7 +336,7 @@ class PowerBiDashboardSourceConfig(
)
workspace_id_as_urn_part: bool = pydantic.Field(
default=False,
description="It is recommended to set this to True only if you have legacy workspaces based on Office 365 groups, as those workspaces can have identical names."
description="It is recommended to set this to True only if you have legacy workspaces based on Office 365 groups, as those workspaces can have identical names. "
"To maintain backward compatibility, this is set to False which uses workspace name",
)
# Enable/Disable extracting ownership information of Dashboard
Expand Down Expand Up @@ -371,8 +376,8 @@ class PowerBiDashboardSourceConfig(
# any existing tags defined to those entities
extract_endorsements_to_tags: bool = pydantic.Field(
default=False,
description="Whether to extract endorsements to tags, note that this may overwrite existing tags. Admin API "
"access is required is this setting is enabled",
description="Whether to extract endorsements to tags, note that this may overwrite existing tags. "
"Admin API access is required if this setting is enabled.",
)
filter_dataset_endorsements: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import importlib.resources as pkg_resource
import logging
import os
from typing import Dict, List

import lark
Expand All @@ -19,9 +20,12 @@
TRACE_POWERBI_MQUERY_PARSER,
)
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
from datahub.utilities.threading_timeout import TimeoutException, threading_timeout

logger = logging.getLogger(__name__)

_M_QUERY_PARSE_TIMEOUT = int(os.getenv("DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT", 60))


@functools.lru_cache(maxsize=1)
def get_lark_parser() -> Lark:
Expand All @@ -41,7 +45,8 @@ def _parse_expression(expression: str) -> Tree:
expression = expression.replace("\u00a0", " ")

logger.debug(f"Parsing expression = {expression}")
parse_tree: Tree = lark_parser.parse(expression)
with threading_timeout(_M_QUERY_PARSE_TIMEOUT):
parse_tree: Tree = lark_parser.parse(expression)

if TRACE_POWERBI_MQUERY_PARSER:
logger.debug(parse_tree.pretty())
Expand Down Expand Up @@ -83,17 +88,26 @@ def get_upstream_tables(
context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
)
return []
except KeyboardInterrupt:
raise
except TimeoutException:
reporter.warning(
title="M-Query Parsing Timeout",
message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
)
return []
except (
BaseException
) as e: # TODO: Debug why BaseException is needed here and below.
if isinstance(e, lark.exceptions.UnexpectedCharacters):
title = "Unexpected Character Found"
error_type = "Unexpected Character Error"
else:
title = "Unknown Parsing Error"
error_type = "Unknown Parsing Error"

reporter.warning(
title=title,
message="Unknown parsing error",
title="Unable to extract lineage from M-Query expression",
message=f"Got an '{error_type}' while parsing the expression. Lineage will be missing for this table.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
exc=e,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,9 @@ def internal(
)
if v_statement is None:
self.reporter.report_warning(
f"{self.table.full_name}-variable-statement",
f"output variable ({current_identifier}) statement not found in table expression",
title="Unable to extract lineage from M-Query expression",
message="Lineage will be incomplete.",
context=f"table-full-name={self.table.full_name}: output-variable={current_identifier} not found in table expression",
)
return None

Expand Down
56 changes: 35 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from datetime import datetime
from typing import Iterable, List, Optional, Tuple, Union

import more_itertools

import datahub.emitter.mce_builder as builder
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand Down Expand Up @@ -795,6 +797,11 @@ def generate_container_for_workspace(
container_key=self.workspace_key,
name=workspace.name,
sub_types=[workspace.type],
extra_properties={
"workspace_id": workspace.id,
"workspace_name": workspace.name,
"workspace_type": workspace.type,
},
)
return container_work_units

Expand Down Expand Up @@ -1256,20 +1263,33 @@ def create(cls, config_dict, ctx):

def get_allowed_workspaces(self) -> List[powerbi_data_classes.Workspace]:
all_workspaces = self.powerbi_client.get_workspaces()
logger.info(f"Number of workspaces = {len(all_workspaces)}")
self.reporter.all_workspace_count = len(all_workspaces)
logger.debug(
f"All workspaces: {[workspace.format_name_for_logger() for workspace in all_workspaces]}"
)

allowed_wrk = [
workspace
for workspace in all_workspaces
if self.source_config.workspace_id_pattern.allowed(workspace.id)
and workspace.type in self.source_config.workspace_type_filter
]
allowed_workspaces = []
for workspace in all_workspaces:
if not self.source_config.workspace_id_pattern.allowed(workspace.id):
self.reporter.filtered_workspace_names.append(
f"{workspace.id} - {workspace.name}"
)
continue
elif workspace.type not in self.source_config.workspace_type_filter:
self.reporter.filtered_workspace_types.append(
f"{workspace.id} - {workspace.name} (type = {workspace.type})"
)
continue
else:
allowed_workspaces.append(workspace)

logger.info(f"Number of workspaces = {len(all_workspaces)}")
self.reporter.report_number_of_workspaces(len(all_workspaces))
logger.info(f"Number of allowed workspaces = {len(allowed_wrk)}")
logger.debug(f"Workspaces = {all_workspaces}")
logger.info(f"Number of allowed workspaces = {len(allowed_workspaces)}")
logger.debug(
f"Allowed workspaces: {[workspace.format_name_for_logger() for workspace in allowed_workspaces]}"
)

return allowed_wrk
return allowed_workspaces

def validate_dataset_type_mapping(self):
powerbi_data_platforms: List[str] = [
Expand Down Expand Up @@ -1480,16 +1500,10 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# Fetch PowerBi workspace for given workspace identifier

allowed_workspaces = self.get_allowed_workspaces()
workspaces_len = len(allowed_workspaces)

batch_size = (
self.source_config.scan_batch_size
) # 100 is the maximum allowed for powerbi scan
num_batches = (workspaces_len + batch_size - 1) // batch_size
batches = [
allowed_workspaces[i * batch_size : (i + 1) * batch_size]
for i in range(num_batches)
]

batches = more_itertools.chunked(
allowed_workspaces, self.source_config.scan_batch_size
)
for batch_workspaces in batches:
for workspace in self.powerbi_client.fill_workspaces(
batch_workspaces, self.reporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ def get_workspace_key(
instance=platform_instance,
)

def format_name_for_logger(self) -> str:
return f"{self.name} ({self.id})"


@dataclass
class DataSource:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ def _is_scan_result_ready(
res.raise_for_status()

if res.json()[Constant.STATUS].upper() == Constant.SUCCEEDED:
logger.info(f"Scan result is available for scan id({scan_id})")
logger.debug(f"Scan result is available for scan id({scan_id})")
return True

if retry == max_retry:
Expand Down Expand Up @@ -898,8 +898,8 @@ def get_users(self, workspace_id: str, entity: str, entity_id: str) -> List[User
return users

def get_scan_result(self, scan_id: str) -> Optional[dict]:
logger.info("Fetching scan result")
logger.info(f"{Constant.SCAN_ID}={scan_id}")
logger.debug("Fetching scan result")
logger.debug(f"{Constant.SCAN_ID}={scan_id}")
scan_result_get_endpoint = AdminAPIResolver.API_ENDPOINTS[
Constant.SCAN_RESULT_GET
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def _get_scan_result(self, workspace_ids: List[str]) -> Any:
)
return None

logger.info("Waiting for scan to complete")
logger.debug("Waiting for scan to complete")
if (
self.__admin_api_resolver.wait_for_scan_to_complete(
scan_id=scan_id, timeout=self.__config.scan_timeout
Expand Down Expand Up @@ -355,22 +355,32 @@ def _get_workspace_datasets(self, workspace: Workspace) -> dict:
logger.debug("Processing scan result for datasets")

for dataset_dict in datasets:
dataset_instance: PowerBIDataset = self._get_resolver().get_dataset(
workspace=workspace,
dataset_id=dataset_dict[Constant.ID],
)
dataset_id = dataset_dict[Constant.ID]
try:
dataset_instance = self._get_resolver().get_dataset(
workspace=workspace,
dataset_id=dataset_id,
)
if dataset_instance is None:
continue
except Exception as e:
self.reporter.warning(
title="Unable to fetch dataset details",
message="Skipping this dataset due to the error. Metadata will be incomplete.",
context=f"workspace={workspace.name}, dataset-id={dataset_id}",
exc=e,
)
continue

# fetch + set dataset parameters
try:
dataset_parameters = self._get_resolver().get_dataset_parameters(
workspace_id=workspace.id,
dataset_id=dataset_dict[Constant.ID],
dataset_id=dataset_id,
)
dataset_instance.parameters = dataset_parameters
except Exception as e:
logger.info(
f"Unable to fetch dataset parameters for {dataset_dict[Constant.ID]}: {e}"
)
logger.info(f"Unable to fetch dataset parameters for {dataset_id}: {e}")

if self.__config.extract_endorsements_to_tags:
dataset_instance.tags = self._parse_endorsement(
Expand Down Expand Up @@ -564,8 +574,7 @@ def _fill_metadata_from_scan_result(
)
else:
logger.info(
"Skipping endorsements tag as extract_endorsements_to_tags is set to "
"false "
"Skipping endorsements tag as extract_endorsements_to_tags is not enabled"
)

self._populate_app_details(
Expand Down Expand Up @@ -641,6 +650,9 @@ def fill_dashboard_tags() -> None:
def fill_workspaces(
self, workspaces: List[Workspace], reporter: PowerBiDashboardSourceReport
) -> Iterable[Workspace]:
logger.info(
f"Fetching initial metadata for workspaces: {[workspace.format_name_for_logger() for workspace in workspaces]}"
)

workspaces = self._fill_metadata_from_scan_result(workspaces=workspaces)
# First try to fill the admin detail as some regular metadata contains lineage to admin metadata
Expand Down
Loading

0 comments on commit 143fc01

Please sign in to comment.