IMPRESS Adaptive Pipeline - Protein Binding HPC Use Case
This documentation walks through a real-world, computationally intensive IMPRESS adaptive pipeline for protein binding analysis that runs on High-Performance Computing (HPC) systems and requires GPUs.
Use Case Overview
This example demonstrates adaptive optimization for AlphaFold protein structure analysis where:
- Each protein requires at least 1 GPU for processing
- Pipelines adaptively spawn child pipelines based on protein quality degradation
- Underperforming proteins are migrated to new pipeline instances for re-optimization
- Everything runs on HPC infrastructure (Purdue Anvil GPU cluster)
Adaptive Components Breakdown
1. Adaptive Criteria Function
async def adaptive_criteria(current_score: float, previous_score: float) -> bool:
"""
Determine if protein quality has degraded requiring pipeline migration.
"""
return current_score > previous_score
Adaptive Logic:
- Simple but effective: Higher scores indicate degraded protein quality
- Comparison-based: Evaluates current vs previous protein structure scores
- Migration trigger: Returns True when quality degrades, triggering protein migration (see the example after this list)
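For illustration, the criterion can be exercised on its own with made-up scores (the function is repeated here so the snippet runs standalone):
import asyncio

async def adaptive_criteria(current_score: float, previous_score: float) -> bool:
    # In this use case a higher score means the protein quality degraded.
    return current_score > previous_score

# Hypothetical scores, for demonstration only.
print(asyncio.run(adaptive_criteria(0.82, 0.75)))  # True  -> degraded, migrate the protein
print(asyncio.run(adaptive_criteria(0.70, 0.75)))  # False -> no degradation, keep the protein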
2. Core Adaptive Decision Function
The main adaptive intelligence is implemented in adaptive_decision():
async def adaptive_decision(pipeline: ProteinBindingPipeline) -> Optional[Dict[str, Any]]:
MAX_SUB_PIPELINES: int = 3
sub_iter_seqs: Dict[str, str] = {}
# Read current scores from CSV output
file_name = f'af_stats_{pipeline.name}_pass_{pipeline.passes}.csv'
with open(file_name) as fd:
for line in fd.readlines()[1:]:
# Parse protein scores from AlphaFold output
name, *_, score_str = line.split(',')
protein = name.split('.')[0]
pipeline.current_scores[protein] = float(score_str)
Score Processing:
- File-based communication: Reads AlphaFold statistics from CSV files (parsing sketched below)
- Dynamic score tracking: Updates current protein quality scores
- Real-time evaluation: Processes actual computational results
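The exact column layout of the af_stats_*.csv files is not shown here; the parser only relies on the first field being a PDB file name and the last field being the score. A minimal sketch under that assumption, with a made-up row:
# Hypothetical CSV row; intermediate columns are ignored by the parser.
line = 'protein_A.pdb,12,0.41,0.87\n'

name, *_, score_str = line.strip().split(',')
protein = name.split('.')[0]   # 'protein_A'
score = float(score_str)       # 0.87
print(protein, score)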
3. Adaptive Migration Logic
# First pass — establish baseline
if not pipeline.previous_scores:
pipeline.logger.pipeline_log('Saving current scores as previous and returning')
pipeline.previous_scores = copy.deepcopy(pipeline.current_scores)
return
# Identify proteins that deteriorated
sub_iter_seqs = {}
for protein, curr_score in pipeline.current_scores.items():
if protein not in pipeline.iter_seqs:
continue
decision = await adaptive_criteria(curr_score, pipeline.previous_scores[protein])
if decision:
sub_iter_seqs[protein] = pipeline.iter_seqs.pop(protein) # Remove from current pipeline
Migration Decision Process:
- Baseline establishment: First pass saves scores as reference
- Protein evaluation: Each protein is individually assessed
- Selective migration: Only degraded proteins are moved to child pipelines
- Pipeline cleanup: Migrated proteins are removed from parent pipeline
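To make the selection step concrete, the following standalone sketch applies the same loop to hypothetical score and sequence dictionaries (the real values come from the pipeline state):
# Hypothetical pipeline state, for demonstration only.
previous_scores = {'prot1': 0.70, 'prot2': 0.55, 'prot3': 0.80}
current_scores  = {'prot1': 0.78, 'prot2': 0.50, 'prot3': 0.80}
iter_seqs       = {'prot1': 'SEQ1', 'prot2': 'SEQ2', 'prot3': 'SEQ3'}

sub_iter_seqs = {}
for protein, curr in current_scores.items():
    if protein in iter_seqs and curr > previous_scores[protein]:
        # Score rose, i.e. quality degraded: move the sequence to a child pipeline.
        sub_iter_seqs[protein] = iter_seqs.pop(protein)

print(sub_iter_seqs)  # {'prot1': 'SEQ1'}  -> migrated
print(iter_seqs)      # {'prot2': 'SEQ2', 'prot3': 'SEQ3'}  -> stay with the parent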
4. Child Pipeline Creation and Resource Management
# Spawn new pipeline for underperforming proteins
if sub_iter_seqs and pipeline.sub_order < MAX_SUB_PIPELINES:
new_name: str = f"{pipeline.name}_sub{pipeline.sub_order + 1}"
pipeline.set_up_new_pipeline_dirs(new_name)
# Copy PDB files for migrated proteins
for protein in sub_iter_seqs:
src = f'{pipeline.output_path_af}/{protein}.pdb'
dst = f'{pipeline.base_path}/{new_name}_in/{protein}.pdb'
shutil.copyfile(src, dst)
# Configure child pipeline
new_config = {
'name': new_name,
'type': type(pipeline),
'adaptive_fn': adaptive_decision, # Recursive adaptivity
'config': {
'passes': pipeline.passes,
'iter_seqs': sub_iter_seqs, # Only degraded proteins
'seq_rank': pipeline.seq_rank + 1,
'sub_order': pipeline.sub_order + 1,
'previous_scores': copy.deepcopy(pipeline.previous_scores),
}
}
pipeline.submit_child_pipeline_request(new_config)
Resource and Data Management:
- File system operations: Creates directories and copies PDB files for migrated proteins (sketched below)
- Selective data transfer: Only problematic proteins are moved to child pipelines
- Configuration inheritance: Child pipelines inherit optimization parameters
- Recursive adaptivity: Child pipelines can also spawn their own children
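set_up_new_pipeline_dirs() belongs to the pipeline class and its internals are not reproduced here; the sketch below approximates the data staging with standard-library calls, using the path layout from the snippet above (the helper name stage_child_inputs is made up):
import os
import shutil

def stage_child_inputs(base_path: str, output_path_af: str,
                       new_name: str, proteins) -> None:
    """Copy PDB files of migrated proteins into the child pipeline's input directory."""
    in_dir = os.path.join(base_path, f'{new_name}_in')
    os.makedirs(in_dir, exist_ok=True)   # stand-in for set_up_new_pipeline_dirs()
    for protein in proteins:
        shutil.copyfile(os.path.join(output_path_af, f'{protein}.pdb'),
                        os.path.join(in_dir, f'{protein}.pdb'))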
5. Pipeline Lifecycle Management
Lifecycle Control:
- Parent finalization: Completes the current pipeline's processing for this pass
- Conditional termination: Parent pipeline can terminate if no work remains (see the sketch below)
- Resource optimization: Prevents idle pipeline instances
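The finalization code itself is not reproduced in this walkthrough; the following is only a sketch of what such a tail could look like, using a hypothetical terminate flag rather than the actual IMPRESS API:
import copy

async def finish_pass(pipeline) -> None:
    # Roll the current scores over as the baseline for the next pass.
    pipeline.previous_scores = copy.deepcopy(pipeline.current_scores)
    if not pipeline.iter_seqs:
        # Every remaining protein was migrated, so the parent has no work left:
        # terminate it and free its GPU/CPU allocation instead of idling.
        pipeline.logger.pipeline_log('No proteins left; terminating parent pipeline')
        pipeline.terminate = True   # hypothetical flag, not the actual IMPRESS API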
6. HPC Resource Configuration
async def impress_protein_bind() -> None:
manager: ImpressManager = ImpressManager(
execution_backend=RadicalExecutionBackend({
'gpus': 2, # GPU allocation per pipeline
'cores': 32, # CPU cores per pipeline
'runtime': 13 * 60, # 13 hours maximum runtime
'resource': 'purdue.anvil_gpu' # HPC cluster specification
})
)
pipeline_setups: List[PipelineSetup] = [
PipelineSetup(
name='p1',
type=ProteinBindingPipeline,
adaptive_fn=adaptive_decision
)
]
await manager.start(pipeline_setups=pipeline_setups)
HPC Integration:
- GPU allocation: 2 GPUs per pipeline for intensive AlphaFold calculations
- Resource specification: 32 CPU cores and a 13-hour runtime limit
- Cluster targeting: Specifically configured for the Purdue Anvil GPU cluster
- Scalable architecture: Framework handles resource allocation for child pipelines
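Because manager.start() accepts a list of PipelineSetup objects, several independent root pipelines can presumably be registered at once; the variant below sits inside impress_protein_bind() from the listing above (so it reuses the same imports and names), assumes that behavior, and is not part of the original example:
# Presumed extension: two root pipelines under one manager; 'p2' is a hypothetical name.
pipeline_setups: List[PipelineSetup] = [
    PipelineSetup(name='p1', type=ProteinBindingPipeline, adaptive_fn=adaptive_decision),
    PipelineSetup(name='p2', type=ProteinBindingPipeline, adaptive_fn=adaptive_decision),
]
await manager.start(pipeline_setups=pipeline_setups)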
Adaptive Execution Flow
- Initial pipeline: Starts with the full protein set on HPC with GPU resources
- AlphaFold processing: Generates protein structure quality scores
- Adaptive evaluation: Compares current vs. previous scores per protein
- Migration decision: Identifies degraded proteins for re-optimization
- Child pipeline creation: Moves problematic proteins to a new pipeline instance
- Resource allocation: The HPC system allocates GPUs/CPUs to the child pipeline
- Recursive optimization: Child pipelines can further adapt and spawn children
- Resource cleanup: Completed pipelines release HPC resources
Key Adaptive Features for HPC
- Performance-based adaptation: Real computational results drive pipeline decisions
- Resource-aware scaling: GPU/CPU resources allocated per pipeline instance
- Data locality: PDB files copied to maintain data proximity
- Hierarchical optimization: Multi-level pipeline spawning for complex optimization
- HPC integration: Native support for cluster resource management
- Fault tolerance: Pipeline termination and resource cleanup mechanisms