Skip to content

Commit

Permalink
testsuite: cover python kvslookup base option
Browse files Browse the repository at this point in the history
Problem: There is no coverage for the 'base' option and the "R"
key in job kvs lookup via Python.

Add some coverage in python/t0014-job-kvslookup.py. Use a jobtap
plugin to generate resource-update event for tests.
  • Loading branch information
chu11 authored and grondo committed Oct 25, 2023
1 parent c2a1cf3 commit ef87fab
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 6 deletions.
10 changes: 10 additions & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ check_LTLIBRARIES = \
job-manager/plugins/config.la \
job-manager/plugins/jobspec-update.la \
job-manager/plugins/jobspec-update-job-list.la \
job-manager/plugins/resource-update-expiration.la \
job-manager/plugins/update-test.la \
job-manager/plugins/project-bank-validate.la \
stats/stats-basic.la \
Expand Down Expand Up @@ -1003,6 +1004,15 @@ job_manager_plugins_jobspec_update_job_list_la_LIBADD = \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libflux-core.la

job_manager_plugins_resource_update_expiration_la_SOURCES = \
job-manager/plugins/resource-update-expiration.c
job_manager_plugins_resource_update_expiration_la_CPPFLAGS = \
$(test_cppflags)
job_manager_plugins_resource_update_expiration_la_LDFLAGS = \
$(fluxplugin_ldflags) -module -rpath /nowhere
job_manager_plugins_resource_update_expiration_la_LIBADD = \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libflux-core.la

job_manager_plugins_update_test_la_SOURCES = \
job-manager/plugins/update-test.c
Expand Down
69 changes: 69 additions & 0 deletions t/job-manager/plugins/resource-update-expiration.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/************************************************************\
* Copyright 2023 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* jobspec-update-job-list.c - test jobspec-update event in job-list
* module
*/

#include <flux/core.h>
#include <flux/jobtap.h>

#include "ccan/str/str.h"
#include "src/common/libutil/errprintf.h"

static int run_cb (flux_plugin_t *p,
const char *topic,
flux_plugin_arg_t *args,
void *data)
{
double expiration;
flux_jobid_t id;

if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:I s:{s:{s:F}}}",
"id", &id,
"R",
"execution",
"expiration", &expiration) < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"resource-update", 0,
"unpack failure");
return -1;
}

if (flux_jobtap_event_post_pack (p,
id,
"resource-update",
"{s:f}",
"expiration", expiration + 3600.) < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"resource-update", 0,
"update failure");
return -1;
}
return 0;
}

static const struct flux_plugin_handler tab[] = {
{ "job.state.run", run_cb, NULL },
{ 0 },
};

int flux_plugin_init (flux_plugin_t *p)
{
if (flux_plugin_register (p, "resource-update-expiration", tab) < 0)
return -1;
return 0;
}

// vi:ts=4 sw=4 expandtab
103 changes: 97 additions & 6 deletions t/python/t0014-job-kvslookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
###############################################################

import json
import os
import unittest

import flux
Expand All @@ -35,6 +36,17 @@ def submitJob(self, command, urgency):
@classmethod
def setUpClass(self):
self.fh = flux.Flux()

# in future use more standard update mechanism instead of
# jobtap plugin. This jobtap plugin will simply increase the
# job expiration by 60 minutes.
pluginpath = (
os.environ["FLUX_BUILD_DIR"]
+ "/t/job-manager/plugins/.libs/resource-update-expiration.so"
)
payload = {"load": pluginpath}
self.fh.rpc("job-manager.jobtap", payload).get()

self.jobid1 = self.submitJob(["hostname"], 0)
flux.job.event_wait(self.fh, self.jobid1, name="priority")
update = {"attributes.system.duration": 100.0}
Expand All @@ -43,6 +55,10 @@ def setUpClass(self):
payload = {"id": self.jobid1, "urgency": 16}
self.fh.rpc("job-manager.urgency", payload).get()
flux.job.event_wait(self.fh, self.jobid1, name="clean")

payload = {"remove": "all"}
self.fh.rpc("job-manager.jobtap", payload).get()

self.jobid2 = self.submitJob(["hostname"], 16)
flux.job.event_wait(self.fh, self.jobid2, name="clean")

Expand Down Expand Up @@ -116,6 +132,28 @@ def check_jobspec_base_decoded(self, data, jobid):
self.assertEqual(data["jobspec"]["tasks"][0]["command"][0], "hostname")
self.assertEqual(data["jobspec"]["attributes"]["system"]["duration"], 0)

def check_R_base_str(self, jobid, base, data):
self.assertEqual(base["id"], jobid)
self.assertEqual(data["id"], jobid)
self.assertIn("R", base)
self.assertIn("R", data)
self.assertEqual(type(base["R"]), str)
self.assertEqual(type(data["R"]), str)
R_base = json.loads(base["R"])
R_data = json.loads(data["R"])
base_expiration = R_base["execution"]["expiration"]
data_expiration = R_data["execution"]["expiration"]
self.assertGreater(data_expiration, base_expiration)

def check_R_base_decoded(self, jobid, base, data):
self.assertEqual(base["id"], jobid)
self.assertEqual(data["id"], jobid)
self.assertIn("R", base)
self.assertIn("R", data)
base_expiration = base["R"]["execution"]["expiration"]
data_expiration = data["R"]["execution"]["expiration"]
self.assertGreater(data_expiration, base_expiration)

def test_info_00_job_info_lookup(self):
rpc = flux.job.job_info_lookup(self.fh, self.jobid1)
data = rpc.get()
Expand Down Expand Up @@ -217,10 +255,37 @@ def test_lookup_13_job_kvs_lookup_jobspec_base_multiple_keys(self):
self.assertIn("eventlog", data)
self.check_jobspec_base_decoded(data, self.jobid1)

def test_lookup_14_job_kvs_lookup_base_no_jobspec(self):
data = flux.job.job_kvs_lookup(self.fh, self.jobid1, keys=["R", "J"], base=True)
def test_lookup_14_job_kvs_lookup_R_base(self):
base = flux.job.job_kvs_lookup(self.fh, self.jobid1, keys=["R"], base=True)
data = flux.job.job_kvs_lookup(self.fh, self.jobid1, keys=["R"])
self.assertNotIn("eventlog", base)
self.assertNotIn("eventlog", data)
self.check_R_base_decoded(self.jobid1, base, data)

def test_lookup_15_job_kvs_lookup_R_base_nodecode(self):
base = flux.job.job_kvs_lookup(
self.fh, self.jobid1, keys=["R"], decode=False, base=True
)
data = flux.job.job_kvs_lookup(self.fh, self.jobid1, keys=["R"], decode=False)
self.assertNotIn("eventlog", base)
self.assertNotIn("eventlog", data)
self.check_R_base_str(self.jobid1, base, data)

def test_lookup_16_job_kvs_lookup_R_base_multiple_keys(self):
base = flux.job.job_kvs_lookup(
self.fh, self.jobid1, keys=["R", "eventlog"], decode=False, base=True
)
data = flux.job.job_kvs_lookup(
self.fh, self.jobid1, keys=["R", "eventlog"], decode=False
)
self.assertIn("eventlog", base)
self.assertIn("eventlog", data)
self.check_R_base_str(self.jobid1, base, data)

def test_lookup_17_job_kvs_lookup_base_no_jobspec_R(self):
data = flux.job.job_kvs_lookup(self.fh, self.jobid1, keys=["J"], base=True)
self.assertNotIn("jobspec", data)
self.check_R_decoded(data, self.jobid1)
self.assertNotIn("R", data)
self.check_J_decoded(data, self.jobid1)

def test_list_00_job_kvs_lookup_list(self):
Expand Down Expand Up @@ -337,12 +402,38 @@ def test_list_14_job_kvs_lookup_list_jobspec_base_multiple_keys(self):
self.assertIn("J", data[0])
self.check_jobspec_base_decoded(data[0], self.jobid1)

def test_list_15_job_kvs_lookup_list_base_no_jobspec(self):
def test_list_15_job_kvs_lookup_list_R_base(self):
ids = [self.jobid1]
base = flux.job.JobKVSLookup(self.fh, ids, keys=["R"], base=True).data()
data = flux.job.JobKVSLookup(self.fh, ids, keys=["R"]).data()
self.assertEqual(len(base), 1)
self.assertEqual(len(data), 1)
self.check_R_base_decoded(self.jobid1, base[0], data[0])

def test_list_16_job_kvs_lookup_list_R_base_nodecode(self):
ids = [self.jobid1]
base = flux.job.JobKVSLookup(
self.fh, ids, keys=["R"], decode=False, base=True
).data()
data = flux.job.JobKVSLookup(self.fh, ids, keys=["R"], decode=False).data()
self.assertEqual(len(base), 1)
self.assertEqual(len(data), 1)
self.check_R_base_str(self.jobid1, base[0], data[0])

def test_list_17_job_kvs_lookup_list_R_base_multiple_keys(self):
ids = [self.jobid1]
data = flux.job.JobKVSLookup(self.fh, ids, keys=["R", "J"], base=True).data()
base = flux.job.JobKVSLookup(self.fh, ids, keys=["R", "J"], base=True).data()
data = flux.job.JobKVSLookup(self.fh, ids, keys=["R", "J"]).data()
self.assertEqual(len(data), 1)
self.assertIn("J", data[0])
self.check_R_base_decoded(self.jobid1, base[0], data[0])

def test_list_18_job_kvs_lookup_list_base_no_jobspec_R(self):
ids = [self.jobid1]
data = flux.job.JobKVSLookup(self.fh, ids, keys=["J"], base=True).data()
self.assertEqual(len(data), 1)
self.assertNotIn("jobspec", data[0])
self.check_R_decoded(data[0], self.jobid1)
self.assertNotIn("R", data[0])
self.check_J_decoded(data[0], self.jobid1)


Expand Down

0 comments on commit ef87fab

Please sign in to comment.