
Commit

feat(great-expectations): add SqlAlchemyDataset support
seuf committed Nov 13, 2023
1 parent 7ba54fd commit 2d5894c
Showing 1 changed file with 31 additions and 3 deletions.
metadata-ingestion/src/datahub/integrations/great_expectations/action.py
@@ -24,6 +24,7 @@
     ExpectationSuiteIdentifier,
     ValidationResultIdentifier,
 )
+from great_expectations.dataset.sqlalchemy_dataset import SqlAlchemyDataset
 from great_expectations.execution_engine.sqlalchemy_execution_engine import (
     SqlAlchemyExecutionEngine,
 )
@@ -686,10 +687,36 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
                         batch_spec_type=type(ge_batch_spec)
                     )
                 )
+        elif isinstance(data_asset, SqlAlchemyDataset):
+            if "." in data_asset._table.name:
+                # bigquery case
+                schema_name, table_name = data_asset._table.name.split(".")
+                sqlalchemy_uri = f"{data_asset.engine.url}/{schema_name}"
+            else:
+                schema_name = data_asset._table.schema
+                table_name = data_asset._table.name
+                sqlalchemy_uri = data_asset.engine.url
+
+            dataset_urn = make_dataset_urn_from_sqlalchemy_uri(
+                sqlalchemy_uri=sqlalchemy_uri,
+                schema_name=schema_name,
+                table_name=table_name,
+                env=self.env
+            )
+            partitionSpec = None
+            batchSpec = None
+            dataset_partitions.append(
+                {
+                    "dataset_urn": dataset_urn,
+                    "partitionSpec": partitionSpec,
+                    "batchSpec": batchSpec,
+                }
+            )
         else:
-            # TODO - v2-spec - SqlAlchemyDataset support
             warn(
-                "DataHubValidationAction does not recognize this GE data asset type - {asset_type}. This is either using v2-api or execution engine other than sqlalchemy.".format(
+                """
+                DataHubValidationAction does not recognize this GE data asset type - {asset_type}.
+                This is either using v2-api or execution engine other than sqlalchemy.""".format(
                     asset_type=type(data_asset)
                 )
             )
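Note: a minimal standalone sketch of the schema/table/URI resolution that the new SqlAlchemyDataset branch above performs. The helper name resolve_schema_and_table and the example values are illustrative assumptions, not code from this commit.

# Hypothetical helper mirroring the added branch; names and values are illustrative.
def resolve_schema_and_table(table_name, table_schema, engine_url):
    if "." in table_name:
        # BigQuery-style "dataset.table": the prefix acts as the schema and is
        # appended to the engine URL, as in the added branch above.
        schema_name, table_name = table_name.split(".")
        sqlalchemy_uri = f"{engine_url}/{schema_name}"
    else:
        schema_name = table_schema
        sqlalchemy_uri = engine_url
    return schema_name, table_name, sqlalchemy_uri

# Illustrative use:
# resolve_schema_and_table("my_dataset.my_table", None, "bigquery://my-project")
# -> ("my_dataset", "my_table", "bigquery://my-project/my_dataset")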
@@ -763,7 +790,8 @@ def make_dataset_urn_from_sqlalchemy_uri(
         return None
     # If data platform is snowflake, we artificially lowercase the Database name.
     # This is because DataHub also does this during ingestion.
-    # Ref: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py#L155
+    # Ref:
+    # https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py#L155
     database_name = (
         url_instance.database.lower()
         if data_platform == "snowflake"
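The Snowflake lowercasing described in the comment above can be illustrated with a short, hedged sketch; the connection URL and the use of sqlalchemy.engine.make_url here are assumptions for demonstration, not code from this file.

# Sketch of the lowercasing rule; the connection URL is a made-up example.
from sqlalchemy.engine import make_url  # available in SQLAlchemy 1.4+

url_instance = make_url("snowflake://user:password@my_account/MYDB")
data_platform = url_instance.drivername.split("+")[0]  # -> "snowflake"
database_name = (
    url_instance.database.lower()  # "MYDB" -> "mydb", matching DataHub's Snowflake ingestion
    if data_platform == "snowflake"
    else url_instance.database
)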
