Skip to content

Commit

Permalink
Merge pull request #2476 from opencb/TASK-6089
Browse files Browse the repository at this point in the history
TASK-6089 - Automation: Optimise Job management
  • Loading branch information
j-coll authored Jul 10, 2024
2 parents 1a7f9a2 + 6ce3797 commit ab5d880
Show file tree
Hide file tree
Showing 166 changed files with 4,257 additions and 714 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,17 @@
import org.opencb.biodata.tools.alignment.BamUtils;
import org.opencb.cellbase.client.rest.CellBaseClient;
import org.opencb.cellbase.client.rest.GeneClient;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.opencga.analysis.StorageManager;
import org.opencb.opencga.analysis.models.FileInfo;
import org.opencb.opencga.analysis.models.StudyInfo;
import org.opencb.opencga.analysis.tools.ToolRunner;
import org.opencb.opencga.catalog.db.api.FileDBAdaptor;
import org.opencb.opencga.catalog.db.api.ProjectDBAdaptor;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.managers.CatalogManager;
import org.opencb.opencga.catalog.utils.CatalogFqn;
import org.opencb.opencga.catalog.utils.ParamUtils;
import org.opencb.opencga.core.api.ParamConstants;
import org.opencb.opencga.core.exceptions.ToolException;
import org.opencb.opencga.core.models.alignment.AlignmentIndexParams;
import org.opencb.opencga.core.models.alignment.CoverageIndexParams;
import org.opencb.opencga.core.models.JwtPayload;
import org.opencb.opencga.core.models.file.File;
import org.opencb.opencga.core.models.project.Project;
Expand All @@ -70,6 +64,7 @@ public class AlignmentStorageManager extends StorageManager {

private AlignmentStorageEngine alignmentStorageEngine;
private String jobId;
private boolean dryRun;

private static final Map<String, String> statsMap = new HashMap<>();

Expand All @@ -82,12 +77,13 @@ public AlignmentStorageManager(CatalogManager catalogManager, StorageEngineFacto
initStatsMap();
}

public AlignmentStorageManager(CatalogManager catalogManager, StorageEngineFactory storageEngineFactory, String jobId) {
public AlignmentStorageManager(CatalogManager catalogManager, StorageEngineFactory storageEngineFactory, String jobId, boolean dryRun) {
super(catalogManager, storageEngineFactory);

// TODO: Create this alignmentStorageEngine by reflection
this.alignmentStorageEngine = new LocalAlignmentStorageEngine();
this.jobId = jobId;
this.dryRun = dryRun;

initStatsMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ protected void run() throws ToolException {

OpenCGAResult<Job> flagStatsJobResult = catalogManager.getJobManager()
.submit(study, AlignmentFlagStatsAnalysis.ID, Enums.Priority.MEDIUM, params, null, "Job generated by "
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), token);
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), getJobId(), null,
false, token);
flagStatsJobId = flagStatsJobResult.first().getId();
addEvent(Event.Type.INFO, "Submit job " + flagStatsJobId + " to compute stats (" + AlignmentFlagStatsAnalysis.ID
+ ")");
Expand All @@ -137,7 +138,8 @@ protected void run() throws ToolException {

OpenCGAResult<Job> statsJobResult = catalogManager.getJobManager()
.submit(study, AlignmentStatsAnalysis.ID, Enums.Priority.MEDIUM, params, null, "Job generated by "
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), token);
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), getJobId(), null,
false, token);
statsJobId = statsJobResult.first().getId();
addEvent(Event.Type.INFO, "Submit job " + statsJobId + " to compute stats (" + AlignmentStatsAnalysis.ID + ")");
}
Expand All @@ -154,7 +156,7 @@ protected void run() throws ToolException {
OpenCGAResult<Job> fastQcMetricsJobResult = catalogManager.getJobManager()
.submit(study, AlignmentFastQcMetricsAnalysis.ID, Enums.Priority.MEDIUM, params, null,
"Job generated by " + getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(),
token);
getJobId(), null, false, token);
fastQcMetricsJobId = fastQcMetricsJobResult.first().getId();
addEvent(Event.Type.INFO, "Submit job " + fastQcMetricsJobId + " to compute FastQC metrics ("
+ AlignmentFastQcMetricsAnalysis.ID + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Collections;

@Tool(id = ClinicalTsvAnnotationLoader.ID, resource = Enums.Resource.CLINICAL_ANALYSIS, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class ClinicalTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "clinical-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = AuxiliarRgaAnalysis.ID, resource = Enums.Resource.RGA, type = Tool.Type.OPERATION,
description = AuxiliarRgaAnalysis.DESCRIPTION)
description = AuxiliarRgaAnalysis.DESCRIPTION, priority = Enums.Priority.HIGH)
public class AuxiliarRgaAnalysis extends OperationTool {
public final static String ID = "rga-aux-index";
public final static String DESCRIPTION = ParamConstants.INDEX_AUXILIAR_COLLECTION_DESCRIPTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = RgaAnalysis.ID, resource = Enums.Resource.RGA, type = Tool.Type.OPERATION, description = "Index RGA study.")
@Tool(id = RgaAnalysis.ID, resource = Enums.Resource.RGA, type = Tool.Type.OPERATION, description = "Index RGA study.",
priority = Enums.Priority.HIGH)
public class RgaAnalysis extends OperationTool {
public final static String ID = "rga-index";
public final static String DESCRIPTION = "Generate Recessive Gene Analysis secondary index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = CohortTsvAnnotationLoader.ID, resource = Enums.Resource.COHORT, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class CohortTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "cohort-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = FamilyTsvAnnotationLoader.ID, resource = Enums.Resource.FAMILY, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class FamilyTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "family-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import java.util.List;

@Tool(id = FetchAndRegisterTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION,
description = "Download an external file and register it in OpenCGA.")
description = "Download an external file and register it in OpenCGA.", priority = Enums.Priority.HIGH)
public class FetchAndRegisterTask extends OpenCgaToolScopeStudy {

public final static String ID = "files-fetch";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
import java.io.IOException;
import java.util.*;

@Tool(id = FileDeleteTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = "Delete files.")
@Tool(id = FileDeleteTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = "Delete files.",
priority = Enums.Priority.HIGH)
public class FileDeleteTask extends OpenCgaTool {

public final static String ID = "files-delete";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import java.util.List;
import java.util.Map;

@Tool(id = FileLinkTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = FileLinkTask.DESCRIPTION)
@Tool(id = FileLinkTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = FileLinkTask.DESCRIPTION,
priority = Enums.Priority.HIGH)
public class FileLinkTask extends OpenCgaToolScopeStudy {

public static final String ID = "file-link";
Expand Down Expand Up @@ -69,7 +70,7 @@ protected void run() throws Exception {
Job postLinkJob = catalogManager.getJobManager()
.submit(getStudy(), PostLinkSampleAssociation.ID, Enums.Priority.MEDIUM, params, null,
"Job generated by " + getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(),
getToken()).first();
getJobId(), null, false, getToken()).first();
logger.info("Submit post-link job : " + postLinkJob.getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Collections;

@Tool(id = FileTsvAnnotationLoader.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class FileTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "file-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@

import java.util.*;

@Tool(id = FileUnlinkTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = "Unlink files.")
@Tool(id = FileUnlinkTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = "Unlink files.",
priority = Enums.Priority.HIGH)
public class FileUnlinkTask extends OpenCgaTool {

public final static String ID = "files-unlink";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.*;

@Tool(id = PostLinkSampleAssociation.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION,
description = PostLinkSampleAssociation.DESCRIPTION)
description = PostLinkSampleAssociation.DESCRIPTION, priority = Enums.Priority.HIGH)
public class PostLinkSampleAssociation extends OpenCgaToolScopeStudy {

public static final String ID = "postlink";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = IndividualTsvAnnotationLoader.ID, resource = Enums.Resource.INDIVIDUAL, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class IndividualTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "individual-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = SampleTsvAnnotationLoader.ID, resource = Enums.Resource.SAMPLE, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class SampleTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "sample-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ protected void run() throws ToolException {

OpenCGAResult<Job> variantStatsJobResult = catalogManager.getJobManager()
.submit(study, SampleVariantStatsAnalysis.ID, Enums.Priority.MEDIUM, params, null, "Job generated by "
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), token);
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), getJobId(), null,
false, token);
variantStatsJobId = variantStatsJobResult.first().getId();
addEvent(Event.Type.INFO, "Submit job " + variantStatsJobId + " to compute stats (" + SampleVariantStatsAnalysis.ID
+ ")");
Expand Down Expand Up @@ -267,7 +268,8 @@ protected void run() throws ToolException {

OpenCGAResult<Job> signatureJobResult = catalogManager.getJobManager()
.submit(getStudy(), MutationalSignatureAnalysis.ID, Enums.Priority.MEDIUM, params, null, "Job generated by "
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), token);
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), getJobId(), null,
false, token);
signatureJobId = signatureJobResult.first().getId();
logger.info("Submitted job {} to compute the mutational signature analysis {}", signatureJobId,
MutationalSignatureAnalysis.ID);
Expand All @@ -289,7 +291,7 @@ protected void run() throws ToolException {
OpenCGAResult<Job> genomePlotJobResult = catalogManager.getJobManager()
.submit(getStudy(), GenomePlotAnalysis.ID, Enums.Priority.MEDIUM, params, null,
"Job generated by " + getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(),
token);
getJobId(), null, false, token);
genomePlotJobId = genomePlotJobResult.first().getId();
addEvent(Event.Type.INFO, "Submit job " + genomePlotJobId + " to compute genome plot (" + GenomePlotAnalysis.ID
+ ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.nio.file.Paths;

@Tool(id = TemplateRunner.ID, description = TemplateRunner.DESCRIPTION, type = Tool.Type.OPERATION, resource = Enums.Resource.STUDY,
scope = Tool.Scope.STUDY)
scope = Tool.Scope.STUDY, priority = Enums.Priority.HIGH)
public class TemplateRunner extends OperationTool {

public static final String ID = "templates";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public abstract class OpenCgaTool {

private String jobId;
private String opencgaHome;
private boolean dryRun;
protected String token;

protected final ObjectMap params;
Expand All @@ -92,19 +93,20 @@ public OpenCgaTool() {
}

public final OpenCgaTool setUp(String opencgaHome, CatalogManager catalogManager, StorageEngineFactory engineFactory,
ObjectMap params, Path outDir, String jobId, String token) {
ObjectMap params, Path outDir, String jobId, boolean dryRun, String token) {
VariantStorageManager manager = new VariantStorageManager(catalogManager, engineFactory);
return setUp(opencgaHome, catalogManager, manager, params, outDir, jobId, token);
return setUp(opencgaHome, catalogManager, manager, params, outDir, jobId, dryRun, token);
}

public final OpenCgaTool setUp(String opencgaHome, CatalogManager catalogManager, VariantStorageManager variantStorageManager,
ObjectMap params, Path outDir, String jobId, String token) {
ObjectMap params, Path outDir, String jobId, boolean dryRun, String token) {
this.opencgaHome = opencgaHome;
this.catalogManager = catalogManager;
this.configuration = catalogManager.getConfiguration();
this.variantStorageManager = variantStorageManager;
this.storageConfiguration = variantStorageManager.getStorageConfiguration();
this.jobId = jobId;
this.dryRun = dryRun;
this.token = token;
if (params != null) {
this.params.putAll(params);
Expand Down Expand Up @@ -188,13 +190,14 @@ public final ExecutionResult start() throws ToolException {
exception = e;
}
if (!erm.isClosed()) {
privateLogger.error("Unexpected system shutdown!");
String message = "Unexpected system shutdown. Job killed by the system.";
privateLogger.error(message);
try {
if (scratchDir != null) {
deleteScratchDirectory();
}
if (exception == null) {
exception = new RuntimeException("Unexpected system shutdown");
exception = new RuntimeException(message);
}
logException(exception);
ExecutionResult result = erm.close(exception);
Expand Down Expand Up @@ -243,21 +246,32 @@ public final ExecutionResult start() throws ToolException {
try {
currentStep = "check";
privateCheck();
check();
currentStep = null;
erm.setSteps(getSteps());
run();
if (dryRun) {
logger.info("Dry run enabled. Sleep for 5 seconds and skip execution.");
Thread.sleep(5000);
} else {
currentStep = null;
erm.setSteps(getSteps());
run();
}
} catch (ToolException e) {
throw e;
} catch (Exception e) {
throw new ToolException(e);
}
Runtime.getRuntime().removeShutdownHook(hook);
} catch (Throwable e) {
exception = e;
// Do not use a finally block to remove shutdownHook, as finally blocks will be executed even if the JVM is killed,
// and this would throw IllegalStateException("Shutdown in progress");
try {
Runtime.getRuntime().removeShutdownHook(hook);
} catch (Exception e1) {
e.addSuppressed(e1);
}
throw e;
} finally {
deleteScratchDirectory();
Runtime.getRuntime().removeShutdownHook(hook);
stopMemoryMonitor();
result = erm.close(exception);
logException(exception);
Expand Down Expand Up @@ -308,6 +322,7 @@ private void privateCheck() throws Exception {
if (toolParams != null) {
toolParams.updateParams(getParams());
}
check();
}

/**
Expand Down
Loading

0 comments on commit ab5d880

Please sign in to comment.