Skip to content

Commit

Permalink
master: Submit k8s job reading minRequirements #TASK-6445
Browse files Browse the repository at this point in the history
  • Loading branch information
j-coll committed Nov 14, 2024
1 parent 93c141b commit 7a5b7cd
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 51 deletions.
15 changes: 1 addition & 14 deletions opencga-core/src/main/resources/configuration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ analysis:
HG38: "exomiser/2402_hg38.zip"
PHENOTYPE: "exomiser/2402_phenotype.zip"
execution:
# Accepted values are "local", "SGE", "azure-batch", "k8s"
# Accepted values are "local", "SGE", "k8s"
# see org.opencb.opencga.master.monitor.executors.ExecutorFactory
id: "${OPENCGA.EXECUTION.MODE}"
defaultQueue: "" # Default queue to be used to submit jobs
Expand Down Expand Up @@ -147,13 +147,6 @@ analysis:
- "variant-secondary-sample-index"
## Local executor configuration
local.maxConcurrentJobs: 2 # Max number of concurrent jobs to be executed locally in the master
## Azure Batch Service configuration example
# azure.batchAccount : "batchAccount"
# azure.batchKey : "batchKey"
# azure.batchUri : "https://batchservice.uksouth.batch.azure.com"
# azure.batchPoolId : "poolId"
# azure.dockerImageName : "openCGADockerImageName"
# azure.dockerArgs : "dockerRunOptions"

## Kubernetes executor configuration example
# k8s.masterUrl: "https://192.168.99.100:8443/"
Expand Down Expand Up @@ -181,12 +174,6 @@ analysis:
agentpool: jobs
kubernetes.io/os: linux
# FOR ACI:
# k8s.requests:
# cpu: 2
# memory: "12G"
# k8s.limits:
# cpu: 2
# memory: "12G"
# k8s.nodeSelector:
# kubernetes.io/role: agent
# kubernetes.io/os: linux
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.opencb.opencga.core.common;

import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.opencb.opencga.core.testclassification.duration.ShortTests;
Expand All @@ -25,6 +26,9 @@
import java.io.InputStreamReader;
import java.nio.file.Paths;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

@Category(ShortTests.class)
public class IOUtilsTest {

Expand Down Expand Up @@ -68,4 +72,35 @@ public void testGrepFile() throws Exception {
in.close();

}

/**
 * Verifies the conversion of a single "kilo" unit to bytes.
 * Inputs with an "i" suffix, and inputs parsed with the second argument set to {@code true},
 * are expected to yield 1024; plain "K"/"KB" inputs parsed with the single-argument overload
 * are expected to yield 1000.
 */
@Test
public void fromHumanReadableToByte_ValidInput_ReturnsCorrectBytes() {
    final long expectedBinary = 1024;
    final long expectedDecimal = 1000;

    // Explicit binary suffixes, with and without a fractional part.
    for (String input : new String[]{"1Ki", "1KiB", "1.KiB", "1.0KiB"}) {
        assertEquals(expectedBinary, IOUtils.fromHumanReadableToByte(input));
    }

    // Suffixes without the "i", parsed with the boolean flag enabled.
    for (String input : new String[]{"1.0K", "1K", "1KB"}) {
        assertEquals(expectedBinary, IOUtils.fromHumanReadableToByte(input, true));
    }

    // Same kind of suffixes through the single-argument overload.
    for (String input : new String[]{"1K", "1KB"}) {
        assertEquals(expectedDecimal, IOUtils.fromHumanReadableToByte(input));
    }
}

/**
 * Verifies that the "Mi", "Gi" and "Ti" suffixes are converted using successive powers of 1024.
 */
@Test
public void fromHumanReadableToByte_ValidInputWithBinaryUnits_ReturnsCorrectBytes() {
    final long mebibyte = 1024L * 1024L;        // 1048576
    final long gibibyte = mebibyte * 1024L;     // 1073741824
    final long tebibyte = gibibyte * 1024L;     // 1099511627776

    assertEquals(mebibyte, IOUtils.fromHumanReadableToByte("1Mi"));
    assertEquals(gibibyte, IOUtils.fromHumanReadableToByte("1Gi"));
    assertEquals(tebibyte, IOUtils.fromHumanReadableToByte("1Ti"));
}

/**
 * Verifies that an unrecognised unit suffix ("X") is rejected with a {@link NumberFormatException}.
 */
@Test
public void fromHumanReadableToByte_InvalidInput_ThrowsNumberFormatException() {
    assertThrows(NumberFormatException.class, () -> {
        IOUtils.fromHumanReadableToByte("1X");
    });
}

/**
 * Verifies that a {@code null} input is rejected with a {@link NullPointerException}.
 */
@Test
public void fromHumanReadableToByte_NullInput_ThrowsNullPointerException() {
    assertThrows(NullPointerException.class, () -> {
        IOUtils.fromHumanReadableToByte(null);
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ protected int checkPendingJob(Job job) {
try {
String queue = getQueue(tool);
logger.info("Queue job '{}' on queue '{}'", job.getId(), queue);
batchExecutor.execute(job.getId(), queue, authenticatedCommandLine, stdout, stderr);
batchExecutor.execute(job, queue, authenticatedCommandLine, stdout, stderr);
} catch (Exception e) {
return abortJob(job, "Error executing job.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,19 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;

/**
* Created by wasim on 17/12/18.
*
* ## Azure Batch Service configuration example
* # azure.batchAccount : "batchAccount"
* # azure.batchKey : "batchKey"
* # azure.batchUri : "https://batchservice.uksouth.batch.azure.com"
* # azure.batchPoolId : "poolId"
* # azure.dockerImageName : "openCGADockerImageName"
* # azure.dockerArgs : "dockerRunOptions"
*/
public class AzureBatchExecutor implements BatchExecutor {
public abstract class AzureBatchExecutor implements BatchExecutor {

private String batchAccount;
private String batchKey;
Expand Down Expand Up @@ -94,11 +101,6 @@ private BatchClient createBatchClient() {
return BatchClient.open(new BatchSharedKeyCredentials(batchUri, batchAccount, batchKey));
}

@Override
public void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
// submitAzureTask(job, token);
}

@Override
public String getStatus(String jobId) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.opencb.opencga.master.monitor.executors;

import org.opencb.opencga.core.models.job.Job;

import java.io.Closeable;
import java.nio.file.Path;

Expand All @@ -24,16 +26,7 @@
*/
public interface BatchExecutor extends Closeable {

String TIMEOUT = "timeout";
String STDOUT = "stdout";
String STDERR = "stderr";
String OUTDIR = "outdir";
String NUM_THREADS = "num_threads";
String MAX_MEM = "max_mem";
String OUT_LOG_EXTENSION = ".log";
String ERR_LOG_EXTENSION = ".err";

void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception;
void execute(Job job, String queue, String commandLine, Path stdout, Path stderr) throws Exception;

String getStatus(String jobId);

Expand All @@ -49,10 +42,6 @@ default boolean canBeQueued() {

boolean isExecutorAlive();

default String getCommandLine(String commandLine) {
return getCommandLine(commandLine, null, null);
}

/**
* We do it this way to avoid writing the session id in the command line (avoid display/monitor/logs) attribute of Job.
* @param commandLine Basic command line
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public ExecutorFactory(Configuration configuration) {
case "sge":
this.executor = new SGEExecutor(execution);
break;
case "azure":
case "azure-batch":
this.executor = new AzureBatchExecutor(execution);
break;
// case "azure":
// case "azure-batch":
// this.executor = new AzureBatchExecutor(execution);
// break;
case "k8s":
case "kubernetes":
this.executor = new K8SExecutor(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,10 @@ public void close() throws IOException {
}

@Override
public void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
String jobName = buildJobName(jobId);
public void execute(org.opencb.opencga.core.models.job.Job job, String queue, String commandLine, Path stdout, Path stderr)
throws Exception {
String jobName = buildJobName(job.getId());
ResourceRequirements resources = getResources(job);
final io.fabric8.kubernetes.api.model.batch.v1.Job k8sJob = new JobBuilder()
.withApiVersion("batch/v1")
.withKind("Job")
Expand Down Expand Up @@ -338,7 +340,28 @@ public void execute(String jobId, String queue, String commandLine, Path stdout,
k8sJob.getSpec().getTemplate().getSpec().getContainers().add(dockerDaemonSidecar);
}
jobStatusCache.put(jobName, Pair.of(Instant.now(), Enums.ExecutionStatus.QUEUED));
getKubernetesClient().batch().jobs().inNamespace(namespace).create(k8sJob);
getKubernetesClient().batch().v1().jobs().inNamespace(namespace).resource(k8sJob).create();
}

/**
 * Resolves the Kubernetes resource requirements to use for the given job.
 * <p>
 * When the job's tool declares minimum requirements, a copy of the executor-level {@code resources}
 * is taken and its "memory" and/or "cpu" entries are overridden, in both requests and limits,
 * with the values declared by the tool. Otherwise the executor-level {@code resources} are returned as they are.
 *
 * @param job job about to be submitted; its tool may be {@code null}
 * @return resource requirements to apply to the job container
 * @throws NumberFormatException if the declared cpu is not parseable as a double, or the declared memory
 *                               is rejected by {@code IOUtils.fromHumanReadableToByte}
 */
private ResourceRequirements getResources(org.opencb.opencga.core.models.job.Job job) {
    // Jobs carrying no tool information (or no declared requirements) fall back to the executor defaults
    // instead of failing with a NullPointerException.
    if (job.getTool() == null || job.getTool().getMinimumRequirements() == null) {
        return this.resources;
    }

    // Start from the configured defaults so that any other resource entries are preserved.
    ResourceRequirementsBuilder builder = new ResourceRequirementsBuilder(this.resources);

    String requiredMemory = job.getTool().getMinimumRequirements().getMemory();
    if (StringUtils.isNotEmpty(requiredMemory)) {
        // Normalise the human readable value (e.g. "1Ki") to a plain number of bytes.
        long memoryBytes = IOUtils.fromHumanReadableToByte(requiredMemory);
        Quantity memory = new Quantity(String.valueOf(memoryBytes));
        builder.addToRequests("memory", memory);
        builder.addToLimits("memory", memory);
    }

    String requiredCpu = job.getTool().getMinimumRequirements().getCpu();
    if (StringUtils.isNotEmpty(requiredCpu)) {
        // Parsing validates that the declared value is numeric before handing it over to Kubernetes.
        double cpuUnits = Double.parseDouble(requiredCpu);
        Quantity cpu = new Quantity(Double.toString(cpuUnits));
        builder.addToRequests("cpu", cpu);
        builder.addToLimits("cpu", cpu);
    }

    return builder.build();
}

private boolean shouldAddDockerDaemon(String queue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opencb.commons.exec.RunnableProcess;
import org.opencb.opencga.core.config.Execution;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.job.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -71,7 +72,8 @@ public void close() throws IOException {
}

@Override
public void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
public void execute(Job job, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
String jobId = job.getId();
jobStatus.put(jobId, Enums.ExecutionStatus.QUEUED);
Runnable runnable = () -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.opencb.opencga.core.config.Execution;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.job.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,8 +43,8 @@ public void close() throws IOException {
}

@Override
public void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
sgeManager.queueJob(jobId, "", -1, getCommandLine(commandLine, stdout, stderr), null);
public void execute(Job job, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
sgeManager.queueJob(job.getId(), "", -1, getCommandLine(commandLine, stdout, stderr), null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,9 @@ private static class DummyBatchExecutor implements BatchExecutor {
public Map<String, String> jobStatus = new HashMap<>();

@Override
public void execute(String jobId, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
System.out.println("Executing job " + jobId + " --- " + commandLine);
jobStatus.put(jobId, Enums.ExecutionStatus.QUEUED);
public void execute(Job job, String queue, String commandLine, Path stdout, Path stderr) throws Exception {
System.out.println("Executing job " + job.getId() + " --- " + commandLine);
jobStatus.put(job.getId(), Enums.ExecutionStatus.QUEUED);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.junit.experimental.categories.Category;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.opencga.core.config.Execution;
import org.opencb.opencga.core.models.job.Job;
import org.opencb.opencga.core.testclassification.duration.ShortTests;

import java.nio.file.Files;
Expand Down Expand Up @@ -53,7 +54,7 @@ public void setUp() throws Exception {
public void test() throws Exception {
System.out.println(rootDir.toAbsolutePath());
for (int i = 0; i < 10; i++) {
localExecutor.execute("jobId-" + i, "default", "echo Hello World", rootDir.resolve("out_" + i + ".txt"), rootDir.resolve("err_" + i + ".txt"));
localExecutor.execute(new Job().setId("jobId-" + i), "default", "echo Hello World", rootDir.resolve("out_" + i + ".txt"), rootDir.resolve("err_" + i + ".txt"));
}

for (int i = 0; i < 10; i++) {
Expand All @@ -73,7 +74,7 @@ public void testKill() throws Exception {
System.out.println(rootDir.toAbsolutePath());
for (int i = 0; i < 10; i++) {
// System.out.println("Submitting job " + i);
localExecutor.execute("jobId-" + i, "default", "sleep 20", rootDir.resolve("out_" + i + ".txt"), rootDir.resolve("err_" + i + ".txt"));
localExecutor.execute(new Job().setId("jobId-" + i), "default", "sleep 20", rootDir.resolve("out_" + i + ".txt"), rootDir.resolve("err_" + i + ".txt"));
}

// Allow some time for the jobs to start
Expand Down

0 comments on commit 7a5b7cd

Please sign in to comment.