diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md
index aca6d30619ea8..35f2ff862e695 100644
--- a/docs/lineage/airflow.md
+++ b/docs/lineage/airflow.md
@@ -132,7 +132,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
 ```

 | Name | Default value | Description |
-| -------------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+|----------------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | enabled | true | If the plugin should be enabled. |
 | conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
 | cluster | prod | name of the airflow cluster |
@@ -145,6 +145,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
 | datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. |
 | |
 | graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
+| dag_filter_str | { "allow": [".*"] } | AllowDenyPattern value in the form of a JSON string, used to filter which DAGs have lineage ingested. |

 #### Validate that the plugin is working

diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py
index 8deba22a107ce..c4964712cf9f7 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py
@@ -3,7 +3,8 @@

 import datahub.emitter.mce_builder as builder
 from airflow.configuration import conf
-from datahub.configuration.common import ConfigModel
+from datahub.configuration.common import AllowDenyPattern, ConfigModel
+from pydantic.fields import Field

 if TYPE_CHECKING:
     from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
@@ -56,6 +57,11 @@ class DatahubLineageConfig(ConfigModel):
     # Makes extraction of jinja-templated fields more accurate.
     render_templates: bool = True

+    dag_filter_pattern: AllowDenyPattern = Field(
+        default=AllowDenyPattern.allow_all(),
+        description="regex patterns for DAGs to ingest",
+    )
+
     log_level: Optional[str] = None
     debug_emitter: bool = False
@@ -93,6 +99,9 @@ def get_lineage_config() -> DatahubLineageConfig:
     datajob_url_link = conf.get(
         "datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value
     )
+    dag_filter_pattern = AllowDenyPattern.parse_raw(
+        conf.get("datahub", "dag_filter_str", fallback='{"allow": [".*"]}')
+    )

     return DatahubLineageConfig(
         enabled=enabled,
@@ -109,4 +118,5 @@ def get_lineage_config() -> DatahubLineageConfig:
         disable_openlineage_plugin=disable_openlineage_plugin,
         datajob_url_link=datajob_url_link,
         render_templates=render_templates,
+        dag_filter_pattern=dag_filter_pattern,
     )
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
index b818b76de9f7f..d1c7e996dd03d 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
@@ -383,9 +383,15 @@ def on_task_instance_running(
             return

         logger.debug(
-            f"DataHub listener got notification about task instance start for {task_instance.task_id}"
+            f"DataHub listener got notification about task instance start for {task_instance.task_id} of dag {task_instance.dag_run.dag_id}"
         )

+        if not self.config.dag_filter_pattern.allowed(task_instance.dag_run.dag_id):
+            logger.debug(
+                f"DAG {task_instance.dag_run.dag_id} is not allowed by the pattern"
+            )
+            return
+
         if self.config.render_templates:
             task_instance = _render_templates(task_instance)
@@ -492,6 +498,10 @@ def on_task_instance_finish(

         dag: "DAG" = task.dag  # type: ignore[assignment]

+        if not self.config.dag_filter_pattern.allowed(dag.dag_id):
+            logger.debug(f"DAG {dag.dag_id} is not allowed by the pattern")
+            return
+
         datajob = AirflowGenerator.generate_datajob(
             cluster=self.config.cluster,
             task=task,
@@ -689,8 +699,12 @@ def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None:
             f"DataHub listener got notification about dag run start for {dag_run.dag_id}"
         )

-        self.on_dag_start(dag_run)
+        assert dag_run.dag_id
+        if not self.config.dag_filter_pattern.allowed(dag_run.dag_id):
+            logger.debug(f"DAG {dag_run.dag_id} is not allowed by the pattern")
+            return

+        self.on_dag_start(dag_run)
         self.emitter.flush()

     # TODO: Add hooks for on_dag_run_success, on_dag_run_failed -> call AirflowGenerator.complete_dataflow
diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip.py
new file mode 100644
index 0000000000000..a805a2219d142
--- /dev/null
+++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/dags/dag_to_skip.py
@@ -0,0 +1,34 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.bash import BashOperator
+
+from datahub_airflow_plugin.entities import Dataset, Urn
+
+with DAG(
+    "dag_to_skip",
+    start_date=datetime(2023, 1, 1),
+    schedule_interval=None,
+    catchup=False,
+) as dag:
+    task1 = BashOperator(
+        task_id="dag_to_skip_task_1",
+        dag=dag,
+        bash_command="echo 'dag_to_skip_task_1'",
+        inlets=[
+            Dataset(platform="snowflake", name="mydb.schema.tableA"),
+            Urn(
+                "urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
+            ),
+            Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"),
+        ],
+        outlets=[Dataset("snowflake", "mydb.schema.tableD")],
+    )
+
+    task2 = BashOperator(
+        task_id="dag_to_skip_task_2",
+        dag=dag,
+        bash_command="echo 'dag_to_skip_task_2'",
+    )
+
+    task1 >> task2
diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py
index 37cd3b792d535..44efd94f834b1 100644
--- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py
+++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py
@@ -33,6 +33,8 @@
 DAGS_FOLDER = pathlib.Path(__file__).parent / "dags"
 GOLDENS_FOLDER = pathlib.Path(__file__).parent / "goldens"

+DAG_TO_SKIP_INGESTION = "dag_to_skip"
+

 @dataclasses.dataclass
 class AirflowInstance:
@@ -140,6 +142,7 @@ def _run_airflow(
         # Configure the datahub plugin and have it write the MCPs to a file.
         "AIRFLOW__CORE__LAZY_LOAD_PLUGINS": "False" if is_v1 else "True",
         "AIRFLOW__DATAHUB__CONN_ID": datahub_connection_name,
+        "AIRFLOW__DATAHUB__DAG_FILTER_STR": f'{{ "deny": ["{DAG_TO_SKIP_INGESTION}"] }}',
         f"AIRFLOW_CONN_{datahub_connection_name.upper()}": Connection(
             conn_id="datahub_file_default",
             conn_type="datahub-file",
@@ -276,6 +279,7 @@ class DagTestCase:
 test_cases = [
     DagTestCase("simple_dag"),
     DagTestCase("basic_iolets"),
+    DagTestCase("dag_to_skip", v2_only=True),
     DagTestCase("snowflake_operator", success=False, v2_only=True),
     DagTestCase("sqlite_operator", v2_only=True),
     DagTestCase("custom_operator_dag", v2_only=True),
@@ -373,20 +377,24 @@ def test_airflow_plugin(
     print("Sleeping for a few seconds to let the plugin finish...")
     time.sleep(10)

-    _sanitize_output_file(airflow_instance.metadata_file)
-
-    check_golden_file(
-        pytestconfig=pytestconfig,
-        output_path=airflow_instance.metadata_file,
-        golden_path=golden_path,
-        ignore_paths=[
-            # TODO: If we switched to Git urls, maybe we could get this to work consistently.
-            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]",
-            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]",
-            r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['log_url'\]",
-            r"root\[\d+\]\['aspect'\]\['json'\]\['externalUrl'\]",
-        ],
-    )
+    if dag_id == DAG_TO_SKIP_INGESTION:
+        # Verify that no MCPs were generated.
+        assert not os.path.exists(airflow_instance.metadata_file)
+    else:
+        _sanitize_output_file(airflow_instance.metadata_file)
+
+        check_golden_file(
+            pytestconfig=pytestconfig,
+            output_path=airflow_instance.metadata_file,
+            golden_path=golden_path,
+            ignore_paths=[
+                # TODO: If we switched to Git urls, maybe we could get this to work consistently.
+                r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]",
+                r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]",
+                r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['log_url'\]",
+                r"root\[\d+\]\['aspect'\]\['json'\]\['externalUrl'\]",
+            ],
+        )


 def _sanitize_output_file(output_path: pathlib.Path) -> None:
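Note on how the new setting is consumed: the plugin reads `dag_filter_str` from the `[datahub]` section of `airflow.cfg` (or from the `AIRFLOW__DATAHUB__DAG_FILTER_STR` environment variable, which is how the integration test above configures it) and parses it into an `AllowDenyPattern`. The snippet below is a minimal illustrative sketch, not part of the patch; it assumes `acryl-datahub` is installed and uses made-up DAG ids.

```python
from datahub.configuration.common import AllowDenyPattern

# Equivalent of setting, in airflow.cfg:
#   [datahub]
#   dag_filter_str = { "deny": ["dag_to_skip"] }
# or exporting AIRFLOW__DATAHUB__DAG_FILTER_STR with the same JSON value.
pattern = AllowDenyPattern.parse_raw('{ "deny": ["dag_to_skip"] }')

# DAG ids matching a deny regex are skipped by the listener hooks, so no
# metadata change proposals are emitted for them; everything else passes
# because the default allow list is [".*"].
assert pattern.allowed("simple_dag")
assert not pattern.allowed("dag_to_skip")
```

Since the default value is `{ "allow": [".*"] }`, existing deployments keep ingesting lineage for every DAG unless a deny (or narrower allow) pattern is configured.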