for_each
Process each item in an array with a sequence of steps.
Overview
This step iterates over an array in your event data and runs a sequence of nested steps for each item. Use it for batch processing, transforming lists, sending notifications to multiple recipients, or any other case where the same logic applies to every item in a list.
You can control how results are aggregated (collect all, replace array, or discard), whether items are processed in isolation or share context, and how errors are handled. The current item is exposed under a configurable variable name (`var`, default "item") that nested steps can reference with template syntax such as ${item.field}.
Quick Start
steps:
  - type: for_each
    input_from: <string>
    steps: []
Configuration
| Parameter | Type | Required | Description |
|---|---|---|---|
| input_from | string | Yes | Dot-delimited location of the source array within the event (for example 'items'). |
| path | string | No | DEPRECATED: Use 'input_from' instead. Dot-delimited location of the source array. |
| steps | array | Yes | Nested step declarations that will be executed for each element. The list must be non-empty. |
| var | string | No | Name of the variable injected into child steps when using shared scope or non-dict elements. Default: "item". |
| result_strategy | string | No | Controls how per-item results are written back: 'collect' gathers results into a new array, 'replace' overwrites the target path with collected results, 'extend_array' appends results to the source list, 'merge_objects' flattens dict outputs into a single object, 'custom' mirrors 'collect' but signals intentional downstream handling. Default: "collect". Options: collect, replace, extend_array, merge_objects, custom. |
| output_to | string | No | Destination path for aggregated output. Defaults to `input_from` (the source array location) when omitted. |
| target_path | string | No | DEPRECATED: Use 'output_to' instead. Destination path for aggregated output. |
| keep_original | boolean | No | Reserved for future use. Present for backward compatibility with historical workflow definitions; the current implementation always preserves the source array. Default: false. |
| on_empty | string | No | Behaviour when the source resolves to an empty list or null: 'skip' leaves the event unchanged, 'inject_null' writes null to the target, 'inject_empty' writes an empty list or object depending on the result strategy. Default: "skip". Options: skip, inject_null, inject_empty. |
| error_mode | string | No | 'fail_fast' raises when the source is not a list; 'continue' leaves the event untouched instead. Default: "fail_fast". Options: fail_fast, continue. |
| item_envelope | string | No | When using shared scope, 'wrap' stores the current element under {'value': ...}; 'none' injects the element directly. Default: "none". Options: wrap, none. |
| item_scope | string | No | Processing scope: 'isolated' copies each element into its own event, 'shared' reuses the outer event and writes the current element to `var`. Default: "isolated". Options: isolated, shared. |
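The scope, envelope, and empty-input options in the table can be combined, as in the rough sketch below. It assumes an event with a top-level `tags` array of plain strings and a `tag_messages` destination (both hypothetical names), and it assumes that `${tag.value}` is how a wrapped element is referenced; the table only states that the element is stored under {'value': ...}, so treat that template path as unverified.

```yaml
type: for_each
input_from: tags              # hypothetical array of strings in the event
var: tag
item_scope: shared            # reuse the outer event; the current element is written to `tag`
item_envelope: wrap           # store the element as {'value': ...}
on_empty: inject_empty        # write an empty list when `tags` is empty or null
error_mode: continue          # leave the event untouched if `tags` is not a list
result_strategy: collect
output_to: tag_messages       # hypothetical destination path
steps:
  - type: print
    template: "Tag: ${tag.value}"   # assumed path into the wrapped element
```

With `item_envelope: none`, the template would presumably reference `${tag}` directly instead.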
Examples
Transform array of items
Apply transformation to each item and collect results
type: for_each
input_from: products
var: product
steps:
  - type: python_function
    code: |
      def process(event):
          event['price_with_tax'] = event['price'] * 1.08
          event['formatted_name'] = event['name'].upper()
          return event
    handler: process
result_strategy: collect
output_to: processed_products
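When each item yields a small dict, the 'merge_objects' strategy from the configuration table may fit better than 'collect'. The sketch below assumes a hypothetical `settings` array of objects with `key` and `value` fields and a hypothetical `settings_map` destination. It also assumes that a `python_function` handler may return a new dict in place of the event, which the reference above does not confirm.

```yaml
type: for_each
input_from: settings            # hypothetical array, e.g. [{key: theme, value: dark}]
var: setting
steps:
  - type: python_function
    code: |
      def process(event):
          # Assumption: the returned dict becomes the per-item result.
          return {event['key']: event['value']}
    handler: process
result_strategy: merge_objects  # flatten the per-item dicts into a single object
output_to: settings_map         # hypothetical destination path
```

If those assumptions hold, `settings_map` would end up as one object keyed by each setting's `key`.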
Send notifications to multiple users
Send an email or webhook to each user in a list
type: for_each
input_from: users
var: user
item_scope: isolated
steps:
  - type: http_call
    url: https://api.example.com/notify
    method: POST
    body:
      email: ${user.email}
      message: "Hello ${user.name}!"
result_strategy: discard
Filter and enrich array items
Process items with multiple nested steps
type: for_each
input_from: orders
var: order
steps:
  - type: filter
    groups:
      - conditions:
          - field: order.status
            op: eq
            value: pending
  - type: http_call
    url: https://api.example.com/enrich/${order.id}
    output_to: enrichment
result_strategy: collect
keep_original: false
Replace original array
Transform array in place without creating a new field
type: for_each
input_from: items
var: item
steps:
  - type: python_function
    code: |
      import time  # needed for time.time() below

      def process(event):
          event['processed'] = True
          event['timestamp'] = int(time.time())
          return event
    handler: process
result_strategy: replace
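The 'extend_array' strategy, by contrast, is documented as appending results to the source list instead of overwriting it. A minimal sketch, assuming the same `items` array and a handler that marks each result with a hypothetical `derived` field so appended entries can be told apart:

```yaml
type: for_each
input_from: items
var: item
steps:
  - type: python_function
    code: |
      def process(event):
          # Hypothetical marker distinguishing appended results from originals.
          event['derived'] = True
          return event
    handler: process
result_strategy: extend_array   # append per-item results to the source list
```

Under the documented semantics, `items` would then hold the original elements followed by the processed ones. Whether the originals stay unmodified depends on how isolated scope copies each element, which is not specified here.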
Parallel processing with error handling
Process items with graceful error handling
type: for_each
input_from: api_requests
var: request
item_scope: isolated
error_mode: continue
steps:
  - type: http_call
    url: ${request.endpoint}
    method: ${request.method}
    drop_on_failure: false
result_strategy: collect
output_to: api_responses
Nested loop processing
Iterate over nested arrays
type: for_each
input_from: customers
var: customer
steps:
  - type: for_each
    input_from: customer.orders
    var: order
    steps:
      - type: print
        template: "Processing order ${order.id} for ${customer.name}"
result_strategy: discard