Skip to content

Commit

Permalink
modify peloton dep dag to add active flag
Browse files Browse the repository at this point in the history
  • Loading branch information
namm-phamm committed Mar 4, 2024
1 parent e462fe8 commit 60f3029
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
1 change: 1 addition & 0 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ def build(self) -> Dict[str, Union[str, DAG]]:
dag_kwargs["wait_on_tasks"] = dag_params.get("wait_on_tasks", None)
dag_kwargs["alert_on_start"] = dag_params.get("alert_on_start", None)
dag_kwargs["alert_on_finish"] = dag_params.get("alert_on_finish", None)
dag_kwargs["is_dag_active"] = dag_params.get("is_dag_active", None)

operator_defaults: Optional[Dict] = None
if utils.check_dict_key(dag_params, "operator_defaults"):
Expand Down
13 changes: 12 additions & 1 deletion plugins/dags/peloton_dep_dag.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import List, Optional, Union

from airflow.models import DAG
from airflow.models import DAG, DagModel
from airflow.operators.dummy import DummyOperator
from airflow.models.baseoperator import BaseOperator

Expand Down Expand Up @@ -61,6 +61,8 @@ class PelotonDepDag(DAG):
:param alert_on_finish: whether slack alert is sent on completion of the dag, default is false
:type alert_on_finish: boolean
all other parameters inherited from DAG would also apply
:param is_dag_active: flag indicating DAG being active as controlled by Airflow scheduler
:type is_dag_active: boolean
"""

def __init__(
Expand All @@ -72,6 +74,7 @@ def __init__(
wait_on_tasks: Optional[List[dict]] = None,
alert_on_start: bool = False,
alert_on_finish: bool = False,
is_dag_active: bool = False,
*args,
**kwargs
):
Expand Down Expand Up @@ -99,6 +102,14 @@ def __init__(
self.wait_on_tasks = wait_on_tasks
self.alert_on_start = alert_on_start
self.alert_on_finish = alert_on_finish
self.is_dag_active = is_dag_active

dag = DagModel.get_dagmodel(self.dag_id)
if dag:
if self.is_dag_active:
dag.set_is_paused(False)
else:
dag.set_is_paused(True)

def __exit__(self, _type, _value, _tb):
roots = self.roots
Expand Down

0 comments on commit 60f3029

Please sign in to comment.