Skip to content

Commit

Permalink
job_id fix
Browse files Browse the repository at this point in the history
  • Loading branch information
venkatajagannath committed Sep 2, 2024
1 parent 67dbf4b commit d04081b
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions anyscale_provider/operators/anyscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def hook(self) -> AnyscaleHook:
"""Return an instance of the AnyscaleHook."""
return AnyscaleHook(conn_id=self.conn_id)

def execute(self, context: Context) -> None:
def execute(self, context: Context) -> str:
"""
Execute the job submission to Anyscale.
Expand Down Expand Up @@ -180,8 +180,9 @@ def execute(self, context: Context) -> None:
)
else:
raise Exception(f"Unexpected state `{current_state}` for job_id `{self.job_id}`.")
return self.job_id

def execute_complete(self, context: Context, event: Any) -> None:
def execute_complete(self, context: Context, event: Any) -> str:
"""
Complete the execution of the job based on the trigger event.
Expand All @@ -192,14 +193,16 @@ def execute_complete(self, context: Context, event: Any) -> None:
:param event: The event data from the trigger.
:return: None
"""
current_job_id = event["job_id"]
current_job_id: str = event["job_id"]

if event["state"] == JobState.FAILED:
self.log.info(f"Anyscale job {current_job_id} ended with state: {event['state']}")
raise AirflowException(f"Job {current_job_id} failed with error {event['message']}")
else:
self.log.info(f"Anyscale job {current_job_id} completed with state: {event['state']}")

return current_job_id


class RolloutAnyscaleService(BaseOperator):
"""
Expand Down

0 comments on commit d04081b

Please sign in to comment.