Skip to content

Commit

Permalink
Return job_id from execute() and execute_complete() method in SubmitA…
Browse files Browse the repository at this point in the history
…nyscaleJob (#48)

* job_id fix

* unit test fixed
  • Loading branch information
venkatajagannath authored Sep 9, 2024
1 parent 87dbc3c commit fa7f930
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
9 changes: 6 additions & 3 deletions anyscale_provider/operators/anyscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def hook(self) -> AnyscaleHook:
"""Return an instance of the AnyscaleHook."""
return AnyscaleHook(conn_id=self.conn_id)

def execute(self, context: Context) -> None:
def execute(self, context: Context) -> str:
"""
Execute the job submission to Anyscale.
Expand Down Expand Up @@ -180,8 +180,9 @@ def execute(self, context: Context) -> None:
)
else:
raise Exception(f"Unexpected state `{current_state}` for job_id `{self.job_id}`.")
return self.job_id

def execute_complete(self, context: Context, event: Any) -> None:
def execute_complete(self, context: Context, event: Any) -> str:
"""
Complete the execution of the job based on the trigger event.
Expand All @@ -192,14 +193,16 @@ def execute_complete(self, context: Context, event: Any) -> None:
:param event: The event data from the trigger.
:return: None
"""
current_job_id = event["job_id"]
current_job_id: str = event["job_id"]

if event["state"] == JobState.FAILED:
self.log.info(f"Anyscale job {current_job_id} ended with state: {event['state']}")
raise AirflowException(f"Job {current_job_id} failed with error {event['message']}")
else:
self.log.info(f"Anyscale job {current_job_id} completed with state: {event['state']}")

return current_job_id


class RolloutAnyscaleService(BaseOperator):
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/operators/test_anyscale_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_on_kill(self, mock_hook):
@patch("anyscale_provider.operators.anyscale.SubmitAnyscaleJob.hook")
def test_execute_complete(self, mock_hook):
event = {"state": JobState.SUCCEEDED, "job_id": "123", "message": "Job completed successfully"}
self.assertEqual(self.operator.execute_complete(Context(), event), None)
self.assertEqual(self.operator.execute_complete(Context(), event), "123")

@patch("anyscale_provider.operators.anyscale.SubmitAnyscaleJob.hook")
def test_execute_complete_failure(self, mock_hook):
Expand Down

0 comments on commit fa7f930

Please sign in to comment.