diff --git a/src/modules/job-list/job_data.c b/src/modules/job-list/job_data.c index 1222e68a62b7..9b4e8898a800 100644 --- a/src/modules/job-list/job_data.c +++ b/src/modules/job-list/job_data.c @@ -41,6 +41,7 @@ void job_destroy (void *data) json_decref (job->R); json_decref (job->exception_context); json_decref (job->jobspec_updates); + json_decref (job->R_updates); zlist_destroy (&job->updates); free (job); errno = save_errno; @@ -385,7 +386,20 @@ int job_parse_jobspec_fatal (struct job *job, const char *s, json_t *updates) return job_jobspec_update (job, updates); } -static int parse_R (struct job *job, const char *s, bool allow_nonfatal) +static int load_R (struct job *job, const char *s, bool allow_nonfatal) +{ + json_error_t error; + + if (!(job->R = json_loads (s, 0, &error))) { + flux_log (job->h, LOG_ERR, + "%s: job %s invalid R: %s", + __FUNCTION__, idf58 (job->id), error.text); + return allow_nonfatal ? 0 : -1; + } + return 0; +} + +static int parse_R (struct job *job, bool allow_nonfatal) { struct rlist *rl = NULL; struct idset *idset = NULL; @@ -396,13 +410,6 @@ static int parse_R (struct job *job, const char *s, bool allow_nonfatal) struct rnode *rnode; int saved_errno, rc = -1; - if (!(job->R = json_loads (s, 0, &error))) { - flux_log (job->h, LOG_ERR, - "%s: job %s invalid R: %s", - __FUNCTION__, idf58 (job->id), error.text); - goto nonfatal_error; - } - if (!(rl = rlist_from_json (job->R, &error))) { flux_log_error (job->h, "rlist_from_json: %s", error.text); goto nonfatal_error; @@ -454,14 +461,22 @@ static int parse_R (struct job *job, const char *s, bool allow_nonfatal) return rc; } -int job_parse_R (struct job *job, const char *s) +int job_parse_R (struct job *job, const char *s, json_t *updates) { - return parse_R (job, s, true); + if (load_R (job, s, true) < 0) + return -1; + if (parse_R (job, true) < 0) + return -1; + return job_R_update (job, updates); } -int job_parse_R_fatal (struct job *job, const char *s) +int job_parse_R_fatal (struct job *job, const char *s, json_t *updates) { - return parse_R (job, s, false); + if (load_R (job, s, false) < 0) + return -1; + if (parse_R (job, false) < 0) + return -1; + return job_R_update (job, updates); } int job_jobspec_update (struct job *job, json_t *updates) @@ -494,6 +509,29 @@ int job_jobspec_update (struct job *job, json_t *updates) return parse_jobspec (job, false); } +int job_R_update (struct job *job, json_t *updates) +{ + const char *key; + json_t *value; + + if (!updates) + return 0; + + json_object_foreach (updates, key, value) { + /* RFC 21 resource-update event only allows update + * to: + * - expiration + */ + if (streq (key, "expiration")) + if (jpath_set (job->R, "execution.expiration", value) < 0) + flux_log (job->h, LOG_INFO, + "%s: job %s failed to update R key %s", + __FUNCTION__, idf58 (job->id), key); + } + + return parse_R (job, false); +} + /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/src/modules/job-list/job_data.h b/src/modules/job-list/job_data.h index 713efd5a8e8d..9dcd3409bce7 100644 --- a/src/modules/job-list/job_data.h +++ b/src/modules/job-list/job_data.h @@ -86,8 +86,9 @@ struct job { unsigned int states_mask; unsigned int states_events_mask; void *list_handle; - /* if updates in eventlog before jobspec read from KVS */ + /* if updates in eventlog before jobspec / R read from KVS */ json_t *jobspec_updates; + json_t *R_updates; int eventlog_seq; /* last event seq read */ int submit_version; /* version number in submit context */ @@ -127,12 +128,17 @@ int job_jobspec_update (struct job *job, json_t *updates); * - ncores * - ntasks (if necessary) */ -int job_parse_R (struct job *job, const char *s); +int job_parse_R (struct job *job, const char *s, json_t *updates); /* identical to above, but all nonfatal errors will return error. * Primarily used for testing. */ -int job_parse_R_fatal (struct job *job, const char *s); +int job_parse_R_fatal (struct job *job, const char *s, json_t *updates); + +/* Update R with RFC21 defined keys + * (i.e. "expiration") and value. + */ +int job_R_update (struct job *job, json_t *updates); #endif /* ! _FLUX_JOB_LIST_JOB_DATA_H */ diff --git a/src/modules/job-list/job_state.c b/src/modules/job-list/job_state.c index 37f965aae057..74d0ab2bd066 100644 --- a/src/modules/job-list/job_state.c +++ b/src/modules/job-list/job_state.c @@ -48,6 +48,7 @@ typedef enum { JOB_UPDATE_TYPE_STATE_TRANSITION, JOB_UPDATE_TYPE_JOBSPEC_UPDATE, + JOB_UPDATE_TYPE_RESOURCE_UPDATE, } job_update_type_t; struct job_update { @@ -59,8 +60,8 @@ struct job_update { int flags; flux_job_state_t expected_state; - /* jobspec_update */ - json_t *jobspec_update_context; + /* jobspec_update, resource_update */ + json_t *update_context; /* all updates */ bool processing; /* indicates we are waiting for @@ -359,7 +360,7 @@ static void state_run_lookup_continuation (flux_future_t *f, void *arg) goto out; } - if (job_parse_R (job, s) < 0) + if (job_parse_R (job, s, NULL) < 0) goto out; updt = zlist_head (job->updates); @@ -413,7 +414,7 @@ static void job_update_destroy (void *data) struct job_update *updt = data; if (updt) { int saved_errno = errno; - json_decref (updt->jobspec_update_context); + json_decref (updt->update_context); free (updt); errno = saved_errno; } @@ -473,14 +474,14 @@ static int add_state_transition (struct job *job, return -1; } -static int add_jobspec_update (struct job *job, json_t *context) +static int add_update (struct job *job, json_t *context, job_update_type_t type) { struct job_update *updt = NULL; - if (!(updt = job_update_create (JOB_UPDATE_TYPE_JOBSPEC_UPDATE))) + if (!(updt = job_update_create (type))) return -1; - updt->jobspec_update_context = json_incref (context); + updt->update_context = json_incref (context); if (append_update (job, updt) < 0) goto cleanup; @@ -492,6 +493,16 @@ static int add_jobspec_update (struct job *job, json_t *context) return -1; } +static int add_jobspec_update (struct job *job, json_t *context) +{ + return add_update (job, context, JOB_UPDATE_TYPE_JOBSPEC_UPDATE); +} + +static int add_resource_update (struct job *job, json_t *context) +{ + return add_update (job, context, JOB_UPDATE_TYPE_RESOURCE_UPDATE); +} + static void process_state_transition_update (struct job_state_ctx *jsctx, struct job *job, struct job_update *updt) @@ -608,7 +619,40 @@ static void process_jobspec_update (struct job_state_ctx *jsctx, * example, via the job expiration time in R). */ if (job->state < FLUX_JOB_STATE_RUN) - update_jobspec (jsctx, job, updt->jobspec_update_context, true); + update_jobspec (jsctx, job, updt->update_context, true); + updt->finished = true; +} + +static void update_resource (struct job_state_ctx *jsctx, + struct job *job, + json_t *context) +{ + /* we have not loaded the R yet, save off R updates + * for an update after jobspec retrieved + */ + if (!job->R) { + if (!job->R_updates) + job->R_updates = json_incref (context); + else { + if (json_object_update (job->R_updates, context) < 0) + flux_log (jsctx->h, LOG_INFO, + "%s: job %s failed to update R", + __FUNCTION__, idf58 (job->id)); + } + return; + } + + job_R_update (job, context); +} + +static void process_resource_update (struct job_state_ctx *jsctx, + struct job *job, + struct job_update *updt) +{ + /* Generally speaking, resource-update events only have an effect + * when a job is running. */ + if (job->state == FLUX_JOB_STATE_RUN) + update_resource (jsctx, job, updt->update_context); updt->finished = true; } @@ -624,8 +668,10 @@ static void process_updates (struct job_state_ctx *jsctx, struct job *job) if (updt->type == JOB_UPDATE_TYPE_STATE_TRANSITION) process_state_transition_update (jsctx, job, updt); - else /* updt->type == JOB_UPDATE_TYPE_JOBSPEC_UPDATE */ + else if (updt->type == JOB_UPDATE_TYPE_JOBSPEC_UPDATE) process_jobspec_update (jsctx, job, updt); + else /* updt->type == JOB_UPDATE_TYPE_RESOURCE_UPDATE */ + process_resource_update (jsctx, job, updt); next: if (updt->finished) @@ -752,6 +798,9 @@ static struct job *eventlog_restart_parse (struct job_state_ctx *jsctx, else if (streq (name, "jobspec-update")) { update_jobspec (jsctx, job, context, false); } + else if (streq (name, "resource-update")) { + update_resource (jsctx, job, context); + } else if (streq (name, "flux-restart")) { revert_job_state (jsctx, job, timestamp); } @@ -852,7 +901,7 @@ static int depthfirst_map_one (struct job_state_ctx *jsctx, if (flux_kvs_lookup_get (f3, &R) < 0) goto done; - if (job_parse_R (job, R) < 0) + if (job_parse_R (job, R, job->R_updates) < 0) goto done; } @@ -1379,6 +1428,26 @@ static int journal_jobspec_update_event (struct job_state_ctx *jsctx, return 0; } +static int journal_resource_update_event (struct job_state_ctx *jsctx, + struct job *job, + json_t *context) +{ + if (!context) { + flux_log (jsctx->h, LOG_ERR, + "%s: resource-update event context invalid: %s", + __FUNCTION__, idf58 (job->id)); + errno = EPROTO; + return -1; + } + + if (add_resource_update (job, context) < 0) { + flux_log_error (jsctx->h, "%s: add_resource_update", __FUNCTION__); + return -1; + } + process_updates (jsctx, job); + return 0; +} + static int journal_dependency_event (struct job_state_ctx *jsctx, struct job *job, const char *cmd, @@ -1532,6 +1601,10 @@ static int journal_process_event (struct job_state_ctx *jsctx, json_t *event) if (journal_jobspec_update_event (jsctx, job, context) < 0) return -1; } + else if (streq (name, "resource-update")) { + if (journal_resource_update_event (jsctx, job, context) < 0) + return -1; + } else if (streq (name, "memo")) { if (memo_update (jsctx->h, job, context) < 0) return -1; diff --git a/src/modules/job-list/test/job_data.c b/src/modules/job-list/test/job_data.c index 9ca287a5ab04..ad65147fbf03 100644 --- a/src/modules/job-list/test/job_data.c +++ b/src/modules/job-list/test/job_data.c @@ -328,7 +328,7 @@ static int parse_R (struct job *job, const char *filename) read_file (filename, (void **)&data); - ret = job_parse_R_fatal (job, data); + ret = job_parse_R_fatal (job, data, NULL); free (data); return ret; @@ -756,7 +756,7 @@ static void test_jobspec_update (void) ok (ret == -1, "job_parse_jobspec does not set non jobspec field"); ret = job_jobspec_update (job, NULL); - ok (ret == 0, "jobspec update jobspec success with no update"); + ok (ret == 0, "job_jobspec_update success with no update"); if (!(o = json_pack ("{s:[{s:[s] s:s s:{s:i}}] s:s s:s s:f}", "tasks", @@ -769,7 +769,7 @@ static void test_jobspec_update (void) "attributes.system.duration", 100.0))) BAIL_OUT ("json_pack failed"); ret = job_jobspec_update (job, o); - ok (ret == 0, "jobspec update jobspec"); + ok (ret == 0, "job_jobspec_update"); json_decref (o); ret = json_unpack (job->jobspec, @@ -806,6 +806,66 @@ static void test_jobspec_update (void) free (data); } +static void test_R_update (void) +{ + struct job *job = job_create (NULL, FLUX_JOBID_ANY); + const char *filename = TEST_SRCDIR "/R/1node_1core.R"; + char *data; + int ret; + double expiration; + const char *tmp = NULL; + json_t *o; + + if (!job) + BAIL_OUT ("job_create failed"); + + read_file (filename, (void **)&data); + + if (!(o = json_pack ("{s:f}", "expiration", 100.0))) + BAIL_OUT ("json_pack failed"); + + if (job_parse_R (job, data, o) < 0) + BAIL_OUT ("cannot load basic R"); + + json_decref (o); + + ret = json_unpack (job->R, + "{s:{s:F}}", + "execution", + "expiration", &expiration); + ok (ret == 0, "parsed initial R expiration"); + + ok (expiration == 100.0, "initial R expiration == 100.0"); + ok (job->expiration == 100.0, "initial job->expiration == 100.0"); + + ret = job_R_update (job, NULL); + ok (ret == 0, "job_R_update success with no update"); + + if (!(o = json_pack ("{s:f s:s}", + "expiration", 200.0, + "dummy", "dummy"))) + BAIL_OUT ("json_pack failed"); + ret = job_R_update (job, o); + ok (ret == 0, "job_R_update"); + json_decref (o); + + ret = json_unpack (job->R, + "{s:{s:F}}", + "execution", + "expiration", &expiration); + ok (ret == 0, "parsed updated R expiration"); + + ok (expiration == 200.0, "R expiration == 200.0"); + ok (job->expiration == 200.0, "job->expiration == 200.0"); + + ret = json_unpack (job->R, "{s?s}", "dummy", &tmp); + ok (ret == 0, "parsed updated R dummy"); + + ok (tmp == NULL, "R not updated with illegal update key"); + + free (data); +} + int main (int argc, char *argv[]) { plan (NO_PLAN); @@ -823,6 +883,7 @@ int main (int argc, char *argv[]) test_ntasks (); test_ncores (); test_jobspec_update (); + test_R_update (); done_testing (); } diff --git a/t/t2260-job-list.t b/t/t2260-job-list.t index 9e09762254b3..68c58ec5ef84 100755 --- a/t/t2260-job-list.t +++ b/t/t2260-job-list.t @@ -2236,6 +2236,46 @@ test_expect_success 'remove jobtap plugins and remove queue config' ' flux config load < /dev/null ' +# +# resource-update event testing +# + +# the resource-update-expiration plugin will add 60 minutes to the +# expiration time of a job +# N.B. in future, may wish to do test via a `flux update` or similar tool +test_expect_success 'support resource-update event' ' + flux jobtap load --remove=all ${PLUGINPATH}/resource-update-expiration.so +' + +test_expect_success 'run a job' ' + flux submit --time-limit=5m --wait-event=start sleep 300 | flux job id > rupdate1.id +' + +# expiration should be 65m out, make sure it is > 10m, i.e. not 5m +test_expect_success 'expiration is in the future' ' + current=$(date +%s) && + testexp=`expr $current + 10 \* 60` && + echo $testexp > test_expiration.out && + flux job list -s active | grep $(cat rupdate1.id) | jq -e ".expiration > $(cat test_expiration.out)" +' + +test_expect_success 'cancel job' ' + flux cancel $(cat rupdate1.id) +' + +test_expect_success 'reload the job-list module' ' + flux module reload job-list && + wait_id_inactive $(cat rupdate1.id) +' + +test_expect_success 'job-list returns expected resource changes after reload' ' + flux job list -s inactive | grep $(cat rupdate1.id) | jq -e ".expiration > $(cat test_expiration.out)" +' + +test_expect_success 'remove jobtap plugins' ' + flux jobtap remove all +' + # # job list special cases #