Overview

Tool Name

process_manager_tools

Purpose

The process_manager_tools orchestrate complex, multi-step workflows and coordinate multiple agents. Use them to manage long-running processes, delegate tasks to specialized agents, track progress across distributed operations, and build data pipelines with parallel execution, error handling, and state management. They are well suited to enterprise-scale data operations, multi-phase projects, and collaborative agent workflows.

Functions Available

  1. _delegate_work: Delegate tasks to other bots and WAIT for completion (synchronous blocking operation).
  2. sub_thread: Create isolated execution contexts for focused tasks with optional tool constraints and async execution.
  3. repeat: Execute instruction repeatedly for each item in a list (synchronous sequential processing).
  4. check_sub_thread_status: Check the status of an async sub-thread execution.
  5. get_sub_thread_messages: Retrieve conversation history from a sub-thread.

Key Features

Work Delegation

Delegate tasks to specialized agents and wait for completion with comprehensive results.

Sub-Thread Isolation

Create isolated execution contexts with constrained tools for focused, secure task execution.

Batch Processing

Execute the same instruction across multiple items with sequential or parallel processing.

Async Execution

Start long-running tasks asynchronously and check status later without blocking.

Tool Constraints

Restrict available tools in sub-threads for security and focused execution.

Context Management

Control whether sub-threads inherit parent context or start fresh.

Input Parameters for Each Function

_delegate_work

Parameters
| Name | Definition | Format |
|------|------------|--------|
| prompt | COMPLETE instructions for the task. Provide ALL details in this single call—the function will WAIT for completion. | String (required) |
| target_bot | Bot ID or name to delegate to. The function BLOCKS until this bot completes the task or times out. | String |
| timeout_seconds | Maximum seconds to WAIT for task completion before timing out (default 300). Function blocks for this duration. | Integer |
| max_retries | Maximum retries if the bot returns invalid JSON (during the single synchronous execution). | Integer |

sub_thread

Parameters
| Name | Definition | Format |
|------|------------|--------|
| task | Specific task or prompt to execute in the sub-thread. | String (required) |
| allowed_tools | List of tool names that can be used in the sub-thread (e.g., ['read_file', 'write_file']). Empty = all tools. | Array of Strings |
| include_context | Whether to include the main thread's conversation context (default: False). | Boolean |
| async_mode | If True, returns immediately with a thread_id for later checking (default: False for synchronous behavior). | Boolean |
| timeout_seconds | Maximum seconds to wait for sub-thread completion. | Integer |
| callback_url | Optional URL to POST results to when complete (useful for async mode). | String |
| resume_thread_id | ID of an existing sub-thread to resume (must start with 'sub_thread_'). | String |
| stop_on_timeout | If True (default), stop remote work on timeout; if False, leave it running and return a partial result. | Boolean |

repeat

Parameters
| Name | Definition | Format |
|------|------------|--------|
| instruction | Base instruction to repeat for each item. Each iteration runs SYNCHRONOUSLY to completion before the next starts. | String (required) |
| items | List of parameters to iterate over (max 20 items). Each item is processed SEQUENTIALLY, waiting for completion. | Array of Objects |
| timeout_seconds | Maximum seconds to WAIT for EACH iteration to complete (not total time). Function blocks for each item. | Integer |
| max_retries | Maximum retries if the bot returns invalid JSON (per iteration during synchronous execution). | Integer |

check_sub_thread_status

Parameters
| Name | Definition | Format |
|------|------------|--------|
| thread_id | Sub-thread ID to check status for (must start with 'sub_thread_'). | String (required) |
| last_n_messages | Optional: include the last N user/assistant messages (default 0 = none). | Integer |

get_sub_thread_messages

Parameters
| Name | Definition | Format |
|------|------------|--------|
| thread_id | Sub-thread ID to retrieve messages from (must start with 'sub_thread_'). | String (required) |
| last_n_messages | Number of most recent messages to retrieve (default: 10). | Integer |
| include_tool_calls | Include sanitized tool calls and tool results (default: True). | Boolean |

Critical Understanding: _delegate_work and repeat are BLOCKING functions that wait for completion. Use sub_thread with async_mode=True for non-blocking operations.
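
To make this distinction concrete, here is a minimal sketch contrasting the two styles (the bot and task names are illustrative):

# Blocking: does not return until the bot finishes or the timeout elapses.
summary = _delegate_work(
    prompt="Summarize sales_2024.csv in three bullet points",
    target_bot="summarizer_bot",  # illustrative bot name
    timeout_seconds=120
)

# Non-blocking: returns immediately with a thread_id to poll later.
handle = sub_thread(
    task="Re-index the document archive",
    async_mode=True,
    timeout_seconds=1800
)
print(check_sub_thread_status(thread_id=handle['thread_id'])['status'])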

Use Cases

  1. Multi-Agent Collaboration Coordinate multiple specialized agents (data analyst, report generator, quality checker) to complete complex projects.
  2. Parallel Data Processing Process multiple datasets, files, or customers simultaneously by delegating to multiple agent instances.
  3. Long-Running Operations Execute time-intensive tasks (model training, large data processing) asynchronously without blocking main workflow.
  4. Tool-Constrained Execution Run code with restricted tool access for security or to prevent unintended operations.
  5. Batch Operations Apply the same analysis, transformation, or report generation across multiple entities (customers, products, regions).

Workflow/How It Works

  1. Step 1: Simple Task Delegation Delegate a task to another bot and wait for results:
    result = _delegate_work(
        prompt="Analyze the sales data in sales_2024.csv and provide a summary",
        target_bot="data_analyst_bot"
    )
    
    print(f"Analysis complete: {result['result']}")
    
  2. Step 2: Sub-Thread with Tool Constraints Execute task with limited tools for security:
    result = sub_thread(
        task="Read the config.json file and extract the database connection string",
        allowed_tools=['read_file'],  # Only file reading allowed
        include_context=False
    )
    
    print(f"Connection string: {result['result']}")
    
  3. Step 3: Async Sub-Thread for Long Operations Start a long-running task without blocking:
    # Start async task
    result = sub_thread(
        task="Train machine learning model on customer data and save results",
        async_mode=True,
        timeout_seconds=1800  # 30 minutes
    )
    
    thread_id = result['thread_id']
    print(f"Task started: {thread_id}")
    
    # Do other work...
    
    # Check status later
    status = check_sub_thread_status(thread_id=thread_id)
    if status['status'] == 'completed':
        print(f"Model training complete: {status['result']}")
    elif status['status'] == 'running':
        print("Still training...")
    
  4. Step 4: Batch Processing with Repeat Process multiple items sequentially:
    customers = [
        {"customer_id": "C001", "name": "Acme Corp"},
        {"customer_id": "C002", "name": "TechStart Inc"},
        {"customer_id": "C003", "name": "Global Solutions"}
    ]
    
    result = repeat(
        instruction="Generate monthly report for this customer and save to their folder",
        items=customers,
        timeout_seconds=120  # 2 minutes per customer
    )
    
    print(f"Generated {result['completed_iterations']} reports")
    
  5. Step 5: Multi-Agent Pipeline Coordinate multiple agents for complex workflow:
    # Step 1: Extract data
    extraction = _delegate_work(
        prompt="Extract customer transaction data for Q1 2024",
        target_bot="data_extractor_bot",
        timeout_seconds=300
    )
    
    # Step 2: Analyze data
    analysis = _delegate_work(
        prompt=f"Analyze this transaction data and identify trends: {extraction['result']}",
        target_bot="data_analyst_bot",
        timeout_seconds=600
    )
    
    # Step 3: Generate report
    report = _delegate_work(
        prompt=f"Create executive report from this analysis: {analysis['result']}",
        target_bot="report_generator_bot",
        timeout_seconds=180
    )
    
    # Step 4: Quality check
    review = _delegate_work(
        prompt=f"Review this report for accuracy and completeness: {report['result']}",
        target_bot="quality_checker_bot",
        timeout_seconds=120
    )
    
    print(f"Pipeline complete. Final report: {report['result']}")
    print(f"Quality review: {review['result']}")
    
  6. Step 6: Resume Interrupted Sub-Thread Continue work from previous sub-thread:
    # Resume previous sub-thread
    result = sub_thread(
        task="Continue the analysis where we left off",
        resume_thread_id="sub_thread_abc123"
    )
    
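  7. Step 7: Inspect Sub-Thread Messages Retrieve recent conversation from a sub-thread for debugging; a minimal sketch (the thread ID is illustrative):
    history = get_sub_thread_messages(
        thread_id="sub_thread_abc123",
        last_n_messages=5,
        include_tool_calls=True
    )
    
    if history['found']:
        print(f"Returned {history['returned_count']} of {history['message_count']} messages")
        for message in history['messages']:
            print(message)
    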

Integration Relevance

  • project_manager_tools to orchestrate complex missions with multiple tasks and agents.
  • process_scheduler_tools to schedule batch operations and multi-agent workflows.
  • data_connector_tools with delegation for parallel data extraction across multiple sources.
  • github_connector_tools / gitlab_connector_tools for coordinated version control operations.
  • airflow_tools to trigger multi-agent pipelines from Airflow DAGs.
  • code_executor_tools within sub-threads for sandboxed code execution.

Configuration Details

  • Delegation Mode: Synchronous blocking—waits for delegated bot to complete before returning.
  • Sub-Thread Isolation: Each sub-thread has independent context and variable scope.
  • Tool Constraints: Sub-threads can restrict tools to specific subsets for security.
  • Timeout Handling: Configurable timeout with option to stop or continue on timeout.
  • Context Inheritance: Sub-threads can optionally inherit parent thread conversation history.
  • Async Support: Sub-threads support async mode for non-blocking long operations.
  • Thread Tracking: All sub-threads tracked with unique IDs for status checking.
  • Error Propagation: Errors in delegated/sub-thread work returned to caller for handling.
Blocking Behavior: _delegate_work and repeat are BLOCKING operations. They will NOT return until the work completes or times out. Plan your timeout values accordingly.
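
Because repeat applies its timeout per iteration, the worst-case blocking time is roughly timeout_seconds multiplied by the number of items. A quick sanity check before calling (a sketch, not part of the tool API):

regions = [{"region": r} for r in ["North", "South", "East", "West"]]
per_item_timeout = 120

# Worst case: every iteration runs to its full timeout before the next starts.
print(f"repeat may block for up to {per_item_timeout * len(regions)} seconds")

result = repeat(
    instruction="Summarize sales for this region",
    items=regions,
    timeout_seconds=per_item_timeout
)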

Limitations or Notes

  1. Blocking Operations: _delegate_work and repeat block execution—cannot check status or cancel mid-execution.
  2. Timeout Limits: Default 300 seconds (5 minutes); maximum timeout may be system-configured.
  3. Sequential Repeat: repeat processes items one at a time—not parallel (use multiple sub_thread calls for parallel).
  4. Item Limit: repeat maximum 20 items per call to prevent excessive execution time.
  5. Context Size: Including context in sub-threads increases memory usage—use sparingly.
  6. Tool Availability: Delegated bots must have required tools; sub-threads inherit from parent unless constrained.
  7. Thread Lifecycle: Sub-threads cleaned up after completion; results available only while tracked.
  8. No Nested Delegation: Sub-threads cannot create their own sub-threads (one level deep only).

Supported Operations

_delegate_work - Synchronous task delegation to other bots
sub_thread (sync) - Blocking isolated execution with tool constraints
sub_thread (async) - Non-blocking isolated execution
repeat - Sequential batch processing
check_sub_thread_status - Status checking for async sub-threads
get_sub_thread_messages - Retrieve sub-thread conversation
resume_thread_id - Resume interrupted sub-threads
callback_url - Webhook notification on completion
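
The callback_url option is not shown in the workflow above; here is a minimal sketch of async execution with a webhook notification (the endpoint URL is illustrative):

result = sub_thread(
    task="Rebuild the nightly analytics aggregates",
    async_mode=True,
    timeout_seconds=3600,
    callback_url="https://example.com/hooks/sub-thread-done"  # illustrative endpoint
)
print(f"Started {result['thread_id']}; results will be POSTed on completion")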

Not Supported

❌ Parallel execution of repeat items (sequential only)
❌ Canceling in-progress _delegate_work (waits for timeout)
❌ Nested sub-threads (sub-thread creating sub-threads)
❌ Real-time progress updates during delegation
❌ Partial result retrieval before completion
❌ Dynamic timeout adjustment during execution
❌ Cross-bot state sharing (each execution isolated)
❌ Priority or queue management for delegations

Output

  • _delegate_work: Dict with success, result (bot’s response), callback_id (thread ID reference), error.
  • sub_thread (sync): Dict with success, result, thread_id, error.
  • sub_thread (async): Dict with success: True, async: True, thread_id, message.
  • repeat: Dict with success, results (list), errors, completed_iterations, total_iterations.
  • check_sub_thread_status: Dict with found, status ('running', 'completed', 'error'), result, error, started_at.
  • get_sub_thread_messages: Dict with found, thread_id, messages, message_count, returned_count.
  • Errors: Detailed error messages with context about what failed and why.

Best Practices

Complete Instructions

Provide ALL context and details in delegation prompt. Delegated bot won’t have access to your conversation history.

Appropriate Timeouts

Set realistic timeouts based on task complexity. Too short = premature failure, too long = wasted wait time.

Error Handling

Always check for errors in results. Delegated tasks may fail—handle gracefully and retry if appropriate.
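
For example, a minimal defensive pattern (the bot name is illustrative):

result = _delegate_work(
    prompt="Validate the Q1 ledger totals",
    target_bot="finance_bot"  # illustrative bot name
)

if not result['success']:
    # Decide whether to retry, fall back, or surface the failure.
    print(f"Delegation failed: {result['error']}")
else:
    print(result['result'])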

Tool Constraints

Use allowed_tools to restrict sub-thread capabilities when security or focus is important.

Async for Long Tasks

Use async sub-threads for operations > 1 minute to avoid blocking. Check status periodically.
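
A reusable polling helper keeps the status checks in one place; a sketch based on the status values documented above:

import time

def wait_for_sub_thread(thread_id, poll_seconds=10, max_wait_seconds=600):
    """Poll an async sub-thread until it finishes or the deadline passes."""
    deadline = time.time() + max_wait_seconds
    while time.time() < deadline:
        status = check_sub_thread_status(thread_id=thread_id)
        if status['status'] in ('completed', 'error'):
            return status
        time.sleep(poll_seconds)
    return {'status': 'timeout', 'thread_id': thread_id}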

Context Awareness

Only include context when necessary. Increases memory usage and can introduce confusion for focused tasks.

Example: Complete Multi-Agent Workflow

# SCENARIO: Comprehensive customer analysis pipeline with multiple specialized agents

print("=== Starting Customer Analysis Pipeline ===\n")

# Step 1: Data Extraction (delegate to specialized extractor)
print("Step 1: Extracting customer data...")
extraction_result = _delegate_work(
    prompt="""
Extract customer data for Q1 2024 including:
1. Transaction history from database
2. Support ticket data
3. Product usage metrics
4. Customer satisfaction scores

Return data in JSON format with structure:
{
  "transactions": [...],
  "support_tickets": [...],
  "usage_metrics": {...},
  "satisfaction_scores": {...}
}
""",
    target_bot="data_extractor_bot",
    timeout_seconds=300
)

if not extraction_result['success']:
    print(f"❌ Extraction failed: {extraction_result['error']}")
    exit(1)

print("✅ Data extraction complete\n")
customer_data = extraction_result['result']

# Step 2: Parallel Analysis (using async sub-threads)
print("Step 2: Running parallel analysis tasks...")

# Start transaction analysis (async)
transaction_analysis = sub_thread(
    task=f"""
Analyze transaction data: {customer_data['transactions']}

Calculate:
1. Total revenue by customer segment
2. Average transaction value
3. Purchase frequency trends
4. Top product categories
5. Customer lifetime value estimates

Return structured analysis with key metrics.
""",
    allowed_tools=['code_executor_tools', 'data_connector_tools'],
    async_mode=True,
    timeout_seconds=600
)

# Start support analysis (async)
support_analysis = sub_thread(
    task=f"""
Analyze support ticket data: {customer_data['support_tickets']}

Identify:
1. Most common issues
2. Resolution time trends
3. Customer satisfaction correlation
4. Issue severity distribution
5. Recurring problem patterns

Return structured analysis with insights.
""",
    allowed_tools=['code_executor_tools', 'data_connector_tools'],
    async_mode=True,
    timeout_seconds=600
)

# Start usage analysis (async)
usage_analysis = sub_thread(
    task=f"""
Analyze product usage metrics: {customer_data['usage_metrics']}

Determine:
1. Feature adoption rates
2. User engagement levels
3. Churn risk indicators
4. Power user characteristics
5. Underutilized features

Return structured analysis with recommendations.
""",
    allowed_tools=['code_executor_tools', 'data_connector_tools'],
    async_mode=True,
    timeout_seconds=600
)

# Wait for all analyses to complete
import time
analyses = {
    'transaction': transaction_analysis['thread_id'],
    'support': support_analysis['thread_id'],
    'usage': usage_analysis['thread_id']
}

results = {}
max_wait = 600  # 10 minutes total
start_time = time.time()

while len(results) < 3 and (time.time() - start_time) < max_wait:
    for name, thread_id in analyses.items():
        if name not in results:
            status = check_sub_thread_status(thread_id=thread_id)
            if status['status'] == 'completed':
                results[name] = status['result']
                print(f"✅ {name.title()} analysis complete")
            elif status['status'] == 'error':
                results[name] = {'error': status['error']}
                print(f"❌ {name.title()} analysis failed: {status['error']}")
    
    if len(results) < 3:
        time.sleep(10)  # Check every 10 seconds

print(f"\n✅ All analyses complete ({len(results)}/3)\n")

# Step 3: Generate insights report (delegate to analyst)
print("Step 3: Generating comprehensive insights...")
insights_result = _delegate_work(
    prompt=f"""
Create comprehensive customer insights report from these analyses:

Transaction Analysis:
{results.get('transaction', 'Not available')}

Support Analysis:
{results.get('support', 'Not available')}

Usage Analysis:
{results.get('usage', 'Not available')}

Generate report including:
1. Executive summary
2. Key findings and trends
3. Customer segments with characteristics
4. Risk and opportunity analysis
5. Strategic recommendations
6. Data-driven action items

Format as structured markdown report.
""",
    target_bot="senior_analyst_bot",
    timeout_seconds=300
)

if not insights_result['success']:
    print(f"❌ Insights generation failed: {insights_result['error']}")
    exit(1)

print("✅ Insights report generated\n")

# Step 4: Generate visualizations (delegate to viz specialist)
print("Step 4: Creating visualizations...")
viz_result = _delegate_work(
    prompt=f"""
Create visualizations for customer insights report:

Data:
{results}

Generate:
1. Revenue trend chart (line chart)
2. Customer segment distribution (pie chart)
3. Support ticket categories (bar chart)
4. Feature adoption heatmap
5. Churn risk matrix

Return Plotly JSON specifications for each chart.
""",
    target_bot="visualization_bot",
    timeout_seconds=180
)

print("✅ Visualizations created\n")

# Step 5: Generate customer-specific reports (batch processing)
print("Step 5: Generating individual customer reports...")

top_customers = [
    {"customer_id": "C001", "name": "Acme Corp", "tier": "Enterprise"},
    {"customer_id": "C002", "name": "TechStart Inc", "tier": "Pro"},
    {"customer_id": "C003", "name": "Global Solutions", "tier": "Enterprise"}
]

customer_reports = repeat(
    instruction="""
Generate personalized report for this customer:
- Customer ID: {customer_id}
- Customer Name: {name}
- Tier: {tier}

Include:
1. Personalized insights from overall analysis
2. Customer-specific metrics
3. Tailored recommendations
4. Account health score

Save report to customer_reports/{customer_id}_report.pdf
""",
    items=top_customers,
    timeout_seconds=120
)

print(f"✅ Generated {customer_reports['completed_iterations']} customer reports\n")

# Step 6: Quality review (delegate to QA bot)
print("Step 6: Quality assurance review...")
qa_result = _delegate_work(
    prompt=f"""
Review the following deliverables for quality and accuracy:

1. Insights Report: {insights_result['result']}
2. Visualizations: {viz_result['result']}
3. Customer Reports: {customer_reports['results']}

Check for:
- Data accuracy and consistency
- Completeness of analysis
- Clarity of recommendations
- Visual quality and correctness
- Actionability of insights

Provide quality score (1-10) and list any issues found.
""",
    target_bot="qa_specialist_bot",
    timeout_seconds=180
)

print(f"✅ QA Review complete\n")
print(f"Quality Score: {qa_result['result'].get('score', 'N/A')}/10")

# Step 7: Final assembly and distribution
print("Step 7: Assembling final deliverable...")
final_result = _delegate_work(
    prompt=f"""
Create final customer analysis package:

Components:
1. Executive insights report
2. Visualization suite
3. Individual customer reports (3)
4. QA review summary

Package as:
- Single PDF with all components
- Separate files in organized folder structure
- Email summary for stakeholders

Save to: /customer_analysis_q1_2024/
Send email notification to: analytics-team@company.com
""",
    target_bot="document_specialist_bot",
    timeout_seconds=240
)

print("✅ Final deliverable assembled and distributed\n")

print("=== Customer Analysis Pipeline Complete ===")
print(f"Summary:")
print(f"- Data extracted for Q1 2024")
print(f"- 3 parallel analyses completed")
print(f"- Comprehensive insights generated")
print(f"- {len(top_customers)} customer-specific reports created")
print(f"- QA Score: {qa_result['result'].get('score', 'N/A')}/10")
print(f"- Final package delivered to analytics team")

Advanced Features

Parallel Processing Pattern

Execute multiple independent tasks simultaneously:
# Start multiple async sub-threads
thread_ids = []

tasks = [
    "Analyze sales data for North region",
    "Analyze sales data for South region",
    "Analyze sales data for East region",
    "Analyze sales data for West region"
]

for task in tasks:
    result = sub_thread(
        task=task,
        async_mode=True,
        timeout_seconds=300
    )
    thread_ids.append(result['thread_id'])

# Wait for all to complete
import time
results = []
start_time = time.time()

while len(results) < len(thread_ids) and (time.time() - start_time) < 600:
    for thread_id in thread_ids:
        if thread_id not in [r['thread_id'] for r in results]:
            status = check_sub_thread_status(thread_id=thread_id)
            if status['status'] in ['completed', 'error']:
                results.append({
                    'thread_id': thread_id,
                    'status': status['status'],
                    'result': status.get('result')
                })
    
    if len(results) < len(thread_ids):
        time.sleep(5)

print(f"Completed {len(results)}/{len(thread_ids)} tasks")

Error Recovery and Retry

Implement retry logic for delegated work:
import time

def delegate_with_retry(prompt, target_bot, max_attempts=3):
    """Delegate work with automatic retry on failure"""
    for attempt in range(max_attempts):
        try:
            result = _delegate_work(
                prompt=prompt,
                target_bot=target_bot,
                timeout_seconds=300
            )
            
            if result['success']:
                return result
            
            print(f"Attempt {attempt + 1} failed: {result.get('error')}")
            
            # Exponential backoff
            if attempt < max_attempts - 1:
                wait_time = 2 ** attempt
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
        
        except Exception as e:
            print(f"Attempt {attempt + 1} exception: {str(e)}")
            if attempt == max_attempts - 1:
                raise
    
    return {'success': False, 'error': 'Max retries exceeded'}

# Usage
result = delegate_with_retry(
    prompt="Extract customer data",
    target_bot="data_bot",
    max_attempts=3
)

Conditional Workflows

Implement decision logic in multi-agent workflows:
# Step 1: Assess data quality
quality_check = _delegate_work(
    prompt="Assess data quality of customer_data.csv",
    target_bot="qa_bot"
)

# Step 2: Conditional path based on quality
if quality_check['result']['quality_score'] >= 8:
    # High quality: proceed with full analysis
    analysis = _delegate_work(
        prompt="Perform comprehensive analysis on customer_data.csv",
        target_bot="advanced_analyst_bot",
        timeout_seconds=600
    )
else:
    # Low quality: data cleaning required first
    cleaning = _delegate_work(
        prompt="Clean and validate customer_data.csv",
        target_bot="data_cleaner_bot"
    )
    
    # Then proceed with analysis
    analysis = _delegate_work(
        prompt=f"Analyze cleaned data: {cleaning['result']}",
        target_bot="analyst_bot"
    )

print(f"Analysis complete: {analysis['result']}")

Progressive Enhancement

Build results incrementally across agents:
# Start with basic analysis
current_result = _delegate_work(
    prompt="Perform basic statistical analysis on sales_data.csv",
    target_bot="stats_bot"
)

# Enhance with trend analysis
current_result = _delegate_work(
    prompt=f"Add trend analysis to these statistics: {current_result['result']}",
    target_bot="trends_bot"
)

# Enhance with forecasting
current_result = _delegate_work(
    prompt=f"Add 3-month forecast based on this analysis: {current_result['result']}",
    target_bot="forecast_bot"
)

# Final enhancement with recommendations
final_result = _delegate_work(
    prompt=f"Add strategic recommendations to this complete analysis: {current_result['result']}",
    target_bot="strategy_bot"
)

print(f"Final enhanced analysis: {final_result['result']}")

Troubleshooting

Delegation times out

  • Increase the timeout_seconds parameter
  • Check that the target bot is responsive
  • Verify the timeout matches the task's complexity
  • Break complex tasks into smaller delegations

Delegated task fails or errors

  • Check that the target bot has the required tools
  • Review the target bot's execution history
  • Check the timeout configuration
  • Review the task's complexity

Sub-thread appears stuck

  • Verify allowed_tools includes what's needed
  • Check for infinite loops in the task logic
  • Use check_sub_thread_status to get details
  • Consider killing the stuck thread and restarting

repeat iterations fail

  • Check the errors list in the repeat result
  • Verify all items have the required fields
  • Test the instruction manually with the problem items
  • Add error handling to the instruction
  • Increase the per-item timeout if items are timing out
  • Check for data-specific issues

Sub-thread is missing expected context

  • Set include_context=True explicitly
  • Verify the parent thread has context to share
  • Note that inherited context increases memory usage
  • Consider passing the needed data explicitly
  • Check that the context isn't too large
  • Use resume_thread_id if continuing earlier work

Tool blocked in a sub-thread

  • Verify the allowed_tools list is correct
  • Check that tool names match exactly
  • Remember that an empty list makes all tools available
  • Test with broader tool access first
  • Review the error to see which tool was blocked
  • Add the missing tools to allowed_tools

Status check cannot find the thread

  • Verify the thread_id is correct (it starts with 'sub_thread_')
  • Check the thread hasn't been cleaned up
  • Allow time for the status to update
  • Use get_sub_thread_messages for more details
  • Check whether the thread completed while unmonitored
  • Verify the tracking system is functioning

Results are incomplete or off-target

  • Provide more complete instructions in the prompt
  • Check that the target bot understood the requirements
  • Review the returned result structure
  • Add explicit format requirements to the prompt
  • Increase the timeout if the bot needs more time
  • Test the prompt manually with the target bot first

Process Manager Architecture

Understanding the orchestration components:

Key Concepts

  • Delegation: Synchronous blocking call to another bot—waits for completion
  • Sub-Thread (Sync): Isolated execution that blocks until complete
  • Sub-Thread (Async): Non-blocking execution with status checking
  • Repeat: Sequential iteration blocking on each item
  • Tool Constraints: Filter available tools in sub-threads
  • Context Inheritance: Optional sharing of conversation history

Performance Considerations

Appropriate Blocking

Understand when operations block. Plan workflow to minimize unnecessary waiting.

Timeout Tuning

Set timeouts based on actual task duration. Monitor execution times and adjust.

Parallel When Possible

Use async sub-threads for independent tasks. Don’t use repeat when parallel execution is better.

Context Size

Only include context when necessary. Large contexts slow execution and increase memory.

Comparison: Orchestration Options

| Feature | _delegate_work | sub_thread (sync) | sub_thread (async) | repeat |
|---------|----------------|-------------------|-------------------|--------|
| Execution Model | Blocking | Blocking | Non-blocking | Blocking, sequential |
| Use Case | Task to another bot | Isolated execution | Long-running tasks | Batch processing |
| Tool Control | ❌ No | ✅ Yes (allowed_tools) | ✅ Yes (allowed_tools) | ❌ No |
| Context Inheritance | ❌ No | ✅ Optional | ✅ Optional | ❌ No |
| Status Checking | ❌ Waits for completion | ❌ Waits for completion | ✅ check_sub_thread_status | ❌ Waits for completion |
| Best For | Bot collaboration | Focused, secure tasks | Async operations | Multiple similar items |

Choosing the Right Tool:
  • Need another bot’s expertise? Use _delegate_work
  • Need tool restrictions? Use sub_thread with allowed_tools
  • Task takes > 1 minute? Use sub_thread async_mode=True
  • Same task for multiple items? Use repeat

Best Practices Summary

  1. Always provide complete instructions in delegation prompts—no context is shared
  2. Set realistic timeouts based on task complexity
  3. Use async mode for long-running operations (> 1 minute)
  4. Implement error handling for all delegation/sub-thread calls
  5. Constrain tools in sub-threads when security or focus is important
  6. Monitor execution via status checks for async operations
  7. Break complex workflows into smaller, focused delegations
  8. Test prompts manually before using in automated workflows
  9. Log execution details for debugging and optimization
  10. Consider parallel execution instead of sequential when tasks are independent