Skip to content

Commit

Permalink
Merge pull request #5464 from chu11/issue5424_flux_job_info_updated_R
Browse files Browse the repository at this point in the history
flux-job: get updated version of R
  • Loading branch information
mergify[bot] authored Oct 25, 2023
2 parents 9528de4 + ef87fab commit 6745065
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 46 deletions.
39 changes: 34 additions & 5 deletions src/bindings/python/flux/job/kvslookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from flux.job import JobID, JobspecV1
from flux.job.event import EventLogEvent
from flux.rpc import RPC
from flux.util import set_treedict


def _decode_field(data, key):
Expand Down Expand Up @@ -63,6 +64,10 @@ def _setup_lookup_keys(keys, original, base):
elif not base:
if "eventlog" not in keys:
keys.append("eventlog")
if "R" in keys:
if not base:
if "eventlog" not in keys:
keys.append("eventlog")


def _get_original_jobspec(job_data):
Expand All @@ -87,7 +92,22 @@ def _get_updated_jobspec(job_data):
return jobspec.dumps()


def _get_updated_R(job_data):
if isinstance(job_data["R"], str):
R = json.loads(job_data["R"])
else:
R = job_data["R"]
for entry in job_data["eventlog"].splitlines():
event = EventLogEvent(entry)
if event.name == "resource-update":
for key, value in event.context.items():
if key == "expiration":
set_treedict(R, "execution.expiration", value)
return json.dumps(R, ensure_ascii=False)


def _update_keys(job_data, decode, keys, original, base):
remove_eventlog = False
if "jobspec" in keys:
if original:
job_data["jobspec"] = _get_original_jobspec(job_data)
Expand All @@ -100,7 +120,16 @@ def _update_keys(job_data, decode, keys, original, base):
if decode:
_decode_field(job_data, "jobspec")
if "eventlog" not in keys:
job_data.pop("eventlog")
remove_eventlog = True
if "R" in keys:
if not base:
job_data["R"] = _get_updated_R(job_data)
if decode:
_decode_field(job_data, "R")
if "eventlog" not in keys:
remove_eventlog = True
if remove_eventlog:
job_data.pop("eventlog")


# jobs_kvs_lookup simple variant for one jobid
Expand All @@ -110,7 +139,7 @@ def job_kvs_lookup(
"""
Lookup job kvs data based on a jobid
Some keys such as "jobspec" may be altered based on update events
Some keys such as "jobspec" or "R" may be altered based on update events
in the eventlog. Set 'base' to True to skip these updates and
read exactly what is in the KVS.
Expand All @@ -121,7 +150,7 @@ def job_kvs_lookup(
currently decodes "jobspec" and "R" into dicts
(default True)
:original: For 'jobspec', return the original submitted jobspec
:base: For 'jobspec', get base value, do not apply updates from eventlog
:base: For 'jobspec' or 'R', get base value, do not apply updates from eventlog
"""
keyslookup = list(keys)
_setup_lookup_keys(keyslookup, original, base)
Expand Down Expand Up @@ -182,7 +211,7 @@ def get_decode(self):
class JobKVSLookup:
"""User friendly class to lookup job KVS data
Some keys such as "jobspec" may be altered based on update events
Some keys such as "jobspec" or "R" may be altered based on update events
in the eventlog. Set 'base' to True to skip these updates and
read exactly what is in the KVS.
Expand All @@ -193,7 +222,7 @@ class JobKVSLookup:
currently decodes "jobspec" and "R" into dicts
(default True)
:original: For 'jobspec', return the original submitted jobspec
:base: For 'jobspec', get base value, do not apply updates from eventlog
:base: For 'jobspec' or 'R', get base value, do not apply updates from eventlog
"""

def __init__(
Expand Down
70 changes: 69 additions & 1 deletion src/cmd/flux-job.c
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ static struct optparse_option info_opts[] = {
.usage = "For key \"jobspec\", return the original submitted jobspec",
},
{ .name = "base", .key = 'b', .has_arg = 0,
.usage = "For key \"jobspec\", get base value, "
.usage = "For key \"jobspec\" or \"R\", "
"do not apply updates from eventlog",
},
OPTPARSE_TABLE_END
Expand Down Expand Up @@ -3463,6 +3463,63 @@ void info_output_jobspec (flux_future_t *f, struct info_ctx *ctx)
}
}

void info_output_R (flux_future_t *f, struct info_ctx *ctx)
{
if (ctx->base) {
const char *R_str;
info_output_get (f, ctx, "R", &R_str);
printf ("%s\n", R_str);
}
else {
const char *R_str;
const char *eventlog_str;
json_t *R;
json_t *eventlog;
json_error_t error;
size_t index;
json_t *entry;
char *R_updated;

info_output_get (f, ctx, "R", &R_str);
info_output_get (f, ctx, "eventlog", &eventlog_str);

if (!(R = json_loads (R_str, JSON_DECODE_ANY, &error)))
log_msg_exit ("Failed to decode R: %s", error.text);

if (!(eventlog = eventlog_decode (eventlog_str)))
log_err_exit ("Failed to decode eventlog");

json_array_foreach (eventlog, index, entry) {
const char *name;
json_t *context;
const char *path;
json_t *value;

if (eventlog_entry_parse (entry, NULL, &name, &context) < 0)
log_err_exit ("Failed to parse eventlog entry");

if (!streq (name, "resource-update"))
continue;

json_object_foreach (context, path, value) {
if (streq (path, "expiration")) {
if (jpath_set (R, "execution.expiration", value) < 0)
log_err_exit ("Failed to update R");
}
}
}

if (!(R_updated = json_dumps (R, JSON_COMPACT)))
log_err_exit ("Failed to decode R object");

printf ("%s\n", R_updated);

json_decref (R);
json_decref (eventlog);
free (R_updated);
}
}

void info_output (flux_future_t *f, const char *key, struct info_ctx *ctx)
{
const char *s;
Expand All @@ -3471,6 +3528,10 @@ void info_output (flux_future_t *f, const char *key, struct info_ctx *ctx)
info_output_jobspec (f, ctx);
return;
}
else if (streq (key, "R")) {
info_output_R (f, ctx);
return;
}

info_output_get (f, ctx, key, &s);
printf ("%s\n", s);
Expand Down Expand Up @@ -3531,6 +3592,13 @@ void info_lookup (flux_t *h,
/* also get eventlog to build viewed jobspec */
extra_key = "eventlog";
}
else if (streq (key, "R")) {
if (optparse_hasopt (p, "base"))
ctx.base = true;
else
/* also get eventlog to build viewed R */
extra_key = "eventlog";
}

/* N.B. job-info.lookup will ignore duplicate keys, in the
* event user specified duplicate keys or options that lead to
Expand Down
10 changes: 10 additions & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ check_LTLIBRARIES = \
job-manager/plugins/config.la \
job-manager/plugins/jobspec-update.la \
job-manager/plugins/jobspec-update-job-list.la \
job-manager/plugins/resource-update-expiration.la \
job-manager/plugins/update-test.la \
job-manager/plugins/project-bank-validate.la \
stats/stats-basic.la \
Expand Down Expand Up @@ -1003,6 +1004,15 @@ job_manager_plugins_jobspec_update_job_list_la_LIBADD = \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libflux-core.la

job_manager_plugins_resource_update_expiration_la_SOURCES = \
job-manager/plugins/resource-update-expiration.c
job_manager_plugins_resource_update_expiration_la_CPPFLAGS = \
$(test_cppflags)
job_manager_plugins_resource_update_expiration_la_LDFLAGS = \
$(fluxplugin_ldflags) -module -rpath /nowhere
job_manager_plugins_resource_update_expiration_la_LIBADD = \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libflux-core.la

job_manager_plugins_update_test_la_SOURCES = \
job-manager/plugins/update-test.c
Expand Down
69 changes: 69 additions & 0 deletions t/job-manager/plugins/resource-update-expiration.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/************************************************************\
* Copyright 2023 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* jobspec-update-job-list.c - test jobspec-update event in job-list
* module
*/

#include <flux/core.h>
#include <flux/jobtap.h>

#include "ccan/str/str.h"
#include "src/common/libutil/errprintf.h"

static int run_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
double expiration;
flux_jobid_t id;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:I s:{s:{s:F}}}",
"id", &id,
"R",
"execution",
"expiration", &expiration) < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"resource-update", 0,
"unpack failure");
return -1;
}

if (flux_jobtap_event_post_pack (p,
id,
"resource-update",
"{s:f}",
"expiration", expiration + 3600.) < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"resource-update", 0,
"update failure");
return -1;
}
return 0;
}

static const struct flux_plugin_handler tab[] = {
{ "job.state.run", run_cb, NULL },
{ 0 },
};

int flux_plugin_init (flux_plugin_t *p)
{
if (flux_plugin_register (p, "resource-update-expiration", tab) < 0)
return -1;
return 0;
}

// vi:ts=4 sw=4 expandtab
Loading

0 comments on commit 6745065

Please sign in to comment.