Pipeline Documentation

The Pipeline module provides pipelines that can be called from Python or run from the command line, so that orthology inference can be carried out in parallel.

The module uses Luigi to manage workflows and submits tasks through a cluster job scheduler to distribute the work across nodes. Tasks are designed to run on clusters that use either Sun Grid Engine (SGE) or PBS Pro (pbspro).

Overview

The Pipeline module provides pre-configured pipeline tasks that can be executed in parallel on cluster computing systems. Currently, the module includes:

  • BlastPipelineTask: Runs BLAST searches in parallel across multiple nodes

  • TestPipelineTask: Example task for testing pipeline functionality

Examples

Running a Test Pipeline

The TestPipelineTask is a simple example that demonstrates how to create and run a pipeline task:

import os
import luigi
from OrthoEvol.Tools.sge import SGEPipelineTask
from OrthoEvol.Pipeline.testpipelinetask import TestPipelineTask

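# Configure SGE settings on the base class so that every task inherits them:
# the shared temp directory must be reachable from all cluster nodes, and
# parallel_env is set to None instead of its default ('orte')
SGEPipelineTask.shared_tmp_dir = os.getcwd()
SGEPipelineTask.parallel_env = None

# Create three test tasks (task i requests i+1 slots via select) and run
# them with three workers so they execute concurrently
tasks = [TestPipelineTask(i=str(i), select=i+1) for i in range(3)]
luigi.build(tasks, local_scheduler=True, workers=3)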

Running a BLAST Pipeline

The BlastPipelineTask runs BLAST searches in parallel:

import logging
import luigi
import os
from OrthoEvol.Tools.sge import SGEPipelineTask
from OrthoEvol.Pipeline.blastpipeline import BlastPipelineTask
from OrthoEvol.Orthologs.Blast import OrthoBlastN

# Configure BLAST settings
blast_config = {
    "taxon_file": None,
    "go_list": None,
    "post_blast": True,
    "template": None,
    "save_data": True,
    "copy_from_package": True,
    "MAF": 'test_blast.csv'
}

# Initialize BLAST instance
myblast = OrthoBlastN(
    proj_mana=None,
    project="sdh-test",
    project_path=os.getcwd(),
    **blast_config
)

# Configure SGE settings
logger = logging.getLogger('luigi-interface')
SGEPipelineTask.shared_tmp_dir = os.getcwd()
SGEPipelineTask.parallel_env = None

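# Create and run BLAST tasks: one task for each accession after the first
# entry of acc_list, all allowed to run concurrently (workers=num_accs)
path = os.getcwd()
accessions = myblast.acc_list[1:]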
num_accs = len(accessions)
tasks = [
    BlastPipelineTask(
        path=path,
        accessions=str(accessions),
        select=i+1
    ) for i in range(num_accs)
]
luigi.build(tasks, local_scheduler=True, workers=num_accs)
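In this example each task receives the accession list as a single string, str(accessions). How BlastPipelineTask decodes that value is not documented here. As a minimal sketch, assuming a task needs the list back, ast.literal_eval recovers it without the risks of eval(). The helper parse_accessions and the placeholder accession values are hypothetical and not part of OrthoEvol.

```python
import ast
from typing import List


def parse_accessions(value: str) -> List[str]:
    """Hypothetical helper: rebuild a list from its str() representation.

    ast.literal_eval only evaluates Python literals, so unlike eval() it
    will not execute arbitrary code that ends up in the parameter string.
    """
    parsed = ast.literal_eval(value)
    if not isinstance(parsed, list):
        raise ValueError(f"expected a list literal, got {type(parsed).__name__}")
    return [str(item) for item in parsed]


# Round trip with placeholder accession values (hypothetical).
accessions = ["ACC_1", "ACC_2", "ACC_3"]
serialized = str(accessions)  # what the example passes as accessions=
print(parse_accessions(serialized) == accessions)
```

If the task class itself can be changed, declaring the parameter as a luigi.ListParameter is an alternative that avoids the manual string round trip.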

Task Parameters

All pipeline tasks inherit from SGEPipelineTask and support the following parameters:

  • select: Number of CPUs (slots) to allocate for the task (default: 3)

  • shared_tmp_dir: Directory on a shared drive that is accessible from all cluster nodes (default: '/home')

  • parallel_env: SGE parallel environment name (default: 'orte')

  • job_name: Explicit job name to pass to qsub

  • run_locally: Run locally instead of on the cluster (default: False)
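To make the roles of select, parallel_env and job_name more concrete, the sketch below shows how such parameters could translate into qsub options. It is an assumption, not the command SGEPipelineTask actually builds: the function build_qsub_args is hypothetical, the job script argument is omitted, and it uses only standard scheduler syntax (SGE's -N and -pe <env> <slots>, PBS Pro's -l select=1:ncpus=<n>).

```python
from typing import List, Optional


def build_qsub_args(
    job_name: str,
    select: int = 3,
    parallel_env: Optional[str] = "orte",
    scheduler: str = "sge",
) -> List[str]:
    """Hypothetical mapping from task parameters to qsub options."""
    if select < 1:
        raise ValueError("select must be at least 1")
    args = ["qsub", "-N", job_name]
    if scheduler == "sge":
        # SGE requests multiple slots through a parallel environment;
        # in this sketch, parallel_env=None means no -pe option (one slot).
        if parallel_env is not None:
            args += ["-pe", parallel_env, str(select)]
    elif scheduler == "pbspro":
        # PBS Pro requests CPUs as a resource chunk instead.
        args += ["-l", f"select=1:ncpus={select}"]
    else:
        raise ValueError(f"unknown scheduler: {scheduler}")
    return args


print(build_qsub_args("blast-1", select=2))
# ['qsub', '-N', 'blast-1', '-pe', 'orte', '2']
print(build_qsub_args("blast-1", select=2, parallel_env=None, scheduler="pbspro"))
# ['qsub', '-N', 'blast-1', '-l', 'select=1:ncpus=2']
```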

Software Dependencies

  • Luigi: Workflow management library

  • Sun Grid Engine (SGE): Job scheduler for cluster computing

  • PBS Pro (pbspro): Alternative job scheduler (version 14.1.0 or higher)

Notes

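  • Tasks should implement the work() method rather than run(): the inherited run() takes care of job submission (or calls work() directly when run_locally=True), and work() holds the computation that executes on the cluster node

  • local_scheduler=True uses Luigi's in-process scheduler instead of a central luigid server, which is convenient for testing and debugging; it does not by itself keep jobs off the cluster (set run_locally=True for that)

  • Set the workers argument of luigi.build() to the number of tasks you want to run concurrently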

  • Ensure Luigi is installed on all cluster nodes
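The split between run() and work() is a template-method design. In luigi.contrib.sge.SGEJobTask, whose parameter names (shared_tmp_dir, parallel_env, run_locally, job_name) SGEPipelineTask shares, run() either calls work() directly when run_locally is set or pickles the task and submits it with qsub, and the cluster job then calls work(). The sketch below is a simplified, stdlib-only model of that control flow, not OrthoEvol's implementation: the class names SketchPipelineTask and WriteFileTask are hypothetical, and submission is stubbed out by recording the request instead of calling qsub.

```python
import os
import tempfile
from abc import ABC, abstractmethod
from typing import List


class SketchPipelineTask(ABC):
    """Hypothetical, simplified stand-in for an SGE-style base task."""

    def __init__(self, run_locally: bool = False) -> None:
        self.run_locally = run_locally
        self.submitted: List[str] = []  # records what would go to qsub

    def run(self) -> None:
        # The base class owns run() and only decides where work() executes.
        if self.run_locally:
            self.work()
        else:
            self._submit()

    def _submit(self) -> None:
        # A real implementation would serialize the task into the shared
        # temp dir and call qsub; this sketch only records the request.
        self.submitted.append(type(self).__name__)

    @abstractmethod
    def work(self) -> None:
        """Subclasses put their actual computation here."""


class WriteFileTask(SketchPipelineTask):
    """Hypothetical task: overrides work(), never run()."""

    def __init__(self, path: str, run_locally: bool = False) -> None:
        super().__init__(run_locally)
        self.path = path

    def work(self) -> None:
        with open(self.path, "w") as handle:
            handle.write("this is a test\n")


if __name__ == "__main__":
    out_dir = tempfile.mkdtemp()
    local = WriteFileTask(os.path.join(out_dir, "local.txt"), run_locally=True)
    local.run()  # work() executes in this process and writes the file
    remote = WriteFileTask(os.path.join(out_dir, "remote.txt"))
    remote.run()  # only recorded as a submission; no file is written
    print(os.path.exists(local.path), os.path.exists(remote.path), remote.submitted)
```

Because the subclass never touches run(), the same task definition works unchanged whether it executes locally or on the cluster.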