Merge Node

Synchronization node that combines results from parallel workflows with configurable merge strategies

The Merge node synchronizes and combines the outputs of multiple parallel execution paths. Its merge mode determines whether the workflow continues as soon as one path completes or only after every path completes, so any workflow that splits execution can converge on a single node.

Configuration

Configuration Schema

Field   Type                                   Required   Description
mode    "onFirstComplete" | "onAllComplete"    Yes        Strategy for merging parallel outputs
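
A minimal sketch of checking a merge configuration against the schema above before a workflow is submitted. `validateMergeConfig` and `MERGE_MODES` are hypothetical names used for illustration; they are not part of WarpBuild.

```javascript
// Hypothetical helper, not a WarpBuild API: validates a merge node's
// `configuration` object against the documented schema (one required field).
const MERGE_MODES = ["onFirstComplete", "onAllComplete"];

function validateMergeConfig(configuration) {
  if (configuration === null || typeof configuration !== "object") {
    return { ok: false, errors: ["configuration must be an object"] };
  }
  const errors = [];
  if (!("mode" in configuration)) {
    errors.push("mode is required");
  } else if (!MERGE_MODES.includes(configuration.mode)) {
    errors.push(`mode must be one of: ${MERGE_MODES.join(", ")}`);
  }
  return { ok: errors.length === 0, errors };
}
```

Returning a list of errors rather than throwing lets a caller report every problem in a workflow file at once.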

Output Schema

The output varies based on the merge mode:

onFirstComplete Mode

Field    Type        Description
output   JsonValue   The output of the first completed path

onAllComplete Mode

Field    Type          Description
output   JsonValue[]   Array of outputs from all paths

Merge Modes

onFirstComplete

Completes when the first parallel path finishes execution. Useful for racing redundant paths, fallback mechanisms, and reducing latency.

# API fallback with multiple endpoints
- id: api_race
  type: merge
  configuration:
    mode: "onFirstComplete"
# Returns the output of the first path to complete
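
The behavior of this mode resembles `Promise.race` in JavaScript. The sketch below is an analogy only, not WarpBuild's implementation; `simulatePath`, `firstComplete`, and `firstSuccessful` are hypothetical names. It also shows why "first to complete" and "first to succeed" are different guarantees.

```javascript
// Hypothetical stand-in for a workflow branch: resolves with `value` after `ms` milliseconds.
function simulatePath(value, ms) {
  return new Promise((resolve) => setTimeout(() => resolve(value), ms));
}

// Analogy for mode "onFirstComplete": Promise.race settles with the first
// path to finish, whether that path fulfilled or rejected.
function firstComplete(paths) {
  return Promise.race(paths).then((output) => ({ output }));
}

// Variant: first path to *succeed*. Promise.any skips rejected paths and
// rejects only if every path rejects (requires Node.js 15 or later).
function firstSuccessful(paths) {
  return Promise.any(paths).then((output) => ({ output }));
}
```

If a fast path can fail quickly, the race variant surfaces that failure, while the any variant waits for a slower success. Which of the two the Merge node implements is worth confirming before relying on it for fallbacks.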

onAllComplete

Waits for all parallel paths to complete before continuing. Useful for data aggregation, comprehensive validation, and ensuring all operations finish.

# Data aggregation from multiple sources
- id: aggregate_data
  type: merge
  configuration:
    mode: "onAllComplete"
# Returns array of all results
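
As an analogy only (not WarpBuild's implementation), this mode resembles `Promise.all` in JavaScript. `simulatePath` and `allComplete` are hypothetical names. Note that `Promise.all` returns outputs in the order the paths were listed; whether the Merge node orders its array the same way is an assumption to verify.

```javascript
// Hypothetical stand-in for a workflow branch: resolves with `value` after `ms` milliseconds.
function simulatePath(value, ms) {
  return new Promise((resolve) => setTimeout(() => resolve(value), ms));
}

// Analogy for mode "onAllComplete": wait for every path, then return all
// outputs as one array. Promise.all keeps listing order, not finishing order,
// and rejects as soon as any single path rejects.
function allComplete(paths) {
  return Promise.all(paths).then((output) => ({ output }));
}
```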

Use Cases

Execute multiple API calls and use the fastest response:

# Try multiple endpoints for best performance
- id: endpoint_race
  type: switch
  input:
    switch: "{{ dataSource }}"
    cases:
      - id: primary
        case: "primary"
        then: ["call_primary_api"]
      - id: backup
        case: "backup"
        then: ["call_backup_api", "call_tertiary_api"]

- id: fastest_response
  type: merge
  configuration:
    mode: "onFirstComplete"

Suited to latency-sensitive workflows where several services can provide the same data at different speeds.

Collect data from multiple sources:

# Gather user data from different services
- id: user_data_sources
  type: condition
  input:
    if: true
    then: ["get_profile", "get_preferences", "get_activity"]
    else: []

- id: aggregate_user_data
  type: merge
  configuration:
    mode: "onAllComplete"

Essential for building comprehensive data views from distributed microservices or multiple data sources.
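
A downstream script step could fold the merged array into a single object. This is a sketch under assumptions: it presumes each source path returns a plain object, and `combineUserData` is a hypothetical helper name.

```javascript
// Hypothetical helper: combines the array produced by an onAllComplete merge
// into one object. Assumes each path (get_profile, get_preferences,
// get_activity) returns a plain object.
function combineUserData(outputs) {
  return outputs
    // Skip null, array, and primitive entries so one empty path does not break the merge.
    .filter((item) => item !== null && typeof item === "object" && !Array.isArray(item))
    // Later entries overwrite earlier ones when keys collide.
    .reduce((combined, item) => ({ ...combined, ...item }), {});
}
```

Because later entries win on key collisions, namespacing each source (for example under `profile` and `preferences` keys) is safer when the services share field names.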

Run multiple validation checks:

# Comprehensive validation
- id: validation_checks
  type: condition
  input:
    if: true
    then: ["validate_format", "validate_business_rules", "validate_permissions"]
    else: []

- id: validation_results
  type: merge
  configuration:
    mode: "onAllComplete"

Ensure data quality by running independent validation checks simultaneously for faster processing.
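
The merged validation results still need to be reduced to a single verdict. The sketch below assumes each validation path returns an object shaped like `{ check, valid, errors }`; that shape and the name `summarizeValidation` are illustrative assumptions.

```javascript
// Hypothetical helper: reduces an onAllComplete output array to one verdict.
// Assumes each path returns { check: string, valid: boolean, errors: string[] }.
function summarizeValidation(outputs) {
  const failed = outputs.filter((result) => !result.valid);
  return {
    valid: failed.length === 0,
    failedChecks: failed.map((result) => result.check),
    errors: failed.flatMap((result) => result.errors),
  };
}
```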

Implement graceful degradation:

# Try premium service, fall back to basic
- id: service_fallback
  type: condition
  input:
    if: "{{ user.isPremium }}"
    then: ["premium_service", "basic_service"]
    else: ["basic_service"]

- id: service_response
  type: merge
  configuration:
    mode: "onFirstComplete"

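Build resilient systems that degrade gracefully when premium services are unavailable or fail. Because both services run in parallel for premium users, the merge returns whichever finishes first.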

Process multiple items and collect results:

# Process files in parallel
- id: file_processing
  type: loop
  input:
    forEach: "{{ files }}"
  configuration:
    mode: "parallel"

- id: collect_results
  type: merge
  configuration:
    mode: "onAllComplete"

Efficiently handle bulk operations by processing items in parallel and collecting all results.

Advanced Patterns

# Dynamic merge strategy based on context
- id: dynamic_merge
  type: merge
  configuration:
    mode: "{{ urgentRequest ? 'onFirstComplete' : 'onAllComplete' }}"

Dynamically choose merge strategy based on runtime conditions like priority, user type, or system load.

# Merge with timeout consideration
- id: parallel_operations
  type: condition
  input:
    if: true
    then: ["fast_operation", "slow_operation_with_timeout"]
    else: []

- id: merge_with_timeout
  type: merge
  configuration:
    mode: "onFirstComplete"  # Don't wait for slow operations

Prevent slow operations from blocking workflows by using first-complete strategy with timeout-aware design.
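
Inside the slow path itself, a timeout can be sketched as a race between the operation and a timer. `withTimeout` is a hypothetical helper, not a WarpBuild API, and the sketch assumes the operation is exposed as a promise.

```javascript
// Hypothetical helper: rejects if `promise` does not settle within `ms` milliseconds.
function withTimeout(promise, ms, label = "operation") {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms} ms`)), ms);
  });
  // Clear the timer once either side settles so it does not keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```

The timeout only stops waiting; it does not cancel the underlying operation, which keeps running unless it supports cancellation of its own.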

# Multiple levels of merging
- id: regional_data
  type: loop
  input:
    forEach: "{{ regions }}"
  configuration:
    mode: "parallel"

- id: merge_regional
  type: merge
  configuration:
    mode: "onAllComplete"

- id: global_processing
  type: condition
  input:
    if: true
    then: ["process_global", "process_summary"]
    else: []

- id: final_merge
  type: merge
  configuration:
    mode: "onAllComplete"

Build complex hierarchical workflows with multiple merge levels for sophisticated data aggregation patterns.
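
When merges are nested, the first-level output is often an array of arrays that a later step flattens. The sketch below assumes each region yields an array of records with an `orders` count; the record shape and both function names are hypothetical.

```javascript
// Hypothetical helpers for a step between two merge levels.
// Assumes the regional merge yields one array of records per region, e.g.
// [[{ region: "us", orders: 3 }], [{ region: "eu", orders: 5 }]].
function flattenRegional(regionalOutputs) {
  // flat() removes exactly one level of nesting.
  return regionalOutputs.flat();
}

function totalOrders(records) {
  return records.reduce((sum, record) => sum + record.orders, 0);
}
```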

Best Practices

Use onFirstComplete When:

  • Performance is critical
  • Implementing fallback mechanisms
  • Racing multiple options
  • Any single result is sufficient

# Good: Performance-critical scenarios
- id: cache_or_compute
  type: merge
  configuration:
    mode: "onFirstComplete"

Use onAllComplete When:

  • All results are needed
  • Data aggregation is required
  • Comprehensive validation is needed
  • State consistency is important

# Good: Comprehensive data collection
- id: complete_user_profile
  type: merge
  configuration:
    mode: "onAllComplete"

# Robust error handling with merge
- id: parallel_with_fallback
  type: condition
  input:
    if: true
    then: ["primary_operation", "backup_operation"]
    else: []

- id: merge_results
  type: merge
  configuration:
    mode: "onFirstComplete"

# Handle case where all operations fail
- id: validate_merge_result
  type: condition
  input:
    if: "{{ merge_results.output !== null }}"
    then: ["process_result"]
    else: ["handle_all_failed"]

Always plan for scenarios where parallel operations might fail or return unexpected results.
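
The null check above can be tightened in a script step. In JavaScript, `undefined !== null` is true, so a missing `output` field passes a strict comparison against `null`. The sketch assumes template expressions follow JavaScript semantics; `nextStep` is a hypothetical helper.

```javascript
// Hypothetical helper mirroring the validate_merge_result condition.
// Uses loose inequality (!= null) so both null and undefined count as "no result",
// while falsy but valid outputs such as 0 or "" still count as results.
function nextStep(mergeResult) {
  const hasOutput = mergeResult != null && mergeResult.output != null;
  return hasOutput ? "process_result" : "handle_all_failed";
}
```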

Key optimization strategies:

  • Balance parallelism: Too many parallel paths can overwhelm resources
  • Consider timeouts: Prevent slow operations from blocking onAllComplete
  • Monitor resource usage: Track memory and CPU usage in parallel operations
  • Use appropriate merge mode: Match the mode to your use case

Profile your workflows to find the optimal balance between speed and resource consumption.
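
Balancing parallelism usually means capping how many operations run at once. The following is a generic sketch of that idea in JavaScript, not a WarpBuild feature; `mapWithLimit` is a hypothetical name.

```javascript
// Hypothetical helper: runs `worker` over `items` with at most `limit` calls in flight.
// Results are returned in the same order as `items`.
async function mapWithLimit(items, limit, worker) {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError("limit must be a positive integer");
  }
  const results = new Array(items.length);
  let next = 0;

  // Each runner repeatedly claims the next unprocessed index until none remain.
  async function runner() {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  const runners = Array.from({ length: Math.min(limit, items.length) }, () => runner());
  await Promise.all(runners);
  return results;
}
```

Starting with a small limit and raising it while watching memory and CPU is one way to find the balance point described above.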

Integration Patterns

# Parallel loop with result aggregation
- id: parallel_processing
  type: loop
  input:
    forEach: "{{ items }}"
  configuration:
    mode: "parallel"

- id: collect_loop_results
  type: merge
  configuration:
    mode: "onAllComplete"

# Multiple API calls with fastest response
- id: api_endpoints
  type: condition
  input:
    if: true
    then: ["call_api_us", "call_api_eu", "call_api_asia"]
    else: []

- id: fastest_api
  type: merge
  configuration:
    mode: "onFirstComplete"

# Multiple AI models for comparison
- id: ai_comparison
  type: condition
  input:
    if: true
    then: ["gpt_analysis", "claude_analysis", "gemini_analysis"]
    else: []

- id: ai_consensus
  type: merge
  configuration:
    mode: "onAllComplete"
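
Once every model's output has been merged, a consensus step might take a majority vote. This sketch assumes each model path returns `{ model, label }`; that shape and the name `majorityLabel` are hypothetical.

```javascript
// Hypothetical helper: majority vote over merged model outputs.
// Assumes each path returns { model: string, label: string }.
function majorityLabel(outputs) {
  const counts = new Map();
  for (const { label } of outputs) {
    counts.set(label, (counts.get(label) || 0) + 1);
  }
  let best = null;
  // Map iterates in insertion order, so ties go to the label seen first.
  for (const [label, count] of counts) {
    if (best === null || count > best.count) best = { label, count };
  }
  return best === null ? null : { ...best, unanimous: best.count === outputs.length };
}
```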

Data Processing

Result Structure

onFirstComplete Output

{
  "output": "first_completed_result",
  "completedPath": "path_identifier",
  "completionTime": "2024-01-01T12:00:00Z"
}

onAllComplete Output

{
  "output": [
    "result_from_path_1",
    "result_from_path_2",
    "result_from_path_3"
  ],
  "totalPaths": 3,
  "completionTime": "2024-01-01T12:00:05Z"
}

Accessing Merge Results

# Process onFirstComplete result
- id: process_first_result
  type: condition
  input:
    if: "{{ merge_results.output !== null }}"
    then: ["handle_success"]
    else: ["handle_failure"]

# Process onAllComplete results
- id: process_all_results
  type: loop
  input:
    forEach: "{{ merge_results.output }}"
  configuration:
    mode: "sequential"
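
Because `output` is a single value in one mode and an array in the other, downstream code that handles both can normalize it first. `toOutputList` is a hypothetical helper; it takes the mode explicitly because a single onFirstComplete output may itself be an array.

```javascript
// Hypothetical helper: returns the merge output as a list regardless of mode.
function toOutputList(mode, mergeResult) {
  if (mergeResult == null || mergeResult.output == null) return [];
  // onAllComplete already yields an array of outputs.
  if (mode === "onAllComplete") return mergeResult.output;
  // onFirstComplete yields one value, which may itself be an array; wrap it as-is.
  return [mergeResult.output];
}
```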

Troubleshooting

Debugging Tips

# Add logging to track merge behavior
- id: debug_merge
  type: merge
  configuration:
    mode: "onAllComplete"

# Log results for analysis
- id: log_merge_results
  type: script
  input:
    script: |
      console.log('Merge completed:', {
        mode: 'onAllComplete',
        resultCount: debug_merge.output.length,
        results: debug_merge.output
      });
      return debug_merge.output;
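
The logging script above reads `.length`, which only makes sense for an onAllComplete array. A mode-agnostic summary could look like the following sketch; `summarizeMergeOutput` is a hypothetical helper.

```javascript
// Hypothetical helper for a logging step: summarizes a merge output in either mode.
function summarizeMergeOutput(output) {
  // Treat a single onFirstComplete value as a one-element list.
  const results = Array.isArray(output) ? output : [output];
  return {
    isArray: Array.isArray(output),
    resultCount: results.length,
    // Count paths that produced nothing, a common sign of a failed branch.
    emptyCount: results.filter((r) => r === null || r === undefined).length,
  };
}
```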
