diff --git a/src/plugins/accounting.cpp b/src/plugins/accounting.cpp index 7a02638c..4056ea69 100644 --- a/src/plugins/accounting.cpp +++ b/src/plugins/accounting.cpp @@ -84,9 +84,28 @@ json_t* Association::to_json () const } } + json_t *queue_usage_json = json_object (); + if (!queue_usage_json) { + json_decref (held_job_ids); + json_decref (user_queues); + json_decref (user_projects); + return nullptr; + } + for (const auto &entry : queue_usage) { + if (json_object_set_new (queue_usage_json, + entry.first.c_str (), + json_integer (entry.second)) < 0) { + json_decref (held_job_ids); + json_decref (user_queues); + json_decref (user_projects); + json_decref (queue_usage_json); + return nullptr; + } + } + // 'o' steals the reference for both held_job_ids and user_queues json_t *u = json_pack ("{s:s, s:f, s:i, s:i, s:i, s:i," - " s:o, s:o, s:i, s:o, s:s, s:i, s:i}", + " s:o, s:o, s:i, s:o, s:s, s:i, s:i, s:o}", "bank_name", bank_name.c_str (), "fairshare", fairshare, "max_run_jobs", max_run_jobs, @@ -99,7 +118,8 @@ json_t* Association::to_json () const "projects", user_projects, "def_project", def_project.c_str (), "max_nodes", max_nodes, - "active", active); + "active", active, + "queue_usage", queue_usage_json); if (!u) return nullptr; diff --git a/src/plugins/accounting.hpp b/src/plugins/accounting.hpp index 9e730421..98341cbe 100644 --- a/src/plugins/accounting.hpp +++ b/src/plugins/accounting.hpp @@ -27,6 +27,7 @@ extern "C" { #include #include #include +#include // all attributes are per-user/bank class Association { @@ -45,6 +46,8 @@ class Association { std::vector projects; // list of accessible projects std::string def_project; // default project int max_nodes; // max num nodes across all running jobs + std::unordered_map + queue_usage; // track num of running jobs per queue // methods json_t* to_json () const; // convert object to JSON string diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index 71c0b7bf..c94d5bd3 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -882,6 +882,20 @@ static int run_cb (flux_plugin_t *p, { int userid; Association *b; + char *queue = NULL; + + flux_t *h = flux_jobtap_get_flux (p); + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s{s{s{s?s}}}}", + "jobspec", "attributes", "system", + "queue", &queue) < 0) { + flux_log (h, + LOG_ERR, + "flux_plugin_arg_unpack: %s", + flux_plugin_arg_strerror (args)); + return -1; + } b = static_cast (flux_jobtap_job_aux_get (p, @@ -896,6 +910,11 @@ static int run_cb (flux_plugin_t *p, return -1; } + if (queue != NULL) + // a queue was passed-in; increment counter of the number of + // queue-specific running jobs for this association + b->queue_usage[std::string (queue)]++; + // increment the user's current running jobs count b->cur_run_jobs++; @@ -1098,12 +1117,15 @@ static int inactive_cb (flux_plugin_t *p, { int userid; Association *b; + char *queue = NULL; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i}", - "userid", &userid) < 0) { + "{s:i, s{s{s{s?s}}}}", + "userid", &userid, + "jobspec", "attributes", "system", + "queue", &queue) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -1133,6 +1155,13 @@ static int inactive_cb (flux_plugin_t *p, // and look to see if any held jobs can be released b->cur_run_jobs--; + if (queue != NULL) { + // a queue was passed-in; decrement counter of the number of + // queue-specific running jobs for this association + if (b->queue_usage[std::string (queue)] > 0) + b->queue_usage[std::string (queue)]--; + } + // if the user/bank combo has any currently held jobs and the user is now // under their max jobs limit, remove the dependency from first held job if ((b->held_jobs.size () > 0) && (b->cur_run_jobs < b->max_run_jobs)) { diff --git a/src/plugins/test/accounting_test01.cpp b/src/plugins/test/accounting_test01.cpp index 8be490d1..d55677cf 100644 --- a/src/plugins/test/accounting_test01.cpp +++ b/src/plugins/test/accounting_test01.cpp @@ -55,7 +55,8 @@ void add_user_to_map ( a.active, a.projects, a.def_project, - a.max_nodes + a.max_nodes, + a.queue_usage }; } @@ -67,9 +68,9 @@ void initialize_map ( std::map> &users) { Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647}; + {}, 0, 1, {"*"}, "*", 2147483647, {}}; Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647}; + {}, 0, 1, {"*"}, "*", 2147483647, {}}; add_user_to_map (users, 1001, "bank_A", user1); users_def_bank[1001] = "bank_A"; @@ -271,7 +272,7 @@ static void test_check_map_dne_true () users_def_bank.clear (); Association tmp_user = {"DNE", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647}; + {}, 0, 1, {"*"}, "*", 2147483647, {}}; add_user_to_map (users, 9999, "DNE", tmp_user); users_def_bank[9999] = "DNE";