diff --git a/src/plugins/accounting.cpp b/src/plugins/accounting.cpp index 4056ea69..ca6e65d6 100644 --- a/src/plugins/accounting.cpp +++ b/src/plugins/accounting.cpp @@ -248,3 +248,14 @@ int get_project_info (const char *project, return 0; } + + +int max_run_jobs_per_queue (const std::map &queues, + const std::string &queue) +{ + auto it = queues.find (queue); + if (it == queues.end ()) + return -1; + + return it->second.max_running_jobs; +} diff --git a/src/plugins/accounting.hpp b/src/plugins/accounting.hpp index 98341cbe..3c3a2ff8 100644 --- a/src/plugins/accounting.hpp +++ b/src/plugins/accounting.hpp @@ -48,6 +48,9 @@ class Association { int max_nodes; // max num nodes across all running jobs std::unordered_map queue_usage; // track num of running jobs per queue + std::unordered_map> + queue_held_jobs; // keep track of held job ID's per queue // methods json_t* to_json () const; // convert object to JSON string @@ -114,4 +117,8 @@ int get_project_info (const char *project, std::vector &permissible_projects, std::vector projects); +// fetch the max number of running jobs a queue can have per-association +int max_run_jobs_per_queue (const std::map &queues, + const std::string &queue); + #endif // ACCOUNTING_H diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index c94d5bd3..56bc65d3 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -830,12 +830,15 @@ static int depend_cb (flux_plugin_t *p, int userid; long int id; Association *b; + char *queue = NULL; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s:I}", - "userid", &userid, "id", &id) < 0) { + "{s:i, s:I, s{s{s{s?s}}}}", + "userid", &userid, "id", &id, + "jobspec", "attributes", "system", + "queue", &queue) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -856,6 +859,38 @@ static int depend_cb (flux_plugin_t *p, return -1; } + if (queue != NULL) { + // fetch max number of running jobs in this queue + int queue_max_run_jobs = max_run_jobs_per_queue (queues, + std::string (queue)); + if (queue_max_run_jobs < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, "failed to find " + "max run jobs for queue"); + return -1; + } + + // look up the association's current number of running jobs; + // if queue cannot be found, an entry in the Association object will be + // initialized with a current running jobs count of 0 + int assoc_cur_run_jobs = b->queue_usage[std::string (queue)]; + if (assoc_cur_run_jobs == queue_max_run_jobs) { + // association is already at their max number of running jobs + // in this queue; add a dependency + if (flux_jobtap_dependency_add (p, id, "max-run-jobs-queue") < 0) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, "failed to " + "add dependency for max run jobs " + "per-queue limit"); + return -1; + } + b->queue_held_jobs[std::string (queue)].push_back (id); + + return 0; + } + } + // if user has already hit their max running jobs count, add a job // dependency to hold job until an already running job has finished if ((b->max_run_jobs > 0) && (b->cur_run_jobs == b->max_run_jobs)) { @@ -1158,8 +1193,41 @@ static int inactive_cb (flux_plugin_t *p, if (queue != NULL) { // a queue was passed-in; decrement counter of the number of // queue-specific running jobs for this association - if (b->queue_usage[std::string (queue)] > 0) - b->queue_usage[std::string (queue)]--; + std::string queue_str (queue); + if (b->queue_usage[queue_str] > 0) { + // decrement the counter of running jobs the association in queue + b->queue_usage[queue_str]--; + + // fetch max number of running jobs in queue + int queue_max_run_jobs = max_run_jobs_per_queue (queues, + queue_str); + if (queue_max_run_jobs < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, "failed to " + "find max run jobs per-queue"); + return -1; + } + + if ((b->queue_held_jobs[queue_str].size () > 0) && + (b->queue_usage[queue_str] < queue_max_run_jobs)) { + // association has at least one held job in queue; + // remove the dependency from the first held job + long int id = b->queue_held_jobs[queue_str].front (); + if (flux_jobtap_dependency_remove (p, + id, + "max-run-jobs-queue") < 0) { + flux_jobtap_raise_exception (p, id, "mf_priority", + 0, "failed to remove job " + " dependency for max run jobs " + "per-queue limit"); + return -1; + } + b->queue_held_jobs[queue_str].erase ( + b->queue_held_jobs[queue_str].begin () + ); + } + } } // if the user/bank combo has any currently held jobs and the user is now diff --git a/src/plugins/test/accounting_test01.cpp b/src/plugins/test/accounting_test01.cpp index d55677cf..2567fe3f 100644 --- a/src/plugins/test/accounting_test01.cpp +++ b/src/plugins/test/accounting_test01.cpp @@ -56,7 +56,8 @@ void add_user_to_map ( a.projects, a.def_project, a.max_nodes, - a.queue_usage + a.queue_usage, + a.queue_held_jobs }; } @@ -68,9 +69,9 @@ void initialize_map ( std::map> &users) { Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, {}}; + {}, 0, 1, {"*"}, "*", 2147483647, {}, {}}; Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, {}}; + {}, 0, 1, {"*"}, "*", 2147483647, {}, {}}; add_user_to_map (users, 1001, "bank_A", user1); users_def_bank[1001] = "bank_A"; @@ -272,7 +273,7 @@ static void test_check_map_dne_true () users_def_bank.clear (); Association tmp_user = {"DNE", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, {}}; + {}, 0, 1, {"*"}, "*", 2147483647, {}, {}}; add_user_to_map (users, 9999, "DNE", tmp_user); users_def_bank[9999] = "DNE";