Skip to content

Commit

Permalink
plugin: enforce running jobs limit per-queue
Browse files Browse the repository at this point in the history
Problem: The priority plugin has no way of enforcing a max running jobs
limit for an association on a per-queue basis.

Add a new member to the Association class: queue_held_jobs, a hash map
of key-value pairs where the key is the name of the queue the held job
is supposed to run under and the value is a vector of job IDs.

Add a helper function, max_run_jobs_per_queue (), to fetch the max
number of running jobs for an association in a queue.

In the callback for job.state.depend, add a check for the number of
currently running jobs an association has in a queue compared to the
limit of running jobs a queue can have. If they are equal, add a
dependency to the currently submitted job to hold it until a currently
running job finishes. Push back the held job ID onto the vector of held
jobs and store it in the Association object.

In the callback for job.state.inactive, add a check for the release of
any held jobs in a queue for an association *after* a currently running
job in that queue completes. If the association is now under that limit,
grab the first job ID in the held jobs queue and remove the dependency
from it. Remove that job ID from the vector that holds IDs of held jobs
for that queue.
  • Loading branch information
cmoussa1 committed Oct 3, 2024
1 parent b7fad84 commit 62919be
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 8 deletions.
11 changes: 11 additions & 0 deletions src/plugins/accounting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,14 @@ int get_project_info (const char *project,

return 0;
}


int max_run_jobs_per_queue (const std::map<std::string, Queue> &queues,
const std::string &queue)
{
auto it = queues.find (queue);
if (it == queues.end ())
return -1;

return it->second.max_running_jobs;
}
7 changes: 7 additions & 0 deletions src/plugins/accounting.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class Association {
int max_nodes; // max num nodes across all running jobs
std::unordered_map<std::string, int>
queue_usage; // track num of running jobs per queue
std::unordered_map<std::string,
std::vector<long int>>
queue_held_jobs; // keep track of held job ID's per queue

// methods
json_t* to_json () const; // convert object to JSON string
Expand Down Expand Up @@ -114,4 +117,8 @@ int get_project_info (const char *project,
std::vector<std::string> &permissible_projects,
std::vector<std::string> projects);

// fetch the max number of running jobs a queue can have per-association
int max_run_jobs_per_queue (const std::map<std::string, Queue> &queues,
const std::string &queue);

#endif // ACCOUNTING_H
76 changes: 72 additions & 4 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -830,12 +830,15 @@ static int depend_cb (flux_plugin_t *p,
int userid;
long int id;
Association *b;
char *queue = NULL;

flux_t *h = flux_jobtap_get_flux (p);
if (flux_plugin_arg_unpack (args,
FLUX_PLUGIN_ARG_IN,
"{s:i, s:I}",
"userid", &userid, "id", &id) < 0) {
"{s:i, s:I, s{s{s{s?s}}}}",
"userid", &userid, "id", &id,
"jobspec", "attributes", "system",
"queue", &queue) < 0) {
flux_log (h,
LOG_ERR,
"flux_plugin_arg_unpack: %s",
Expand All @@ -856,6 +859,38 @@ static int depend_cb (flux_plugin_t *p,
return -1;
}

if (queue != NULL) {
// fetch max number of running jobs in this queue
int queue_max_run_jobs = max_run_jobs_per_queue (queues,
std::string (queue));
if (queue_max_run_jobs < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0, "failed to find "
"max run jobs for queue");
return -1;
}

// look up the association's current number of running jobs;
// if queue cannot be found, an entry in the Association object will be
// initialized with a current running jobs count of 0
int assoc_cur_run_jobs = b->queue_usage[std::string (queue)];
if (assoc_cur_run_jobs == queue_max_run_jobs) {
// association is already at their max number of running jobs
// in this queue; add a dependency
if (flux_jobtap_dependency_add (p, id, "max-run-jobs-queue") < 0) {
flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0, "failed to "
"add dependency for max run jobs "
"per-queue limit");
return -1;
}
b->queue_held_jobs[std::string (queue)].push_back (id);

return 0;
}
}

// if user has already hit their max running jobs count, add a job
// dependency to hold job until an already running job has finished
if ((b->max_run_jobs > 0) && (b->cur_run_jobs == b->max_run_jobs)) {
Expand Down Expand Up @@ -1158,8 +1193,41 @@ static int inactive_cb (flux_plugin_t *p,
if (queue != NULL) {
// a queue was passed-in; decrement counter of the number of
// queue-specific running jobs for this association
if (b->queue_usage[std::string (queue)] > 0)
b->queue_usage[std::string (queue)]--;
std::string queue_str (queue);
if (b->queue_usage[queue_str] > 0) {
// decrement the counter of running jobs the association in queue
b->queue_usage[queue_str]--;

// fetch max number of running jobs in queue
int queue_max_run_jobs = max_run_jobs_per_queue (queues,
queue_str);
if (queue_max_run_jobs < 0) {
flux_jobtap_raise_exception (p,
FLUX_JOBTAP_CURRENT_JOB,
"mf_priority", 0, "failed to "
"find max run jobs per-queue");
return -1;
}

if ((b->queue_held_jobs[queue_str].size () > 0) &&
(b->queue_usage[queue_str] < queue_max_run_jobs)) {
// association has at least one held job in queue;
// remove the dependency from the first held job
long int id = b->queue_held_jobs[queue_str].front ();
if (flux_jobtap_dependency_remove (p,
id,
"max-run-jobs-queue") < 0) {
flux_jobtap_raise_exception (p, id, "mf_priority",
0, "failed to remove job "
" dependency for max run jobs "
"per-queue limit");
return -1;
}
b->queue_held_jobs[queue_str].erase (
b->queue_held_jobs[queue_str].begin ()
);
}
}
}

// if the user/bank combo has any currently held jobs and the user is now
Expand Down
9 changes: 5 additions & 4 deletions src/plugins/test/accounting_test01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ void add_user_to_map (
a.projects,
a.def_project,
a.max_nodes,
a.queue_usage
a.queue_usage,
a.queue_held_jobs
};
}

Expand All @@ -68,9 +69,9 @@ void initialize_map (
std::map<int, std::map<std::string, Association>> &users)
{
Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {},
{}, 0, 1, {"*"}, "*", 2147483647, {}};
{}, 0, 1, {"*"}, "*", 2147483647, {}, {}};
Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {},
{}, 0, 1, {"*"}, "*", 2147483647, {}};
{}, 0, 1, {"*"}, "*", 2147483647, {}, {}};

add_user_to_map (users, 1001, "bank_A", user1);
users_def_bank[1001] = "bank_A";
Expand Down Expand Up @@ -272,7 +273,7 @@ static void test_check_map_dne_true ()
users_def_bank.clear ();

Association tmp_user = {"DNE", 0.5, 5, 0, 7, 0, {},
{}, 0, 1, {"*"}, "*", 2147483647, {}};
{}, 0, 1, {"*"}, "*", 2147483647, {}, {}};
add_user_to_map (users, 9999, "DNE", tmp_user);
users_def_bank[9999] = "DNE";

Expand Down

0 comments on commit 62919be

Please sign in to comment.