From f7acb1bf8d4ab36e5fd2537da6fe1bb994370acb Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 24 Sep 2024 13:03:43 -0700 Subject: [PATCH] plugin: enforce running jobs limit per-queue Problem: The priority plugin has no way of enforcing a max running jobs limit for an association on a per-queue basis. Add a new member to the Association class: queue_held_jobs, a hash map of key-value pairs where the key is the name of the queue the held job is supposed to run under and the value is a vector of job IDs. Add a helper function, max_run_jobs_per_queue (), to fetch the max number of running jobs for an association in a queue. In the callback for job.state.depend, add a check that compares the number of jobs an association currently has running in a queue against that queue's limit of running jobs. If they are equal, add a dependency to the currently submitted job to hold it until a currently running job finishes. Push the held job ID onto the vector of held jobs for that queue, which is stored in the Association object. In the callback for job.state.inactive, add a check for the release of any held jobs in a queue for an association *after* a currently running job in that queue completes. If the association is now under the queue's running jobs limit, take the first job ID from that queue's vector of held jobs and remove the dependency from it. Remove that job ID from the vector that holds IDs of held jobs for that queue. 
--- src/plugins/accounting.cpp | 11 ++++ src/plugins/accounting.hpp | 7 +++ src/plugins/mf_priority.cpp | 76 ++++++++++++++++++++++++-- src/plugins/test/accounting_test01.cpp | 9 +-- 4 files changed, 95 insertions(+), 8 deletions(-) diff --git a/src/plugins/accounting.cpp b/src/plugins/accounting.cpp index 4056ea69..ca6e65d6 100644 --- a/src/plugins/accounting.cpp +++ b/src/plugins/accounting.cpp @@ -248,3 +248,14 @@ int get_project_info (const char *project, return 0; } + + +int max_run_jobs_per_queue (const std::map &queues, + const std::string &queue) +{ + auto it = queues.find (queue); + if (it == queues.end ()) + return -1; + + return it->second.max_running_jobs; +} diff --git a/src/plugins/accounting.hpp b/src/plugins/accounting.hpp index 98341cbe..3c3a2ff8 100644 --- a/src/plugins/accounting.hpp +++ b/src/plugins/accounting.hpp @@ -48,6 +48,9 @@ class Association { int max_nodes; // max num nodes across all running jobs std::unordered_map queue_usage; // track num of running jobs per queue + std::unordered_map> + queue_held_jobs; // keep track of held job ID's per queue // methods json_t* to_json () const; // convert object to JSON string @@ -114,4 +117,8 @@ int get_project_info (const char *project, std::vector &permissible_projects, std::vector projects); +// fetch the max number of running jobs a queue can have per-association +int max_run_jobs_per_queue (const std::map &queues, + const std::string &queue); + #endif // ACCOUNTING_H diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index c94d5bd3..56bc65d3 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -830,12 +830,15 @@ static int depend_cb (flux_plugin_t *p, int userid; long int id; Association *b; + char *queue = NULL; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s:I}", - "userid", &userid, "id", &id) < 0) { + "{s:i, s:I, s{s{s{s?s}}}}", + "userid", &userid, "id", &id, + "jobspec", "attributes", 
"system", + "queue", &queue) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -856,6 +859,38 @@ static int depend_cb (flux_plugin_t *p, return -1; } + if (queue != NULL) { + // fetch max number of running jobs in this queue + int queue_max_run_jobs = max_run_jobs_per_queue (queues, + std::string (queue)); + if (queue_max_run_jobs < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, "failed to find " + "max run jobs for queue"); + return -1; + } + + // look up the association's current number of running jobs; + // if queue cannot be found, an entry in the Association object will be + // initialized with a current running jobs count of 0 + int assoc_cur_run_jobs = b->queue_usage[std::string (queue)]; + if (assoc_cur_run_jobs == queue_max_run_jobs) { + // association is already at their max number of running jobs + // in this queue; add a dependency + if (flux_jobtap_dependency_add (p, id, "max-run-jobs-queue") < 0) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, "failed to " + "add dependency for max run jobs " + "per-queue limit"); + return -1; + } + b->queue_held_jobs[std::string (queue)].push_back (id); + + return 0; + } + } + // if user has already hit their max running jobs count, add a job // dependency to hold job until an already running job has finished if ((b->max_run_jobs > 0) && (b->cur_run_jobs == b->max_run_jobs)) { @@ -1158,8 +1193,41 @@ static int inactive_cb (flux_plugin_t *p, if (queue != NULL) { // a queue was passed-in; decrement counter of the number of // queue-specific running jobs for this association - if (b->queue_usage[std::string (queue)] > 0) - b->queue_usage[std::string (queue)]--; + std::string queue_str (queue); + if (b->queue_usage[queue_str] > 0) { + // decrement the counter of running jobs the association in queue + b->queue_usage[queue_str]--; + + // fetch max number of running jobs in queue + int queue_max_run_jobs = max_run_jobs_per_queue (queues, + 
queue_str); + if (queue_max_run_jobs < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, "failed to " + "find max run jobs per-queue"); + return -1; + } + + if ((b->queue_held_jobs[queue_str].size () > 0) && + (b->queue_usage[queue_str] < queue_max_run_jobs)) { + // association has at least one held job in queue; + // remove the dependency from the first held job + long int id = b->queue_held_jobs[queue_str].front (); + if (flux_jobtap_dependency_remove (p, + id, + "max-run-jobs-queue") < 0) { + flux_jobtap_raise_exception (p, id, "mf_priority", + 0, "failed to remove job " + " dependency for max run jobs " + "per-queue limit"); + return -1; + } + b->queue_held_jobs[queue_str].erase ( + b->queue_held_jobs[queue_str].begin () + ); + } + } } // if the user/bank combo has any currently held jobs and the user is now diff --git a/src/plugins/test/accounting_test01.cpp b/src/plugins/test/accounting_test01.cpp index d55677cf..2567fe3f 100644 --- a/src/plugins/test/accounting_test01.cpp +++ b/src/plugins/test/accounting_test01.cpp @@ -56,7 +56,8 @@ void add_user_to_map ( a.projects, a.def_project, a.max_nodes, - a.queue_usage + a.queue_usage, + a.queue_held_jobs }; } @@ -68,9 +69,9 @@ void initialize_map ( std::map> &users) { Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, {}}; + {}, 0, 1, {"*"}, "*", 2147483647, {}, {}}; Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, {}}; + {}, 0, 1, {"*"}, "*", 2147483647, {}, {}}; add_user_to_map (users, 1001, "bank_A", user1); users_def_bank[1001] = "bank_A"; @@ -272,7 +273,7 @@ static void test_check_map_dne_true () users_def_bank.clear (); Association tmp_user = {"DNE", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, {}}; + {}, 0, 1, {"*"}, "*", 2147483647, {}, {}}; add_user_to_map (users, 9999, "DNE", tmp_user); users_def_bank[9999] = "DNE";