Skip to content

Commit

Permalink
revert catalog for non-athena platform
Browse files Browse the repository at this point in the history
  • Loading branch information
svdimchenko committed Jan 15, 2025
1 parent e2b1a9a commit 0626fda
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 95 deletions.
59 changes: 45 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class GlueSourceConfig(
default=None,
description="The aws account id where the target glue catalog lives. If None, datahub will ingest glue in aws caller's account.",
)
catalog_name: Optional[str] = Field(
athena_catalog_name: Optional[str] = Field(
default=None, description="The aws athena catalog name"
)
ignore_resource_links: Optional[bool] = Field(
Expand Down Expand Up @@ -229,25 +229,41 @@ def platform_validator(cls, v: str) -> str:
)

def __init__(self, **data: Any):
"""Post init configuration operations."""
super().__init__(**data)
if self.catalog_id:
current_account_id = self.sts_client.get_caller_identity().get("Account")
if self.catalog_id == current_account_id:
self.catalog_name = DEFAULT_CATALOG_NAME
self._set_athena_catalog_name()

def _set_athena_catalog_name(self) -> None:
"""Set the correct athena catalog name or raise an exception in case of misconfiguration."""
if self.platform == "athena":
if self.catalog_id:
current_account_id = self.sts_client.get_caller_identity().get(

Check warning on line 240 in metadata-ingestion/src/datahub/ingestion/source/aws/glue.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/aws/glue.py#L240

Added line #L240 was not covered by tests
"Account"
)
if self.catalog_id == current_account_id:
self.athena_catalog_name = DEFAULT_CATALOG_NAME

Check warning on line 244 in metadata-ingestion/src/datahub/ingestion/source/aws/glue.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/aws/glue.py#L243-L244

Added lines #L243 - L244 were not covered by tests
else:
self._validate_athena_catalog_name()

Check warning on line 246 in metadata-ingestion/src/datahub/ingestion/source/aws/glue.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/aws/glue.py#L246

Added line #L246 was not covered by tests
else:
self._validate_catalog_name()
self.athena_catalog_name = DEFAULT_CATALOG_NAME
else:
self.catalog_name = DEFAULT_CATALOG_NAME
self.athena_catalog_name = None

def _validate_athena_catalog_name(self) -> None:
"""Validate if athena catalog name is set correctly.
def _validate_catalog_name(self) -> None:
This method helps to avoid issue when the `athena_catalog_name` does not exist in a specified AWS account.
"""
effective_catalog_id = (

Check warning on line 257 in metadata-ingestion/src/datahub/ingestion/source/aws/glue.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/aws/glue.py#L257

Added line #L257 was not covered by tests
self.athena_client.get_data_catalog(Name=self.catalog_name)["DataCatalog"]
self.athena_client.get_data_catalog(Name=self.athena_catalog_name)[
"DataCatalog"
]
.get("Parameters", {})
.get("catalog-id", "")
)
if effective_catalog_id != self.catalog_id:
raise ValueError(

Check warning on line 265 in metadata-ingestion/src/datahub/ingestion/source/aws/glue.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/aws/glue.py#L264-L265

Added lines #L264 - L265 were not covered by tests
f"Catalog configuration mismatch for catalog name {self.catalog_name}."
f"Catalog configuration mismatch for catalog name {self.athena_catalog_name}."
f"Effective catalog_id: {effective_catalog_id}, configured catalog_id: {self.catalog_id}."
)

Expand Down Expand Up @@ -495,8 +511,16 @@ def process_dataflow_node(

# if data object is Glue table
if "database" in node_args and "table_name" in node_args:
full_table_name = f"{self.source_config.catalog_name}.{node_args['database']}.{node_args['table_name']}"

full_table_name = ".".join(
filter(
None,
[
self.source_config.athena_catalog_name,
node_args["database"],
node_args["table_name"],
],
)
)
# we know that the table will already be covered when ingesting Glue tables
node_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
Expand Down Expand Up @@ -1120,8 +1144,15 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
def _gen_table_wu(self, table: TablePaginatorTypeDef) -> Iterable[MetadataWorkUnit]:
database_name = table["DatabaseName"]
table_name = table["Name"]
full_table_name = (
f"{self.source_config.catalog_name}.{database_name}.{table_name}"
full_table_name = ".".join(
filter(
None,
[
self.source_config.athena_catalog_name,
database_name,
table_name,
],
)
)
self.report.report_table_scanned()
if not self.source_config.database_pattern.allowed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_jsons_markers,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
Expand Down Expand Up @@ -133,7 +133,7 @@
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "awsdatacatalog.test-database.test_jsons_markers",
"schemaName": "test-database.test_jsons_markers",
"platform": "urn:li:dataPlatform:glue",
"version": 0,
"created": {
Expand Down Expand Up @@ -250,7 +250,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_jsons_markers,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
Expand All @@ -269,7 +269,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_jsons_markers,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
Expand All @@ -286,7 +286,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_jsons_markers,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
Expand All @@ -309,7 +309,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_parquet,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
Expand Down Expand Up @@ -346,7 +346,7 @@
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "awsdatacatalog.test-database.test_parquet",
"schemaName": "test-database.test_parquet",
"platform": "urn:li:dataPlatform:glue",
"version": 0,
"created": {
Expand Down Expand Up @@ -464,7 +464,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_parquet,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
Expand All @@ -483,7 +483,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_parquet,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
Expand All @@ -500,7 +500,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_parquet,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
Expand All @@ -522,7 +522,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.flights-database.avro,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.awsdatacatalog.delta-database.delta_table_1,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.delta-database.delta_table_1,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
Expand All @@ -82,7 +82,7 @@
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "awsdatacatalog.delta-database.delta_table_1",
"schemaName": "delta-database.delta_table_1",
"platform": "urn:li:dataPlatform:glue",
"version": 0,
"created": {
Expand Down Expand Up @@ -1525,7 +1525,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.awsdatacatalog.delta-database.delta_table_1,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.delta-database.delta_table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
Expand All @@ -1538,7 +1538,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.awsdatacatalog.delta-database.delta_table_1,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.delta-database.delta_table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.awsdatacatalog.delta-database.delta_table_1,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.delta-database.delta_table_1,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
Expand Down Expand Up @@ -105,7 +105,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.awsdatacatalog.delta-database.delta_table_1,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.delta-database.delta_table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
Expand All @@ -118,7 +118,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.awsdatacatalog.delta-database.delta_table_1,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,delta_platform_instance.delta-database.delta_table_1,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
Expand Down
28 changes: 14 additions & 14 deletions metadata-ingestion/tests/unit/glue/glue_mces_golden.json
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.flights-database.avro,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
Expand Down Expand Up @@ -205,7 +205,7 @@
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "awsdatacatalog.flights-database.avro",
"schemaName": "flights-database.avro",
"platform": "urn:li:dataPlatform:glue",
"version": 0,
"created": {
Expand Down Expand Up @@ -370,7 +370,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.flights-database.avro,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
Expand All @@ -383,7 +383,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.flights-database.avro,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
Expand All @@ -395,7 +395,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_jsons_markers,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
Expand Down Expand Up @@ -432,7 +432,7 @@
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "awsdatacatalog.test-database.test_jsons_markers",
"schemaName": "test-database.test_jsons_markers",
"platform": "urn:li:dataPlatform:glue",
"version": 0,
"created": {
Expand Down Expand Up @@ -555,7 +555,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_jsons_markers,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
Expand All @@ -568,7 +568,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_jsons_markers,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
Expand All @@ -580,7 +580,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_parquet,PROD)",
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
Expand Down Expand Up @@ -617,7 +617,7 @@
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "awsdatacatalog.test-database.test_parquet",
"schemaName": "test-database.test_parquet",
"platform": "urn:li:dataPlatform:glue",
"version": 0,
"created": {
Expand Down Expand Up @@ -741,7 +741,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_parquet,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
Expand All @@ -754,7 +754,7 @@
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_parquet,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
Expand Down Expand Up @@ -896,7 +896,7 @@
{
"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.flights-database.avro,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)"
],
"outputDatasets": [],
"inputDatajobs": []
Expand Down Expand Up @@ -1085,7 +1085,7 @@
{
"com.linkedin.pegasus2avro.datajob.DataJobInputOutput": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:glue,awsdatacatalog.test-database.test_parquet,PROD)"
"urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)"
],
"outputDatasets": [],
"inputDatajobs": []
Expand Down
Loading

0 comments on commit 0626fda

Please sign in to comment.