Commit
fix(airflow): add dag AllowDenyPattern config (#11472)
Co-authored-by: Harshal Sheth <[email protected]>
dushayntAW and hsheth2 authored Oct 11, 2024
1 parent 4b66757 commit f13dae1
Showing 5 changed files with 85 additions and 18 deletions.
3 changes: 2 additions & 1 deletion docs/lineage/airflow.md
@@ -132,7 +132,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
```

| Name | Default value | Description |
| -------------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|----------------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
| cluster | prod | Name of the Airflow cluster. |
@@ -145,6 +145,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| datajob_url_link | taskinstance | If set to taskinstance, the datajob URL links to the task instance page in Airflow. It can also be set to grid. |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
| dag_filter_str | { "allow": [".*"] } | AllowDenyPattern value, as a JSON string, that filters which DAGs have metadata ingested (see the sketch after this table). |
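
A minimal sketch of how such a value is interpreted, assuming the datahub package (which provides AllowDenyPattern) is importable; the DAG IDs below are hypothetical. The same option can also be supplied through Airflow's standard AIRFLOW__DATAHUB__DAG_FILTER_STR environment variable.

```python
# Illustrative only: how a dag_filter_str value maps onto DataHub's AllowDenyPattern.
from datahub.configuration.common import AllowDenyPattern

# The default value allows every DAG.
allow_all = AllowDenyPattern.parse_raw('{ "allow": [".*"] }')
assert allow_all.allowed("any_dag_id")  # hypothetical DAG ID

# A deny list excludes matching DAG IDs while everything else stays allowed.
pattern = AllowDenyPattern.parse_raw('{ "allow": [".*"], "deny": ["nightly_backfill.*"] }')
assert pattern.allowed("daily_metrics")            # hypothetical DAG ID, allowed
assert not pattern.allowed("nightly_backfill_v2")  # matches the deny regex, skipped
```

With AllowDenyPattern, a value must match an allow regex and no deny regex, so deny entries take precedence.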

#### Validate that the plugin is working

@@ -3,7 +3,8 @@

import datahub.emitter.mce_builder as builder
from airflow.configuration import conf
from datahub.configuration.common import ConfigModel
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from pydantic.fields import Field

if TYPE_CHECKING:
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
@@ -56,6 +57,11 @@ class DatahubLineageConfig(ConfigModel):
# Makes extraction of jinja-templated fields more accurate.
render_templates: bool = True

dag_filter_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="regex patterns for DAGs to ingest",
)

log_level: Optional[str] = None
debug_emitter: bool = False

@@ -93,6 +99,9 @@ def get_lineage_config() -> DatahubLineageConfig:
datajob_url_link = conf.get(
"datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value
)
dag_filter_pattern = AllowDenyPattern.parse_raw(
conf.get("datahub", "dag_filter_str", fallback='{"allow": [".*"]}')
)

return DatahubLineageConfig(
enabled=enabled,
@@ -109,4 +118,5 @@ def get_lineage_config() -> DatahubLineageConfig:
disable_openlineage_plugin=disable_openlineage_plugin,
datajob_url_link=datajob_url_link,
render_templates=render_templates,
dag_filter_pattern=dag_filter_pattern,
)
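
Because the option is read through conf.get("datahub", "dag_filter_str", ...), it can be set in the [datahub] section of airflow.cfg or through the AIRFLOW__DATAHUB__DAG_FILTER_STR environment variable, which is how the integration test later in this commit supplies it. A rough sketch of the environment-variable route, assuming a local Airflow installation; the deny entry is purely illustrative:

```python
# Rough sketch, not part of this commit: supplying dag_filter_str via Airflow's
# standard AIRFLOW__<SECTION>__<KEY> environment-variable mapping.
import os

from airflow.configuration import conf
from datahub.configuration.common import AllowDenyPattern

os.environ["AIRFLOW__DATAHUB__DAG_FILTER_STR"] = '{ "deny": ["dag_to_skip"] }'

raw = conf.get("datahub", "dag_filter_str", fallback='{"allow": [".*"]}')
pattern = AllowDenyPattern.parse_raw(raw)
assert not pattern.allowed("dag_to_skip")
assert pattern.allowed("some_other_dag")  # hypothetical DAG ID
```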
@@ -383,9 +383,15 @@ def on_task_instance_running(
return

logger.debug(
f"DataHub listener got notification about task instance start for {task_instance.task_id}"
f"DataHub listener got notification about task instance start for {task_instance.task_id} of dag {task_instance.dag_run.dag_id}"
)

if not self.config.dag_filter_pattern.allowed(task_instance.dag_run.dag_id):
logger.debug(
f"DAG {task_instance.dag_run.dag_id} is not allowed by the pattern"
)
return

if self.config.render_templates:
task_instance = _render_templates(task_instance)

@@ -492,6 +498,10 @@ def on_task_instance_finish(

dag: "DAG" = task.dag # type: ignore[assignment]

if not self.config.dag_filter_pattern.allowed(dag.dag_id):
logger.debug(f"DAG {dag.dag_id} is not allowed by the pattern")
return

datajob = AirflowGenerator.generate_datajob(
cluster=self.config.cluster,
task=task,
@@ -689,8 +699,12 @@ def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None:
f"DataHub listener got notification about dag run start for {dag_run.dag_id}"
)

self.on_dag_start(dag_run)
assert dag_run.dag_id
if not self.config.dag_filter_pattern.allowed(dag_run.dag_id):
logger.debug(f"DAG {dag_run.dag_id} is not allowed by the pattern")
return

self.on_dag_start(dag_run)
self.emitter.flush()

# TODO: Add hooks for on_dag_run_success, on_dag_run_failed -> call AirflowGenerator.complete_dataflow
@@ -0,0 +1,34 @@
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

from datahub_airflow_plugin.entities import Dataset, Urn

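# This DAG is deliberately excluded from metadata ingestion: the integration test
# denies its dag_id through the dag_filter_str (AIRFLOW__DATAHUB__DAG_FILTER_STR) setting.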
with DAG(
"dag_to_skip",
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
) as dag:
task1 = BashOperator(
task_id="dag_to_skip_task_1",
dag=dag,
bash_command="echo 'dag_to_skip_task_1'",
inlets=[
Dataset(platform="snowflake", name="mydb.schema.tableA"),
Urn(
"urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableC,PROD)"
),
Urn("urn:li:dataJob:(urn:li:dataFlow:(airflow,test_dag,PROD),test_task)"),
],
outlets=[Dataset("snowflake", "mydb.schema.tableD")],
)

task2 = BashOperator(
task_id="dag_to_skip_task_2",
dag=dag,
bash_command="echo 'dag_to_skip_task_2'",
)

task1 >> task2
@@ -33,6 +33,8 @@
DAGS_FOLDER = pathlib.Path(__file__).parent / "dags"
GOLDENS_FOLDER = pathlib.Path(__file__).parent / "goldens"

DAG_TO_SKIP_INGESTION = "dag_to_skip"


@dataclasses.dataclass
class AirflowInstance:
@@ -140,6 +142,7 @@ def _run_airflow(
# Configure the datahub plugin and have it write the MCPs to a file.
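# The dag_to_skip DAG is deny-listed below so the plugin emits no metadata for it.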
"AIRFLOW__CORE__LAZY_LOAD_PLUGINS": "False" if is_v1 else "True",
"AIRFLOW__DATAHUB__CONN_ID": datahub_connection_name,
"AIRFLOW__DATAHUB__DAG_FILTER_STR": f'{{ "deny": ["{DAG_TO_SKIP_INGESTION}"] }}',
f"AIRFLOW_CONN_{datahub_connection_name.upper()}": Connection(
conn_id="datahub_file_default",
conn_type="datahub-file",
@@ -276,6 +279,7 @@ class DagTestCase:
test_cases = [
DagTestCase("simple_dag"),
DagTestCase("basic_iolets"),
DagTestCase("dag_to_skip", v2_only=True),
DagTestCase("snowflake_operator", success=False, v2_only=True),
DagTestCase("sqlite_operator", v2_only=True),
DagTestCase("custom_operator_dag", v2_only=True),
@@ -373,20 +377,24 @@ def test_airflow_plugin(
print("Sleeping for a few seconds to let the plugin finish...")
time.sleep(10)

_sanitize_output_file(airflow_instance.metadata_file)

check_golden_file(
pytestconfig=pytestconfig,
output_path=airflow_instance.metadata_file,
golden_path=golden_path,
ignore_paths=[
# TODO: If we switched to Git urls, maybe we could get this to work consistently.
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['log_url'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['externalUrl'\]",
],
)
if dag_id == DAG_TO_SKIP_INGESTION:
# Verify that no MCPs were generated.
assert not os.path.exists(airflow_instance.metadata_file)
else:
_sanitize_output_file(airflow_instance.metadata_file)

check_golden_file(
pytestconfig=pytestconfig,
output_path=airflow_instance.metadata_file,
golden_path=golden_path,
ignore_paths=[
# TODO: If we switched to Git urls, maybe we could get this to work consistently.
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['log_url'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['externalUrl'\]",
],
)


def _sanitize_output_file(output_path: pathlib.Path) -> None: