From ef87fab6c80e98648fd6d9f65d65de76d29cf632 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Wed, 20 Sep 2023 14:38:53 -0700 Subject: [PATCH] testsuite: cover python kvslookup `base` option Problem: There is no coverage for the 'base' option and the "R" key in job kvs lookup via Python. Add some coverage in python/t0014-job-kvslookup.py. Use a jobtap plugin to generate resource-update event for tests. --- t/Makefile.am | 10 ++ .../plugins/resource-update-expiration.c | 69 ++++++++++++ t/python/t0014-job-kvslookup.py | 103 +++++++++++++++++- 3 files changed, 176 insertions(+), 6 deletions(-) create mode 100644 t/job-manager/plugins/resource-update-expiration.c diff --git a/t/Makefile.am b/t/Makefile.am index 1da5afd93490..f1648cc88b7c 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -483,6 +483,7 @@ check_LTLIBRARIES = \ job-manager/plugins/config.la \ job-manager/plugins/jobspec-update.la \ job-manager/plugins/jobspec-update-job-list.la \ + job-manager/plugins/resource-update-expiration.la \ job-manager/plugins/update-test.la \ job-manager/plugins/project-bank-validate.la \ stats/stats-basic.la \ @@ -1003,6 +1004,15 @@ job_manager_plugins_jobspec_update_job_list_la_LIBADD = \ $(top_builddir)/src/common/libflux-internal.la \ $(top_builddir)/src/common/libflux-core.la +job_manager_plugins_resource_update_expiration_la_SOURCES = \ + job-manager/plugins/resource-update-expiration.c +job_manager_plugins_resource_update_expiration_la_CPPFLAGS = \ + $(test_cppflags) +job_manager_plugins_resource_update_expiration_la_LDFLAGS = \ + $(fluxplugin_ldflags) -module -rpath /nowhere +job_manager_plugins_resource_update_expiration_la_LIBADD = \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la job_manager_plugins_update_test_la_SOURCES = \ job-manager/plugins/update-test.c diff --git a/t/job-manager/plugins/resource-update-expiration.c b/t/job-manager/plugins/resource-update-expiration.c new file mode 100644 index 000000000000..3631da43cfc6 --- /dev/null +++ b/t/job-manager/plugins/resource-update-expiration.c @@ -0,0 +1,69 @@ +/************************************************************\ + * Copyright 2023 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* jobspec-update-job-list.c - test jobspec-update event in job-list + * module + */ + +#include +#include + +#include "ccan/str/str.h" +#include "src/common/libutil/errprintf.h" + +static int run_cb (flux_plugin_t *p, + const char *topic, + flux_plugin_arg_t *args, + void *data) +{ + double expiration; + flux_jobid_t id; + + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s:I s:{s:{s:F}}}", + "id", &id, + "R", + "execution", + "expiration", &expiration) < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "resource-update", 0, + "unpack failure"); + return -1; + } + + if (flux_jobtap_event_post_pack (p, + id, + "resource-update", + "{s:f}", + "expiration", expiration + 3600.) < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "resource-update", 0, + "update failure"); + return -1; + } + return 0; +} + +static const struct flux_plugin_handler tab[] = { + { "job.state.run", run_cb, NULL }, + { 0 }, +}; + +int flux_plugin_init (flux_plugin_t *p) +{ + if (flux_plugin_register (p, "resource-update-expiration", tab) < 0) + return -1; + return 0; +} + +// vi:ts=4 sw=4 expandtab diff --git a/t/python/t0014-job-kvslookup.py b/t/python/t0014-job-kvslookup.py index e80e579fb607..10522ba5e832 100755 --- a/t/python/t0014-job-kvslookup.py +++ b/t/python/t0014-job-kvslookup.py @@ -11,6 +11,7 @@ ############################################################### import json +import os import unittest import flux @@ -35,6 +36,17 @@ def submitJob(self, command, urgency): @classmethod def setUpClass(self): self.fh = flux.Flux() + + # in future use more standard update mechanism instead of + # jobtap plugin. This jobtap plugin will simply increase the + # job expiration by 60 minutes. + pluginpath = ( + os.environ["FLUX_BUILD_DIR"] + + "/t/job-manager/plugins/.libs/resource-update-expiration.so" + ) + payload = {"load": pluginpath} + self.fh.rpc("job-manager.jobtap", payload).get() + self.jobid1 = self.submitJob(["hostname"], 0) flux.job.event_wait(self.fh, self.jobid1, name="priority") update = {"attributes.system.duration": 100.0} @@ -43,6 +55,10 @@ def setUpClass(self): payload = {"id": self.jobid1, "urgency": 16} self.fh.rpc("job-manager.urgency", payload).get() flux.job.event_wait(self.fh, self.jobid1, name="clean") + + payload = {"remove": "all"} + self.fh.rpc("job-manager.jobtap", payload).get() + self.jobid2 = self.submitJob(["hostname"], 16) flux.job.event_wait(self.fh, self.jobid2, name="clean") @@ -116,6 +132,28 @@ def check_jobspec_base_decoded(self, data, jobid): self.assertEqual(data["jobspec"]["tasks"][0]["command"][0], "hostname") self.assertEqual(data["jobspec"]["attributes"]["system"]["duration"], 0) + def check_R_base_str(self, jobid, base, data): + self.assertEqual(base["id"], jobid) + self.assertEqual(data["id"], jobid) + self.assertIn("R", base) + self.assertIn("R", data) + self.assertEqual(type(base["R"]), str) + self.assertEqual(type(data["R"]), str) + R_base = json.loads(base["R"]) + R_data = json.loads(data["R"]) + base_expiration = R_base["execution"]["expiration"] + data_expiration = R_data["execution"]["expiration"] + self.assertGreater(data_expiration, base_expiration) + + def check_R_base_decoded(self, jobid, base, data): + self.assertEqual(base["id"], jobid) + self.assertEqual(data["id"], jobid) + self.assertIn("R", base) + self.assertIn("R", data) + base_expiration = base["R"]["execution"]["expiration"] + data_expiration = data["R"]["execution"]["expiration"] + self.assertGreater(data_expiration, base_expiration) + def test_info_00_job_info_lookup(self): rpc = flux.job.job_info_lookup(self.fh, self.jobid1) data = rpc.get() @@ -217,10 +255,37 @@ def test_lookup_13_job_kvs_lookup_jobspec_base_multiple_keys(self): self.assertIn("eventlog", data) self.check_jobspec_base_decoded(data, self.jobid1) - def test_lookup_14_job_kvs_lookup_base_no_jobspec(self): - data = flux.job.job_kvs_lookup(self.fh, self.jobid1, keys=["R", "J"], base=True) + def test_lookup_14_job_kvs_lookup_R_base(self): + base = flux.job.job_kvs_lookup(self.fh, self.jobid1, keys=["R"], base=True) + data = flux.job.job_kvs_lookup(self.fh, self.jobid1, keys=["R"]) + self.assertNotIn("eventlog", base) + self.assertNotIn("eventlog", data) + self.check_R_base_decoded(self.jobid1, base, data) + + def test_lookup_15_job_kvs_lookup_R_base_nodecode(self): + base = flux.job.job_kvs_lookup( + self.fh, self.jobid1, keys=["R"], decode=False, base=True + ) + data = flux.job.job_kvs_lookup(self.fh, self.jobid1, keys=["R"], decode=False) + self.assertNotIn("eventlog", base) + self.assertNotIn("eventlog", data) + self.check_R_base_str(self.jobid1, base, data) + + def test_lookup_16_job_kvs_lookup_R_base_multiple_keys(self): + base = flux.job.job_kvs_lookup( + self.fh, self.jobid1, keys=["R", "eventlog"], decode=False, base=True + ) + data = flux.job.job_kvs_lookup( + self.fh, self.jobid1, keys=["R", "eventlog"], decode=False + ) + self.assertIn("eventlog", base) + self.assertIn("eventlog", data) + self.check_R_base_str(self.jobid1, base, data) + + def test_lookup_17_job_kvs_lookup_base_no_jobspec_R(self): + data = flux.job.job_kvs_lookup(self.fh, self.jobid1, keys=["J"], base=True) self.assertNotIn("jobspec", data) - self.check_R_decoded(data, self.jobid1) + self.assertNotIn("R", data) self.check_J_decoded(data, self.jobid1) def test_list_00_job_kvs_lookup_list(self): @@ -337,12 +402,38 @@ def test_list_14_job_kvs_lookup_list_jobspec_base_multiple_keys(self): self.assertIn("J", data[0]) self.check_jobspec_base_decoded(data[0], self.jobid1) - def test_list_15_job_kvs_lookup_list_base_no_jobspec(self): + def test_list_15_job_kvs_lookup_list_R_base(self): + ids = [self.jobid1] + base = flux.job.JobKVSLookup(self.fh, ids, keys=["R"], base=True).data() + data = flux.job.JobKVSLookup(self.fh, ids, keys=["R"]).data() + self.assertEqual(len(base), 1) + self.assertEqual(len(data), 1) + self.check_R_base_decoded(self.jobid1, base[0], data[0]) + + def test_list_16_job_kvs_lookup_list_R_base_nodecode(self): + ids = [self.jobid1] + base = flux.job.JobKVSLookup( + self.fh, ids, keys=["R"], decode=False, base=True + ).data() + data = flux.job.JobKVSLookup(self.fh, ids, keys=["R"], decode=False).data() + self.assertEqual(len(base), 1) + self.assertEqual(len(data), 1) + self.check_R_base_str(self.jobid1, base[0], data[0]) + + def test_list_17_job_kvs_lookup_list_R_base_multiple_keys(self): ids = [self.jobid1] - data = flux.job.JobKVSLookup(self.fh, ids, keys=["R", "J"], base=True).data() + base = flux.job.JobKVSLookup(self.fh, ids, keys=["R", "J"], base=True).data() + data = flux.job.JobKVSLookup(self.fh, ids, keys=["R", "J"]).data() + self.assertEqual(len(data), 1) + self.assertIn("J", data[0]) + self.check_R_base_decoded(self.jobid1, base[0], data[0]) + + def test_list_18_job_kvs_lookup_list_base_no_jobspec_R(self): + ids = [self.jobid1] + data = flux.job.JobKVSLookup(self.fh, ids, keys=["J"], base=True).data() self.assertEqual(len(data), 1) self.assertNotIn("jobspec", data[0]) - self.check_R_decoded(data[0], self.jobid1) + self.assertNotIn("R", data[0]) self.check_J_decoded(data[0], self.jobid1)