Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK-6089 - Automation: Optimise Job management #2476

Merged
merged 44 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
e6d3be2
catalog: add jobParentId and scheduledStartTime, #TASK-6171 #TASK-6089
pfurio May 6, 2024
0d1c470
master: use scheduledTime field, #TASK-6171 #TASK-6089
pfurio May 8, 2024
666a4c9
catalog: implement a way to kill jobs, #TASK-6171 #TASK-6089
pfurio May 8, 2024
2a04195
master: Allow grace kill for k8s jobs #TASK-6171 #TASK-6089
j-coll Jun 4, 2024
d4afa89
master: Allow grace kill for local jobs #TASK-6171 #TASK-6089
j-coll Jun 4, 2024
1304dc8
Merge branch 'develop' into TASK-6089
pfurio Jun 14, 2024
ab3a5ea
catalog: add priority and dry-run parameters to the WS, #TASK-6089
pfurio Jun 14, 2024
0c05c5c
app: use dryRun from Job, #TASK-6089
pfurio Jun 14, 2024
7c9b562
wip
pfurio Jun 17, 2024
ca96ff7
master: add first job scheduler implementation, #TASK-6089
pfurio Jun 18, 2024
6f988b2
master: change job scheduler rules, #TASK-6089
pfurio Jun 27, 2024
bd25b29
app: move migration folder to v3, #TASK-6089
pfurio Jun 27, 2024
59f9778
Merge branch 'develop' into TASK-6089
pfurio Jun 27, 2024
64e7f07
storage: Remote kill MR jobs on application kill. #TASK-6089
j-coll Jun 28, 2024
172aa78
Merge branch 'TASK-6089' of github.com:opencb/opencga into TASK-6089
j-coll Jun 28, 2024
d6d2bb2
catalog: add "kill job" junit test, #TASK-6089
pfurio Jul 1, 2024
7738218
Merge branch 'develop' into TASK-6089
j-coll Jul 1, 2024
7640eb2
master: Use bash instead of sh to exec jobs. #TASK-6089
j-coll Jul 1, 2024
ef98b0e
storage: Fix applicationId MR reported. #TASK-6089
j-coll Jul 1, 2024
5faf721
core: add job prefix to new query fields, #TASK-6089
pfurio Jul 2, 2024
3eec4d3
Merge branch 'TASK-6089' of github.com:opencb/opencga into TASK-6089
pfurio Jul 2, 2024
8f43346
catalog: fix kill condition, #TASK-6089
pfurio Jul 2, 2024
bbfe0f9
catalog: add new kill tests, #TASK-6089
pfurio Jul 2, 2024
83ef3aa
core: add priority and jobPriority params, #TASK-6089
pfurio Jul 3, 2024
d3f69e7
server: change scheduledStartTime to query param, #TASK-6089
pfurio Jul 3, 2024
5ec8c70
catalog: Allow reading logs from ABORTED jobs. #TASK-6089
j-coll Jul 3, 2024
52471f3
storage: Kill remote ssh process (if needed) with `kill -15` #TASK-6089
j-coll Jul 3, 2024
4b07633
pom: upgrade k8s fabric8 client library. #TASK-6089
j-coll Jul 3, 2024
fa20a6e
master: Register output files from killed jobs. #TASK-6089
j-coll Jul 3, 2024
3fa655b
master: Kill k8s jobs just once. Don't confirm untill fully terminate…
j-coll Jul 3, 2024
dc3546f
Merge branch 'TASK-6089' of github.com:opencb/opencga into TASK-6089
j-coll Jul 3, 2024
46f6d73
master: move code to checkPendingJobs method, #TASK-6089
pfurio Jul 3, 2024
f8157c5
master: Not all jobs without execution results should end up as ERROR…
j-coll Jul 3, 2024
15188d3
master: Fix junit tests. #TASK-6089
j-coll Jul 3, 2024
25de849
master: fix ExecutionDaemonTest tests, #TASK-6089
pfurio Jul 4, 2024
d4c83a8
Merge branch 'develop' into TASK-6089
pfurio Jul 8, 2024
657fb24
master: add dry run junit test, #TASK-6089
pfurio Jul 8, 2024
15157eb
Merge branch 'develop' into TASK-6089
pfurio Jul 8, 2024
074f37c
master: declare new testing dependencies, #TASK-6089
pfurio Jul 8, 2024
10645a9
Merge branch 'TASK-6089' of github.com:opencb/opencga into TASK-6089
pfurio Jul 8, 2024
3168314
master: Do not use exec to run jobs, as we need to create the DIND_DO…
j-coll Jul 9, 2024
4839a37
master: Add "trap TERM" to dind-docker container. #TASK-6089
j-coll Jul 9, 2024
c724cbd
master: change variable names, #TASK-6089
pfurio Jul 9, 2024
6ce3797
master: when dry run is added, use generic cli, #TASK-6089
pfurio Jul 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,17 @@
import org.opencb.biodata.tools.alignment.BamUtils;
import org.opencb.cellbase.client.rest.CellBaseClient;
import org.opencb.cellbase.client.rest.GeneClient;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.opencga.analysis.StorageManager;
import org.opencb.opencga.analysis.models.FileInfo;
import org.opencb.opencga.analysis.models.StudyInfo;
import org.opencb.opencga.analysis.tools.ToolRunner;
import org.opencb.opencga.catalog.db.api.FileDBAdaptor;
import org.opencb.opencga.catalog.db.api.ProjectDBAdaptor;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.managers.CatalogManager;
import org.opencb.opencga.catalog.utils.CatalogFqn;
import org.opencb.opencga.catalog.utils.ParamUtils;
import org.opencb.opencga.core.api.ParamConstants;
import org.opencb.opencga.core.exceptions.ToolException;
import org.opencb.opencga.core.models.alignment.AlignmentIndexParams;
import org.opencb.opencga.core.models.alignment.CoverageIndexParams;
import org.opencb.opencga.core.models.JwtPayload;
import org.opencb.opencga.core.models.file.File;
import org.opencb.opencga.core.models.project.Project;
Expand All @@ -70,6 +64,7 @@ public class AlignmentStorageManager extends StorageManager {

private AlignmentStorageEngine alignmentStorageEngine;
private String jobId;
private boolean dryRun;

private static final Map<String, String> statsMap = new HashMap<>();

Expand All @@ -82,12 +77,13 @@ public AlignmentStorageManager(CatalogManager catalogManager, StorageEngineFacto
initStatsMap();
}

public AlignmentStorageManager(CatalogManager catalogManager, StorageEngineFactory storageEngineFactory, String jobId) {
public AlignmentStorageManager(CatalogManager catalogManager, StorageEngineFactory storageEngineFactory, String jobId, boolean dryRun) {
super(catalogManager, storageEngineFactory);

// TODO: Create this alignmentStorageEngine by reflection
this.alignmentStorageEngine = new LocalAlignmentStorageEngine();
this.jobId = jobId;
this.dryRun = dryRun;

initStatsMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ protected void run() throws ToolException {

OpenCGAResult<Job> flagStatsJobResult = catalogManager.getJobManager()
.submit(study, AlignmentFlagStatsAnalysis.ID, Enums.Priority.MEDIUM, params, null, "Job generated by "
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), token);
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), getJobId(), null,
false, token);
flagStatsJobId = flagStatsJobResult.first().getId();
addEvent(Event.Type.INFO, "Submit job " + flagStatsJobId + " to compute stats (" + AlignmentFlagStatsAnalysis.ID
+ ")");
Expand All @@ -137,7 +138,8 @@ protected void run() throws ToolException {

OpenCGAResult<Job> statsJobResult = catalogManager.getJobManager()
.submit(study, AlignmentStatsAnalysis.ID, Enums.Priority.MEDIUM, params, null, "Job generated by "
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), token);
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), getJobId(), null,
false, token);
statsJobId = statsJobResult.first().getId();
addEvent(Event.Type.INFO, "Submit job " + statsJobId + " to compute stats (" + AlignmentStatsAnalysis.ID + ")");
}
Expand All @@ -154,7 +156,7 @@ protected void run() throws ToolException {
OpenCGAResult<Job> fastQcMetricsJobResult = catalogManager.getJobManager()
.submit(study, AlignmentFastQcMetricsAnalysis.ID, Enums.Priority.MEDIUM, params, null,
"Job generated by " + getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(),
token);
getJobId(), null, false, token);
fastQcMetricsJobId = fastQcMetricsJobResult.first().getId();
addEvent(Event.Type.INFO, "Submit job " + fastQcMetricsJobId + " to compute FastQC metrics ("
+ AlignmentFastQcMetricsAnalysis.ID + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Collections;

@Tool(id = ClinicalTsvAnnotationLoader.ID, resource = Enums.Resource.CLINICAL_ANALYSIS, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class ClinicalTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "clinical-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = AuxiliarRgaAnalysis.ID, resource = Enums.Resource.RGA, type = Tool.Type.OPERATION,
description = AuxiliarRgaAnalysis.DESCRIPTION)
description = AuxiliarRgaAnalysis.DESCRIPTION, priority = Enums.Priority.HIGH)
public class AuxiliarRgaAnalysis extends OperationTool {
public final static String ID = "rga-aux-index";
public final static String DESCRIPTION = ParamConstants.INDEX_AUXILIAR_COLLECTION_DESCRIPTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = RgaAnalysis.ID, resource = Enums.Resource.RGA, type = Tool.Type.OPERATION, description = "Index RGA study.")
@Tool(id = RgaAnalysis.ID, resource = Enums.Resource.RGA, type = Tool.Type.OPERATION, description = "Index RGA study.",
priority = Enums.Priority.HIGH)
public class RgaAnalysis extends OperationTool {
public final static String ID = "rga-index";
public final static String DESCRIPTION = "Generate Recessive Gene Analysis secondary index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = CohortTsvAnnotationLoader.ID, resource = Enums.Resource.COHORT, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class CohortTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "cohort-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = FamilyTsvAnnotationLoader.ID, resource = Enums.Resource.FAMILY, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class FamilyTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "family-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import java.util.List;

@Tool(id = FetchAndRegisterTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION,
description = "Download an external file and register it in OpenCGA.")
description = "Download an external file and register it in OpenCGA.", priority = Enums.Priority.HIGH)
public class FetchAndRegisterTask extends OpenCgaToolScopeStudy {

public final static String ID = "files-fetch";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
import java.io.IOException;
import java.util.*;

@Tool(id = FileDeleteTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = "Delete files.")
@Tool(id = FileDeleteTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = "Delete files.",
priority = Enums.Priority.HIGH)
public class FileDeleteTask extends OpenCgaTool {

public final static String ID = "files-delete";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import java.util.List;
import java.util.Map;

@Tool(id = FileLinkTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = FileLinkTask.DESCRIPTION)
@Tool(id = FileLinkTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = FileLinkTask.DESCRIPTION,
priority = Enums.Priority.HIGH)
public class FileLinkTask extends OpenCgaToolScopeStudy {

public static final String ID = "file-link";
Expand Down Expand Up @@ -69,7 +70,7 @@ protected void run() throws Exception {
Job postLinkJob = catalogManager.getJobManager()
.submit(getStudy(), PostLinkSampleAssociation.ID, Enums.Priority.MEDIUM, params, null,
"Job generated by " + getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(),
getToken()).first();
getJobId(), null, false, getToken()).first();
logger.info("Submit post-link job : " + postLinkJob.getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Collections;

@Tool(id = FileTsvAnnotationLoader.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class FileTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "file-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@

import java.util.*;

@Tool(id = FileUnlinkTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = "Unlink files.")
@Tool(id = FileUnlinkTask.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION, description = "Unlink files.",
priority = Enums.Priority.HIGH)
public class FileUnlinkTask extends OpenCgaTool {

public final static String ID = "files-unlink";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.*;

@Tool(id = PostLinkSampleAssociation.ID, resource = Enums.Resource.FILE, type = Tool.Type.OPERATION,
description = PostLinkSampleAssociation.DESCRIPTION)
description = PostLinkSampleAssociation.DESCRIPTION, priority = Enums.Priority.HIGH)
public class PostLinkSampleAssociation extends OpenCgaToolScopeStudy {

public static final String ID = "postlink";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = IndividualTsvAnnotationLoader.ID, resource = Enums.Resource.INDIVIDUAL, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class IndividualTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "individual-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.opencb.opencga.core.tools.annotations.Tool;

@Tool(id = SampleTsvAnnotationLoader.ID, resource = Enums.Resource.SAMPLE, type = Tool.Type.OPERATION,
description = "Load annotations from TSV file.")
description = "Load annotations from TSV file.", priority = Enums.Priority.HIGH)
public class SampleTsvAnnotationLoader extends TsvAnnotationLoader {
public final static String ID = "sample-tsv-load";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ protected void run() throws ToolException {

OpenCGAResult<Job> variantStatsJobResult = catalogManager.getJobManager()
.submit(study, SampleVariantStatsAnalysis.ID, Enums.Priority.MEDIUM, params, null, "Job generated by "
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), token);
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), getJobId(), null,
false, token);
variantStatsJobId = variantStatsJobResult.first().getId();
addEvent(Event.Type.INFO, "Submit job " + variantStatsJobId + " to compute stats (" + SampleVariantStatsAnalysis.ID
+ ")");
Expand Down Expand Up @@ -267,7 +268,8 @@ protected void run() throws ToolException {

OpenCGAResult<Job> signatureJobResult = catalogManager.getJobManager()
.submit(getStudy(), MutationalSignatureAnalysis.ID, Enums.Priority.MEDIUM, params, null, "Job generated by "
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), token);
+ getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(), getJobId(), null,
false, token);
signatureJobId = signatureJobResult.first().getId();
logger.info("Submitted job {} to compute the mutational signature analysis {}", signatureJobId,
MutationalSignatureAnalysis.ID);
Expand All @@ -289,7 +291,7 @@ protected void run() throws ToolException {
OpenCGAResult<Job> genomePlotJobResult = catalogManager.getJobManager()
.submit(getStudy(), GenomePlotAnalysis.ID, Enums.Priority.MEDIUM, params, null,
"Job generated by " + getId() + " - " + getJobId(), Collections.emptyList(), Collections.emptyList(),
token);
getJobId(), null, false, token);
genomePlotJobId = genomePlotJobResult.first().getId();
addEvent(Event.Type.INFO, "Submit job " + genomePlotJobId + " to compute genome plot (" + GenomePlotAnalysis.ID
+ ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.nio.file.Paths;

@Tool(id = TemplateRunner.ID, description = TemplateRunner.DESCRIPTION, type = Tool.Type.OPERATION, resource = Enums.Resource.STUDY,
scope = Tool.Scope.STUDY)
scope = Tool.Scope.STUDY, priority = Enums.Priority.HIGH)
public class TemplateRunner extends OperationTool {

public static final String ID = "templates";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public abstract class OpenCgaTool {

private String jobId;
private String opencgaHome;
private boolean dryRun;
protected String token;

protected final ObjectMap params;
Expand All @@ -92,19 +93,20 @@ public OpenCgaTool() {
}

public final OpenCgaTool setUp(String opencgaHome, CatalogManager catalogManager, StorageEngineFactory engineFactory,
ObjectMap params, Path outDir, String jobId, String token) {
ObjectMap params, Path outDir, String jobId, boolean dryRun, String token) {
VariantStorageManager manager = new VariantStorageManager(catalogManager, engineFactory);
return setUp(opencgaHome, catalogManager, manager, params, outDir, jobId, token);
return setUp(opencgaHome, catalogManager, manager, params, outDir, jobId, dryRun, token);
}

public final OpenCgaTool setUp(String opencgaHome, CatalogManager catalogManager, VariantStorageManager variantStorageManager,
ObjectMap params, Path outDir, String jobId, String token) {
ObjectMap params, Path outDir, String jobId, boolean dryRun, String token) {
this.opencgaHome = opencgaHome;
this.catalogManager = catalogManager;
this.configuration = catalogManager.getConfiguration();
this.variantStorageManager = variantStorageManager;
this.storageConfiguration = variantStorageManager.getStorageConfiguration();
this.jobId = jobId;
this.dryRun = dryRun;
this.token = token;
if (params != null) {
this.params.putAll(params);
Expand Down Expand Up @@ -243,10 +245,14 @@ public final ExecutionResult start() throws ToolException {
try {
currentStep = "check";
privateCheck();
check();
currentStep = null;
erm.setSteps(getSteps());
run();
if (dryRun) {
logger.info("Dry run enabled. Sleep for 5 seconds and skip execution.");
Thread.sleep(5000);
} else {
currentStep = null;
erm.setSteps(getSteps());
run();
}
} catch (ToolException e) {
throw e;
} catch (Exception e) {
Expand Down Expand Up @@ -308,6 +314,7 @@ private void privateCheck() throws Exception {
if (toolParams != null) {
toolParams.updateParams(getParams());
}
check();
j-coll marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
Loading
Loading