From 720adcb0dc8b38992b4b0a291441e4b3be211aff Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Tue, 27 Feb 2024 21:11:03 +0530 Subject: [PATCH 1/5] enable stateful_ingestion by default for DataHub rest sink --- .../ingestion/source/state/stateful_ingestion_base.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index c8bf6c2fd0d5b4..786a6544041148 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -52,7 +52,7 @@ class StatefulIngestionConfig(ConfigModel): enabled: bool = Field( default=False, - description="The type of the ingestion state provider registered with datahub.", + description="Default as True if datahub-rest sink is used or if datahub_api is specified, otherwise False", ) max_checkpoint_state_size: pydantic.PositiveInt = Field( default=2**24, # 16 MB @@ -231,6 +231,14 @@ def _initialize_checkpointing_state_provider(self) -> None: self.ingestion_checkpointing_state_provider: Optional[ IngestionCheckpointingProviderBase ] = None + + if self.stateful_ingestion_config is None and self.ctx.graph: + # If stateful ingestion config not set, enable it by default if graph object is not none + self.stateful_ingestion_config = StatefulIngestionConfig( + enabled=True, + state_provider=DynamicTypedStateProviderConfig(type="datahub"), + ) + if ( self.stateful_ingestion_config is not None and self.stateful_ingestion_config.state_provider is not None From ad4bf8d0894888cbd462849432c4144bde91ff4b Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Wed, 28 Feb 2024 17:46:35 +0530 Subject: [PATCH 2/5] Add test cases for state provider wrapper --- .../provider/test_provider.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py index 4387e5a17790f7..eca9dd50382428 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py @@ -17,6 +17,10 @@ from datahub.ingestion.source.state.sql_common_state import ( BaseSQLAlchemyCheckpointState, ) +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionConfig, + StateProviderWrapper, +) from datahub.ingestion.source.state.usage_common_state import ( BaseTimeWindowCheckpointState, ) @@ -181,3 +185,32 @@ def test_providers(self): state_class=type(job2_state_obj), ) self.assertEqual(job2_last_checkpoint, job2_checkpoint) + + def test_state_provider_wrapper(self): + ctx: PipelineContext = PipelineContext( + run_id=self.run_id, pipeline_name=self.pipeline_name + ) + ctx.graph = self.mock_graph + # Test 1: stateful_ingestion_config provided with enabled as true + state_provider = StateProviderWrapper( + StatefulIngestionConfig(enabled=True), ctx + ) + assert state_provider.stateful_ingestion_config + assert state_provider.ingestion_checkpointing_state_provider + ctx.checkpointers = {} + # Test 2: stateful_ingestion_config provided with enabled as false + state_provider = StateProviderWrapper( + StatefulIngestionConfig(enabled=False), ctx + ) + assert state_provider.stateful_ingestion_config + assert not state_provider.ingestion_checkpointing_state_provider + # Test 3: stateful_ingestion_config not provided but graph object is present + state_provider = StateProviderWrapper(None, ctx) + assert state_provider.stateful_ingestion_config + assert state_provider.ingestion_checkpointing_state_provider + ctx.checkpointers = {} + # Test 4: stateful_ingestion_config not provided and graph object is none + ctx.graph = None + state_provider = StateProviderWrapper(None, ctx) + assert not state_provider.stateful_ingestion_config + assert not state_provider.ingestion_checkpointing_state_provider From e8d2449cf4a44a9e456cbb52177a0f2dd261038b Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Thu, 29 Feb 2024 15:41:06 +0530 Subject: [PATCH 3/5] Fix failed test case --- metadata-ingestion/tests/unit/test_dbt_source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/tests/unit/test_dbt_source.py b/metadata-ingestion/tests/unit/test_dbt_source.py index 6e8c08d5bdf351..c38ae0db28f3e6 100644 --- a/metadata-ingestion/tests/unit/test_dbt_source.py +++ b/metadata-ingestion/tests/unit/test_dbt_source.py @@ -34,7 +34,7 @@ def create_owners_list_from_urn_list( def create_mocked_dbt_source() -> DBTCoreSource: - ctx = PipelineContext("test-run-id") + ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source") graph = mock.MagicMock() graph.get_ownership.return_value = mce_builder.make_ownership_aspect_from_urn_list( ["urn:li:corpuser:test_user"], "AUDIT" From 7a48dea17fd2641ed40364e2dcd8451a7f806a6b Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Fri, 1 Mar 2024 15:57:11 +0530 Subject: [PATCH 4/5] Address review comments --- .../source/state/stateful_ingestion_base.py | 7 ++++-- .../provider/test_provider.py | 24 ++++++++++--------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index 786a6544041148..521f8f5ee07d82 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -52,7 +52,8 @@ class StatefulIngestionConfig(ConfigModel): enabled: bool = Field( default=False, - description="Default as True if datahub-rest sink is used or if datahub_api is specified, otherwise False", + description="Whether or not to enable stateful ingest. " + "Default: True if datahub-rest sink is used or if a `datahub_api` is specified, otherwise False", ) max_checkpoint_state_size: pydantic.PositiveInt = Field( default=2**24, # 16 MB @@ -233,7 +234,9 @@ def _initialize_checkpointing_state_provider(self) -> None: ] = None if self.stateful_ingestion_config is None and self.ctx.graph: - # If stateful ingestion config not set, enable it by default if graph object is not none + logger.info( + "Stateful ingestion got enabled by default, as datahub-rest sink is used or `datahub_api` is specified" + ) self.stateful_ingestion_config = StatefulIngestionConfig( enabled=True, state_provider=DynamicTypedStateProviderConfig(type="datahub"), diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py index eca9dd50382428..d097c75ebd952c 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py @@ -186,31 +186,33 @@ def test_providers(self): ) self.assertEqual(job2_last_checkpoint, job2_checkpoint) - def test_state_provider_wrapper(self): - ctx: PipelineContext = PipelineContext( - run_id=self.run_id, pipeline_name=self.pipeline_name - ) + def test_state_provider_wrapper_with_config_provided(self): + # stateful_ingestion_config.enabled as true + ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name) ctx.graph = self.mock_graph - # Test 1: stateful_ingestion_config provided with enabled as true state_provider = StateProviderWrapper( StatefulIngestionConfig(enabled=True), ctx ) assert state_provider.stateful_ingestion_config assert state_provider.ingestion_checkpointing_state_provider - ctx.checkpointers = {} - # Test 2: stateful_ingestion_config provided with enabled as false + # stateful_ingestion_config.enabled as false + ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name) + ctx.graph = self.mock_graph state_provider = StateProviderWrapper( StatefulIngestionConfig(enabled=False), ctx ) assert state_provider.stateful_ingestion_config assert not state_provider.ingestion_checkpointing_state_provider - # Test 3: stateful_ingestion_config not provided but graph object is present + + def test_state_provider_wrapper_with_config_not_provided(self): + # graph object is present + ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name) + ctx.graph = self.mock_graph state_provider = StateProviderWrapper(None, ctx) assert state_provider.stateful_ingestion_config assert state_provider.ingestion_checkpointing_state_provider - ctx.checkpointers = {} - # Test 4: stateful_ingestion_config not provided and graph object is none - ctx.graph = None + # graph object is none + ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name) state_provider = StateProviderWrapper(None, ctx) assert not state_provider.stateful_ingestion_config assert not state_provider.ingestion_checkpointing_state_provider From c70b17e0bd118891e8e4e246951461789a181865 Mon Sep 17 00:00:00 2001 From: shubhamjagtap639 Date: Fri, 1 Mar 2024 16:07:37 +0530 Subject: [PATCH 5/5] Update updating-datahub.md file --- docs/how/updating-datahub.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 430537bf684f63..cd28cc1b20f311 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -36,6 +36,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #9601 - The Unity Catalog(UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting legacy hive metastore catalog in Databricks. - #9904 - The default Redshift `table_lineage_mode` is now MIXED, instead of `STL_SCAN_BASED`. Improved lineage generation is also available by enabling `use_lineaege_v2`. This v2 implementation will become the default in a future release. +- #9934 - The stateful_ingestion is now enabled by default, if datahub-rest sink is used or if a `datahub_api` is specified ### Potential Downtime