Skip to content

Commit

Permalink
Merge pull request #5463 from chu11/issue5397_job_list_resource_update
Browse files Browse the repository at this point in the history
job-list: support resource-update event
  • Loading branch information
mergify[bot] authored Oct 26, 2023
2 parents 6745065 + 026e017 commit ce603cc
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 28 deletions.
62 changes: 50 additions & 12 deletions src/modules/job-list/job_data.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void job_destroy (void *data)
json_decref (job->R);
json_decref (job->exception_context);
json_decref (job->jobspec_updates);
json_decref (job->R_updates);
zlist_destroy (&job->updates);
free (job);
errno = save_errno;
Expand Down Expand Up @@ -385,7 +386,20 @@ int job_parse_jobspec_fatal (struct job *job, const char *s, json_t *updates)
return job_jobspec_update (job, updates);
}

static int parse_R (struct job *job, const char *s, bool allow_nonfatal)
static int load_R (struct job *job, const char *s, bool allow_nonfatal)
{
json_error_t error;

if (!(job->R = json_loads (s, 0, &error))) {
flux_log (job->h, LOG_ERR,
"%s: job %s invalid R: %s",
__FUNCTION__, idf58 (job->id), error.text);
return allow_nonfatal ? 0 : -1;
}
return 0;
}

static int parse_R (struct job *job, bool allow_nonfatal)
{
struct rlist *rl = NULL;
struct idset *idset = NULL;
Expand All @@ -396,13 +410,6 @@ static int parse_R (struct job *job, const char *s, bool allow_nonfatal)
struct rnode *rnode;
int saved_errno, rc = -1;

if (!(job->R = json_loads (s, 0, &error))) {
flux_log (job->h, LOG_ERR,
"%s: job %s invalid R: %s",
__FUNCTION__, idf58 (job->id), error.text);
goto nonfatal_error;
}

if (!(rl = rlist_from_json (job->R, &error))) {
flux_log_error (job->h, "rlist_from_json: %s", error.text);
goto nonfatal_error;
Expand Down Expand Up @@ -454,14 +461,22 @@ static int parse_R (struct job *job, const char *s, bool allow_nonfatal)
return rc;
}

int job_parse_R (struct job *job, const char *s)
int job_parse_R (struct job *job, const char *s, json_t *updates)
{
return parse_R (job, s, true);
if (load_R (job, s, true) < 0)
return -1;
if (parse_R (job, true) < 0)
return -1;
return job_R_update (job, updates);
}

int job_parse_R_fatal (struct job *job, const char *s)
int job_parse_R_fatal (struct job *job, const char *s, json_t *updates)
{
return parse_R (job, s, false);
if (load_R (job, s, false) < 0)
return -1;
if (parse_R (job, false) < 0)
return -1;
return job_R_update (job, updates);
}

int job_jobspec_update (struct job *job, json_t *updates)
Expand Down Expand Up @@ -494,6 +509,29 @@ int job_jobspec_update (struct job *job, json_t *updates)
return parse_jobspec (job, false);
}

int job_R_update (struct job *job, json_t *updates)
{
const char *key;
json_t *value;

if (!updates)
return 0;

json_object_foreach (updates, key, value) {
/* RFC 21 resource-update event only allows update
* to:
* - expiration
*/
if (streq (key, "expiration"))
if (jpath_set (job->R, "execution.expiration", value) < 0)
flux_log (job->h, LOG_INFO,
"%s: job %s failed to update R key %s",
__FUNCTION__, idf58 (job->id), key);
}

return parse_R (job, false);
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
12 changes: 9 additions & 3 deletions src/modules/job-list/job_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ struct job {
unsigned int states_mask;
unsigned int states_events_mask;
void *list_handle;
/* if updates in eventlog before jobspec read from KVS */
/* if updates in eventlog before jobspec / R read from KVS */
json_t *jobspec_updates;
json_t *R_updates;

int eventlog_seq; /* last event seq read */
int submit_version; /* version number in submit context */
Expand Down Expand Up @@ -127,12 +128,17 @@ int job_jobspec_update (struct job *job, json_t *updates);
* - ncores
* - ntasks (if necessary)
*/
int job_parse_R (struct job *job, const char *s);
int job_parse_R (struct job *job, const char *s, json_t *updates);

/* identical to above, but all nonfatal errors will return error.
* Primarily used for testing.
*/
int job_parse_R_fatal (struct job *job, const char *s);
int job_parse_R_fatal (struct job *job, const char *s, json_t *updates);

/* Update R with RFC21 defined keys
* (i.e. "expiration") and value.
*/
int job_R_update (struct job *job, json_t *updates);

#endif /* ! _FLUX_JOB_LIST_JOB_DATA_H */

Expand Down
93 changes: 83 additions & 10 deletions src/modules/job-list/job_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
typedef enum {
JOB_UPDATE_TYPE_STATE_TRANSITION,
JOB_UPDATE_TYPE_JOBSPEC_UPDATE,
JOB_UPDATE_TYPE_RESOURCE_UPDATE,
} job_update_type_t;

struct job_update {
Expand All @@ -59,8 +60,8 @@ struct job_update {
int flags;
flux_job_state_t expected_state;

/* jobspec_update */
json_t *jobspec_update_context;
/* jobspec_update, resource_update */
json_t *update_context;

/* all updates */
bool processing; /* indicates we are waiting for
Expand Down Expand Up @@ -359,7 +360,7 @@ static void state_run_lookup_continuation (flux_future_t *f, void *arg)
goto out;
}

if (job_parse_R (job, s) < 0)
if (job_parse_R (job, s, NULL) < 0)
goto out;

updt = zlist_head (job->updates);
Expand Down Expand Up @@ -413,7 +414,7 @@ static void job_update_destroy (void *data)
struct job_update *updt = data;
if (updt) {
int saved_errno = errno;
json_decref (updt->jobspec_update_context);
json_decref (updt->update_context);
free (updt);
errno = saved_errno;
}
Expand Down Expand Up @@ -473,14 +474,14 @@ static int add_state_transition (struct job *job,
return -1;
}

static int add_jobspec_update (struct job *job, json_t *context)
static int add_update (struct job *job, json_t *context, job_update_type_t type)
{
struct job_update *updt = NULL;

if (!(updt = job_update_create (JOB_UPDATE_TYPE_JOBSPEC_UPDATE)))
if (!(updt = job_update_create (type)))
return -1;

updt->jobspec_update_context = json_incref (context);
updt->update_context = json_incref (context);

if (append_update (job, updt) < 0)
goto cleanup;
Expand All @@ -492,6 +493,16 @@ static int add_jobspec_update (struct job *job, json_t *context)
return -1;
}

static int add_jobspec_update (struct job *job, json_t *context)
{
return add_update (job, context, JOB_UPDATE_TYPE_JOBSPEC_UPDATE);
}

static int add_resource_update (struct job *job, json_t *context)
{
return add_update (job, context, JOB_UPDATE_TYPE_RESOURCE_UPDATE);
}

static void process_state_transition_update (struct job_state_ctx *jsctx,
struct job *job,
struct job_update *updt)
Expand Down Expand Up @@ -608,7 +619,40 @@ static void process_jobspec_update (struct job_state_ctx *jsctx,
* example, via the job expiration time in R).
*/
if (job->state < FLUX_JOB_STATE_RUN)
update_jobspec (jsctx, job, updt->jobspec_update_context, true);
update_jobspec (jsctx, job, updt->update_context, true);
updt->finished = true;
}

static void update_resource (struct job_state_ctx *jsctx,
struct job *job,
json_t *context)
{
/* we have not loaded the R yet, save off R updates
* for an update after jobspec retrieved
*/
if (!job->R) {
if (!job->R_updates)
job->R_updates = json_incref (context);
else {
if (json_object_update (job->R_updates, context) < 0)
flux_log (jsctx->h, LOG_INFO,
"%s: job %s failed to update R",
__FUNCTION__, idf58 (job->id));
}
return;
}

job_R_update (job, context);
}

static void process_resource_update (struct job_state_ctx *jsctx,
struct job *job,
struct job_update *updt)
{
/* Generally speaking, resource-update events only have an effect
* when a job is running. */
if (job->state == FLUX_JOB_STATE_RUN)
update_resource (jsctx, job, updt->update_context);
updt->finished = true;
}

Expand All @@ -624,8 +668,10 @@ static void process_updates (struct job_state_ctx *jsctx, struct job *job)

if (updt->type == JOB_UPDATE_TYPE_STATE_TRANSITION)
process_state_transition_update (jsctx, job, updt);
else /* updt->type == JOB_UPDATE_TYPE_JOBSPEC_UPDATE */
else if (updt->type == JOB_UPDATE_TYPE_JOBSPEC_UPDATE)
process_jobspec_update (jsctx, job, updt);
else /* updt->type == JOB_UPDATE_TYPE_RESOURCE_UPDATE */
process_resource_update (jsctx, job, updt);

next:
if (updt->finished)
Expand Down Expand Up @@ -752,6 +798,9 @@ static struct job *eventlog_restart_parse (struct job_state_ctx *jsctx,
else if (streq (name, "jobspec-update")) {
update_jobspec (jsctx, job, context, false);
}
else if (streq (name, "resource-update")) {
update_resource (jsctx, job, context);
}
else if (streq (name, "flux-restart")) {
revert_job_state (jsctx, job, timestamp);
}
Expand Down Expand Up @@ -852,7 +901,7 @@ static int depthfirst_map_one (struct job_state_ctx *jsctx,
if (flux_kvs_lookup_get (f3, &R) < 0)
goto done;

if (job_parse_R (job, R) < 0)
if (job_parse_R (job, R, job->R_updates) < 0)
goto done;
}

Expand Down Expand Up @@ -1379,6 +1428,26 @@ static int journal_jobspec_update_event (struct job_state_ctx *jsctx,
return 0;
}

static int journal_resource_update_event (struct job_state_ctx *jsctx,
struct job *job,
json_t *context)
{
if (!context) {
flux_log (jsctx->h, LOG_ERR,
"%s: resource-update event context invalid: %s",
__FUNCTION__, idf58 (job->id));
errno = EPROTO;
return -1;
}

if (add_resource_update (job, context) < 0) {
flux_log_error (jsctx->h, "%s: add_resource_update", __FUNCTION__);
return -1;
}
process_updates (jsctx, job);
return 0;
}

static int journal_dependency_event (struct job_state_ctx *jsctx,
struct job *job,
const char *cmd,
Expand Down Expand Up @@ -1532,6 +1601,10 @@ static int journal_process_event (struct job_state_ctx *jsctx, json_t *event)
if (journal_jobspec_update_event (jsctx, job, context) < 0)
return -1;
}
else if (streq (name, "resource-update")) {
if (journal_resource_update_event (jsctx, job, context) < 0)
return -1;
}
else if (streq (name, "memo")) {
if (memo_update (jsctx->h, job, context) < 0)
return -1;
Expand Down
Loading

0 comments on commit ce603cc

Please sign in to comment.