Skip to content

Commit

Permalink
fix(ingest/snowflake): handle dots in snowflake table names (datahub-…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and sleeperdeep committed Dec 17, 2024
1 parent 6db28b2 commit 6bb765e
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
from functools import cached_property
from typing import ClassVar, Literal, Optional, Tuple
from typing import ClassVar, List, Literal, Optional, Tuple

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
Expand Down Expand Up @@ -184,6 +184,46 @@ def _is_sys_table(table_name: str) -> bool:
return table_name.lower().startswith("sys$")


def _split_qualified_name(qualified_name: str) -> List[str]:
    """
    Split a qualified name into its constituent parts.

    Dots outside of double quotes separate the parts; dots inside a
    double-quoted identifier are kept as part of that identifier. The
    double-quote characters themselves never appear in the returned parts.

    >>> _split_qualified_name("db.my_schema.my_table")
    ['db', 'my_schema', 'my_table']
    >>> _split_qualified_name('"db"."my_schema"."my_table"')
    ['db', 'my_schema', 'my_table']
    >>> _split_qualified_name('TEST_DB.TEST_SCHEMA."TABLE.WITH.DOTS"')
    ['TEST_DB', 'TEST_SCHEMA', 'TABLE.WITH.DOTS']
    >>> _split_qualified_name('TEST_DB."SCHEMA.WITH.DOTS".MY_TABLE')
    ['TEST_DB', 'SCHEMA.WITH.DOTS', 'MY_TABLE']
    """

    # Fast path - no quotes.
    if '"' not in qualified_name:
        return qualified_name.split(".")

    # Split on dots that are not inside quotes. A quote character only toggles
    # the quoting state and is never copied into a part, so the parts come out
    # already unquoted and no separate quote-stripping pass is needed.
    # NOTE: as a consequence, every quote character is dropped, including a
    # doubled quote inside a quoted identifier.
    in_quote = False
    parts: List[List[str]] = [[]]
    for char in qualified_name:
        if char == '"':
            in_quote = not in_quote
        elif char == "." and not in_quote:
            parts.append([])
        else:
            parts[-1].append(char)

    return ["".join(part) for part in parts]


# Qualified Object names from snowflake audit logs have quotes for snowflake quoted identifiers,
# For example "test-database"."test-schema".test_table
# whereas we generate urns without quotes even for quoted identifiers for backward compatibility
Expand All @@ -192,7 +232,7 @@ def _is_sys_table(table_name: str) -> bool:
def _cleanup_qualified_name(
qualified_name: str, structured_reporter: SourceReport
) -> str:
name_parts = qualified_name.split(".")
name_parts = _split_qualified_name(qualified_name)
if len(name_parts) != 3:
if not _is_sys_table(qualified_name):
structured_reporter.info(
Expand All @@ -203,9 +243,9 @@ def _cleanup_qualified_name(
)
return qualified_name.replace('"', "")
return _combine_identifier_parts(
db_name=name_parts[0].strip('"'),
schema_name=name_parts[1].strip('"'),
table_name=name_parts[2].strip('"'),
db_name=name_parts[0],
schema_name=name_parts[1],
table_name=name_parts[2],
)


Expand Down
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/testing/doctest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import doctest
from types import ModuleType


def assert_doctest(module: ModuleType) -> None:
    """Run the doctests of ``module`` and require that at least one exists.

    The doctests run verbosely and with ``raise_on_error=True``, so the
    first failing example raises instead of being tallied.

    Raises:
        ValueError: if no doctest examples were attempted for the module,
            so a test relying on this helper cannot pass vacuously.
    """
    outcome = doctest.testmod(module, raise_on_error=True, verbose=True)
    if not outcome.attempted:
        raise ValueError(f"No doctests found in {module.__name__}")
14 changes: 4 additions & 10 deletions metadata-ingestion/tests/integration/git/test_git_clone.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import doctest
import os

import pytest
from pydantic import SecretStr

import datahub.ingestion.source.git.git_import
from datahub.configuration.common import ConfigurationWarning
from datahub.configuration.git import GitInfo, GitReference
from datahub.ingestion.source.git.git_import import GitClone
from datahub.testing.doctest import assert_doctest

LOOKML_TEST_SSH_KEY = os.environ.get("DATAHUB_LOOKML_GIT_TEST_SSH_KEY")

Expand Down Expand Up @@ -82,15 +83,8 @@ def test_github_branch():
assert config.branch_for_clone == "main"


def test_sanitize_repo_url():
import datahub.ingestion.source.git.git_import

assert (
doctest.testmod(
datahub.ingestion.source.git.git_import, raise_on_error=True
).attempted
== 3
)
def test_sanitize_repo_url() -> None:
    # Run the doctests embedded in the git_import module via the shared
    # assert_doctest helper.
    assert_doctest(datahub.ingestion.source.git.git_import)


def test_git_clone_public(tmp_path):
Expand Down
17 changes: 4 additions & 13 deletions metadata-ingestion/tests/unit/sagemaker/test_sagemaker_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from botocore.stub import Stubber
from freezegun import freeze_time

import datahub.ingestion.source.aws.sagemaker_processors.models
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.aws.sagemaker import (
Expand All @@ -13,6 +14,7 @@
job_type_to_info,
job_types,
)
from datahub.testing.doctest import assert_doctest
from tests.test_helpers import mce_helpers
from tests.unit.sagemaker.test_sagemaker_source_stubs import (
describe_endpoint_response_1,
Expand Down Expand Up @@ -243,16 +245,5 @@ def test_sagemaker_ingest(tmp_path, pytestconfig):
)


def test_doc_test_run():
import doctest

import datahub.ingestion.source.aws.sagemaker_processors.models

assert (
doctest.testmod(
datahub.ingestion.source.aws.sagemaker_processors.models,
raise_on_error=True,
verbose=True,
).attempted
== 1
)
def test_doc_test_run() -> None:
    # Run the doctests embedded in the sagemaker_processors.models module via
    # the shared assert_doctest helper.
    assert_doctest(datahub.ingestion.source.aws.sagemaker_processors.models)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
from pydantic import ValidationError

import datahub.ingestion.source.snowflake.snowflake_utils
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.pattern_utils import UUID_REGEX
from datahub.ingestion.api.source import SourceCapability
Expand All @@ -26,6 +27,7 @@
)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowsightUrlBuilder
from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source
from datahub.testing.doctest import assert_doctest
from tests.test_helpers import test_connection_helpers

default_oauth_dict: Dict[str, Any] = {
Expand Down Expand Up @@ -658,3 +660,7 @@ def test_create_snowsight_base_url_ap_northeast_1():
).snowsight_base_url

assert result == "https://app.snowflake.com/ap-northeast-1.aws/account_locator/"


def test_snowflake_utils() -> None:
    # Run the doctests embedded in the snowflake_utils module via the shared
    # assert_doctest helper.
    assert_doctest(datahub.ingestion.source.snowflake.snowflake_utils)
4 changes: 2 additions & 2 deletions metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import doctest
from datetime import timedelta
from typing import Dict, List, Union
from unittest import mock
Expand All @@ -22,6 +21,7 @@
OwnershipSourceTypeClass,
OwnershipTypeClass,
)
from datahub.testing.doctest import assert_doctest


def create_owners_list_from_urn_list(
Expand Down Expand Up @@ -442,7 +442,7 @@ def test_dbt_cloud_config_with_defined_metadata_endpoint():


def test_infer_metadata_endpoint() -> None:
assert doctest.testmod(dbt_cloud, raise_on_error=True).attempted > 0
assert_doctest(dbt_cloud)


def test_dbt_time_parsing() -> None:
Expand Down
14 changes: 4 additions & 10 deletions metadata-ingestion/tests/unit/utilities/test_utilities.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import doctest
import re
from typing import List

import datahub.utilities.logging_manager
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
from datahub.testing.doctest import assert_doctest
from datahub.utilities.delayed_iter import delayed_iter
from datahub.utilities.is_pytest import is_pytest_running
from datahub.utilities.urns.dataset_urn import DatasetUrn
Expand Down Expand Up @@ -328,15 +329,8 @@ def test_sqllineage_sql_parser_tables_with_special_names():
assert sorted(SqlLineageSQLParser(sql_query).get_columns()) == expected_columns


def test_logging_name_extraction():
import datahub.utilities.logging_manager

assert (
doctest.testmod(
datahub.utilities.logging_manager, raise_on_error=True
).attempted
> 0
)
def test_logging_name_extraction() -> None:
    # Run the doctests embedded in the logging_manager module via the shared
    # assert_doctest helper.
    assert_doctest(datahub.utilities.logging_manager)


def test_is_pytest_running() -> None:
Expand Down

0 comments on commit 6bb765e

Please sign in to comment.