Introduction
Dataflow-rs
A high-performance rules engine for IFTTT-style automation in Rust with zero-overhead JSONLogic evaluation.
Dataflow-rs is a lightweight rules engine that lets you define IF → THEN → THAT automation in JSON. Rules are evaluated using pre-compiled JSONLogic for zero runtime overhead, and actions execute asynchronously for high throughput.
Whether you’re routing events, validating data, building REST APIs, or creating automation pipelines, Dataflow-rs provides enterprise-grade performance with minimal complexity.
Key Features
- IF → THEN → THAT Model - Define rules with JSONLogic conditions, execute actions, chain with priority ordering
- Async-First Architecture - Native async/await support with Tokio for high-throughput processing
- Zero Runtime Compilation - All JSONLogic expressions pre-compiled at startup for optimal performance
- Full Context Access - Conditions can access any field: data, metadata, temp_data
- Execution Tracing - Step-by-step debugging with message snapshots after each action
- Built-in Functions - Parse, Map, Validate, Filter, Log, and Publish for complete data pipelines
- Pipeline Control Flow - Filter/gate function to halt workflows or skip tasks based on conditions
- Channel Routing - Route messages to specific workflow channels with O(1) lookup
- Workflow Lifecycle - Manage workflow status (active/paused/archived), versioning, and tagging
- Hot Reload - Swap workflows at runtime without re-registering custom functions
- Extensible - Easily add custom async actions to the engine
- Typed Integration Configs - Pre-validated configs for HTTP, Enrich, and Kafka integrations
- WebAssembly Support - Run rules in the browser with @goplasmatic/dataflow-wasm
- React UI Components - Visualize and debug rules with @goplasmatic/dataflow-ui
- Auditing - Track all changes to your data as it moves through the pipeline
Try It Now
Experience the power of dataflow-rs directly in your browser. Define a rule and message, then see the processing result instantly.
Want more features? Try the Full Debugger UI with step-by-step execution, breakpoints, and rule visualization.
How It Works
┌─────────────────────────────────────────────────────────────────┐
│ Rule (Workflow) │
│ │
│ IF condition matches → JSONLogic against any field │
│ THEN execute actions (tasks) → map, validate, custom logic │
│ THAT chain more rules → priority-ordered execution │
└─────────────────────────────────────────────────────────────────┘
- Define Rules - Create JSON-based rule definitions with conditions and actions
- Create an Engine - Initialize the rules engine (all logic compiled once at startup)
- Process Messages - Send messages through the engine for evaluation
- Get Results - Receive transformed data with full audit trail
Next Steps
- Installation - Add dataflow-rs to your project
- Quick Start - Build your first rule
- Playground - Experiment with rules interactively
Playground
Try dataflow-rs directly in your browser. Define rules, create messages, and see the processing results in real-time.
Looking for advanced debugging? Try the Full Debugger UI with step-by-step execution, breakpoints, rule visualization, and more!
How to Use
- Select an Example - Choose from the dropdown or write your own
- Edit Rules - Modify the rule JSON on the left panel
- Edit Message - Customize the input message on the right panel
- Process - Click “Process Message” or press Ctrl+Enter
- View Results - See the processed output with data, metadata, and audit trail
Tips
- JSONLogic - Use JSONLogic expressions in your rules for dynamic data access and transformation
- Multiple Actions - Add multiple actions (tasks) to a rule for sequential processing
- Multiple Rules - Define multiple rules that execute in priority order
- Conditions - Add conditions to actions or rules to control when they execute (conditions can access data, metadata, and temp_data)
- Audit Trail - The output shows all changes made during processing
Installation
Add dataflow-rs to your Rust project using Cargo.
Requirements
- Rust 1.85 or later (Edition 2024)
- Cargo (comes with Rust)
Add to Cargo.toml
[dependencies]
dataflow-rs = "2.1"
serde_json = "1.0"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }
Verify Installation
Create a simple test to verify the installation:
use dataflow_rs::Engine;
fn main() {
// Create an empty rules engine
let engine = Engine::new(vec![], None);
println!("Rules engine created with {} rules", engine.workflows().len());
}
Run with:
cargo run
You should see:
Rules engine created with 0 rules
Optional Dependencies
Depending on your use case, you may want to add:
[dependencies]
# For async operations
async-trait = "0.1"
# For custom error handling
thiserror = "2.0"
# For logging
log = "0.4"
env_logger = "0.11"
Next Steps
- Quick Start - Build your first rule
- Basic Concepts - Understand the core architecture
Quick Start
Build your first rule in minutes.
Create a Simple Rule
Rules are defined in JSON and consist of actions (tasks) that process data sequentially.
use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::message::Message;
use serde_json::json;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Define a rule that transforms data
let rule_json = r#"{
"id": "greeting_rule",
"name": "Greeting Rule",
"tasks": [
{
"id": "create_greeting",
"name": "Create Greeting",
"function": {
"name": "map",
"input": {
"mappings": [
{
"path": "data.greeting",
"logic": { "cat": ["Hello, ", {"var": "data.name"}, "!"] }
}
]
}
}
}
]
}"#;
// Parse the rule
let rule = Workflow::from_json(rule_json)?;
// Create the engine (compiles all logic at startup)
let engine = Engine::new(vec![rule], None);
// Create a message with payload
let payload = Arc::new(json!({"name": "World"}));
let mut message = Message::new(payload);
// Load payload into data context
message.context["data"]["name"] = json!("World");
// Process the message
engine.process_message(&mut message).await?;
// Print the result
println!("Greeting: {}", message.data()["greeting"]);
Ok(())
}
Try It Interactively
Want more features? Try the Full Debugger UI with step-by-step execution and rule visualization.
Understanding the Code
- Rule Definition - JSON structure defining actions (tasks) to execute
- Engine Creation - Compiles all JSONLogic expressions at startup
- Message Creation - Input data wrapped in a Message structure
- Processing - Engine evaluates each rule’s condition and executes matching actions
- Result - Modified message with transformed data and audit trail
Add Validation
Extend your rule with data validation:
{
"id": "validated_rule",
"name": "Validated Rule",
"tasks": [
{
"id": "validate_input",
"name": "Validate Input",
"function": {
"name": "validation",
"input": {
"rules": [
{
"logic": { "!!": {"var": "data.name"} },
"message": "Name is required"
}
]
}
}
},
{
"id": "create_greeting",
"name": "Create Greeting",
"function": {
"name": "map",
"input": {
"mappings": [
{
"path": "data.greeting",
"logic": { "cat": ["Hello, ", {"var": "data.name"}, "!"] }
}
]
}
}
}
]
}
Next Steps
- Basic Concepts - Understand the core architecture
- Map Function - Learn about data transformation
- Validation - Learn about data validation
Basic Concepts
Understanding the core components of dataflow-rs.
The IF → THEN → THAT Model
Dataflow-rs follows an IFTTT-style rules engine pattern:
- IF — Define conditions using JSONLogic (evaluated against data, metadata, temp_data)
- THEN — Execute actions: data transformation, validation, or custom async logic
- THAT — Chain multiple actions and rules with priority ordering
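Put together, a single rule definition carries all three parts: the condition is the IF, the tasks are the THEN, and the priority positions it in the chain. A minimal sketch using the rule fields documented later in this guide (the ids and values are illustrative):

```json
{
  "id": "vip_upgrade",
  "priority": 5,
  "condition": { "==": [{"var": "data.user.is_vip"}, true] },
  "tasks": [
    {
      "id": "tag_vip",
      "function": {
        "name": "map",
        "input": {
          "mappings": [
            {"path": "data.tier", "logic": "premium"}
          ]
        }
      }
    }
  ]
}
```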
Architecture Overview
Dataflow-rs follows a two-phase architecture:
- Compilation Phase (Startup) - All JSONLogic expressions are compiled once
- Execution Phase (Runtime) - Messages are processed using compiled logic
┌─────────────────────────────────────────────────────────────┐
│ Compilation Phase │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Rules │ -> │ LogicCompiler│ -> │ Compiled Cache │ │
│ └──────────┘ └──────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
v
┌─────────────────────────────────────────────────────────────┐
│ Execution Phase │
│ ┌─────────┐ ┌────────┐ ┌──────────────────────────┐ │
│ │ Message │ -> │ Engine │ -> │ Processed Message │ │
│ └─────────┘ └────────┘ │ (data + audit trail) │ │
│ └──────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Core Components
| Rules Engine | Workflow Engine | Description |
|---|---|---|
| RulesEngine | Engine | Central async component that evaluates rules and executes actions |
| Rule | Workflow | A condition + actions bundle — IF condition THEN execute actions |
| Action | Task | An individual processing step (map, validate, or custom function) |
Both naming conventions work — use whichever fits your mental model.
Engine (RulesEngine)
The central orchestrator that processes messages through rules.
#![allow(unused)]
fn main() {
use dataflow_rs::Engine;
// Create engine with rules (compiled at creation)
let engine = Engine::new(rules, custom_functions);
// Process messages (uses pre-compiled logic)
engine.process_message(&mut message).await?;
}
Rule (Workflow)
A collection of actions executed sequentially. Rules can have:
- Priority - Determines execution order (lower = first)
- Conditions - JSONLogic expression evaluated against the full context (data, metadata, temp_data)
{
"id": "premium_order",
"name": "Premium Order Processing",
"priority": 1,
"condition": { ">=": [{"var": "data.order.total"}, 1000] },
"tasks": [...]
}
Action (Task)
An individual processing unit within a rule. Actions can:
- Execute built-in functions (map, validation)
- Execute custom functions
- Have conditions for conditional execution
{
"id": "apply_discount",
"name": "Apply Discount",
"condition": { "!!": {"var": "data.order.total"} },
"function": {
"name": "map",
"input": { ... }
}
}
Message
The data structure that flows through rules. Contains:
- context.data - Main data payload
- context.metadata - Message metadata
- context.temp_data - Temporary processing data
- audit_trail - Change history
- errors - Collected errors
#![allow(unused)]
fn main() {
use dataflow_rs::Message;
use serde_json::json;
let mut message = Message::new(&json!({
"name": "John",
"email": "john@example.com"
}));
// Access after processing
println!("Data: {:?}", message.context["data"]);
println!("Audit: {:?}", message.audit_trail);
}
Data Flow
- Input - Message created with initial data
- Rule Selection - Engine evaluates each rule’s condition
- Action Execution - Actions run sequentially within each matching rule
- Output - Message contains transformed data and audit trail
Message (input)
│
v
┌─────────────────────────────────────────┐
│ Rule 1 (priority: 1) │
│ Action 1 -> Action 2 -> Action 3 │
└─────────────────────────────────────────┘
│
v
┌─────────────────────────────────────────┐
│ Rule 2 (priority: 2) │
│ Action 1 -> Action 2 │
└─────────────────────────────────────────┘
│
v
Message (output with audit trail)
JSONLogic
Dataflow-rs uses JSONLogic for:
- Conditions - Control when rules/actions execute (can access any context field)
- Data Access - Read values from message context
- Transformations - Transform and combine data
Common operations:
// Access data
{"var": "data.name"}
// String concatenation
{"cat": ["Hello, ", {"var": "data.name"}]}
// Conditionals
{"if": [{"var": "data.premium"}, "VIP", "Standard"]}
// Comparisons
{">=": [{"var": "data.order.total"}, 1000]}
Next Steps
- Rules Engine - Deep dive into the engine
- JSONLogic - Advanced JSONLogic usage
- Custom Functions - Extend with custom logic
Core Concepts Overview
Dataflow-rs is built around a small set of core concepts that work together to evaluate rules and execute actions efficiently.
The Big Picture
┌─────────────────────────────────────────────────────────────────────┐
│ Rules Engine (Engine) │
│ ┌───────────────────────────────────────────────────────────────┐ │
│ │ Compiled Logic Cache │ │
│ │ (All JSONLogic pre-compiled at startup) │ │
│ └───────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Rule 1 │ │ Rule 2 │ │ Rule N │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │Action 1 │ │ │ │Action 1 │ │ │ │Action 1 │ │ │
│ │ │Action 2 │ │ │ │Action 2 │ │ │ │Action 2 │ │ │
│ │ │ ... │ │ │ │ ... │ │ │ │ ... │ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│
v
┌─────────────────────────────────────────────────────────────────────┐
│ Message │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ │
│ │ data │ │ metadata │ │ temp_data │ │ audit_trail│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Component Summary
| Rules Engine | Workflow Engine | Purpose | Key Features |
|---|---|---|---|
| RulesEngine | Engine | Orchestrates processing | Pre-compiled logic, rule management |
| Rule | Workflow | Groups related actions | Priority ordering, conditions |
| Action | Task | Individual processing unit | Built-in or custom functions |
| Message | Message | Data container flowing through rules | Data, metadata, temp_data, audit trail |
Processing Flow
- Engine Initialization
  - Parse rule definitions
  - Compile all JSONLogic expressions
  - Store in indexed cache
- Message Processing
  - Create message with input data
  - Engine evaluates each rule’s condition against the full context
  - Matching rules execute in priority order
- Action Execution
  - Actions run sequentially within each rule
  - Each action can modify message data
  - Changes recorded in audit trail
- Result
  - Message contains transformed data
  - Audit trail shows all modifications
  - Errors collected (if any)
Key Design Principles
Pre-compilation
All JSONLogic expressions are compiled once at engine creation. This eliminates runtime parsing overhead and ensures consistent, predictable performance.
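The compile-once pattern can be illustrated in plain Rust. This is a generic sketch of the idea, not the crate's actual internals: conditions are turned into callable closures once, cached by rule id, and then evaluated per message with no parsing on the hot path.

```rust
use std::collections::HashMap;

// Stand-in for a compiled JSONLogic expression: a closure that checks
// a numeric context field against a threshold.
type CompiledLogic = Box<dyn Fn(&HashMap<String, f64>) -> bool>;

// "Compilation phase": parse every rule condition once and cache the result.
fn compile_rules(thresholds: &[(&str, &str, f64)]) -> HashMap<String, CompiledLogic> {
    let mut cache: HashMap<String, CompiledLogic> = HashMap::new();
    for (rule_id, field, min) in thresholds {
        let field = field.to_string();
        let min = *min;
        cache.insert(
            rule_id.to_string(),
            Box::new(move |ctx| ctx.get(&field).is_some_and(|v| *v >= min)),
        );
    }
    cache
}

fn main() {
    // Compile once at startup...
    let cache = compile_rules(&[("premium_order", "order.total", 1000.0)]);
    // ...then evaluate per message using only the cached closure.
    let ctx = HashMap::from([("order.total".to_string(), 1500.0)]);
    let matched = cache["premium_order"](&ctx);
    println!("premium_order matched: {matched}");
}
```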
Immutability
Rules are immutable after engine creation. This enables safe concurrent processing and eliminates race conditions.
Separation of Concerns
- LogicCompiler handles all compilation
- InternalExecutor executes built-in functions
- Engine orchestrates the flow
Audit Trail
Every data modification is recorded, providing complete visibility into processing steps for debugging and compliance.
Detailed Documentation
- Rules Engine - The central orchestrator
- Rules (Workflows) - Condition + actions bundles
- Actions (Tasks) - Individual processing units
- Message - Data container with audit trail
- Error Handling - Managing failures gracefully
Rules Engine
The Engine (also available as RulesEngine type alias) is the central component that evaluates rules and orchestrates action execution.
Overview
The Engine is responsible for:
- Compiling all JSONLogic expressions at initialization
- Pre-sorting rules by priority at startup (no per-message sorting)
- Evaluating rule conditions against the full message context
- Processing messages through matching rules
- Channel-based routing with O(1) lookup
- Coordinating action execution
- Hot-reloading workflows without losing custom functions
Creating an Engine
#![allow(unused)]
fn main() {
use dataflow_rs::{Engine, Workflow};
use std::collections::HashMap;
// Parse rules from JSON
let rule1 = Workflow::from_json(r#"{
"id": "rule1",
"name": "First Rule",
"priority": 1,
"tasks": [...]
}"#)?;
let rule2 = Workflow::from_json(r#"{
"id": "rule2",
"name": "Second Rule",
"priority": 2,
"tasks": [...]
}"#)?;
// Create engine with rules
let engine = Engine::new(
vec![rule1, rule2],
None // Optional custom functions
);
// Engine is now ready - all logic compiled
println!("Loaded {} rules", engine.workflows().len());
}
You can also use the RulesEngine type alias:
#![allow(unused)]
fn main() {
use dataflow_rs::RulesEngine;
let engine = RulesEngine::new(vec![rule1, rule2], None);
}
Processing Messages
#![allow(unused)]
fn main() {
use dataflow_rs::engine::message::Message;
use serde_json::json;
use std::sync::Arc;
// Create a message with payload
let payload = Arc::new(json!({
"user": "john",
"action": "login"
}));
let mut message = Message::new(payload);
// Process through all matching rules
engine.process_message(&mut message).await?;
// Access results
println!("Processed data: {:?}", message.data());
println!("Audit trail: {:?}", message.audit_trail);
}
Execution Tracing
For debugging, use process_message_with_trace to capture step-by-step execution:
#![allow(unused)]
fn main() {
let trace = engine.process_message_with_trace(&mut message).await?;
println!("Steps executed: {}", trace.executed_count());
println!("Steps skipped: {}", trace.skipped_count());
for step in &trace.steps {
println!("Rule: {}, Action: {:?}, Result: {:?}",
step.workflow_id, step.task_id, step.result);
}
}
Rule Execution Order
Rules execute in priority order (lowest priority number first):
#![allow(unused)]
fn main() {
// Priority 1 executes first
let high_priority = Workflow::from_json(r#"{
"id": "high",
"priority": 1,
"tasks": [...]
}"#)?;
// Priority 10 executes later
let low_priority = Workflow::from_json(r#"{
"id": "low",
"priority": 10,
"tasks": [...]
}"#)?;
}
Rule Conditions
Rules have conditions that determine if they should execute. Conditions are evaluated against the full message context — data, metadata, and temp_data:
{
"id": "premium_order",
"name": "Premium Order Processing",
"condition": { ">=": [{"var": "data.order.total"}, 1000] },
"tasks": [...]
}
The rule only executes if the condition evaluates to true.
Custom Functions
Register custom action handlers when creating the engine:
#![allow(unused)]
fn main() {
use dataflow_rs::engine::AsyncFunctionHandler;
use std::collections::HashMap;
let mut custom_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>> = HashMap::new();
custom_functions.insert("my_function".to_string(), Box::new(MyCustomFunction));
let engine = Engine::new(rules, Some(custom_functions));
}
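A handler implementation might look roughly like the following. The exact AsyncFunctionHandler method signature is not shown in this guide, so treat this as a sketch: the execute method name, its arguments, the (status, changes) return shape, and the Result/Change paths are assumptions to be checked against the crate's API docs.

```rust
use async_trait::async_trait;
use serde_json::Value;

struct MyCustomFunction;

#[async_trait]
impl AsyncFunctionHandler for MyCustomFunction {
    // Assumed signature: an async execute that mutates the message and
    // reports the changes it made for the audit trail.
    async fn execute(
        &self,
        message: &mut Message,
        input: &Value,
    ) -> Result<(usize, Vec<Change>)> {
        // Read configuration from the action's "input" object...
        let label = input["label"].as_str().unwrap_or("default");
        // ...and write a result into the data context.
        message.context["data"]["label"] = Value::String(label.to_string());
        Ok((200, vec![]))
    }
}
```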
Thread Safety
The Engine is designed for concurrent use:
- Rules are immutable after creation
- Compiled logic is shared via Arc
- Each message is processed independently
#![allow(unused)]
fn main() {
use std::sync::Arc;
use tokio::task;
let engine = Arc::new(Engine::new(rules, None));
// Process multiple messages concurrently
let handles: Vec<_> = messages.into_iter().map(|mut msg| {
let engine = Arc::clone(&engine);
task::spawn(async move {
engine.process_message(&mut msg).await
})
}).collect();
// Wait for all to complete
for handle in handles {
handle.await??;
}
}
API Reference
Engine::new(workflows, custom_functions)
Creates a new engine with the given rules and optional custom functions.
- workflows: Vec<Workflow> - Rules to register
- custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler>>> - Custom action implementations
engine.process_message(&mut message)
Processes a message through all matching rules.
- Returns Result<()> - Ok if processing succeeded
- Message is modified in place with results and audit trail
engine.process_message_with_trace(&mut message)
Processes a message and returns an execution trace for debugging.
- Returns Result<ExecutionTrace> - Contains all execution steps with message snapshots
- Useful for step-by-step debugging and visualization
engine.workflows()
Returns a reference to the registered rules (sorted by priority).
#![allow(unused)]
fn main() {
let count = engine.workflows().len();
}
engine.workflow_by_id(id)
Find a specific workflow by its ID.
#![allow(unused)]
fn main() {
if let Some(workflow) = engine.workflow_by_id("my_rule") {
println!("Found: {}", workflow.name.as_deref().unwrap_or("unnamed"));
}
}
engine.process_message_for_channel(channel, message)
Processes a message through only the active workflows on a specific channel. Uses O(1) channel index lookup.
#![allow(unused)]
fn main() {
engine.process_message_for_channel("orders", &mut message).await?;
}
Only workflows with status: "active" are included in channel routing.
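For example, given two workflows registered on the same channel, routing to "orders" runs only the active one (ids illustrative):

```json
[
  {"id": "orders_v2", "channel": "orders", "status": "active", "tasks": [...]},
  {"id": "orders_v1", "channel": "orders", "status": "paused", "tasks": [...]}
]
```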
engine.process_message_for_channel_with_trace(channel, message)
Same as process_message_for_channel but returns an execution trace for debugging.
#![allow(unused)]
fn main() {
let trace = engine.process_message_for_channel_with_trace("orders", &mut message).await?;
}
engine.with_new_workflows(workflows)
Creates a new engine with different workflows while preserving custom function registrations. Useful for hot-reloading workflow definitions at runtime.
#![allow(unused)]
fn main() {
let new_workflows = vec![Workflow::from_json(r#"{ ... }"#)?];
let new_engine = engine.with_new_workflows(new_workflows);
// Old engine is still valid for in-flight messages
// New engine has freshly compiled logic + same custom functions
}
Rules (Workflows)
A Rule (also called Workflow) is a collection of actions that execute sequentially when a condition is met. This is the core IF → THEN unit: IF condition matches, THEN execute actions.
Overview
Rules provide:
- Conditional Execution - Only run when JSONLogic conditions are met (against full context: data, metadata, temp_data)
- Priority Ordering - Control execution order across rules
- Action Organization - Group related processing steps
- Error Handling - Continue or stop on errors
Rule Structure
{
"id": "premium_order",
"name": "Premium Order Processing",
"priority": 1,
"channel": "orders",
"version": 2,
"status": "active",
"tags": ["premium", "high-priority"],
"condition": { ">=": [{"var": "data.order.total"}, 1000] },
"continue_on_error": false,
"tasks": [
{
"id": "apply_discount",
"name": "Apply Discount",
"function": { ... }
},
{
"id": "notify_manager",
"name": "Notify Manager",
"function": { ... }
}
]
}
Fields
| Field | Type | Required | Description |
|---|---|---|---|
| id | string | Yes | Unique rule identifier |
| name | string | No | Human-readable name |
| priority | number | No | Execution order (default: 0, lower = first) |
| condition | JSONLogic | No | When to execute rule (evaluated against full context) |
| continue_on_error | boolean | No | Continue on action failure (default: false) |
| tasks | array | Yes | Actions to execute |
| channel | string | No | Channel for message routing (default: "default") |
| version | number | No | Workflow version number (default: 1) |
| status | string | No | Lifecycle status: active, paused, or archived (default: active) |
| tags | array | No | Arbitrary tags for organization (default: []) |
| created_at | datetime | No | Creation timestamp (ISO 8601) |
| updated_at | datetime | No | Last update timestamp (ISO 8601) |
Creating Rules
From JSON String
#![allow(unused)]
fn main() {
use dataflow_rs::Workflow;
let rule = Workflow::from_json(r#"{
"id": "my_rule",
"name": "My Rule",
"tasks": [...]
}"#)?;
}
Using the Convenience Constructor
#![allow(unused)]
fn main() {
use dataflow_rs::{Rule, Task};
use serde_json::json;
let rule = Rule::rule(
"premium_discount",
"Premium Discount",
json!({">=": [{"var": "data.order.total"}, 1000]}),
vec![/* actions */],
);
}
From File
#![allow(unused)]
fn main() {
let rule = Workflow::from_file("rules/my_rule.json")?;
}
Priority Ordering
Rules execute in priority order (lowest first). This enables the THAT (chaining) in the IF → THEN → THAT model:
// Executes first (priority 1) — validate input
{
"id": "validation",
"priority": 1,
"tasks": [...]
}
// Executes second (priority 2) — transform data
{
"id": "transformation",
"priority": 2,
"tasks": [...]
}
// Executes last (priority 10) — send notifications
{
"id": "notification",
"priority": 10,
"tasks": [...]
}
Conditional Execution
Use JSONLogic conditions to control when rules run. Conditions evaluate against the full message context — data, metadata, and temp_data:
{
"id": "premium_user_rule",
"condition": {
"and": [
{">=": [{"var": "data.order.total"}, 500]},
{"==": [{"var": "data.user.is_vip"}, true]}
]
},
"tasks": [...]
}
Common Condition Patterns
// Match on data fields
{">=": [{"var": "data.order.total"}, 1000]}
// Check data exists
{"!!": {"var": "data.email"}}
// Multiple conditions
{"and": [
{">=": [{"var": "data.amount"}, 100]},
{"==": [{"var": "data.currency"}, "USD"]}
]}
// Either condition
{"or": [
{"==": [{"var": "metadata.source"}, "api"]},
{"==": [{"var": "metadata.source"}, "webhook"]}
]}
Error Handling
Stop on Error (Default)
{
"id": "strict_rule",
"continue_on_error": false,
"tasks": [...]
}
If any action fails, the rule stops and the error is recorded.
Continue on Error
{
"id": "resilient_rule",
"continue_on_error": true,
"tasks": [...]
}
Actions continue executing even if previous actions fail. Errors are collected in message.errors.
Action Dependencies
Actions within a rule execute sequentially, allowing later actions to depend on earlier results:
{
"id": "pipeline",
"tasks": [
{
"id": "fetch_data",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "temp_data.fetched", "logic": {"var": "data.source"}}
]
}
}
},
{
"id": "process_data",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.result", "logic": {"var": "temp_data.fetched"}}
]
}
}
}
]
}
Workflow Lifecycle
Workflows support lifecycle management with status, versioning, and tagging. All lifecycle fields are optional and backward-compatible.
Status
Control whether a workflow is active using the status field:
{"id": "my_rule", "status": "active", "tasks": [...]}
{"id": "old_rule", "status": "paused", "tasks": [...]}
{"id": "legacy_rule", "status": "archived", "tasks": [...]}
- active (default) — the workflow executes normally and is included in channel routing
- paused — the workflow is excluded from channel routing but still runs via process_message()
- archived — same as paused; used to indicate permanently retired workflows
Channel Routing
Group workflows by channel for efficient message routing:
[
{"id": "order_validate", "channel": "orders", "priority": 1, "tasks": [...]},
{"id": "order_process", "channel": "orders", "priority": 2, "tasks": [...]},
{"id": "user_notify", "channel": "notifications", "priority": 1, "tasks": [...]}
]
Then route messages to a specific channel:
#![allow(unused)]
fn main() {
// Only runs workflows on the "orders" channel
engine.process_message_for_channel("orders", &mut message).await?;
}
Version and Tags
Use version and tags for workflow organization:
{
"id": "discount_rule",
"version": 3,
"tags": ["finance", "discount", "v3"],
"tasks": [...]
}
Try It
Want more features? Try the Full Debugger UI with step-by-step execution and rule visualization.
Actions (Tasks)
An Action (also called Task) is an individual processing unit within a rule that executes a function. Actions are the THEN in the IF → THEN model.
Overview
Actions are the building blocks of rules. Each action:
- Executes a single function (built-in or custom)
- Can have a condition for conditional execution
- Can modify message data
- Records changes in the audit trail
Action Structure
{
"id": "apply_discount",
"name": "Apply Discount",
"condition": { ">=": [{"var": "data.order.total"}, 100] },
"continue_on_error": false,
"function": {
"name": "map",
"input": {
"mappings": [
{
"path": "data.order.discount",
"logic": {"*": [{"var": "data.order.total"}, 0.1]}
}
]
}
}
}
Fields
| Field | Type | Required | Description |
|---|---|---|---|
| id | string | Yes | Unique action identifier within rule |
| name | string | No | Human-readable name |
| condition | JSONLogic | No | When to execute action (evaluated against full context) |
| continue_on_error | boolean | No | Continue rule on failure |
| function | object | Yes | Function to execute |
Creating Actions Programmatically
#![allow(unused)]
fn main() {
use dataflow_rs::{Action, FunctionConfig};
let action = Action::action(
"apply_discount",
"Apply Discount",
function_config,
);
}
Function Configuration
The function object specifies what the action does:
{
"function": {
"name": "function_name",
"input": { ... }
}
}
Built-in Functions
| Function | Purpose |
|---|---|
| map | Data transformation and field mapping |
| validation | Data validation with custom error messages |
| filter | Pipeline control flow — halt workflow or skip task |
| log | Structured logging with JSONLogic expressions |
| parse_json | Parse JSON from payload into data context |
| parse_xml | Parse XML string into JSON data structure |
| publish_json | Serialize data to JSON string |
| publish_xml | Serialize data to XML string |
Custom Functions
Register custom functions when creating the engine:
#![allow(unused)]
fn main() {
let engine = Engine::new(rules, Some(custom_functions));
}
Then reference them by name in actions:
{
"function": {
"name": "my_custom_function",
"input": { ... }
}
}
Conditional Execution
Actions can have conditions that determine if they should run. Conditions evaluate against the full context (data, metadata, temp_data):
{
"id": "premium_greeting",
"condition": { "==": [{"var": "data.tier"}, "premium"] },
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.greeting", "logic": "Welcome, VIP member!"}
]
}
}
}
Common Patterns
// Only if field exists
{"!!": {"var": "data.email"}}
// Only if field equals value
{"==": [{"var": "data.status"}, "active"]}
// Only if numeric condition
{">=": [{"var": "data.amount"}, 100]}
// Combine conditions
{"and": [
{"!!": {"var": "data.email"}},
{"==": [{"var": "data.verified"}, true]}
]}
Error Handling
Action-Level Error Handling
{
"id": "optional_action",
"continue_on_error": true,
"function": { ... }
}
When continue_on_error is true:
- Action errors are recorded in message.errors
- Rule continues to the next action
Rule-Level Error Handling
The rule’s continue_on_error setting applies to all actions unless overridden.
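For example, a rule can tolerate a failing optional step while still treating the rest as required, by overriding continue_on_error on that one action (ids illustrative):

```json
{
  "id": "mixed_strictness",
  "continue_on_error": false,
  "tasks": [
    {"id": "optional_enrich", "continue_on_error": true, "function": { ... }},
    {"id": "required_map", "function": { ... }}
  ]
}
```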
Sequential Execution
Actions execute in order within a rule. Later actions can use results from earlier actions:
{
"tasks": [
{
"id": "step1",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "temp_data.intermediate", "logic": {"var": "data.raw"}}
]
}
}
},
{
"id": "step2",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.final", "logic": {"var": "temp_data.intermediate"}}
]
}
}
}
]
}
Try It
Want more features? Try the Full Debugger UI with step-by-step execution and rule visualization.
Try changing tier to “standard” to see the conditional greeting action skipped.
Best Practices
- Unique IDs - Use descriptive, unique IDs for debugging
- Single Responsibility - Each action should do one thing well
- Use temp_data - Store intermediate results in temp_data
- Conditions - Add conditions to skip unnecessary processing
- Error Handling - Use continue_on_error for optional actions
Message
A Message is the data container that flows through rules, carrying data, metadata, and an audit trail.
Overview
The Message structure contains:
- context.data - Main data payload
- context.metadata - Message metadata (routing, source info)
- context.temp_data - Temporary processing data
- audit_trail - Record of all changes
- errors - Collected errors during processing
Message Structure
#![allow(unused)]
fn main() {
pub struct Message {
pub id: Uuid,
pub payload: Arc<Value>,
pub context: Value, // Contains data, metadata, temp_data
pub audit_trail: Vec<AuditTrail>,
pub errors: Vec<ErrorInfo>,
}
}
The context is structured as:
{
"data": { ... },
"metadata": { ... },
"temp_data": { ... }
}
Creating Messages
Basic Creation
#![allow(unused)]
fn main() {
use dataflow_rs::Message;
use serde_json::json;
let mut message = Message::new(&json!({
"name": "John",
"email": "john@example.com"
}));
}
With Metadata
#![allow(unused)]
fn main() {
let mut message = Message::new(&json!({
"name": "John"
}));
message.context["metadata"] = json!({
"source": "api",
"type": "user",
"timestamp": "2024-01-01T00:00:00Z"
});
}
From Full Context
let mut message = Message::from_value(&json!({
    "data": {
        "name": "John"
    },
    "metadata": {
        "source": "api"
    },
    "temp_data": {}
}));
Context Fields
data
The main data payload. This is where your primary data lives and is transformed.
// Set data
message.context["data"]["name"] = json!("John");

// Read data
let name = &message.context["data"]["name"];
metadata
Information about the message itself (not the data). Commonly used for:
- Routing decisions (rule conditions)
- Source tracking
- Timestamps
- Message type classification
message.context["metadata"] = json!({
    "type": "user",
    "source": "webhook",
    "received_at": "2024-01-01T00:00:00Z"
});
temp_data
Temporary storage for intermediate processing results. Cleared between processing runs if needed.
// Store intermediate result
message.context["temp_data"]["calculated_value"] = json!(42);

// Use in a later task:
// {"var": "temp_data.calculated_value"}
Audit Trail
Every modification to message data is recorded:
#[derive(Debug)]
pub struct AuditTrail {
    pub task_id: String,
    pub workflow_id: String,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
}

pub struct Change {
    pub path: Arc<str>,
    pub old_value: Arc<Value>,
    pub new_value: Arc<Value>,
}
Accessing Audit Trail
// After processing
for entry in &message.audit_trail {
    println!("Workflow: {}, Task: {}", entry.workflow_id, entry.task_id);
    for change in &entry.changes {
        println!("  {} -> {} at {}", change.old_value, change.new_value, change.path);
    }
}
Error Handling
Errors are collected in message.errors:
for error in &message.errors {
    println!("Error in {}/{}: {}",
        error.workflow_id.as_deref().unwrap_or("unknown"),
        error.task_id.as_deref().unwrap_or("unknown"),
        error.message
    );
}
JSONLogic Access
In rule conditions and mappings, access message fields using JSONLogic:
// Access data fields
{"var": "data.name"}
{"var": "data.user.email"}
// Access metadata
{"var": "metadata.type"}
{"var": "metadata.source"}
// Access temp_data
{"var": "temp_data.intermediate_result"}
Try It
Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.
Notice how temp_data is used to store an intermediate result.
Best Practices
- Separate Concerns
  - Use data for business data
  - Use metadata for routing and rule conditions
  - Use temp_data for intermediate results
- Don’t Modify metadata in Tasks
  - Metadata should remain stable for routing decisions
- Clean temp_data
  - Use temp_data for values only needed during processing
- Check Audit Trail
  - Use the audit trail for debugging and compliance
Error Handling
Dataflow-rs provides flexible error handling at multiple levels to build resilient automation rules.
Error Levels
Errors can be handled at three levels:
- Action Level - Individual action (task) error handling
- Rule Level - Rule-wide (workflow) error policy
- Engine Level - Processing errors
Action-Level Error Handling
Stop on Error (Default)
{
"id": "critical_action",
"continue_on_error": false,
"function": { ... }
}
If the action fails:
- Error is recorded in message.errors
- Rule execution stops
- No further actions execute
Continue on Error
{
"id": "optional_action",
"continue_on_error": true,
"function": { ... }
}
If the action fails:
- Error is recorded in message.errors
- Rule continues to the next action
Rule-Level Error Handling
The rule’s continue_on_error applies to all actions by default:
{
"id": "resilient_rule",
"continue_on_error": true,
"tasks": [
{"id": "action1", "function": { ... }},
{"id": "action2", "function": { ... }},
{"id": "action3", "function": { ... }}
]
}
All actions will continue even if earlier actions fail.
Override at Action Level
{
"id": "mixed_rule",
"continue_on_error": true,
"tasks": [
{"id": "optional_action", "function": { ... }},
{
"id": "critical_action",
"continue_on_error": false,
"function": { ... }
}
]
}
Accessing Errors
After processing, check message.errors:
engine.process_message(&mut message).await?;

if !message.errors.is_empty() {
    for error in &message.errors {
        println!("Error: {} in {}/{}",
            error.message,
            error.workflow_id.as_deref().unwrap_or("unknown"),
            error.task_id.as_deref().unwrap_or("unknown")
        );
    }
}
Error Types
Validation Errors
Generated by the validation function when rules fail:
{
"function": {
"name": "validation",
"input": {
"rules": [
{
"logic": {"!!": {"var": "data.email"}},
"message": "Email is required"
}
]
}
}
}
Execution Errors
Generated when function execution fails:
- JSONLogic evaluation errors
- Data type mismatches
- Missing required fields
Custom Function Errors
Return errors from custom functions:
impl AsyncFunctionHandler for MyFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        if some_condition {
            return Err(DataflowError::ExecutionError(
                "Custom error message".to_string()
            ));
        }
        Ok((200, vec![]))
    }
}
Error Recovery Patterns
Fallback Values
Use conditions to provide fallback values:
{
"tasks": [
{
"id": "try_primary",
"continue_on_error": true,
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "temp_data.result", "logic": {"var": "data.primary"}}
]
}
}
},
{
"id": "use_fallback",
"condition": {"!": {"var": "temp_data.result"}},
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.result", "logic": "default_value"}
]
}
}
}
]
}
Validation Before Processing
Validate data before critical operations:
{
"tasks": [
{
"id": "validate",
"function": {
"name": "validation",
"input": {
"rules": [
{"logic": {"!!": {"var": "data.required_field"}}, "message": "Required field missing"}
]
}
}
},
{
"id": "process",
"function": { ... }
}
]
}
If validation fails, the rule stops before further processing.
Try It
Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.
Notice the validation error is recorded but processing continues.
Best Practices
- Validate Early
  - Add validation actions at the start of rules
  - Fail fast on invalid data
- Use continue_on_error Wisely
  - Only for truly optional actions
  - Critical operations should stop on error
- Check Errors
  - Always check message.errors after processing
  - Log errors for monitoring
- Provide Context
  - Include meaningful error messages
  - Include field paths in validation errors
Built-in Functions Overview
Dataflow-rs comes with built-in action functions for common data processing tasks, covering the complete lifecycle from parsing input to publishing output.
Available Functions
| Function | Purpose | Modifies Data |
|---|---|---|
| parse_json | Parse JSON from payload into data context | Yes |
| parse_xml | Parse XML string into JSON data structure | Yes |
| map | Data transformation and field mapping | Yes |
| validation | Rule-based data validation | No (read-only) |
| filter | Pipeline control flow — halt workflow or skip task | No |
| log | Structured logging with JSONLogic expressions | No |
| publish_json | Serialize data to JSON string | Yes |
| publish_xml | Serialize data to XML string | Yes |
Common Patterns
Complete Pipeline: Parse → Transform → Validate → Publish
{
"tasks": [
{
"id": "parse_input",
"function": {
"name": "parse_json",
"input": {
"source": "payload",
"target": "input"
}
}
},
{
"id": "transform",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.user.fullName", "logic": {"cat": [{"var": "data.input.firstName"}, " ", {"var": "data.input.lastName"}]}}
]
}
}
},
{
"id": "validate",
"function": {
"name": "validation",
"input": {
"rules": [
{"logic": {"!!": {"var": "data.user.fullName"}}, "message": "Full name required"}
]
}
}
},
{
"id": "publish",
"function": {
"name": "publish_json",
"input": {
"source": "user",
"target": "response",
"pretty": true
}
}
}
]
}
Conditional Transformation
{
"tasks": [
{
"id": "conditional_map",
"condition": {"==": [{"var": "data.tier"}, "premium"]},
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.discount", "logic": 20}
]
}
}
}
]
}
XML Processing Pipeline
{
"tasks": [
{
"id": "parse_xml_input",
"function": {
"name": "parse_xml",
"input": {
"source": "payload",
"target": "xmlData"
}
}
},
{
"id": "transform",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.response.status", "logic": "processed"}
]
}
}
},
{
"id": "publish_xml_output",
"function": {
"name": "publish_xml",
"input": {
"source": "response",
"target": "xmlOutput",
"root_element": "Response"
}
}
}
]
}
Function Configuration
All functions use this structure:
{
"function": {
"name": "function_name",
"input": {
// Function-specific configuration
}
}
}
Custom Functions
For operations beyond built-in functions, implement the AsyncFunctionHandler trait. See Custom Functions.
Learn More
- Parse Functions - JSON and XML parsing
- Map Function - Data transformation
- Validation Function - Rule-based validation
- Filter Function - Pipeline control flow (halt/skip)
- Log Function - Structured logging
- Publish Functions - JSON and XML serialization
Parse Functions
The parse functions convert payload data into structured context data. They are typically used at the start of a workflow to load input data into the processing context.
parse_json
Extracts JSON data from the payload or data context and stores it in a target field.
Configuration
{
"function": {
"name": "parse_json",
"input": {
"source": "payload",
"target": "input_data"
}
}
}
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| source | string | Yes | Path to read from: payload, payload.field, or data.field |
| target | string | Yes | Field name in data where the result will be stored |
Examples
Parse Entire Payload
{
"id": "load_payload",
"function": {
"name": "parse_json",
"input": {
"source": "payload",
"target": "request"
}
}
}
Input:
{
"payload": {"name": "Alice", "age": 30}
}
Result:
{
"data": {
"request": {"name": "Alice", "age": 30}
}
}
Parse Nested Payload Field
{
"id": "extract_body",
"function": {
"name": "parse_json",
"input": {
"source": "payload.body.user",
"target": "user_data"
}
}
}
Input:
{
"payload": {
"headers": {},
"body": {
"user": {"id": 123, "name": "Bob"}
}
}
}
Result:
{
"data": {
"user_data": {"id": 123, "name": "Bob"}
}
}
parse_xml
Parses an XML string from the source path, converts it to JSON, and stores it in the target field.
Configuration
{
"function": {
"name": "parse_xml",
"input": {
"source": "payload",
"target": "xml_data"
}
}
}
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| source | string | Yes | Path to XML string: payload, payload.field, or data.field |
| target | string | Yes | Field name in data where the parsed JSON will be stored |
XML to JSON Conversion
The XML parser follows these conventions:
- Element names become object keys
- Text content is stored under the element key
- Attributes are preserved in the JSON structure
- Multiple child elements with the same name become arrays
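As a sketch of the repeated-element rule above (illustrative only; the exact attribute encoding depends on the parser version), an input like `<order><item>apple</item><item>pear</item></order>` parsed with `target: "order"` would yield roughly:

```json
{
  "data": {
    "order": {
      "item": ["apple", "pear"]
    }
  }
}
```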
Examples
Parse XML Payload
{
"id": "parse_xml_request",
"function": {
"name": "parse_xml",
"input": {
"source": "payload",
"target": "request"
}
}
}
Input:
{
"payload": "<user><name>Alice</name><email>alice@example.com</email></user>"
}
Result:
{
"data": {
"request": {
"name": "Alice",
"email": "alice@example.com"
}
}
}
Parse Nested XML String
{
"id": "parse_xml_body",
"function": {
"name": "parse_xml",
"input": {
"source": "payload.xmlContent",
"target": "parsed"
}
}
}
Common Patterns
Load and Transform Pipeline
{
"tasks": [
{
"id": "load",
"function": {
"name": "parse_json",
"input": {"source": "payload", "target": "input"}
}
},
{
"id": "transform",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.output.name", "logic": {"var": "data.input.name"}}
]
}
}
}
]
}
Handle XML API Response
{
"tasks": [
{
"id": "parse_response",
"function": {
"name": "parse_xml",
"input": {"source": "payload.response", "target": "apiResponse"}
}
},
{
"id": "extract_data",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.result", "logic": {"var": "data.apiResponse.result"}}
]
}
}
}
]
}
Error Handling
- parse_json: Returns the source value as-is (even if null or not JSON)
- parse_xml: Returns an error if the source is not a string or if XML parsing fails
Next Steps
- Map Function - Transform the parsed data
- Validation Function - Validate the data structure
- Publish Functions - Serialize data for output
Map Function
The map function transforms and reorganizes data using JSONLogic expressions.
Overview
The map function:
- Evaluates JSONLogic expressions against message context
- Assigns results to specified paths
- Supports nested path creation
- Tracks changes for audit trail
Basic Usage
{
"function": {
"name": "map",
"input": {
"mappings": [
{
"path": "data.full_name",
"logic": {"cat": [{"var": "data.first_name"}, " ", {"var": "data.last_name"}]}
}
]
}
}
}
Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| mappings | array | Yes | List of mapping operations |
Mapping Object
| Field | Type | Required | Description |
|---|---|---|---|
| path | string | Yes | Target path (e.g., "data.user.name") |
| logic | JSONLogic | Yes | Expression to evaluate |
Path Syntax
Dot Notation
Access and create nested structures:
{"path": "data.user.profile.name", "logic": "John"}
Creates: {"data": {"user": {"profile": {"name": "John"}}}}
Numeric Field Names
Use # prefix for numeric keys:
{"path": "data.items.#0", "logic": "first item"}
Creates: {"data": {"items": {"0": "first item"}}}
Root Field Assignment
Assigning to root fields (data, metadata, temp_data) merges objects:
{"path": "data", "logic": {"new_field": "value"}}
Merges into existing data rather than replacing it.
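For example, assuming data already holds `{"existing": 1}`, the root mapping above would leave (illustrative sketch):

```json
{
  "data": {
    "existing": 1,
    "new_field": "value"
  }
}
```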
JSONLogic Expressions
Copy Value
{"path": "data.copy", "logic": {"var": "data.original"}}
Static Value
{"path": "data.status", "logic": "active"}
String Concatenation
{
"path": "data.greeting",
"logic": {"cat": ["Hello, ", {"var": "data.name"}, "!"]}
}
Conditional Value
{
"path": "data.tier",
"logic": {"if": [
{">=": [{"var": "data.points"}, 1000]}, "gold",
{">=": [{"var": "data.points"}, 500]}, "silver",
"bronze"
]}
}
Arithmetic
{
"path": "data.total",
"logic": {"*": [{"var": "data.price"}, {"var": "data.quantity"}]}
}
Array Operations
{
"path": "data.count",
"logic": {"reduce": [
{"var": "data.items"},
{"+": [{"var": "accumulator"}, 1]},
0
]}
}
Null Handling
If a JSONLogic expression evaluates to null, the mapping is skipped:
// If data.optional doesn't exist, this mapping is skipped
{"path": "data.copy", "logic": {"var": "data.optional"}}
Sequential Mappings
Mappings execute in order, allowing later mappings to use earlier results:
{
"mappings": [
{
"path": "temp_data.full_name",
"logic": {"cat": [{"var": "data.first"}, " ", {"var": "data.last"}]}
},
{
"path": "data.greeting",
"logic": {"cat": ["Hello, ", {"var": "temp_data.full_name"}]}
}
]
}
Try It
Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.
Common Patterns
Copy Between Contexts
// Copy from data to metadata
{"path": "metadata.user_id", "logic": {"var": "data.id"}}
// Copy from data to temp_data
{"path": "temp_data.original", "logic": {"var": "data.value"}}
Default Values
{
"path": "data.name",
"logic": {"if": [
{"!!": {"var": "data.name"}},
{"var": "data.name"},
"Unknown"
]}
}
Computed Fields
{
"path": "data.subtotal",
"logic": {"*": [{"var": "data.price"}, {"var": "data.quantity"}]}
}
Best Practices
- Use temp_data - Store intermediate results in temp_data
- Order Matters - Place dependencies before dependent mappings
- Check for Null - Handle missing fields with if or !! checks
- Merge Root Fields - Use root assignment to merge, not replace
Validation Function
The validation function evaluates rules against message data and collects validation errors.
Overview
The validation function:
- Evaluates JSONLogic rules against message context
- Collects errors for failed validations
- Is read-only (doesn’t modify message data)
- Returns status 200 (pass) or 400 (fail)
Basic Usage
{
"function": {
"name": "validation",
"input": {
"rules": [
{
"logic": {"!!": {"var": "data.email"}},
"message": "Email is required"
},
{
"logic": {">": [{"var": "data.age"}, 0]},
"message": "Age must be positive"
}
]
}
}
}
Configuration
| Field | Type | Required | Description |
|---|---|---|---|
| rules | array | Yes | List of validation rules |
Rule Object
| Field | Type | Required | Description |
|---|---|---|---|
| logic | JSONLogic | Yes | Expression that must evaluate to true |
| message | string | No | Error message (default: "Validation failed") |
How Validation Works
- Each rule’s logic is evaluated against the message context
- If the result is exactly true, the rule passes
- Any other result (false, null, etc.) is a failure
- Failed rules add errors to message.errors
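Concretely, if the rule `{"logic": {"!!": {"var": "data.email"}}, "message": "Email is required"}` runs against a message with no data.email, an entry shaped roughly like this is appended to message.errors (other ErrorInfo fields such as workflow_id and task_id are omitted here for brevity):

```json
{
  "code": "VALIDATION_ERROR",
  "message": "Email is required"
}
```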
Common Validation Patterns
Required Field
{
"logic": {"!!": {"var": "data.email"}},
"message": "Email is required"
}
Numeric Range
{
"logic": {"and": [
{">=": [{"var": "data.age"}, 18]},
{"<=": [{"var": "data.age"}, 120]}
]},
"message": "Age must be between 18 and 120"
}
String Length
{
"logic": {">=": [
{"length": {"var": "data.password"}},
8
]},
"message": "Password must be at least 8 characters"
}
Pattern Matching (with Regex)
{
"logic": {"regex_match": [
{"var": "data.email"},
"^[^@]+@[^@]+\\.[^@]+$"
]},
"message": "Invalid email format"
}
Conditional Required
{
"logic": {"or": [
{"!": {"var": "data.is_business"}},
{"!!": {"var": "data.company_name"}}
]},
"message": "Company name required for business accounts"
}
Value in List
{
"logic": {"in": [
{"var": "data.status"},
["active", "pending", "suspended"]
]},
"message": "Invalid status value"
}
Multiple Rules
All rules are evaluated, collecting all errors:
{
"rules": [
{
"logic": {"!!": {"var": "data.name"}},
"message": "Name is required"
},
{
"logic": {"!!": {"var": "data.email"}},
"message": "Email is required"
},
{
"logic": {">": [{"var": "data.amount"}, 0]},
"message": "Amount must be positive"
}
]
}
Accessing Errors
After processing, check message.errors:
for error in &message.errors {
    println!("{}: {}", error.code, error.message);
}
Error structure:
- code: "VALIDATION_ERROR" for failed rules
- message: The error message from the rule
Try It
Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.
Notice the validation errors in the output.
Validation with Continue on Error
Combine validation with data transformation:
{
"id": "validated_transform",
"continue_on_error": true,
"tasks": [
{
"id": "validate",
"function": {
"name": "validation",
"input": {
"rules": [...]
}
}
},
{
"id": "transform",
"function": {
"name": "map",
"input": {
"mappings": [...]
}
}
}
]
}
With continue_on_error: true, transformation proceeds even if validation fails.
Stop on Validation Failure
For strict validation (stop on failure):
{
"continue_on_error": false,
"tasks": [
{
"id": "validate",
"function": {"name": "validation", "input": {...}}
},
{
"id": "process",
"function": {"name": "map", "input": {...}}
}
]
}
If validation fails, subsequent tasks are skipped.
Best Practices
- Validate Early - Add validation as the first task
- Clear Messages - Write specific, actionable error messages
- Check All Rules - Validation evaluates all rules (doesn’t short-circuit)
- Use continue_on_error - Decide if processing should continue on failure
- Handle Errors - Always check message.errors after processing
Filter (Pipeline Control Flow)
The filter function provides pipeline control flow by evaluating a JSONLogic condition and either halting the workflow or skipping the task when the condition is false.
Overview
Filter is a gate function — it doesn’t modify data but controls whether subsequent tasks execute. This enables patterns like:
- Guard clauses — halt a workflow early if prerequisites aren’t met
- Conditional branches — skip optional processing steps
- Data quality gates — stop processing if data doesn’t meet criteria
Configuration
{
"function": {
"name": "filter",
"input": {
"condition": { ... },
"on_reject": "halt | skip"
}
}
}
Fields
| Field | Type | Required | Description |
|---|---|---|---|
| condition | JSONLogic | Yes | Condition to evaluate against the full message context |
| on_reject | string | No | What to do when the condition is false: "halt" (default) or "skip" |
Rejection Behavior
halt (default)
When the condition is false, the entire workflow stops — no further tasks in the workflow execute.
{
"id": "guard_active_status",
"name": "Check Active Status",
"function": {
"name": "filter",
"input": {
"condition": {"==": [{"var": "data.status"}, "active"]},
"on_reject": "halt"
}
}
}
If data.status is not "active", the workflow halts immediately. The halt is recorded in the audit trail.
skip
When the condition is false, only the current task is skipped — the workflow continues with the next task.
{
"id": "optional_premium_check",
"name": "Check Premium Tier",
"function": {
"name": "filter",
"input": {
"condition": {"==": [{"var": "data.tier"}, "premium"]},
"on_reject": "skip"
}
}
}
If the user is not premium, this task is skipped silently and the next task runs.
Examples
Guard Clause Pattern
Stop processing if required data is missing:
{
"id": "validation_pipeline",
"tasks": [
{
"id": "parse",
"function": { "name": "parse_json", "input": {"source": "payload", "target": "input"} }
},
{
"id": "require_email",
"function": {
"name": "filter",
"input": {
"condition": {"!!": {"var": "data.input.email"}},
"on_reject": "halt"
}
}
},
{
"id": "process",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.result", "logic": {"cat": ["Processed: ", {"var": "data.input.email"}]}}
]
}
}
}
]
}
Multi-Condition Gate
Combine conditions with JSONLogic and/or:
{
"id": "complex_gate",
"function": {
"name": "filter",
"input": {
"condition": {
"and": [
{">=": [{"var": "data.order.total"}, 100]},
{"==": [{"var": "data.order.currency"}, "USD"]},
{"!!": {"var": "data.order.shipping_address"}}
]
},
"on_reject": "halt"
}
}
}
Optional Processing Step
Use skip for non-critical conditional logic:
{
"tasks": [
{
"id": "apply_coupon",
"function": {
"name": "filter",
"input": {
"condition": {"!!": {"var": "data.coupon_code"}},
"on_reject": "skip"
}
}
},
{
"id": "process_coupon",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.discount", "logic": 10}
]
}
}
}
]
}
Status Codes
| Code | Meaning | Behavior |
|---|---|---|
| 200 | Pass | Condition was true, continue normally |
| 298 | Skip | Condition false + on_reject: skip — skip task, continue workflow |
| 299 | Halt | Condition false + on_reject: halt — stop workflow execution |
Notes
- The filter condition is pre-compiled at engine startup for zero runtime overhead
- Filter never modifies the message — it only controls execution flow
- When a workflow halts, the halt is recorded in the audit trail for debugging
- When a task is skipped, no audit trail entry is created
Log (Structured Logging)
The log function provides structured logging within workflows using the Rust log crate. Log messages and fields support JSONLogic expressions for dynamic content.
Overview
The log function allows you to:
- Emit structured log messages at any point in a workflow
- Use JSONLogic expressions for dynamic message content
- Attach structured fields for machine-readable log data
- Debug data flow without modifying the message
Configuration
{
"function": {
"name": "log",
"input": {
"level": "info",
"message": "JSONLogic expression or static string",
"fields": {
"field_name": "JSONLogic expression"
}
}
}
}
Fields
| Field | Type | Required | Description |
|---|---|---|---|
| level | string | No | Log level: trace, debug, info (default), warn, error |
| message | JSONLogic | Yes | The log message (evaluated as JSONLogic against the message context) |
| fields | object | No | Key-value pairs where values are JSONLogic expressions |
Log Levels
| Level | Use Case |
|---|---|
| trace | Very detailed debugging (function entry/exit, variable values) |
| debug | Debugging information (intermediate processing state) |
| info | General informational messages (processing milestones) |
| warn | Warning conditions (unusual but not erroneous states) |
| error | Error conditions (failures that are handled) |
Examples
Simple Static Message
{
"id": "log_start",
"function": {
"name": "log",
"input": {
"level": "info",
"message": "Starting order processing"
}
}
}
Dynamic Message with JSONLogic
{
"id": "log_order",
"function": {
"name": "log",
"input": {
"level": "info",
"message": {"cat": ["Processing order ", {"var": "data.order.id"}, " for $", {"var": "data.order.total"}]},
"fields": {
"order_id": {"var": "data.order.id"},
"customer": {"var": "data.customer.name"},
"total": {"var": "data.order.total"}
}
}
}
}
Debug Logging
{
"id": "debug_state",
"function": {
"name": "log",
"input": {
"level": "debug",
"message": {"cat": ["Current data state: ", {"var": "data"}]},
"fields": {
"has_email": {"!!": {"var": "data.email"}},
"item_count": {"var": "data.items.length"}
}
}
}
}
Warning on Edge Cases
{
"id": "warn_missing",
"condition": {"!": {"var": "data.shipping_address"}},
"function": {
"name": "log",
"input": {
"level": "warn",
"message": {"cat": ["Order ", {"var": "data.order.id"}, " has no shipping address"]}
}
}
}
Log Target
All log messages are emitted with the target dataflow::log, making it easy to filter in your logging configuration:
Using env_logger, enable the target via the environment:

RUST_LOG=dataflow::log=info cargo run

Or filter specifically for dataflow logs in code:

env_logger::Builder::new()
    .filter_module("dataflow::log", log::LevelFilter::Debug)
    .init();
Notes
- The log function never modifies the message — it is read-only
- The log function never fails — it always returns status 200 with no changes
- All JSONLogic expressions in message and fields are pre-compiled at engine startup
- If a JSONLogic expression fails to evaluate, the raw expression value is logged instead
- The fields are formatted as key=value pairs appended to the log message
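For illustration only (the exact prefix and layout depend on your logger configuration), the dynamic-message example above might emit a line such as:

```text
[INFO dataflow::log] Processing order 123 for $49.99 order_id=123 customer=Alice total=49.99
```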
Publish Functions
The publish functions serialize structured data into string formats (JSON or XML). They are typically used at the end of a workflow to prepare output data for transmission or storage.
publish_json
Serializes data from the source field to a JSON string.
Configuration
{
"function": {
"name": "publish_json",
"input": {
"source": "output",
"target": "json_string"
}
}
}
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| source | string | Yes | - | Field name in data to serialize (e.g., output or nested.field) |
| target | string | Yes | - | Field name where the JSON string will be stored |
| pretty | boolean | No | false | Whether to pretty-print the JSON output |
Examples
Serialize Data to JSON
{
"id": "publish_response",
"function": {
"name": "publish_json",
"input": {
"source": "response",
"target": "responseBody"
}
}
}
Input:
{
"data": {
"response": {"status": "success", "count": 42}
}
}
Result:
{
"data": {
"response": {"status": "success", "count": 42},
"responseBody": "{\"status\":\"success\",\"count\":42}"
}
}
Pretty-Print JSON
{
"id": "publish_pretty",
"function": {
"name": "publish_json",
"input": {
"source": "user",
"target": "userJson",
"pretty": true
}
}
}
Result:
{
"data": {
"userJson": "{\n \"name\": \"Alice\",\n \"age\": 30\n}"
}
}
publish_xml
Serializes data from the source field to an XML string.
Configuration
{
"function": {
"name": "publish_xml",
"input": {
"source": "output",
"target": "xml_string",
"root_element": "Response"
}
}
}
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| source | string | Yes | - | Field name in data to serialize |
| target | string | Yes | - | Field name where the XML string will be stored |
| root_element | string | No | root | Name of the root XML element |
JSON to XML Conversion
The serializer follows these rules:
- Object keys become XML element names
- Array items are wrapped in <item> elements
- Special characters are properly escaped (<, >, &, ", ')
- Invalid XML element names are sanitized (e.g., names starting with numbers get an underscore prefix)
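Assuming the item-wrapping rule above, an array field might serialize like this sketch (element names and nesting are illustrative, not guaranteed output):

```xml
<!-- source data: {"tags": ["a", "b"]} with root_element "Doc" -->
<Doc><tags><item>a</item><item>b</item></tags></Doc>
```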
Examples
Serialize Data to XML
{
"id": "publish_xml_response",
"function": {
"name": "publish_xml",
"input": {
"source": "user",
"target": "userXml",
"root_element": "User"
}
}
}
Input:
{
"data": {
"user": {"name": "Alice", "age": 30}
}
}
Result:
{
"data": {
"user": {"name": "Alice", "age": 30},
"userXml": "<User><name>Alice</name><age>30</age></User>"
}
}
Serialize Nested Data
{
"id": "publish_nested",
"function": {
"name": "publish_xml",
"input": {
"source": "response.data",
"target": "xmlOutput",
"root_element": "Data"
}
}
}
Common Patterns
Complete API Pipeline
{
"tasks": [
{
"id": "parse_request",
"function": {
"name": "parse_json",
"input": {"source": "payload", "target": "request"}
}
},
{
"id": "process",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.response.message", "logic": {"cat": ["Hello, ", {"var": "data.request.name"}]}}
]
}
}
},
{
"id": "publish_response",
"function": {
"name": "publish_json",
"input": {"source": "response", "target": "body"}
}
}
]
}
XML-to-XML Transformation
{
"tasks": [
{
"id": "parse_xml",
"function": {
"name": "parse_xml",
"input": {"source": "payload", "target": "input"}
}
},
{
"id": "transform",
"function": {
"name": "map",
"input": {
"mappings": [
{"path": "data.output.result", "logic": {"var": "data.input.value"}}
]
}
}
},
{
"id": "publish_xml",
"function": {
"name": "publish_xml",
"input": {"source": "output", "target": "xmlResponse", "root_element": "Result"}
}
}
]
}
Generate Both JSON and XML Outputs
{
"tasks": [
{
"id": "publish_json",
"function": {
"name": "publish_json",
"input": {"source": "response", "target": "jsonOutput"}
}
},
{
"id": "publish_xml",
"function": {
"name": "publish_xml",
"input": {"source": "response", "target": "xmlOutput", "root_element": "Response"}
}
}
]
}
Error Handling
- publish_json: Returns an error if the source field is not found or is null
- publish_xml: Returns an error if the source field is not found or is null
XML Element Name Sanitization
XML has strict rules for element names. The publish_xml function automatically sanitizes invalid names:
| Original | Sanitized |
|---|---|
| 123field | _123field |
| field name | field_name |
| field@attr | field_attr |
| (empty) | _element |
Next Steps
- Parse Functions - Parse input data
- Map Function - Transform data
- Validation Function - Validate before publishing
WebAssembly Package
The @goplasmatic/dataflow-wasm package provides WebAssembly bindings for dataflow-rs, enabling you to run the same rules engine in the browser that powers your Rust backend.
Installation
npm install @goplasmatic/dataflow-wasm
Quick Start
import init, { WasmEngine } from '@goplasmatic/dataflow-wasm';
// Initialize the WASM module (required once)
await init();
// Define your workflows
const workflows = [
{
id: 'my-workflow',
name: 'My Workflow',
tasks: [
{
id: 'transform',
name: 'Transform Data',
function: {
name: 'map',
input: {
mappings: [
{ path: 'data.output', logic: { var: 'data.input' } }
]
}
}
}
]
}
];
// Create the engine
const engine = new WasmEngine(workflows);
// Process a message
const message = {
data: { input: 'hello world' },
metadata: {}
};
const result = await engine.process(message);
console.log(result.data.output); // 'hello world'
API Reference
WasmEngine
The main class for executing rules.
class WasmEngine {
constructor(workflows: Workflow[]);
// Process a message through all matching rules
process(message: Message): Promise<Message>;
// Process with execution trace for debugging
processWithTrace(message: Message): Promise<ExecutionTrace>;
}
Types
interface Workflow {
id: string;
name: string;
condition?: JsonLogicValue; // Optional condition (evaluated against full context)
tasks: Task[];
}
interface Task {
id: string;
name: string;
condition?: JsonLogicValue; // Optional condition (evaluated against full context)
function: FunctionConfig;
}
interface FunctionConfig {
name: string; // 'map' or 'validation'
input: object;
}
interface Message {
data: object;
metadata: object;
payload?: object;
temp_data?: object;
}
Execution Tracing
For debugging, use processWithTrace to get step-by-step execution details:
const trace = await engine.processWithTrace(message);
console.log('Steps executed:', trace.steps.length);
console.log('Initial message:', trace.initial_message);
console.log('Final message:', trace.final_message);
for (const step of trace.steps) {
console.log(`Task: ${step.task_name}`);
console.log(`Changes: ${step.changes.length}`);
}
Building from Source
Requirements:
- Rust 1.70+
- wasm-pack
cd wasm
wasm-pack build --target web --out-dir pkg
The output will be in wasm/pkg/.
Browser Compatibility
The WASM package works in all modern browsers that support WebAssembly:
- Chrome 57+
- Firefox 52+
- Safari 11+
- Edge 16+
Next Steps
- UI Package - React visualization components
- Built-in Functions - Map, validation, and more
UI Package
The @goplasmatic/dataflow-ui package provides React components for visualizing and debugging dataflow-rs rules and workflows.
Installation
npm install @goplasmatic/dataflow-ui
Peer Dependencies
npm install react react-dom
Supports React 18.x and 19.x.
Quick Start
import { WorkflowVisualizer } from '@goplasmatic/dataflow-ui';
import '@goplasmatic/dataflow-ui/styles.css';
const workflows = [
{
id: 'my-workflow',
name: 'My Workflow',
tasks: [
{
id: 'task-1',
name: 'Transform Data',
function: {
name: 'map',
input: {
mappings: [
{ path: 'data.output', logic: { var: 'data.input' } }
]
}
}
}
]
}
];
function App() {
return (
<WorkflowVisualizer
workflows={workflows}
theme="system"
onTaskSelect={(task, workflow) => {
console.log('Selected task:', task.name);
}}
/>
);
}
Components
WorkflowVisualizer
The main component for displaying rules (workflows) in an interactive tree view.
interface WorkflowVisualizerProps {
/** Array of workflow definitions to display */
workflows: Workflow[];
/** Callback when a workflow is selected */
onWorkflowSelect?: (workflow: Workflow) => void;
/** Callback when a task is selected */
onTaskSelect?: (task: Task, workflow: Workflow) => void;
/** Theme: 'light', 'dark', or 'system' */
theme?: Theme;
/** Additional CSS class */
className?: string;
/** Execution result to display */
executionResult?: Message | null;
/** Enable debug mode */
debugMode?: boolean;
}
TreeView
Standalone tree view component for custom layouts.
import { TreeView } from '@goplasmatic/dataflow-ui';
<TreeView
workflows={workflows}
selection={currentSelection}
onSelect={handleSelect}
debugMode={false}
/>
Debug Mode
Enable step-by-step execution visualization with the debugger components.
import {
WorkflowVisualizer,
DebuggerProvider,
DebuggerControls,
defaultEngineFactory,
useDebugger
} from '@goplasmatic/dataflow-ui';
function DebugView() {
return (
<DebuggerProvider engineFactory={defaultEngineFactory}>
<WorkflowVisualizer
workflows={workflows}
debugMode={true}
/>
<DebuggerControls />
</DebuggerProvider>
);
}
Custom WASM Engine
Use a custom WASM engine with plugins or custom functions for debugging. Implement the DataflowEngine interface:
import {
WorkflowVisualizer,
DebuggerProvider,
DataflowEngine,
Workflow
} from '@goplasmatic/dataflow-ui';
import { MyCustomWasmEngine } from './my-custom-wasm';
class MyEngineAdapter implements DataflowEngine {
private engine: MyCustomWasmEngine;
constructor(workflows: Workflow[]) {
this.engine = new MyCustomWasmEngine(JSON.stringify(workflows));
}
async processWithTrace(payload: Record<string, unknown>) {
const result = await this.engine.process_with_trace(JSON.stringify(payload));
return JSON.parse(result);
}
dispose() {
this.engine.free();
}
}
function CustomDebugView() {
return (
<DebuggerProvider engineFactory={(workflows) => new MyEngineAdapter(workflows)}>
<WorkflowVisualizer workflows={workflows} debugMode={true} />
</DebuggerProvider>
);
}
The engineFactory is called whenever workflows change, ensuring the engine always has the latest workflow definitions.
Debugger Controls
import { DebuggerControls } from '@goplasmatic/dataflow-ui';
// Provides playback controls: play, pause, step forward/back, reset
<DebuggerControls />
useDebugger Hook
Access debugger state programmatically:
import { useDebugger } from '@goplasmatic/dataflow-ui';
function MyComponent() {
const {
state, // Current playback state
hasTrace, // Whether a trace is loaded
currentMessage, // Message at current step
currentChanges, // Changes made at current step
loadTrace, // Load an execution trace
stepForward, // Go to next step
stepBackward, // Go to previous step
play, // Start auto-playback
pause, // Pause playback
reset, // Reset to beginning
} = useDebugger();
// ...
}
Theming
The visualizer supports light, dark, and system themes.
// Light theme
<WorkflowVisualizer workflows={workflows} theme="light" />
// Dark theme
<WorkflowVisualizer workflows={workflows} theme="dark" />
// System preference (default)
<WorkflowVisualizer workflows={workflows} theme="system" />
Custom Theme Access
import { useTheme } from '@goplasmatic/dataflow-ui';
function MyComponent() {
const { theme, setTheme, resolvedTheme } = useTheme();
// resolvedTheme is 'light' or 'dark' (resolved from 'system')
}
Exports
Components
- WorkflowVisualizer - Main visualization component
- TreeView - Standalone tree view
- DebuggerControls - Debug playback controls
- DebuggerProvider - Debug context provider
- MessageInputPanel - Message input for debugging
- MessageStatePanel - Message state display
- JsonViewer - JSON display component
- ErrorBoundary - Error boundary wrapper
Hooks
- useTheme - Theme state and controls
- useDebugger - Debugger state and controls
- useTaskDebugState - Debug state for a specific task
- useWorkflowDebugState - Debug state for a workflow
Engine
- WasmEngineAdapter - Default WASM engine adapter
- defaultEngineFactory - Factory function for default engine
- DataflowEngine - Interface for custom engines
- EngineFactory - Type for engine factory functions
Types
All TypeScript types are exported for workflow definitions, tasks, messages, and execution traces.
Building from Source
cd ui
npm install
npm run build:lib
Output will be in ui/dist/.
Next Steps
- WASM Package - Run rules in the browser
- Core Concepts - Understand rules and actions
Custom Functions
Extend dataflow-rs with your own custom processing logic by implementing the AsyncFunctionHandler trait.
Overview
Custom functions allow you to:
- Add domain-specific processing logic
- Integrate with external systems
- Perform async operations (HTTP, database, etc.)
- Implement complex transformations
Implementing AsyncFunctionHandler
#![allow(unused)]
fn main() {
use async_trait::async_trait;
use dataflow_rs::engine::{
AsyncFunctionHandler,
FunctionConfig,
error::Result,
message::{Change, Message}
};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::sync::Arc;
pub struct MyCustomFunction;
#[async_trait]
impl AsyncFunctionHandler for MyCustomFunction {
async fn execute(
&self,
message: &mut Message,
config: &FunctionConfig,
datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
// Your custom logic here
// Access input configuration
let input = &config.input;
// Modify message data
let old_value = message.context["data"]["processed"].clone();
message.context["data"]["processed"] = json!(true);
// Track changes for audit trail
let changes = vec![Change {
path: Arc::from("data.processed"),
old_value: Arc::new(old_value),
new_value: Arc::new(json!(true)),
}];
// Return status code and changes
// 200 = success, 400 = validation error, 500 = execution error
Ok((200, changes))
}
}
}
Registering Custom Functions
#![allow(unused)]
fn main() {
use std::collections::HashMap;
use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::AsyncFunctionHandler;
// Create custom functions map
let mut custom_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>> =
HashMap::new();
// Register your function
custom_functions.insert(
"my_custom_function".to_string(),
Box::new(MyCustomFunction)
);
// Create engine with custom functions
let engine = Engine::new(workflows, Some(custom_functions));
}
Using Custom Functions in Rules
{
"id": "custom_rule",
"tasks": [
{
"id": "custom_action",
"function": {
"name": "my_custom_function",
"input": {
"option1": "value1",
"option2": 42
}
}
}
]
}
Accessing Configuration
For custom functions, extract configuration from the FunctionConfig::Custom variant:
#![allow(unused)]
fn main() {
async fn execute(
&self,
message: &mut Message,
config: &FunctionConfig,
datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
// Extract input from Custom variant
let input = match config {
FunctionConfig::Custom { input, .. } => input,
_ => return Err(DataflowError::Validation("Invalid config".into())),
};
// Access input parameters
let option1 = input
.get("option1")
.and_then(Value::as_str)
.unwrap_or("default");
let option2 = input
.get("option2")
.and_then(Value::as_i64)
.unwrap_or(0);
// Use the parameters...
Ok((200, vec![]))
}
}
Using DataLogic
Evaluate JSONLogic expressions using the provided datalogic instance:
#![allow(unused)]
fn main() {
async fn execute(
&self,
message: &mut Message,
config: &FunctionConfig,
datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
// Get the context for evaluation
let context_arc = message.get_context_arc();
// Compile and evaluate a JSONLogic expression
let logic = json!({"var": "data.input"});
let compiled = datalogic.compile(&logic)?;
let result = datalogic.evaluate(&compiled, context_arc)?;
// Use the result...
Ok((200, vec![]))
}
}
Async Operations
Custom functions support async/await for I/O operations:
#![allow(unused)]
fn main() {
use reqwest;
pub struct HttpFetchFunction;
#[async_trait]
impl AsyncFunctionHandler for HttpFetchFunction {
async fn execute(
&self,
message: &mut Message,
config: &FunctionConfig,
_datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
// Extract URL from config
let input = match config {
FunctionConfig::Custom { input, .. } => input,
_ => return Err(DataflowError::Validation("Invalid config".into())),
};
let url = input
.get("url")
.and_then(Value::as_str)
.ok_or_else(|| DataflowError::Validation("Missing url".to_string()))?;
// Make HTTP request
let response = reqwest::get(url)
.await
.map_err(|e| DataflowError::Processing(e.to_string()))?;
let data: Value = response.json()
.await
.map_err(|e| DataflowError::Processing(e.to_string()))?;
// Store result
let old_value = message.data().get("fetched").cloned().unwrap_or(json!(null));
if let Some(data_obj) = message.data_mut().as_object_mut() {
data_obj.insert("fetched".to_string(), data.clone());
}
message.invalidate_context_cache();
let changes = vec![Change {
path: Arc::from("data.fetched"),
old_value: Arc::new(old_value),
new_value: Arc::new(data),
}];
Ok((200, changes))
}
}
}
Error Handling
Return appropriate errors for different failure modes:
#![allow(unused)]
fn main() {
use dataflow_rs::engine::error::DataflowError;
async fn execute(&self, ...) -> Result<(usize, Vec<Change>)> {
// Validation error
if some_validation_fails {
return Err(DataflowError::Validation("Invalid input".to_string()));
}
// Processing error
if some_operation_fails {
return Err(DataflowError::Processing("Operation failed".to_string()));
}
// Configuration error
if config_is_invalid {
return Err(DataflowError::Configuration("Invalid config".to_string()));
}
// Or return status codes without error
// 200 for success
// 400 for validation failure
// 500 for processing failure
Ok((200, vec![]))
}
}
Complete Example
#![allow(unused)]
fn main() {
use async_trait::async_trait;
use dataflow_rs::engine::{
AsyncFunctionHandler, FunctionConfig,
error::{DataflowError, Result},
message::{Change, Message}
};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::sync::Arc;
/// Calculates statistics from numeric array data
pub struct StatisticsFunction;
#[async_trait]
impl AsyncFunctionHandler for StatisticsFunction {
async fn execute(
&self,
message: &mut Message,
config: &FunctionConfig,
_datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
// Extract config from Custom variant
let input = match config {
FunctionConfig::Custom { input, .. } => input,
_ => return Err(DataflowError::Configuration("Invalid config".into())),
};
// Get the field to analyze
let field = input
.get("field")
.and_then(Value::as_str)
.ok_or_else(|| DataflowError::Validation(
"Missing 'field' in config".to_string()
))?;
// Get the array from message
let data = message.data()
.get(field)
.and_then(Value::as_array)
.ok_or_else(|| DataflowError::Validation(
format!("Field '{}' is not an array", field)
))?
.clone();
// Calculate statistics
let numbers: Vec<f64> = data.iter()
.filter_map(|v| v.as_f64())
.collect();
if numbers.is_empty() {
return Err(DataflowError::Validation(
"No numeric values found".to_string()
));
}
let sum: f64 = numbers.iter().sum();
let count = numbers.len() as f64;
let mean = sum / count;
let min = numbers.iter().cloned().fold(f64::INFINITY, f64::min);
let max = numbers.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
// Build result
let stats = json!({
"count": count,
"sum": sum,
"mean": mean,
"min": min,
"max": max
});
// Store in message
let old_value = message.data().get("statistics").cloned().unwrap_or(json!(null));
if let Some(data_obj) = message.data_mut().as_object_mut() {
data_obj.insert("statistics".to_string(), stats.clone());
}
message.invalidate_context_cache();
let changes = vec![Change {
path: Arc::from("data.statistics"),
old_value: Arc::new(old_value),
new_value: Arc::new(stats),
}];
Ok((200, changes))
}
}
}
Best Practices
- Track Changes - Record all modifications for audit trail
- Validate Input - Check configuration before processing
- Handle Errors - Return appropriate error types
- Invalidate Cache - Call message.invalidate_context_cache() after modifications
- Document - Add clear documentation for your function
- Test - Write unit tests for your custom functions
JSONLogic
Dataflow-rs uses JSONLogic for conditions and data transformations.
Overview
JSONLogic is a way to write rules as JSON. It’s used in dataflow-rs for:
- Rule Conditions - Control when rules (workflows) execute, evaluated against the full context (data, metadata, temp_data)
- Action Conditions - Control when actions (tasks) execute
- Map Function - Transform and copy data
Basic Syntax
JSONLogic operations are objects with a single key (the operator) and value (the arguments):
{"operator": [argument1, argument2, ...]}
Data Access
var - Access Data
// Access top-level field
{"var": "data.name"}
// Access nested field
{"var": "data.user.profile.email"}
// Access array element
{"var": "data.items.0"}
// Default value if missing
{"var": ["data.optional", "default value"]}
Context Structure
In dataflow-rs, the context available to JSONLogic is:
{
"data": { ... },
"metadata": { ... },
"temp_data": { ... }
}
Access fields with:
{"var": "data.field"}
{"var": "metadata.type"}
{"var": "temp_data.intermediate"}
Comparison Operators
Equality
{"==": [{"var": "data.status"}, "active"]}
{"===": [{"var": "data.count"}, 0]} // Strict equality
{"!=": [{"var": "data.status"}, "deleted"]}
{"!==": [{"var": "data.count"}, null]} // Strict inequality
Numeric Comparisons
{">": [{"var": "data.age"}, 18]}
{">=": [{"var": "data.score"}, 60]}
{"<": [{"var": "data.price"}, 100]}
{"<=": [{"var": "data.quantity"}, 10]}
Between
{"<=": [1, {"var": "data.x"}, 10]} // 1 <= x <= 10
{"<": [1, {"var": "data.x"}, 10]} // 1 < x < 10
Boolean Logic
and, or, not
{"and": [
{">=": [{"var": "data.age"}, 18]},
{"==": [{"var": "data.verified"}, true]}
]}
{"or": [
{"==": [{"var": "data.status"}, "active"]},
{"==": [{"var": "data.status"}, "pending"]}
]}
{"!": {"var": "data.disabled"}}
Truthy/Falsy
// Check if value is truthy (not null, false, 0, "")
{"!!": {"var": "data.email"}}
// Check if value is falsy
{"!": {"var": "data.deleted"}}
Conditionals
if-then-else
{"if": [
{">=": [{"var": "data.score"}, 90]}, "A",
{">=": [{"var": "data.score"}, 80]}, "B",
{">=": [{"var": "data.score"}, 70]}, "C",
"F"
]}
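For intuition, the cascading "if" above behaves like an ordinary if/else-if chain. A plain-Rust equivalent (illustrative only) makes the branch order explicit:

```rust
// Plain-Rust equivalent of the cascading JSONLogic "if" above,
// shown only to make the branch order explicit.
fn grade(score: i64) -> &'static str {
    if score >= 90 {
        "A"
    } else if score >= 80 {
        "B"
    } else if score >= 70 {
        "C"
    } else {
        "F"
    }
}

fn main() {
    assert_eq!(grade(95), "A");
    assert_eq!(grade(83), "B");
    assert_eq!(grade(70), "C");
    assert_eq!(grade(42), "F");
}
```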
Ternary
{"if": [
{"var": "data.premium"},
"VIP Customer",
"Standard Customer"
]}
String Operations
cat - Concatenation
{"cat": ["Hello, ", {"var": "data.name"}, "!"]}
substr - Substring
{"substr": [{"var": "data.text"}, 0, 10]} // First 10 characters
{"substr": [{"var": "data.text"}, -5]} // Last 5 characters
in - Contains
// Check if substring exists
{"in": ["@", {"var": "data.email"}]}
// Check if value in array
{"in": [{"var": "data.status"}, ["active", "pending"]]}
Numeric Operations
Arithmetic
{"+": [{"var": "data.a"}, {"var": "data.b"}]}
{"-": [{"var": "data.total"}, {"var": "data.discount"}]}
{"*": [{"var": "data.price"}, {"var": "data.quantity"}]}
{"/": [{"var": "data.total"}, {"var": "data.count"}]}
{"%": [{"var": "data.n"}, 2]} // Modulo
Min/Max
{"min": [{"var": "data.a"}, {"var": "data.b"}, 100]}
{"max": [{"var": "data.x"}, 0]} // Ensure non-negative
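The two can be nested to clamp a value into a range, for example forcing data.x into [0, 100]:

```json
{"min": [{"max": [{"var": "data.x"}, 0]}, 100]}
```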
Array Operations
merge - Combine Arrays
{"merge": [
{"var": "data.list1"},
{"var": "data.list2"}
]}
map - Transform Array
{"map": [
{"var": "data.items"},
{"*": [{"var": ""}, 2]} // Double each item
]}
filter - Filter Array
{"filter": [
{"var": "data.items"},
{">=": [{"var": ""}, 10]} // Items >= 10
]}
reduce - Aggregate Array
{"reduce": [
{"var": "data.items"},
{"+": [{"var": "accumulator"}, {"var": "current"}]},
0 // Initial value
]}
all/some/none
{"all": [{"var": "data.items"}, {">=": [{"var": ""}, 0]}]}
{"some": [{"var": "data.items"}, {"==": [{"var": ""}, "special"]}]}
{"none": [{"var": "data.items"}, {"<": [{"var": ""}, 0]}]}
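These operators compose. For example, summing only the items that pass a filter (field names illustrative):

```json
{"reduce": [
  {"filter": [{"var": "data.items"}, {">=": [{"var": ""}, 10]}]},
  {"+": [{"var": "accumulator"}, {"var": "current"}]},
  0
]}
```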
Try It
Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.
Common Patterns
Safe Field Access
// Default to empty string
{"var": ["data.optional", ""]}
// Check existence first
{"if": [
{"!!": {"var": "data.optional"}},
{"var": "data.optional"},
"default"
]}
Null Coalescing
{"if": [
{"!!": {"var": "data.primary"}},
{"var": "data.primary"},
{"var": "data.fallback"}
]}
Type Checking
// Check if string
{"===": [{"typeof": {"var": "data.field"}}, "string"]}
// Check if array
{"===": [{"typeof": {"var": "data.items"}}, "array"]}
Best Practices
- Use var Defaults - Provide defaults for optional fields
- Check Existence - Use !! to verify a field exists before use
- Keep It Simple - Complex logic may be better in custom functions
- Test Expressions - Use the playground to test JSONLogic before deploying
Audit Trails
Dataflow-rs automatically tracks all data modifications for debugging, monitoring, and compliance.
Overview
Every change to message data is recorded in the audit trail:
- What changed - Path and values (old and new)
- When it changed - Timestamp
- Which action - Rule (workflow) and action (task) identifiers
Audit Trail Structure
#![allow(unused)]
fn main() {
pub struct AuditTrail {
pub task_id: String,
pub workflow_id: String,
pub timestamp: DateTime<Utc>,
pub changes: Vec<Change>,
}
pub struct Change {
pub path: Arc<str>,
pub old_value: Arc<Value>,
pub new_value: Arc<Value>,
}
}
Accessing the Audit Trail
After processing, the audit trail is available on the message:
#![allow(unused)]
fn main() {
engine.process_message(&mut message).await?;
for entry in &message.audit_trail {
println!("Workflow: {}, Task: {}", entry.workflow_id, entry.task_id);
println!("Timestamp: {}", entry.timestamp);
for change in &entry.changes {
println!(" Path: {}", change.path);
println!(" Old: {}", change.old_value);
println!(" New: {}", change.new_value);
}
}
}
JSON Representation
In the playground output, the audit trail appears as:
{
"audit_trail": [
{
"task_id": "transform_data",
"workflow_id": "my_workflow",
"timestamp": "2024-01-01T12:00:00Z",
"changes": [
{
"path": "data.full_name",
"old_value": null,
"new_value": "John Doe"
},
{
"path": "data.greeting",
"old_value": null,
"new_value": "Hello, John Doe!"
}
]
}
]
}
What Gets Tracked
Map Function
Every mapping that modifies data creates a change entry:
{
"mappings": [
{"path": "data.name", "logic": "John"}
]
}
Creates:
{
"path": "data.name",
"old_value": null,
"new_value": "John"
}
Custom Functions
Custom functions should track changes for proper auditing:
#![allow(unused)]
fn main() {
let changes = vec![Change {
path: Arc::from("data.processed"),
old_value: Arc::new(old_value),
new_value: Arc::new(new_value),
}];
Ok((200, changes))
}
Validation Function
Validation is read-only, so it produces no audit trail entries.
Try It
Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.
Notice the audit trail shows each step’s changes.
Use Cases
Debugging
Trace exactly how data was transformed:
#![allow(unused)]
fn main() {
// Find where a value was set
for entry in &message.audit_trail {
for change in &entry.changes {
if change.path.as_ref() == "data.total" {
println!("data.total set by {}/{}",
entry.workflow_id, entry.task_id);
println!("Changed from {} to {}",
change.old_value, change.new_value);
}
}
}
}
Compliance
Log all changes for regulatory compliance:
#![allow(unused)]
fn main() {
for entry in &message.audit_trail {
log_to_audit_system(
entry.timestamp,
entry.workflow_id.clone(),
entry.task_id.clone(),
&entry.changes
);
}
}
Change Detection
Detect if specific fields were modified:
#![allow(unused)]
fn main() {
fn was_field_modified(message: &Message, field: &str) -> bool {
message.audit_trail.iter()
.flat_map(|e| e.changes.iter())
.any(|c| c.path.as_ref() == field)
}
if was_field_modified(&message, "data.price") {
// Price was changed during processing
}
}
Rollback (Conceptual)
The audit trail can be used to implement rollback:
#![allow(unused)]
fn main() {
fn get_original_value(message: &Message, field: &str) -> Option<&Value> {
message.audit_trail.iter()
.flat_map(|e| e.changes.iter())
.find(|c| c.path.as_ref() == field)
.map(|c| c.old_value.as_ref())
}
}
Best Practices
- Track All Changes - Custom functions should record all modifications
- Use Arc - Use Arc<str> and Arc<Value> for efficient sharing
- Timestamp Accuracy - Timestamps are UTC for consistency
- Check Audit Trail - Review audit trail during development
- Log for Production - Persist audit trails for production debugging
Performance
Dataflow-rs is designed for high-performance rule evaluation and data processing with minimal overhead.
Architecture for Performance
Pre-compilation
All JSONLogic expressions are compiled once at engine startup:
#![allow(unused)]
fn main() {
// This compiles all logic at creation time
let engine = Engine::new(workflows, None);
// Runtime processing uses pre-compiled logic
// No parsing or compilation overhead
engine.process_message(&mut message).await?;
}
Benefits of Pre-compilation
- Zero runtime parsing - No JSON parsing during message processing
- Cached compiled logic - O(1) access to compiled expressions
- Early validation - Invalid expressions caught at startup
- Consistent latency - Predictable performance per message
Memory Efficiency
- Arc-wrapped compiled logic - Shared without copying
- Immutable workflows - Safe concurrent access
- Context caching - Avoids repeated JSON cloning
Benchmarking
Run the included benchmark:
cargo run --example benchmark --release
Sample Benchmark
#![allow(unused)]
fn main() {
use dataflow_rs::{Engine, Workflow, Message};
use std::time::Instant;
// Setup
let workflow = Workflow::from_json(workflow_json)?;
let engine = Engine::new(vec![workflow], None);
// Benchmark
let iterations = 10_000;
let start = Instant::now();
for _ in 0..iterations {
let mut message = Message::new(&test_data);
engine.process_message(&mut message).await?;
}
let elapsed = start.elapsed();
println!("Processed {} messages in {:?}", iterations, elapsed);
println!("Average: {:?} per message", elapsed / iterations);
}
Optimization Tips
1. Minimize Mappings
Combine related transformations:
// Less efficient: Multiple mappings
{
"mappings": [
{"path": "data.a", "logic": {"var": "data.source.a"}},
{"path": "data.b", "logic": {"var": "data.source.b"}},
{"path": "data.c", "logic": {"var": "data.source.c"}}
]
}
// More efficient: Single object mapping when possible
{
"mappings": [
{"path": "data", "logic": {"var": "data.source"}}
]
}
2. Use Conditions Wisely
Skip unnecessary processing with conditions:
{
"id": "expensive_task",
"condition": {"==": [{"var": "metadata.needs_processing"}, true]},
"function": { ... }
}
3. Order Rules by Frequency
Put frequently-executed rules earlier (lower priority):
{"id": "common_rule", "priority": 1, ...}
{"id": "rare_rule", "priority": 100, ...}
4. Use temp_data
Store intermediate results to avoid recomputation:
{
"mappings": [
{
"path": "temp_data.computed",
"logic": {"expensive": "computation"}
},
{
"path": "data.result1",
"logic": {"var": "temp_data.computed"}
},
{
"path": "data.result2",
"logic": {"var": "temp_data.computed"}
}
]
}
5. Avoid Unnecessary Validation
Validate only what’s necessary:
// Validate at system boundaries
{
"id": "input_validation",
"condition": {"==": [{"var": "metadata.source"}, "external"]},
"tasks": [
{"id": "validate", "function": {"name": "validation", ...}}
]
}
Concurrent Processing
Process multiple messages concurrently:
#![allow(unused)]
fn main() {
use std::sync::Arc;
use tokio::task;
let engine = Arc::new(Engine::new(workflows, None));
let handles: Vec<_> = messages.into_iter()
.map(|mut msg| {
let engine = Arc::clone(&engine);
task::spawn(async move {
engine.process_message(&mut msg).await
})
})
.collect();
// Wait for all
for handle in handles {
handle.await??;
}
}
Thread Safety
- Engine is Send + Sync
- Compiled logic shared via Arc
- Each message processed independently
Memory Considerations
Large Messages
For very large messages, consider:
- Streaming - Process chunks instead of entire payload
- Selective Loading - Load only needed fields
- Cleanup temp_data - Clear intermediate results when done
Many Rules
For many rules:
- Organize by Domain - Group related rules
- Use Conditions - Skip irrelevant rules early
- Profile - Identify bottleneck rules
Profiling
Enable Logging
#![allow(unused)]
fn main() {
env_logger::Builder::from_env(
env_logger::Env::default().default_filter_or("debug")
).init();
}
Custom Metrics
#![allow(unused)]
fn main() {
use std::time::Instant;
let start = Instant::now();
engine.process_message(&mut message).await?;
let duration = start.elapsed();
metrics::histogram!("dataflow.processing_time", duration.as_secs_f64());
}
Production Recommendations
- Build with --release - Debug builds are significantly slower
- Pre-warm - Process a few messages at startup to warm caches
- Monitor - Track processing times and error rates
- Profile - Identify slow rules in production
- Scale Horizontally - Engine is stateless, scale with instances
API Reference
Quick reference for the main dataflow-rs types and methods.
Type Aliases
Dataflow-rs provides rules-engine aliases alongside the original workflow terminology:
| Rules Engine | Workflow Engine | Import |
|---|---|---|
| RulesEngine | Engine | use dataflow_rs::RulesEngine; |
| Rule | Workflow | use dataflow_rs::Rule; |
| Action | Task | use dataflow_rs::Action; |
Both names refer to the same types — use whichever fits your mental model.
Engine (RulesEngine)
The central component that evaluates rules and processes messages.
#![allow(unused)]
fn main() {
use dataflow_rs::Engine; // or: use dataflow_rs::RulesEngine;
}
Constructor
#![allow(unused)]
fn main() {
pub fn new(
workflows: Vec<Workflow>,
custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>
) -> Engine
}
Creates a new engine with the given rules. All JSONLogic is compiled at creation.
Methods
#![allow(unused)]
fn main() {
// Process a message through all matching rules
pub async fn process_message(&self, message: &mut Message) -> Result<()>
// Process with execution trace for debugging
pub async fn process_message_with_trace(&self, message: &mut Message) -> Result<ExecutionTrace>
// Process only workflows on a specific channel (O(1) lookup)
pub async fn process_message_for_channel(&self, channel: &str, message: &mut Message) -> Result<()>
// Channel routing with execution trace
pub async fn process_message_for_channel_with_trace(&self, channel: &str, message: &mut Message) -> Result<ExecutionTrace>
// Get registered rules (sorted by priority)
pub fn workflows(&self) -> &Arc<Vec<Workflow>>
// Find a workflow by ID
pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow>
// Create a new engine with different workflows, preserving custom functions
pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Self
}
Workflow (Rule)
A collection of actions with optional conditions and priority.
#![allow(unused)]
fn main() {
use dataflow_rs::Workflow; // or: use dataflow_rs::Rule;
}
Constructors
#![allow(unused)]
fn main() {
// Parse from JSON string
pub fn from_json(json: &str) -> Result<Workflow>
// Load from file
pub fn from_file(path: &str) -> Result<Workflow>
// Convenience constructor for rules-engine pattern
pub fn rule(id: &str, name: &str, condition: Value, tasks: Vec<Task>) -> Self
}
JSON Schema
{
"id": "string (required)",
"name": "string (optional)",
"priority": "number (optional, default: 0)",
"condition": "JSONLogic (optional, evaluated against full context)",
"continue_on_error": "boolean (optional, default: false)",
"tasks": "array of Task (required)",
"channel": "string (optional, default: 'default')",
"version": "number (optional, default: 1)",
"status": "'active' | 'paused' | 'archived' (optional, default: 'active')",
"tags": "array of string (optional, default: [])",
"created_at": "ISO 8601 datetime (optional)",
"updated_at": "ISO 8601 datetime (optional)"
}
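A minimal rule that satisfies this schema might look like the following (ids, condition, and mapped fields are illustrative):

```json
{
  "id": "web_discount",
  "name": "Web Discount",
  "priority": 10,
  "condition": {"==": [{"var": "metadata.source"}, "web"]},
  "tasks": [
    {
      "id": "apply_discount",
      "function": {
        "name": "map",
        "input": {
          "mappings": [
            {"path": "data.discount", "logic": 0.1}
          ]
        }
      }
    }
  ]
}
```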
Task (Action)
An individual processing unit within a rule.
#![allow(unused)]
fn main() {
use dataflow_rs::Task; // or: use dataflow_rs::Action;
}
Constructor
#![allow(unused)]
fn main() {
// Convenience constructor for rules-engine pattern
pub fn action(id: &str, name: &str, function: FunctionConfig) -> Self
}
JSON Schema
{
"id": "string (required)",
"name": "string (optional)",
"condition": "JSONLogic (optional, evaluated against full context)",
"continue_on_error": "boolean (optional)",
"function": {
"name": "string (required)",
"input": "object (required)"
}
}
Message
The data container that flows through rules.
#![allow(unused)]
fn main() {
use dataflow_rs::Message;
}
Constructors
#![allow(unused)]
fn main() {
// Create from data (goes to context.data)
pub fn new(data: &Value) -> Message
// Create from full context value
pub fn from_value(context: &Value) -> Message
}
Fields
#![allow(unused)]
fn main() {
pub struct Message {
pub id: Uuid,
pub payload: Arc<Value>,
pub context: Value, // Contains data, metadata, temp_data
pub audit_trail: Vec<AuditTrail>,
pub errors: Vec<ErrorInfo>,
}
}
Methods
#![allow(unused)]
fn main() {
// Get context as Arc for efficient sharing
pub fn get_context_arc(&mut self) -> Arc<Value>
// Invalidate context cache after modifications
pub fn invalidate_context_cache(&mut self)
}
AsyncFunctionHandler
Trait for implementing custom action handlers.
#![allow(unused)]
fn main() {
use dataflow_rs::engine::AsyncFunctionHandler;
}
Trait Definition
#![allow(unused)]
fn main() {
#[async_trait]
pub trait AsyncFunctionHandler: Send + Sync {
async fn execute(
&self,
message: &mut Message,
config: &FunctionConfig,
datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)>;
}
}
Return Value
- usize - Status code (200 = success, 400 = validation error, 500 = execution error)
- Vec<Change> - List of changes for audit trail
FunctionConfig
Configuration for function execution.
#![allow(unused)]
fn main() {
pub struct FunctionConfig {
pub name: String,
pub input: Value,
}
}
Change
Represents a single data modification.
#![allow(unused)]
fn main() {
pub struct Change {
pub path: Arc<str>,
pub old_value: Arc<Value>,
pub new_value: Arc<Value>,
}
}
AuditTrail
Records changes made by an action.
#![allow(unused)]
fn main() {
pub struct AuditTrail {
pub task_id: String,
pub workflow_id: String,
pub timestamp: DateTime<Utc>,
pub changes: Vec<Change>,
}
}
ErrorInfo
Error information recorded in the message.
#![allow(unused)]
fn main() {
pub struct ErrorInfo {
pub code: String,
pub message: String,
pub workflow_id: Option<String>,
pub task_id: Option<String>,
}
}
DataflowError
Main error type for the library.
#![allow(unused)]
fn main() {
use dataflow_rs::engine::error::DataflowError;
}
Variants
#![allow(unused)]
fn main() {
pub enum DataflowError {
Validation(String),
Execution(String),
Logic(String),
Io(String),
// ... other variants
}
}
WorkflowStatus
Lifecycle status for workflows.
#![allow(unused)]
fn main() {
use dataflow_rs::WorkflowStatus;
}
Variants
#![allow(unused)]
fn main() {
pub enum WorkflowStatus {
Active, // Default — workflow executes normally
Paused, // Excluded from channel routing
Archived, // Permanently retired
}
}
Built-in Functions
map
Data transformation using JSONLogic.
{
"name": "map",
"input": {
"mappings": [
{
"path": "string",
"logic": "JSONLogic expression"
}
]
}
}
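A concrete map configuration might look like this (field names are invented for the example; cat and &gt;= are standard JSONLogic operators):

```json
{
  "name": "map",
  "input": {
    "mappings": [
      {
        "path": "data.full_name",
        "logic": { "cat": [{ "var": "data.first" }, " ", { "var": "data.last" }] }
      },
      {
        "path": "data.is_adult",
        "logic": { ">=": [{ "var": "data.age" }, 18] }
      }
    ]
  }
}
```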
validation
Rule-based data validation.
{
"name": "validation",
"input": {
"rules": [
{
"logic": "JSONLogic expression",
"message": "string"
}
]
}
}
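For example, a validation step with two rules (field names and messages are illustrative):

```json
{
  "name": "validation",
  "input": {
    "rules": [
      {
        "logic": { "!!": [{ "var": "data.email" }] },
        "message": "email is required"
      },
      {
        "logic": { ">": [{ "var": "data.amount" }, 0] },
        "message": "amount must be positive"
      }
    ]
  }
}
```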
filter
Pipeline control flow — halt workflow or skip task.
{
"name": "filter",
"input": {
"condition": "JSONLogic expression",
"on_reject": "halt | skip (default: halt)"
}
}
Status codes: 200 (pass), 298 (skip), 299 (halt).
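As an illustration, a filter that skips the current task rather than halting the workflow when the condition rejects (the metadata field referenced here is made up for the example):

```json
{
  "name": "filter",
  "input": {
    "condition": { "==": [{ "var": "metadata.channel" }, "orders"] },
    "on_reject": "skip"
  }
}
```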
log
Structured logging with JSONLogic expressions.
{
"name": "log",
"input": {
"level": "trace | debug | info | warn | error (default: info)",
"message": "JSONLogic expression",
"fields": {
"key": "JSONLogic expression"
}
}
}
Always returns (200, []) — never modifies the message.
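An example log configuration (the field names and message content are illustrative; cat is a standard JSONLogic operator):

```json
{
  "name": "log",
  "input": {
    "level": "info",
    "message": { "cat": ["processing user ", { "var": "data.user.id" }] },
    "fields": {
      "channel": { "var": "metadata.channel" }
    }
  }
}
```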
WASM API (dataflow-wasm)
For browser/JavaScript usage.
import init, { WasmEngine, process_message } from 'dataflow-wasm';
// Initialize
await init();
// Create engine
const engine = new WasmEngine(workflowsJson);
// Process with payload string (returns Promise)
const result = await engine.process(payloadStr);
// One-off convenience function (no engine needed)
const result2 = await process_message(workflowsJson, payloadStr);
// Get rule info
const count = engine.workflow_count();
const ids = engine.workflow_ids();
Full API Documentation
For complete API documentation, run:
cargo doc --open
This generates detailed documentation from the source code comments.