step

for_each

Process each item in an array with a sequence of steps.

Overview

Process each item in an array with a sequence of steps. This step iterates over an array in your event data and executes a sequence of nested steps for each item. It's essential for batch processing, transforming lists, sending notifications to multiple recipients, or any scenario where you need to apply the same logic to multiple items. You can control how results are aggregated (collect all, replace array, or discard), whether items are processed in isolation or share context, and how errors are handled. The current item is exposed via a configurable variable name that you can reference in nested steps using template syntax like ${item.field}.

Quick Start

steps:
- type: for_each
  input_from: <string>
  steps: []

Configuration

Parameter Type Required Description
input_from string Yes Dot-delimited location of the source array within the event (for example 'items').
path string No DEPRECATED: Use 'input_from' instead. Dot-delimited location of the source array.
steps array Yes Nested step declarations that will be executed for each element. The list must be non-empty.
var string No Name of the variable injected into child steps when using shared scope or non-dict elements.
Default: "item"
result_strategy string No Controls how per-item results are written back: 'collect' (default) gathers results into a new array, 'replace' overwrites the target path with collected results, 'extend_array' appends results to the source list, 'merge_objects' flattens dict outputs into a single object, 'custom' mirrors 'collect' but signals intentional downstream handling.
Default: "collect"
Options: [object Object], [object Object], [object Object], [object Object], [object Object]
output_to string No Destination path for aggregated output. Defaults to ``input_from`` (the source array location) when omitted.
target_path string No DEPRECATED: Use 'output_to' instead. Destination path for aggregated output.
keep_original boolean No Reserved for future use. Present for backward compatibility with historical workflow definitions; the current implementation always preserves the source array.
Default: false
on_empty string No Behaviour when the source resolves to an empty list or null: 'skip' leaves the event unchanged, 'inject_null' writes null to the target, 'inject_empty' writes an empty list or object depending on the result strategy.
Default: "skip"
Options: [object Object], [object Object], [object Object]
error_mode string No 'fail_fast' raises when the source is not a list; 'continue' leaves the event untouched instead.
Default: "fail_fast"
Options: [object Object], [object Object]
item_envelope string No When using shared scope, 'wrap' stores the current element under {'value': ...}; 'none' injects the element directly.
Default: "none"
Options: [object Object], [object Object]
item_scope string No Processing scope: 'isolated' (default) copies each element into its own event, 'shared' reuses the outer event and writes the current element to ``var``.
Default: "isolated"
Options: [object Object], [object Object]

Examples

Transform array of items

Apply transformation to each item and collect results

type: for_each
input_from: products
var: product
steps:
  - type: python_function
    code: |
      def process(event):
          event['price_with_tax'] = event['price'] * 1.08
          event['formatted_name'] = event['name'].upper()
          return event
    handler: process
result_strategy: collect
output_to: processed_products

Send notifications to multiple users

Send an email or webhook to each user in a list

type: for_each
input_from: users
var: user
item_scope: isolated
steps:
  - type: http_call
    url: https://api.example.com/notify
    method: POST
    body:
      email: ${user.email}
      message: "Hello ${user.name}!"
result_strategy: discard

Filter and enrich array items

Process items with multiple nested steps

type: for_each
input_from: orders
var: order
steps:
  - type: filter
    groups:
      - conditions:
          - field: order.status
            op: eq
            value: pending
  - type: http_call
    url: https://api.example.com/enrich/${order.id}
    output_to: enrichment
result_strategy: collect
keep_original: false

Replace original array

Transform array in place without creating a new field

type: for_each
input_from: items
var: item
steps:
  - type: python_function
    code: |
      def process(event):
          event['processed'] = True
          event['timestamp'] = int(time.time())
          return event
    handler: process
result_strategy: replace

Parallel processing with error handling

Process items with graceful error handling

type: for_each
input_from: api_requests
var: request
item_scope: isolated
error_mode: skip_and_log
steps:
  - type: http_call
    url: ${request.endpoint}
    method: ${request.method}
    drop_on_failure: false
result_strategy: collect
output_to: api_responses

Nested loop processing

Iterate over nested arrays

type: for_each
input_from: customers
var: customer
steps:
  - type: for_each
    input_from: customer.orders
    var: order
    steps:
      - type: print
        template: "Processing order ${order.id} for ${customer.name}"
result_strategy: discard