Pipeline Documentation
The Pipeline module provides easy-to-use pipelines, callable from Python or from the command line, that run orthology inference in parallel.
This module uses Luigi and Sun Grid Engine (SGE) to distribute computational tasks across cluster nodes. Tasks are designed to run on clusters that use either PBS Pro (pbspro) or SGE.
Overview
The Pipeline module provides pre-configured pipeline tasks that can be executed in parallel on cluster computing systems. Currently, the module includes:
BlastPipelineTask: Runs BLAST searches in parallel across multiple nodes
TestPipelineTask: Example task for testing pipeline functionality
Examples
Running a Test Pipeline
The TestPipelineTask is a simple example that demonstrates how to
create and run a pipeline task:
import logging
import luigi
import os
from OrthoEvol.Tools.sge import SGEPipelineTask
from OrthoEvol.Pipeline.testpipelinetask import TestPipelineTask
# Configure SGE settings
SGEPipelineTask.shared_tmp_dir = os.getcwd()
SGEPipelineTask.parallel_env = None
# Create and run test tasks
tasks = [TestPipelineTask(i=str(i), select=i+1) for i in range(3)]
luigi.build(tasks, local_scheduler=True, workers=3)
Running a BLAST Pipeline
The BlastPipelineTask runs BLAST searches in parallel:
import logging
import luigi
import os
from OrthoEvol.Tools.sge import SGEPipelineTask
from OrthoEvol.Pipeline.blastpipeline import BlastPipelineTask
from OrthoEvol.Orthologs.Blast import OrthoBlastN
# Configure BLAST settings
blast_config = {
    "taxon_file": None,
    "go_list": None,
    "post_blast": True,
    "template": None,
    "save_data": True,
    "copy_from_package": True,
    "MAF": "test_blast.csv"
}
# Initialize BLAST instance
myblast = OrthoBlastN(
    proj_mana=None,
    project="sdh-test",
    project_path=os.getcwd(),
    **blast_config
)
# Configure SGE settings
logger = logging.getLogger('luigi-interface')
SGEPipelineTask.shared_tmp_dir = os.getcwd()
SGEPipelineTask.parallel_env = None
# Create and run BLAST tasks
path = os.getcwd()
accessions = myblast.acc_list[1:]
num_accs = len(accessions)
# Give each task its own accession so that the tasks differ and run in parallel
tasks = [
    BlastPipelineTask(
        path=path,
        accessions=str([accession]),
        select=i+1
    ) for i, accession in enumerate(accessions)
]
luigi.build(tasks, local_scheduler=True, workers=num_accs)
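Each task receives its accessions as a stringified Python list. This means the work can also be spread over a fixed number of tasks instead of one task per accession. The following is a minimal sketch, assuming BlastPipelineTask accepts any stringified list for its accessions parameter. The chunk_accessions helper is hypothetical and not part of OrthoEvol, and the accession strings are placeholders.

```python
import ast


def chunk_accessions(accessions, n_tasks):
    """Split accessions into at most n_tasks contiguous, near-equal chunks.

    Hypothetical helper for illustration; not part of OrthoEvol.
    """
    if n_tasks < 1:
        raise ValueError("n_tasks must be at least 1")
    if not accessions:
        return []
    n_tasks = min(n_tasks, len(accessions))  # never create empty chunks
    size, extra = divmod(len(accessions), n_tasks)
    chunks, start = [], 0
    for i in range(n_tasks):
        # The first `extra` chunks take one additional accession each.
        end = start + size + (1 if i < extra else 0)
        chunks.append(accessions[start:end])
        start = end
    return chunks


# Placeholder accessions; a real run would use myblast.acc_list[1:].
accessions = ["ACC1", "ACC2", "ACC3", "ACC4", "ACC5"]
chunks = chunk_accessions(accessions, 2)
print(chunks)  # [['ACC1', 'ACC2', 'ACC3'], ['ACC4', 'ACC5']]

# Stringify each chunk the same way the example passes accessions=str(...).
params = [str(chunk) for chunk in chunks]
assert ast.literal_eval(params[0]) == chunks[0]
```

Each string in params could then be passed as accessions= to one BlastPipelineTask, with workers=len(chunks) in luigi.build. Contiguous, near-equal chunks keep the load balanced when individual searches take similar amounts of time.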
Task Parameters
All pipeline tasks inherit from SGEPipelineTask and support the
following parameters:
select: Number of CPUs (slots) to allocate for the task (default: 3)
shared_tmp_dir: Shared drive accessible from all cluster nodes (default: '/home')
parallel_env: SGE parallel environment name (default: 'orte')
job_name: Explicit job name for qsub
run_locally: Run locally instead of on the cluster (default: False)
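The select and parallel_env parameters are presumably turned into scheduler options when a task is submitted. The sketch below shows one plausible SGE mapping. It assumes the job script is piped to qsub on standard input. The build_qsub_args helper is hypothetical and is not OrthoEvol's implementation. The flags it uses (-V, -r, -N, -o, -e, -pe) are standard SGE qsub options.

```python
import shlex


def build_qsub_args(job_name, outfile, errfile, select=3, parallel_env="orte"):
    """Return an SGE qsub argument list for a job script read from stdin.

    Hypothetical helper: it uses standard SGE qsub flags but is not
    OrthoEvol's submission code.
    """
    args = [
        "qsub",
        "-V",            # export the submitting shell's environment to the job
        "-r", "y",       # mark the job as rerunnable
        "-N", job_name,  # job name shown by qstat
        "-o", outfile,   # file for the job's standard output
        "-e", errfile,   # file for the job's standard error
    ]
    if parallel_env is not None:
        # Request `select` slots in the named parallel environment.
        args += ["-pe", parallel_env, str(select)]
    return args


print(shlex.join(build_qsub_args("blast_0", "/home/u/out", "/home/u/err")))
# qsub -V -r y -N blast_0 -o /home/u/out -e /home/u/err -pe orte 3
```

Leaving parallel_env as None, as both examples above do, omits the -pe request in this sketch, so the scheduler falls back to its default slot allocation. On PBS Pro, CPUs are requested through a resource list such as -l select=1:ncpus=3 rather than -pe.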
Software Dependencies
Luigi: Workflow management library
Sun Grid Engine (SGE): Job scheduler for cluster computing
pbspro: Alternative job scheduler (version 14.1.0 or higher)
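Luigi must be importable on every node that runs a task. Running a quick preflight check on each node can therefore catch a missing dependency before any jobs are submitted. This is a minimal sketch that uses only the standard library. It checks for the luigi package and for the qsub and qstat commands, but it does not verify the pbspro version.

```python
import importlib.util
import shutil


def check_dependencies():
    """Report whether each pipeline dependency is visible on this node."""
    return {
        # Luigi must be importable by the Python interpreter running the tasks.
        "luigi": importlib.util.find_spec("luigi") is not None,
        # SGE and PBS Pro both provide qsub (submit) and qstat (query status).
        "qsub": shutil.which("qsub") is not None,
        "qstat": shutil.which("qstat") is not None,
    }


if __name__ == "__main__":
    for name, found in check_dependencies().items():
        print(f"{name}: {'found' if found else 'MISSING'}")
```

Run the check with the same Python interpreter that the cluster jobs use, because find_spec only searches that interpreter's import path.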
Notes
Tasks should override the work() method instead of run() for SGE execution.
Use local_scheduler=True for local testing and debugging.
Set the workers parameter to the number of tasks you want to run in parallel.
Ensure Luigi is installed on all cluster nodes.
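The rule about work() and run() reflects a split of responsibilities. The base class's run() decides where the job executes: it either submits the job to the scheduler or calls work() directly when run_locally is set. The work() method holds the job code itself. In Luigi's SGE module, which SGEPipelineTask appears to follow, overriding run() would bypass submission entirely, so the code would execute on the submitting machine. The classes below are hypothetical stand-ins written with the standard library only. They do not reproduce the real pickling and qsub logic.

```python
class HypotheticalClusterTask:
    """Stand-in for SGEPipelineTask: run() owns submission, work() owns the job."""

    run_locally = False

    def run(self):
        # The base class decides *where* the job runs; subclasses leave this alone.
        if self.run_locally:
            return self.work()
        return self._submit()

    def _submit(self):
        # A real cluster task would serialize itself and call qsub here.
        raise NotImplementedError("cluster submission is out of scope for this sketch")

    def work(self):
        raise NotImplementedError("subclasses put their job code in work()")


class EchoTask(HypotheticalClusterTask):
    """Hypothetical task that only overrides work(), as the notes recommend."""

    run_locally = True

    def work(self):
        return "job ran"


print(EchoTask().run())  # job ran
```

Because EchoTask only supplies work(), switching run_locally to False would route the same job code through submission without any change to the subclass.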