Skip to content

Commit

Permalink
job-list: support resource-update event
Browse files Browse the repository at this point in the history
Problem: resource-update events will allow users to update the internally
viewed R.  Job-list does not handle these events and therefore
the R data returned by job-list may not represent what is used
by the job-manager and other parts of flux.

Support the resource-update event in job-list.

Fixes #5397
  • Loading branch information
chu11 authored and grondo committed Oct 26, 2023
1 parent f25c1aa commit a1dd5fc
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/modules/job-list/job_data.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void job_destroy (void *data)
json_decref (job->R);
json_decref (job->exception_context);
json_decref (job->jobspec_updates);
json_decref (job->R_updates);
zlist_destroy (&job->updates);
free (job);
errno = save_errno;
Expand Down
3 changes: 2 additions & 1 deletion src/modules/job-list/job_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ struct job {
unsigned int states_mask;
unsigned int states_events_mask;
void *list_handle;
/* if updates in eventlog before jobspec read from KVS */
/* if updates in eventlog before jobspec / R read from KVS */
json_t *jobspec_updates;
json_t *R_updates;

int eventlog_seq; /* last event seq read */
int submit_version; /* version number in submit context */
Expand Down
91 changes: 82 additions & 9 deletions src/modules/job-list/job_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
/* Kinds of queued job updates.  process_updates() inspects the type of
 * each queued update and dispatches it to the matching process_*()
 * handler.
 */
typedef enum {
    /* handled by process_state_transition_update() */
    JOB_UPDATE_TYPE_STATE_TRANSITION = 0,
    /* handled by process_jobspec_update() */
    JOB_UPDATE_TYPE_JOBSPEC_UPDATE = 1,
    /* handled by process_resource_update() */
    JOB_UPDATE_TYPE_RESOURCE_UPDATE = 2,
} job_update_type_t;

struct job_update {
Expand All @@ -59,8 +60,8 @@ struct job_update {
int flags;
flux_job_state_t expected_state;

/* jobspec_update */
json_t *jobspec_update_context;
/* jobspec_update, resource_update */
json_t *update_context;

/* all updates */
bool processing; /* indicates we are waiting for
Expand Down Expand Up @@ -413,7 +414,7 @@ static void job_update_destroy (void *data)
struct job_update *updt = data;
if (updt) {
int saved_errno = errno;
json_decref (updt->jobspec_update_context);
json_decref (updt->update_context);
free (updt);
errno = saved_errno;
}
Expand Down Expand Up @@ -473,14 +474,14 @@ static int add_state_transition (struct job *job,
return -1;
}

static int add_jobspec_update (struct job *job, json_t *context)
static int add_update (struct job *job, json_t *context, job_update_type_t type)
{
struct job_update *updt = NULL;

if (!(updt = job_update_create (JOB_UPDATE_TYPE_JOBSPEC_UPDATE)))
if (!(updt = job_update_create (type)))
return -1;

updt->jobspec_update_context = json_incref (context);
updt->update_context = json_incref (context);

if (append_update (job, updt) < 0)
goto cleanup;
Expand All @@ -492,6 +493,16 @@ static int add_jobspec_update (struct job *job, json_t *context)
return -1;
}

/* Queue a jobspec-update on 'job'.
 *
 * Thin wrapper around add_update() that fixes the update type to
 * JOB_UPDATE_TYPE_JOBSPEC_UPDATE.  'context' is the event context to
 * attach to the queued update.  Returns whatever add_update() returns
 * (-1 on failure).
 */
static int add_jobspec_update (struct job *job, json_t *context)
{
    const job_update_type_t type = JOB_UPDATE_TYPE_JOBSPEC_UPDATE;

    return add_update (job, context, type);
}

/* Queue a resource-update on 'job'.
 *
 * Thin wrapper around add_update() that fixes the update type to
 * JOB_UPDATE_TYPE_RESOURCE_UPDATE.  'context' is the event context to
 * attach to the queued update.  Returns whatever add_update() returns
 * (-1 on failure).
 */
static int add_resource_update (struct job *job, json_t *context)
{
    const job_update_type_t type = JOB_UPDATE_TYPE_RESOURCE_UPDATE;

    return add_update (job, context, type);
}

static void process_state_transition_update (struct job_state_ctx *jsctx,
struct job *job,
struct job_update *updt)
Expand Down Expand Up @@ -608,7 +619,40 @@ static void process_jobspec_update (struct job_state_ctx *jsctx,
* example, via the job expiration time in R).
*/
if (job->state < FLUX_JOB_STATE_RUN)
update_jobspec (jsctx, job, updt->jobspec_update_context, true);
update_jobspec (jsctx, job, updt->update_context, true);
updt->finished = true;
}

/* Apply the context of a resource-update event to a job's R.
 *
 * If R has not been loaded yet (job->R is NULL) the update cannot be
 * applied, so it is accumulated in job->R_updates; that object is
 * later passed to job_parse_R() when R is read from the KVS.
 * Otherwise the update is applied immediately with job_R_update().
 *
 * Failures while saving a deferred update are logged and otherwise
 * ignored (best effort); this function returns nothing.
 */
static void update_resource (struct job_state_ctx *jsctx,
                             struct job *job,
                             json_t *context)
{
    if (!job->R) {
        /* We have not loaded R yet, save off R updates so they can be
         * applied after R is retrieved.
         */
        if (!job->R_updates) {
            /* Take a private (shallow) copy instead of another
             * reference to the event context: subsequent updates are
             * merged into job->R_updates in place, and that merge must
             * not modify the caller's event object.
             */
            if (!(job->R_updates = json_copy (context)))
                flux_log (jsctx->h, LOG_INFO,
                          "%s: job %s failed to update R",
                          __FUNCTION__, idf58 (job->id));
        }
        else if (json_object_update (job->R_updates, context) < 0) {
            flux_log (jsctx->h, LOG_INFO,
                      "%s: job %s failed to update R",
                      __FUNCTION__, idf58 (job->id));
        }
        return;
    }

    job_R_update (job, context);
}

/* Process one queued resource-update for 'job'.
 *
 * The update context is applied through update_resource() only while
 * the job is in FLUX_JOB_STATE_RUN; in any other state it is dropped.
 * Either way the update is marked finished.
 */
static void process_resource_update (struct job_state_ctx *jsctx,
                                     struct job *job,
                                     struct job_update *updt)
{
    /* Generally speaking, a resource-update event is only meaningful
     * for a job that is currently running.
     */
    const bool job_is_running = (job->state == FLUX_JOB_STATE_RUN);

    if (job_is_running) {
        update_resource (jsctx, job, updt->update_context);
    }

    updt->finished = true;
}

Expand All @@ -624,8 +668,10 @@ static void process_updates (struct job_state_ctx *jsctx, struct job *job)

if (updt->type == JOB_UPDATE_TYPE_STATE_TRANSITION)
process_state_transition_update (jsctx, job, updt);
else /* updt->type == JOB_UPDATE_TYPE_JOBSPEC_UPDATE */
else if (updt->type == JOB_UPDATE_TYPE_JOBSPEC_UPDATE)
process_jobspec_update (jsctx, job, updt);
else /* updt->type == JOB_UPDATE_TYPE_RESOURCE_UPDATE */
process_resource_update (jsctx, job, updt);

next:
if (updt->finished)
Expand Down Expand Up @@ -752,6 +798,9 @@ static struct job *eventlog_restart_parse (struct job_state_ctx *jsctx,
else if (streq (name, "jobspec-update")) {
update_jobspec (jsctx, job, context, false);
}
else if (streq (name, "resource-update")) {
update_resource (jsctx, job, context);
}
else if (streq (name, "flux-restart")) {
revert_job_state (jsctx, job, timestamp);
}
Expand Down Expand Up @@ -852,7 +901,7 @@ static int depthfirst_map_one (struct job_state_ctx *jsctx,
if (flux_kvs_lookup_get (f3, &R) < 0)
goto done;

if (job_parse_R (job, R, NULL) < 0)
if (job_parse_R (job, R, job->R_updates) < 0)
goto done;
}

Expand Down Expand Up @@ -1379,6 +1428,26 @@ static int journal_jobspec_update_event (struct job_state_ctx *jsctx,
return 0;
}

/* Handle a "resource-update" event received for 'job'.
 *
 * A missing context is a protocol error: it is logged, errno is set to
 * EPROTO and -1 is returned.  Otherwise the context is queued with
 * add_resource_update() and the job's pending updates are processed.
 *
 * Returns 0 on success, -1 on failure.
 */
static int journal_resource_update_event (struct job_state_ctx *jsctx,
                                          struct job *job,
                                          json_t *context)
{
    int rc = -1;

    if (context == NULL) {
        flux_log (jsctx->h,
                  LOG_ERR,
                  "%s: resource-update event context invalid: %s",
                  __FUNCTION__,
                  idf58 (job->id));
        errno = EPROTO;
    }
    else if (add_resource_update (job, context) < 0) {
        flux_log_error (jsctx->h, "%s: add_resource_update", __FUNCTION__);
    }
    else {
        /* update queued successfully; apply whatever is now ready */
        process_updates (jsctx, job);
        rc = 0;
    }

    return rc;
}

static int journal_dependency_event (struct job_state_ctx *jsctx,
struct job *job,
const char *cmd,
Expand Down Expand Up @@ -1532,6 +1601,10 @@ static int journal_process_event (struct job_state_ctx *jsctx, json_t *event)
if (journal_jobspec_update_event (jsctx, job, context) < 0)
return -1;
}
else if (streq (name, "resource-update")) {
if (journal_resource_update_event (jsctx, job, context) < 0)
return -1;
}
else if (streq (name, "memo")) {
if (memo_update (jsctx->h, job, context) < 0)
return -1;
Expand Down

0 comments on commit a1dd5fc

Please sign in to comment.