Error Handling
Dataflow-rs provides flexible error handling at multiple levels to build resilient automation rules.
Two complementary error channels
Every error encountered during process_message flows through two
complementary channels:
- message.errors() always contains every error encountered: validation failures, task panics, 5xx-status outcomes, workflow wrappers. Callers that want a uniform view scan this list.
- Result::Err from process_message signals only that the engine stopped before processing every workflow. Callers that want fail-fast match on it; the error pushed to message.errors() for the same failure carries the workflow context that the bare Err doesn't.
A workflow with continue_on_error: true records its errors to
message.errors() and returns Ok(()). A workflow with
continue_on_error: false records to message.errors() and returns
Result::Err (which short-circuits the rest of process_message).
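The interaction between the two channels can be sketched with a small self-contained model. The types and the `process` function below are hypothetical stand-ins written for illustration; they are not the dataflow-rs API, and they only mirror the behavior described above under that assumption.

```rust
// Hypothetical stand-ins for illustration only; NOT the dataflow-rs types.
#[derive(Debug, Clone, PartialEq)]
struct RecordedError {
    workflow_id: String,
    message: String,
}

struct Workflow {
    id: &'static str,
    continue_on_error: bool,
    // `Some(msg)` simulates a failure inside this workflow.
    failure: Option<&'static str>,
}

/// Visits workflows in order. Every failure is pushed to `errors`.
/// `Err` is returned only when a failing workflow has
/// `continue_on_error: false`, which skips the remaining workflows.
/// On success, returns how many workflows were visited.
fn process(workflows: &[Workflow], errors: &mut Vec<RecordedError>) -> Result<usize, String> {
    let mut visited = 0;
    for wf in workflows {
        visited += 1;
        if let Some(msg) = wf.failure {
            // Channel 1: the error list always receives the failure.
            errors.push(RecordedError {
                workflow_id: wf.id.to_string(),
                message: msg.to_string(),
            });
            // Channel 2: the fail-fast signal, only for strict workflows.
            if !wf.continue_on_error {
                return Err(msg.to_string());
            }
        }
    }
    Ok(visited)
}
```

In this model a tolerant workflow that fails leaves an entry in the error list while the overall result stays `Ok`, so a caller that checks only the `Result` would miss it.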
Error Levels
Errors can be handled at three levels:
- Action Level - Individual action (task) error handling
- Rule Level - Rule-wide (workflow) error policy
- Engine Level - Processing errors
Action-Level Error Handling
Stop on Error (Default)
{
  "id": "critical_action",
  "continue_on_error": false,
  "function": { ... }
}
If the action fails:
- Error is recorded in message.errors()
- Rule execution stops
- No further actions execute
Continue on Error
{
  "id": "optional_action",
  "continue_on_error": true,
  "function": { ... }
}
If the action fails:
- Error is recorded in message.errors()
- Rule continues to next action
Rule-Level Error Handling
The rule’s continue_on_error applies to all actions by default:
{
  "id": "resilient_rule",
  "continue_on_error": true,
  "tasks": [
    {"id": "action1", "function": { ... }},
    {"id": "action2", "function": { ... }},
    {"id": "action3", "function": { ... }}
  ]
}
All actions will continue even if earlier actions fail.
Override at Action Level
{
  "id": "mixed_rule",
  "continue_on_error": true,
  "tasks": [
    {"id": "optional_action", "function": { ... }},
    {
      "id": "critical_action",
      "continue_on_error": false,
      "function": { ... }
    }
  ]
}
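One way to read the override is that an action without its own continue_on_error inherits the rule's value, while an explicit value on the action wins. The sketch below models that reading in plain Rust. Both functions are hypothetical helpers written for illustration, not part of dataflow-rs, and the inheritance rule itself is an assumption drawn from the description above.

```rust
/// Hypothetical helper: resolves the effective policy for one action.
/// `None` means the action did not set `continue_on_error` itself.
fn effective_continue_on_error(action_override: Option<bool>, rule_default: bool) -> bool {
    action_override.unwrap_or(rule_default)
}

/// Simulates a rule run. Each tuple is (action id, action-level override,
/// whether the action fails). Returns the ids of the actions that executed.
fn run_rule(rule_default: bool, actions: &[(&str, Option<bool>, bool)]) -> Vec<String> {
    let mut ran = Vec::new();
    for (id, action_override, fails) in actions {
        ran.push(id.to_string());
        // A failing action stops the rule unless its effective policy tolerates errors.
        if *fails && !effective_continue_on_error(*action_override, rule_default) {
            break;
        }
    }
    ran
}
```

Applied to the mixed_rule example, a failure in optional_action is tolerated because it inherits true from the rule, while a failure in critical_action stops the rule.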
Accessing Errors
After processing, walk message.errors():
let result = engine.process_message(&mut message).await;
for error in message.errors() {
    println!("Error: {} in {}/{}",
        error.message,
        error.workflow_id.as_deref().unwrap_or("unknown"),
        error.task_id.as_deref().unwrap_or("unknown")
    );
}
// Fail-fast signal: true when the engine stopped before all workflows ran.
if let Err(e) = result {
    eprintln!("engine stopped early: {e}");
}
Common error codes you’ll see:
- VALIDATION_ERROR: from the validation built-in
- TASK_ERROR: handler returned Result::Err
- TASK_STATUS_ERROR: handler returned TaskOutcome::Status(s) with s >= 500
- WORKFLOW_ERROR: wrapper recording workflow context for the failure above
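Because WORKFLOW_ERROR wraps a failure that is already recorded under its own code, a monitoring summary may want to count codes and skip the wrappers when listing root causes. The following is a minimal sketch of that idea. `ErrorEntry` is a hypothetical stand-in, and its `code` and `message` string fields are assumptions about the shape of an error entry rather than the library's actual type.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for one entry of message.errors();
// the field names and types here are assumptions.
struct ErrorEntry {
    code: String,
    message: String,
}

/// Counts how many recorded errors carry each code.
fn count_by_code(errors: &[ErrorEntry]) -> BTreeMap<&str, usize> {
    let mut counts = BTreeMap::new();
    for e in errors {
        *counts.entry(e.code.as_str()).or_insert(0) += 1;
    }
    counts
}

/// Returns the messages of non-wrapper errors, so a failure is not
/// reported twice (once directly, once via its WORKFLOW_ERROR wrapper).
fn root_cause_messages(errors: &[ErrorEntry]) -> Vec<&str> {
    errors
        .iter()
        .filter(|e| e.code != "WORKFLOW_ERROR")
        .map(|e| e.message.as_str())
        .collect()
}
```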
Error Types
Validation Errors
Generated by the validation function when rules fail:
{
  "function": {
    "name": "validation",
    "input": {
      "rules": [
        {
          "condition": {"!!": {"var": "data.email"}},
          "error_message": "Email is required"
        }
      ]
    }
  }
}
Execution Errors
Generated when function execution fails:
- JSONLogic evaluation errors
- Data type mismatches
- Missing required fields
Custom Function Errors
Return errors from custom functions via Result::Err:
use dataflow_rs::prelude::*;

impl AsyncFunctionHandler for MyFunction {
    type Input = serde_json::Value;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        _input: &serde_json::Value,
    ) -> Result<TaskOutcome> {
        // `some_condition` is a placeholder for your own failure check.
        if some_condition {
            return Err(DataflowError::Task(
                "Custom error message".to_string()
            ));
        }
        Ok(TaskOutcome::Success)
    }
}
DataflowError provides typed variants for the most common cases —
Validation, Task, Workflow, FunctionExecution, FunctionNotFound,
Http, Timeout, Io, LogicEvaluation, Deserialization, Unknown.
See the API reference for the full list.
Error Recovery Patterns
Fallback Values
Use conditions to provide fallback values:
{
  "tasks": [
    {
      "id": "try_primary",
      "continue_on_error": true,
      "function": {
        "name": "map",
        "input": {
          "mappings": [
            {"path": "temp_data.result", "logic": {"var": "data.primary"}}
          ]
        }
      }
    },
    {
      "id": "use_fallback",
      "condition": {"!": {"var": "temp_data.result"}},
      "function": {
        "name": "map",
        "input": {
          "mappings": [
            {"path": "data.result", "logic": "default_value"}
          ]
        }
      }
    }
  ]
}
Validation Before Processing
Validate data before critical operations:
{
  "tasks": [
    {
      "id": "validate",
      "function": {
        "name": "validation",
        "input": {
          "rules": [
            {"condition": {"!!": {"var": "data.required_field"}}, "error_message": "Required field missing"}
          ]
        }
      }
    },
    {
      "id": "process",
      "function": { ... }
    }
  ]
}
If validation fails, the rule stops before further processing.
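The same validate-then-process ordering can be sketched in plain Rust, independent of the engine. Everything below (`Record`, `validate`, `process`, `run_rule`) is hypothetical and only illustrates the control flow. The emptiness check approximates the truthiness test of the `!!` condition, under the assumption that a missing or empty field should fail validation.

```rust
// Hypothetical record type for illustration; not a dataflow-rs type.
struct Record {
    required_field: Option<String>,
}

/// Mirrors the validation rule: the field must be present and non-empty.
fn validate(record: &Record) -> Result<(), String> {
    match record.required_field.as_deref() {
        Some(value) if !value.is_empty() => Ok(()),
        _ => Err("Required field missing".to_string()),
    }
}

/// Stand-in for the critical operation; here it just upper-cases the field.
fn process(record: &Record) -> String {
    record
        .required_field
        .as_deref()
        .unwrap_or_default()
        .to_uppercase()
}

/// Runs validation first; `?` returns early so `process` never sees bad data.
fn run_rule(record: &Record) -> Result<String, String> {
    validate(record)?;
    Ok(process(record))
}
```

Putting the check first keeps the critical step free of defensive branches, at the cost of stopping the whole rule on the first invalid record.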
Best Practices
- Validate Early
  - Add validation actions at the start of rules
  - Fail fast on invalid data
- Use continue_on_error Wisely
  - Only for truly optional actions
  - Critical operations should stop on error
- Check Errors
  - Always check message.errors() after processing
  - Log errors for monitoring
- Provide Context
  - Include meaningful error messages
  - Include field paths in validation errors