From 244f35de5c3381a33b35628967d9e78d33bfb66d Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 13 Jan 2025 12:55:57 -0800 Subject: [PATCH] fix(ingest): tighten Source.create type annotations (#12325) --- metadata-ingestion/src/datahub/ingestion/api/source.py | 4 ++-- .../src/datahub/ingestion/source/delta_lake/source.py | 5 ----- .../src/datahub/ingestion/source/demo_data.py | 2 +- .../src/datahub/ingestion/source/fivetran/fivetran.py | 7 +------ .../ingestion/source/kafka_connect/kafka_connect.py | 7 +------ .../src/datahub/ingestion/source/metabase.py | 7 +------ metadata-ingestion/src/datahub/ingestion/source/mlflow.py | 5 ----- metadata-ingestion/src/datahub/ingestion/source/nifi.py | 5 ----- metadata-ingestion/src/datahub/ingestion/source/redash.py | 5 ----- .../src/datahub/ingestion/source/snowflake/snowflake_v2.py | 6 ------ .../src/datahub/ingestion/source/superset.py | 7 +------ .../src/datahub/ingestion/source/tableau/tableau.py | 6 ------ metadata-ingestion/tests/unit/api/test_pipeline.py | 3 ++- 13 files changed, 9 insertions(+), 60 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 53cb1b0ecad4ee..b04ffdb3258934 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -23,7 +23,7 @@ ) from pydantic import BaseModel -from typing_extensions import LiteralString +from typing_extensions import LiteralString, Self from datahub.configuration.common import ConfigModel from datahub.configuration.source_common import PlatformInstanceConfigMixin @@ -400,7 +400,7 @@ class Source(Closeable, metaclass=ABCMeta): ctx: PipelineContext @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source": + def create(cls, config_dict: dict, ctx: PipelineContext) -> Self: # Technically, this method should be abstract. However, the @config_class # decorator automatically generates a create method at runtime if one is # not defined. Python still treats the class as abstract because it thinks diff --git a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py index 98133ca69011e7..9df3905437b3b2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py @@ -122,11 +122,6 @@ def __init__(self, config: DeltaLakeSourceConfig, ctx: PipelineContext): config_report, ) - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source": - config = DeltaLakeSourceConfig.parse_obj(config_dict) - return cls(config, ctx) - def _parse_datatype(self, raw_field_json_str: str) -> List[SchemaFieldClass]: raw_field_json = json.loads(raw_field_json_str) diff --git a/metadata-ingestion/src/datahub/ingestion/source/demo_data.py b/metadata-ingestion/src/datahub/ingestion/source/demo_data.py index 79831c016e2d5d..1d7aedb151864f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/demo_data.py +++ b/metadata-ingestion/src/datahub/ingestion/source/demo_data.py @@ -29,7 +29,7 @@ class DemoDataSource(Source): def __init__(self, ctx: PipelineContext, config: DemoDataConfig): file_config = FileSourceConfig(path=str(download_sample_data())) - self.file_source = GenericFileSource(ctx, file_config) + self.file_source: GenericFileSource = GenericFileSource(ctx, file_config) def get_workunits(self) -> Iterable[MetadataWorkUnit]: yield from self.file_source.get_workunits() diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index adbfc48692db93..d8ebbe5b63d1ae 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -16,7 +16,7 @@ platform_name, support_status, ) -from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport +from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.fivetran.config import ( KNOWN_DATA_PLATFORM_MAPPING, @@ -291,11 +291,6 @@ def _get_connector_workunits( dpi = self._generate_dpi_from_job(job, datajob) yield from self._get_dpi_workunits(job, dpi) - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: - config = FivetranSourceConfig.parse_obj(config_dict) - return cls(config, ctx) - def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py index fa6b614c4b52a6..72be864fc30a1c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py @@ -17,7 +17,7 @@ platform_name, support_status, ) -from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source +from datahub.ingestion.api.source import MetadataWorkUnitProcessor from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.kafka_connect.common import ( CONNECTOR_CLASS, @@ -94,11 +94,6 @@ def __init__(self, config: KafkaConnectSourceConfig, ctx: PipelineContext): if not jpype.isJVMStarted(): jpype.startJVM() - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: - config = KafkaConnectSourceConfig.parse_obj(config_dict) - return cls(config, ctx) - def get_connectors_manifest(self) -> Iterable[ConnectorManifest]: """Get Kafka Connect connectors manifest using REST API. Enrich with lineages metadata. diff --git a/metadata-ingestion/src/datahub/ingestion/source/metabase.py b/metadata-ingestion/src/datahub/ingestion/source/metabase.py index 828bbd213a796f..ef16dc0a49a223 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metabase.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metabase.py @@ -23,7 +23,7 @@ platform_name, support_status, ) -from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport +from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, @@ -789,11 +789,6 @@ def get_datasource_from_id( return platform, dbname, schema, platform_instance - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: - config = MetabaseConfig.parse_obj(config_dict) - return cls(ctx, config) - def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), diff --git a/metadata-ingestion/src/datahub/ingestion/source/mlflow.py b/metadata-ingestion/src/datahub/ingestion/source/mlflow.py index 26d160acf330cf..b0b04dff20bffc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mlflow.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mlflow.py @@ -333,8 +333,3 @@ def _get_global_tags_workunit( aspect=global_tags, ) return wu - - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: - config = MLflowConfig.parse_obj(config_dict) - return cls(ctx, config) diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py index f55d7a883edefe..7f446f6d1c2718 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py @@ -484,11 +484,6 @@ def __init__(self, config: NifiSourceConfig, ctx: PipelineContext) -> None: def rest_api_base_url(self): return self.config.site_url[: -len("nifi/")] + "nifi-api/" - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source": - config = NifiSourceConfig.parse_obj(config_dict) - return cls(config, ctx) - def get_report(self) -> SourceReport: return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/redash.py b/metadata-ingestion/src/datahub/ingestion/source/redash.py index f11d1944029ebb..666cc8c63aa9ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redash.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redash.py @@ -369,11 +369,6 @@ def validate_connection(self) -> None: else: raise ValueError(f"Failed to connect to {self.config.connect_uri}/api") - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: - config = RedashConfig.parse_obj(config_dict) - return cls(ctx, config) - def _get_chart_data_source(self, data_source_id: Optional[int] = None) -> Dict: url = f"/api/data_sources/{data_source_id}" resp = self.client._get(url).json() diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index c0385a8d5af30a..b8afd145727400 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -23,7 +23,6 @@ from datahub.ingestion.api.source import ( CapabilityReport, MetadataWorkUnitProcessor, - Source, SourceCapability, SourceReport, TestableSource, @@ -251,11 +250,6 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config): self.add_config_to_report() - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source": - config = SnowflakeV2Config.parse_obj(config_dict) - return cls(ctx, config) - @staticmethod def test_connection(config_dict: dict) -> TestConnectionReport: test_report = TestConnectionReport() diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 1da233bf0b22ab..a8b328f6e17739 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -33,7 +33,7 @@ platform_name, support_status, ) -from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source +from datahub.ingestion.api.source import MetadataWorkUnitProcessor from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.sql.sql_types import resolve_sql_type from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import ( @@ -265,11 +265,6 @@ def login(self) -> requests.Session: # TODO(Gabe): how should we message about this error? return requests_session - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: - config = SupersetConfig.parse_obj(config_dict) - return cls(ctx, config) - def paginate_entity_api_results(self, entity_type, page_size=100): current_page = 0 total_items = page_size diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 2543cbe653ba72..ea3fb6c979a19c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -71,7 +71,6 @@ from datahub.ingestion.api.source import ( CapabilityReport, MetadataWorkUnitProcessor, - Source, StructuredLogLevel, TestableSource, TestConnectionReport, @@ -804,11 +803,6 @@ def test_connection(config_dict: dict) -> TestConnectionReport: def get_report(self) -> TableauSourceReport: return self.report - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: - config = TableauConfig.parse_obj(config_dict) - return cls(config, ctx) - def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ *super().get_workunit_processors(), diff --git a/metadata-ingestion/tests/unit/api/test_pipeline.py b/metadata-ingestion/tests/unit/api/test_pipeline.py index fe3d3160b729a1..324e4ed0f6e853 100644 --- a/metadata-ingestion/tests/unit/api/test_pipeline.py +++ b/metadata-ingestion/tests/unit/api/test_pipeline.py @@ -4,6 +4,7 @@ import pytest from freezegun import freeze_time +from typing_extensions import Self from datahub.configuration.common import DynamicTypedConfig from datahub.ingestion.api.committable import CommitPolicy, Committable @@ -440,7 +441,7 @@ def __init__(self, ctx: PipelineContext): ] @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source": + def create(cls, config_dict: dict, ctx: PipelineContext) -> Self: assert not config_dict return cls(ctx)