Skip to content

Commit

Permalink
plugin: track number of running jobs per-queue
Browse files Browse the repository at this point in the history
Problem: The priority plugin has no way to keep track of the number of
jobs an association is running under each queue.

Add a new member to the Association class: queue_usage, a map whose
key-value pairs consist of a string representing the queue name and an
integer representing the number of running jobs the association has
under that queue.

In the callback for job.state.run, increment the number of running jobs
the association has for a given queue if one is specified.

In the callback for job.state.inactive, decrement the number of running
jobs the association has for a given queue if one is specified.

Adjust the unit tests for the Association class to account for the
addition of the "queue_usage" member.
  • Loading branch information
cmoussa1 committed Sep 24, 2024
1 parent f5c09b3 commit 90a3ded
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 8 deletions.
24 changes: 22 additions & 2 deletions src/plugins/accounting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,28 @@ json_t* Association::to_json () const
}
}

json_t *queue_usage_json = json_object ();
if (!queue_usage_json) {
json_decref (held_job_ids);
json_decref (user_queues);
json_decref (user_projects);
return nullptr;
}
for (const auto &entry : queue_usage) {
if (json_object_set_new (queue_usage_json,
entry.first.c_str (),
json_integer (entry.second)) < 0) {
json_decref (held_job_ids);
json_decref (user_queues);
json_decref (user_projects);
json_decref (queue_usage_json);
return nullptr;
}
}

// 'o' steals the reference for held_job_ids, user_queues, user_projects,
// and queue_usage_json
json_t *u = json_pack ("{s:s, s:f, s:i, s:i, s:i, s:i,"
" s:o, s:o, s:i, s:o, s:s, s:i, s:i}",
" s:o, s:o, s:i, s:o, s:s, s:i, s:i, s:o}",
"bank_name", bank_name.c_str (),
"fairshare", fairshare,
"max_run_jobs", max_run_jobs,
Expand All @@ -99,7 +118,8 @@ json_t* Association::to_json () const
"projects", user_projects,
"def_project", def_project.c_str (),
"max_nodes", max_nodes,
"active", active);
"active", active,
"queue_usage", queue_usage_json);

if (!u)
return nullptr;
Expand Down
3 changes: 3 additions & 0 deletions src/plugins/accounting.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C" {
#include <iterator>
#include <sstream>
#include <algorithm>
#include <unordered_map>

// all attributes are per-user/bank
class Association {
Expand All @@ -45,6 +46,8 @@ class Association {
std::vector<std::string> projects; // list of accessible projects
std::string def_project; // default project
int max_nodes; // max num nodes across all running jobs
std::unordered_map<std::string, int>
queue_usage; // track num of running jobs per queue

// methods
json_t* to_json () const; // convert object to JSON string
Expand Down
33 changes: 31 additions & 2 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,20 @@ static int run_cb (flux_plugin_t *p,
{
int userid;
Association *b;
char *queue = NULL;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s{s{s{s?s}}}}",
"jobspec", "attributes", "system",
"queue", &queue) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
flux_plugin_arg_strerror (args));
return -1;
}

b = static_cast<Association *>
(flux_jobtap_job_aux_get (p,
Expand All @@ -896,6 +910,11 @@ static int run_cb (flux_plugin_t *p,
return -1;
}

if (queue != NULL)
// a queue was passed-in; increment counter of the number of
// queue-specific running jobs for this association
b->queue_usage[std::string (queue)]++;

// increment the user's current running jobs count
b->cur_run_jobs++;

Expand Down Expand Up @@ -1098,12 +1117,15 @@ static int inactive_cb (flux_plugin_t *p,
{
int userid;
Association *b;
char *queue = NULL;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i}",
"userid", &userid) < 0) {
"{s:i, s{s{s{s?s}}}}",
"userid", &userid,
"jobspec", "attributes", "system",
"queue", &queue) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
Expand Down Expand Up @@ -1133,6 +1155,13 @@ static int inactive_cb (flux_plugin_t *p,
// and look to see if any held jobs can be released
b->cur_run_jobs--;

if (queue != NULL) {
// a queue was passed-in; decrement counter of the number of
// queue-specific running jobs for this association
if (b->queue_usage[std::string (queue)] > 0)
b->queue_usage[std::string (queue)]--;
}

// if the user/bank combo has any currently held jobs and the user is now
// under their max jobs limit, remove the dependency from first held job
if ((b->held_jobs.size () > 0) && (b->cur_run_jobs < b->max_run_jobs)) {
Expand Down
9 changes: 5 additions & 4 deletions src/plugins/test/accounting_test01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ void add_user_to_map (
a.active,
a.projects,
a.def_project,
a.max_nodes
a.max_nodes,
a.queue_usage
};
}

Expand All @@ -67,9 +68,9 @@ void initialize_map (
std::map<int, std::map<std::string, Association>> &users)
{
Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {},
{}, 0, 1, {"*"}, "*", 2147483647};
{}, 0, 1, {"*"}, "*", 2147483647, {}};
Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {},
{}, 0, 1, {"*"}, "*", 2147483647};
{}, 0, 1, {"*"}, "*", 2147483647, {}};

add_user_to_map (users, 1001, "bank_A", user1);
users_def_bank[1001] = "bank_A";
Expand Down Expand Up @@ -271,7 +272,7 @@ static void test_check_map_dne_true ()
users_def_bank.clear ();

Association tmp_user = {"DNE", 0.5, 5, 0, 7, 0, {},
{}, 0, 1, {"*"}, "*", 2147483647};
{}, 0, 1, {"*"}, "*", 2147483647, {}};
add_user_to_map (users, 9999, "DNE", tmp_user);
users_def_bank[9999] = "DNE";

Expand Down

0 comments on commit 90a3ded

Please sign in to comment.