diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 8272e9ea..2d2ed2cd 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -716,6 +716,7 @@ def build(self) -> Dict[str, Union[str, DAG]]: dag_kwargs["wait_on_tasks"] = dag_params.get("wait_on_tasks", None) dag_kwargs["alert_on_start"] = dag_params.get("alert_on_start", None) dag_kwargs["alert_on_finish"] = dag_params.get("alert_on_finish", None) + dag_kwargs["is_dag_active"] = dag_params.get("is_dag_active", None) operator_defaults: Optional[Dict] = None if utils.check_dict_key(dag_params, "operator_defaults"): diff --git a/plugins/dags/peloton_dep_dag.py b/plugins/dags/peloton_dep_dag.py index f7af6fd3..fa772ec4 100644 --- a/plugins/dags/peloton_dep_dag.py +++ b/plugins/dags/peloton_dep_dag.py @@ -1,7 +1,7 @@ import logging from typing import List, Optional, Union -from airflow.models import DAG +from airflow.models import DAG, DagModel from airflow.operators.dummy import DummyOperator from airflow.models.baseoperator import BaseOperator @@ -61,6 +61,8 @@ class PelotonDepDag(DAG): :param alert_on_finish: whether slack alert is sent on completion of the dag, default is false :type alert_on_finish: boolean all other parameters inherited from DAG would also apply + :param is_dag_active: flag indicating DAG being active as controlled by Airflow scheduler + :type is_dag_active: boolean """ def __init__( @@ -72,6 +74,7 @@ def __init__( wait_on_tasks: Optional[List[dict]] = None, alert_on_start: bool = False, alert_on_finish: bool = False, + is_dag_active: bool = False, *args, **kwargs ): @@ -99,6 +102,14 @@ def __init__( self.wait_on_tasks = wait_on_tasks self.alert_on_start = alert_on_start self.alert_on_finish = alert_on_finish + self.is_dag_active = is_dag_active + + dag = DagModel.get_dagmodel(self.dag_id) + if dag: + if self.is_dag_active: + dag.set_is_paused(False) + else: + dag.set_is_paused(True) def __exit__(self, _type, _value, _tb): roots = self.roots