Skip to content

Commit

Permalink
Merge pull request #6566 from grondo/dependency-afterexc
Browse files Browse the repository at this point in the history
support `afterexc` dependency scheme
  • Loading branch information
mergify[bot] authored Jan 21, 2025
2 parents 8a18be7 + e9178b1 commit 482028e
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 8 deletions.
5 changes: 5 additions & 0 deletions doc/man1/common/job-dependencies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ afternotok:JOBID
This dependency is satisfied after JOBID enters the INACTIVE state
with an unsuccessful result.

afterexcept:JOBID
This dependency is satisfied when JOBID enters the INACTIVE state
and a fatal job exception caused the transition to CLEANUP (e.g.,
node failure, timeout, cancel, etc.).

begin-time:TIMESTAMP
This dependency is satisfied after TIMESTAMP, which is specified in
floating point seconds since the UNIX epoch. See the ``--begin-time``
Expand Down
1 change: 1 addition & 0 deletions doc/man7/flux-jobtap-plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ urgency i current urgency
priority I current priority
t_submit f submit timestamp in floating point seconds
entry o posted eventlog entry, including context
end_event o copy of event that caused transition to CLEANUP, if available
========== ==== ==========================================

Return arguments can be packed using the ``FLUX_PLUGIN_ARG_OUT`` and
Expand Down
1 change: 1 addition & 0 deletions doc/test/spell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ reprioritization
afterany
afterok
afternotok
afterexcept
parsedatetime
cancelall
raiseall
Expand Down
10 changes: 7 additions & 3 deletions src/modules/job-manager/jobtap.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ static flux_plugin_arg_t *jobtap_args_create (struct jobtap *jobtap,
"R", job->R_redacted) < 0)
goto error;
}
if (job->end_event)
if (flux_plugin_arg_pack (args,
FLUX_PLUGIN_ARG_IN,
"{s:O}",
"end_event", job->end_event) < 0)
goto error;
/*
* Always start with empty OUT args. This allows unpack of OUT
* args to work without error, even if plugin does not set any
Expand Down Expand Up @@ -1963,10 +1969,8 @@ static struct job * jobtap_lookup_jobid (flux_plugin_t *p, flux_jobid_t id)
return NULL;
}
job = current_job (jobtap);
if (id == FLUX_JOBTAP_CURRENT_JOB || (job && id == job->id)) {
errno = EINVAL;
if (id == FLUX_JOBTAP_CURRENT_JOB || (job && id == job->id))
return job;
}
return lookup_job (jobtap->ctx, id);
}

Expand Down
70 changes: 65 additions & 5 deletions src/modules/job-manager/plugins/dependency-after.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ enum after_type {
AFTER_START = 0x1,
AFTER_FINISH = 0x2,
AFTER_SUCCESS = 0x4,
AFTER_FAILURE = 0x8
AFTER_FAILURE = 0x8,
AFTER_EXCEPT = 0x10
};

struct after_info {
Expand Down Expand Up @@ -60,6 +61,8 @@ static const char * after_typestr (enum after_type type)
return "after-success";
case AFTER_FAILURE:
return "after-failure";
case AFTER_EXCEPT:
return "after-except";
}
return "";
}
Expand All @@ -74,6 +77,8 @@ static int after_type_parse (const char *s, enum after_type *tp)
*tp = AFTER_SUCCESS;
else if (streq (s, "afternotok"))
*tp = AFTER_FAILURE;
else if (streq (s, "afterexcept"))
*tp = AFTER_EXCEPT;
else
return -1;
return 0;
Expand Down Expand Up @@ -231,6 +236,29 @@ static int lookup_job_uid_state (flux_plugin_t *p,
return rc;
}

/* Check whether job `id` was terminated by a fatal exception.
 *
 * Looks up the job with flux_jobtap_job_lookup() and unpacks the "name"
 * member of its "end_event" from the returned input args.
 *
 * Returns the result of streq (name, "exception") (nonzero when the end
 * event is named "exception", 0 otherwise), or -1 if the job lookup fails
 * or the end_event name cannot be unpacked.
 */
static int check_exception_end_event (flux_plugin_t *p, flux_jobid_t id)
{
    int rc = -1;
    flux_plugin_arg_t *args;
    /* A string unpacked with "s" is presumably borrowed from args
     * (jansson-style unpack), so treat it as read-only and only use it
     * before args is destroyed below.
     */
    const char *name = NULL;

    if (!(args = flux_jobtap_job_lookup (p, id))
        || flux_plugin_arg_unpack (args,
                                   FLUX_PLUGIN_ARG_IN,
                                   "{s:{s:s}}",
                                   "end_event",
                                   "name", &name) < 0)
        goto out;

    /* If end_event has a name of "exception", then we know this job
     * was terminated by a fatal exception:
     */
    rc = streq (name, "exception");
out:
    /* args may be NULL here if the lookup itself failed */
    flux_plugin_arg_destroy (args);
    return rc;
}

/* Handle a job in INACTIVE state.
*
* Get the job result and check for various error states (e.g. afterok
Expand Down Expand Up @@ -281,6 +309,21 @@ static int dependency_handle_inactive (flux_plugin_t *p,
"dependency: afternotok:"
" job %s succeeded",
jobid);
if (type == AFTER_EXCEPT) {
if ((rc = check_exception_end_event (p, afterid)) < 0)
return flux_jobtap_reject_job (p,
args,
"dependency: afterexcept:"
" failed to get end event for %s",
jobid);
else if (rc == 0)
return flux_jobtap_reject_job (p,
args,
"dependency: afterexcept:"
" job %s: no exception",
jobid);
}

rc = flux_jobtap_dependency_remove (p, after->depid, after->description);
if (rc < 0)
flux_log_error (flux_jobtap_get_flux (p),
Expand Down Expand Up @@ -543,7 +586,9 @@ static void release_dependency_references (flux_plugin_t *p)
flux_log_error (h, "release_references: flux_jobtap_job_aux_delete");
}

static int release_dependent_jobs (flux_plugin_t *p, zlistx_t *l)
static int release_dependent_jobs (flux_plugin_t *p,
const char *end_event_name,
zlistx_t *l)
{
flux_t *h = flux_jobtap_get_flux (p);
flux_job_result_t result;
Expand All @@ -570,8 +615,12 @@ static int release_dependent_jobs (flux_plugin_t *p, zlistx_t *l)
/* O/w, release dependent jobs based on requisite job result.
* Entries will be removed from the list as they are processed.
*/
if (result != FLUX_JOB_RESULT_COMPLETED)
release_all (p, l, AFTER_FINISH | AFTER_FAILURE);
if (result != FLUX_JOB_RESULT_COMPLETED) {
int typemask = AFTER_FINISH | AFTER_FAILURE;
if (end_event_name && streq (end_event_name, "exception"))
typemask |= AFTER_EXCEPT;
release_all (p, l, typemask);
}
else
release_all (p, l, AFTER_FINISH | AFTER_SUCCESS);

Expand Down Expand Up @@ -632,10 +681,20 @@ static int inactive_cb (flux_plugin_t *p,
flux_plugin_arg_t *args,
void *data)
{
const char *end_event_name = NULL;

/* Attempt to get end event name (event that caused transition to CLEANUP)
*/
(void) flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s?{s?s}}",
"end_event",
"name", &end_event_name);

/* Only need to check for dependent jobs if this job has an
* embedded dependency list
*/
release_dependent_jobs (p, after_list_check (p, 0));
release_dependent_jobs (p, end_event_name, after_list_check (p, 0));

/* "Release" any references this job had to any dependencies
* (references should still exist only if job skipped PRIORITY state.)
Expand Down Expand Up @@ -707,6 +766,7 @@ static const struct flux_plugin_handler tab[] = {
{ "job.dependency.afterok", dependency_after_cb, NULL },
{ "job.dependency.afterany", dependency_after_cb, NULL },
{ "job.dependency.afternotok", dependency_after_cb, NULL },
{ "job.dependency.afterexcept",dependency_after_cb, NULL },
{ "job.state.priority", priority_cb, NULL },
{ "job.state.inactive", inactive_cb, NULL },
{ "job.event.start", start_cb, NULL },
Expand Down
26 changes: 26 additions & 0 deletions t/t2271-job-dependency-after.t
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,32 @@ test_expect_success 'chain of afternotok jobs are canceled if one succeeds' '
flux job wait-event -vt 15 -m type=dependency $id2 exception
done
'
# afterexcept: submit one job that is canceled after it starts (exc_id),
# plus two held jobs running `true` (successid) and `false` (failid), each
# with an afterexcept dependent (id1, id2, id3 respectively).
# Expect the dependency to be removed only for id1; id2 and id3 are
# expected to receive an exception of type=dependency.
test_expect_success 'dependency=afterexcept works' '
exc_id=$(flux submit sleep 60) &&
successid=$(flux submit --urgency=hold true) &&
failid=$(flux submit --urgency=hold false) &&
id1=$(flux submit --dependency=afterexcept:${exc_id} true) &&
id2=$(flux submit --dependency=afterexcept:${successid} true) &&
id3=$(flux submit --dependency=afterexcept:${failid} true) &&
flux job wait-event $exc_id start &&
flux cancel $exc_id &&
flux job wait-event $exc_id clean &&
flux job urgency $successid default &&
flux job urgency $failid default &&
flux job wait-event -vt 15 \
-m description=after-except=$exc_id \
$id1 dependency-remove &&
flux job wait-event -vt 15 \
-m type=dependency \
$id2 exception &&
flux job wait-event -vt 15 \
-m type=dependency \
$id3 exception
'
# Reuses exc_id and successid set by the previous test, so it depends on
# that test having run first. A new afterexcept dependent of the canceled
# job is expected to run, while submitting one against successid is
# expected to fail.
test_expect_success 'dependency=afterexcept works for INACTIVE jobs' '
flux run -vvv --dependency=afterexcept:${exc_id} true &&
test_must_fail flux submit --dependency=afterexcept:${successid} true
'
test_expect_success 'dependency=after works for INACTIVE jobs' '
run_timeout 15 \
flux bulksubmit --wait --watch \
Expand Down

0 comments on commit 482028e

Please sign in to comment.