diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index a9ca71d52f..44be61e1d2 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -114,6 +114,9 @@ See the file COPYING for details. /* Default value for how frequently to check for tasks that do not fit any worker. */ #define VINE_LARGE_TASK_CHECK_INTERVAL 180000000 // 3 minutes in usecs +/* Default value for how frequently to allow calls to vine_hungry_computation. */ +#define VINE_HUNGRY_CHECK_INTERVAL 5000000 // 5 seconds in usecs + /* Default timeout for slow workers to come back to the pool, can be set prior to creating a manager. */ double vine_option_blocklist_slow_workers_timeout = 900; @@ -3947,6 +3950,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->max_task_stdout_storage = MAX_TASK_STDOUT_STORAGE; q->max_new_workers = MAX_NEW_WORKERS; q->large_task_check_interval = VINE_LARGE_TASK_CHECK_INTERVAL; + q->hungry_check_interval = VINE_HUNGRY_CHECK_INTERVAL; q->option_blocklist_slow_workers_timeout = vine_option_blocklist_slow_workers_timeout; q->manager_preferred_connection = xxstrdup("by_ip"); @@ -5201,16 +5205,11 @@ struct vine_task *vine_manager_no_wait(struct vine_manager *q, const char *tag, // check if workers' resources are available to execute more tasks queue should // have at least MAX(hungry_minimum, hungry_minimum_factor * number of workers) ready tasks +// Usually not called directly, but by vine_hungry. //@param: struct vine_manager* - pointer to manager //@return: approximate number of additional tasks if hungry, 0 otherwise -int vine_hungry(struct vine_manager *q) +int vine_hungry_computation(struct vine_manager *q) { - // check if manager is initialized - // return false if not - if (q == NULL) { - return 0; - } - struct vine_stats qstats; vine_get_stats(q, &qstats); @@ -5315,6 +5314,25 @@ int vine_hungry(struct vine_manager *q) return min; } +int vine_hungry(struct vine_manager *q) +{ + if (!q) { + return 0; + } + + timestamp_t current_time = timestamp_get(); + + if (current_time - q->time_last_hungry + q->hungry_check_interval > 0) { + q->time_last_hungry = current_time; + q->tasks_waiting_last_hungry = priority_queue_size(q->ready_tasks); + q->tasks_to_sate_hungry = vine_hungry_computation(q); + } + + int dispatched_since = q->tasks_waiting_last_hungry - priority_queue_size(q->ready_tasks); + + return MAX(0, q->tasks_to_sate_hungry - dispatched_since); +} + int vine_workers_shutdown(struct vine_manager *q, int n) { struct vine_worker_info *w; diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h index 4b669d03dd..b6951937f5 100644 --- a/taskvine/src/manager/vine_manager.h +++ b/taskvine/src/manager/vine_manager.h @@ -176,6 +176,12 @@ struct vine_manager { int file_source_max_transfers; int worker_source_max_transfers; + /* Hungry call optimization */ + timestamp_t time_last_hungry; /* Last time vine_hungry_computation was called. */ + int tasks_to_sate_hungry; /* Number of tasks that would sate the queue since last call to vine_hungry_computation. */ + int tasks_waiting_last_hungry; /* Number of tasks originally waiting when call to vine_hungry_computation was made. */ + timestamp_t hungry_check_interval; /* Maximum interval between vine_hungry_computation checks. */ + /* Various performance knobs that can be tuned. */ int short_timeout; /* Timeout in seconds to send/recv a brief message from worker */ int long_timeout; /* Timeout if in the middle of an incomplete message. */