Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Error Handling

Dataflow-rs provides flexible error handling at multiple levels to build resilient automation rules.

Two complementary error channels

Every error encountered during process_message flows through two complementary channels:

  • message.errors()always contains every error encountered: validation failures, task panics, 5xx-status outcomes, workflow wrappers. Callers that want a uniform view scan this list.
  • Result::Err from process_message — signals only that the engine stopped before processing every workflow. Callers that want fail-fast match on it; the error pushed to message.errors() for the same failure carries the workflow context that the bare Err doesn’t.

A workflow with continue_on_error: true records its errors to message.errors() and returns Ok(()). A workflow with continue_on_error: false records to message.errors() and returns Result::Err (which short-circuits the rest of process_message).

Error Levels

Errors can be handled at three levels:

  1. Action Level - Individual action (task) error handling
  2. Rule Level - Rule-wide (workflow) error policy
  3. Engine Level - Processing errors

Action-Level Error Handling

Stop on Error (Default)

{
    "id": "critical_action",
    "continue_on_error": false,
    "function": { ... }
}

If the action fails:

  • Error is recorded in message.errors()
  • Rule execution stops
  • No further actions execute

Continue on Error

{
    "id": "optional_action",
    "continue_on_error": true,
    "function": { ... }
}

If the action fails:

  • Error is recorded in message.errors()
  • Rule continues to next action

Rule-Level Error Handling

The rule’s continue_on_error applies to all actions by default:

{
    "id": "resilient_rule",
    "continue_on_error": true,
    "tasks": [
        {"id": "action1", "function": { ... }},
        {"id": "action2", "function": { ... }},
        {"id": "action3", "function": { ... }}
    ]
}

All actions will continue even if earlier actions fail.

Override at Action Level

{
    "id": "mixed_rule",
    "continue_on_error": true,
    "tasks": [
        {"id": "optional_action", "function": { ... }},
        {
            "id": "critical_action",
            "continue_on_error": false,
            "function": { ... }
        }
    ]
}

Accessing Errors

After processing, walk message.errors():

#![allow(unused)]
fn main() {
let result = engine.process_message(&mut message).await;

for error in message.errors() {
    println!("Error: {} in {}/{}",
        error.message,
        error.workflow_id.as_deref().unwrap_or("unknown"),
        error.task_id.as_deref().unwrap_or("unknown")
    );
}

// Fail-fast signal — true when the engine stopped before all workflows ran.
if let Err(e) = result {
    eprintln!("engine stopped early: {e}");
}
}

Common error codes you’ll see:

  • VALIDATION_ERROR — from the validation built-in
  • TASK_ERROR — handler returned Result::Err
  • TASK_STATUS_ERROR — handler returned TaskOutcome::Status(s) with s >= 500
  • WORKFLOW_ERROR — wrapper recording workflow context for the failure above

Error Types

Validation Errors

Generated by the validation function when rules fail:

{
    "function": {
        "name": "validation",
        "input": {
            "rules": [
                {
                    "condition": {"!!": {"var": "data.email"}},
                    "error_message": "Email is required"
                }
            ]
        }
    }
}

Execution Errors

Generated when function execution fails:

  • JSONLogic evaluation errors
  • Data type mismatches
  • Missing required fields

Custom Function Errors

Return errors from custom functions via Result::Err:

use dataflow_rs::prelude::*;

impl AsyncFunctionHandler for MyFunction {
    type Input = serde_json::Value;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        _input: &serde_json::Value,
    ) -> Result<TaskOutcome> {
        if some_condition {
            return Err(DataflowError::Task(
                "Custom error message".to_string()
            ));
        }
        Ok(TaskOutcome::Success)
    }
}

DataflowError provides typed variants for the most common cases — Validation, Task, Workflow, FunctionExecution, FunctionNotFound, Http, Timeout, Io, LogicEvaluation, Deserialization, Unknown. See the API reference for the full list.

Error Recovery Patterns

Fallback Values

Use conditions to provide fallback values:

{
    "tasks": [
        {
            "id": "try_primary",
            "continue_on_error": true,
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.result", "logic": {"var": "data.primary"}}
                    ]
                }
            }
        },
        {
            "id": "use_fallback",
            "condition": {"!": {"var": "temp_data.result"}},
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.result", "logic": "default_value"}
                    ]
                }
            }
        }
    ]
}

Validation Before Processing

Validate data before critical operations:

{
    "tasks": [
        {
            "id": "validate",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [
                        {"condition": {"!!": {"var": "data.required_field"}}, "error_message": "Required field missing"}
                    ]
                }
            }
        },
        {
            "id": "process",
            "function": { ... }
        }
    ]
}

If validation fails, the rule stops before further processing.

Try It

Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.

Notice the validation error is recorded but processing continues.

Best Practices

  1. Validate Early

    • Add validation actions at the start of rules
    • Fail fast on invalid data
  2. Use continue_on_error Wisely

    • Only for truly optional actions
    • Critical operations should stop on error
  3. Check Errors

    • Always check message.errors() after processing
    • Log errors for monitoring
  4. Provide Context

    • Include meaningful error messages
    • Include field paths in validation errors