Merge Node
Synchronization node that combines results from parallel workflows with configurable merge strategies
Merge Node
The Merge node synchronizes and combines outputs from multiple parallel execution paths. It provides different merging strategies to handle various parallel processing scenarios, making it essential for workflows that split execution and need to converge.
Configuration
Configuration Schema
Field | Type | Required | Description |
---|---|---|---|
mode | "onFirstComplete" | "onAllComplete" | Yes | Strategy for merging parallel outputs |
Output Schema
The output varies based on the merge mode:
onFirstComplete Mode
Field | Type | Description |
---|---|---|
output | JsonValue | The output of the first completed path |
onAllComplete Mode
Field | Type | Description |
---|---|---|
output | JsonValue[] | Array of outputs from all paths |
Merge Modes
onFirstComplete
Completes when the first parallel path finishes execution. Useful for race conditions, fallback mechanisms, and performance optimization.
# API fallback with multiple endpoints
- id: api_race
type: merge
configuration:
mode: "onFirstComplete"
# Returns the first successful API response
onAllComplete
Waits for all parallel paths to complete before continuing. Useful for data aggregation, comprehensive validation, and ensuring all operations finish.
# Data aggregation from multiple sources
- id: aggregate_data
type: merge
configuration:
mode: "onAllComplete"
# Returns array of all results
Use Cases
Execute multiple API calls and use the fastest response:
# Try multiple endpoints for best performance
- id: endpoint_race
type: switch
input:
switch: "{{ dataSource }}"
cases:
- id: primary
case: "primary"
then: ["call_primary_api"]
- id: backup
case: "backup"
then: ["call_backup_api", "call_tertiary_api"]
- id: fastest_response
type: merge
configuration:
mode: "onFirstComplete"
Perfect for performance optimization when multiple services can provide the same data with different latencies.
Collect data from multiple sources:
# Gather user data from different services
- id: user_data_sources
type: condition
input:
if: true
then: ["get_profile", "get_preferences", "get_activity"]
else: []
- id: aggregate_user_data
type: merge
configuration:
mode: "onAllComplete"
Essential for building comprehensive data views from distributed microservices or multiple data sources.
Run multiple validation checks:
# Comprehensive validation
- id: validation_checks
type: condition
input:
if: true
then: ["validate_format", "validate_business_rules", "validate_permissions"]
else: []
- id: validation_results
type: merge
configuration:
mode: "onAllComplete"
Ensure data quality by running independent validation checks simultaneously for faster processing.
Implement graceful degradation:
# Try premium service, fall back to basic
- id: service_fallback
type: condition
input:
if: "{{ user.isPremium }}"
then: ["premium_service", "basic_service"]
else: ["basic_service"]
- id: service_response
type: merge
configuration:
mode: "onFirstComplete"
Build resilient systems that gracefully degrade when premium services are unavailable or fail.
Process multiple items and collect results:
# Process files in parallel
- id: file_processing
type: loop
input:
forEach: "{{ files }}"
configuration:
mode: "parallel"
- id: collect_results
type: merge
configuration:
mode: "onAllComplete"
Efficiently handle bulk operations by processing items in parallel and collecting all results.
Advanced Patterns
# Dynamic merge strategy based on context
- id: dynamic_merge
type: merge
configuration:
mode: "{{ urgentRequest ? 'onFirstComplete' : 'onAllComplete' }}"
Dynamically choose merge strategy based on runtime conditions like priority, user type, or system load.
# Merge with timeout consideration
- id: parallel_operations
type: condition
input:
if: true
then: ["fast_operation", "slow_operation_with_timeout"]
else: []
- id: merge_with_timeout
type: merge
configuration:
mode: "onFirstComplete" # Don't wait for slow operations
Prevent slow operations from blocking workflows by using first-complete strategy with timeout-aware design.
# Multiple levels of merging
- id: regional_data
type: loop
input:
forEach: "{{ regions }}"
configuration:
mode: "parallel"
- id: merge_regional
type: merge
configuration:
mode: "onAllComplete"
- id: global_processing
type: condition
input:
if: true
then: ["process_global", "process_summary"]
else: []
- id: final_merge
type: merge
configuration:
mode: "onAllComplete"
Build complex hierarchical workflows with multiple merge levels for sophisticated data aggregation patterns.
Best Practices
Use onFirstComplete When:
- Performance is critical
- Implementing fallback mechanisms
- Racing multiple options
- Any single result is sufficient
# Good: Performance-critical scenarios
- id: cache_or_compute
type: merge
configuration:
mode: "onFirstComplete"
Use onAllComplete When:
- All results are needed
- Data aggregation is required
- Comprehensive validation needed
- State consistency is important
# Good: Comprehensive data collection
- id: complete_user_profile
type: merge
configuration:
mode: "onAllComplete"
# Robust error handling with merge
- id: parallel_with_fallback
type: condition
input:
if: true
then: ["primary_operation", "backup_operation"]
else: []
- id: merge_results
type: merge
configuration:
mode: "onFirstComplete"
# Handle case where all operations fail
- id: validate_merge_result
type: condition
input:
if: "{{ merge_results.output !== null }}"
then: ["process_result"]
else: ["handle_all_failed"]
Always plan for scenarios where parallel operations might fail or return unexpected results.
Key optimization strategies:
- Balance parallelism: Too many parallel paths can overwhelm resources
- Consider timeouts: Prevent slow operations from blocking onAllComplete
- Monitor resource usage: Track memory and CPU usage in parallel operations
- Use appropriate merge mode: Match the mode to your use case
Profile your workflows to find the optimal balance between speed and resource consumption.
Integration Patterns
# Parallel loop with result aggregation
- id: parallel_processing
type: loop
input:
forEach: "{{ items }}"
configuration:
mode: "parallel"
- id: collect_loop_results
type: merge
configuration:
mode: "onAllComplete"
# Multiple API calls with fastest response
- id: api_endpoints
type: condition
input:
if: true
then: ["call_api_us", "call_api_eu", "call_api_asia"]
else: []
- id: fastest_api
type: merge
configuration:
mode: "onFirstComplete"
# Multiple AI models for comparison
- id: ai_comparison
type: condition
input:
if: true
then: ["gpt_analysis", "claude_analysis", "gemini_analysis"]
else: []
- id: ai_consensus
type: merge
configuration:
mode: "onAllComplete"
Data Processing
Result Structure
onFirstComplete Output
{
"output": "first_completed_result",
"completedPath": "path_identifier",
"completionTime": "2024-01-01T12:00:00Z"
}
onAllComplete Output
{
"output": [
"result_from_path_1",
"result_from_path_2",
"result_from_path_3"
],
"totalPaths": 3,
"completionTime": "2024-01-01T12:00:05Z"
}
Accessing Merge Results
# Process onFirstComplete result
- id: process_first_result
type: condition
input:
if: "{{ merge_results.output !== null }}"
then: ["handle_success"]
else: ["handle_failure"]
# Process onAllComplete results
- id: process_all_results
type: loop
input:
forEach: "{{ merge_results.output }}"
configuration:
mode: "sequential"
Troubleshooting
Debugging Tips
# Add logging to track merge behavior
- id: debug_merge
type: merge
configuration:
mode: "onAllComplete"
# Log results for analysis
- id: log_merge_results
type: script
input:
script: |
console.log('Merge completed:', {
mode: 'onAllComplete',
resultCount: merge_results.output.length,
results: merge_results.output
});
return merge_results.output;
Last updated on