Rules (Workflows)
A Rule (also called Workflow) is a collection of actions that execute sequentially when a condition is met. This is the core IF → THEN unit: IF condition matches, THEN execute actions.
Overview
Rules provide:
- Conditional Execution - Only run when JSONLogic conditions are met (against full context:
data,metadata,temp_data) - Priority Ordering - Control execution order across rules
- Action Organization - Group related processing steps
- Error Handling - Continue or stop on errors
Rule Structure
{
"id": "premium_order",
"name": "Premium Order Processing",
"priority": 1,
"channel": "orders",
"version": 2,
"status": "active",
"tags": ["premium", "high-priority"],
"condition": { ">=": [{"var": "data.order.total"}, 1000] },
"continue_on_error": false,
"tasks": [
{
"id": "apply_discount",
"name": "Apply Discount",
"function": { ... }
},
{
"id": "notify_manager",
"name": "Notify Manager",
"function": { ... }
}
]
}
Fields
| Field | Type | Required | Description |
|---|---|---|---|
id | string | Yes | Unique rule identifier |
name | string | No | Human-readable name |
priority | number | No | Execution order (default: 0, lower = first) |
condition | JSONLogic | No | When to execute rule (evaluated against full context) |
continue_on_error | boolean | No | Continue on action failure (default: false) |
tasks | array | Yes | Actions to execute |
channel | string | No | Channel for message routing (default: "default") |
version | number | No | Workflow version number (default: 1) |
status | string | No | Lifecycle status: active, paused, or archived (default: active) |
tags | array | No | Arbitrary tags for organization (default: []) |
created_at | datetime | No | Creation timestamp (ISO 8601) |
updated_at | datetime | No | Last update timestamp (ISO 8601) |
Creating Rules
From JSON String
#![allow(unused)]
fn main() {
use dataflow_rs::Workflow;
let rule = Workflow::from_json(r#"{
"id": "my_rule",
"name": "My Rule",
"tasks": [...]
}"#)?;
}
Using the Convenience Constructor
#![allow(unused)]
fn main() {
use dataflow_rs::{Rule, Task};
use serde_json::json;
let rule = Rule::rule(
"premium_discount",
"Premium Discount",
json!({">=": [{"var": "data.order.total"}, 1000]}),
vec![/* actions */],
);
}
From File
#![allow(unused)]
fn main() {
let rule = Workflow::from_file("rules/my_rule.json")?;
}
Priority Ordering
Rules execute in priority order (lowest first). This enables the THAT (chaining) in the IF → THEN → THAT model:
// Executes first (priority 1) — validate input
{
"id": "validation",
"priority": 1,
"tasks": [...]
}
// Executes second (priority 2) — transform data
{
"id": "transformation",
"priority": 2,
"tasks": [...]
}
// Executes last (priority 10) — send notifications
{
"id": "notification",
"priority": 10,
"tasks": [...]
}
Conditional Execution
Use JSONLogic conditions to control when rules run. Conditions evaluate against the full message context — data, metadata, and temp_data:
{
"id": "premium_user_rule",
"condition": {
"and": [
{">=": [{"var": "data.order.total"}, 500]},
{"==": [{"var": "data.user.is_vip"}, true]}
]
},
"tasks": [...]
}
Common Condition Patterns
// Match on data fields
{">=": [{"var": "data.order.total"}, 1000]}
// Check data exists
{"!!": {"var": "data.email"}}
// Multiple conditions
{"and": [
{">=": [{"var": "data.amount"}, 100]},
{"==": [{"var": "data.currency"}, "USD"]}
]}
// Either condition
{"or": [
{"==": [{"var": "metadata.source"}, "api"]},
{"==": [{"var": "metadata.source"}, "webhook"]}
]}
Error Handling
Stop on Error (Default)
{
"id": "strict_rule",
"continue_on_error": false,
"tasks": [...]
}
If any action fails, the rule stops and the error is recorded.
Continue on Error
{
"id": "resilient_rule",
"continue_on_error": true,
"tasks": [...]
}
Actions continue executing even if previous actions fail. Errors are collected in message.errors.
Action Dependencies
Actions within a rule execute sequentially, allowing later actions to depend on earlier results:
{
"id": "pipeline",
"tasks": [
{
"id": "fetch_data",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "temp_data.fetched", "logic": {"var": "data.source"}}
]
}
}
},
{
"id": "process_data",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.result", "logic": {"var": "temp_data.fetched"}}
]
}
}
}
]
}
Workflow Lifecycle
Workflows support lifecycle management with status, versioning, and tagging. All lifecycle fields are optional and backward-compatible.
Status
Control whether a workflow is active using the status field:
{"id": "my_rule", "status": "active", "tasks": [...]}
{"id": "old_rule", "status": "paused", "tasks": [...]}
{"id": "legacy_rule", "status": "archived", "tasks": [...]}
active(default) — the workflow executes normally and is included in channel routingpaused— the workflow is excluded from channel routing but still runs viaprocess_message()archived— same as paused; used to indicate permanently retired workflows
Channel Routing
Group workflows by channel for efficient message routing:
[
{"id": "order_validate", "channel": "orders", "priority": 1, "tasks": [...]},
{"id": "order_process", "channel": "orders", "priority": 2, "tasks": [...]},
{"id": "user_notify", "channel": "notifications", "priority": 1, "tasks": [...]}
]
Then route messages to a specific channel:
#![allow(unused)]
fn main() {
// Only runs workflows on the "orders" channel
engine.process_message_for_channel("orders", &mut message).await?;
}
Version and Tags
Use version and tags for workflow organization:
{
"id": "discount_rule",
"version": 3,
"tags": ["finance", "discount", "v3"],
"tasks": [...]
}
Try It
Want more features? Try the Full Debugger UI with step-by-step execution and rule visualization.
Try changing role to something other than “admin” to see the conditional rule skip.