Error Handling

Dataflow-rs provides flexible error handling at multiple levels to build resilient data pipelines.

Error Levels

Errors can be handled at three levels:

  1. Task Level - Individual task error handling
  2. Workflow Level - Workflow-wide error policy
  3. Engine Level - Processing errors

Task-Level Error Handling

Stop on Error (Default)

{
    "id": "critical_task",
    "continue_on_error": false,
    "function": { ... }
}

If the task fails:

  • Error is recorded in message.errors
  • Workflow execution stops
  • No further tasks execute

Continue on Error

{
    "id": "optional_task",
    "continue_on_error": true,
    "function": { ... }
}

If the task fails:

  • Error is recorded in message.errors
  • Workflow continues to next task
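
The two policies above can be modelled in a few lines of plain Rust. This is a minimal sketch, not dataflow-rs's implementation: `Task` and `run_tasks` are hypothetical stand-ins that use only the standard library, and each task's outcome is hard-coded so the control flow is easy to follow.

```rust
/// Hypothetical stand-in for a task: an id, its error policy, and a fixed outcome.
struct Task {
    id: &'static str,
    continue_on_error: bool,
    /// Ok(()) on success, Err(message) on failure.
    outcome: Result<(), &'static str>,
}

/// Runs tasks in order and returns (ids of executed tasks, recorded errors).
fn run_tasks(tasks: &[Task]) -> (Vec<&'static str>, Vec<String>) {
    let mut executed = Vec::new();
    let mut errors = Vec::new();
    for task in tasks {
        executed.push(task.id);
        if let Err(msg) = task.outcome {
            // The failure is always recorded, whatever the policy.
            errors.push(format!("{}: {}", task.id, msg));
            // Stop-on-error: no further tasks execute.
            if !task.continue_on_error {
                break;
            }
        }
    }
    (executed, errors)
}
```

In this model a failing task with `continue_on_error: true` lets the next task run, while a failing task with `continue_on_error: false` ends the run. Both failures end up in the error list.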

Workflow-Level Error Handling

A workflow's continue_on_error is the default for every task in that workflow:

{
    "id": "resilient_workflow",
    "continue_on_error": true,
    "tasks": [
        {"id": "task1", "function": { ... }},
        {"id": "task2", "function": { ... }},
        {"id": "task3", "function": { ... }}
    ]
}

With this setting, each task runs even if an earlier task fails. Every failure is still recorded in message.errors.

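Override at Task Level

A task's own continue_on_error takes precedence over the workflow setting. Here optional_task inherits the workflow's continue_on_error: true, while critical_task stops the workflow if it fails: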

{
    "id": "mixed_workflow",
    "continue_on_error": true,
    "tasks": [
        {"id": "optional_task", "function": { ... }},
        {
            "id": "critical_task",
            "continue_on_error": false,
            "function": { ... }
        }
    ]
}
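
The override rule can be stated as a one-line resolution function. This is a sketch of the behaviour described in this section, not the engine's code: `effective_continue_on_error` is a hypothetical name, and the task-level value is modelled as `Option<bool>` on the assumption that a task may leave the field unset.

```rust
/// Resolves the policy a task actually runs with.
/// An explicit task-level value wins; otherwise the workflow default applies.
fn effective_continue_on_error(workflow_default: bool, task_override: Option<bool>) -> bool {
    task_override.unwrap_or(workflow_default)
}
```

For the mixed_workflow example, `effective_continue_on_error(true, None)` models optional_task and `effective_continue_on_error(true, Some(false))` models critical_task.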

Accessing Errors

After processing, check message.errors:

engine.process_message(&mut message).await?;

// Each recorded error carries its message and, when known,
// the workflow and task it came from.
if !message.errors.is_empty() {
    for error in &message.errors {
        println!("Error: {} in {}/{}",
            error.message,
            error.workflow_id.as_deref().unwrap_or("unknown"),
            error.task_id.as_deref().unwrap_or("unknown")
        );
    }
}
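
When a message collects several errors, it can help to group them by where they occurred before logging. The sketch below is standard-library only and rests on an assumption: `ErrorInfo` is a hypothetical struct that mirrors just the three fields used in the loop above (`message`, `workflow_id`, `task_id`), not the library's actual error type.

```rust
use std::collections::BTreeMap;

/// Hypothetical mirror of the error fields used above.
struct ErrorInfo {
    message: String,
    workflow_id: Option<String>,
    task_id: Option<String>,
}

/// Groups error messages under a "workflow/task" key,
/// using "unknown" when an id is missing.
fn group_by_location(errors: &[ErrorInfo]) -> BTreeMap<String, Vec<&str>> {
    let mut grouped: BTreeMap<String, Vec<&str>> = BTreeMap::new();
    for e in errors {
        let key = format!(
            "{}/{}",
            e.workflow_id.as_deref().unwrap_or("unknown"),
            e.task_id.as_deref().unwrap_or("unknown")
        );
        grouped.entry(key).or_default().push(e.message.as_str());
    }
    grouped
}
```

A `BTreeMap` keeps the keys sorted, so the grouped output is stable across runs, which is convenient for log comparison.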

Error Types

Validation Errors

Generated by the validation function when rules fail:

{
    "function": {
        "name": "validation",
        "input": {
            "rules": [
                {
                    "condition": {"!!": {"var": "data.email"}},
                    "error_message": "Email is required"
                }
            ]
        }
    }
}
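
The rule above uses the JSONLogic `!!` operator, which coerces a value to a boolean. A missing field and an empty string are both falsy, so either one triggers the error message. The following sketch models that check for string fields only. It is not how the validation function is implemented: `Rule` and `validate` are hypothetical names, and the data is reduced to a flat map of strings.

```rust
use std::collections::HashMap;

/// Hypothetical "field must be truthy" rule.
struct Rule {
    field: &'static str,
    error_message: &'static str,
}

/// Returns the error message of every rule whose field is missing or empty.
fn validate(data: &HashMap<&str, &str>, rules: &[Rule]) -> Vec<&'static str> {
    rules
        .iter()
        // A string passes `!!` only if it is present and non-empty.
        .filter(|rule| data.get(rule.field).map_or(true, |value| value.is_empty()))
        .map(|rule| rule.error_message)
        .collect()
}
```

With a single rule on `email`, data without that key or with `email` set to `""` yields `["Email is required"]`, and data with a non-empty address yields no errors.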

Execution Errors

Generated when function execution fails:

  • JSONLogic evaluation errors
  • Data type mismatches
  • Missing required fields

Custom Function Errors

Return errors from custom functions:

impl AsyncFunctionHandler for MyFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        // `some_condition` is a placeholder for your own failure check.
        if some_condition {
            // Returning Err marks the task as failed.
            return Err(DataflowError::ExecutionError(
                "Custom error message".to_string()
            ));
        }
        // Success: a status code and the list of changes (none here).
        Ok((200, vec![]))
    }
}

Error Recovery Patterns

Fallback Values

Mark the primary task with continue_on_error, then add a second task whose condition fires only when the primary produced no value:

{
    "tasks": [
        {
            "id": "try_primary",
            "continue_on_error": true,
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.result", "logic": {"var": "data.primary"}}
                    ]
                }
            }
        },
        {
            "id": "use_fallback",
            "condition": {"!": {"var": "temp_data.result"}},
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.result", "logic": "default_value"}
                    ]
                }
            }
        }
    ]
}
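
The same pattern, expressed directly in Rust, is a "use the primary value unless it is absent or empty" lookup. This sketch only illustrates the logic of the two tasks above. `resolve_result` is a hypothetical helper, and treating an empty string as "no value" mirrors the falsy check done by the `!` condition.

```rust
/// Returns the primary value if it is present and non-empty,
/// otherwise the fallback.
fn resolve_result<'a>(primary: Option<&'a str>, fallback: &'a str) -> &'a str {
    match primary {
        Some(value) if !value.is_empty() => value,
        _ => fallback,
    }
}
```

Calling `resolve_result(None, "default_value")` corresponds to the case where try_primary failed or produced nothing, so use_fallback supplies the default.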

Validation Before Processing

Validate data before critical operations:

{
    "tasks": [
        {
            "id": "validate",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [
                        {"condition": {"!!": {"var": "data.required_field"}}, "error_message": "Required field missing"}
                    ]
                }
            }
        },
        {
            "id": "process",
            "function": { ... }
        }
    ]
}

Because the validate task uses the default continue_on_error: false, a failed validation stops the workflow before the process task runs.

Best Practices

  1. Validate Early

    • Add validation tasks at the start of workflows
    • Fail fast on invalid data
  2. Use continue_on_error Wisely

    • Only for truly optional operations
    • Critical operations should stop on error
  3. Check Errors

    • Always check message.errors after processing
    • Log errors for monitoring
  4. Provide Context

    • Include meaningful error messages
    • Include field paths in validation errors