From 7ac6523b2de6f46edadd83d834f694c892a4c761 Mon Sep 17 00:00:00 2001 From: josges Date: Mon, 20 Jan 2025 15:07:12 +0100 Subject: [PATCH] fix(ingestion): fix stateful ingestion for GCS source (#11879) --- .../src/datahub/ingestion/source/gcs/gcs_source.py | 3 ++- metadata-ingestion/tests/unit/test_gcs_source.py | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py b/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py index 5196c8ec5b998..95490147d4820 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gcs/gcs_source.py @@ -88,6 +88,7 @@ def __init__(self, config: GCSSourceConfig, ctx: PipelineContext): super().__init__(config, ctx) self.config = config self.report = GCSSourceReport() + self.platform: str = PLATFORM_GCS self.s3_source = self.create_equivalent_s3_source(ctx) @classmethod @@ -135,7 +136,7 @@ def create_equivalent_s3_path_specs(self): def create_equivalent_s3_source(self, ctx: PipelineContext) -> S3Source: config = self.create_equivalent_s3_config() - return self.s3_source_overrides(S3Source(config, ctx)) + return self.s3_source_overrides(S3Source(config, PipelineContext(ctx.run_id))) def s3_source_overrides(self, source: S3Source) -> S3Source: source.source_config.platform = PLATFORM_GCS diff --git a/metadata-ingestion/tests/unit/test_gcs_source.py b/metadata-ingestion/tests/unit/test_gcs_source.py index 9d5f4e915b18c..5b99862c92a04 100644 --- a/metadata-ingestion/tests/unit/test_gcs_source.py +++ b/metadata-ingestion/tests/unit/test_gcs_source.py @@ -1,13 +1,17 @@ +from unittest import mock + import pytest from pydantic import ValidationError from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.source.data_lake_common.data_lake_utils import PLATFORM_GCS from datahub.ingestion.source.gcs.gcs_source import GCSSource def test_gcs_source_setup(): - ctx = PipelineContext(run_id="test-gcs") + graph = mock.MagicMock(spec=DataHubGraph) + ctx = PipelineContext(run_id="test-gcs", graph=graph, pipeline_name="test-gcs") # Baseline: valid config source: dict = { @@ -18,6 +22,7 @@ def test_gcs_source_setup(): } ], "credential": {"hmac_access_id": "id", "hmac_access_secret": "secret"}, + "stateful_ingestion": {"enabled": "true"}, } gcs = GCSSource.create(source, ctx) assert gcs.s3_source.source_config.platform == PLATFORM_GCS