diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 430537bf684f6..cd28cc1b20f31 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -36,6 +36,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #9601 - The Unity Catalog(UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting legacy hive metastore catalog in Databricks. - #9904 - The default Redshift `table_lineage_mode` is now MIXED, instead of `STL_SCAN_BASED`. Improved lineage generation is also available by enabling `use_lineaege_v2`. This v2 implementation will become the default in a future release. +- #9934 - Stateful ingestion (`stateful_ingestion`) is now enabled by default if the `datahub-rest` sink is used or if a `datahub_api` is specified. ### Potential Downtime diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index c8bf6c2fd0d5b..521f8f5ee07d8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -52,7 +52,8 @@ class StatefulIngestionConfig(ConfigModel): enabled: bool = Field( default=False, - description="The type of the ingestion state provider registered with datahub.", + description="Whether or not to enable stateful ingest. 
" + "Default: True if datahub-rest sink is used or if a `datahub_api` is specified, otherwise False", ) max_checkpoint_state_size: pydantic.PositiveInt = Field( default=2**24, # 16 MB @@ -231,6 +232,16 @@ def _initialize_checkpointing_state_provider(self) -> None: self.ingestion_checkpointing_state_provider: Optional[ IngestionCheckpointingProviderBase ] = None + + if self.stateful_ingestion_config is None and self.ctx.graph: + logger.info( + "Stateful ingestion got enabled by default, as datahub-rest sink is used or `datahub_api` is specified" + ) + self.stateful_ingestion_config = StatefulIngestionConfig( + enabled=True, + state_provider=DynamicTypedStateProviderConfig(type="datahub"), + ) + if ( self.stateful_ingestion_config is not None and self.stateful_ingestion_config.state_provider is not None diff --git a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py index 4387e5a17790f..d097c75ebd952 100644 --- a/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py +++ b/metadata-ingestion/tests/unit/stateful_ingestion/provider/test_provider.py @@ -17,6 +17,10 @@ from datahub.ingestion.source.state.sql_common_state import ( BaseSQLAlchemyCheckpointState, ) +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionConfig, + StateProviderWrapper, +) from datahub.ingestion.source.state.usage_common_state import ( BaseTimeWindowCheckpointState, ) @@ -181,3 +185,34 @@ def test_providers(self): state_class=type(job2_state_obj), ) self.assertEqual(job2_last_checkpoint, job2_checkpoint) + + def test_state_provider_wrapper_with_config_provided(self): + # stateful_ingestion_config.enabled as true + ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name) + ctx.graph = self.mock_graph + state_provider = StateProviderWrapper( + StatefulIngestionConfig(enabled=True), ctx + ) + assert 
state_provider.stateful_ingestion_config + assert state_provider.ingestion_checkpointing_state_provider + # stateful_ingestion_config.enabled as false + ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name) + ctx.graph = self.mock_graph + state_provider = StateProviderWrapper( + StatefulIngestionConfig(enabled=False), ctx + ) + assert state_provider.stateful_ingestion_config + assert not state_provider.ingestion_checkpointing_state_provider + + def test_state_provider_wrapper_with_config_not_provided(self): + # graph object is present + ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name) + ctx.graph = self.mock_graph + state_provider = StateProviderWrapper(None, ctx) + assert state_provider.stateful_ingestion_config + assert state_provider.ingestion_checkpointing_state_provider + # graph object is none + ctx = PipelineContext(run_id=self.run_id, pipeline_name=self.pipeline_name) + state_provider = StateProviderWrapper(None, ctx) + assert not state_provider.stateful_ingestion_config + assert not state_provider.ingestion_checkpointing_state_provider diff --git a/metadata-ingestion/tests/unit/test_dbt_source.py b/metadata-ingestion/tests/unit/test_dbt_source.py index 6e8c08d5bdf35..c38ae0db28f3e 100644 --- a/metadata-ingestion/tests/unit/test_dbt_source.py +++ b/metadata-ingestion/tests/unit/test_dbt_source.py @@ -34,7 +34,7 @@ def create_owners_list_from_urn_list( def create_mocked_dbt_source() -> DBTCoreSource: - ctx = PipelineContext("test-run-id") + ctx = PipelineContext(run_id="test-run-id", pipeline_name="dbt-source") graph = mock.MagicMock() graph.get_ownership.return_value = mce_builder.make_ownership_aspect_from_urn_list( ["urn:li:corpuser:test_user"], "AUDIT"