diff --git a/batch_job/src/Makefile b/batch_job/src/Makefile index ada75817a6..39c7103a80 100755 --- a/batch_job/src/Makefile +++ b/batch_job/src/Makefile @@ -25,7 +25,8 @@ SOURCES = \ batch_queue_work_queue.c \ batch_queue_cluster.c \ batch_queue_k8s.c \ - batch_queue_amazon.c + batch_queue_amazon.c \ + batch_queue_flux.c PUBLIC_HEADERS = batch_queue.h batch_job.h batch_job_info.h batch_file.h batch_wrapper.h diff --git a/batch_job/src/batch_queue.c b/batch_job/src/batch_queue.c index 37ab771023..9a5dc9bc6f 100644 --- a/batch_job/src/batch_queue.c +++ b/batch_job/src/batch_queue.c @@ -21,6 +21,7 @@ See the file COPYING for details. extern const struct batch_queue_module batch_queue_amazon; extern const struct batch_queue_module batch_queue_cluster; extern const struct batch_queue_module batch_queue_condor; +extern const struct batch_queue_module batch_queue_flux; extern const struct batch_queue_module batch_queue_local; extern const struct batch_queue_module batch_queue_moab; extern const struct batch_queue_module batch_queue_uge; @@ -45,13 +46,14 @@ static struct batch_queue_module batch_queue_unknown = { NULL, }; -#define BATCH_JOB_SYSTEMS "local, vine, wq, condor, uge (sge), pbs, lsf, torque, moab, slurm, amazon, k8s, dryrun" +#define BATCH_JOB_SYSTEMS "local, vine, wq, condor, uge (sge), pbs, lsf, torque, moab, slurm, amazon, k8s, flux, dryrun" const struct batch_queue_module *const batch_queue_modules[] = { &batch_queue_amazon, &batch_queue_cluster, &batch_queue_condor, &batch_queue_dryrun, + &batch_queue_flux, &batch_queue_local, &batch_queue_moab, &batch_queue_uge, diff --git a/batch_job/src/batch_queue.h b/batch_job/src/batch_queue.h index d3e3f2e073..c7866c927a 100644 --- a/batch_job/src/batch_queue.h +++ b/batch_job/src/batch_queue.h @@ -33,7 +33,7 @@ Basic use is as follows:
 // Create a queue for submitting to HTCondor.
-struct batch_queue *queue = batch_queue_create(BATCH_QUEUE_TYPE_CONDOR);
+struct batch_queue *queue = batch_queue_create(BATCH_QUEUE_TYPE_CONDOR,0,0);
 
 // Define a batch job consiting of a command with input and output files.
 struct batch_job *job = batch_job_create(queue);
@@ -43,7 +43,7 @@ batch_job_add_input_file(job,"/usr/share/dict/words","words.txt");
 batch_job_add_output_file(job,"output.txt","output.txt");
 
 // Submit the job to the queue, which returns a jobid 
-batch_job_id_t jobid = batch_queue_submit(queue,job);
+batch_queue_id_t jobid = batch_queue_submit(queue,job);
 printf("jobid %" PRIbjid" submitted\n",jobid);
 
 // Wait for a job to complete, which returns the jobid and info.
@@ -79,6 +79,7 @@ typedef enum {
 	BATCH_QUEUE_TYPE_TORQUE,              /**< Batch jobs will be send to the Torque scheduler. */
 	BATCH_QUEUE_TYPE_AMAZON,              /**< Batch jobs will be run inside Amazon EC2 Instances */
 	BATCH_QUEUE_TYPE_K8S,                 /**< Batch jobs will be run inside Kubernetes pods. */
+	BATCH_QUEUE_TYPE_FLUX,                /**< Batch jobs will be run on Flux. */
 	BATCH_QUEUE_TYPE_DRYRUN,              /**< Batch jobs will not actually run. */
 	BATCH_QUEUE_TYPE_UNKNOWN = -1         /**< An invalid batch queue type. */
 } batch_queue_type_t;
diff --git a/batch_job/src/batch_queue_flux.c b/batch_job/src/batch_queue_flux.c
new file mode 100644
index 0000000000..745224f666
--- /dev/null
+++ b/batch_job/src/batch_queue_flux.c
@@ -0,0 +1,378 @@
+/*
+Copyright (C) 2024 The University of Notre Dame
+This software is distributed under the GNU General Public License.
+See the file COPYING for details.
+*/
+
+#include "batch_queue.h"
+#include "batch_queue_internal.h"
+#include "debug.h"
+#include "itable.h"
+#include "path.h"
+#include "process.h"
+#include "macros.h"
+#include "stringtools.h"
+#include "xxmalloc.h"
+#include "jx_parse.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdbool.h>
+#include <inttypes.h>
+#include <time.h>
+#include <libgen.h>
+#include <sys/wait.h>
+
+// itable mapping flux_job_ids to flux_job_info
+static struct itable *flux_job_info_table = NULL;
+
+// itable mapping batch_queue_jobid_t to flux_job_info
+static struct itable *batch_queue_jobid_info_table = NULL;
+
+// Next locally-assigned batch queue jobid to hand out.
+static int job_count = 1;
+
+// Associates a locally-assigned batch queue jobid with flux's own job id.
+struct flux_job_info {
+	batch_queue_id_t job_id;
+	uint64_t flux_job_id;
+};
+
+/*
+Allocate a flux_job_info pairing a local batch queue jobid with the
+corresponding flux job id.  The caller owns the returned object and
+releases it with delete_flux_job_info.
+*/
+static struct flux_job_info *create_flux_job_info(batch_queue_id_t job_id, uint64_t flux_job_id)
+{
+	struct flux_job_info *info = xxmalloc(sizeof(*info));
+	info->job_id = job_id;
+	info->flux_job_id = flux_job_id;
+	return info;
+}
+
+/*
+Release a flux_job_info object.  Accepts NULL, since free(NULL) is a
+no-op, so no guard is needed.
+*/
+static void delete_flux_job_info(struct flux_job_info *job_info)
+{
+	free(job_info);
+}
+
+/*
+Submit a job to flux via "flux submit".  Input files are bundled into a
+flux archive and staged into the job's temporary directory; output files
+are not supported and only produce a warning.  Returns the new batch
+queue jobid on success, or -1 on failure.
+*/
+static batch_queue_id_t batch_queue_flux_submit(struct batch_queue *q, struct batch_job *bt)
+{
+	// Set same defaults as batch_queue_condor and condor_submit_workers
+	// Flux does not support setting memory and disk requirements
+	int64_t cores = 1;
+	int64_t gpus = 0;
+
+	struct rmsummary *resources = bt->resources;
+	if (resources) {
+		cores = resources->cores > -1 ? resources->cores : cores;
+		gpus = resources->gpus > -1 ? resources->gpus : gpus;
+	}
+
+	// Create archive to stage-in to flux job
+	// First, delete old archive if it exists
+	FILE *archive_remove_pipe = popen("flux archive remove -f", "r");
+	if (!archive_remove_pipe) {
+		return -1;
+	}
+	// Drain the command's output; only completion matters.
+	char buffer[BUFSIZ];
+	while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), archive_remove_pipe) > 0) {
+	}
+	pclose(archive_remove_pipe);
+
+	// Only enable the stage-in option if we have files in the archive
+	bool flux_stage_in = false;
+	if (bt->input_files) {
+		struct batch_file *bf;
+		LIST_ITERATE(bt->input_files, bf)
+		{
+			flux_stage_in = true;
+
+			// dirname/basename may modify their argument, so duplicate twice.
+			char *dirc = xxstrdup(bf->outer_name);
+			char *basec = xxstrdup(bf->outer_name);
+			char *dname = dirname(dirc);
+			char *bname = basename(basec);
+
+			char *command = string_format("flux archive create --append -C %s %s 2>&1", dname, bname);
+			FILE *archive_create_pipe = popen(command, "r");
+			if (!archive_create_pipe) {
+				// Free the per-file buffers before bailing out.
+				free(dirc);
+				free(basec);
+				free(command);
+				return -1;
+			}
+			while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), archive_create_pipe) > 0) {
+			}
+			int archive_create_status = pclose(archive_create_pipe);
+			archive_create_status = WEXITSTATUS(archive_create_status);
+
+			free(dirc);
+			free(basec);
+			free(command);
+
+			if (archive_create_status != EXIT_SUCCESS) {
+				debug(D_BATCH, "flux failed to create archive with file %s", bf->outer_name);
+				return -1;
+			}
+		}
+	}
+
+	// Flux does not support staging files out of the worker environment, so warn for each file
+	if (bt->output_files) {
+		struct batch_file *bf;
+		LIST_ITERATE(bt->output_files, bf)
+		{
+			debug(D_BATCH, "warn: flux does not support output files (%s)", bf->outer_name);
+		}
+	}
+
+	// We simply export vars to the environment, and flux-submit pulls in the environment to the worker.
+	if (bt->envlist) {
+		jx_export(bt->envlist);
+	}
+
+	char *submit_command = string_format("flux submit %s --flags=waitable --nodes=1 --cores=%" PRId64 " --gpus-per-node=%" PRId64 " sh -c 'cd $FLUX_JOB_TMPDIR && %s' | flux job id --to=dec", flux_stage_in ? "-o stage-in" : "", cores, gpus, bt->command);
+	FILE *submit_pipe = popen(submit_command, "r");
+	free(submit_command);
+	if (!submit_pipe) {
+		return -1;
+	}
+
+	// The pipeline prints the new flux job id in decimal on success.
+	uint64_t flux_job_id;
+	memset(buffer, 0, sizeof(buffer));
+	while (fgets(buffer, sizeof(buffer), submit_pipe)) {
+		if (sscanf(buffer, "%" PRIu64, &flux_job_id) == 1) {
+			batch_queue_id_t job_id = job_count++;
+			struct batch_job_info *info = calloc(1, sizeof(*info));
+			info->submitted = time(0);
+			info->started = time(0);
+			itable_insert(q->job_table, job_id, info);
+
+			struct flux_job_info *curr_job_info = create_flux_job_info(job_id, flux_job_id);
+			itable_insert(flux_job_info_table, flux_job_id, curr_job_info);
+			itable_insert(batch_queue_jobid_info_table, job_id, curr_job_info);
+
+			pclose(submit_pipe);
+
+			debug(D_BATCH, "created job_id %" PRId64 " with flux_job_id %" PRIu64, job_id, flux_job_id);
+			return job_id;
+		}
+	}
+
+	// No parseable job id was produced; close the pipe before failing.
+	pclose(submit_pipe);
+	return -1;
+}
+
+/*
+Populate info_out with the status of a flux job by querying
+"flux jobs --json".  On any failure (pipe or parse), info_out is left
+unchanged.
+*/
+static void fill_batch_job_info(struct batch_job_info *info_out, uint64_t flux_job_id)
+{
+	if (!info_out) {
+		return;
+	}
+
+	char *command = string_format("flux jobs --json %" PRIu64 " 2> /dev/null", flux_job_id);
+	FILE *pipe = popen(command, "r");
+	free(command);
+	command = NULL;
+
+	if (!pipe) {
+		return;
+	}
+
+	struct jx *json = jx_parse_stream(pipe);
+	if (!json) {
+		pclose(pipe);
+		return;
+	}
+
+	info_out->submitted = jx_lookup_double(json, "t_submit");
+	info_out->started = jx_lookup_double(json, "t_run");
+	info_out->disk_allocation_exhausted = 0;
+	info_out->exit_code = jx_lookup_integer(json, "returncode");
+
+	// "waitstatus" is a wait()-style status word: decode it with the
+	// standard macros.  WTERMSIG (not WSTOPSIG) gives the terminating
+	// signal, and is only meaningful when WIFSIGNALED is true.
+	int waitstatus = jx_lookup_integer(json, "waitstatus");
+	info_out->exited_normally = WIFEXITED(waitstatus);
+	info_out->exit_signal = WIFSIGNALED(waitstatus) ? WTERMSIG(waitstatus) : 0;
+
+	info_out->finished = jx_lookup_boolean(json, "success");
+
+	jx_delete(json);
+	// Close the pipe on the success path too, or the child is leaked.
+	pclose(pipe);
+}
+
+/*
+Wait for a flux job to complete, up to the absolute time stoptime.
+If wait_flux_job_id is nonzero, wait for that specific flux job;
+otherwise wait for any job.  Returns the matching batch queue jobid,
+0 if there are no more jobs to wait for, or -1 on timeout or error.
+*/
+static batch_queue_id_t batch_queue_flux_wait_jobid(struct batch_queue *q, struct batch_job_info *info_out, time_t stoptime, uint64_t wait_flux_job_id)
+{
+	while (1) {
+		int timeout;
+
+		// stoptime is absolute; nonpositive means "poll with a short timeout".
+		if (stoptime > 0) {
+			timeout = MAX(0, stoptime - time(0));
+		} else {
+			timeout = 5;
+		}
+
+		if (timeout <= 0) {
+			return -1;
+		}
+
+		char *wait_command;
+		if (wait_flux_job_id != 0) {
+			wait_command = string_format("timeout %ds flux job wait %" PRIu64 " 2>&1", timeout, wait_flux_job_id);
+		} else {
+			wait_command = string_format("timeout %ds flux job wait 2>&1", timeout);
+		}
+
+		FILE *wait_pipe = popen(wait_command, "r");
+		free(wait_command);
+		if (!wait_pipe) {
+			return -1;
+		}
+
+		// Accumulate the command's output ("flux job wait" prints the id
+		// of the completed job).  The buffer must stay NUL-terminated:
+		// fread does not terminate it and may read nothing at all, so we
+		// zero-initialize and track the length ourselves.
+		char wait_output[BUFSIZ] = "";
+		size_t wait_len = 0;
+		size_t nread;
+		while ((nread = fread(wait_output + wait_len, 1, sizeof(wait_output) - 1 - wait_len, wait_pipe)) > 0) {
+			wait_len += nread;
+		}
+		string_chomp(wait_output);
+		int wait_status = pclose(wait_pipe);
+		wait_status = WEXITSTATUS(wait_status);
+
+		if (wait_status == 124) {
+			// timeout killed the wait command
+			return -1;
+		} else if (wait_status == 2) {
+			// no more jobs to be waited on
+			return 0;
+		}
+
+		// convert output flux job id to decimal
+		char *convert_command = string_format("echo '%s' | flux job id --to=dec 2>&1", wait_output);
+		FILE *convert_pipe = popen(convert_command, "r");
+		free(convert_command);
+		if (!convert_pipe) {
+			return -1;
+		}
+
+		char convert_output[BUFSIZ];
+		uint64_t flux_job_id;
+		while (fgets(convert_output, sizeof(convert_output), convert_pipe)) {
+			if (sscanf(convert_output, "%" PRIu64, &flux_job_id) == 1) {
+				struct flux_job_info *job_info = itable_lookup(flux_job_info_table, flux_job_id);
+				if (job_info) {
+					pclose(convert_pipe);
+					fill_batch_job_info(info_out, flux_job_id);
+					return job_info->job_id;
+				}
+			}
+		}
+
+		// Not a job we submitted; close up and keep waiting.
+		pclose(convert_pipe);
+	}
+}
+
+/*
+Wait for any flux job to complete: a flux job id of zero tells
+batch_queue_flux_wait_jobid to wait for whichever job finishes first.
+*/
+static batch_queue_id_t batch_queue_flux_wait(struct batch_queue *q, struct batch_job_info *info_out, time_t stoptime)
+{
+	return batch_queue_flux_wait_jobid(q, info_out, stoptime, 0);
+}
+
+/*
+Remove a flux job: ask flux to kill it politely, wait briefly for it
+to exit, then escalate to SIGKILL if it does not.  Returns 1 if the
+job was removed, 0 otherwise.
+*/
+static int batch_queue_flux_remove(struct batch_queue *q, batch_queue_id_t jobid)
+{
+	struct flux_job_info *info = itable_lookup(batch_queue_jobid_info_table, jobid);
+	if (!info) {
+		return 0;
+	}
+
+	char *kill_command = string_format("flux job kill %" PRIu64 " 2>&1", info->flux_job_id);
+	FILE *kill_pipe = popen(kill_command, "r");
+	free(kill_command);
+	if (!kill_pipe) {
+		return 0;
+	}
+
+	// Drain output; only the exit status of "flux job kill" matters.
+	char buffer[BUFSIZ];
+	while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), kill_pipe) > 0) {
+	}
+
+	int kill_status = pclose(kill_pipe);
+	kill_status = WEXITSTATUS(kill_status);
+	if (kill_status != EXIT_SUCCESS) {
+		return 0;
+	}
+
+	// Kill signal sent successfully; wait on the specific job.  The
+	// wait takes an *absolute* stoptime, so allow five seconds from now.
+	struct batch_job_info info_out;
+	batch_queue_id_t waited_jobid = batch_queue_flux_wait_jobid(q, &info_out, time(0) + 5, info->flux_job_id);
+	if (waited_jobid != -1) {
+		return 1;
+	}
+
+	// Wait timed out, so kill it for real
+	kill_command = string_format("flux job kill -s SIGKILL %" PRIu64 " 2>&1", info->flux_job_id);
+	kill_pipe = popen(kill_command, "r");
+	free(kill_command);
+	if (!kill_pipe) {
+		return 0;
+	}
+
+	while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), kill_pipe) > 0) {
+	}
+	pclose(kill_pipe);
+
+	// Wait on the job once more, then report success or failure.
+	waited_jobid = batch_queue_flux_wait_jobid(q, &info_out, time(0) + 5, info->flux_job_id);
+	return (waited_jobid != -1) ? 1 : 0;
+}
+
+/*
+Initialize the flux backend: verify we are connected to a flux
+instance by running "flux uptime", then create the job-tracking
+tables.  Returns 0 on success, -1 if flux is not reachable.
+*/
+static int batch_queue_flux_create(struct batch_queue *q)
+{
+	// Mark this backend as experimental.
+	batch_queue_set_option(q, "experimental", "yes");
+
+	FILE *uptime_pipe = popen("flux uptime 2>&1", "r");
+	if (!uptime_pipe) {
+		return -1;
+	}
+
+	// Drain the command's output; only its exit status matters.
+	char buffer[BUFSIZ];
+	while (fread(buffer, sizeof(char), sizeof(buffer) / sizeof(char), uptime_pipe) > 0) {
+	}
+
+	int uptime_status = pclose(uptime_pipe);
+	uptime_status = WEXITSTATUS(uptime_status);
+	if (uptime_status != EXIT_SUCCESS) {
+		debug(D_BATCH, "batch_queue_flux_create failed: not connected to flux environment");
+		return -1;
+	}
+
+	flux_job_info_table = itable_create(0);
+	batch_queue_jobid_info_table = itable_create(0);
+
+	return 0;
+}
+
+/*
+Tear down the flux backend and free all job-tracking state.  Both
+tables point at the same flux_job_info objects, so entries are freed
+once while iterating the first table, and the second table is only
+deleted, not re-freed.
+*/
+static int batch_queue_flux_free(struct batch_queue *q)
+{
+	if (flux_job_info_table) {
+		struct flux_job_info *info;
+		uint64_t flux_job_id;
+		ITABLE_ITERATE(flux_job_info_table, flux_job_id, info)
+		{
+			delete_flux_job_info(info);
+		}
+		itable_delete(flux_job_info_table);
+		flux_job_info_table = NULL;
+	}
+
+	if (batch_queue_jobid_info_table) {
+		// Entries were already freed above; delete only the table itself.
+		itable_delete(batch_queue_jobid_info_table);
+		batch_queue_jobid_info_table = NULL;
+	}
+
+	return 0;
+}
+
+/* Flux uses the stub implementations for port and option handling. */
+batch_queue_stub_port(flux);
+batch_queue_stub_option_update(flux);
+
+/* Module descriptor binding the flux operations into the batch_queue framework. */
+const struct batch_queue_module batch_queue_flux = {
+		BATCH_QUEUE_TYPE_FLUX,
+		"flux",
+
+		batch_queue_flux_create,
+		batch_queue_flux_free,
+		batch_queue_flux_port,
+		batch_queue_flux_option_update,
+
+		batch_queue_flux_submit,
+		batch_queue_flux_wait,
+		batch_queue_flux_remove,
+};
+
+/* vim: set noexpandtab tabstop=8: */
diff --git a/batch_job/src/batch_queue_k8s.c b/batch_job/src/batch_queue_k8s.c
index 7c08e5d41b..011d4c14d2 100644
--- a/batch_job/src/batch_queue_k8s.c
+++ b/batch_job/src/batch_queue_k8s.c
@@ -489,9 +489,9 @@ static int batch_queue_k8s_gen_running_pod_lst(struct list **running_pod_lst,
 		pod_state = strtok(NULL, " \t");
 
 		int i = 0;
-		while (pod_state[i] != '\n' && pod_state[i] != EOF)
+		while (pod_state[i] != '\n' && pod_state[i] != '\0')
 			i++;
-		if (pod_state[i] == '\n' || pod_state[i] == EOF)
+		if (pod_state[i] == '\n')
 			pod_state[i] = '\0';
 
 		if (strcmp(pod_state, "Running") == 0) {
@@ -607,9 +607,9 @@ static batch_queue_id_t batch_queue_k8s_wait(struct batch_queue *q,
 
 			// trim the tailing new line
 			int i = 0;
-			while (log_tail_content[i] != '\n' && log_tail_content[i] != EOF)
+			while (log_tail_content[i] != '\n' && log_tail_content[i] != '\0')
 				i++;
-			if (log_tail_content[i] == '\n' || log_tail_content[i] == EOF)
+			if (log_tail_content[i] == '\n' || log_tail_content[i] == '\0')
 				log_tail_content[i] = '\0';
 
 			free(get_log_cmd);
diff --git a/configure b/configure
index 627d52d301..1d9048e9b1 100755
--- a/configure
+++ b/configure
@@ -64,6 +64,14 @@ case "$BUILD_CPU" in
 	;;
 esac
 
+if [ "$BUILD_CPU" = X86_64 -a "$BUILD_SYS" = LINUX -a "$CONDA_BUILD_CROSS_COMPILATION" != 1 ]
+then
+	BUILD_LINUX_NATIVE_X86_64=yes
+else
+	BUILD_LINUX_NATIVE_X86_64=no
+fi
+
+
 include_package_chirp="chirp"
 include_package_deltadb="deltadb"
 include_package_doc="doc"
@@ -1241,7 +1249,7 @@ fi
 
 if [ "$include_package_resource_monitor" = resource_monitor ]
 then
-	if [ -d resource_monitor -a $BUILD_SYS = LINUX ]
+	if [ -d resource_monitor -a "$BUILD_LINUX_NATIVE_X86_64" = yes ]
 	then
 		echo "resource_monitor IS supported on ${BUILD_SYS}"
 	else
@@ -1280,7 +1288,7 @@ then
 	fi
 fi
 
-potential_packages="dttools batch_job ${include_package_grow} ${include_package_makeflow} ${include_package_work_queue} ${include_package_ftplite} ${include_package_taskvine} ${include_package_parrot} ${include_package_resource_monitor} ${include_package_chirp} ${include_package_deltadb} ${include_package_doc} ${include_package_poncho}"
+potential_packages="dttools batch_job ${include_package_taskvine} ${include_package_grow} ${include_package_makeflow} ${include_package_work_queue} ${include_package_ftplite} ${include_package_parrot} ${include_package_resource_monitor} ${include_package_chirp} ${include_package_deltadb} ${include_package_doc} ${include_package_poncho}"
 
 echo "checking for all the things I know how to build..."
 for p in $potential_packages
@@ -1429,6 +1437,8 @@ CCTOOLS_INSTALL_DIR=${install_path}
 
 CCTOOLS_OPSYS=${BUILD_SYS}
 
+CCTOOLS_LINUX_NATIVE_X86_64=${BUILD_LINUX_NATIVE_X86_64}
+
 CCTOOLS_PACKAGES=${packages}
 
 CCTOOLS_CC=${CCTOOLS_CC}
diff --git a/doc/manuals/makeflow/index.md b/doc/manuals/makeflow/index.md
index e72f3eece4..85ec76f027 100644
--- a/doc/manuals/makeflow/index.md
+++ b/doc/manuals/makeflow/index.md
@@ -444,6 +444,19 @@ output: input
     srun $(BATCH_OPTIONS) --mpi=pmi2 -- ./my-mpi-job -i input -o output
 ```
 
+### Flux
+
+The [Flux resource manager](https://flux-framework.readthedocs.io/en/latest/)
+is available as an experimental backend by passing the `-T flux` option.
+
+To use the Flux batch system, you must run Makeflow in a shell connected to a
+Flux instance (i.e. `$FLUX_URI` must be set and valid). This is validated by
+ensuring `flux uptime` runs successfully.
+
+While the Flux backend supports staging in files into the execution
+environment, it currently does not support staging files out, instead assuming
+a shared filesystem. Simple programs that use `batch_job` but do not require
+output files, like `vine_factory`, should work fine.
 
 ### Moab Scheduler
 
diff --git a/dttools/src/bitmap.c b/dttools/src/bitmap.c
index 84e9b772d8..608c90218f 100644
--- a/dttools/src/bitmap.c
+++ b/dttools/src/bitmap.c
@@ -385,7 +385,7 @@ static unsigned char pcx_rle_repeat = 0;
 static unsigned char pcx_rle_value = 0;
 static unsigned char pcx_rle_read(FILE *file)
 {
-	unsigned char c;
+	int c;
 retry:
 	if (pcx_rle_repeat > 0) {
 		pcx_rle_repeat--;
diff --git a/dttools/src/envtools.c b/dttools/src/envtools.c
index adb15b492e..ebc5980569 100644
--- a/dttools/src/envtools.c
+++ b/dttools/src/envtools.c
@@ -63,7 +63,7 @@ int env_replace(const char *infile, const char *outfile)
 	int var_index = 0;
 	int valid_var = 0;
 
-	char c = fgetc(INPUT);
+	int c = fgetc(INPUT);
 	while (c != EOF) {
 		if (c == '$') {
 			valid_var = 1;
diff --git a/resource_monitor/src/Makefile b/resource_monitor/src/Makefile
index 8126e39f40..e190809a4e 100644
--- a/resource_monitor/src/Makefile
+++ b/resource_monitor/src/Makefile
@@ -20,11 +20,11 @@ LOCAL_LINKAGE = ../../dttools/src/libdttools.a
 
 PROGRAMS = resource_monitor piggybacker rmonitor_poll_example rmonitor_snapshot
 
-ifeq ($(CCTOOLS_OPSYS),DARWIN)
+ifeq ($(CCTOOLS_LINUX_NATIVE_X86_64),yes)
+	TARGETS = $(LIBRARIES) $(PROGRAMS)
+else
 	TARGETS =
 	PROGRAMS =
-else
-	TARGETS = $(LIBRARIES) $(PROGRAMS)
 endif
 
 all: $(TARGETS) bindings
@@ -69,14 +69,14 @@ clean:
 	$(MAKE) -C bindings clean
 
 install: all
-ifeq ($(CCTOOLS_OPSYS),DARWIN)
-	$(MAKE) -C bindings install
-else
+ifeq ($(CCTOOLS_LINUX_NATIVE_X86_64),yes)
 	mkdir -p $(CCTOOLS_INSTALL_DIR)/bin
 	cp $(PROGRAMS) $(CCTOOLS_INSTALL_DIR)/bin/
 	mkdir -p $(CCTOOLS_INSTALL_DIR)/lib
 	cp $(LIBRARIES) $(CCTOOLS_INSTALL_DIR)/lib/
 	$(MAKE) -C bindings install
+else
+	$(MAKE) -C bindings install
 endif
 
 test: all
diff --git a/resource_monitor/src/rmonitor_helper.c b/resource_monitor/src/rmonitor_helper.c
index 34139fe3e0..8e13d7ef76 100644
--- a/resource_monitor/src/rmonitor_helper.c
+++ b/resource_monitor/src/rmonitor_helper.c
@@ -270,6 +270,7 @@ int close(int fd)
 	return status;
 }
 
+#if defined(SYS_open)
 static int open_for_writing(int fd)
 {
 	int flags, access_mode;
@@ -369,6 +370,7 @@ int open64(const char *path, int flags, ...)
 	return fd;
 }
 #endif /* defined linux && __USE_LARGEFILE64 */
+#endif /* defined SYS_open */
 
 int socket(int domain, int type, int protocol)
 {
diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c
index ce7931f10b..f2bf7c1c28 100644
--- a/taskvine/src/manager/vine_manager.c
+++ b/taskvine/src/manager/vine_manager.c
@@ -914,6 +914,7 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w)
 			t->time_workers_execute_all += delta_time;
 		}
 
+		/* Remove the unfinished task and update data structures. */
 		reap_task_from_worker(q, w, t, VINE_TASK_READY);
 
 		// recreate inputs lost
@@ -1352,9 +1353,8 @@ static int fetch_outputs_from_worker(struct vine_manager *q, struct vine_worker_
 
 	vine_accumulate_task(q, t);
 
-	// At this point, a task is completed.
+	/* Remove the completed task and update all data structures. */
 	reap_task_from_worker(q, w, t, VINE_TASK_RETRIEVED);
-	vine_manager_send(q, w, "kill %d\n", t->task_id);
 
 	switch (t->result) {
 	case VINE_RESULT_INPUT_MISSING:
@@ -3028,7 +3028,7 @@ static int resubmit_if_needed(struct vine_manager *q, struct vine_worker_info *w
 }
 
 /*
-Collect a completed task from a worker, and then update
+Remove a running or completed task from a worker, and then update
 all auxiliary data structures to remove the association
 and change the task state.
 */
@@ -3043,6 +3043,9 @@ static void reap_task_from_worker(struct vine_manager *q, struct vine_worker_inf
 	/* Make sure the task and worker agree before changing anything. */
 	assert(t->worker == w);
 
+	/* Tell worker to remove the task sandbox (and if necessary, the running process) */
+	vine_manager_send(q, w, "kill %d\n", t->task_id);
+
 	w->total_task_time += t->time_workers_execute_last;
 
 	rmsummary_delete(t->current_resource_box);
@@ -3116,7 +3119,7 @@ If a file can be fetched from a substitute source,
 this function modifies the file->substitute field to reflect that source.
 */
 
-static int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
+int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
 {
 	struct vine_mount *m;
 
@@ -3299,56 +3302,41 @@ static int vine_manager_check_library_for_function_call(struct vine_manager *q,
 }
 
 /*
-Consider if a task is eligible to run, and if so, find the best worker for it.
+Consider if a task is eligible to run.
 */
-static struct vine_worker_info *consider_task(struct vine_manager *q, struct vine_task *t)
+int consider_task(struct vine_manager *q, struct vine_task *t)
 {
 	timestamp_t now_usecs = timestamp_get();
 	double now_secs = ((double)now_usecs) / ONE_SECOND;
 
 	// Skip task if min requested start time not met.
 	if (t->resources_requested->start > now_secs) {
-		return NULL;
+		return 0;
 	}
 
 	// Skip if this task failed recently
 	if (t->time_when_last_failure + q->transient_error_interval > now_usecs) {
-		return NULL;
+		return 0;
 	}
 
 	// Skip if category already running maximum allowed tasks
 	struct category *c = vine_category_lookup_or_create(q, t->category);
 	if (c->max_concurrent > -1 && c->max_concurrent <= c->vine_stats->tasks_running) {
-		return NULL;
+		return 0;
 	}
 
 	// Skip task if temp input files have not been materialized.
 	if (!vine_manager_check_inputs_available(q, t)) {
-		return NULL;
+		return 0;
 	}
 
-	// Skip function call task if no suitable library template was installed
+	// Skip function call task if no suitable library template was installed.
 	if (!vine_manager_check_library_for_function_call(q, t)) {
-		return NULL;
-	}
-
-	// Find the best worker for the task
-	q->stats_measure->time_scheduling = timestamp_get();
-	struct vine_worker_info *w = vine_schedule_task_to_worker(q, t);
-	if (!w) {
-		return NULL;
-	}
-	q->stats->time_scheduling += timestamp_get() - q->stats_measure->time_scheduling;
-
-	// Check if there is transfer capacity available.
-	if (q->peer_transfers_enabled) {
-		if (!vine_manager_transfer_capacity_available(q, w, t)) {
-			return NULL;
-		}
+		return 0;
 	}
 
-	// All checks passed
-	return w;
+	// All checks passed, task is eligible to run.
+	return 1;
 }
 
 /*
@@ -3365,7 +3353,6 @@ static int send_one_task(struct vine_manager *q)
 
 	int t_idx;
 	struct vine_task *t;
-	struct vine_worker_info *w = NULL;
 
 	int iter_count = 0;
 	int iter_depth = MIN(priority_queue_size(q->ready_tasks), q->attempt_schedule_depth);
@@ -3386,7 +3373,15 @@ static int send_one_task(struct vine_manager *q)
 	// the priority queue data structure where also invokes priority_queue_rotate_reset.
 	PRIORITY_QUEUE_ROTATE_ITERATE(q->ready_tasks, t_idx, t, iter_count, iter_depth)
 	{
-		w = consider_task(q, t);
+		if (!consider_task(q, t)) {
+			continue;
+		}
+
+		// Find the best worker for the task
+		q->stats_measure->time_scheduling = timestamp_get();
+		struct vine_worker_info *w = vine_schedule_task_to_worker(q, t);
+		q->stats->time_scheduling += timestamp_get() - q->stats_measure->time_scheduling;
+
 		if (w) {
 			priority_queue_remove(q->ready_tasks, t_idx);
 			commit_task_to_worker(q, w, t);
@@ -3729,22 +3724,17 @@ static void reset_task_to_state(struct vine_manager *q, struct vine_task *t, vin
 		break;
 
 	case VINE_TASK_RUNNING:
-		// t->worker must be set if in RUNNING state.
+		/* t->worker must be set if in RUNNING state */
 		assert(w);
 
-		// send message to worker asking to kill its task.
-		vine_manager_send(q, w, "kill %d\n", t->task_id);
-		debug(D_VINE, "Task with id %d has been cancelled at worker %s (%s) and removed.", t->task_id, w->hostname, w->addrport);
+		/* Remove the running task and update all data structures. */
+		reap_task_from_worker(q, w, t, new_state);
 
-		// Delete any input files that are not to be cached.
+		/* After task is killed, delete non-cacheable inputs and all (incomplete) output files from the worker cache. */
 		delete_worker_files(q, w, t->input_mounts, VINE_CACHE_LEVEL_TASK);
-
-		// Delete all output files since they are not needed as the task was cancelled.
 		delete_worker_files(q, w, t->output_mounts, VINE_CACHE_LEVEL_FOREVER);
 
-		// Collect task structure from worker.
-		// Note that this calls change_task_state internally.
-		reap_task_from_worker(q, w, t, new_state);
+		/* change_task_state() happened inside reap_task_from_worker */
 
 		break;
 
diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h
index 7b060837b7..81e47f209f 100644
--- a/taskvine/src/manager/vine_manager.h
+++ b/taskvine/src/manager/vine_manager.h
@@ -282,6 +282,9 @@ struct vine_task *vine_manager_no_wait(struct vine_manager *q, const char *tag,
 
 void vine_manager_remove_worker(struct vine_manager *q, struct vine_worker_info *w, vine_worker_disconnect_reason_t reason);
 
+/* Check if the worker is able to transfer the necessary files for this task. */
+int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t);
+
 /* The expected format of files created by the resource monitor.*/
 #define RESOURCE_MONITOR_TASK_LOCAL_NAME "vine-task-%d"
 #define RESOURCE_MONITOR_REMOTE_NAME "cctools-monitor"
diff --git a/taskvine/src/manager/vine_schedule.c b/taskvine/src/manager/vine_schedule.c
index 19678243f8..edc75621ea 100644
--- a/taskvine/src/manager/vine_schedule.c
+++ b/taskvine/src/manager/vine_schedule.c
@@ -281,6 +281,11 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
 		return 0;
 	}
 
+	/* If the worker has transfer capacity to get this task. */
+	if (q->peer_transfers_enabled && !vine_manager_transfer_capacity_available(q, w, t)) {
+		return 0;
+	}
+
 	/* If the worker doesn't have the features the task requires. */
 	if (t->feature_list) {
 		if (!w->features) {