Error Handling
Dataflow-rs provides flexible error handling at multiple levels to build resilient data pipelines.
Error Levels
Errors can be handled at three levels:
- Task Level - Individual task error handling
- Workflow Level - Workflow-wide error policy
- Engine Level - Errors raised by the engine during message processing
Task-Level Error Handling
Stop on Error (Default)
{
  "id": "critical_task",
  "continue_on_error": false,
  "function": { ... }
}
If the task fails:
- Error is recorded in message.errors
- Workflow execution stops
- No further tasks execute
Continue on Error
{
  "id": "optional_task",
  "continue_on_error": true,
  "function": { ... }
}
If the task fails:
- Error is recorded in message.errors
- Workflow continues to the next task
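The two behaviors can be sketched in plain Rust. This is an illustrative model only, not the dataflow-rs internals: `Task`, `TaskError`, and `run_workflow` are hypothetical names, with `TaskError` loosely mirroring the fields exposed on `message.errors`.

```rust
// Illustrative sketch, not dataflow-rs types: how continue_on_error decides
// between recording an error and halting the workflow.
struct Task {
    id: &'static str,
    continue_on_error: bool,
    run: fn() -> Result<(), String>, // simulated task outcome
}

struct TaskError {
    task_id: String,
    message: String,
}

fn run_workflow(tasks: &[Task]) -> Vec<TaskError> {
    let mut errors = Vec::new();
    for task in tasks {
        if let Err(message) = (task.run)() {
            // The failure is always recorded...
            errors.push(TaskError { task_id: task.id.to_string(), message });
            // ...but only continue_on_error: true lets later tasks run.
            if !task.continue_on_error {
                break;
            }
        }
    }
    errors
}

fn main() {
    let tasks = [
        Task { id: "optional_task", continue_on_error: true, run: || Err("flaky".into()) },
        Task { id: "critical_task", continue_on_error: false, run: || Err("fatal".into()) },
        Task { id: "never_runs", continue_on_error: true, run: || Ok(()) },
    ];
    let errors = run_workflow(&tasks);
    // Both failures are recorded; the third task never executes.
    assert_eq!(errors.len(), 2);
    assert_eq!(errors[1].task_id, "critical_task");
    assert!(errors[0].message.contains("flaky"));
}
```

Both failing tasks leave an entry in the error list, but only the first one, marked `continue_on_error: true`, lets execution proceed.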
Workflow-Level Error Handling
The workflow’s continue_on_error applies to all tasks by default:
{
  "id": "resilient_workflow",
  "continue_on_error": true,
  "tasks": [
    {"id": "task1", "function": { ... }},
    {"id": "task2", "function": { ... }},
    {"id": "task3", "function": { ... }}
  ]
}
Execution continues through every task, even when earlier ones fail.
Override at Task Level
{
  "id": "mixed_workflow",
  "continue_on_error": true,
  "tasks": [
    {"id": "optional_task", "function": { ... }},
    {
      "id": "critical_task",
      "continue_on_error": false,
      "function": { ... }
    }
  ]
}
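The precedence rule can be summed up in one line of plain Rust. The helper name here is hypothetical, not a library API: a task's own setting, when present, overrides the workflow default.

```rust
// Hypothetical helper (not dataflow-rs API): task-level continue_on_error
// overrides the workflow-level default when it is explicitly set.
fn effective_continue_on_error(task_setting: Option<bool>, workflow_default: bool) -> bool {
    task_setting.unwrap_or(workflow_default)
}

fn main() {
    // mixed_workflow above: the workflow default is true, but critical_task opts out.
    assert!(effective_continue_on_error(None, true));         // optional_task
    assert!(!effective_continue_on_error(Some(false), true)); // critical_task
}
```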
Accessing Errors
After processing, check message.errors:
engine.process_message(&mut message).await?;

if !message.errors.is_empty() {
    for error in &message.errors {
        println!(
            "Error: {} in {}/{}",
            error.message,
            error.workflow_id.as_deref().unwrap_or("unknown"),
            error.task_id.as_deref().unwrap_or("unknown")
        );
    }
}
Error Types
Validation Errors
Generated by the validation function when rules fail:
{
  "function": {
    "name": "validation",
    "input": {
      "rules": [
        {
          "condition": {"!!": {"var": "data.email"}},
          "error_message": "Email is required"
        }
      ]
    }
  }
}
Execution Errors
Generated when function execution fails:
- JSONLogic evaluation errors
- Data type mismatches
- Missing required fields
Custom Function Errors
Return errors from custom functions:
impl AsyncFunctionHandler for MyFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        if some_condition {
            return Err(DataflowError::ExecutionError(
                "Custom error message".to_string(),
            ));
        }
        Ok((200, vec![]))
    }
}
Error Recovery Patterns
Fallback Values
Use conditions to provide fallback values:
{
  "tasks": [
    {
      "id": "try_primary",
      "continue_on_error": true,
      "function": {
        "name": "map",
        "input": {
          "mappings": [
            {"path": "temp_data.result", "logic": {"var": "data.primary"}}
          ]
        }
      }
    },
    {
      "id": "use_fallback",
      "condition": {"!": {"var": "temp_data.result"}},
      "function": {
        "name": "map",
        "input": {
          "mappings": [
            {"path": "data.result", "logic": "default_value"}
          ]
        }
      }
    }
  ]
}
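The fallback logic reduces to a simple either/or, sketched here in plain Rust (an illustrative model, not the dataflow-rs mapping engine): keep the primary value when present, otherwise use the `"default_value"` literal from the second mapping.

```rust
// Plain-Rust sketch of the two-task fallback pattern above (hypothetical
// function, not library API): primary value if present, else the default.
fn resolve_result(primary: Option<String>) -> String {
    primary.unwrap_or_else(|| "default_value".to_string())
}

fn main() {
    assert_eq!(resolve_result(Some("from_primary".into())), "from_primary");
    assert_eq!(resolve_result(None), "default_value");
}
```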
Validation Before Processing
Validate data before critical operations:
{
  "tasks": [
    {
      "id": "validate",
      "function": {
        "name": "validation",
        "input": {
          "rules": [
            {"condition": {"!!": {"var": "data.required_field"}}, "error_message": "Required field missing"}
          ]
        }
      }
    },
    {
      "id": "process",
      "function": { ... }
    }
  ]
}
If validation fails, the workflow stops before processing.
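A minimal sketch of this fail-fast check in plain Rust, assuming a hypothetical `validate` helper rather than the library's validation function. Note that JSONLogic's `"!!"` is a truthiness test, so an empty value fails the rule just like a missing one.

```rust
use std::collections::HashMap;

// Illustrative fail-fast validation (not the dataflow-rs validation function):
// a missing or empty required_field stops the workflow before processing.
fn validate(data: &HashMap<&str, &str>) -> Result<(), String> {
    match data.get("required_field") {
        Some(v) if !v.is_empty() => Ok(()),
        _ => Err("Required field missing".to_string()),
    }
}

fn main() {
    let mut data = HashMap::new();
    assert!(validate(&data).is_err()); // the workflow would stop here
    data.insert("required_field", "value");
    assert!(validate(&data).is_ok()); // processing proceeds
}
```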
Best Practices
- Validate Early
  - Add validation tasks at the start of workflows
  - Fail fast on invalid data
- Use continue_on_error Wisely
  - Only for truly optional operations
  - Critical operations should stop on error
- Check Errors
  - Always check message.errors after processing
  - Log errors for monitoring
- Provide Context
  - Include meaningful error messages
  - Include field paths in validation errors