diff --git a/cgatcore/pipeline/__init__.py b/cgatcore/pipeline/__init__.py index de0fcf2b..7c3828e6 100644 --- a/cgatcore/pipeline/__init__.py +++ b/cgatcore/pipeline/__init__.py @@ -5,9 +5,53 @@ ============================================= This module provides a comprehensive set of tools to facilitate the creation and management -of data processing pipelines using CGAT Ruffus. It includes functionalities for pipeline control, -logging, parameterization, task execution, database uploads, temporary file management, and -integration with AWS S3. +of data processing pipelines using CGAT Ruffus. It includes functionalities for: + +1. Pipeline Control + - Task execution and dependency management + - Command-line interface for pipeline operations + - Logging and error handling + +2. Resource Management + - Cluster job submission and monitoring + - Memory and CPU allocation + - Temporary file handling + +3. Configuration + - Parameter management via YAML configuration + - Cluster settings customization + - Pipeline state persistence + +4. Cloud Integration + - AWS S3 support for input/output files + - Cloud-aware pipeline decorators + - Remote file handling + +Example Usage +------------ +A basic pipeline using local files: + +.. code-block:: python + + from cgatcore import pipeline as P + + # Standard pipeline task + @P.transform("input.txt", suffix(".txt"), ".processed") + def process_local_file(infile, outfile): + # Processing logic here + pass + +Using S3 integration: + +.. code-block:: python + + # S3-aware pipeline task + @P.s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed") + def process_s3_file(infile, outfile): + # Processing logic here + pass + +For detailed documentation, see: https://cgat-core.readthedocs.io/ """ diff --git a/cgatcore/pipeline/cluster.py b/cgatcore/pipeline/cluster.py index cfc293a9..ab27919e 100644 --- a/cgatcore/pipeline/cluster.py +++ b/cgatcore/pipeline/cluster.py @@ -1,15 +1,54 @@ -'''cluster.py - cluster utility functions for ruffus pipelines -============================================================== - -This module abstracts the DRMAA native specification and provides -convenience functions for running Drmaa jobs. - -Currently SGE, SLURM, Torque and PBSPro are supported. - -Reference ---------- - -''' +""" +cluster.py - Cluster job management for CGAT pipelines +==================================================== + +This module provides functionality for submitting and managing jobs on various +cluster platforms (SLURM, SGE, PBS/Torque). It handles: + +1. Job Submission + - Resource allocation (memory, CPU cores) + - Queue selection and prioritization + - Job dependencies and scheduling + +2. Platform Support + - SLURM Workload Manager + - Sun Grid Engine (SGE) + - PBS/Torque + - Local execution (multiprocessing) + +3. Resource Management + - Memory limits and monitoring + - CPU allocation + - Job runtime constraints + - Temporary directory handling + +Configuration +------------ +Cluster settings can be configured in `.cgat.yml`: + +.. 
code-block:: yaml + + cluster: + queue_manager: slurm + queue: main + memory_resource: mem + memory_default: 4G + parallel_environment: dedicated + +Available Parameters +------------------ +- cluster_queue: Cluster queue to use (default: all.q) +- cluster_priority: Job priority (-10 to 10, default: -10) +- cluster_num_jobs: Maximum concurrent jobs (default: 100) +- cluster_memory_resource: Memory resource identifier +- cluster_memory_default: Default job memory (default: 4G) +- cluster_memory_ulimit: Enable memory limits via ulimit +- cluster_parallel_environment: Parallel environment name +- cluster_queue_manager: Queue management system +- cluster_tmpdir: Temporary directory location + +For detailed documentation, see: https://cgat-core.readthedocs.io/ +""" import re import math @@ -484,7 +523,6 @@ def get_native_specification(self, spec.append("-q {}".format(kwargs["queue"])) spec.append(kwargs.get("options", "")) - return spec def update_template(self, jt): diff --git a/cgatcore/pipeline/execution.py b/cgatcore/pipeline/execution.py index 5ae7e6ae..edf23a38 100644 --- a/cgatcore/pipeline/execution.py +++ b/cgatcore/pipeline/execution.py @@ -1,15 +1,62 @@ -"""execution.py - Job control for ruffus pipelines -========================================================= +""" +execution.py - Task execution for CGAT pipelines +============================================== + +This module handles the execution of pipeline tasks, providing support for: + +1. Job Execution + - Local execution via subprocess + - Cluster job submission + - Python function execution + - Container-based execution + +2. Resource Management + - Memory monitoring and limits + - CPU allocation + - Runtime constraints + - Working directory management + +3. Error Handling + - Job failure detection + - Retry mechanisms + - Error logging and reporting + - Clean-up procedures + +4. Execution Modes + - Synchronous (blocking) execution + - Asynchronous job submission + - Parallel task execution + - Dependency-aware scheduling -Session -------- +Usage Examples +------------- +1. Submit a command to the cluster: -This module manages a DRMAA session. :func:`start_session` -starts a session and :func:`close_session` closes it. +.. code-block:: python + + statement = "samtools sort input.bam -o output.bam" + job_options = "-l mem_free=4G" + job_threads = 4 + + execution.run(statement, + job_options=job_options, + job_threads=job_threads) + +2. Execute a Python function: + +.. code-block:: python + + def process_data(infile, outfile): + # Processing logic here + pass -Reference ---------- + execution.submit(module="my_module", + function="process_data", + infiles="input.txt", + outfiles="output.txt", + job_memory="4G") +For detailed documentation, see: https://cgat-core.readthedocs.io/ """ import collections diff --git a/docs/function_doc/pipeline.md b/docs/function_doc/pipeline.md index d9a4af32..09148a55 100644 --- a/docs/function_doc/pipeline.md +++ b/docs/function_doc/pipeline.md @@ -1,5 +1,129 @@ # CGATcore Pipeline Module +The `pipeline` module is the core component of CGAT-core, providing essential functionality for building and executing computational pipelines. 
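+
+Before diving into the reference below, the following minimal sketch shows how the pieces typically fit together. It is illustrative only: the file names, the `wc -w` command and the location of `pipeline.yml` are placeholder assumptions, not part of the API.
+
+```python
+"""Minimal CGAT pipeline sketch (illustrative only)."""
+
+import sys
+
+from ruffus import transform, suffix
+from cgatcore import pipeline as P
+
+# Load configuration; a pipeline.yml next to the script is assumed here
+PARAMS = P.get_parameters(["pipeline.yml"])
+
+
+@transform("*.txt", suffix(".txt"), ".counts")
+def count_words(infile, outfile):
+    """Count the words in each input file."""
+    # %(infile)s and %(outfile)s are filled in from the local namespace
+    statement = "wc -w %(infile)s > %(outfile)s"
+    P.run(statement)
+
+
+if __name__ == "__main__":
+    sys.exit(P.main(sys.argv))
+```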
+ +## Core Functions + +### Pipeline Decorators + +```python +@transform(input_files, suffix(".input"), ".output") +def task_function(infile, outfile): + """Transform a single input file to an output file.""" + pass + +@merge(input_files, "output.txt") +def merge_task(infiles, outfile): + """Merge multiple input files into a single output.""" + pass + +@split(input_file, "*.split") +def split_task(infile, outfiles): + """Split a single input file into multiple outputs.""" + pass + +@follows(previous_task) +def dependent_task(): + """Execute after previous_task completes.""" + pass +``` + +### S3-Aware Decorators + +```python +@s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed") +def process_s3_file(infile, outfile): + """Process files directly from S3.""" + pass + +@s3_merge(["s3://bucket/*.txt"], "s3://bucket/merged.txt") +def merge_s3_files(infiles, outfile): + """Merge multiple S3 files.""" + pass +``` + +## Configuration Functions + +### Pipeline Setup +```python +# Initialize pipeline +pipeline.initialize(options) + +# Get pipeline parameters +params = pipeline.get_params() + +# Configure cluster execution +pipeline.setup_cluster() +``` + +### Resource Management +```python +# Set memory requirements +pipeline.set_job_memory("4G") + +# Set CPU requirements +pipeline.set_job_threads(4) + +# Configure temporary directory +pipeline.set_tmpdir("/path/to/tmp") +``` + +## Execution Functions + +### Running Tasks +```python +# Execute a command +pipeline.run("samtools sort input.bam") + +# Submit a Python function +pipeline.submit( + module="my_module", + function="process_data", + infiles="input.txt", + outfiles="output.txt" +) +``` + +### Job Control +```python +# Check job status +pipeline.is_running(job_id) + +# Wait for job completion +pipeline.wait_for_jobs() + +# Clean up temporary files +pipeline.cleanup() +``` + +## Error Handling + +```python +try: + pipeline.run("risky_command") +except pipeline.PipelineError as e: + pipeline.handle_error(e) +``` + +## Best Practices + +1. **Resource Management** + - Always specify memory and CPU requirements + - Use appropriate cluster queue settings + - Clean up temporary files + +2. **Error Handling** + - Implement proper error checking + - Use pipeline.log for logging + - Handle temporary file cleanup + +3. **Performance** + - Use appropriate chunk sizes for parallel processing + - Monitor resource usage + - Optimize cluster settings + +For more details, see the [Pipeline Overview](../pipeline_modules/overview.md) and [Writing Workflows](../defining_workflow/writing_workflows.md) guides. + ::: cgatcore.pipeline :members: :show-inheritance: diff --git a/docs/getting_started/examples.md b/docs/getting_started/examples.md index eae1aa75..65bd620a 100644 --- a/docs/getting_started/examples.md +++ b/docs/getting_started/examples.md @@ -228,4 +228,416 @@ When running the pipeline, make sure to specify `--no-cluster` as a command line - **Logs**: Check the log files generated during the pipeline run for detailed error messages. - **Support**: For further assistance, refer to the [CGAT-core documentation](https://cgat-developers.github.io/cgat-core/) or raise an issue on our [GitHub repository](https://github.com/cgat-developers/cgat-core/issues). +## CGAT-core Examples +This guide provides practical examples of CGAT-core pipelines for various use cases, from basic file processing to complex genomics workflows. 
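+
+All of the examples below follow the same basic pattern: a Ruffus decorator declares the files a task reads and writes, local variables such as `job_threads` and `job_memory` request resources, and `P.run()` fills the `%(...)s` placeholders in the statement from the local namespace and the pipeline parameters before the command is executed. A minimal sketch of that pattern (the `gzip` command and file names are placeholders):
+
+```python
+from ruffus import transform, suffix
+from cgatcore import pipeline as P
+
+
+@transform("*.txt", suffix(".txt"), ".txt.gz")
+def compress(infile, outfile):
+    """Compress each input text file."""
+    job_memory = "1G"  # resource request picked up by P.run
+    # %(infile)s and %(outfile)s are substituted before execution
+    statement = "gzip -c %(infile)s > %(outfile)s"
+    P.run(statement)
+```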
+ +## Quick Start Examples + +### Hello World Pipeline + +```python +"""hello_world.py - Simple CGAT pipeline example + +This pipeline demonstrates the basic structure of a CGAT pipeline: +1. Task definition +2. Pipeline flow +3. Command execution +""" + +from ruffus import * +from cgatcore import pipeline as P +import sys + +# ------------------------------------------------------------------------ +# Tasks +# ------------------------------------------------------------------------ +@originate("hello.txt") +def create_file(outfile): + """Create a simple text file.""" + statement = """echo "Hello, CGAT!" > %(outfile)s""" + P.run(statement) + +@transform(create_file, suffix(".txt"), ".upper.txt") +def convert_to_upper(infile, outfile): + """Convert text to uppercase.""" + statement = """cat %(infile)s | tr '[:lower:]' '[:upper:]' > %(outfile)s""" + P.run(statement) + +# ------------------------------------------------------------------------ +# Pipeline Running +# ------------------------------------------------------------------------ +if __name__ == "__main__": + sys.exit(P.main(sys.argv)) +``` + +### Configuration Example + +```yaml +# pipeline.yml +pipeline: + name: hello_world + author: Your Name + +# Cluster configuration +cluster: + queue_manager: slurm + queue: main + memory_resource: mem + memory_default: 1G +``` + +## Real-World Examples + +### 1. Genomics Pipeline + +This example demonstrates a typical RNA-seq analysis pipeline: + +```python +"""rnaseq_pipeline.py - RNA-seq analysis pipeline + +Features: +- FastQ quality control +- Read alignment +- Expression quantification +- Differential expression analysis +""" + +from ruffus import * +from cgatcore import pipeline as P +import logging as L +import sys +import os + +# ------------------------------------------------------------------------ +# Configuration +# ------------------------------------------------------------------------ +P.get_parameters([ + "%s/pipeline.yml" % os.path.splitext(__file__)[0], + "pipeline.yml"]) + +# ------------------------------------------------------------------------ +# Tasks +# ------------------------------------------------------------------------ +@transform("*.fastq.gz", suffix(".fastq.gz"), ".fastqc.done") +def run_fastqc(infile, outfile): + """Quality control of sequencing reads.""" + job_threads = 1 + job_memory = "2G" + + statement = """ + fastqc --outdir=fastqc %(infile)s && + touch %(outfile)s + """ + P.run(statement) + +@transform("*.fastq.gz", suffix(".fastq.gz"), ".bam") +def align_reads(infile, outfile): + """Align reads to reference genome.""" + job_threads = 8 + job_memory = "32G" + + statement = """ + STAR + --runThreadN %(job_threads)s + --genomeDir %(genome_dir)s + --readFilesIn %(infile)s + --readFilesCommand zcat + --outFileNamePrefix %(outfile)s + --outSAMtype BAM SortedByCoordinate + """ + P.run(statement) + +@transform(align_reads, suffix(".bam"), ".counts.tsv") +def count_features(infile, outfile): + """Count reads in genomic features.""" + job_threads = 4 + job_memory = "8G" + + statement = """ + featureCounts + -T %(job_threads)s + -a %(annotations)s + -o %(outfile)s + %(infile)s + """ + P.run(statement) + +@merge(count_features, "deseq2_results") +def run_deseq2(infiles, outfile): + """Differential expression analysis.""" + job_memory = "16G" + + statement = """ + Rscript scripts/run_deseq2.R + --counts=%(infiles)s + --design=%(design_file)s + --outdir=%(outfile)s + """ + P.run(statement) + +if __name__ == "__main__": + sys.exit(P.main(sys.argv)) +``` + +### 2. 
Data Processing Pipeline + +Example of a data processing pipeline with S3 integration: + +```python +"""data_pipeline.py - Data processing with S3 integration + +Features: +- S3 input/output +- Parallel processing +- Error handling +- Resource management +""" + +from ruffus import * +from cgatcore import pipeline as P +import logging as L +import sys +import os + +# ------------------------------------------------------------------------ +# Configuration +# ------------------------------------------------------------------------ +P.get_parameters([ + "%s/pipeline.yml" % os.path.splitext(__file__)[0], + "pipeline.yml"]) + +# Configure S3 +P.configure_s3() + +# ------------------------------------------------------------------------ +# Tasks +# ------------------------------------------------------------------------ +@P.s3_transform("s3://bucket/data/*.csv", + suffix(".csv"), + ".processed.csv") +def process_data(infile, outfile): + """Process CSV files from S3.""" + job_memory = "4G" + + statement = """ + python scripts/process_data.py + --input=%(infile)s + --output=%(outfile)s + --config=%(processing_config)s + """ + try: + P.run(statement) + except P.PipelineError as e: + L.error("Processing failed: %s" % e) + # Cleanup temporary files + cleanup_temp_files() + raise + finally: + # Always clean up + P.cleanup_tmpdir() + +@P.s3_merge(process_data, + "s3://bucket/results/report.html") +def create_report(infiles, outfile): + """Generate analysis report.""" + job_memory = "8G" + + statement = """ + python scripts/create_report.py + --input=%(infiles)s + --output=%(outfile)s + --template=%(report_template)s + """ + P.run(statement) + +if __name__ == "__main__": + sys.exit(P.main(sys.argv)) +``` + +### 3. Image Processing Pipeline + +Example of an image processing pipeline: + +```python +"""image_pipeline.py - Image processing pipeline + +Features: +- Batch image processing +- Feature extraction +- Analysis reporting +""" + +from ruffus import * +from cgatcore import pipeline as P +import sys +import os + +# ------------------------------------------------------------------------ +# Configuration +# ------------------------------------------------------------------------ +P.get_parameters([ + "%s/pipeline.yml" % os.path.splitext(__file__)[0], + "pipeline.yml"]) + +# ------------------------------------------------------------------------ +# Tasks +# ------------------------------------------------------------------------ +@transform("*.png", suffix(".png"), ".processed.png") +def preprocess_images(infile, outfile): + """Image preprocessing.""" + statement = """ + python scripts/preprocess.py + --input=%(infile)s + --output=%(outfile)s + --params=%(preprocessing_params)s + """ + P.run(statement) + +@transform(preprocess_images, + suffix(".processed.png"), + ".features.json") +def extract_features(infile, outfile): + """Feature extraction.""" + statement = """ + python scripts/extract_features.py + --input=%(infile)s + --output=%(outfile)s + --model=%(feature_model)s + """ + P.run(statement) + +@merge(extract_features, "analysis_report.html") +def analyze_results(infiles, outfile): + """Generate analysis report.""" + statement = """ + python scripts/analyze.py + --input=%(infiles)s + --output=%(outfile)s + --config=%(analysis_config)s + """ + P.run(statement) + +if __name__ == "__main__": + sys.exit(P.main(sys.argv)) +``` + +## Best Practices + +### 1. 
Resource Management + +```python +@transform("*.bam", suffix(".bam"), ".sorted.bam") +def sort_bam(infile, outfile): + """Example of proper resource management.""" + # Calculate memory based on input size + infile_size = os.path.getsize(infile) + job_memory = "%dG" % max(4, infile_size // (1024**3) + 2) + + # Set threads based on system + job_threads = min(4, os.cpu_count()) + + # Use temporary directory + tmpdir = P.get_temp_dir() + + statement = """ + samtools sort + -@ %(job_threads)s + -m %(job_memory)s + -T %(tmpdir)s/sort + %(infile)s > %(outfile)s + """ + P.run(statement) +``` + +### 2. Error Handling + +```python +@transform("*.txt", suffix(".txt"), ".processed") +def robust_processing(infile, outfile): + """Example of proper error handling.""" + try: + statement = """ + process_data %(infile)s > %(outfile)s + """ + P.run(statement) + except P.PipelineError as e: + L.error("Processing failed: %s" % e) + # Cleanup temporary files + cleanup_temp_files() + raise + finally: + # Always clean up + P.cleanup_tmpdir() +``` + +### 3. Configuration Management + +```yaml +# pipeline.yml - Example configuration + +# Pipeline metadata +pipeline: + name: example_pipeline + version: 1.0.0 + author: Your Name + +# Input/Output +io: + input_dir: /path/to/input + output_dir: /path/to/output + temp_dir: /tmp/pipeline + +# Processing parameters +processing: + threads: 4 + memory: 8G + chunk_size: 1000 + +# Cluster configuration +cluster: + queue_manager: slurm + queue: main + memory_resource: mem + parallel_environment: smp + max_jobs: 100 + +# S3 configuration (if needed) +s3: + bucket: my-pipeline-bucket + region: us-west-2 + transfer: + multipart_threshold: 8388608 + max_concurrency: 10 +``` + +## Running the Examples + +1. **Setup Configuration** + ```bash + # Copy and edit pipeline configuration + cp pipeline.yml.example pipeline.yml + ``` + +2. **Run Pipeline** + ```bash + # Show pipeline tasks + python pipeline.py show full + + # Run specific task + python pipeline.py make task_name + + # Run entire pipeline + python pipeline.py make full + ``` + +3. **Cluster Execution** + ```bash + # Run on cluster + python pipeline.py make full --cluster-queue=main + ``` + +For more information, see: +- [Pipeline Overview](../pipeline_modules/overview.md) +- [Cluster Configuration](../pipeline_modules/cluster.md) +- [S3 Integration](../s3_integration/configuring_s3.md) diff --git a/docs/getting_started/tutorial.md b/docs/getting_started/tutorial.md index 2c13fc1f..0d443090 100644 --- a/docs/getting_started/tutorial.md +++ b/docs/getting_started/tutorial.md @@ -78,6 +78,138 @@ cgatshowcase transdiffexprs make build_report -v 5 --no-cluster This will generate a `MultiQC` report in the folder `MultiQC_report.dir/` and an `Rmarkdown` report in `R_report.dir/`. +## Core Concepts + +### Pipeline Structure + +A CGAT pipeline typically consists of: +1. **Tasks**: Individual processing steps +2. **Dependencies**: Relationships between tasks +3. **Configuration**: Pipeline settings +4. **Execution**: Running the pipeline + +### Task Types + +1. **@transform**: One-to-one file transformation +```python +@transform("*.bam", suffix(".bam"), ".sorted.bam") +def sort_bam(infile, outfile): + pass +``` + +2. **@merge**: Many-to-one operation +```python +@merge("*.counts", "final_counts.txt") +def merge_counts(infiles, outfile): + pass +``` + +3. 
**@split**: One-to-many operation +```python +@split("input.txt", "chunk_*.txt") +def split_file(infile, outfiles): + pass +``` + +### Resource Management + +Control resource allocation: +```python +@transform("*.bam", suffix(".bam"), ".sorted.bam") +def sort_bam(infile, outfile): + job_memory = "8G" + job_threads = 4 + statement = """ + samtools sort -@ %(job_threads)s -m %(job_memory)s + %(infile)s > %(outfile)s + """ + P.run(statement) +``` + +### Error Handling + +Implement robust error handling: +```python +try: + P.run(statement) +except P.PipelineError as e: + L.error("Task failed: %s" % e) + raise +``` + +## Advanced Topics + +### 1. Pipeline Parameters + +Access configuration parameters: +```python +# Get parameter with default +threads = PARAMS.get("threads", 1) + +# Required parameter +input_dir = PARAMS["input_dir"] +``` + +### 2. Logging + +Use the logging system: +```python +# Log information +L.info("Processing %s" % infile) + +# Log warnings +L.warning("Low memory condition") + +# Log errors +L.error("Task failed: %s" % e) +``` + +### 3. Temporary Files + +Manage temporary files: +```python +@transform("*.bam", suffix(".bam"), ".sorted.bam") +def sort_bam(infile, outfile): + # Get temp directory + tmpdir = P.get_temp_dir() + + statement = """ + samtools sort -T %(tmpdir)s/sort + %(infile)s > %(outfile)s + """ + P.run(statement) +``` + +## Best Practices + +1. **Code Organization** + - Use clear task names + - Group related tasks + - Document pipeline steps + +2. **Resource Management** + - Set appropriate memory/CPU requirements + - Use temporary directories + - Clean up intermediate files + +3. **Error Handling** + - Implement proper error checking + - Use informative error messages + - Clean up on failure + +4. **Documentation** + - Add docstrings to tasks + - Document configuration options + - Include usage examples + +## Next Steps + +- Review the [Examples](examples.md) section +- Learn about [Cluster Configuration](../pipeline_modules/cluster.md) +- Explore [Cloud Integration](../s3_integration/configuring_s3.md) + +For more advanced topics, see the [Pipeline Modules](../pipeline_modules/overview.md) documentation. + ## Conclusion This completes the tutorial for running the `transdiffexprs` pipeline for `cgat-showcase`. We hope you find it as useful as we do for writing workflows in Python. \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index 8fdc1aec..ca539ad0 100644 --- a/docs/index.md +++ b/docs/index.md @@ -4,7 +4,73 @@ ![Conda](https://img.shields.io/conda/v/bioconda/cgatcore.svg) ![Build Status](https://github.com/cgat-developers/cgat-core/actions/workflows/cgatcore_python.yml/badge.svg) -Welcome to the CGAT-core documentation! CGAT-core is a workflow management system designed to support the rapid development of scalable, reproducible data analysis pipelines. It is built upon a flexible and user-friendly set of libraries and functions tailored for large-scale data analysis. +Welcome to the CGAT-core documentation! CGAT-core is a powerful Python framework for building and executing computational pipelines, with robust support for cluster environments and cloud integration. 
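+
+At its core, a CGAT pipeline is plain Python: Ruffus decorators describe how files flow between tasks, and CGAT-core submits the resulting commands either locally or to a cluster. A small, hypothetical taste (command and file names are placeholders); the same task runs unchanged with `--no-cluster` or on a SLURM/SGE/PBS cluster:
+
+```python
+from ruffus import transform, suffix
+from cgatcore import pipeline as P
+
+
+@transform("*.bed", suffix(".bed"), ".sorted.bed")
+def sort_bed(infile, outfile):
+    """Sort BED intervals by chromosome and start position."""
+    job_memory = "2G"  # honoured when the task is run on a cluster
+    statement = "sort -k1,1 -k2,2n %(infile)s > %(outfile)s"
+    P.run(statement)
+```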
+ +## Key Features + +- **Pipeline Management**: Build and execute complex computational workflows using Ruffus +- **Cluster Integration**: Support for multiple cluster platforms (SLURM, SGE, PBS/Torque) +- **Cloud Support**: Native integration with AWS S3, Google Cloud, and Azure +- **Resource Management**: Intelligent handling of compute resources and job distribution +- **Container Support**: Execute pipeline tasks in containers for reproducibility + +## Getting Started + +1. [Installation Guide](getting_started/installation.md) + - System requirements + - Installation methods + - Verification steps + +2. [Tutorial](getting_started/tutorial.md) + - Basic pipeline concepts + - Running your first pipeline + - Troubleshooting tips + +3. [Examples](getting_started/examples.md) + - Common use cases + - Pipeline patterns + - Best practices + +## Core Components + +### Pipeline Development + +- [Writing Workflows](defining_workflow/writing_workflows.md): Create custom pipeline workflows +- [Run Parameters](getting_started/run_parameters.md): Configure pipeline execution +- [Pipeline Modules](pipeline_modules/overview.md): Core pipeline components + +### Execution Environments + +- [Cluster Configuration](pipeline_modules/cluster.md): Set up cluster execution +- [Container Support](container/whole_pipeline.md): Run pipelines in containers +- [Cloud Integration](s3_integration/configuring_s3.md): Work with cloud storage + +### Advanced Features + +- [Parameter Management](pipeline_modules/parameters.md): Handle pipeline parameters +- [Execution Control](pipeline_modules/execution.md): Manage task execution +- [Database Integration](pipeline_modules/database.md): Work with databases + +## Project Information + +- [How to Contribute](project_info/how_to_contribute.md) +- [Citations](project_info/citations.md) +- [License](project_info/license.md) +- [FAQ](project_info/faq.md) + +## Additional Resources + +- [API Documentation](function_doc/pipeline.md) +- [GitHub Repository](https://github.com/cgat-developers/cgat-core) +- [Issue Tracker](https://github.com/cgat-developers/cgat-core/issues) + +## Need Help? + +If you need help or have questions: + +1. Check our [FAQ](project_info/faq.md) +2. Search existing [GitHub Issues](https://github.com/cgat-developers/cgat-core/issues) +3. Create a new issue if your problem isn't already addressed ## Overview diff --git a/docs/pipeline_modules/cluster.md b/docs/pipeline_modules/cluster.md index 2e2f7fff..6ab7cfec 100644 --- a/docs/pipeline_modules/cluster.md +++ b/docs/pipeline_modules/cluster.md @@ -154,6 +154,283 @@ def get_queue_manager(queue_manager, *args, **kwargs): raise ValueError("Queue manager {} not supported".format(queue_manager)) ``` +## Cluster Configuration + +CGAT-core provides robust support for executing pipelines on various cluster platforms, including SLURM, SGE, and PBS/Torque. + +### Supported Platforms + +1. **SLURM Workload Manager** + - Modern, scalable cluster manager + - Extensive resource control + - Fair-share scheduling + +2. **Sun Grid Engine (SGE)** + - Traditional cluster system + - Wide deployment base + - Flexible job control + +3. 
**PBS/Torque** + - Professional batch system + - Advanced scheduling + - Resource management + +### Configuration + +#### Basic Setup + +Create `.cgat.yml` in your home directory: + +```yaml +cluster: + # Queue manager type (slurm, sge, pbspro, torque) + queue_manager: slurm + + # Default queue + queue: main + + # Memory resource identifier + memory_resource: mem + + # Default memory per job + memory_default: 4G + + # Parallel environment + parallel_environment: dedicated + + # Maximum concurrent jobs + max_jobs: 100 + + # Job priority + priority: 0 +``` + +#### Platform-Specific Configuration + +##### SLURM Configuration +```yaml +cluster: + queue_manager: slurm + options: --time=00:10:00 --cpus-per-task=8 --mem=1G + queue: main + memory_resource: mem + parallel_environment: dedicated +``` + +##### SGE Configuration +```yaml +cluster: + queue_manager: sge + options: -l h_rt=00:10:00 + queue: all.q + memory_resource: h_vmem + parallel_environment: smp +``` + +##### PBS/Torque Configuration +```yaml +cluster: + queue_manager: torque + options: -l walltime=00:10:00 -l nodes=1:ppn=8 + queue: batch + memory_resource: mem + parallel_environment: dedicated +``` + +## Resource Management + +### Memory Allocation + +```python +@transform("*.bam", suffix(".bam"), ".sorted.bam") +def sort_bam(infile, outfile): + """Sort BAM file with specific memory requirements.""" + job_memory = "8G" + job_threads = 4 + + statement = """ + samtools sort + -@ %(job_threads)s + -m %(job_memory)s + %(infile)s > %(outfile)s + """ + P.run(statement) +``` + +### CPU Allocation + +```python +@transform("*.fa", suffix(".fa"), ".indexed") +def index_genome(infile, outfile): + """Index genome using multiple cores.""" + job_threads = 8 + + statement = """ + bwa index + -t %(job_threads)s + %(infile)s + """ + P.run(statement) +``` + +### Temporary Directory + +```python +@transform("*.bam", suffix(".bam"), ".sorted.bam") +def sort_with_temp(infile, outfile): + """Sort using specific temporary directory.""" + tmpdir = P.get_temp_dir() + + statement = """ + samtools sort + -T %(tmpdir)s/sort + %(infile)s > %(outfile)s + """ + P.run(statement) +``` + +## Advanced Configuration + +### Job Dependencies + +```python +@follows(previous_task) +@transform("*.txt", suffix(".txt"), ".processed") +def dependent_task(infile, outfile): + """Task that depends on previous_task completion.""" + P.run("process_file %(infile)s > %(outfile)s") +``` + +### Resource Scaling + +```python +@transform("*.bam", suffix(".bam"), ".stats") +def calculate_stats(infile, outfile): + """Scale resources based on input size.""" + infile_size = os.path.getsize(infile) + job_memory = "%dG" % max(4, infile_size // (1024**3) + 2) + job_threads = min(4, os.cpu_count()) + + statement = """ + samtools stats + -@ %(job_threads)s + %(infile)s > %(outfile)s + """ + P.run(statement) +``` + +### Queue Selection + +```python +@transform("*.big", suffix(".big"), ".processed") +def process_big_file(infile, outfile): + """Use specific queue for large jobs.""" + job_queue = "bigmem" + job_memory = "64G" + + statement = """ + process_large_file %(infile)s > %(outfile)s + """ + P.run(statement) +``` + +## Best Practices + +### 1. Resource Specification + +- Always specify memory requirements +- Set appropriate number of threads +- Use queue-specific options wisely +- Consider input file sizes + +### 2. 
Error Handling + +```python +try: + P.run(statement) +except P.PipelineError as e: + L.error("Cluster job failed: %s" % e) + # Cleanup and resubmit if needed + cleanup_and_resubmit() +``` + +### 3. Performance Optimization + +- Group small tasks +- Use appropriate chunk sizes +- Monitor resource usage +- Clean up temporary files + +### 4. Monitoring + +```python +# Log resource usage +@transform("*.bam", suffix(".bam"), ".sorted.bam") +def monitored_sort(infile, outfile): + """Monitor resource usage during sort.""" + job_memory = "8G" + job_threads = 4 + + statement = """ + { time samtools sort %(infile)s > %(outfile)s ; } 2> %(outfile)s.metrics + """ + P.run(statement) +``` + +## Troubleshooting + +### Common Issues + +1. **Job Failures** + - Check error logs + - Verify resource requirements + - Monitor cluster status + +2. **Resource Exhaustion** + - Adjust memory limits + - Check disk space + - Monitor CPU usage + +3. **Queue Issues** + - Verify queue availability + - Check user limits + - Monitor queue status + +### Debugging Tips + +1. **Enable Detailed Logging** + ```python + import logging + logging.basicConfig(level=logging.DEBUG) + ``` + +2. **Test Jobs Locally** + ```bash + python pipeline.py make task --local + ``` + +3. **Monitor Resource Usage** + ```bash + python pipeline.py make task --cluster-queue=main --debug + ``` + +## Security Considerations + +1. **Access Control** + - Use appropriate permissions + - Implement job quotas + - Monitor user activity + +2. **Data Protection** + - Secure temporary files + - Clean up job artifacts + - Protect sensitive data + +For more information, see: +- [SLURM Documentation](https://slurm.schedmd.com/) +- [SGE Documentation](http://gridscheduler.sourceforge.net/) +- [PBS Documentation](https://www.pbsworks.com/) + ## Notes - This module provides a unified interface for running cluster jobs across different cluster managers, allowing the user to switch between cluster types without rewriting job submission scripts. diff --git a/docs/pipeline_modules/overview.md b/docs/pipeline_modules/overview.md index a3363b92..fcd6ea26 100644 --- a/docs/pipeline_modules/overview.md +++ b/docs/pipeline_modules/overview.md @@ -1,40 +1,233 @@ # Pipeline Modules Overview -CGAT-core provides a comprehensive set of modules to facilitate the creation and management of data processing pipelines. These modules offer various functionalities, from pipeline control and execution to database management and file handling. +CGAT-core provides a comprehensive set of modules for building and executing computational pipelines. This document provides an overview of the core modules and their functionality. -## Available Modules +## Core Modules -1. [Control](control.md): Manages the overall pipeline execution flow. -2. [Database](database.md): Handles database operations and uploads. -3. [Files](files.md): Provides utilities for file management and temporary file handling. -4. [Cluster](cluster.md): Manages job submission and execution on compute clusters. -5. [Execution](execution.md): Handles task execution and logging. -6. [Utils](utils.md): Offers various utility functions for pipeline operations. -7. [Parameters](parameters.md): Manages pipeline parameters and configuration. +### 1. 
Pipeline Control (`control.py`) +- Pipeline initialization and configuration +- Command-line interface +- Parameter management +- Logging setup -## Integration with Ruffus +```python +from cgatcore import pipeline as P -CGAT-core builds upon the Ruffus pipeline library, extending its functionality and providing additional features. It includes the following Ruffus decorators: +# Initialize pipeline +P.initialize(argv) -- `@transform` -- `@merge` -- `@split` -- `@originate` -- `@follows` -- `@suffix` +# Get parameters +PARAMS = P.get_parameters() -These decorators can be used to define pipeline tasks and their dependencies. +# Setup logging +L = P.get_logger() +``` -## S3 Integration +### 2. Task Execution (`execution.py`) +- Job submission and monitoring +- Resource management +- Error handling +- Cleanup procedures -CGAT-core also provides S3-aware decorators and functions for seamless integration with AWS S3: +```python +# Run command +P.run("samtools sort input.bam") -- `@s3_transform` -- `@s3_merge` -- `@s3_split` -- `@s3_originate` -- `@s3_follows` +# Submit Python function +P.submit(module="my_module", + function="process_data", + infiles="input.txt", + outfiles="output.txt") +``` -For more information on working with S3, see the [S3 Integration](../s3_integration/s3_pipeline.md) section. +### 3. Cluster Integration (`cluster.py`) +- Cluster job management +- Resource allocation +- Queue selection +- Job monitoring -By leveraging these modules and decorators, you can build powerful, scalable, and efficient data processing pipelines using CGAT-core. \ No newline at end of file +```python +# Configure cluster +P.setup_cluster() + +# Submit cluster job +statement = """samtools sort input.bam""" +job_threads = 4 +job_memory = "8G" +P.run(statement) +``` + +### 4. File Management (`files.py`) +- File path handling +- Temporary file management +- File type detection +- Pattern matching + +```python +# Get temporary directory +tmpdir = P.get_temp_dir() + +# Clean up temporary files +P.cleanup_tmpdir() +``` + +## Advanced Features + +### 1. Parameter Management (`parameters.py`) +- Configuration file parsing +- Parameter validation +- Default value handling +- Environment integration + +```python +# Load parameters +PARAMS = P.get_parameters([ + "pipeline.yml", + "cluster.yml" +]) + +# Access parameters +input_dir = PARAMS["input_dir"] +threads = PARAMS.get("threads", 1) +``` + +### 2. Database Integration (`database.py`) +- SQLite database support +- Table creation and updates +- Query execution +- Result caching + +```python +# Connect to database +db = P.connect() + +# Execute query +results = db.execute("SELECT * FROM stats") +``` + +### 3. Cloud Integration (`s3_integration.py`) +- AWS S3 support +- Cloud storage access +- File transfer +- Credential management + +```python +# Configure S3 +P.configure_s3() + +# Use S3-aware decorators +@P.s3_transform("s3://bucket/input.txt", + suffix(".txt"), ".processed") +def process_s3_file(infile, outfile): + pass +``` + +## Pipeline Development + +### 1. Task Definition + +```python +@transform("*.fastq.gz", suffix(".fastq.gz"), ".bam") +def map_reads(infile, outfile): + """Map reads to reference genome.""" + job_threads = 4 + job_memory = "8G" + + statement = """ + bwa mem -t %(job_threads)s + reference.fa + %(infile)s > %(outfile)s + """ + P.run(statement) +``` + +### 2. 
Pipeline Flow + +```python +@follows(map_reads) +@transform("*.bam", suffix(".bam"), ".sorted.bam") +def sort_bam(infile, outfile): + """Sort BAM files.""" + statement = """ + samtools sort %(infile)s > %(outfile)s + """ + P.run(statement) + +@follows(sort_bam) +@merge("*.sorted.bam", "final_report.txt") +def create_report(infiles, outfile): + """Generate final report.""" + statement = """ + multiqc . -o %(outfile)s + """ + P.run(statement) +``` + +## Best Practices + +### 1. Code Organization +- Group related tasks +- Use meaningful task names +- Document pipeline steps +- Implement error handling + +### 2. Resource Management +- Specify memory requirements +- Set appropriate thread counts +- Use temporary directories +- Clean up intermediate files + +### 3. Error Handling +```python +try: + P.run(statement) +except P.PipelineError as e: + L.error("Task failed: %s" % e) + # Implement recovery or cleanup + cleanup_and_notify() +``` + +### 4. Documentation +- Add docstrings to tasks +- Document configuration options +- Include usage examples +- Maintain README files + +## Pipeline Examples + +### Basic Pipeline +```python +"""Example pipeline demonstrating core functionality.""" + +from cgatcore import pipeline as P +import logging as L + +# Initialize +P.initialize() + +@transform("*.txt", suffix(".txt"), ".processed") +def process_files(infile, outfile): + """Process input files.""" + statement = """ + process_data %(infile)s > %(outfile)s + """ + P.run(statement) + +@follows(process_files) +@merge("*.processed", "report.txt") +def create_report(infiles, outfile): + """Generate summary report.""" + statement = """ + cat %(infiles)s > %(outfile)s + """ + P.run(statement) + +if __name__ == "__main__": + P.main() +``` + +For more detailed information about specific modules, see: +- [Cluster Configuration](cluster.md) +- [Task Execution](execution.md) +- [Parameter Management](parameters.md) +- [Database Integration](database.md) \ No newline at end of file diff --git a/docs/project_info/faq.md b/docs/project_info/faq.md index 0460897f..82eb78ac 100644 --- a/docs/project_info/faq.md +++ b/docs/project_info/faq.md @@ -1,5 +1,123 @@ -# FAQs +# Frequently Asked Questions (FAQ) -As our workflow develops, we will add frequently asked questions here. +## General Questions -In the meantime, please add issues to the [GitHub page](https://github.com/cgat-developers/cgat-core/issues). \ No newline at end of file +### What is CGAT-core? +CGAT-core is a Python framework for building and executing computational pipelines, particularly suited for bioinformatics and data analysis workflows. It provides robust support for cluster environments and cloud integration. + +### Why use CGAT-core instead of other workflow managers? +- Built on the proven Ruffus framework +- Native support for multiple cluster platforms +- Integrated cloud storage support +- Extensive resource management capabilities +- Container support for reproducibility + +### What platforms are supported? +- **Operating Systems**: Linux, macOS +- **Cluster Systems**: SLURM, SGE, PBS/Torque +- **Cloud Platforms**: AWS S3, Google Cloud, Azure + +## Installation + +### What are the system requirements? +- Python 3.7 or later +- Compatible cluster system (optional) +- Sufficient disk space for pipeline execution +- Memory requirements depend on specific pipeline needs + +### How do I install CGAT-core? +```bash +# Using pip +pip install cgatcore + +# Using conda +conda install -c bioconda cgatcore +``` + +### How do I verify my installation? 
+```bash +# Check installation +cgat --help + +# Run test pipeline +cgat showcase make all +``` + +## Pipeline Development + +### How do I create a new pipeline? +1. Create a new Python file +2. Import required modules +3. Define pipeline tasks using decorators +4. Add command-line interface + +Example: +```python +from cgatcore import pipeline as P + +@P.transform("*.txt", suffix(".txt"), ".processed") +def process_files(infile, outfile): + # Processing logic here + pass + +if __name__ == "__main__": + P.main() +``` + +### How do I configure cluster settings? +Create a `.cgat.yml` file in your home directory: +```yaml +cluster: + queue_manager: slurm + queue: main + memory_resource: mem + memory_default: 4G +``` + +## Troubleshooting + +### Common Issues + +#### Pipeline fails with memory error +- Check available memory with `free -h` +- Adjust memory settings in `.cgat.yml` +- Use `job_memory` parameter in task decorators + +#### Cluster jobs not starting +- Verify cluster configuration +- Check queue availability +- Ensure proper permissions + +#### Temporary files filling disk space +- Set appropriate tmpdir in configuration +- Enable automatic cleanup +- Monitor disk usage during execution + +### Getting Help + +1. Check documentation at [readthedocs](https://cgat-core.readthedocs.io/) +2. Search [GitHub Issues](https://github.com/cgat-developers/cgat-core/issues) +3. Join our community discussions +4. Create a new issue with: + - System information + - Error messages + - Minimal reproducible example + +## Best Practices + +### Resource Management +- Always specify memory requirements +- Use appropriate number of threads +- Clean up temporary files + +### Error Handling +- Implement proper error checking +- Use logging effectively +- Handle cleanup in failure cases + +### Performance +- Optimize chunk sizes +- Monitor resource usage +- Use appropriate cluster settings + +For more detailed information, refer to our [documentation](https://cgat-core.readthedocs.io/). \ No newline at end of file diff --git a/docs/s3_integration/configuring_s3.md b/docs/s3_integration/configuring_s3.md index 75126a7c..5bd05c87 100644 --- a/docs/s3_integration/configuring_s3.md +++ b/docs/s3_integration/configuring_s3.md @@ -1,90 +1,219 @@ -# Configuring S3 for Pipeline Execution +# Configuring AWS S3 Integration -To integrate AWS S3 into your CGAT pipeline, you need to configure S3 access to facilitate file handling for reading and writing data. This document explains how to set up S3 configuration for the CGAT pipelines. +CGAT-core provides native support for working with AWS S3 storage, allowing pipelines to read from and write to S3 buckets seamlessly. -## Overview +## Prerequisites -`configure_s3()` is a utility function provided by the CGATcore pipeline tools to handle authentication and access to AWS S3. This function allows you to provide credentials, specify regions, and set up other configurations that enable seamless integration of S3 into your workflow. +1. **AWS Account Setup** + - Active AWS account + - IAM user with S3 access + - Access key and secret key -### Basic Configuration +2. **Required Packages** + ```bash + pip install boto3 + pip install cgatcore[s3] + ``` -To get started, you will need to import and use the `configure_s3()` function. Here is a basic example: +## Configuration -```python -from cgatcore.pipeline import configure_s3 +### 1. AWS Credentials + +Configure AWS credentials using one of these methods: + +#### a. 
Environment Variables +```bash +export AWS_ACCESS_KEY_ID='your_access_key' +export AWS_SECRET_ACCESS_KEY='your_secret_key' +export AWS_DEFAULT_REGION='your_region' +``` + +#### b. AWS Credentials File +Create `~/.aws/credentials`: +```ini +[default] +aws_access_key_id = your_access_key +aws_secret_access_key = your_secret_key +region = your_region +``` -configure_s3(aws_access_key_id="YOUR_AWS_ACCESS_KEY", aws_secret_access_key="YOUR_AWS_SECRET_KEY") +#### c. Pipeline Configuration +In `pipeline.yml`: +```yaml +s3: + access_key: your_access_key + secret_key: your_secret_key + region: your_region + bucket: your_default_bucket ``` -### Configurable Parameters +### 2. S3 Pipeline Configuration + +Configure S3-specific settings in `pipeline.yml`: +```yaml +s3: + # Default bucket for pipeline + bucket: my-pipeline-bucket + + # Temporary directory for downloaded files + local_tmpdir: /tmp/s3_cache + + # File transfer settings + transfer: + multipart_threshold: 8388608 # 8MB + max_concurrency: 10 + multipart_chunksize: 8388608 # 8MB + + # Retry configuration + retry: + max_attempts: 5 + mode: standard +``` -- **`aws_access_key_id`**: Your AWS access key, used to authenticate and identify the user. -- **`aws_secret_access_key`**: Your secret key, corresponding to your access key. -- **`region_name`** (optional): AWS region where your S3 bucket is located. Defaults to the region set in your environment, if available. -- **`profile_name`** (optional): Name of the AWS profile to use if you have multiple profiles configured locally. +## Usage Examples -### Using AWS Profiles +### 1. Basic S3 Operations -If you have multiple AWS profiles configured locally, you can use the `profile_name` parameter to select the appropriate one without hardcoding the access keys in your code: +#### Reading from S3 +```python +from cgatcore import pipeline as P +@P.s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed") +def process_s3_file(infile, outfile): + """Process a file from S3.""" + statement = """ + cat %(infile)s | process_data > %(outfile)s + """ + P.run(statement) +``` + +#### Writing to S3 ```python -configure_s3(profile_name="my-profile") +@P.s3_transform("input.txt", suffix(".txt"), + "s3://bucket/output.processed") +def write_to_s3(infile, outfile): + """Write results to S3.""" + statement = """ + process_data %(infile)s > %(outfile)s + """ + P.run(statement) ``` -### Configuring Endpoints +### 2. Advanced Operations -To use custom endpoints, such as when working with MinIO or an AWS-compatible service: +#### Working with Multiple Files +```python +@P.s3_merge(["s3://bucket/*.txt"], "s3://bucket/merged.txt") +def merge_s3_files(infiles, outfile): + """Merge multiple S3 files.""" + statement = """ + cat %(infiles)s > %(outfile)s + """ + P.run(statement) +``` +#### Conditional S3 Usage ```python -configure_s3( - aws_access_key_id="YOUR_AWS_ACCESS_KEY", - aws_secret_access_key="YOUR_AWS_SECRET_KEY", - endpoint_url="https://custom-endpoint.com" -) +@P.transform("*.txt", suffix(".txt"), + P.s3_path_if("use_s3", ".processed")) +def conditional_s3(infile, outfile): + """Use S3 based on configuration.""" + statement = """ + process_data %(infile)s > %(outfile)s + """ + P.run(statement) ``` -### Security Recommendations +## Best Practices -1. **Environment Variables**: Use environment variables to set credentials securely rather than hardcoding them in your scripts. 
This avoids potential exposure of credentials: - - ```bash - export AWS_ACCESS_KEY_ID=YOUR_AWS_ACCESS_KEY - export AWS_SECRET_ACCESS_KEY=YOUR_AWS_SECRET_KEY - ``` +### 1. Performance Optimization -2. **AWS IAM Roles**: If you are running the pipeline on AWS infrastructure (such as EC2 instances), it's recommended to use IAM roles. These roles provide temporary security credentials that are automatically rotated by AWS. +- **Batch Operations**: Group small files for transfers +- **Multipart Uploads**: Configure for large files +- **Concurrent Transfers**: Set appropriate concurrency +- **Local Caching**: Use temporary directory efficiently -### Example Pipeline Integration +```yaml +s3: + transfer: + multipart_threshold: 100_000_000 # 100MB + max_concurrency: 20 + multipart_chunksize: 10_000_000 # 10MB + local_tmpdir: /fast/local/disk/s3_cache +``` -After configuring S3, you can seamlessly use the S3-aware methods within your pipeline. Below is an example: +### 2. Cost Management + +- **Data Transfer**: Minimize cross-region transfers +- **Storage Classes**: Use appropriate storage tiers +- **Cleanup**: Remove temporary files +- **Lifecycle Rules**: Configure bucket lifecycle + +### 3. Error Handling ```python -from cgatcore.pipeline import get_s3_pipeline +@P.s3_transform("s3://bucket/input.txt", suffix(".txt"), ".processed") +def robust_s3_processing(infile, outfile): + """Handle S3 operations with proper error checking.""" + try: + statement = """ + process_data %(infile)s > %(outfile)s + """ + P.run(statement) + except P.S3Error as e: + L.error("S3 operation failed: %s" % e) + raise + finally: + # Clean up local temporary files + P.cleanup_tmpdir() +``` -# Configure S3 access -configure_s3(profile_name="my-profile") +## Troubleshooting -# Instantiate the S3 pipeline -s3_pipeline = get_s3_pipeline() +### Common Issues -# Use S3-aware methods in the pipeline -@s3_pipeline.s3_transform("s3://my-bucket/input.txt", suffix(".txt"), ".processed") -def process_s3_file(infile, outfile): - # Processing logic - with open(infile, 'r') as fin: - data = fin.read() - processed_data = data.upper() - with open(outfile, 'w') as fout: - fout.write(processed_data) +1. **Access Denied** + - Check AWS credentials + - Verify IAM permissions + - Ensure bucket policy allows access + +2. **Transfer Failures** + - Check network connectivity + - Verify file permissions + - Monitor transfer logs + +3. **Performance Issues** + - Adjust multipart settings + - Check network bandwidth + - Monitor memory usage + +### Debugging + +Enable detailed S3 logging: +```python +import logging +logging.getLogger('boto3').setLevel(logging.DEBUG) +logging.getLogger('botocore').setLevel(logging.DEBUG) ``` -### Summary +## Security Considerations + +1. **Credentials Management** + - Use IAM roles when possible + - Rotate access keys regularly + - Never commit credentials -- Use the `configure_s3()` function to set up AWS credentials and S3 access. -- Options are available to use IAM roles, profiles, or custom endpoints. -- Use the S3-aware decorators to integrate S3 files seamlessly in your pipeline. +2. **Data Protection** + - Enable bucket encryption + - Use HTTPS endpoints + - Configure appropriate bucket policies -## Additional Resources +3. 
**Access Control** + - Implement least privilege + - Use bucket policies + - Enable access logging -- [AWS IAM Roles Documentation](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html) -- [AWS CLI Configuration and Credential Files](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html) \ No newline at end of file +For more information, see: +- [AWS S3 Documentation](https://docs.aws.amazon.com/s3/) +- [Boto3 Documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) +- [CGAT Pipeline Examples](examples.md) \ No newline at end of file