Skip to content

Commit

Permalink
chore: improve workflow on_failure behaivour
Browse files Browse the repository at this point in the history
  • Loading branch information
talboren committed Oct 18, 2024
1 parent 0372589 commit 751f2c9
Showing 1 changed file with 49 additions and 10 deletions.
59 changes: 49 additions & 10 deletions keep/workflowmanager/workflowmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,50 @@ def _check_premium_providers(self, workflow: Workflow):
f"Provider {provider} is a premium provider. You can self-host or contact us to get access to it."
)

def _run_workflow_on_failure(
self, workflow: Workflow, workflow_execution_id: str, error_message: str
):
"""
Runs the workflow on_failure action.
Args:
workflow (Workflow): The workflow that fails
workflow_execution_id (str): Workflow execution id
error_message (str): The error message(s)
"""
if workflow.on_failure:
self.logger.info(
f"Running on_failure action for workflow {workflow.workflow_id}",
extra={
"workflow_execution_id": workflow_execution_id,
"workflow_id": workflow.workflow_id,
"tenant_id": workflow.context_manager.tenant_id,
},
)
# Adding the exception message to the provider context, so it'll be available for the action
message = (
f"Workflow {workflow.workflow_id} failed with errors: {error_message}"
)
workflow.on_failure.provider_parameters = {"message": message}
workflow.on_failure.run()
self.logger.info(
"Ran on_failure action for workflow",
extra={
"workflow_execution_id": workflow_execution_id,
"workflow_id": workflow.workflow_id,
"tenant_id": workflow.context_manager.tenant_id,
},
)
else:
self.logger.debug(
"No on_failure configured for workflow",
extra={
"workflow_execution_id": workflow_execution_id,
"workflow_id": workflow.workflow_id,
"tenant_id": workflow.context_manager.tenant_id,
},
)

def _run_workflow(
self, workflow: Workflow, workflow_execution_id: str, test_run=False
):
Expand All @@ -393,21 +437,16 @@ def _run_workflow(
try:
self._check_premium_providers(workflow)
errors = workflow.run(workflow_execution_id)
if errors:
self._run_workflow_on_failure(
workflow, workflow_execution_id, ", ".join(errors)
)
except Exception as e:
self.logger.error(
f"Error running workflow {workflow.workflow_id}",
extra={"exception": e, "workflow_execution_id": workflow_execution_id},
)
if workflow.on_failure:
self.logger.info(
f"Running on_failure action for workflow {workflow.workflow_id}"
)
# Adding the exception message to the provider context, so it'll be available for the action
message = (
f"Workflow {workflow.workflow_id} failed with exception: {str(e)}å"
)
workflow.on_failure.provider_parameters = {"message": message}
workflow.on_failure.run()
self._run_workflow_on_failure(workflow, workflow_execution_id, str(e))
raise
finally:
if not test_run:
Expand Down

0 comments on commit 751f2c9

Please sign in to comment.