Skip to content

Commit

Permalink
workflows: fix restart workflows issue
Browse files Browse the repository at this point in the history
  • Loading branch information
DonHaul committed Dec 20, 2024
1 parent 9e78629 commit 95141d5
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 7 deletions.
Binary file not shown.
11 changes: 11 additions & 0 deletions backoffice/.ipython/profile_default/startup/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
This is the IPython startup directory

.py and .ipy files in this directory will be run *prior* to any code or files specified
via the exec_lines or exec_files configurables whenever you load this profile.

Files will be run in lexicographical order, so you can control the execution order of files
with a prefix, e.g.::

00-first.py
50-middle.py
99-last.ipy
17 changes: 12 additions & 5 deletions backoffice/backoffice/authors/airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,18 @@ def restart_workflow_dags(workflow_id, workflow_type, params=None):
:param workflow_id: workflow_id for dags that should be restarted
:param workflow_type: type of workflow the will be restarted
:param params: parameters of new dag execution
:param params: parameters of new dag execution, if not provided will be fetched from the workflow
:returns: request response
"""
conf = params if params else fetch_conf_workflow_dag(workflow_id, workflow_type)

data = fetch_data_workflow_dag(workflow_id, workflow_type)
delete_workflow_dag_runs(workflow_id, workflow_type)

return trigger_airflow_dag(
WORKFLOW_DAGS[workflow_type].initialize, str(workflow_id), params or data
WORKFLOW_DAGS[workflow_type].initialize,
str(workflow_id),
workflow=conf.get("workflow"),
extra_data=conf.get("data"),
)


Expand All @@ -183,7 +186,7 @@ def delete_workflow_dag_runs(workflow_id, workflow_type):
delete_workflow_dag(dag_id, str(workflow_id))


def fetch_data_workflow_dag(workflow_id, workflow_type):
def fetch_conf_workflow_dag(workflow_id, workflow_type):
"""Fetches Data that the workflow ran with
:param workflow_id: workflow_id for dag to get data of
Expand All @@ -193,5 +196,9 @@ def fetch_data_workflow_dag(workflow_id, workflow_type):

executed_dags_for_workflow = find_executed_dags(workflow_id, workflow_type)

print("fetch_conf_workflow_dag")
print(workflow_id, workflow_type)
print(executed_dags_for_workflow)

_, dag = next(iter(executed_dags_for_workflow.items()))
return dag["conf"].get("data")
return dag["conf"]
4 changes: 2 additions & 2 deletions backoffice/backoffice/authors/tests/test_airflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def test_delete_workflow_dag_runs(self):
airflow_utils.delete_workflow_dag_runs(self.workflow_id, self.workflow_type)

@pytest.mark.vcr
def test_fetch_data_workflow_dag(self):
result = airflow_utils.fetch_data_workflow_dag(
def test_fetch_conf_workflow_dag(self):
result = airflow_utils.fetch_conf_workflow_dag(
self.workflow_id, self.workflow_type
)

Expand Down

0 comments on commit 95141d5

Please sign in to comment.