diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py index 8b393a8f6f1c6..32b3a351ad1fe 100644 --- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py +++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py @@ -24,6 +24,7 @@ ExpectationSuiteIdentifier, ValidationResultIdentifier, ) +from great_expectations.dataset.sqlalchemy_dataset import SqlAlchemyDataset from great_expectations.execution_engine.sqlalchemy_execution_engine import ( SqlAlchemyExecutionEngine, ) @@ -686,10 +687,36 @@ def get_dataset_partitions(self, batch_identifier, data_asset): batch_spec_type=type(ge_batch_spec) ) ) + elif isinstance(data_asset, SqlAlchemyDataset): + if "." in data_asset._table.name: + # bigquery case + schema_name, table_name = data_asset._table.name.split(".") + sqlalchemy_uri = f"{data_asset.engine.url}/{schema_name}" + else: + schema_name = data_asset._table.schema + table_name = data_asset._table.name + sqlalchemy_uri = data_asset.engine.url + + dataset_urn = make_dataset_urn_from_sqlalchemy_uri( + sqlalchemy_uri=sqlalchemy_uri, + schema_name=schema_name, + table_name=table_name, + env=self.env + ) + partitionSpec = None + batchSpec = None + dataset_partitions.append( + { + "dataset_urn": dataset_urn, + "partitionSpec": partitionSpec, + "batchSpec": batchSpec, + } + ) else: - # TODO - v2-spec - SqlAlchemyDataset support warn( - "DataHubValidationAction does not recognize this GE data asset type - {asset_type}. This is either using v2-api or execution engine other than sqlalchemy.".format( + """ + DataHubValidationAction does not recognize this GE data asset type - {asset_type}. + This is either using v2-api or execution engine other than sqlalchemy.""".format( asset_type=type(data_asset) ) ) @@ -763,7 +790,8 @@ def make_dataset_urn_from_sqlalchemy_uri( return None # If data platform is snowflake, we artificially lowercase the Database name. # This is because DataHub also does this during ingestion. - # Ref: https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py#L155 + # Ref: + # https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py#L155 database_name = ( url_instance.database.lower() if data_platform == "snowflake"