diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py b/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py index ce45a5c9b95dcc..161aed5bb59881 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union import boto3 from boto3.session import Session @@ -107,6 +107,14 @@ class AwsConnectionConfig(ConfigModel): default=None, description="A set of proxy configs to use with AWS. See the [botocore.config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) docs for details.", ) + aws_retry_num: int = Field( + default=5, + description="Number of times to retry failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.", + ) + aws_retry_mode: Literal["legacy", "standard", "adaptive"] = Field( + default="standard", + description="Retry mode to use for failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.", + ) read_timeout: float = Field( default=DEFAULT_TIMEOUT, @@ -199,6 +207,10 @@ def _aws_config(self) -> Config: return Config( proxies=self.aws_proxy, read_timeout=self.read_timeout, + retries={ + "max_attempts": self.aws_retry_num, + "mode": self.aws_retry_mode, + }, **self.aws_advanced_config, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker.py index b63fa57f069b5b..55b8f4d889072d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker.py @@ -1,3 +1,4 @@ +import logging from collections import defaultdict from typing import TYPE_CHECKING, DefaultDict, Dict, Iterable, List, Optional @@ -36,6 +37,8 @@ if TYPE_CHECKING: from mypy_boto3_sagemaker import SageMakerClient +logger = logging.getLogger(__name__) + @platform_name("SageMaker") @config_class(SagemakerSourceConfig) @@ -75,6 +78,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: ] def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + logger.info("Starting SageMaker ingestion...") # get common lineage graph lineage_processor = LineageProcessor( sagemaker_client=self.sagemaker_client, env=self.env, report=self.report @@ -83,6 +87,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: # extract feature groups if specified if self.source_config.extract_feature_groups: + logger.info("Extracting feature groups...") feature_group_processor = FeatureGroupProcessor( sagemaker_client=self.sagemaker_client, env=self.env, report=self.report ) @@ -95,6 +100,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: # extract jobs if specified if self.source_config.extract_jobs is not False: + logger.info("Extracting jobs...") job_processor = JobProcessor( sagemaker_client=self.client_factory.get_client, env=self.env, @@ -109,6 +115,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: # extract models if specified if self.source_config.extract_models: + logger.info("Extracting models...") + model_processor = ModelProcessor( sagemaker_client=self.sagemaker_client, env=self.env, diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/common.py b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/common.py index 45dadab7c24dff..73d8d33dd11be7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/common.py @@ -40,8 +40,11 @@ class SagemakerSourceReport(StaleEntityRemovalSourceReport): groups_scanned = 0 models_scanned = 0 jobs_scanned = 0 + jobs_processed = 0 datasets_scanned = 0 filtered: List[str] = field(default_factory=list) + model_endpoint_lineage = 0 + model_group_lineage = 0 def report_feature_group_scanned(self) -> None: self.feature_groups_scanned += 1 @@ -58,6 +61,9 @@ def report_group_scanned(self) -> None: def report_model_scanned(self) -> None: self.models_scanned += 1 + def report_job_processed(self) -> None: + self.jobs_processed += 1 + def report_job_scanned(self) -> None: self.jobs_scanned += 1 diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/jobs.py b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/jobs.py index 73a83295ec8cba..be0a99c6d32346 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/jobs.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/jobs.py @@ -1,3 +1,4 @@ +import logging from collections import defaultdict from dataclasses import dataclass, field from enum import Enum @@ -49,6 +50,8 @@ if TYPE_CHECKING: from mypy_boto3_sagemaker import SageMakerClient +logger = logging.getLogger(__name__) + JobInfo = TypeVar( "JobInfo", AutoMlJobInfo, @@ -274,15 +277,18 @@ def get_job_details(self, job_name: str, job_type: JobType) -> Dict[str, Any]: ) def get_workunits(self) -> Iterable[MetadataWorkUnit]: + logger.info("Getting all SageMaker jobs") jobs = self.get_all_jobs() processed_jobs: Dict[str, SageMakerJob] = {} + logger.info("Processing SageMaker jobs") # first pass: process jobs and collect datasets used + logger.info("first pass: process jobs and collect datasets used") for job in jobs: job_type = job_type_to_info[job["type"]] job_name = job[job_type.list_name_key] - + logger.debug(f"Processing job {job_name} with type {job_type}") job_details = self.get_job_details(job_name, job["type"]) processed_job = getattr(self, job_type.processor)(job_details) @@ -293,6 +299,9 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: # second pass: # - move output jobs to inputs # - aggregate i/o datasets + logger.info( + "second pass: move output jobs to inputs and aggregate i/o datasets" + ) for job_urn in sorted(processed_jobs): processed_job = processed_jobs[job_urn] @@ -301,6 +310,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: all_datasets.update(processed_job.input_datasets) all_datasets.update(processed_job.output_datasets) + self.report.report_job_processed() # yield datasets for dataset_urn, dataset in all_datasets.items(): @@ -322,6 +332,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: self.report.report_dataset_scanned() # third pass: construct and yield MCEs + logger.info("third pass: construct and yield MCEs") for job_urn in sorted(processed_jobs): processed_job = processed_jobs[job_urn] job_snapshot = processed_job.job_snapshot diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/lineage.py index b677dccad24ac4..24e5497269c738 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/lineage.py @@ -1,3 +1,4 @@ +import logging from collections import defaultdict from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, DefaultDict, Dict, List, Set @@ -6,6 +7,8 @@ SagemakerSourceReport, ) +logger = logging.getLogger(__name__) + if TYPE_CHECKING: from mypy_boto3_sagemaker import SageMakerClient from mypy_boto3_sagemaker.type_defs import ( @@ -88,7 +91,6 @@ def get_all_contexts(self) -> List["ContextSummaryTypeDef"]: paginator = self.sagemaker_client.get_paginator("list_contexts") for page in paginator.paginate(): contexts += page["ContextSummaries"] - return contexts def get_incoming_edges(self, node_arn: str) -> List["AssociationSummaryTypeDef"]: @@ -225,27 +227,32 @@ def get_lineage(self) -> LineageInfo: """ Get the lineage of all artifacts in SageMaker. """ - + logger.info("Getting lineage for SageMaker artifacts...") + logger.info("Getting all actions") for action in self.get_all_actions(): self.nodes[action["ActionArn"]] = {**action, "node_type": "action"} + logger.info("Getting all artifacts") for artifact in self.get_all_artifacts(): self.nodes[artifact["ArtifactArn"]] = {**artifact, "node_type": "artifact"} + logger.info("Getting all contexts") for context in self.get_all_contexts(): self.nodes[context["ContextArn"]] = {**context, "node_type": "context"} + logger.info("Getting lineage for model deployments and model groups") for node_arn, node in self.nodes.items(): + logger.debug(f"Getting lineage for node {node_arn}") # get model-endpoint lineage if ( node["node_type"] == "action" and node.get("ActionType") == "ModelDeployment" ): self.get_model_deployment_lineage(node_arn) - + self.report.model_endpoint_lineage += 1 # get model-group lineage if ( node["node_type"] == "context" and node.get("ContextType") == "ModelGroup" ): self.get_model_group_lineage(node_arn, node) - + self.report.model_group_lineage += 1 return self.lineage_info diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/data_classes.py index f1691b5df68a94..8c9ba3b458ab25 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/data_classes.py @@ -1,5 +1,4 @@ import os -from abc import ABC from dataclasses import dataclass from enum import Enum from typing import Any, Dict, List, Optional @@ -12,18 +11,8 @@ TRACE_POWERBI_MQUERY_PARSER = os.getenv("DATAHUB_TRACE_POWERBI_MQUERY_PARSER", False) -class AbstractIdentifierAccessor(ABC): # To pass lint - pass - - -# @dataclass -# class ItemSelector: -# items: Dict[str, Any] -# next: Optional[AbstractIdentifierAccessor] - - @dataclass -class IdentifierAccessor(AbstractIdentifierAccessor): +class IdentifierAccessor: """ statement public_order_date = Source{[Schema="public",Item="order_date"]}[Data] @@ -40,7 +29,7 @@ class IdentifierAccessor(AbstractIdentifierAccessor): identifier: str items: Dict[str, Any] - next: Optional[AbstractIdentifierAccessor] + next: Optional["IdentifierAccessor"] @dataclass diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py index 13d97a70290298..ffaed79f4e42a6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py @@ -1,7 +1,7 @@ import logging from abc import ABC, abstractmethod from enum import Enum -from typing import Dict, List, Optional, Tuple, Type, Union, cast +from typing import Dict, List, Optional, Tuple, Type, cast from lark import Tree @@ -22,7 +22,6 @@ ) from datahub.ingestion.source.powerbi.m_query import native_sql_parser, tree_function from datahub.ingestion.source.powerbi.m_query.data_classes import ( - AbstractIdentifierAccessor, DataAccessFunctionDetail, DataPlatformTable, FunctionName, @@ -412,33 +411,25 @@ def create_lineage( ) table_detail: Dict[str, str] = {} temp_accessor: Optional[ - Union[IdentifierAccessor, AbstractIdentifierAccessor] + IdentifierAccessor ] = data_access_func_detail.identifier_accessor while temp_accessor: - if isinstance(temp_accessor, IdentifierAccessor): - # Condition to handle databricks M-query pattern where table, schema and database all are present in - # the same invoke statement - if all( - element in temp_accessor.items - for element in ["Item", "Schema", "Catalog"] - ): - table_detail["Schema"] = temp_accessor.items["Schema"] - table_detail["Table"] = temp_accessor.items["Item"] - else: - table_detail[temp_accessor.items["Kind"]] = temp_accessor.items[ - "Name" - ] - - if temp_accessor.next is not None: - temp_accessor = temp_accessor.next - else: - break + # Condition to handle databricks M-query pattern where table, schema and database all are present in + # the same invoke statement + if all( + element in temp_accessor.items + for element in ["Item", "Schema", "Catalog"] + ): + table_detail["Schema"] = temp_accessor.items["Schema"] + table_detail["Table"] = temp_accessor.items["Item"] else: - logger.debug( - "expecting instance to be IdentifierAccessor, please check if parsing is done properly" - ) - return Lineage.empty() + table_detail[temp_accessor.items["Kind"]] = temp_accessor.items["Name"] + + if temp_accessor.next is not None: + temp_accessor = temp_accessor.next + else: + break table_reference = self.create_reference_table( arg_list=data_access_func_detail.arg_list, @@ -786,9 +777,10 @@ def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]: def create_lineage( self, data_access_func_detail: DataAccessFunctionDetail ) -> Lineage: - t1: Tree = cast( - Tree, tree_function.first_arg_list_func(data_access_func_detail.arg_list) + t1: Optional[Tree] = tree_function.first_arg_list_func( + data_access_func_detail.arg_list ) + assert t1 is not None flat_argument_list: List[Tree] = tree_function.flat_argument_list(t1) if len(flat_argument_list) != 2: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py index 81a0e1ef2d79b1..2756a113d1ef0c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py @@ -1,6 +1,6 @@ import logging from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional, Tuple, Union, cast +from typing import Any, Dict, List, Optional, Tuple, Union from lark import Tree @@ -95,14 +95,12 @@ def get_item_selector_tokens( # remove whitespaces and quotes from token tokens: List[str] = tree_function.strip_char_from_list( tree_function.remove_whitespaces_from_list( - tree_function.token_values( - cast(Tree, item_selector), parameters=self.parameters - ) + tree_function.token_values(item_selector, parameters=self.parameters) ), ) identifier: List[str] = tree_function.token_values( - cast(Tree, identifier_tree) - ) # type :ignore + identifier_tree, parameters={} + ) # convert tokens to dict iterator = iter(tokens) @@ -238,10 +236,10 @@ def _process_invoke_expression( def _process_item_selector_expression( self, rh_tree: Tree ) -> Tuple[Optional[str], Optional[Dict[str, str]]]: - new_identifier, key_vs_value = self.get_item_selector_tokens( # type: ignore - cast(Tree, tree_function.first_expression_func(rh_tree)) - ) + first_expression: Optional[Tree] = tree_function.first_expression_func(rh_tree) + assert first_expression is not None + new_identifier, key_vs_value = self.get_item_selector_tokens(first_expression) return new_identifier, key_vs_value @staticmethod @@ -327,7 +325,7 @@ def internal( # The first argument can be a single table argument or list of table. # For example Table.Combine({t1,t2},....), here first argument is list of table. # Table.AddColumn(t1,....), here first argument is single table. - for token in cast(List[str], result): + for token in result: internal(token, identifier_accessor) else: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py index 186f03fe136393..d48e251bd00906 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/tree_function.py @@ -1,6 +1,6 @@ import logging from functools import partial -from typing import Any, Dict, List, Optional, Union, cast +from typing import Any, Dict, List, Optional, Union from lark import Token, Tree @@ -58,7 +58,7 @@ def internal(node: Union[Tree, Token]) -> Optional[Tree]: if isinstance(node, Token): return None - for child in cast(Tree, node).children: + for child in node.children: child_node: Optional[Tree] = internal(child) if child_node is not None: return child_node @@ -99,7 +99,7 @@ def internal(node: Union[Tree, Token]) -> None: logger.debug(f"Unable to resolve parameter reference to {ref}") values.append(ref) elif isinstance(node, Token): - values.append(cast(Token, node).value) + values.append(node.value) return else: for child in node.children: diff --git a/metadata-ingestion/src/datahub/ingestion/source/preset.py b/metadata-ingestion/src/datahub/ingestion/source/preset.py index 6f53223e000f1b..7b0bc89648c529 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/preset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/preset.py @@ -85,6 +85,7 @@ def __init__(self, ctx: PipelineContext, config: PresetConfig): super().__init__(ctx, config) self.config = config self.report = StaleEntityRemovalSourceReport() + self.platform = "preset" def login(self): try: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 229c0e292fbafe..c30a26fbbd02cc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -1,7 +1,7 @@ import logging from collections import defaultdict from dataclasses import dataclass -from typing import Dict, List, Optional, Set, cast +from typing import Dict, List, Optional, Set import pydantic from pydantic import Field, SecretStr, root_validator, validator @@ -118,9 +118,10 @@ def validate_legacy_schema_pattern(cls, values: Dict) -> Dict: ) # Always exclude reporting metadata for INFORMATION_SCHEMA schema - if schema_pattern is not None and schema_pattern: + if schema_pattern: logger.debug("Adding deny for INFORMATION_SCHEMA to schema_pattern.") - cast(AllowDenyPattern, schema_pattern).deny.append(r".*INFORMATION_SCHEMA$") + assert isinstance(schema_pattern, AllowDenyPattern) + schema_pattern.deny.append(r".*INFORMATION_SCHEMA$") return values diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 662e1cc2509eae..bb5d0636f67123 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -132,7 +132,7 @@ def tables_for_database(db_name: Optional[str]) -> str: auto_clustering_on AS "AUTO_CLUSTERING_ON" FROM {db_clause}information_schema.tables t WHERE table_schema != 'INFORMATION_SCHEMA' - and table_type in ( 'BASE TABLE', 'EXTERNAL TABLE') + and table_type in ( 'BASE TABLE', 'EXTERNAL TABLE', 'HYBRID TABLE') order by table_schema, table_name""" @staticmethod @@ -152,7 +152,7 @@ def tables_for_schema(schema_name: str, db_name: Optional[str]) -> str: auto_clustering_on AS "AUTO_CLUSTERING_ON" FROM {db_clause}information_schema.tables t where table_schema='{schema_name}' - and table_type in ('BASE TABLE', 'EXTERNAL TABLE') + and table_type in ('BASE TABLE', 'EXTERNAL TABLE', 'HYBRID TABLE') order by table_schema, table_name""" @staticmethod diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py index 7a2dbda8b4a939..414c1faaa1661a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py @@ -5,8 +5,6 @@ import pydantic import sqlalchemy.dialects.mssql - -# This import verifies that the dependencies are available. from pydantic.fields import Field from sqlalchemy import create_engine, inspect from sqlalchemy.engine.base import Connection diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 64aa8cfc6ef6c7..4e22930e7a2a0b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -582,6 +582,8 @@ def get_view_lineage(self) -> Iterable[MetadataWorkUnit]: generate_operations=False, ) for dataset_name in self._view_definition_cache.keys(): + # TODO: Ensure that the lineage generated from the view definition + # matches the dataset_name. view_definition = self._view_definition_cache[dataset_name] result = self._run_sql_parser( dataset_name, @@ -1059,6 +1061,20 @@ def loop_views( exc=e, ) + def _get_view_definition(self, inspector: Inspector, schema: str, view: str) -> str: + try: + view_definition = inspector.get_view_definition(view, schema) + if view_definition is None: + view_definition = "" + else: + # Some dialects return a TextClause instead of a raw string, + # so we need to convert them to a string. + view_definition = str(view_definition) + except NotImplementedError: + view_definition = "" + + return view_definition + def _process_view( self, dataset_name: str, @@ -1077,7 +1093,10 @@ def _process_view( columns = inspector.get_columns(view, schema) except KeyError: # For certain types of views, we are unable to fetch the list of columns. - self.warn(logger, dataset_name, "unable to get schema for this view") + self.report.warning( + message="Unable to get schema for a view", + context=f"{dataset_name}", + ) schema_metadata = None else: schema_fields = self.get_schema_fields(dataset_name, columns, inspector) @@ -1091,19 +1110,12 @@ def _process_view( if self._save_schema_to_resolver(): self.schema_resolver.add_schema_metadata(dataset_urn, schema_metadata) self.discovered_datasets.add(dataset_name) + description, properties, _ = self.get_table_properties(inspector, schema, view) - try: - view_definition = inspector.get_view_definition(view, schema) - if view_definition is None: - view_definition = "" - else: - # Some dialects return a TextClause instead of a raw string, - # so we need to convert them to a string. - view_definition = str(view_definition) - except NotImplementedError: - view_definition = "" - properties["view_definition"] = view_definition properties["is_view"] = "True" + + view_definition = self._get_view_definition(inspector, schema, view) + properties["view_definition"] = view_definition if view_definition and self.config.include_view_lineage: self._view_definition_cache[dataset_name] = view_definition @@ -1135,15 +1147,14 @@ def _process_view( entityUrn=dataset_urn, aspect=SubTypesClass(typeNames=[DatasetSubTypes.VIEW]), ).as_workunit() - if "view_definition" in properties: - view_definition_string = properties["view_definition"] - view_properties_aspect = ViewPropertiesClass( - materialized=False, viewLanguage="SQL", viewLogic=view_definition_string - ) - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, - aspect=view_properties_aspect, - ).as_workunit() + + view_properties_aspect = ViewPropertiesClass( + materialized=False, viewLanguage="SQL", viewLogic=view_definition + ) + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=view_properties_aspect, + ).as_workunit() if self.config.domain and self.domain_registry: yield from get_domain_wu( diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 5ce33da5c55fac..1da233bf0b22ab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -1,10 +1,12 @@ import json import logging +from datetime import datetime from functools import lru_cache -from typing import Dict, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional import dateutil.parser as dp import requests +from pydantic import BaseModel from pydantic.class_validators import root_validator, validator from pydantic.fields import Field @@ -16,7 +18,9 @@ from datahub.emitter.mce_builder import ( make_chart_urn, make_dashboard_urn, + make_data_platform_urn, make_dataset_urn, + make_dataset_urn_with_platform_instance, make_domain_urn, ) from datahub.emitter.mcp_builder import add_domain_to_entity_wu @@ -31,6 +35,7 @@ ) from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.sql.sql_types import resolve_sql_type from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import ( get_platform_from_sqlalchemy_uri, ) @@ -47,16 +52,26 @@ AuditStamp, ChangeAuditStamps, Status, + TimeStamp, ) from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( ChartSnapshot, DashboardSnapshot, + DatasetSnapshot, ) from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent +from datahub.metadata.com.linkedin.pegasus2avro.schema import ( + MySqlDDL, + NullType, + SchemaField, + SchemaFieldDataType, + SchemaMetadata, +) from datahub.metadata.schema_classes import ( ChartInfoClass, ChartTypeClass, DashboardInfoClass, + DatasetPropertiesClass, ) from datahub.utilities import config_clean from datahub.utilities.registries.domain_registry import DomainRegistry @@ -82,9 +97,29 @@ "box_plot": ChartTypeClass.BAR, } + platform_without_databases = ["druid"] +class SupersetDataset(BaseModel): + id: int + table_name: str + changed_on_utc: Optional[str] = None + explore_url: Optional[str] = "" + + @property + def modified_dt(self) -> Optional[datetime]: + if self.changed_on_utc: + return dp.parse(self.changed_on_utc) + return None + + @property + def modified_ts(self) -> Optional[int]: + if self.modified_dt: + return int(self.modified_dt.timestamp() * 1000) + return None + + class SupersetConfig( StatefulIngestionConfigBase, EnvConfigMixin, PlatformInstanceConfigMixin ): @@ -103,15 +138,17 @@ class SupersetConfig( ) username: Optional[str] = Field(default=None, description="Superset username.") password: Optional[str] = Field(default=None, description="Superset password.") - api_key: Optional[str] = Field(default=None, description="Preset.io API key.") - api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.") - manager_uri: str = Field( - default="https://api.app.preset.io", description="Preset.io API URL" - ) # Configuration for stateful ingestion stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field( default=None, description="Superset Stateful Ingestion Config." ) + ingest_dashboards: bool = Field( + default=True, description="Enable to ingest dashboards." + ) + ingest_charts: bool = Field(default=True, description="Enable to ingest charts.") + ingest_datasets: bool = Field( + default=False, description="Enable to ingest datasets." + ) provider: str = Field(default="db", description="Superset provider.") options: Dict = Field(default={}, description="") @@ -123,6 +160,10 @@ class SupersetConfig( description="Can be used to change mapping for database names in superset to what you have in datahub", ) + class Config: + # This is required to allow preset configs to get parsed + extra = "allow" + @validator("connect_uri", "display_uri") def remove_trailing_slash(cls, v): return config_clean.remove_trailing_slashes(v) @@ -229,6 +270,28 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: config = SupersetConfig.parse_obj(config_dict) return cls(ctx, config) + def paginate_entity_api_results(self, entity_type, page_size=100): + current_page = 0 + total_items = page_size + + while current_page * page_size < total_items: + response = self.session.get( + f"{self.config.connect_uri}/api/v1/{entity_type}/", + params={"q": f"(page:{current_page},page_size:{page_size})"}, + ) + + if response.status_code != 200: + logger.warning(f"Failed to get {entity_type} data: {response.text}") + + payload = response.json() + # Update total_items with the actual count from the response + total_items = payload.get("count", total_items) + # Yield each item in the result, this gets passed into the construct functions + for item in payload.get("result", []): + yield item + + current_page += 1 + @lru_cache(maxsize=None) def get_platform_from_database_id(self, database_id): database_response = self.session.get( @@ -250,11 +313,18 @@ def get_platform_from_database_id(self, database_id): return platform_name @lru_cache(maxsize=None) - def get_datasource_urn_from_id(self, datasource_id): + def get_dataset_info(self, dataset_id: int) -> dict: dataset_response = self.session.get( - f"{self.config.connect_uri}/api/v1/dataset/{datasource_id}" - ).json() - + f"{self.config.connect_uri}/api/v1/dataset/{dataset_id}", + ) + if dataset_response.status_code != 200: + logger.warning(f"Failed to get dataset info: {dataset_response.text}") + dataset_response.raise_for_status() + return dataset_response.json() + + def get_datasource_urn_from_id( + self, dataset_response: dict, platform_instance: str + ) -> str: schema_name = dataset_response.get("result", {}).get("schema") table_name = dataset_response.get("result", {}).get("table_name") database_id = dataset_response.get("result", {}).get("database", {}).get("id") @@ -283,9 +353,11 @@ def get_datasource_urn_from_id(self, datasource_id): ), env=self.config.env, ) - return None + raise ValueError("Could not construct dataset URN") - def construct_dashboard_from_api_data(self, dashboard_data): + def construct_dashboard_from_api_data( + self, dashboard_data: dict + ) -> DashboardSnapshot: dashboard_urn = make_dashboard_urn( platform=self.platform, name=dashboard_data["id"], @@ -340,7 +412,7 @@ def construct_dashboard_from_api_data(self, dashboard_data): } if dashboard_data.get("certified_by"): - custom_properties["CertifiedBy"] = dashboard_data.get("certified_by") + custom_properties["CertifiedBy"] = dashboard_data.get("certified_by", "") custom_properties["CertificationDetails"] = str( dashboard_data.get("certification_details") ) @@ -358,38 +430,25 @@ def construct_dashboard_from_api_data(self, dashboard_data): return dashboard_snapshot def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]: - current_dashboard_page = 0 - # we will set total dashboards to the actual number after we get the response - total_dashboards = PAGE_SIZE - - while current_dashboard_page * PAGE_SIZE <= total_dashboards: - dashboard_response = self.session.get( - f"{self.config.connect_uri}/api/v1/dashboard/", - params=f"q=(page:{current_dashboard_page},page_size:{PAGE_SIZE})", - ) - if dashboard_response.status_code != 200: - logger.warning( - f"Failed to get dashboard data: {dashboard_response.text}" - ) - dashboard_response.raise_for_status() - - payload = dashboard_response.json() - total_dashboards = payload.get("count") or 0 - - current_dashboard_page += 1 - - for dashboard_data in payload["result"]: + for dashboard_data in self.paginate_entity_api_results("dashboard", PAGE_SIZE): + try: dashboard_snapshot = self.construct_dashboard_from_api_data( dashboard_data ) - mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot) - yield MetadataWorkUnit(id=dashboard_snapshot.urn, mce=mce) - yield from self._get_domain_wu( - title=dashboard_data.get("dashboard_title", ""), - entity_urn=dashboard_snapshot.urn, + except Exception as e: + self.report.warning( + f"Failed to construct dashboard snapshot. Dashboard name: {dashboard_data.get('dashboard_title')}. Error: \n{e}" ) + continue + # Emit the dashboard + mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot) + yield MetadataWorkUnit(id=dashboard_snapshot.urn, mce=mce) + yield from self._get_domain_wu( + title=dashboard_data.get("dashboard_title", ""), + entity_urn=dashboard_snapshot.urn, + ) - def construct_chart_from_chart_data(self, chart_data): + def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: chart_urn = make_chart_urn( platform=self.platform, name=chart_data["id"], @@ -415,9 +474,12 @@ def construct_chart_from_chart_data(self, chart_data): chart_url = f"{self.config.display_uri}{chart_data.get('url', '')}" datasource_id = chart_data.get("datasource_id") - datasource_urn = self.get_datasource_urn_from_id(datasource_id) + dataset_response = self.get_dataset_info(datasource_id) + datasource_urn = self.get_datasource_urn_from_id( + dataset_response, self.platform + ) - params = json.loads(chart_data.get("params")) + params = json.loads(chart_data.get("params", "{}")) metrics = [ get_metric_name(metric) for metric in (params.get("metrics", []) or [params.get("metric")]) @@ -467,36 +529,124 @@ def construct_chart_from_chart_data(self, chart_data): return chart_snapshot def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]: - current_chart_page = 0 - # we will set total charts to the actual number after we get the response - total_charts = PAGE_SIZE - - while current_chart_page * PAGE_SIZE <= total_charts: - chart_response = self.session.get( - f"{self.config.connect_uri}/api/v1/chart/", - params=f"q=(page:{current_chart_page},page_size:{PAGE_SIZE})", + for chart_data in self.paginate_entity_api_results("chart", PAGE_SIZE): + try: + chart_snapshot = self.construct_chart_from_chart_data(chart_data) + + mce = MetadataChangeEvent(proposedSnapshot=chart_snapshot) + except Exception as e: + self.report.warning( + f"Failed to construct chart snapshot. Chart name: {chart_data.get('table_name')}. Error: \n{e}" + ) + continue + # Emit the chart + yield MetadataWorkUnit(id=chart_snapshot.urn, mce=mce) + yield from self._get_domain_wu( + title=chart_data.get("slice_name", ""), + entity_urn=chart_snapshot.urn, ) - if chart_response.status_code != 200: - logger.warning(f"Failed to get chart data: {chart_response.text}") - chart_response.raise_for_status() - current_chart_page += 1 + def gen_schema_fields(self, column_data: List[Dict[str, str]]) -> List[SchemaField]: + schema_fields: List[SchemaField] = [] + for col in column_data: + col_type = (col.get("type") or "").lower() + data_type = resolve_sql_type(col_type) + if data_type is None: + data_type = NullType() + + field = SchemaField( + fieldPath=col.get("column_name", ""), + type=SchemaFieldDataType(data_type), + nativeDataType="", + description=col.get("column_name", ""), + nullable=True, + ) + schema_fields.append(field) + return schema_fields + + def gen_schema_metadata( + self, + dataset_response: dict, + ) -> SchemaMetadata: + dataset_response = dataset_response.get("result", {}) + column_data = dataset_response.get("columns", []) + schema_metadata = SchemaMetadata( + schemaName=dataset_response.get("table_name", ""), + platform=make_data_platform_urn(self.platform), + version=0, + hash="", + platformSchema=MySqlDDL(tableSchema=""), + fields=self.gen_schema_fields(column_data), + ) + return schema_metadata - payload = chart_response.json() - total_charts = payload["count"] - for chart_data in payload["result"]: - chart_snapshot = self.construct_chart_from_chart_data(chart_data) + def gen_dataset_urn(self, datahub_dataset_name: str) -> str: + return make_dataset_urn_with_platform_instance( + platform=self.platform, + name=datahub_dataset_name, + platform_instance=self.config.platform_instance, + env=self.config.env, + ) - mce = MetadataChangeEvent(proposedSnapshot=chart_snapshot) - yield MetadataWorkUnit(id=chart_snapshot.urn, mce=mce) - yield from self._get_domain_wu( - title=chart_data.get("slice_name", ""), - entity_urn=chart_snapshot.urn, + def construct_dataset_from_dataset_data( + self, dataset_data: dict + ) -> DatasetSnapshot: + dataset_response = self.get_dataset_info(dataset_data.get("id")) + dataset = SupersetDataset(**dataset_response["result"]) + datasource_urn = self.get_datasource_urn_from_id( + dataset_response, self.platform + ) + + dataset_url = f"{self.config.display_uri}{dataset.explore_url or ''}" + + dataset_info = DatasetPropertiesClass( + name=dataset.table_name, + description="", + lastModified=TimeStamp(time=dataset.modified_ts) + if dataset.modified_ts + else None, + externalUrl=dataset_url, + ) + aspects_items: List[Any] = [] + aspects_items.extend( + [ + self.gen_schema_metadata(dataset_response), + dataset_info, + ] + ) + + dataset_snapshot = DatasetSnapshot( + urn=datasource_urn, + aspects=aspects_items, + ) + return dataset_snapshot + + def emit_dataset_mces(self) -> Iterable[MetadataWorkUnit]: + for dataset_data in self.paginate_entity_api_results("dataset", PAGE_SIZE): + try: + dataset_snapshot = self.construct_dataset_from_dataset_data( + dataset_data ) + mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) + except Exception as e: + self.report.warning( + f"Failed to construct dataset snapshot. Dataset name: {dataset_data.get('table_name')}. Error: \n{e}" + ) + continue + # Emit the dataset + yield MetadataWorkUnit(id=dataset_snapshot.urn, mce=mce) + yield from self._get_domain_wu( + title=dataset_data.get("table_name", ""), + entity_urn=dataset_snapshot.urn, + ) def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - yield from self.emit_dashboard_mces() - yield from self.emit_chart_mces() + if self.config.ingest_dashboards: + yield from self.emit_dashboard_mces() + if self.config.ingest_charts: + yield from self.emit_chart_mces() + if self.config.ingest_datasets: + yield from self.emit_dataset_mces() def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ diff --git a/metadata-ingestion/tests/integration/superset/golden_test_ingest.json b/metadata-ingestion/tests/integration/superset/golden_test_ingest.json index 767b85a72b975a..4801af9465f2c9 100644 --- a/metadata-ingestion/tests/integration/superset/golden_test_ingest.json +++ b/metadata-ingestion/tests/integration/superset/golden_test_ingest.json @@ -26,6 +26,7 @@ "urn:li:chart:(superset,11)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 0, @@ -73,6 +74,7 @@ "urn:li:chart:(superset,13)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 0, diff --git a/metadata-ingestion/tests/integration/superset/golden_test_stateful_ingest.json b/metadata-ingestion/tests/integration/superset/golden_test_stateful_ingest.json index 0f207990179793..ac6a3b6942a32f 100644 --- a/metadata-ingestion/tests/integration/superset/golden_test_stateful_ingest.json +++ b/metadata-ingestion/tests/integration/superset/golden_test_stateful_ingest.json @@ -26,6 +26,7 @@ "urn:li:chart:(superset,11)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 0, @@ -44,7 +45,7 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } @@ -92,7 +93,7 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } @@ -140,7 +141,7 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } @@ -188,47 +189,413 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } }, { "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { - "urn": "urn:li:chart:(superset,13)", + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", "aspects": [ { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] } }, { - "com.linkedin.pegasus2avro.chart.ChartInfo": { - "customProperties": { - "Metrics": "", - "Filters": "", - "Dimensions": "" + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" }, - "title": "test_chart_title_4", + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, "lastModified": { - "created": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "lastModified": { - "time": 1586847600000, - "actor": "urn:li:corpuser:test_username_2" + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" } }, - "chartUrl": "mock://mock-domain.superset.com/explore/test_chart_url_13", - "inputs": [ - { - "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" } - ], - "type": "HISTOGRAM" + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] } } ] @@ -236,7 +603,41 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema1.Test Table 1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": true + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } @@ -253,7 +654,24 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(superset,13)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": true + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } diff --git a/metadata-ingestion/tests/integration/superset/test_superset.py b/metadata-ingestion/tests/integration/superset/test_superset.py index b3b59820161467..e8251e54a1f85a 100644 --- a/metadata-ingestion/tests/integration/superset/test_superset.py +++ b/metadata-ingestion/tests/integration/superset/test_superset.py @@ -133,6 +133,222 @@ def register_mock_api(request_mock: Any, override_data: dict = {}) -> None: ], }, }, + "mock://mock-domain.superset.com/api/v1/dataset/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 215, + "description_columns": {}, + "ids": [1, 2, 3], + "result": [ + { + "changed_by": { + "first_name": "Test", + "id": 1, + "last_name": "User1", + "username": "test_username_1", + }, + "changed_by_name": "test_username_1", + "changed_on_delta_humanized": "10 months ago", + "changed_on_utc": "2024-01-05T21:10:15.650819+0000", + "database": {"database_name": "test_database1", "id": 1}, + "datasource_type": "table", + "default_endpoint": None, + "description": None, + "explore_url": "/explore/?datasource_type=table&datasource_id=1", + "extra": None, + "id": 1, + "kind": "virtual", + "owners": [ + { + "first_name": "Test", + "id": 1, + "last_name": "Owner1", + "username": "test_username_1", + } + ], + "schema": "test_schema1", + "sql": "SELECT * FROM test_table1", + "table_name": "Test Table 1", + }, + { + "changed_by": { + "first_name": "Test", + "id": 2, + "last_name": "User2", + "username": "test_username_2", + }, + "changed_by_name": "test_username_2", + "changed_on_delta_humanized": "9 months ago", + "changed_on_utc": "2024-02-10T15:30:20.123456+0000", + "database": {"database_name": "test_database2", "id": 2}, + "datasource_type": "table", + "default_endpoint": None, + "description": "Sample description for dataset 2", + "explore_url": "/explore/?datasource_type=table&datasource_id=2", + "extra": None, + "id": 2, + "kind": "physical", + "owners": [ + { + "first_name": "Test", + "id": 2, + "last_name": "Owner2", + "username": "test_username_2", + } + ], + "schema": "test_schema2", + "sql": "SELECT * FROM test_table2", + "table_name": "Test Table 2", + }, + ], + }, + }, + "mock://mock-domain.superset.com/api/v1/dataset/1": { + "method": "GET", + "status_code": 200, + "json": { + "id": 1, + "result": { + "always_filter_main_dttm": False, + "cache_timeout": None, + "changed_by": {"first_name": "Test", "last_name": "User1"}, + "changed_on": "2024-01-05T21:10:15.650819+0000", + "changed_on_humanized": "10 months ago", + "created_by": {"first_name": "Test", "last_name": "User1"}, + "created_on": "2024-01-05T21:10:15.650819+0000", + "created_on_humanized": "10 months ago", + "currency_formats": {}, + "database": { + "backend": "postgresql", + "database_name": "test_database1", + "id": 1, + }, + "datasource_name": "Test Table 1", + "datasource_type": "table", + "default_endpoint": None, + "description": None, + "extra": None, + "fetch_values_predicate": None, + "filter_select_enabled": True, + "granularity_sqla": [ + ["created_at", "created_at"], + ["updated_at", "updated_at"], + ], + "id": 1, + "is_managed_externally": False, + "is_sqllab_view": False, + "kind": "virtual", + "main_dttm_col": None, + "metrics": [ + { + "changed_on": "2024-01-05T21:10:15.650819+0000", + "created_on": "2024-01-05T21:10:15.650819+0000", + "currency": None, + "d3format": None, + "description": None, + "expression": "count(*)", + "extra": None, + "id": 1, + "metric_name": "count", + "metric_type": None, + "rendered_expression": "count(*)", + "verbose_name": None, + "warning_text": None, + } + ], + "name": "Test Table 1", + "normalize_columns": True, + "offset": 0, + "owners": [{"first_name": "Test", "id": 1, "last_name": "Owner1"}], + "rendered_sql": "SELECT * FROM test_table1", + "schema": "test_schema1", + "select_star": "SELECT * FROM test_schema1.test_table1 LIMIT 100", + "sql": "SELECT * FROM test_table1", + "table_name": "Test Table 1", + "uid": "1__table", + "url": "/tablemodelview/edit/1", + "verbose_map": { + "__timestamp": "Time", + "id": "ID", + "name": "Name", + "created_at": "Created At", + "updated_at": "Updated At", + }, + }, + }, + }, + "mock://mock-domain.superset.com/api/v1/dataset/2": { + "method": "GET", + "status_code": 200, + "json": { + "id": 2, + "result": { + "always_filter_main_dttm": False, + "cache_timeout": None, + "changed_by": {"first_name": "Test", "last_name": "User2"}, + "changed_on": "2024-02-10T15:30:20.123456+0000", + "changed_on_humanized": "9 months ago", + "created_by": {"first_name": "Test", "last_name": "User2"}, + "created_on": "2024-02-10T15:30:20.123456+0000", + "created_on_humanized": "9 months ago", + "currency_formats": {}, + "database": { + "backend": "postgresql", + "database_name": "test_database1", + "id": 1, + }, + "datasource_name": "Test Table 2", + "datasource_type": "table", + "default_endpoint": None, + "description": "Sample description for dataset 2", + "extra": None, + "fetch_values_predicate": None, + "filter_select_enabled": True, + "granularity_sqla": [["date_column", "date_column"]], + "id": 2, + "is_managed_externally": False, + "is_sqllab_view": True, + "kind": "virtual", + "main_dttm_col": "date_column", + "metrics": [ + { + "changed_on": "2024-02-10T15:30:20.123456+0000", + "created_on": "2024-02-10T15:30:20.123456+0000", + "currency": None, + "d3format": None, + "description": None, + "expression": "sum(value)", + "extra": None, + "id": 2, + "metric_name": "total_value", + "metric_type": None, + "rendered_expression": "sum(value)", + "verbose_name": "Total Value", + "warning_text": None, + } + ], + "name": "Test Table 2", + "normalize_columns": True, + "offset": 0, + "owners": [{"first_name": "Test", "id": 2, "last_name": "Owner2"}], + "rendered_sql": "SELECT * FROM test_table2", + "schema": "test_schema2", + "select_star": "SELECT * FROM test_schema2.test_table2 LIMIT 100", + "sql": "SELECT * FROM test_table2", + "table_name": "Test Table 2", + "uid": "2__table", + "url": "/tablemodelview/edit/2", + "verbose_map": { + "__timestamp": "Time", + "id": "ID", + "name": "Name", + "value": "Value", + "date_column": "Date", + }, + }, + }, + }, "mock://mock-domain.superset.com/api/v1/dataset/20": { "method": "GET", "status_code": 200, @@ -147,6 +363,19 @@ def register_mock_api(request_mock: Any, override_data: dict = {}) -> None: }, }, }, + "mock://mock-domain.superset.com/api/v1/database/1": { + "method": "GET", + "status_code": 200, + "json": { + "id": 1, + "result": { + "configuration_method": "sqlalchemy_form", + "database_name": "test_database1", + "id": 1, + "sqlalchemy_uri": "postgresql://user:password@host:port/test_database1", + }, + }, + }, "mock://mock-domain.superset.com/api/v1/database/30": { "method": "GET", "status_code": 200, @@ -225,6 +454,8 @@ def test_superset_stateful_ingest( "username": "test_username", "password": "test_password", "provider": "db", + # Enable dataset ingestion + "ingest_datasets": True, # enable stateful ingestion "stateful_ingestion": { "enabled": True, @@ -244,7 +475,7 @@ def test_superset_stateful_ingest( "pipeline_name": "test_pipeline", } - dashboard_endpoint_override = { + asset_override = { "mock://mock-domain.superset.com/api/v1/dashboard/": { "method": "GET", "status_code": 200, @@ -276,6 +507,92 @@ def test_superset_stateful_ingest( ], }, }, + "mock://mock-domain.superset.com/api/v1/chart/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 3, + "result": [ + { + "id": "10", + "changed_by": { + "username": "test_username_1", + }, + "changed_on_utc": "2020-04-14T07:00:00.000000+0000", + "slice_name": "test_chart_title_1", + "viz_type": "box_plot", + "url": "/explore/test_chart_url_10", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + { + "id": "11", + "changed_by": { + "username": "test_username_1", + }, + "changed_on_utc": "2020-04-14T07:00:00.000000+0000", + "slice_name": "test_chart_title_2", + "viz_type": "pie", + "url": "/explore/test_chart_url_11", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + { + "id": "12", + "changed_by": { + "username": "test_username_2", + }, + "changed_on_utc": "2020-04-14T07:00:00.000000+0000", + "slice_name": "test_chart_title_3", + "viz_type": "treemap", + "url": "/explore/test_chart_url_12", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + ], + }, + }, + "mock://mock-domain.superset.com/api/v1/dataset/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 214, + "description_columns": {}, + "ids": [1, 2], + "result": [ + { + "changed_by": { + "first_name": "Test", + "id": 2, + "last_name": "User2", + "username": "test_username_2", + }, + "changed_by_name": "test_username_2", + "changed_on_delta_humanized": "9 months ago", + "changed_on_utc": "2024-02-10T15:30:20.123456+0000", + "database": {"database_name": "test_database1", "id": 1}, + "datasource_type": "table", + "default_endpoint": None, + "description": "Sample description for dataset 2", + "explore_url": "/explore/?datasource_type=table&datasource_id=2", + "extra": None, + "id": 2, + "kind": "physical", + "owners": [ + { + "first_name": "Test", + "id": 2, + "last_name": "Owner2", + "username": "test_username_2", + } + ], + "schema": "test_schema2", + "sql": "SELECT * FROM test_table2", + "table_name": "Test Table 2", + }, + ], + }, + }, } with patch( @@ -292,10 +609,8 @@ def test_superset_stateful_ingest( assert checkpoint1 assert checkpoint1.state - # Remove one dashboard from the superset config. - register_mock_api( - request_mock=requests_mock, override_data=dashboard_endpoint_override - ) + # Remove one dashboard, chart, dataset from the superset config. + register_mock_api(request_mock=requests_mock, override_data=asset_override) # Capture MCEs of second run to validate Status(removed=true) deleted_mces_path = f"{tmp_path}/superset_deleted_mces.json" @@ -313,15 +628,27 @@ def test_superset_stateful_ingest( # part of the second state state1 = checkpoint1.state state2 = checkpoint2.state - difference_urns = list( + dashboard_difference_urns = list( state1.get_urns_not_in(type="dashboard", other_checkpoint_state=state2) ) + chart_difference_urns = list( + state1.get_urns_not_in(type="chart", other_checkpoint_state=state2) + ) + dataset_difference_urns = list( + state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2) + ) - assert len(difference_urns) == 1 + assert len(dashboard_difference_urns) == 1 + assert len(chart_difference_urns) == 1 + assert len(dataset_difference_urns) == 1 urn1 = "urn:li:dashboard:(superset,2)" + urn2 = "urn:li:chart:(superset,13)" + urn3 = "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema1.Test Table 1,PROD)" - assert urn1 in difference_urns + assert urn1 in dashboard_difference_urns + assert urn2 in chart_difference_urns + assert urn3 in dataset_difference_urns # Validate that all providers have committed successfully. validate_all_providers_have_committed_successfully( diff --git a/metadata-ingestion/tests/unit/test_preset_source.py b/metadata-ingestion/tests/unit/test_preset_source.py index d97db651f4c795..dc81f4c8284d50 100644 --- a/metadata-ingestion/tests/unit/test_preset_source.py +++ b/metadata-ingestion/tests/unit/test_preset_source.py @@ -20,3 +20,23 @@ def test_set_display_uri(): assert config.connect_uri == "" assert config.manager_uri == "https://api.app.preset.io" assert config.display_uri == display_uri + + +def test_preset_config_parsing(): + preset_config = { + "connect_uri": "https://preset.io", + "api_key": "dummy_api_key", + "api_secret": "dummy_api_secret", + "manager_uri": "https://api.app.preset.io", + } + + # Tests if SupersetConfig fields are parsed extra fields correctly + config = PresetConfig.parse_obj(preset_config) + + # Test Preset-specific fields + assert config.api_key == "dummy_api_key" + assert config.api_secret == "dummy_api_secret" + assert config.manager_uri == "https://api.app.preset.io" + + # Test that regular Superset fields are still parsed + assert config.connect_uri == "https://preset.io" diff --git a/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java b/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java index 0a54677eb6149b..30f98180f80180 100644 --- a/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java +++ b/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java @@ -98,11 +98,12 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha } if (authentication != null) { + String actorUrnStr = authentication.getActor().toUrnStr(); // Successfully authenticated. log.debug( - String.format( - "Successfully authenticated request for Actor with type: %s, id: %s", - authentication.getActor().getType(), authentication.getActor().getId())); + "Successfully authenticated request for Actor with type: {}, id: {}", + authentication.getActor().getType(), + authentication.getActor().getId()); AuthenticationContext.setAuthentication(authentication); chain.doFilter(request, response); } else { @@ -110,7 +111,9 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha log.debug( "Failed to authenticate request. Received 'null' Authentication value from authenticator chain."); ((HttpServletResponse) response) - .sendError(HttpServletResponse.SC_UNAUTHORIZED, "Unauthorized to perform this action."); + .sendError( + HttpServletResponse.SC_UNAUTHORIZED, + "Unauthorized to perform this action due to expired auth."); return; } AuthenticationContext.remove(); diff --git a/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java b/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java index de2582af00a932..5d4542cf0826e8 100644 --- a/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java +++ b/metadata-service/auth-servlet-impl/src/main/java/com/datahub/auth/authentication/AuthServiceController.java @@ -138,7 +138,9 @@ CompletableFuture> generateSessionTokenForUser( } log.info("Attempting to generate session token for user {}", userId.asText()); - final String actorId = AuthenticationContext.getAuthentication().getActor().getId(); + Authentication authentication = AuthenticationContext.getAuthentication(); + final String actorId = authentication.getActor().getId(); + final String actorUrn = authentication.getActor().toUrnStr(); return CompletableFuture.supplyAsync( () -> { // 1. Verify that only those authorized to generate a token (datahub system) are able to. @@ -164,7 +166,7 @@ CompletableFuture> generateSessionTokenForUser( } throw HttpClientErrorException.create( HttpStatus.UNAUTHORIZED, - "Unauthorized to perform this action.", + actorUrn + " unauthorized to perform this action.", new HttpHeaders(), null, null); diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index a8b9c34ab66ae6..6033ead36f10ec 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -281,12 +281,13 @@ private Task ingestProposals( boolean asyncBool) throws URISyntaxException { Authentication authentication = AuthenticationContext.getAuthentication(); + String actorUrnStr = authentication.getActor().toUrnStr(); Set entityTypes = metadataChangeProposals.stream() .map(MetadataChangeProposal::getEntityType) .collect(Collectors.toSet()); final OperationContext opContext = OperationContext.asSession( - systemOperationContext, RequestContext.builder().buildRestli(authentication.getActor().toUrnStr(), getContext(), + systemOperationContext, RequestContext.builder().buildRestli(actorUrnStr, getContext(), ACTION_INGEST_PROPOSAL, entityTypes), _authorizer, authentication, true); // Ingest Authorization Checks @@ -299,9 +300,8 @@ private Task ingestProposals( .map(ex -> String.format("HttpStatus: %s Urn: %s", ex.getSecond(), ex.getFirst().getEntityUrn())) .collect(Collectors.joining(", ")); throw new RestLiServiceException( - HttpStatus.S_403_FORBIDDEN, "User is unauthorized to modify entity: " + errorMessages); + HttpStatus.S_403_FORBIDDEN, "User " + actorUrnStr + " is unauthorized to modify entity: " + errorMessages); } - String actorUrnStr = authentication.getActor().toUrnStr(); final AuditStamp auditStamp = new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr)); diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java index 6c5576f2e5d9f4..0c374c29cf958a 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java @@ -274,7 +274,7 @@ public Task ingest( String actorUrnStr = authentication.getActor().toUrnStr(); final Urn urn = com.datahub.util.ModelUtils.getUrnFromSnapshotUnion(entity.getValue()); final OperationContext opContext = OperationContext.asSession( - systemOperationContext, RequestContext.builder().buildRestli(authentication.getActor().toUrnStr(), getContext(), + systemOperationContext, RequestContext.builder().buildRestli(actorUrnStr, getContext(), ACTION_INGEST, urn.getEntityType()), authorizer, authentication, true); if (!isAPIAuthorizedEntityUrns( @@ -282,7 +282,7 @@ public Task ingest( CREATE, List.of(urn))) { throw new RestLiServiceException( - HttpStatus.S_403_FORBIDDEN, "User is unauthorized to edit entity " + urn); + HttpStatus.S_403_FORBIDDEN, "User " + actorUrnStr + " is unauthorized to edit entity " + urn); } try { @@ -320,7 +320,7 @@ public Task batchIngest( .map(Entity::getValue) .map(com.datahub.util.ModelUtils::getUrnFromSnapshotUnion).collect(Collectors.toList()); final OperationContext opContext = OperationContext.asSession( - systemOperationContext, RequestContext.builder().buildRestli(authentication.getActor().toUrnStr(), + systemOperationContext, RequestContext.builder().buildRestli(actorUrnStr, getContext(), ACTION_BATCH_INGEST, urns.stream().map(Urn::getEntityType).collect(Collectors.toList())), authorizer, authentication, true); @@ -328,7 +328,7 @@ public Task batchIngest( opContext, CREATE, urns)) { throw new RestLiServiceException( - HttpStatus.S_403_FORBIDDEN, "User is unauthorized to edit entities."); + HttpStatus.S_403_FORBIDDEN, "User " + actorUrnStr + " is unauthorized to edit entities."); } for (Entity entity : entities) { diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/usage/UsageStats.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/usage/UsageStats.java index a0c3d460951605..426eff20c9c6eb 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/usage/UsageStats.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/usage/UsageStats.java @@ -104,9 +104,10 @@ public Task batchIngest(@ActionParam(PARAM_BUCKETS) @Nonnull UsageAggregat () -> { final Authentication auth = AuthenticationContext.getAuthentication(); + String actorUrnStr = auth.getActor().toUrnStr(); Set urns = Arrays.stream(buckets).sequential().map(UsageAggregation::getResource).collect(Collectors.toSet()); final OperationContext opContext = OperationContext.asSession( - systemOperationContext, RequestContext.builder().buildRestli(auth.getActor().toUrnStr(), getContext(), + systemOperationContext, RequestContext.builder().buildRestli(actorUrnStr, getContext(), ACTION_BATCH_INGEST, urns.stream().map(Urn::getEntityType).collect(Collectors.toList())), _authorizer, auth, true); @@ -115,7 +116,7 @@ public Task batchIngest(@ActionParam(PARAM_BUCKETS) @Nonnull UsageAggregat UPDATE, urns)) { throw new RestLiServiceException( - HttpStatus.S_403_FORBIDDEN, "User is unauthorized to edit entities."); + HttpStatus.S_403_FORBIDDEN, "User " + actorUrnStr + " is unauthorized to edit entities."); } for (UsageAggregation agg : buckets) {