Skip to content

Commit

Permalink
fix: dataset_pattern changes for tables and views
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Oct 23, 2024
1 parent a6a9a91 commit b0c8c08
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ class DremioAPIOperations:
def __init__(self, connection_args: "DremioSourceConfig") -> None:
self.dremio_to_datahub_source_mapper = DremioToDataHubSourceTypeMapping()
self.allow_schema_pattern: List[str] = connection_args.schema_pattern.allow
self.allow_dataset_pattern: List[str] = connection_args.dataset_pattern.allow
self.deny_schema_pattern: List[str] = connection_args.schema_pattern.deny
self.deny_dataset_pattern: List[str] = connection_args.dataset_pattern.deny
self._max_workers: int = connection_args.max_workers
self.is_dremio_cloud = connection_args.is_dremio_cloud
self.session = requests.Session()
Expand Down Expand Up @@ -88,7 +86,7 @@ def _setup_session(self) -> None:
retry_strategy = Retry(
total=self._retry_count,
status_forcelist=[429, 502, 503, 504],
method_whitelist=["HEAD", "GET", "OPTIONS", "POST"],
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"],
backoff_factor=1,
)
adapter = HTTPAdapter(max_retries=retry_strategy)
Expand Down Expand Up @@ -403,20 +401,13 @@ def get_all_tables_and_columns(self, containers: Deque) -> List[Dict]:
query_template = DremioSQLQueries.QUERY_DATASETS_CE

schema_field = "CONCAT(REPLACE(REPLACE(REPLACE(UPPER(TABLE_SCHEMA), ', ', '.'), '[', ''), ']', ''))"
table_field = "UPPER(TABLE_NAME)"

schema_condition = self.get_pattern_condition(
self.allow_schema_pattern, schema_field
)
table_condition = self.get_pattern_condition(
self.allow_dataset_pattern, table_field
)
deny_schema_condition = self.get_pattern_condition(
self.deny_schema_pattern, schema_field, allow=False
)
deny_table_condition = self.get_pattern_condition(
self.deny_dataset_pattern, table_field, allow=False
)

all_tables_and_columns = []

Expand All @@ -425,9 +416,7 @@ def get_all_tables_and_columns(self, containers: Deque) -> List[Dict]:
try:
formatted_query = query_template.format(
schema_pattern=schema_condition,
table_pattern=table_condition,
deny_schema_pattern=deny_schema_condition,
deny_table_pattern=deny_table_condition,
container_name=schema.container_name.lower(),
)

Expand All @@ -438,7 +427,7 @@ def get_all_tables_and_columns(self, containers: Deque) -> List[Dict]:
)
except Exception as exc:
logger.warning(
f"{schema.subclass} {schema.container_name} had no tables or views {formatted_query}"
f"{schema.subclass} {schema.container_name} had no tables or views"
)
logger.debug(exc)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@ class DremioSourceConfig(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for schemas to filter",
)

dataset_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for schemas to filter",
description="Regex patterns for tables and views to filter in ingestion. Specify regex to match the entire table name in dremio.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'dremio.public.customer.*'",
)

usage: BaseUsageConfig = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@
DremioGlossaryTerm,
DremioQuery,
)
from datahub.ingestion.source.dremio.dremio_profiling import (
DremioProfiler,
)
from datahub.ingestion.source.dremio.dremio_profiling import DremioProfiler
from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
Expand All @@ -72,7 +71,7 @@


@dataclass
class DremioSourceReport(StaleEntityRemovalSourceReport):
class DremioSourceReport(ProfilingSqlReport, StaleEntityRemovalSourceReport):
num_containers_failed: int = 0
num_datasets_failed: int = 0

Expand All @@ -81,6 +80,19 @@ def report_upstream_latency(self, start_time: datetime, end_time: datetime) -> N
# for future implementation of min / max / percentiles etc.
pass

def report_entity_scanned(
self, name: str, ent_type: str = DremioDatasetType.VIEW.value
) -> None:
"""
Entity could be a view or a table
"""
if ent_type == DremioDatasetType.TABLE.value:
self.tables_scanned += 1
elif ent_type == DremioDatasetType.VIEW.value:
self.views_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")


@platform_name("Dremio")
@config_class(DremioSourceConfig)
Expand Down Expand Up @@ -340,9 +352,16 @@ def process_dataset(

schema_str = ".".join(dataset_info.path)

dataset_name = f"{schema_str}.{dataset_info.resource_name}".lower()

self.report.report_entity_scanned(dataset_name, dataset_info.dataset_type.value)
if not self.config.dataset_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
return

dataset_urn = make_dataset_urn_with_platform_instance(
platform=make_data_platform_urn(self.get_platform()),
name=f"dremio.{schema_str}.{dataset_info.resource_name}".lower(),
name=f"dremio.{dataset_name}",
env=self.config.env,
platform_instance=self.config.platform_instance,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ class DremioSQLQueries:
)
WHERE 1=1
{schema_pattern}
{table_pattern}
{deny_schema_pattern}
{deny_table_pattern}
ORDER BY
TABLE_SCHEMA ASC,
TABLE_NAME ASC
Expand Down Expand Up @@ -130,9 +128,7 @@ class DremioSQLQueries:
)
WHERE 1=1
{schema_pattern}
{table_pattern}
{deny_schema_pattern}
{deny_table_pattern}
ORDER BY
TABLE_SCHEMA ASC,
TABLE_NAME ASC
Expand Down Expand Up @@ -234,9 +230,7 @@ class DremioSQLQueries:
)
WHERE 1=1
{schema_pattern}
{table_pattern}
{deny_schema_pattern}
{deny_table_pattern}
ORDER BY
TABLE_SCHEMA ASC,
TABLE_NAME ASC
Expand Down

0 comments on commit b0c8c08

Please sign in to comment.