From 593f3695e8b63ec38efe33a93b3673c017d8fcc7 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 23 Sep 2024 11:18:43 -0700 Subject: [PATCH 1/4] queue_table: add max_running_jobs column Problem: There is no definition for a max running jobs limit for a queue in the flux-accounting database, but there exists a need to limit the number of jobs an association can run under a certain queue. Add a new column to the queue_table in the flux-accounting DB: max_running_jobs, which limits the number of running jobs an association can have under a particular queue. Add max_running_jobs to the set of information sent by the flux account-priority-update command, unpack it in the priority plugin, and store it in an attribute of the Queue class. --- .../python/fluxacct/accounting/create_db.py | 1 + .../fluxacct/accounting/queue_subcommands.py | 12 +++++++++--- src/cmd/flux-account-priority-update.py | 1 + src/cmd/flux-account-service.py | 2 ++ src/cmd/flux-account.py | 15 +++++++++++++++ src/plugins/accounting.hpp | 1 + src/plugins/mf_priority.cpp | 7 +++++-- src/plugins/test/accounting_test01.cpp | 6 +++--- 8 files changed, 37 insertions(+), 8 deletions(-) diff --git a/src/bindings/python/fluxacct/accounting/create_db.py b/src/bindings/python/fluxacct/accounting/create_db.py index 15c28b532..1262c39d1 100755 --- a/src/bindings/python/fluxacct/accounting/create_db.py +++ b/src/bindings/python/fluxacct/accounting/create_db.py @@ -181,6 +181,7 @@ def create_db( max_nodes_per_job int(11) DEFAULT 1 NOT NULL ON CONFLICT REPLACE DEFAULT 1, max_time_per_job int(11) DEFAULT 60 NOT NULL ON CONFLICT REPLACE DEFAULT 60, priority int(11) DEFAULT 0 NOT NULL ON CONFLICT REPLACE DEFAULT 0, + max_running_jobs int(11) DEFAULT 100 NOT NULL ON CONFLICT REPLACE DEFAULT 100, PRIMARY KEY (queue) );""" ) diff --git a/src/bindings/python/fluxacct/accounting/queue_subcommands.py b/src/bindings/python/fluxacct/accounting/queue_subcommands.py index f6253d0dc..27c6f0c38 100644 --- 
a/src/bindings/python/fluxacct/accounting/queue_subcommands.py +++ b/src/bindings/python/fluxacct/accounting/queue_subcommands.py @@ -37,7 +37,9 @@ def view_queue(conn, queue): raise sqlite3.OperationalError(f"an sqlite3.OperationalError occurred: {exc}") -def add_queue(conn, queue, min_nodes=1, max_nodes=1, max_time=60, priority=0): +def add_queue( + conn, queue, min_nodes=1, max_nodes=1, max_time=60, priority=0, max_running_jobs=100 +): try: insert_stmt = """ INSERT INTO queue_table ( @@ -45,8 +47,9 @@ def add_queue(conn, queue, min_nodes=1, max_nodes=1, max_time=60, priority=0): min_nodes_per_job, max_nodes_per_job, max_time_per_job, - priority - ) VALUES (?, ?, ?, ?, ?) + priority, + max_running_jobs + ) VALUES (?, ?, ?, ?, ?, ?) """ conn.execute( insert_stmt, @@ -56,6 +59,7 @@ def add_queue(conn, queue, min_nodes=1, max_nodes=1, max_time=60, priority=0): max_nodes, max_time, priority, + max_running_jobs, ), ) @@ -84,6 +88,7 @@ def edit_queue( max_nodes_per_job=None, max_time_per_job=None, priority=None, + max_running_jobs=None, ): params = locals() editable_fields = [ @@ -91,6 +96,7 @@ def edit_queue( "max_nodes_per_job", "max_time_per_job", "priority", + "max_running_jobs", ] for field in editable_fields: diff --git a/src/cmd/flux-account-priority-update.py b/src/cmd/flux-account-priority-update.py index f20ed9513..e7db04394 100755 --- a/src/cmd/flux-account-priority-update.py +++ b/src/cmd/flux-account-priority-update.py @@ -110,6 +110,7 @@ def bulk_update(path): "max_nodes_per_job": int(row[2]), "max_time_per_job": int(row[3]), "priority": int(row[4]), + "max_running_jobs": int(row[5]), } bulk_q_data.append(single_q_data) diff --git a/src/cmd/flux-account-service.py b/src/cmd/flux-account-service.py index e448354ee..363cc3342 100755 --- a/src/cmd/flux-account-service.py +++ b/src/cmd/flux-account-service.py @@ -398,6 +398,7 @@ def add_queue(self, handle, watcher, msg, arg): msg.payload["max_nodes_per_job"], msg.payload["max_time_per_job"], 
msg.payload["priority"], + msg.payload["max_running_jobs"], ) payload = {"add_queue": val} @@ -453,6 +454,7 @@ def edit_queue(self, handle, watcher, msg, arg): msg.payload["max_nodes_per_job"], msg.payload["max_time_per_job"], msg.payload["priority"], + msg.payload["max_running_jobs"], ) payload = {"edit_queue": val} diff --git a/src/cmd/flux-account.py b/src/cmd/flux-account.py index 63a5ef9ac..a196df6fc 100755 --- a/src/cmd/flux-account.py +++ b/src/cmd/flux-account.py @@ -417,6 +417,12 @@ def add_add_queue_arg(subparsers): default=0, metavar="PRIORITY", ) + subparser_add_queue.add_argument( + "--max-running-jobs", + help="max number of running jobs an association can have in the queue", + default=100, + metavar="MAX_RUNNING_JOBS", + ) def add_view_queue_arg(subparsers): @@ -467,6 +473,13 @@ def add_edit_queue_arg(subparsers): default=None, metavar="PRIORITY", ) + subparser_edit_queue.add_argument( + "--max-running-jobs", + type=int, + help="max number of running jobs an association can have in the queue", + default=None, + metavar="MAX_RUNNING_JOBS", + ) def add_delete_queue_arg(subparsers): @@ -763,6 +776,7 @@ def select_accounting_function(args, output_file, parser): "max_nodes_per_job": args.max_nodes_per_job, "max_time_per_job": args.max_time_per_job, "priority": args.priority, + "max_running_jobs": args.max_running_jobs, } return_val = flux.Flux().rpc("accounting.add_queue", data).get() elif args.func == "view_queue": @@ -785,6 +799,7 @@ def select_accounting_function(args, output_file, parser): "max_nodes_per_job": args.max_nodes_per_job, "max_time_per_job": args.max_time_per_job, "priority": args.priority, + "max_running_jobs": args.max_running_jobs, } return_val = flux.Flux().rpc("accounting.edit_queue", data).get() elif args.func == "add_project": diff --git a/src/plugins/accounting.hpp b/src/plugins/accounting.hpp index 22df99364..9e730421d 100644 --- a/src/plugins/accounting.hpp +++ b/src/plugins/accounting.hpp @@ -74,6 +74,7 @@ class Queue { int 
max_nodes_per_job; int max_time_per_job; int priority; + int max_running_jobs; }; // get an Association object that points to user/bank in the users map; diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index e9d65ca41..a2e1cb890 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -354,6 +354,7 @@ static void rec_q_cb (flux_t *h, { char *queue = NULL; int min_nodes_per_job, max_nodes_per_job, max_time_per_job, priority = 0; + int max_running_jobs = 0; json_t *data, *jtemp = NULL; json_error_t error; int num_data = 0; @@ -376,12 +377,13 @@ static void rec_q_cb (flux_t *h, json_t *el = json_array_get(data, i); if (json_unpack_ex (el, &error, 0, - "{s:s, s:i, s:i, s:i, s:i}", + "{s:s, s:i, s:i, s:i, s:i, s:i}", "queue", &queue, "min_nodes_per_job", &min_nodes_per_job, "max_nodes_per_job", &max_nodes_per_job, "max_time_per_job", &max_time_per_job, - "priority", &priority) < 0) + "priority", &priority, + "max_running_jobs", &max_running_jobs) < 0) flux_log (h, LOG_ERR, "mf_priority unpack: %s", error.text); Queue *q; @@ -391,6 +393,7 @@ static void rec_q_cb (flux_t *h, q->max_nodes_per_job = max_nodes_per_job; q->max_time_per_job = max_time_per_job; q->priority = priority; + q->max_running_jobs = max_running_jobs; } if (flux_respond (h, msg, NULL) < 0) diff --git a/src/plugins/test/accounting_test01.cpp b/src/plugins/test/accounting_test01.cpp index 6901d780a..8be490d17 100644 --- a/src/plugins/test/accounting_test01.cpp +++ b/src/plugins/test/accounting_test01.cpp @@ -83,9 +83,9 @@ void initialize_map ( * helper function to add test queues to the queues map */ void initialize_queues () { - queues["bronze"] = {0, 5, 60, 100}; - queues["silver"] = {0, 5, 60, 200}; - queues["gold"] = {0, 5, 60, 300}; + queues["bronze"] = {0, 5, 60, 100, 100}; + queues["silver"] = {0, 5, 60, 200, 100}; + queues["gold"] = {0, 5, 60, 300, 100}; } From b7fad848fc13a3108441ac8425e530def108d548 Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 23 
Sep 2024 11:22:43 -0700 Subject: [PATCH 2/4] plugin: track number of running jobs per-queue Problem: The priority plugin has no way to keep track of the number of jobs an association is running under each queue. Add a new member to the Association class: queue_usage, a map whose key-value pairs consist of a string representing the queue name and an integer representing the number of running jobs the association has under that queue. In the callback for job.state.run, increment the number of running jobs the association has for a given queue if one is specified. In the callback for job.state.inactive, decrement the number of running jobs the association has for a given queue if one is specified. Adjust the unit tests for the Association class to account for the addition of the "queue_usage" member. --- src/plugins/accounting.cpp | 24 +++++++++++++++++-- src/plugins/accounting.hpp | 3 +++ src/plugins/mf_priority.cpp | 33 ++++++++++++++++++++++++-- src/plugins/test/accounting_test01.cpp | 9 +++---- 4 files changed, 61 insertions(+), 8 deletions(-) diff --git a/src/plugins/accounting.cpp b/src/plugins/accounting.cpp index 7a02638c8..4056ea695 100644 --- a/src/plugins/accounting.cpp +++ b/src/plugins/accounting.cpp @@ -84,9 +84,28 @@ json_t* Association::to_json () const } } + json_t *queue_usage_json = json_object (); + if (!queue_usage_json) { + json_decref (held_job_ids); + json_decref (user_queues); + json_decref (user_projects); + return nullptr; + } + for (const auto &entry : queue_usage) { + if (json_object_set_new (queue_usage_json, + entry.first.c_str (), + json_integer (entry.second)) < 0) { + json_decref (held_job_ids); + json_decref (user_queues); + json_decref (user_projects); + json_decref (queue_usage_json); + return nullptr; + } + } + // 'o' steals the reference for both held_job_ids and user_queues json_t *u = json_pack ("{s:s, s:f, s:i, s:i, s:i, s:i," - " s:o, s:o, s:i, s:o, s:s, s:i, s:i}", + " s:o, s:o, s:i, s:o, s:s, s:i, s:i, s:o}", "bank_name", 
bank_name.c_str (), "fairshare", fairshare, "max_run_jobs", max_run_jobs, @@ -99,7 +118,8 @@ json_t* Association::to_json () const "projects", user_projects, "def_project", def_project.c_str (), "max_nodes", max_nodes, - "active", active); + "active", active, + "queue_usage", queue_usage_json); if (!u) return nullptr; diff --git a/src/plugins/accounting.hpp b/src/plugins/accounting.hpp index 9e730421d..98341cbea 100644 --- a/src/plugins/accounting.hpp +++ b/src/plugins/accounting.hpp @@ -27,6 +27,7 @@ extern "C" { #include #include #include +#include // all attributes are per-user/bank class Association { @@ -45,6 +46,8 @@ class Association { std::vector projects; // list of accessible projects std::string def_project; // default project int max_nodes; // max num nodes across all running jobs + std::unordered_map + queue_usage; // track num of running jobs per queue // methods json_t* to_json () const; // convert object to JSON string diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index a2e1cb890..c3b9fbe66 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -882,6 +882,20 @@ static int run_cb (flux_plugin_t *p, { int userid; Association *b; + char *queue = NULL; + + flux_t *h = flux_jobtap_get_flux (p); + if (flux_plugin_arg_unpack (args, + FLUX_PLUGIN_ARG_IN, + "{s{s{s{s?s}}}}", + "jobspec", "attributes", "system", + "queue", &queue) < 0) { + flux_log (h, + LOG_ERR, + "flux_plugin_arg_unpack: %s", + flux_plugin_arg_strerror (args)); + return -1; + } b = static_cast (flux_jobtap_job_aux_get (p, @@ -896,6 +910,11 @@ static int run_cb (flux_plugin_t *p, return -1; } + if (queue != NULL) + // a queue was passed-in; increment counter of the number of + // queue-specific running jobs for this association + b->queue_usage[std::string (queue)]++; + // increment the user's current running jobs count b->cur_run_jobs++; @@ -1098,12 +1117,15 @@ static int inactive_cb (flux_plugin_t *p, { int userid; Association *b; + char 
*queue = NULL; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i}", - "userid", &userid) < 0) { + "{s:i, s{s{s{s?s}}}}", + "userid", &userid, + "jobspec", "attributes", "system", + "queue", &queue) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -1133,6 +1155,13 @@ static int inactive_cb (flux_plugin_t *p, // and look to see if any held jobs can be released b->cur_run_jobs--; + if (queue != NULL) { + // a queue was passed-in; decrement counter of the number of + // queue-specific running jobs for this association + if (b->queue_usage[std::string (queue)] > 0) + b->queue_usage[std::string (queue)]--; + } + // if the user/bank combo has any currently held jobs and the user is now // under their max jobs limit, remove the dependency from first held job if ((b->held_jobs.size () > 0) && (b->cur_run_jobs < b->max_run_jobs)) { diff --git a/src/plugins/test/accounting_test01.cpp b/src/plugins/test/accounting_test01.cpp index 8be490d17..d55677cfc 100644 --- a/src/plugins/test/accounting_test01.cpp +++ b/src/plugins/test/accounting_test01.cpp @@ -55,7 +55,8 @@ void add_user_to_map ( a.active, a.projects, a.def_project, - a.max_nodes + a.max_nodes, + a.queue_usage }; } @@ -67,9 +68,9 @@ void initialize_map ( std::map> &users) { Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647}; + {}, 0, 1, {"*"}, "*", 2147483647, {}}; Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647}; + {}, 0, 1, {"*"}, "*", 2147483647, {}}; add_user_to_map (users, 1001, "bank_A", user1); users_def_bank[1001] = "bank_A"; @@ -271,7 +272,7 @@ static void test_check_map_dne_true () users_def_bank.clear (); Association tmp_user = {"DNE", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647}; + {}, 0, 1, {"*"}, "*", 2147483647, {}}; add_user_to_map (users, 9999, "DNE", tmp_user); users_def_bank[9999] = "DNE"; From 62919bef035a66c8de8be37379ce436c87a4cd7f Mon Sep 17 
00:00:00 2001 From: cmoussa1 Date: Tue, 24 Sep 2024 13:03:43 -0700 Subject: [PATCH 3/4] plugin: enforce running jobs limit per-queue Problem: The priority plugin has no way of enforcing a max running jobs limit for an association on a per-queue basis. Add a new member to the Association class: queue_held_jobs, a hash map of key-value pairs where the key is the name of the queue the held job is supposed to run under and the value is a vector of job IDs. Add a helper function, max_run_jobs_per_queue (), to fetch the max number of running jobs for an association in a queue. In the callback for job.state.depend, compare the number of jobs an association is currently running in a queue against the max number of running jobs an association can have in that queue. If they are equal, add a dependency to the newly submitted job to hold it until a currently running job in that queue finishes. Push the held job ID onto the vector of held jobs for that queue, which is stored in the Association object. In the callback for job.state.inactive, add a check for the release of any held jobs in a queue for an association *after* a currently running job in that queue completes. If the association is now under the queue's running jobs limit, grab the first job ID in the vector of held jobs for that queue and remove the dependency from it. Remove that job ID from the vector that holds IDs of held jobs for that queue.
--- src/plugins/accounting.cpp | 11 ++++ src/plugins/accounting.hpp | 7 +++ src/plugins/mf_priority.cpp | 76 ++++++++++++++++++++++++-- src/plugins/test/accounting_test01.cpp | 9 +-- 4 files changed, 95 insertions(+), 8 deletions(-) diff --git a/src/plugins/accounting.cpp b/src/plugins/accounting.cpp index 4056ea695..ca6e65d65 100644 --- a/src/plugins/accounting.cpp +++ b/src/plugins/accounting.cpp @@ -248,3 +248,14 @@ int get_project_info (const char *project, return 0; } + + +int max_run_jobs_per_queue (const std::map &queues, + const std::string &queue) +{ + auto it = queues.find (queue); + if (it == queues.end ()) + return -1; + + return it->second.max_running_jobs; +} diff --git a/src/plugins/accounting.hpp b/src/plugins/accounting.hpp index 98341cbea..3c3a2ff88 100644 --- a/src/plugins/accounting.hpp +++ b/src/plugins/accounting.hpp @@ -48,6 +48,9 @@ class Association { int max_nodes; // max num nodes across all running jobs std::unordered_map queue_usage; // track num of running jobs per queue + std::unordered_map> + queue_held_jobs; // keep track of held job ID's per queue // methods json_t* to_json () const; // convert object to JSON string @@ -114,4 +117,8 @@ int get_project_info (const char *project, std::vector &permissible_projects, std::vector projects); +// fetch the max number of running jobs a queue can have per-association +int max_run_jobs_per_queue (const std::map &queues, + const std::string &queue); + #endif // ACCOUNTING_H diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index c3b9fbe66..5d86abf75 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -830,12 +830,15 @@ static int depend_cb (flux_plugin_t *p, int userid; long int id; Association *b; + char *queue = NULL; flux_t *h = flux_jobtap_get_flux (p); if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s:i, s:I}", - "userid", &userid, "id", &id) < 0) { + "{s:i, s:I, s{s{s{s?s}}}}", + "userid", &userid, "id", &id, + "jobspec", 
"attributes", "system", + "queue", &queue) < 0) { flux_log (h, LOG_ERR, "flux_plugin_arg_unpack: %s", @@ -856,6 +859,38 @@ static int depend_cb (flux_plugin_t *p, return -1; } + if (queue != NULL) { + // fetch max number of running jobs in this queue + int queue_max_run_jobs = max_run_jobs_per_queue (queues, + std::string (queue)); + if (queue_max_run_jobs < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, "failed to find " + "max run jobs for queue"); + return -1; + } + + // look up the association's current number of running jobs; + // if queue cannot be found, an entry in the Association object will be + // initialized with a current running jobs count of 0 + int assoc_cur_run_jobs = b->queue_usage[std::string (queue)]; + if (assoc_cur_run_jobs == queue_max_run_jobs) { + // association is already at their max number of running jobs + // in this queue; add a dependency + if (flux_jobtap_dependency_add (p, id, "max-run-jobs-queue") < 0) { + flux_jobtap_raise_exception (p, FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, "failed to " + "add dependency for max run jobs " + "per-queue limit"); + return -1; + } + b->queue_held_jobs[std::string (queue)].push_back (id); + + return 0; + } + } + // if user has already hit their max running jobs count, add a job // dependency to hold job until an already running job has finished if ((b->max_run_jobs > 0) && (b->cur_run_jobs == b->max_run_jobs)) { @@ -1158,8 +1193,41 @@ static int inactive_cb (flux_plugin_t *p, if (queue != NULL) { // a queue was passed-in; decrement counter of the number of // queue-specific running jobs for this association - if (b->queue_usage[std::string (queue)] > 0) - b->queue_usage[std::string (queue)]--; + std::string queue_str (queue); + if (b->queue_usage[queue_str] > 0) { + // decrement the counter of running jobs the association in queue + b->queue_usage[queue_str]--; + + // fetch max number of running jobs in queue + int queue_max_run_jobs = 
max_run_jobs_per_queue (queues, + queue_str); + if (queue_max_run_jobs < 0) { + flux_jobtap_raise_exception (p, + FLUX_JOBTAP_CURRENT_JOB, + "mf_priority", 0, "failed to " + "find max run jobs per-queue"); + return -1; + } + + if ((b->queue_held_jobs[queue_str].size () > 0) && + (b->queue_usage[queue_str] < queue_max_run_jobs)) { + // association has at least one held job in queue; + // remove the dependency from the first held job + long int id = b->queue_held_jobs[queue_str].front (); + if (flux_jobtap_dependency_remove (p, + id, + "max-run-jobs-queue") < 0) { + flux_jobtap_raise_exception (p, id, "mf_priority", + 0, "failed to remove job " + " dependency for max run jobs " + "per-queue limit"); + return -1; + } + b->queue_held_jobs[queue_str].erase ( + b->queue_held_jobs[queue_str].begin () + ); + } + } } // if the user/bank combo has any currently held jobs and the user is now diff --git a/src/plugins/test/accounting_test01.cpp b/src/plugins/test/accounting_test01.cpp index d55677cfc..2567fe3f7 100644 --- a/src/plugins/test/accounting_test01.cpp +++ b/src/plugins/test/accounting_test01.cpp @@ -56,7 +56,8 @@ void add_user_to_map ( a.projects, a.def_project, a.max_nodes, - a.queue_usage + a.queue_usage, + a.queue_held_jobs }; } @@ -68,9 +69,9 @@ void initialize_map ( std::map> &users) { Association user1 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, {}}; + {}, 0, 1, {"*"}, "*", 2147483647, {}, {}}; Association user2 = {"bank_A", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, {}}; + {}, 0, 1, {"*"}, "*", 2147483647, {}, {}}; add_user_to_map (users, 1001, "bank_A", user1); users_def_bank[1001] = "bank_A"; @@ -272,7 +273,7 @@ static void test_check_map_dne_true () users_def_bank.clear (); Association tmp_user = {"DNE", 0.5, 5, 0, 7, 0, {}, - {}, 0, 1, {"*"}, "*", 2147483647, {}}; + {}, 0, 1, {"*"}, "*", 2147483647, {}, {}}; add_user_to_map (users, 9999, "DNE", tmp_user); users_def_bank[9999] = "DNE"; From 
eb6d521fad7dbd4b3fa16c8794ada5a14a60413a Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Mon, 23 Sep 2024 11:24:16 -0700 Subject: [PATCH 4/4] t: add tests for enforcing queue limits Problem: There are no tests that check the enforcement of max running jobs limit in a queue for an association. Add some tests. --- t/Makefile.am | 1 + t/t1042-mf-priority-queue-limits.t | 207 +++++++++++++++++++++++++++++ 2 files changed, 208 insertions(+) create mode 100755 t/t1042-mf-priority-queue-limits.t diff --git a/t/Makefile.am b/t/Makefile.am index 392e0f7ce..c5960c853 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -41,6 +41,7 @@ TESTSCRIPTS = \ t1039-issue476.t \ t1040-mf-priority-projects.t \ t1041-view-jobs-by-project.t \ + t1042-mf-priority-queue-limits.t \ t5000-valgrind.t \ python/t1000-example.py \ python/t1001_db.py \ diff --git a/t/t1042-mf-priority-queue-limits.t b/t/t1042-mf-priority-queue-limits.t new file mode 100755 index 000000000..4e7ae6a07 --- /dev/null +++ b/t/t1042-mf-priority-queue-limits.t @@ -0,0 +1,207 @@ +#!/bin/bash + +test_description='test multi-factor priority plugin queue limits' + +. 
`dirname $0`/sharness.sh + +mkdir -p conf.d + +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py +DB_PATH=$(pwd)/FluxAccountingTest.db + +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 16 job -o,--config-path=$(pwd)/conf.d + +flux setattr log-stderr-level 1 + +test_expect_success 'allow guest access to testexec' ' + flux config load <<-EOF + [exec.testexec] + allow-guests = true + EOF +' + +test_expect_success 'create flux-accounting DB' ' + flux account -p ${DB_PATH} create-db +' + +test_expect_success 'start flux-accounting service' ' + flux account-service -p ${DB_PATH} -t +' + +test_expect_success 'load multi-factor priority plugin' ' + flux jobtap load -r .priority-default ${MULTI_FACTOR_PRIORITY} +' + +test_expect_success 'check that mf_priority plugin is loaded' ' + flux jobtap list | grep mf_priority +' + +test_expect_success 'add some banks' ' + flux account add-bank root 1 && + flux account add-bank --parent-bank=root bankA 1 +' + +test_expect_success 'add queues with different running jobs limits' ' + flux account add-queue bronze --priority=200 --max-running-jobs=3 && + flux account add-queue silver --priority=300 --max-running-jobs=2 && + flux account add-queue gold --priority=400 --max-running-jobs=1 +' + +test_expect_success 'add a user' ' + flux account add-user \ + --username=user1 \ + --userid=5001 \ + --bank=bankA \ + --queues="bronze,silver,gold" \ + --max-running-jobs=100 \ + --max-active-jobs=100 +' + +test_expect_success 'send the user and queue information to the plugin' ' + flux account-priority-update -p ${DB_PATH} +' + +test_expect_success 'configure flux with those queues' ' + cat >conf.d/queues.toml <<-EOT && + [queues.bronze] + [queues.silver] + [queues.gold] + EOT + flux config reload && + flux queue start --all +' + +# In this set of tests, an association belongs to all three available queues, +# and each queue has a different limit 
on the number of running jobs available +# per-association. The association will submit the max number of running jobs +# to the silver queue (2 jobs). A dependency specific to the number of running +# jobs per-queue is added to the third submitted job in the silver queue, but +# jobs submitted to other queues will still receive an alloc event. +# +# Once one of the currently running jobs in the silver queue completes and is +# cleaned up, the job with a dependency added to it will have its dependency +# removed and will receive its alloc event. +test_expect_success 'submit max number of jobs to silver queue' ' + job1=$(flux python ${SUBMIT_AS} 5001 --queue=silver sleep 60) && + job2=$(flux python ${SUBMIT_AS} 5001 --queue=silver sleep 60) && + flux job wait-event -vt 5 ${job1} alloc && + flux job wait-event -vt 5 ${job2} alloc +' + +test_expect_success 'running jobs count for the queues are incremented once jobs start' ' + flux jobtap query mf_priority.so > silver.json && + jq -e ".mf_priority_map[] | \ + select(.userid == 5001) | \ + .banks[0].queue_usage.silver == 2" user1.json && + jq -e ".mf_priority_map[] | \ + select(.userid == 5001) | \ + .banks[0].cur_run_jobs == 3" query.json && + jq -e ".mf_priority_map[] | \ + select(.userid == 5001) | \ + .banks[0].queue_usage.silver == 0" user1.json && + jq -e ".mf_priority_map[] | \ + select(.userid == 5001) | \ + .banks[0].held_jobs | length == 1"