Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Introduction

Plasmatic Logo

Dataflow-rs

A high-performance rules engine for IFTTT-style automation in Rust with zero-overhead JSONLogic evaluation.



License: Apache 2.0 Rust Crates.io


Dataflow-rs is a lightweight rules engine that lets you define IF → THEN → THAT automation in JSON. Rules are evaluated using pre-compiled JSONLogic for zero runtime overhead, and actions execute asynchronously for high throughput.

Whether you’re routing events, validating data, building REST APIs, or creating automation pipelines, Dataflow-rs provides enterprise-grade performance with minimal complexity.

Key Features

  • IF → THEN → THAT Model - Define rules with JSONLogic conditions, execute actions, chain with priority ordering
  • Async-First Architecture - Native async/await support with Tokio for high-throughput processing
  • Zero Runtime Compilation - All JSONLogic expressions pre-compiled at startup for optimal performance
  • Full Context Access - Conditions can access any field: data, metadata, temp_data
  • Execution Tracing - Step-by-step debugging with message snapshots after each action
  • Built-in Functions - Parse, Map, Validate, Filter, Log, and Publish for complete data pipelines
  • Pipeline Control Flow - Filter/gate function to halt workflows or skip tasks based on conditions
  • Channel Routing - Route messages to specific workflow channels with O(1) lookup
  • Workflow Lifecycle - Manage workflow status (active/paused/archived), versioning, and tagging
  • Hot Reload - Swap workflows at runtime without re-registering custom functions
  • Extensible - Easily add custom async actions to the engine
  • Typed Integration Configs - Pre-validated configs for HTTP, Enrich, and Kafka integrations
  • WebAssembly Support - Run rules in the browser with @goplasmatic/dataflow-wasm
  • React UI Components - Visualize and debug rules with @goplasmatic/dataflow-ui
  • Auditing - Track all changes to your data as it moves through the pipeline

Try It Now

Experience the power of dataflow-rs directly in your browser. Define a rule and message, then see the processing result instantly.

Want more features? Try the Full Debugger UI with step-by-step execution, breakpoints, and rule visualization.

How It Works

┌─────────────────────────────────────────────────────────────────┐
│  Rule (Workflow)                                                │
│                                                                 │
│  IF    condition matches        →  JSONLogic against any field  │
│  THEN  execute actions (tasks)  →  map, validate, custom logic  │
│  THAT  chain more rules         →  priority-ordered execution   │
└─────────────────────────────────────────────────────────────────┘
  1. Define Rules - Create JSON-based rule definitions with conditions and actions
  2. Create an Engine - Initialize the rules engine (all logic compiled once at startup)
  3. Process Messages - Send messages through the engine for evaluation
  4. Get Results - Receive transformed data with full audit trail

Next Steps

Playground

Try dataflow-rs directly in your browser. Define rules, create messages, and see the processing results in real-time.

Looking for advanced debugging? Try the Full Debugger UI with step-by-step execution, breakpoints, rule visualization, and more!

How to Use

  1. Select an Example - Choose from the dropdown or write your own
  2. Edit Rules - Modify the rule JSON on the left panel
  3. Edit Message - Customize the input message on the right panel
  4. Process - Click “Process Message” or press Ctrl+Enter
  5. View Results - See the processed output with data, metadata, and audit trail

Tips

  • JSONLogic - Use JSONLogic expressions in your rules for dynamic data access and transformation
  • Multiple Actions - Add multiple actions (tasks) to a rule for sequential processing
  • Multiple Rules - Define multiple rules that execute in priority order
  • Conditions - Add conditions to actions or rules to control when they execute (conditions can access data, metadata, and temp_data)
  • Audit Trail - The output shows all changes made during processing

Installation

Add dataflow-rs to your Rust project using Cargo.

Requirements

  • Rust 1.85 or later (Edition 2024)
  • Cargo (comes with Rust)

Add to Cargo.toml

[dependencies]
dataflow-rs = "2.1"
serde_json = "1.0"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }

Verify Installation

Create a simple test to verify the installation:

use dataflow_rs::Engine;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create an empty rules engine via the builder.
    let engine = Engine::builder().build()?;
    println!("Rules engine created with {} rules", engine.workflows().len());
    Ok(())
}

Run with:

cargo run

You should see:

Rules engine created with 0 rules

Optional Dependencies

Depending on your use case, you may want to add:

[dependencies]
# For async operations
async-trait = "0.1"

# For custom error handling
thiserror = "2.0"

# For logging
log = "0.4"
env_logger = "0.11"

Next Steps

Quick Start

Build your first rule in minutes.

Prerequisite reading: rules and mappings are written in JSONLogic. If you’ve never used it, skim the Data Access, Comparison Operators, and Conditionals sections first — they cover everything in the examples below.

Create a Simple Rule

Rules are defined in JSON and consist of actions (tasks) that process data sequentially.

use dataflow_rs::prelude::*;
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define a rule that loads the payload into `data.input` and then
    // transforms it. Letting `parse_json` seed `data` is the idiomatic
    // pattern — handlers don't have to reach into `message.context`.
    let rule_json = r#"{
        "id": "greeting_rule",
        "name": "Greeting Rule",
        "tasks": [
            {
                "id": "load",
                "name": "Load Payload",
                "function": {
                    "name": "parse_json",
                    "input": { "source": "payload", "target": "input" }
                }
            },
            {
                "id": "create_greeting",
                "name": "Create Greeting",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.greeting",
                                "logic": { "cat": ["Hello, ", {"var": "data.input.name"}, "!"] }
                            }
                        ]
                    }
                }
            }
        ]
    }"#;

    let rule = Workflow::from_json(rule_json)?;

    // Builder is the recommended construction path. Compiles all
    // JSONLogic up-front; fails loud on bad config.
    let engine = Engine::builder().with_workflow(rule).build()?;

    // Create a message from a serde_json payload. `parse_json` will copy
    // it into `data.input` at workflow start.
    let mut message = Message::from_value(&json!({"name": "World"}));

    // Process the message.
    engine.process_message(&mut message).await?;

    // Print the result.
    println!("Greeting: {:?}", message.data()["greeting"]);

    Ok(())
}

Try It Interactively

Want more features? Try the Full Debugger UI with step-by-step execution and rule visualization.

Understanding the Code

  1. Rule Definition - JSON structure defining actions (tasks) to execute
  2. Engine Creation - Compiles all JSONLogic expressions at startup
  3. Message Creation - Input data wrapped in a Message structure
  4. Processing - Engine evaluates each rule’s condition and executes matching actions
  5. Result - Modified message with transformed data and audit trail

Add Validation

Extend your rule with data validation:

{
    "id": "validated_rule",
    "name": "Validated Rule",
    "tasks": [
        {
            "id": "validate_input",
            "name": "Validate Input",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [
                        {
                            "logic": { "!!": {"var": "data.name"} },
                            "message": "Name is required"
                        }
                    ]
                }
            }
        },
        {
            "id": "create_greeting",
            "name": "Create Greeting",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {
                            "path": "data.greeting",
                            "logic": { "cat": ["Hello, ", {"var": "data.name"}, "!"] }
                        }
                    ]
                }
            }
        }
    ]
}

Next Steps

Basic Concepts

Understanding the core components of dataflow-rs.

The IF → THEN → THAT Model

Dataflow-rs follows an IFTTT-style rules engine pattern:

  • IF — Define conditions using JSONLogic (evaluated against data, metadata, temp_data)
  • THEN — Execute actions: data transformation, validation, or custom async logic
  • THAT — Chain multiple actions and rules with priority ordering

Architecture Overview

Dataflow-rs follows a two-phase architecture:

  1. Compilation Phase (Startup) - All JSONLogic expressions are compiled once
  2. Execution Phase (Runtime) - Messages are processed using compiled logic
┌─────────────────────────────────────────────────────────────┐
│                    Compilation Phase                         │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────┐  │
│  │  Rules   │ -> │ LogicCompiler│ -> │ Compiled Cache   │  │
│  └──────────┘    └──────────────┘    └──────────────────┘  │
└─────────────────────────────────────────────────────────────┘
                              │
                              v
┌─────────────────────────────────────────────────────────────┐
│                    Execution Phase                           │
│  ┌─────────┐    ┌────────┐    ┌──────────────────────────┐ │
│  │ Message │ -> │ Engine │ -> │ Processed Message        │ │
│  └─────────┘    └────────┘    │ (data + audit trail)     │ │
│                               └──────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

Core Components

Rules EngineWorkflow EngineDescription
RulesEngineEngineCentral async component that evaluates rules and executes actions
RuleWorkflowA condition + actions bundle — IF condition THEN execute actions
ActionTaskAn individual processing step (map, validate, or custom function)

Both naming conventions work — use whichever fits your mental model.

Engine (RulesEngine)

The central orchestrator that processes messages through rules.

#![allow(unused)]
fn main() {
use dataflow_rs::Engine;

// Create engine with rules (compiled at creation). Builder is the
// recommended path; .register("name", handler) chains in any custom
// AsyncFunctionHandler implementations.
let engine = Engine::builder()
    .with_workflows(rules)
    // .register("my_handler", MyHandler)
    .build()?;

// Process messages (uses pre-compiled logic)
engine.process_message(&mut message).await?;
}

Rule (Workflow)

A collection of actions executed sequentially. Rules can have:

  • Priority - Determines execution order (lower = first)
  • Conditions - JSONLogic expression evaluated against the full context (data, metadata, temp_data)
{
    "id": "premium_order",
    "name": "Premium Order Processing",
    "priority": 1,
    "condition": { ">=": [{"var": "data.order.total"}, 1000] },
    "tasks": [...]
}

Action (Task)

An individual processing unit within a rule. Actions can:

  • Execute built-in functions (map, validation)
  • Execute custom functions
  • Have conditions for conditional execution
{
    "id": "apply_discount",
    "name": "Apply Discount",
    "condition": { "!!": {"var": "data.order.total"} },
    "function": {
        "name": "map",
        "input": { ... }
    }
}

Message

The data structure that flows through rules. Contains:

  • context.data - Main data payload
  • context.metadata - Message metadata
  • context.temp_data - Temporary processing data
  • audit_trail - Change history
  • errors - Collected errors
#![allow(unused)]
fn main() {
use dataflow_rs::Message;
use serde_json::json;

let mut message = Message::from_value(&json!({
    "name": "John",
    "email": "john@example.com"
}));

// Access after processing
println!("Data: {:?}", message.data());
println!("Audit: {:?}", message.audit_trail());
}

Data Flow

  1. Input - Message created with initial data
  2. Rule Selection - Engine evaluates each rule’s condition
  3. Action Execution - Actions run sequentially within each matching rule
  4. Output - Message contains transformed data and audit trail
Message (input)
    │
    v
┌─────────────────────────────────────────┐
│ Rule 1 (priority: 1)                    │
│   Action 1 -> Action 2 -> Action 3     │
└─────────────────────────────────────────┘
    │
    v
┌─────────────────────────────────────────┐
│ Rule 2 (priority: 2)                    │
│   Action 1 -> Action 2                 │
└─────────────────────────────────────────┘
    │
    v
Message (output with audit trail)

JSONLogic

Dataflow-rs uses JSONLogic for:

  • Conditions - Control when rules/actions execute (can access any context field)
  • Data Access - Read values from message context
  • Transformations - Transform and combine data

Common operations:

// Access data
{"var": "data.name"}

// String concatenation
{"cat": ["Hello, ", {"var": "data.name"}]}

// Conditionals
{"if": [{"var": "data.premium"}, "VIP", "Standard"]}

// Comparisons
{">=": [{"var": "data.order.total"}, 1000]}

Next Steps

Core Concepts Overview

Dataflow-rs is built around a small set of core concepts that work together to evaluate rules and execute actions efficiently.

The Big Picture

┌─────────────────────────────────────────────────────────────────────┐
│                     Rules Engine (Engine)                            │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                    Compiled Logic Cache                        │  │
│  │  (All JSONLogic pre-compiled at startup)                       │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                 │
│  │  Rule 1     │  │  Rule 2     │  │  Rule N     │                 │
│  │ ┌─────────┐ │  │ ┌─────────┐ │  │ ┌─────────┐ │                 │
│  │ │Action 1 │ │  │ │Action 1 │ │  │ │Action 1 │ │                 │
│  │ │Action 2 │ │  │ │Action 2 │ │  │ │Action 2 │ │                 │
│  │ │ ...     │ │  │ │ ...     │ │  │ │ ...     │ │                 │
│  │ └─────────┘ │  │ └─────────┘ │  │ └─────────┘ │                 │
│  └─────────────┘  └─────────────┘  └─────────────┘                 │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              v
┌─────────────────────────────────────────────────────────────────────┐
│                          Message                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌────────────┐ │
│  │    data     │  │  metadata   │  │  temp_data  │  │ audit_trail│ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

Component Summary

Rules EngineWorkflow EnginePurposeKey Features
RulesEngineEngineOrchestrates processingPre-compiled logic, rule management
RuleWorkflowGroups related actionsPriority ordering, conditions
ActionTaskIndividual processing unitBuilt-in or custom functions
MessageData, metadata, audit trail

Processing Flow

  1. Engine Initialization

    • Parse rule definitions
    • Compile all JSONLogic expressions
    • Store in indexed cache
  2. Message Processing

    • Create message with input data
    • Engine evaluates each rule’s condition against the full context
    • Matching rules execute in priority order
  3. Action Execution

    • Actions run sequentially within each rule
    • Each action can modify message data
    • Changes recorded in audit trail
  4. Result

    • Message contains transformed data
    • Audit trail shows all modifications
    • Errors collected (if any)

Key Design Principles

Pre-compilation

All JSONLogic expressions are compiled once at engine creation. This eliminates runtime parsing overhead and ensures consistent, predictable performance.

Immutability

Rules are immutable after engine creation. This enables safe concurrent processing and eliminates race conditions.

Separation of Concerns

  • LogicCompiler handles all compilation
  • InternalExecutor executes built-in functions
  • Engine orchestrates the flow

Audit Trail

Every data modification is recorded, providing complete visibility into processing steps for debugging and compliance.

Detailed Documentation

Rules Engine

The Engine (also available as RulesEngine type alias) is the central component that evaluates rules and orchestrates action execution.

Overview

The Engine is responsible for:

  • Compiling all JSONLogic expressions at initialization
  • Pre-sorting rules by priority at startup (no per-message sorting)
  • Evaluating rule conditions against the full message context
  • Processing messages through matching rules
  • Channel-based routing with O(1) lookup
  • Coordinating action execution
  • Hot-reloading workflows without losing custom functions

Creating an Engine

#![allow(unused)]
fn main() {
use dataflow_rs::{Engine, Workflow};

// Parse rules from JSON
let rule1 = Workflow::from_json(r#"{
    "id": "rule1",
    "name": "First Rule",
    "priority": 1,
    "tasks": [...]
}"#)?;

let rule2 = Workflow::from_json(r#"{
    "id": "rule2",
    "name": "Second Rule",
    "priority": 2,
    "tasks": [...]
}"#)?;

// Builder is the recommended construction path.
let engine = Engine::builder()
    .with_workflow(rule1)
    .with_workflow(rule2)
    // .register("my_handler", MyHandler)  // chain custom handlers here
    .build()?;

// Engine is now ready — all JSONLogic compiled, Custom inputs typed.
println!("Loaded {} rules", engine.workflows().len());
}

You can also use the RulesEngine type alias:

#![allow(unused)]
fn main() {
use dataflow_rs::RulesEngine;

let engine = RulesEngine::builder()
    .with_workflows([rule1, rule2])
    .build()?;
}

Processing Messages

#![allow(unused)]
fn main() {
use dataflow_rs::engine::message::Message;
use serde_json::json;

// Bridge from serde_json::Value — handiest when payloads come from JSON
let mut message = Message::from_value(&json!({
    "user": "john",
    "action": "login"
}));

// Process through all matching rules
engine.process_message(&mut message).await?;

// Access results
println!("Processed data: {:?}", message.data());
println!("Audit trail: {:?}", message.audit_trail());
}

If you already have an Arc<OwnedDataValue> payload, use Message::new to skip the serde_json bridge:

#![allow(unused)]
fn main() {
use datavalue::OwnedDataValue;
use std::sync::Arc;

let payload = Arc::new(OwnedDataValue::from(&json!({"user": "john"})));
let mut message = Message::new(payload);
}

Execution Tracing

For debugging, use process_message_with_trace to capture step-by-step execution:

#![allow(unused)]
fn main() {
let trace = engine.process_message_with_trace(&mut message).await?;

println!("Steps executed: {}", trace.executed_count());
println!("Steps skipped: {}", trace.skipped_count());

for step in &trace.steps {
    println!("Rule: {}, Action: {:?}, Result: {:?}",
        step.workflow_id, step.task_id, step.result);
}
}

Rule Execution Order

Rules execute in priority order (lowest priority number first):

#![allow(unused)]
fn main() {
// Priority 1 executes first
let high_priority = Workflow::from_json(r#"{
    "id": "high",
    "priority": 1,
    "tasks": [...]
}"#)?;

// Priority 10 executes later
let low_priority = Workflow::from_json(r#"{
    "id": "low",
    "priority": 10,
    "tasks": [...]
}"#)?;
}

Rule Conditions

Rules have conditions that determine if they should execute. Conditions are evaluated against the full message contextdata, metadata, and temp_data:

{
    "id": "premium_order",
    "name": "Premium Order Processing",
    "condition": { ">=": [{"var": "data.order.total"}, 1000] },
    "tasks": [...]
}

The rule only executes if the condition evaluates to true.

Custom Functions

Register custom action handlers via the builder. register("name", handler) accepts any AsyncFunctionHandler and boxes it internally; the engine pre-parses each FunctionConfig::Custom input JSON into the handler’s typed Self::Input at .build() time, so mis-shaped configs fail at startup, not on first message.

#![allow(unused)]
fn main() {
let engine = Engine::builder()
    .with_workflows(rules)
    .register("my_function", MyCustomFunction)
    .build()?;
}

Thread Safety

The Engine is designed for concurrent use:

  • Rules are immutable after creation
  • Compiled logic is shared via Arc
  • Each message is processed independently
#![allow(unused)]
fn main() {
use std::sync::Arc;
use tokio::task;

let engine = Arc::new(Engine::builder().with_workflows(rules).build()?);

// Process multiple messages concurrently
let handles: Vec<_> = messages.into_iter().map(|mut msg| {
    let engine = Arc::clone(&engine);
    task::spawn(async move {
        engine.process_message(&mut msg).await
    })
}).collect();

// Wait for all to complete
for handle in handles {
    handle.await??;
}
}

API Reference

Engine::builder()

Returns an EngineBuilder. Chain .register("name", handler), .with_workflow(w), .with_workflows(iter), then .build() -> Result<Engine>. Recommended construction path.

Engine::new(workflows, custom_functions)

Lower-level escape hatch — accepts rules and a plain handler HashMap (use HashMap::new() for no custom handlers, or — preferred — go through the builder).

  • workflows: Vec<Workflow> — Rules to register
  • custom_functions: HashMap<String, BoxedFunctionHandler> — Custom action implementations

engine.process_message(&mut message)

Processes a message through all matching rules.

  • Returns Result<()> - Ok if processing succeeded
  • Message is modified in place with results and audit trail

engine.process_message_with_trace(&mut message)

Processes a message and returns an execution trace for debugging.

  • Returns Result<ExecutionTrace> - Contains all execution steps with message snapshots
  • Useful for step-by-step debugging and visualization

engine.workflows()

Returns a reference to the registered rules (sorted by priority).

#![allow(unused)]
fn main() {
let count = engine.workflows().len();
}

engine.workflow_by_id(id)

Find a specific workflow by its ID.

#![allow(unused)]
fn main() {
if let Some(workflow) = engine.workflow_by_id("my_rule") {
    println!("Found: {}", workflow.name.as_deref().unwrap_or("unnamed"));
}
}

engine.process_message_for_channel(channel, message)

Processes a message through only the active workflows on a specific channel. Uses O(1) channel index lookup.

#![allow(unused)]
fn main() {
engine.process_message_for_channel("orders", &mut message).await?;
}

Only workflows with status: "active" are included in channel routing.

engine.process_message_for_channel_with_trace(channel, message)

Same as process_message_for_channel but returns an execution trace for debugging.

#![allow(unused)]
fn main() {
let trace = engine.process_message_for_channel_with_trace("orders", &mut message).await?;
}

engine.with_new_workflows(workflows)

Creates a new engine with different workflows while preserving custom function registrations. Useful for hot-reloading workflow definitions at runtime.

#![allow(unused)]
fn main() {
let new_workflows = vec![Workflow::from_json(r#"{ ... }"#)?];
let new_engine = engine.with_new_workflows(new_workflows);

// Old engine is still valid for in-flight messages
// New engine has freshly compiled logic + same custom functions
}

Rules (Workflows)

A Rule (also called Workflow) is a collection of actions that execute sequentially when a condition is met. This is the core IF → THEN unit: IF condition matches, THEN execute actions.

Overview

Rules provide:

  • Conditional Execution - Only run when JSONLogic conditions are met (against full context: data, metadata, temp_data)
  • Priority Ordering - Control execution order across rules
  • Action Organization - Group related processing steps
  • Error Handling - Continue or stop on errors

Rule Structure

{
    "id": "premium_order",
    "name": "Premium Order Processing",
    "priority": 1,
    "channel": "orders",
    "version": 2,
    "status": "active",
    "tags": ["premium", "high-priority"],
    "condition": { ">=": [{"var": "data.order.total"}, 1000] },
    "continue_on_error": false,
    "tasks": [
        {
            "id": "apply_discount",
            "name": "Apply Discount",
            "function": { ... }
        },
        {
            "id": "notify_manager",
            "name": "Notify Manager",
            "function": { ... }
        }
    ]
}

Fields

FieldTypeRequiredDescription
idstringYesUnique rule identifier
namestringNoHuman-readable name
prioritynumberNoExecution order (default: 0, lower = first)
conditionJSONLogicNoWhen to execute rule (evaluated against full context)
continue_on_errorbooleanNoContinue on action failure (default: false)
tasksarrayYesActions to execute
channelstringNoChannel for message routing (default: "default")
versionnumberNoWorkflow version number (default: 1)
statusstringNoLifecycle status: active, paused, or archived (default: active)
tagsarrayNoArbitrary tags for organization (default: [])
created_atdatetimeNoCreation timestamp (ISO 8601)
updated_atdatetimeNoLast update timestamp (ISO 8601)

Creating Rules

From JSON String

#![allow(unused)]
fn main() {
use dataflow_rs::Workflow;

let rule = Workflow::from_json(r#"{
    "id": "my_rule",
    "name": "My Rule",
    "tasks": [...]
}"#)?;
}

Using the Convenience Constructor

#![allow(unused)]
fn main() {
use dataflow_rs::{Rule, Task};
use serde_json::json;

let rule = Rule::rule(
    "premium_discount",
    "Premium Discount",
    json!({">=": [{"var": "data.order.total"}, 1000]}),
    vec![/* actions */],
);
}

From File

#![allow(unused)]
fn main() {
let rule = Workflow::from_file("rules/my_rule.json")?;
}

Priority Ordering

Rules execute in priority order (lowest first). This enables the THAT (chaining) in the IF → THEN → THAT model:

// Executes first (priority 1) — validate input
{
    "id": "validation",
    "priority": 1,
    "tasks": [...]
}

// Executes second (priority 2) — transform data
{
    "id": "transformation",
    "priority": 2,
    "tasks": [...]
}

// Executes last (priority 10) — send notifications
{
    "id": "notification",
    "priority": 10,
    "tasks": [...]
}

Conditional Execution

Use JSONLogic conditions to control when rules run. Conditions evaluate against the full message contextdata, metadata, and temp_data:

{
    "id": "premium_user_rule",
    "condition": {
        "and": [
            {">=": [{"var": "data.order.total"}, 500]},
            {"==": [{"var": "data.user.is_vip"}, true]}
        ]
    },
    "tasks": [...]
}

Common Condition Patterns

// Match on data fields
{">=": [{"var": "data.order.total"}, 1000]}

// Check data exists
{"!!": {"var": "data.email"}}

// Multiple conditions
{"and": [
    {">=": [{"var": "data.amount"}, 100]},
    {"==": [{"var": "data.currency"}, "USD"]}
]}

// Either condition
{"or": [
    {"==": [{"var": "metadata.source"}, "api"]},
    {"==": [{"var": "metadata.source"}, "webhook"]}
]}

Error Handling

Stop on Error (Default)

{
    "id": "strict_rule",
    "continue_on_error": false,
    "tasks": [...]
}

If any action fails, the rule stops and the error is recorded.

Continue on Error

{
    "id": "resilient_rule",
    "continue_on_error": true,
    "tasks": [...]
}

Actions continue executing even if previous actions fail. Errors are collected in message.errors().

Action Dependencies

Actions within a rule execute sequentially, allowing later actions to depend on earlier results:

{
    "id": "pipeline",
    "tasks": [
        {
            "id": "fetch_data",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.fetched", "logic": {"var": "data.source"}}
                    ]
                }
            }
        },
        {
            "id": "process_data",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.result", "logic": {"var": "temp_data.fetched"}}
                    ]
                }
            }
        }
    ]
}

Workflow Lifecycle

Workflows support lifecycle management with status, versioning, and tagging. All lifecycle fields are optional and backward-compatible.

Status

Control whether a workflow is active using the status field:

{"id": "my_rule", "status": "active", "tasks": [...]}
{"id": "old_rule", "status": "paused", "tasks": [...]}
{"id": "legacy_rule", "status": "archived", "tasks": [...]}
  • active (default) — the workflow executes normally and is included in channel routing
  • paused — the workflow is excluded from channel routing but still runs via process_message()
  • archived — same as paused; used to indicate permanently retired workflows

Channel Routing

Group workflows by channel for efficient message routing:

[
    {"id": "order_validate", "channel": "orders", "priority": 1, "tasks": [...]},
    {"id": "order_process", "channel": "orders", "priority": 2, "tasks": [...]},
    {"id": "user_notify", "channel": "notifications", "priority": 1, "tasks": [...]}
]

Then route messages to a specific channel:

#![allow(unused)]
fn main() {
// Only runs workflows on the "orders" channel
engine.process_message_for_channel("orders", &mut message).await?;
}

Version and Tags

Use version and tags for workflow organization:

{
    "id": "discount_rule",
    "version": 3,
    "tags": ["finance", "discount", "v3"],
    "tasks": [...]
}

Try It

Want more features? Try the Full Debugger UI with step-by-step execution and rule visualization.

Try changing role to something other than “admin” to see the conditional rule skip.

Actions (Tasks)

An Action (also called Task) is an individual processing unit within a rule that executes a function. Actions are the THEN in the IF → THEN model.

Overview

Actions are the building blocks of rules. Each action:

  • Executes a single function (built-in or custom)
  • Can have a condition for conditional execution
  • Can modify message data
  • Records changes in the audit trail

Action Structure

{
    "id": "apply_discount",
    "name": "Apply Discount",
    "condition": { ">=": [{"var": "data.order.total"}, 100] },
    "continue_on_error": false,
    "function": {
        "name": "map",
        "input": {
            "mappings": [
                {
                    "path": "data.order.discount",
                    "logic": {"*": [{"var": "data.order.total"}, 0.1]}
                }
            ]
        }
    }
}

Fields

FieldTypeRequiredDescription
idstringYesUnique action identifier within rule
namestringNoHuman-readable name
conditionJSONLogicNoWhen to execute action (evaluated against full context)
continue_on_errorbooleanNoContinue rule on failure
functionobjectYesFunction to execute

Creating Actions Programmatically

#![allow(unused)]
fn main() {
use dataflow_rs::{Action, FunctionConfig};

let action = Action::action(
    "apply_discount",
    "Apply Discount",
    function_config,
);
}

Function Configuration

The function object specifies what the action does:

{
    "function": {
        "name": "function_name",
        "input": { ... }
    }
}

Built-in Functions

FunctionPurpose
mapData transformation and field mapping
validationData validation with custom error messages
filterPipeline control flow — halt workflow or skip task
logStructured logging with JSONLogic expressions
parse_jsonParse JSON from payload into data context
parse_xmlParse XML string into JSON data structure
publish_jsonSerialize data to JSON string
publish_xmlSerialize data to XML string

Custom Functions

Register custom handlers via the engine builder:

#![allow(unused)]
fn main() {
let engine = Engine::builder()
    .with_workflows(rules)
    .register("my_custom_function", MyCustomFunction)
    .build()?;
}

Then reference them by name in actions:

{
    "function": {
        "name": "my_custom_function",
        "input": { ... }
    }
}

Conditional Execution

Actions can have conditions that determine if they should run. Conditions evaluate against the full context (data, metadata, temp_data):

{
    "id": "premium_greeting",
    "condition": { "==": [{"var": "data.tier"}, "premium"] },
    "function": {
        "name": "map",
        "input": {
            "mappings": [
                {"path": "data.greeting", "logic": "Welcome, VIP member!"}
            ]
        }
    }
}

Common Patterns

// Only if field exists
{"!!": {"var": "data.email"}}

// Only if field equals value
{"==": [{"var": "data.status"}, "active"]}

// Only if numeric condition
{">=": [{"var": "data.amount"}, 100]}

// Combine conditions
{"and": [
    {"!!": {"var": "data.email"}},
    {"==": [{"var": "data.verified"}, true]}
]}

Error Handling

Action-Level Error Handling

{
    "id": "optional_action",
    "continue_on_error": true,
    "function": { ... }
}

When continue_on_error is true:

  • Action errors are recorded in message.errors()
  • Rule continues to the next action

Rule-Level Error Handling

The rule’s continue_on_error setting applies to all actions unless overridden.

Sequential Execution

Actions execute in order within a rule. Later actions can use results from earlier actions:

{
    "tasks": [
        {
            "id": "step1",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.intermediate", "logic": {"var": "data.raw"}}
                    ]
                }
            }
        },
        {
            "id": "step2",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.final", "logic": {"var": "temp_data.intermediate"}}
                    ]
                }
            }
        }
    ]
}

Try It

Want more features? Try the Full Debugger UI with step-by-step execution and rule visualization.

Try changing tier to “standard” to see different discount applied.

Best Practices

  1. Unique IDs - Use descriptive, unique IDs for debugging
  2. Single Responsibility - Each action should do one thing well
  3. Use temp_data - Store intermediate results in temp_data
  4. Conditions - Add conditions to skip unnecessary processing
  5. Error Handling - Use continue_on_error for optional actions

Message

A Message is the data container that flows through rules, carrying data, metadata, and an audit trail.

Overview

The Message structure contains:

  • context.data - Main data payload
  • context.metadata - Message metadata (routing, source info)
  • context.temp_data - Temporary processing data
  • audit_trail - Record of all changes
  • errors - Collected errors during processing

Message Structure

#![allow(unused)]
fn main() {
use datavalue::OwnedDataValue;
use std::sync::Arc;

pub struct Message {
    // Read via accessors: id(), payload(), payload_arc(), audit_trail(),
    // errors(), capture_changes(). Mutate `errors` via add_error(...).
    // `context` is the only public field — it's the legitimate read
    // surface for tests (e.g. `message.context["data"]["x"]`); inside a
    // handler, prefer `TaskContext::set` so audit-trail changes are
    // recorded automatically.
    pub context: OwnedDataValue,          // Always an Object {data, metadata, temp_data}
    // ... encapsulated fields ...
}
}

The context is structured as:

{
    "data": { ... },
    "metadata": { ... },
    "temp_data": { ... }
}

Creating Messages

Basic Creation

#![allow(unused)]
fn main() {
use dataflow_rs::Message;
use serde_json::json;

// `from_value` bridges from serde_json::Value — handiest when you already
// have JSON literals. The payload lands on the message; the context
// starts empty with the canonical {data, metadata, temp_data} shape.
let mut message = Message::from_value(&json!({
    "name": "John",
    "email": "john@example.com"
}));
}

Native Construction (Zero-Conversion)

#![allow(unused)]
fn main() {
use datavalue::OwnedDataValue;
use std::sync::Arc;

let payload = Arc::new(OwnedDataValue::from(&json!({
    "name": "John"
})));
let mut message = Message::new(payload);
}

Builder

For the richer cases — caller-supplied id (correlation), capture-off fast path — use Message::builder():

#![allow(unused)]
fn main() {
let mut message = Message::builder()
    .id("correlation-123")
    .payload_json(&json!({"name": "John"}))
    .capture_changes(false) // skip per-write Change capture
    .build();
}

Populating the Context

In practice you don’t mutate message.context directly from Rust — the parse_json / map / validation built-ins are how your workflows populate it. Inside a custom AsyncFunctionHandler, use TaskContext::set which records audit-trail changes automatically:

ctx.set("metadata.source", OwnedDataValue::from(&json!("api")));
ctx.set("metadata.type",   OwnedDataValue::from(&json!("user")));

Context Fields

data

The main data payload. This is where your primary data lives and is transformed. Workflows populate it via parse_json / map tasks; handlers read it through ctx.data(). The example below shows the read accessors:

// Inside an AsyncFunctionHandler — TaskContext::data() returns
// &OwnedDataValue (Null if missing, matching serde_json::Value index
// semantics).
let name = ctx.data().get("name");

// Outside a handler (e.g. inspecting a processed message in tests):
let name = &message.data()["name"];

metadata

Information about the message itself (not the data). Commonly used for:

  • Routing decisions (rule conditions)
  • Source tracking
  • Timestamps
  • Message type classification

From a handler, ctx.set("metadata.X", v) is the canonical write path. The engine also stamps metadata.processed_at and metadata.engine_version automatically on every process_message call.

temp_data

Temporary storage for intermediate processing results — useful for values threaded between tasks within the same workflow. From a handler:

ctx.set("temp_data.calculated_value", OwnedDataValue::from(&json!(42)));

// Later tasks read it via JSONLogic:
//   {"var": "temp_data.calculated_value"}

Audit Trail

Every modification to message data is recorded:

#![allow(unused)]
fn main() {
pub struct AuditTrail {
    pub workflow_id: Arc<str>,
    pub task_id: Arc<str>,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
    pub status: usize,
}

pub struct Change {
    pub path: Arc<str>,
    pub old_value: OwnedDataValue,  // owned (not Arc) — one fewer heap alloc per Change
    pub new_value: OwnedDataValue,
}
}

To skip per-write Change capture (bulk-pipeline fast path), build the message with capture_changes(false):

#![allow(unused)]
fn main() {
let m = Message::builder()
    .payload_json(&json!({}))
    .capture_changes(false)
    .build();
}

Audit-trail entries are still recorded — just with empty changes lists. The wire shape is unchanged either way.

Accessing Audit Trail

#![allow(unused)]
fn main() {
// After processing — audit_trail() returns &[AuditTrail].
for entry in message.audit_trail() {
    println!("Workflow: {}, Task: {}", entry.workflow_id, entry.task_id);
    for change in &entry.changes {
        println!("  {} -> {} at {}", change.old_value, change.new_value, change.path);
    }
}
}

Error Handling

Errors are collected in message.errors() (the always-on channel, even when Engine::process_message returns Result::Err):

#![allow(unused)]
fn main() {
for error in message.errors() {
    println!("Error in {}/{}: {}",
        error.workflow_id.as_deref().unwrap_or("unknown"),
        error.task_id.as_deref().unwrap_or("unknown"),
        error.message
    );
}
}

See Error Handling for the unified-channel contract in detail.

JSONLogic Access

In rule conditions and mappings, access message fields using JSONLogic:

// Access data fields
{"var": "data.name"}
{"var": "data.user.email"}

// Access metadata
{"var": "metadata.type"}
{"var": "metadata.source"}

// Access temp_data
{"var": "temp_data.intermediate_result"}

Try It

Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.

Notice how temp_data is used to store an intermediate result.

Best Practices

  1. Separate Concerns

    • Use data for business data
    • Use metadata for routing and rule conditions
    • Use temp_data for intermediate results
  2. Don’t Modify metadata in Tasks

    • Metadata should remain stable for routing decisions
  3. Clean temp_data

    • Use temp_data for values only needed during processing
  4. Check Audit Trail

    • Use the audit trail for debugging and compliance

Error Handling

Dataflow-rs provides flexible error handling at multiple levels to build resilient automation rules.

Two complementary error channels

Every error encountered during process_message flows through two complementary channels:

  • message.errors()always contains every error encountered: validation failures, task panics, 5xx-status outcomes, workflow wrappers. Callers that want a uniform view scan this list.
  • Result::Err from process_message — signals only that the engine stopped before processing every workflow. Callers that want fail-fast match on it; the error pushed to message.errors() for the same failure carries the workflow context that the bare Err doesn’t.

A workflow with continue_on_error: true records its errors to message.errors() and returns Ok(()). A workflow with continue_on_error: false records to message.errors() and returns Result::Err (which short-circuits the rest of process_message).

Error Levels

Errors can be handled at three levels:

  1. Action Level - Individual action (task) error handling
  2. Rule Level - Rule-wide (workflow) error policy
  3. Engine Level - Processing errors

Action-Level Error Handling

Stop on Error (Default)

{
    "id": "critical_action",
    "continue_on_error": false,
    "function": { ... }
}

If the action fails:

  • Error is recorded in message.errors()
  • Rule execution stops
  • No further actions execute

Continue on Error

{
    "id": "optional_action",
    "continue_on_error": true,
    "function": { ... }
}

If the action fails:

  • Error is recorded in message.errors()
  • Rule continues to next action

Rule-Level Error Handling

The rule’s continue_on_error applies to all actions by default:

{
    "id": "resilient_rule",
    "continue_on_error": true,
    "tasks": [
        {"id": "action1", "function": { ... }},
        {"id": "action2", "function": { ... }},
        {"id": "action3", "function": { ... }}
    ]
}

All actions will continue even if earlier actions fail.

Override at Action Level

{
    "id": "mixed_rule",
    "continue_on_error": true,
    "tasks": [
        {"id": "optional_action", "function": { ... }},
        {
            "id": "critical_action",
            "continue_on_error": false,
            "function": { ... }
        }
    ]
}

Accessing Errors

After processing, walk message.errors():

#![allow(unused)]
fn main() {
let result = engine.process_message(&mut message).await;

for error in message.errors() {
    println!("Error: {} in {}/{}",
        error.message,
        error.workflow_id.as_deref().unwrap_or("unknown"),
        error.task_id.as_deref().unwrap_or("unknown")
    );
}

// Fail-fast signal — true when the engine stopped before all workflows ran.
if let Err(e) = result {
    eprintln!("engine stopped early: {e}");
}
}

Common error codes you’ll see:

  • VALIDATION_ERROR — from the validation built-in
  • TASK_ERROR — handler returned Result::Err
  • TASK_STATUS_ERROR — handler returned TaskOutcome::Status(s) with s >= 500
  • WORKFLOW_ERROR — wrapper recording workflow context for the failure above

Error Types

Validation Errors

Generated by the validation function when rules fail:

{
    "function": {
        "name": "validation",
        "input": {
            "rules": [
                {
                    "condition": {"!!": {"var": "data.email"}},
                    "error_message": "Email is required"
                }
            ]
        }
    }
}

Execution Errors

Generated when function execution fails:

  • JSONLogic evaluation errors
  • Data type mismatches
  • Missing required fields

Custom Function Errors

Return errors from custom functions via Result::Err:

use dataflow_rs::prelude::*;

impl AsyncFunctionHandler for MyFunction {
    type Input = serde_json::Value;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        _input: &serde_json::Value,
    ) -> Result<TaskOutcome> {
        if some_condition {
            return Err(DataflowError::Task(
                "Custom error message".to_string()
            ));
        }
        Ok(TaskOutcome::Success)
    }
}

DataflowError provides typed variants for the most common cases — Validation, Task, Workflow, FunctionExecution, FunctionNotFound, Http, Timeout, Io, LogicEvaluation, Deserialization, Unknown. See the API reference for the full list.

Error Recovery Patterns

Fallback Values

Use conditions to provide fallback values:

{
    "tasks": [
        {
            "id": "try_primary",
            "continue_on_error": true,
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.result", "logic": {"var": "data.primary"}}
                    ]
                }
            }
        },
        {
            "id": "use_fallback",
            "condition": {"!": {"var": "temp_data.result"}},
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.result", "logic": "default_value"}
                    ]
                }
            }
        }
    ]
}

Validation Before Processing

Validate data before critical operations:

{
    "tasks": [
        {
            "id": "validate",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [
                        {"condition": {"!!": {"var": "data.required_field"}}, "error_message": "Required field missing"}
                    ]
                }
            }
        },
        {
            "id": "process",
            "function": { ... }
        }
    ]
}

If validation fails, the rule stops before further processing.

Try It

Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.

Notice the validation error is recorded but processing continues.

Best Practices

  1. Validate Early

    • Add validation actions at the start of rules
    • Fail fast on invalid data
  2. Use continue_on_error Wisely

    • Only for truly optional actions
    • Critical operations should stop on error
  3. Check Errors

    • Always check message.errors() after processing
    • Log errors for monitoring
  4. Provide Context

    • Include meaningful error messages
    • Include field paths in validation errors

Built-in Functions Overview

Dataflow-rs comes with built-in action functions for common data processing tasks, covering the complete lifecycle from parsing input to publishing output.

Available Functions

FunctionPurposeModifies Data
parse_jsonParse JSON from payload into data contextYes
parse_xmlParse XML string into JSON data structureYes
mapData transformation and field mappingYes
validationRule-based data validationNo (read-only)
filterPipeline control flow — halt workflow or skip taskNo
logStructured logging with JSONLogic expressionsNo
publish_jsonSerialize data to JSON stringYes
publish_xmlSerialize data to XML stringYes

In addition, dataflow-rs ships typed config schemas for three common service-layer integrations — http_call, enrich, and publish_kafka. These are not pre-registered: register an AsyncFunctionHandler under the matching name and the engine handles config validation and JSONLogic pre-compilation for you. See Integrations.

Common Patterns

Complete Pipeline: Parse → Transform → Validate → Publish

{
    "tasks": [
        {
            "id": "parse_input",
            "function": {
                "name": "parse_json",
                "input": {
                    "source": "payload",
                    "target": "input"
                }
            }
        },
        {
            "id": "transform",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.user.fullName", "logic": {"cat": [{"var": "data.input.firstName"}, " ", {"var": "data.input.lastName"}]}}
                    ]
                }
            }
        },
        {
            "id": "validate",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [
                        {"logic": {"!!": {"var": "data.user.fullName"}}, "message": "Full name required"}
                    ]
                }
            }
        },
        {
            "id": "publish",
            "function": {
                "name": "publish_json",
                "input": {
                    "source": "user",
                    "target": "response",
                    "pretty": true
                }
            }
        }
    ]
}

Conditional Transformation

{
    "tasks": [
        {
            "id": "conditional_map",
            "condition": {"==": [{"var": "data.tier"}, "premium"]},
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.discount", "logic": 20}
                    ]
                }
            }
        }
    ]
}

XML Processing Pipeline

{
    "tasks": [
        {
            "id": "parse_xml_input",
            "function": {
                "name": "parse_xml",
                "input": {
                    "source": "payload",
                    "target": "xmlData"
                }
            }
        },
        {
            "id": "transform",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.response.status", "logic": "processed"}
                    ]
                }
            }
        },
        {
            "id": "publish_xml_output",
            "function": {
                "name": "publish_xml",
                "input": {
                    "source": "response",
                    "target": "xmlOutput",
                    "root_element": "Response"
                }
            }
        }
    ]
}

Function Configuration

All functions use this structure:

{
    "function": {
        "name": "function_name",
        "input": {
            // Function-specific configuration
        }
    }
}

Custom Functions

For operations beyond built-in functions, implement the AsyncFunctionHandler trait. See Custom Functions.

Learn More

Parse Functions

The parse functions convert payload data into structured context data. They are typically used at the start of a workflow to load input data into the processing context.

parse_json

Extracts JSON data from the payload or data context and stores it in a target field.

Configuration

{
    "function": {
        "name": "parse_json",
        "input": {
            "source": "payload",
            "target": "input_data"
        }
    }
}

Parameters

ParameterTypeRequiredDescription
sourcestringYesPath to read from: payload, payload.field, or data.field
targetstringYesField name in data where the result will be stored

Examples

Parse Entire Payload

{
    "id": "load_payload",
    "function": {
        "name": "parse_json",
        "input": {
            "source": "payload",
            "target": "request"
        }
    }
}

Input:

{
    "payload": {"name": "Alice", "age": 30}
}

Result:

{
    "data": {
        "request": {"name": "Alice", "age": 30}
    }
}

Parse Nested Payload Field

{
    "id": "extract_body",
    "function": {
        "name": "parse_json",
        "input": {
            "source": "payload.body.user",
            "target": "user_data"
        }
    }
}

Input:

{
    "payload": {
        "headers": {},
        "body": {
            "user": {"id": 123, "name": "Bob"}
        }
    }
}

Result:

{
    "data": {
        "user_data": {"id": 123, "name": "Bob"}
    }
}

parse_xml

Parses an XML string from the source path, converts it to JSON, and stores it in the target field.

Configuration

{
    "function": {
        "name": "parse_xml",
        "input": {
            "source": "payload",
            "target": "xml_data"
        }
    }
}

Parameters

ParameterTypeRequiredDescription
sourcestringYesPath to XML string: payload, payload.field, or data.field
targetstringYesField name in data where the parsed JSON will be stored

XML to JSON Conversion

The XML parser follows these conventions:

  • Element names become object keys
  • Text content is stored under the element key
  • Attributes are preserved in the JSON structure
  • Multiple child elements with the same name become arrays

Examples

Parse XML Payload

{
    "id": "parse_xml_request",
    "function": {
        "name": "parse_xml",
        "input": {
            "source": "payload",
            "target": "request"
        }
    }
}

Input:

{
    "payload": "<user><name>Alice</name><email>alice@example.com</email></user>"
}

Result:

{
    "data": {
        "request": {
            "name": "Alice",
            "email": "alice@example.com"
        }
    }
}

Parse Nested XML String

{
    "id": "parse_xml_body",
    "function": {
        "name": "parse_xml",
        "input": {
            "source": "payload.xmlContent",
            "target": "parsed"
        }
    }
}

Common Patterns

Load and Transform Pipeline

{
    "tasks": [
        {
            "id": "load",
            "function": {
                "name": "parse_json",
                "input": {"source": "payload", "target": "input"}
            }
        },
        {
            "id": "transform",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.output.name", "logic": {"var": "data.input.name"}}
                    ]
                }
            }
        }
    ]
}

Handle XML API Response

{
    "tasks": [
        {
            "id": "parse_response",
            "function": {
                "name": "parse_xml",
                "input": {"source": "payload.response", "target": "apiResponse"}
            }
        },
        {
            "id": "extract_data",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.result", "logic": {"var": "data.apiResponse.result"}}
                    ]
                }
            }
        }
    ]
}

Error Handling

  • parse_json: Returns the source value as-is (even if null or not JSON)
  • parse_xml: Returns an error if the source is not a string or if XML parsing fails

Next Steps

Map Function

The map function transforms and reorganizes data using JSONLogic expressions.

Overview

The map function:

  • Evaluates JSONLogic expressions against message context
  • Assigns results to specified paths
  • Supports nested path creation
  • Tracks changes for audit trail

Basic Usage

{
    "function": {
        "name": "map",
        "input": {
            "mappings": [
                {
                    "path": "data.full_name",
                    "logic": {"cat": [{"var": "data.first_name"}, " ", {"var": "data.last_name"}]}
                }
            ]
        }
    }
}

Configuration

FieldTypeRequiredDescription
mappingsarrayYesList of mapping operations

Mapping Object

FieldTypeRequiredDescription
pathstringYesTarget path (e.g., “data.user.name”)
logicJSONLogicYesExpression to evaluate

Path Syntax

Dot Notation

Access and create nested structures:

{"path": "data.user.profile.name", "logic": "John"}

Creates: {"data": {"user": {"profile": {"name": "John"}}}}

Numeric Field Names

Use # prefix for numeric keys:

{"path": "data.items.#0", "logic": "first item"}

Creates: {"data": {"items": {"0": "first item"}}}

Root Field Assignment

Assigning to root fields (data, metadata, temp_data) merges objects:

{"path": "data", "logic": {"new_field": "value"}}

Merges into existing data rather than replacing it.

JSONLogic Expressions

Copy Value

{"path": "data.copy", "logic": {"var": "data.original"}}

Static Value

{"path": "data.status", "logic": "active"}

String Concatenation

{
    "path": "data.greeting",
    "logic": {"cat": ["Hello, ", {"var": "data.name"}, "!"]}
}

Conditional Value

{
    "path": "data.tier",
    "logic": {"if": [
        {">=": [{"var": "data.points"}, 1000]}, "gold",
        {">=": [{"var": "data.points"}, 500]}, "silver",
        "bronze"
    ]}
}

Arithmetic

{
    "path": "data.total",
    "logic": {"*": [{"var": "data.price"}, {"var": "data.quantity"}]}
}

Array Operations

{
    "path": "data.count",
    "logic": {"reduce": [
        {"var": "data.items"},
        {"+": [{"var": "accumulator"}, 1]},
        0
    ]}
}

Null Handling

If a JSONLogic expression evaluates to null, the mapping is skipped:

// If data.optional doesn't exist, this mapping is skipped
{"path": "data.copy", "logic": {"var": "data.optional"}}

Sequential Mappings

Mappings execute in order, allowing later mappings to use earlier results:

{
    "mappings": [
        {
            "path": "temp_data.full_name",
            "logic": {"cat": [{"var": "data.first"}, " ", {"var": "data.last"}]}
        },
        {
            "path": "data.greeting",
            "logic": {"cat": ["Hello, ", {"var": "temp_data.full_name"}]}
        }
    ]
}

Try It

Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.

Common Patterns

Copy Between Contexts

// Copy from data to metadata
{"path": "metadata.user_id", "logic": {"var": "data.id"}}

// Copy from data to temp_data
{"path": "temp_data.original", "logic": {"var": "data.value"}}

Default Values

{
    "path": "data.name",
    "logic": {"if": [
        {"!!": {"var": "data.name"}},
        {"var": "data.name"},
        "Unknown"
    ]}
}

Computed Fields

{
    "path": "data.subtotal",
    "logic": {"*": [{"var": "data.price"}, {"var": "data.quantity"}]}
}

Best Practices

  1. Use temp_data - Store intermediate results in temp_data
  2. Order Matters - Place dependencies before dependent mappings
  3. Check for Null - Handle missing fields with if or !! checks
  4. Merge Root Fields - Use root assignment to merge, not replace

Validation Function

The validation function evaluates rules against message data and collects validation errors.

Overview

The validation function:

  • Evaluates JSONLogic rules against message context
  • Collects errors for failed validations
  • Is read-only (doesn’t modify message data)
  • Returns status 200 (pass) or 400 (fail)

Basic Usage

{
    "function": {
        "name": "validation",
        "input": {
            "rules": [
                {
                    "logic": {"!!": {"var": "data.email"}},
                    "message": "Email is required"
                },
                {
                    "logic": {">": [{"var": "data.age"}, 0]},
                    "message": "Age must be positive"
                }
            ]
        }
    }
}

Configuration

FieldTypeRequiredDescription
rulesarrayYesList of validation rules

Rule Object

FieldTypeRequiredDescription
logicJSONLogicYesExpression that must evaluate to true
messagestringNoError message (default: “Validation failed”)

How Validation Works

  1. Each rule’s logic is evaluated against the message context
  2. If the result is exactly true, the rule passes
  3. Any other result (false, null, etc.) is a failure
  4. Failed rules add errors to message.errors()

Common Validation Patterns

Required Field

{
    "logic": {"!!": {"var": "data.email"}},
    "message": "Email is required"
}

Numeric Range

{
    "logic": {"and": [
        {">=": [{"var": "data.age"}, 18]},
        {"<=": [{"var": "data.age"}, 120]}
    ]},
    "message": "Age must be between 18 and 120"
}

String Length

{
    "logic": {">=": [
        {"length": {"var": "data.password"}},
        8
    ]},
    "message": "Password must be at least 8 characters"
}

Pattern Matching (with Regex)

{
    "logic": {"regex_match": [
        {"var": "data.email"},
        "^[^@]+@[^@]+\\.[^@]+$"
    ]},
    "message": "Invalid email format"
}

Conditional Required

{
    "logic": {"or": [
        {"!": {"var": "data.is_business"}},
        {"!!": {"var": "data.company_name"}}
    ]},
    "message": "Company name required for business accounts"
}

Value in List

{
    "logic": {"in": [
        {"var": "data.status"},
        ["active", "pending", "suspended"]
    ]},
    "message": "Invalid status value"
}

Multiple Rules

All rules are evaluated, collecting all errors:

{
    "rules": [
        {
            "logic": {"!!": {"var": "data.name"}},
            "message": "Name is required"
        },
        {
            "logic": {"!!": {"var": "data.email"}},
            "message": "Email is required"
        },
        {
            "logic": {">": [{"var": "data.amount"}, 0]},
            "message": "Amount must be positive"
        }
    ]
}

Accessing Errors

After processing, check message.errors():

#![allow(unused)]
fn main() {
for error in message.errors() {
    println!("{}: {}", error.code, error.message);
}
}

Error structure:

  • code: “VALIDATION_ERROR” for failed rules
  • message: The error message from the rule

Try It

Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.

Notice the validation errors in the output.

Validation with Continue on Error

Combine validation with data transformation:

{
    "id": "validated_transform",
    "continue_on_error": true,
    "tasks": [
        {
            "id": "validate",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [...]
                }
            }
        },
        {
            "id": "transform",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [...]
                }
            }
        }
    ]
}

With continue_on_error: true, transformation proceeds even if validation fails.

Stop on Validation Failure

For strict validation (stop on failure):

{
    "continue_on_error": false,
    "tasks": [
        {
            "id": "validate",
            "function": {"name": "validation", "input": {...}}
        },
        {
            "id": "process",
            "function": {"name": "map", "input": {...}}
        }
    ]
}

If validation fails, subsequent tasks are skipped.

Best Practices

  1. Validate Early - Add validation as the first task
  2. Clear Messages - Write specific, actionable error messages
  3. Check All Rules - Validation evaluates all rules (doesn’t short-circuit)
  4. Use continue_on_error - Decide if processing should continue on failure
  5. Handle Errors - Always check message.errors() after processing

Filter (Pipeline Control Flow)

The filter function provides pipeline control flow by evaluating a JSONLogic condition and either halting the workflow or skipping the task when the condition is false.

Overview

Filter is a gate function — it doesn’t modify data but controls whether subsequent tasks execute. This enables patterns like:

  • Guard clauses — halt a workflow early if prerequisites aren’t met
  • Conditional branches — skip optional processing steps
  • Data quality gates — stop processing if data doesn’t meet criteria

Configuration

{
    "function": {
        "name": "filter",
        "input": {
            "condition": { "JSONLogic expression" },
            "on_reject": "halt | skip"
        }
    }
}

Fields

FieldTypeRequiredDescription
conditionJSONLogicYesCondition to evaluate against the full message context
on_rejectstringNoWhat to do when condition is false: "halt" (default) or "skip"

Rejection Behavior

halt (default)

When the condition is false, the entire workflow stops — no further tasks in the workflow execute.

{
    "id": "guard_active_status",
    "name": "Check Active Status",
    "function": {
        "name": "filter",
        "input": {
            "condition": {"==": [{"var": "data.status"}, "active"]},
            "on_reject": "halt"
        }
    }
}

If data.status is not "active", the workflow halts immediately. The halt is recorded in the audit trail.

skip

When the condition is false, only the current task is skipped — the workflow continues with the next task.

{
    "id": "optional_premium_check",
    "name": "Check Premium Tier",
    "function": {
        "name": "filter",
        "input": {
            "condition": {"==": [{"var": "data.tier"}, "premium"]},
            "on_reject": "skip"
        }
    }
}

If the user is not premium, this task is skipped silently and the next task runs.

Examples

Guard Clause Pattern

Stop processing if required data is missing:

{
    "id": "validation_pipeline",
    "tasks": [
        {
            "id": "parse",
            "function": { "name": "parse_json", "input": {"source": "payload", "target": "input"} }
        },
        {
            "id": "require_email",
            "function": {
                "name": "filter",
                "input": {
                    "condition": {"!!": {"var": "data.input.email"}},
                    "on_reject": "halt"
                }
            }
        },
        {
            "id": "process",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.result", "logic": {"cat": ["Processed: ", {"var": "data.input.email"}]}}
                    ]
                }
            }
        }
    ]
}

Multi-Condition Gate

Combine conditions with JSONLogic and/or:

{
    "id": "complex_gate",
    "function": {
        "name": "filter",
        "input": {
            "condition": {
                "and": [
                    {">=": [{"var": "data.order.total"}, 100]},
                    {"==": [{"var": "data.order.currency"}, "USD"]},
                    {"!!": {"var": "data.order.shipping_address"}}
                ]
            },
            "on_reject": "halt"
        }
    }
}

Optional Processing Step

Use skip for non-critical conditional logic:

{
    "tasks": [
        {
            "id": "apply_coupon",
            "function": {
                "name": "filter",
                "input": {
                    "condition": {"!!": {"var": "data.coupon_code"}},
                    "on_reject": "skip"
                }
            }
        },
        {
            "id": "process_coupon",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.discount", "logic": 10}
                    ]
                }
            }
        }
    ]
}

Status Codes

CodeMeaningBehavior
200PassCondition was true, continue normally
298SkipCondition false + on_reject: skip — skip task, continue workflow
299HaltCondition false + on_reject: halt — stop workflow execution

Notes

  • The filter condition is pre-compiled at engine startup for zero runtime overhead
  • Filter never modifies the message — it only controls execution flow
  • When a workflow halts, the halt is recorded in the audit trail for debugging
  • When a task is skipped, no audit trail entry is created

Log (Structured Logging)

The log function provides structured logging within workflows using the Rust log crate. Log messages and fields support JSONLogic expressions for dynamic content.

Overview

The log function allows you to:

  • Emit structured log messages at any point in a workflow
  • Use JSONLogic expressions for dynamic message content
  • Attach structured fields for machine-readable log data
  • Debug data flow without modifying the message

Configuration

{
    "function": {
        "name": "log",
        "input": {
            "level": "info",
            "message": "JSONLogic expression or static string",
            "fields": {
                "field_name": "JSONLogic expression"
            }
        }
    }
}

Fields

FieldTypeRequiredDescription
levelstringNoLog level: trace, debug, info (default), warn, error
messageJSONLogicYesThe log message (evaluated as JSONLogic against message context)
fieldsobjectNoKey-value pairs where values are JSONLogic expressions

Log Levels

LevelUse Case
traceVery detailed debugging (function entry/exit, variable values)
debugDebugging information (intermediate processing state)
infoGeneral informational messages (processing milestones)
warnWarning conditions (unusual but not erroneous states)
errorError conditions (failures that are handled)

Examples

Simple Static Message

{
    "id": "log_start",
    "function": {
        "name": "log",
        "input": {
            "level": "info",
            "message": "Starting order processing"
        }
    }
}

Dynamic Message with JSONLogic

{
    "id": "log_order",
    "function": {
        "name": "log",
        "input": {
            "level": "info",
            "message": {"cat": ["Processing order ", {"var": "data.order.id"}, " for $", {"var": "data.order.total"}]},
            "fields": {
                "order_id": {"var": "data.order.id"},
                "customer": {"var": "data.customer.name"},
                "total": {"var": "data.order.total"}
            }
        }
    }
}

Debug Logging

{
    "id": "debug_state",
    "function": {
        "name": "log",
        "input": {
            "level": "debug",
            "message": {"cat": ["Current data state: ", {"var": "data"}]},
            "fields": {
                "has_email": {"!!": {"var": "data.email"}},
                "item_count": {"var": "data.items.length"}
            }
        }
    }
}

Warning on Edge Cases

{
    "id": "warn_missing",
    "condition": {"!": {"var": "data.shipping_address"}},
    "function": {
        "name": "log",
        "input": {
            "level": "warn",
            "message": {"cat": ["Order ", {"var": "data.order.id"}, " has no shipping address"]}
        }
    }
}

Log Target

All log messages are emitted with the target dataflow::log, making it easy to filter in your logging configuration:

#![allow(unused)]
fn main() {
// Using env_logger
RUST_LOG=dataflow::log=info cargo run

// Or filter specifically for dataflow logs
env_logger::Builder::new()
    .filter_module("dataflow::log", log::LevelFilter::Debug)
    .init();
}

Notes

  • The log function never modifies the message — it is read-only
  • The log function never fails — it always returns status 200 with no changes
  • All JSONLogic expressions in message and fields are pre-compiled at engine startup
  • If a JSONLogic expression fails to evaluate, the raw expression value is logged instead
  • The fields are formatted as key=value pairs appended to the log message

Publish Functions

The publish functions serialize structured data into string formats (JSON or XML). They are typically used at the end of a workflow to prepare output data for transmission or storage.

publish_json

Serializes data from the source field to a JSON string.

Configuration

{
    "function": {
        "name": "publish_json",
        "input": {
            "source": "output",
            "target": "json_string"
        }
    }
}

Parameters

ParameterTypeRequiredDefaultDescription
sourcestringYes-Field name in data to serialize (e.g., output or nested.field)
targetstringYes-Field name where the JSON string will be stored
prettybooleanNofalseWhether to pretty-print the JSON output

Examples

Serialize Data to JSON

{
    "id": "publish_response",
    "function": {
        "name": "publish_json",
        "input": {
            "source": "response",
            "target": "responseBody"
        }
    }
}

Input:

{
    "data": {
        "response": {"status": "success", "count": 42}
    }
}

Result:

{
    "data": {
        "response": {"status": "success", "count": 42},
        "responseBody": "{\"status\":\"success\",\"count\":42}"
    }
}

Pretty-Print JSON

{
    "id": "publish_pretty",
    "function": {
        "name": "publish_json",
        "input": {
            "source": "user",
            "target": "userJson",
            "pretty": true
        }
    }
}

Result:

{
    "data": {
        "userJson": "{\n  \"name\": \"Alice\",\n  \"age\": 30\n}"
    }
}

publish_xml

Serializes data from the source field to an XML string.

Configuration

{
    "function": {
        "name": "publish_xml",
        "input": {
            "source": "output",
            "target": "xml_string",
            "root_element": "Response"
        }
    }
}

Parameters

ParameterTypeRequiredDefaultDescription
sourcestringYes-Field name in data to serialize
targetstringYes-Field name where the XML string will be stored
root_elementstringNorootName of the root XML element

JSON to XML Conversion

The serializer follows these rules:

  • Object keys become XML element names
  • Array items are wrapped in <item> elements
  • Special characters are properly escaped (<, >, &, ", ')
  • Invalid XML element names are sanitized (e.g., names starting with numbers get an underscore prefix)

Examples

Serialize Data to XML

{
    "id": "publish_xml_response",
    "function": {
        "name": "publish_xml",
        "input": {
            "source": "user",
            "target": "userXml",
            "root_element": "User"
        }
    }
}

Input:

{
    "data": {
        "user": {"name": "Alice", "age": 30}
    }
}

Result:

{
    "data": {
        "user": {"name": "Alice", "age": 30},
        "userXml": "<User><name>Alice</name><age>30</age></User>"
    }
}

Serialize Nested Data

{
    "id": "publish_nested",
    "function": {
        "name": "publish_xml",
        "input": {
            "source": "response.data",
            "target": "xmlOutput",
            "root_element": "Data"
        }
    }
}

Common Patterns

Complete API Pipeline

{
    "tasks": [
        {
            "id": "parse_request",
            "function": {
                "name": "parse_json",
                "input": {"source": "payload", "target": "request"}
            }
        },
        {
            "id": "process",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.response.message", "logic": {"cat": ["Hello, ", {"var": "data.request.name"}]}}
                    ]
                }
            }
        },
        {
            "id": "publish_response",
            "function": {
                "name": "publish_json",
                "input": {"source": "response", "target": "body"}
            }
        }
    ]
}

XML-to-XML Transformation

{
    "tasks": [
        {
            "id": "parse_xml",
            "function": {
                "name": "parse_xml",
                "input": {"source": "payload", "target": "input"}
            }
        },
        {
            "id": "transform",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.output.result", "logic": {"var": "data.input.value"}}
                    ]
                }
            }
        },
        {
            "id": "publish_xml",
            "function": {
                "name": "publish_xml",
                "input": {"source": "output", "target": "xmlResponse", "root_element": "Result"}
            }
        }
    ]
}

Generate Both JSON and XML Outputs

{
    "tasks": [
        {
            "id": "publish_json",
            "function": {
                "name": "publish_json",
                "input": {"source": "response", "target": "jsonOutput"}
            }
        },
        {
            "id": "publish_xml",
            "function": {
                "name": "publish_xml",
                "input": {"source": "response", "target": "xmlOutput", "root_element": "Response"}
            }
        }
    ]
}

Error Handling

  • publish_json: Returns an error if the source field is not found or is null
  • publish_xml: Returns an error if the source field is not found or is null

XML Element Name Sanitization

XML has strict rules for element names. The publish_xml function automatically sanitizes invalid names:

OriginalSanitized
123field_123field
field namefield_name
field@attrfield_attr
`` (empty)_element

Next Steps

Integration Functions

The http_call, enrich, and publish_kafka functions provide typed configuration schemas for the three most common service-layer integration patterns. Unlike map or validation, they do not ship with a built-in handler — the actual I/O is provided by your application via AsyncFunctionHandler.

Why a config schema without an implementation?

The engine itself is I/O-agnostic: it doesn’t bundle an HTTP client, a Kafka producer, or any other transport. But the shape of these integrations is predictable enough that dataflow-rs provides typed config structs so that:

  • JSONLogic expressions inside the config (path_logic, body_logic, key_logic, …) are pre-compiled at engine startup — same fail-loud behaviour as map rules
  • Misshapen config fails at Engine::new(), not at first message
  • Your handler receives an already-validated HttpCallConfig / EnrichConfig / PublishKafkaConfig — no per-call JSON parse

How to use them

For each integration variant you want to use, register a handler under the matching name when building the engine:

use dataflow_rs::prelude::*;
use dataflow_rs::HttpCallConfig;
use async_trait::async_trait;

struct HttpCallHandler { /* reqwest::Client, connector registry, etc. */ }

#[async_trait]
impl AsyncFunctionHandler for HttpCallHandler {
    type Input = HttpCallConfig;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        cfg: &HttpCallConfig,
    ) -> Result<TaskOutcome> {
        // Resolve cfg.connector, evaluate cfg.compiled_path_logic if set,
        // make the call, merge response into ctx via cfg.response_path…
        Ok(TaskOutcome::Success)
    }
}

let engine = Engine::builder()
    .register("http_call", HttpCallHandler { /* … */ })
    .with_workflow(workflow)
    .build()?;

Skip the registration step and any workflow that uses these variants will fail with DataflowError::FunctionNotFound("http_call") at dispatch time.


http_call

Issue an HTTP request and optionally merge the response into the message context.

Configuration

{
    "function": {
        "name": "http_call",
        "input": {
            "connector": "user_service",
            "method": "GET",
            "path_logic": { "cat": ["/users/", {"var": "data.user_id"}] },
            "headers": { "X-Request-Id": "abc" },
            "response_path": "data.user_profile",
            "timeout_ms": 5000
        }
    }
}

Parameters

ParameterTypeRequiredDescription
connectorstringYesNamed reference resolved by your service layer
methodstringNoGET (default), POST, PUT, PATCH, DELETE
pathstringNoStatic request path
path_logicJSONLogicNoComputed path; pre-compiled at startup
headersobjectNoStatic request headers
bodyanyNoStatic request body
body_logicJSONLogicNoComputed body; pre-compiled at startup
response_pathstringNoDot-path to merge response into the message context
timeout_msu64NoRequest timeout in milliseconds (default: 30000)

Use path or path_logic, not both. Same for body / body_logic.


enrich

Fetch external data and merge it into the message context at a specified path. A specialization of http_call aimed at the “look up and attach” pattern.

Configuration

{
    "function": {
        "name": "enrich",
        "input": {
            "connector": "customer_lookup",
            "method": "GET",
            "path_logic": { "cat": ["/customers/", {"var": "data.customer_id"}] },
            "merge_path": "data.customer",
            "timeout_ms": 5000,
            "on_error": "skip"
        }
    }
}

Parameters

ParameterTypeRequiredDescription
connectorstringYesNamed reference resolved by your service layer
methodstringNoHTTP method (default GET)
pathstringNoStatic request path
path_logicJSONLogicNoComputed request path
merge_pathstringYesDot-path where the response is merged into the context
timeout_msu64NoRequest timeout in milliseconds (default: 30000)
on_error"fail" | "skip"NoBehaviour on lookup failure (default: fail)

on_error: skip is useful when enrichment is best-effort and an absent upstream service shouldn’t fail the workflow.


publish_kafka

Emit the message (or a derived value) to a Kafka topic.

Configuration

{
    "function": {
        "name": "publish_kafka",
        "input": {
            "connector": "events_cluster",
            "topic": "orders.processed",
            "key_logic": { "var": "data.order_id" },
            "value_logic": { "var": "data" }
        }
    }
}

Parameters

ParameterTypeRequiredDescription
connectorstringYesNamed reference resolved by your service layer
topicstringYesTarget Kafka topic
key_logicJSONLogicNoComputed message key
value_logicJSONLogicNoComputed message value (default: serialize the message)

The handler decides exactly how to render the produced value — for example, sending the entire message JSON when value_logic is omitted.


Connectors

The connector field is a string that your handler resolves into a concrete client (HTTP client + base URL, Kafka producer + cluster config, …). The engine does not interpret it. A typical layout:

struct HttpCallHandler {
    connectors: HashMap<String, HttpConnector>,  // "user_service" -> &Client + base_url
}

This separation keeps secrets out of workflow JSON and lets you swap endpoints (staging / prod) without touching rule definitions.

Why typed configs matter

Compared to free-form Custom configs:

  • Startup-time validation — bad config fails at Engine::new()
  • Pre-compiled JSONLogicpath_logic, body_logic, key_logic, value_logic are all compiled once; the handler reads Arc<Logic> from the config and evaluates at zero allocation cost in the hot path
  • Stable shape — the same config struct is shared by every handler in the ecosystem, so handlers from different crates can be swapped without rewriting workflows

WebAssembly Package

The @goplasmatic/dataflow-wasm package provides WebAssembly bindings for dataflow-rs, enabling you to run the same rules engine in the browser that powers your Rust backend.

Installation

npm install @goplasmatic/dataflow-wasm

Quick Start

import init, { WasmEngine } from '@goplasmatic/dataflow-wasm';

// Initialize the WASM module (required once)
await init();

// Define your workflows
const workflows = [
  {
    id: 'my-workflow',
    name: 'My Workflow',
    tasks: [
      {
        id: 'transform',
        name: 'Transform Data',
        function: {
          name: 'map',
          input: {
            mappings: [
              { path: 'data.output', logic: { var: 'data.input' } }
            ]
          }
        }
      }
    ]
  }
];

// Create the engine
const engine = new WasmEngine(workflows);

// Process a message
const message = {
  data: { input: 'hello world' },
  metadata: {}
};

const result = await engine.process(message);
console.log(result.data.output); // 'hello world'

API Reference

WasmEngine

The main class for executing rules.

class WasmEngine {
  constructor(workflows: Workflow[]);

  // Process a message through all matching rules
  process(message: Message): Promise<Message>;

  // Process with execution trace for debugging
  processWithTrace(message: Message): Promise<ExecutionTrace>;
}

Types

interface Workflow {
  id: string;
  name: string;
  condition?: JsonLogicValue;  // Optional condition (evaluated against full context)
  tasks: Task[];
}

interface Task {
  id: string;
  name: string;
  condition?: JsonLogicValue;  // Optional condition (evaluated against full context)
  function: FunctionConfig;
}

interface FunctionConfig {
  name: string;  // 'map' or 'validation'
  input: object;
}

interface Message {
  data: object;
  metadata: object;
  payload?: object;
  temp_data?: object;
}

Execution Tracing

For debugging, use processWithTrace to get step-by-step execution details:

const trace = await engine.processWithTrace(message);

console.log('Steps executed:', trace.steps.length);
console.log('Initial message:', trace.initial_message);
console.log('Final message:', trace.final_message);

for (const step of trace.steps) {
  console.log(`Task: ${step.task_name}`);
  console.log(`Changes: ${step.changes.length}`);
}

Building from Source

Requirements:

  • Rust 1.70+
  • wasm-pack
cd wasm
wasm-pack build --target web --out-dir pkg

The output will be in wasm/pkg/.

Browser Compatibility

The WASM package works in all modern browsers that support WebAssembly:

  • Chrome 57+
  • Firefox 52+
  • Safari 11+
  • Edge 16+

Next Steps

UI Package

The @goplasmatic/dataflow-ui package provides React components for visualizing and debugging dataflow-rs rules and workflows.

Installation

npm install @goplasmatic/dataflow-ui

Peer Dependencies

npm install react react-dom

Supports React 18.x and 19.x.

Quick Start

import { WorkflowVisualizer } from '@goplasmatic/dataflow-ui';
import '@goplasmatic/dataflow-ui/styles.css';

const workflows = [
  {
    id: 'my-workflow',
    name: 'My Workflow',
    tasks: [
      {
        id: 'task-1',
        name: 'Transform Data',
        function: {
          name: 'map',
          input: {
            mappings: [
              { path: 'data.output', logic: { var: 'data.input' } }
            ]
          }
        }
      }
    ]
  }
];

function App() {
  return (
    <WorkflowVisualizer
      workflows={workflows}
      theme="system"
      onTaskSelect={(task, workflow) => {
        console.log('Selected task:', task.name);
      }}
    />
  );
}

Components

WorkflowVisualizer

The main component for displaying rules (workflows) in an interactive tree view.

interface WorkflowVisualizerProps {
  /** Array of workflow definitions to display */
  workflows: Workflow[];
  /** Callback when a workflow is selected */
  onWorkflowSelect?: (workflow: Workflow) => void;
  /** Callback when a task is selected */
  onTaskSelect?: (task: Task, workflow: Workflow) => void;
  /** Theme: 'light', 'dark', or 'system' */
  theme?: Theme;
  /** Additional CSS class */
  className?: string;
  /** Execution result to display */
  executionResult?: Message | null;
  /** Enable debug mode */
  debugMode?: boolean;
}

TreeView

Standalone tree view component for custom layouts.

import { TreeView } from '@goplasmatic/dataflow-ui';

<TreeView
  workflows={workflows}
  selection={currentSelection}
  onSelect={handleSelect}
  debugMode={false}
/>

Debug Mode

Enable step-by-step execution visualization with the debugger components.

import {
  WorkflowVisualizer,
  DebuggerProvider,
  DebuggerControls,
  defaultEngineFactory,
  useDebugger
} from '@goplasmatic/dataflow-ui';

function DebugView() {
  return (
    <DebuggerProvider engineFactory={defaultEngineFactory}>
      <WorkflowVisualizer
        workflows={workflows}
        debugMode={true}
      />
      <DebuggerControls />
    </DebuggerProvider>
  );
}

Custom WASM Engine

Use a custom WASM engine with plugins or custom functions for debugging. Implement the DataflowEngine interface:

import {
  WorkflowVisualizer,
  DebuggerProvider,
  DataflowEngine,
  Workflow
} from '@goplasmatic/dataflow-ui';
import { MyCustomWasmEngine } from './my-custom-wasm';

class MyEngineAdapter implements DataflowEngine {
  private engine: MyCustomWasmEngine;

  constructor(workflows: Workflow[]) {
    this.engine = new MyCustomWasmEngine(JSON.stringify(workflows));
  }

  async processWithTrace(payload: Record<string, unknown>) {
    const result = await this.engine.process_with_trace(JSON.stringify(payload));
    return JSON.parse(result);
  }

  dispose() {
    this.engine.free();
  }
}

function CustomDebugView() {
  return (
    <DebuggerProvider engineFactory={(workflows) => new MyEngineAdapter(workflows)}>
      <WorkflowVisualizer workflows={workflows} debugMode={true} />
    </DebuggerProvider>
  );
}

The engineFactory is called whenever workflows change, ensuring the engine always has the latest workflow definitions.

Debugger Controls

import { DebuggerControls } from '@goplasmatic/dataflow-ui';

// Provides playback controls: play, pause, step forward/back, reset
<DebuggerControls />

useDebugger Hook

Access debugger state programmatically:

import { useDebugger } from '@goplasmatic/dataflow-ui';

function MyComponent() {
  const {
    state,           // Current playback state
    hasTrace,        // Whether a trace is loaded
    currentMessage,  // Message at current step
    currentChanges,  // Changes made at current step
    loadTrace,       // Load an execution trace
    stepForward,     // Go to next step
    stepBackward,    // Go to previous step
    play,            // Start auto-playback
    pause,           // Pause playback
    reset,           // Reset to beginning
  } = useDebugger();

  // ...
}

Theming

The visualizer supports light, dark, and system themes.

// Light theme
<WorkflowVisualizer workflows={workflows} theme="light" />

// Dark theme
<WorkflowVisualizer workflows={workflows} theme="dark" />

// System preference (default)
<WorkflowVisualizer workflows={workflows} theme="system" />

Custom Theme Access

import { useTheme } from '@goplasmatic/dataflow-ui';

function MyComponent() {
  const { theme, setTheme, resolvedTheme } = useTheme();
  // resolvedTheme is 'light' or 'dark' (resolved from 'system')
}

Exports

Components

  • WorkflowVisualizer - Main visualization component
  • TreeView - Standalone tree view
  • DebuggerControls - Debug playback controls
  • DebuggerProvider - Debug context provider
  • MessageInputPanel - Message input for debugging
  • MessageStatePanel - Message state display
  • JsonViewer - JSON display component
  • ErrorBoundary - Error boundary wrapper

Hooks

  • useTheme - Theme state and controls
  • useDebugger - Debugger state and controls
  • useTaskDebugState - Debug state for a specific task
  • useWorkflowDebugState - Debug state for a workflow

Engine

  • WasmEngineAdapter - Default WASM engine adapter
  • defaultEngineFactory - Factory function for default engine
  • DataflowEngine - Interface for custom engines
  • EngineFactory - Type for engine factory functions

Types

All TypeScript types are exported for workflow definitions, tasks, messages, and execution traces.

Building from Source

cd ui
npm install
npm run build:lib

Output will be in ui/dist/.

Next Steps

Custom Functions

Extend dataflow-rs with your own custom processing logic by implementing the AsyncFunctionHandler trait.

Overview

Custom functions allow you to:

  • Add domain-specific processing logic
  • Integrate with external systems
  • Perform async operations (HTTP, database, etc.)
  • Implement complex transformations

The trait has three moving parts:

  • type Input — your typed config shape. The engine deserializes each task’s FunctionConfig::Custom { input } JSON into this type once at Engine::builder().build(), not per message. Misshapen config fails at startup.
  • TaskContext — handed to every call. Read the message context (ctx.data(), ctx.metadata(), ctx.temp_data(), ctx.get(path)), mutate it through ctx.set(path, value) which records audit-trail changes automatically, and append errors via ctx.add_error(...).
  • TaskOutcome — the return value: Success, Status(u16), Skip, or Halt. Replaces the magic-number usize of earlier versions.

Implementing AsyncFunctionHandler

#![allow(unused)]
fn main() {
use async_trait::async_trait;
use dataflow_rs::prelude::*;
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::json;

/// Typed config for the handler. The engine deserializes the task's
/// `FunctionConfig::Custom { input }` JSON into this struct at startup;
/// misshapen config fails there, not on first message.
#[derive(Deserialize)]
pub struct MyInput {
    target: String,
}

pub struct MyCustomFunction;

#[async_trait]
impl AsyncFunctionHandler for MyCustomFunction {
    type Input = MyInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &MyInput,
    ) -> Result<TaskOutcome> {
        // Write into the context. `ctx.set` auto-creates intermediate
        // objects/arrays and records a `Change` on the audit trail
        // when `message.capture_changes` is on.
        ctx.set(&input.target, OwnedDataValue::from(&json!(true)));
        Ok(TaskOutcome::Success)
    }
}
}

Three concrete things the new shape removes:

  1. No match config { Custom { input, .. } => ..., _ => Err(...) } block — input is the typed parameter directly.
  2. No hand-built Change entries — ctx.set does that.
  3. No magic Ok((200, vec![])) return — TaskOutcome::Success is self-documenting.

Registering Custom Functions

#![allow(unused)]
fn main() {
let engine = Engine::builder()
    .with_workflows(workflows)
    .register("my_custom_function", MyCustomFunction)
    .build()?;
}

register("name", handler) accepts any AsyncFunctionHandler and boxes it internally. The dyn-trait name (BoxedFunctionHandler) stays out of user code.

Using Custom Functions in Rules

{
    "id": "custom_rule",
    "tasks": [
        {
            "id": "custom_action",
            "function": {
                "name": "my_custom_function",
                "input": {
                    "target": "data.processed"
                }
            }
        }
    ]
}

The input shape on the wire must match your handler’s Input struct. serde does the parse at engine init time.

Accessing Configuration

Because the engine pre-parses the JSON, configuration is just the input parameter — no extraction step. For freeform JSON, set type Input = serde_json::Value;:

use serde_json::Value;

#[async_trait]
impl AsyncFunctionHandler for FreeformHandler {
    type Input = Value;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &Value,
    ) -> Result<TaskOutcome> {
        let option1 = input.get("option1").and_then(Value::as_str).unwrap_or("default");
        let option2 = input.get("option2").and_then(Value::as_i64).unwrap_or(0);
        // ...
        Ok(TaskOutcome::Success)
    }
}

Evaluating JSONLogic from a handler

Custom handlers can compile and evaluate ad-hoc JSONLogic using the shared datalogic engine exposed by TaskContext::datalogic():

use bumpalo::Bump;
use dataflow_rs::prelude::*;
use serde_json::json;

#[async_trait]
impl AsyncFunctionHandler for EvalDemo {
    type Input = serde_json::Value;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        _input: &serde_json::Value,
    ) -> Result<TaskOutcome> {
        // Compile the expression — Arc<Logic> so it can be cached/shared.
        let compiled = ctx
            .datalogic()
            .compile_arc(&json!({"var": "data.input"}))
            .map_err(|e| DataflowError::LogicEvaluation(e.to_string()))?;

        // Evaluate against the current message context.
        let arena = Bump::new();
        let av = ctx.message().context.to_arena(&arena);
        let result = ctx
            .datalogic()
            .evaluate(&compiled, av, &arena)
            .map_err(|e| DataflowError::LogicEvaluation(e.to_string()))?;

        // `result` is a `DataValue<'_>` borrowed from the arena.
        let _owned = result.to_owned();
        Ok(TaskOutcome::Success)
    }
}

If your handler evaluates many expressions against the same context, build the DataValue<'_> once via to_arena and reuse it.

Async Operations

The trait is async/await all the way through. Real I/O works naturally:

use async_trait::async_trait;
use dataflow_rs::prelude::*;
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::Value;

#[derive(Deserialize)]
pub struct HttpFetchInput {
    url: String,
}

pub struct HttpFetchFunction;

#[async_trait]
impl AsyncFunctionHandler for HttpFetchFunction {
    type Input = HttpFetchInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &HttpFetchInput,
    ) -> Result<TaskOutcome> {
        let response = reqwest::get(&input.url)
            .await
            .map_err(|e| DataflowError::http(0, e.to_string()))?;

        let body: Value = response
            .json()
            .await
            .map_err(|e| DataflowError::http(0, e.to_string()))?;

        ctx.set("data.fetched", OwnedDataValue::from(&body));
        Ok(TaskOutcome::Success)
    }
}

Error Handling

Return appropriate errors for different failure modes:

async fn execute(
    &self,
    ctx: &mut TaskContext<'_>,
    _input: &Self::Input,
) -> Result<TaskOutcome> {
    if some_validation_fails {
        return Err(DataflowError::Validation("Invalid input".to_string()));
    }

    if some_operation_fails {
        return Err(DataflowError::Task("Operation failed".to_string()));
    }

    if downstream_call_failed {
        return Err(DataflowError::function_execution(
            "HTTP call failed",
            Some(DataflowError::http(503, "Service Unavailable")),
        ));
    }

    // Or return a status code for an HTTP-style outcome that isn't an Err:
    // 200 for success, 400 for validation failure, 500 for processing failure.
    Ok(TaskOutcome::Status(500))
}

The engine routes errors and 5xx statuses through message.errors() — see Error Handling for the unified-channel contract.

Complete Example

#![allow(unused)]
fn main() {
use async_trait::async_trait;
use dataflow_rs::prelude::*;
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::json;

/// Calculates statistics from numeric array data
#[derive(Deserialize)]
pub struct StatisticsInput {
    /// Field inside `data` whose value is the array to summarize.
    field: String,
}

pub struct StatisticsFunction;

#[async_trait]
impl AsyncFunctionHandler for StatisticsFunction {
    type Input = StatisticsInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &StatisticsInput,
    ) -> Result<TaskOutcome> {
        let numbers: Vec<f64> = ctx
            .data()
            .get(input.field.as_str())
            .and_then(|v| v.as_array())
            .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
            .unwrap_or_default();

        if numbers.is_empty() {
            return Err(DataflowError::Validation(format!(
                "Field '{}' has no numeric values",
                input.field
            )));
        }

        let sum: f64 = numbers.iter().sum();
        let count = numbers.len() as f64;
        let mean = sum / count;
        let min = numbers.iter().cloned().fold(f64::INFINITY, f64::min);
        let max = numbers.iter().cloned().fold(f64::NEG_INFINITY, f64::max);

        ctx.set(
            "data.statistics",
            OwnedDataValue::from(&json!({
                "count": count,
                "sum": sum,
                "mean": mean,
                "min": min,
                "max": max,
            })),
        );
        Ok(TaskOutcome::Success)
    }
}
}

Best Practices

  1. Use a typed Input — let serde validate at startup. Reach for serde_json::Value only when the input genuinely is freeform.
  2. Mutate via ctx.set — it auto-records the audit trail. Reaching into message.context directly bypasses change capture.
  3. Return TaskOutcome cleanlySuccess for the happy path, Status(u16) for HTTP-like codes (5xx pushes a TASK_STATUS_ERROR to message.errors()), Skip for “did nothing, continue”, Halt for “stop this workflow”.
  4. Use the right error typeDataflowError::retryable looks at the variant to decide whether transient errors are worth retrying.
  5. Document — your handler’s Input struct is its contract; docstring it.
  6. Test — drive the handler with TaskContext::new(&mut message, &datalogic) and assert on the outcome and ctx.into_changes().

JSONLogic

Dataflow-rs uses JSONLogic for conditions and data transformations.

Overview

JSONLogic is a way to write rules as JSON. It’s used in dataflow-rs for:

  • Rule Conditions - Control when rules (workflows) execute, evaluated against the full context (data, metadata, temp_data)
  • Action Conditions - Control when actions (tasks) execute
  • Map Function - Transform and copy data

Basic Syntax

JSONLogic operations are objects with a single key (the operator) and value (the arguments):

{"operator": [argument1, argument2, ...]}

Data Access

var - Access Data

// Access top-level field
{"var": "data.name"}

// Access nested field
{"var": "data.user.profile.email"}

// Access array element
{"var": "data.items.0"}

// Default value if missing
{"var": ["data.optional", "default value"]}

Context Structure

In dataflow-rs, the context available to JSONLogic is:

{
    "data": { ... },
    "metadata": { ... },
    "temp_data": { ... }
}

Access fields with:

{"var": "data.field"}
{"var": "metadata.type"}
{"var": "temp_data.intermediate"}

Comparison Operators

Equality

{"==": [{"var": "data.status"}, "active"]}
{"===": [{"var": "data.count"}, 0]}  // Strict equality
{"!=": [{"var": "data.status"}, "deleted"]}
{"!==": [{"var": "data.count"}, null]}  // Strict inequality

Numeric Comparisons

{">": [{"var": "data.age"}, 18]}
{">=": [{"var": "data.score"}, 60]}
{"<": [{"var": "data.price"}, 100]}
{"<=": [{"var": "data.quantity"}, 10]}

Between

{"<=": [1, {"var": "data.x"}, 10]}  // 1 <= x <= 10
{"<": [1, {"var": "data.x"}, 10]}   // 1 < x < 10

Boolean Logic

and, or, not

{"and": [
    {">=": [{"var": "data.age"}, 18]},
    {"==": [{"var": "data.verified"}, true]}
]}

{"or": [
    {"==": [{"var": "data.status"}, "active"]},
    {"==": [{"var": "data.status"}, "pending"]}
]}

{"!": {"var": "data.disabled"}}

Truthy/Falsy

// Check if value is truthy (not null, false, 0, "")
{"!!": {"var": "data.email"}}

// Check if value is falsy
{"!": {"var": "data.deleted"}}

Conditionals

if-then-else

{"if": [
    {">=": [{"var": "data.score"}, 90]}, "A",
    {">=": [{"var": "data.score"}, 80]}, "B",
    {">=": [{"var": "data.score"}, 70]}, "C",
    "F"
]}

Ternary

{"if": [
    {"var": "data.premium"},
    "VIP Customer",
    "Standard Customer"
]}

String Operations

cat - Concatenation

{"cat": ["Hello, ", {"var": "data.name"}, "!"]}

substr - Substring

{"substr": [{"var": "data.text"}, 0, 10]}  // First 10 characters
{"substr": [{"var": "data.text"}, -5]}     // Last 5 characters

in - Contains

// Check if substring exists
{"in": ["@", {"var": "data.email"}]}

// Check if value in array
{"in": [{"var": "data.status"}, ["active", "pending"]]}

Numeric Operations

Arithmetic

{"+": [{"var": "data.a"}, {"var": "data.b"}]}
{"-": [{"var": "data.total"}, {"var": "data.discount"}]}
{"*": [{"var": "data.price"}, {"var": "data.quantity"}]}
{"/": [{"var": "data.total"}, {"var": "data.count"}]}
{"%": [{"var": "data.n"}, 2]}  // Modulo

Min/Max

{"min": [{"var": "data.a"}, {"var": "data.b"}, 100]}
{"max": [{"var": "data.x"}, 0]}  // Ensure non-negative

Array Operations

merge - Combine Arrays

{"merge": [
    {"var": "data.list1"},
    {"var": "data.list2"}
]}

map - Transform Array

{"map": [
    {"var": "data.items"},
    {"*": [{"var": ""}, 2]}  // Double each item
]}

filter - Filter Array

{"filter": [
    {"var": "data.items"},
    {">=": [{"var": ""}, 10]}  // Items >= 10
]}

reduce - Aggregate Array

{"reduce": [
    {"var": "data.items"},
    {"+": [{"var": "accumulator"}, {"var": "current"}]},
    0  // Initial value
]}

all/some/none

{"all": [{"var": "data.items"}, {">=": [{"var": ""}, 0]}]}
{"some": [{"var": "data.items"}, {"==": [{"var": ""}, "special"]}]}
{"none": [{"var": "data.items"}, {"<": [{"var": ""}, 0]}]}

Try It

Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.

Common Patterns

Safe Field Access

// Default to empty string
{"var": ["data.optional", ""]}

// Check existence first
{"if": [
    {"!!": {"var": "data.optional"}},
    {"var": "data.optional"},
    "default"
]}

Null Coalescing

{"if": [
    {"!!": {"var": "data.primary"}},
    {"var": "data.primary"},
    {"var": "data.fallback"}
]}

Type Checking

// Check if string
{"===": [{"typeof": {"var": "data.field"}}, "string"]}

// Check if array
{"===": [{"typeof": {"var": "data.items"}}, "array"]}

Best Practices

  1. Use var Defaults - Provide defaults for optional fields
  2. Check Existence - Use !! to verify field exists before use
  3. Keep It Simple - Complex logic may be better in custom functions
  4. Test Expressions - Use the playground to test JSONLogic before deploying

Audit Trails

Dataflow-rs automatically tracks all data modifications for debugging, monitoring, and compliance.

Overview

Every change to message data is recorded in the audit trail:

  • What changed - Path and values (old and new)
  • When it changed - Timestamp
  • Which action - Rule (workflow) and action (task) identifiers

Audit Trail Structure

#![allow(unused)]
fn main() {
pub struct AuditTrail {
    pub workflow_id: Arc<str>,
    pub task_id: Arc<str>,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
    pub status: usize,
}

pub struct Change {
    pub path: Arc<str>,
    pub old_value: OwnedDataValue,
    pub new_value: OwnedDataValue,
}
}

old_value / new_value are owned (not Arc<OwnedDataValue>) — one less heap allocation per recorded mutation. workflow_id / task_id are Arc<str> mirrors of the workflow/task ids — the engine clones them by refcount bump rather than allocating per audit entry. status mirrors the TaskOutcome variant returned by the task: 200 for Success, the supplied code for Status(u16), and 299 (HALT_STATUS_CODE) for Halt. TaskOutcome::Skip is recorded as no audit entry at all.

Accessing the Audit Trail

After processing, the audit trail is available on the message:

#![allow(unused)]
fn main() {
engine.process_message(&mut message).await?;

for entry in message.audit_trail() {
    println!("Workflow: {}, Task: {}", entry.workflow_id, entry.task_id);
    println!("Timestamp: {}", entry.timestamp);

    for change in &entry.changes {
        println!("  Path: {}", change.path);
        println!("  Old: {}", change.old_value);
        println!("  New: {}", change.new_value);
    }
}
}

JSON Representation

In the playground output, the audit trail appears as:

{
    "audit_trail": [
        {
            "task_id": "transform_data",
            "workflow_id": "my_workflow",
            "timestamp": "2024-01-01T12:00:00Z",
            "changes": [
                {
                    "path": "data.full_name",
                    "old_value": null,
                    "new_value": "John Doe"
                },
                {
                    "path": "data.greeting",
                    "old_value": null,
                    "new_value": "Hello, John Doe!"
                }
            ]
        }
    ]
}

What Gets Tracked

Map Function

Every mapping that modifies data creates a change entry:

{
    "mappings": [
        {"path": "data.name", "logic": "John"}
    ]
}

Creates:

{
    "path": "data.name",
    "old_value": null,
    "new_value": "John"
}

Custom Functions

Custom functions don’t build Change entries by hand — TaskContext::set records them automatically when capture_changes is on. The handler just writes the value and returns TaskOutcome::Success:

ctx.set("data.processed", OwnedDataValue::Bool(true));
Ok(TaskOutcome::Success)

Validation Function

Validation is read-only, so it produces no audit trail entries.

Try It

Want more features? Try the Full Debugger UI with step-by-step execution and workflow visualization.

Notice the audit trail shows each step’s changes.

Use Cases

Debugging

Trace exactly how data was transformed:

#![allow(unused)]
fn main() {
// Find where a value was set
for entry in message.audit_trail() {
    for change in &entry.changes {
        if change.path.as_ref() == "data.total" {
            println!("data.total set by {}/{}",
                entry.workflow_id, entry.task_id);
            println!("Changed from {} to {}",
                change.old_value, change.new_value);
        }
    }
}
}

Compliance

Log all changes for regulatory compliance:

#![allow(unused)]
fn main() {
for entry in message.audit_trail() {
    log_to_audit_system(
        entry.timestamp,
        entry.workflow_id.clone(),
        entry.task_id.clone(),
        &entry.changes
    );
}
}

Change Detection

Detect if specific fields were modified:

#![allow(unused)]
fn main() {
fn was_field_modified(message: &Message, field: &str) -> bool {
    message.audit_trail().iter()
        .flat_map(|e| e.changes.iter())
        .any(|c| c.path.as_ref() == field)
}

if was_field_modified(&message, "data.price") {
    // Price was changed during processing
}
}

Rollback (Conceptual)

The audit trail can be used to implement rollback:

#![allow(unused)]
fn main() {
use datavalue::OwnedDataValue;

fn get_original_value<'a>(message: &'a Message, field: &str) -> Option<&'a OwnedDataValue> {
    message.audit_trail().iter()
        .flat_map(|e| e.changes.iter())
        .find(|c| c.path.as_ref() == field)
        .map(|c| &c.old_value)
}
}

Best Practices

  1. Track All Changes - Custom functions should record all modifications
  2. Use Arc for ids - workflow_id / task_id clone via refcount bump
  3. Timestamp Accuracy - Timestamps are UTC for consistency
  4. Check Audit Trail - Review audit trail during development
  5. Log for Production - Persist audit trails for production debugging
  6. Bulk Pipelines - Build the message with Message::builder().capture_changes(false).build() to skip per-write change capture in throughput-critical pipelines (audit entries are still recorded with empty changes).

Performance

Dataflow-rs is designed for high-performance rule evaluation and data processing with minimal overhead.

Architecture for Performance

Pre-compilation

All JSONLogic expressions are compiled once at engine startup:

#![allow(unused)]
fn main() {
// Builder is the recommended construction path; compiles all
// JSONLogic at .build() and pre-parses Custom-task inputs into
// their typed Self::Input.
let engine = Engine::builder()
    .with_workflows(workflows)
    .build()?;

// Runtime processing uses pre-compiled logic — no parsing or
// compilation overhead.
engine.process_message(&mut message).await?;
}

Benefits of Pre-compilation

  • Zero runtime parsing - No JSON parsing during message processing
  • Cached compiled logic - O(1) access to compiled expressions
  • Early validation - Invalid expressions caught at startup
  • Consistent latency - Predictable performance per message

Memory Efficiency

  • Arc-wrapped compiled logic - Shared without copying
  • Immutable workflows - Safe concurrent access
  • Context caching - Avoids repeated JSON cloning

Benchmarking

Run the included benchmark:

cargo run --example benchmark --release

Sample Benchmark

#![allow(unused)]
fn main() {
use dataflow_rs::{Engine, Workflow, Message};
use std::time::Instant;

// Setup
let workflow = Workflow::from_json(workflow_json)?;
let engine = Engine::builder().with_workflow(workflow).build()?;

// Benchmark
let iterations = 10_000;
let start = Instant::now();

for _ in 0..iterations {
    let mut message = Message::from_value(&test_data);
    engine.process_message(&mut message).await?;
}

let elapsed = start.elapsed();
println!("Processed {} messages in {:?}", iterations, elapsed);
println!("Average: {:?} per message", elapsed / iterations);
}

Optimization Tips

1. Minimize Mappings

Combine related transformations:

// Less efficient: Multiple mappings
{
    "mappings": [
        {"path": "data.a", "logic": {"var": "data.source.a"}},
        {"path": "data.b", "logic": {"var": "data.source.b"}},
        {"path": "data.c", "logic": {"var": "data.source.c"}}
    ]
}

// More efficient: Single object mapping when possible
{
    "mappings": [
        {"path": "data", "logic": {"var": "data.source"}}
    ]
}

2. Use Conditions Wisely

Skip unnecessary processing with conditions:

{
    "id": "expensive_task",
    "condition": {"==": [{"var": "metadata.needs_processing"}, true]},
    "function": { ... }
}

3. Order Rules by Frequency

Put frequently-executed rules earlier (lower priority):

{"id": "common_rule", "priority": 1, ...}
{"id": "rare_rule", "priority": 100, ...}

4. Use temp_data

Store intermediate results to avoid recomputation:

{
    "mappings": [
        {
            "path": "temp_data.computed",
            "logic": {"expensive": "computation"}
        },
        {
            "path": "data.result1",
            "logic": {"var": "temp_data.computed"}
        },
        {
            "path": "data.result2",
            "logic": {"var": "temp_data.computed"}
        }
    ]
}

5. Avoid Unnecessary Validation

Validate only what’s necessary:

// Validate at system boundaries
{
    "id": "input_validation",
    "condition": {"==": [{"var": "metadata.source"}, "external"]},
    "tasks": [
        {"id": "validate", "function": {"name": "validation", ...}}
    ]
}

Concurrent Processing

Process multiple messages concurrently:

#![allow(unused)]
fn main() {
use std::sync::Arc;
use tokio::task;

let engine = Arc::new(Engine::builder().with_workflows(workflows).build()?);

let handles: Vec<_> = messages.into_iter()
    .map(|mut msg| {
        let engine = Arc::clone(&engine);
        task::spawn(async move {
            engine.process_message(&mut msg).await
        })
    })
    .collect();

// Wait for all
for handle in handles {
    handle.await??;
}
}

Thread Safety

  • Engine is Send + Sync
  • Compiled logic shared via Arc
  • Each message processed independently

Memory Considerations

Large Messages

For very large messages, consider:

  1. Streaming - Process chunks instead of entire payload
  2. Selective Loading - Load only needed fields
  3. Cleanup temp_data - Clear intermediate results when done

Many Rules

For many rules:

  1. Organize by Domain - Group related rules
  2. Use Conditions - Skip irrelevant rules early
  3. Profile - Identify bottleneck rules

Profiling

Enable Logging

#![allow(unused)]
fn main() {
env_logger::Builder::from_env(
    env_logger::Env::default().default_filter_or("debug")
).init();
}

Custom Metrics

#![allow(unused)]
fn main() {
use std::time::Instant;

let start = Instant::now();
engine.process_message(&mut message).await?;
let duration = start.elapsed();

metrics::histogram!("dataflow.processing_time", duration);
}

Production Recommendations

  1. Build with –release - Debug builds are significantly slower
  2. Pre-warm - Process a few messages at startup to warm caches
  3. Monitor - Track processing times and error rates
  4. Profile - Identify slow rules in production
  5. Scale Horizontally - Engine is stateless, scale with instances

API Reference

Quick reference for the main dataflow-rs types and methods.

Type Aliases

Dataflow-rs provides rules-engine aliases alongside the original workflow terminology:

Rules EngineWorkflow EngineImport
RulesEngineEngineuse dataflow_rs::RulesEngine;
RuleWorkflowuse dataflow_rs::Rule;
ActionTaskuse dataflow_rs::Action;

Both names refer to the same types — use whichever fits your mental model.

Engine (RulesEngine)

The central component that evaluates rules and processes messages.

#![allow(unused)]
fn main() {
use dataflow_rs::Engine;  // or: use dataflow_rs::RulesEngine;
}

Constructors

#![allow(unused)]
fn main() {
// Recommended path — fluent builder.
pub fn builder() -> EngineBuilder

// Lower-level entry. Use HashMap::new() for the no-handler case.
pub fn new(
    workflows: Vec<Workflow>,
    custom_functions: HashMap<String, BoxedFunctionHandler>,
) -> Result<Engine>
}

EngineBuilder (#[must_use]) chains .register("name", handler), .register_boxed(name, boxed), .with_workflow(w), .with_workflows(iter), then .build() -> Result<Engine>. All JSONLogic is compiled and Custom inputs are pre-parsed into their typed Self::Input at .build() — config-shape errors fail there, not on first message.

Methods

#![allow(unused)]
fn main() {
// Process a message through all matching rules
pub async fn process_message(&self, message: &mut Message) -> Result<()>

// Process with execution trace for debugging
pub async fn process_message_with_trace(&self, message: &mut Message) -> Result<ExecutionTrace>

// Process only workflows on a specific channel (O(1) lookup)
pub async fn process_message_for_channel(&self, channel: &str, message: &mut Message) -> Result<()>

// Channel routing with execution trace
pub async fn process_message_for_channel_with_trace(&self, channel: &str, message: &mut Message) -> Result<ExecutionTrace>

// Get registered rules (sorted by priority)
pub fn workflows(&self) -> &Arc<Vec<Workflow>>

// Find a workflow by ID
pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow>

// Create a new engine with different workflows, preserving custom functions
pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Self
}

Workflow (Rule)

A collection of actions with optional conditions and priority.

#![allow(unused)]
fn main() {
use dataflow_rs::Workflow;  // or: use dataflow_rs::Rule;
}

Constructors

#![allow(unused)]
fn main() {
// Parse from JSON string
pub fn from_json(json: &str) -> Result<Workflow>

// Load from file
pub fn from_file(path: &str) -> Result<Workflow>

// Convenience constructor for rules-engine pattern
pub fn rule(id: &str, name: &str, condition: Value, tasks: Vec<Task>) -> Self
}

JSON Schema

{
    "id": "string (required)",
    "name": "string (optional)",
    "priority": "number (optional, default: 0)",
    "condition": "JSONLogic (optional, evaluated against full context)",
    "continue_on_error": "boolean (optional, default: false)",
    "tasks": "array of Task (required)",
    "channel": "string (optional, default: 'default')",
    "version": "number (optional, default: 1)",
    "status": "'active' | 'paused' | 'archived' (optional, default: 'active')",
    "tags": "array of string (optional, default: [])",
    "created_at": "ISO 8601 datetime (optional)",
    "updated_at": "ISO 8601 datetime (optional)"
}

Task (Action)

An individual processing unit within a rule.

#![allow(unused)]
fn main() {
use dataflow_rs::Task;  // or: use dataflow_rs::Action;
}

Constructor

#![allow(unused)]
fn main() {
// Convenience constructor for rules-engine pattern
pub fn action(id: &str, name: &str, function: FunctionConfig) -> Self
}

JSON Schema

{
    "id": "string (required)",
    "name": "string (optional)",
    "condition": "JSONLogic (optional, evaluated against full context)",
    "continue_on_error": "boolean (optional)",
    "function": {
        "name": "string (required)",
        "input": "object (required)"
    }
}

Message

The data container that flows through rules. The context tree is held as datavalue::OwnedDataValue (not serde_json::Value) so the JSONLogic evaluator can borrow it into its arena without a serde_json round-trip.

#![allow(unused)]
fn main() {
use dataflow_rs::Message;
use datavalue::OwnedDataValue;
use std::sync::Arc;
}

Constructors

#![allow(unused)]
fn main() {
// Fluent builder — recommended path for richer cases (custom id,
// capture_changes off, etc.).
pub fn builder() -> MessageBuilder

// Native zero-conversion entry point — perf path.
pub fn new(payload: Arc<OwnedDataValue>) -> Message

// Convenience: bridge from a serde_json::Value payload.
pub fn from_value(payload: &serde_json::Value) -> Message
}

MessageBuilder (#[must_use]) chains .id(...), .payload(Arc<OwnedDataValue>) / .payload_json(&serde_json::Value), .capture_changes(bool), then .build() -> Message.

Structure

#![allow(unused)]
fn main() {
pub struct Message {
    pub context: OwnedDataValue,    // Always Object {data, metadata, temp_data}
    // ... encapsulated fields ...
}
}

context is the only pub field — it’s the legitimate read surface (tests do message.context["data"]["x"] lookups). Every other field is read via accessors and mutated via add_error (errors) or TaskContext::set (context) so audit-trail changes are recorded.

Methods

#![allow(unused)]
fn main() {
// Identity + payload
pub fn id(&self) -> &str
pub fn payload(&self) -> &OwnedDataValue
pub fn payload_arc(&self) -> &Arc<OwnedDataValue>

// Context accessors
pub fn data(&self) -> &OwnedDataValue
pub fn metadata(&self) -> &OwnedDataValue
pub fn temp_data(&self) -> &OwnedDataValue

// Error + audit views
pub fn errors(&self) -> &[ErrorInfo]
pub fn audit_trail(&self) -> &[AuditTrail]
pub fn capture_changes(&self) -> bool

// Mutation (constructive)
pub fn add_error(&mut self, error: ErrorInfo)

// Predicates
pub fn has_errors(&self) -> bool
}

Inside a custom AsyncFunctionHandler, mutate the context via TaskContext::set — it records audit-trail changes automatically.

AsyncFunctionHandler

Trait for implementing custom action handlers. See Custom Functions for the full walk-through.

#![allow(unused)]
fn main() {
use dataflow_rs::prelude::*;
}

Trait Definition

#![allow(unused)]
fn main() {
use serde::de::DeserializeOwned;

#[async_trait]
pub trait AsyncFunctionHandler: Send + Sync + 'static {
    /// Typed configuration shape for this handler. Use
    /// `serde_json::Value` for freeform JSON.
    type Input: DeserializeOwned + Send + Sync + 'static;

    /// Parse the raw `FunctionConfig::Custom { input }` JSON into
    /// `Self::Input`. Default impl uses `serde_json::from_value`;
    /// override only for custom validation beyond what serde provides.
    fn parse_input(input: &serde_json::Value) -> Result<Self::Input> { ... }

    /// Execute the handler.
    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &Self::Input,
    ) -> Result<TaskOutcome>;
}
}

The engine pre-parses each FunctionConfig::Custom { input } JSON into the registered handler’s typed Self::Input at Engine::builder().build() (or Engine::new) — config-shape errors fail there, not on first message.

Boxing

#![allow(unused)]
fn main() {
pub type BoxedFunctionHandler = Box<dyn DynAsyncFunctionHandler + Send + Sync>;
}

Stored in the engine’s registry. Users construct these via Box::new(handler) (or via Engine::builder().register("name", handler)) — the dyn-trait plumbing stays out of user code.

TaskContext

Per-call context handed to every AsyncFunctionHandler::execute call.

#![allow(unused)]
fn main() {
pub struct TaskContext<'a> { /* ... */ }

impl<'a> TaskContext<'a> {
    // Read accessors
    pub fn message(&self) -> &Message
    pub fn message_mut(&mut self) -> &mut Message
    pub fn datalogic(&self) -> &Arc<datalogic_rs::Engine>
    pub fn data(&self) -> &OwnedDataValue
    pub fn metadata(&self) -> &OwnedDataValue
    pub fn temp_data(&self) -> &OwnedDataValue
    pub fn get(&self, path: &str) -> Option<&OwnedDataValue>

    // Audit-trail-aware mutation
    pub fn set(&mut self, path: &str, value: OwnedDataValue)
    pub fn set_json(&mut self, path: &str, value: &serde_json::Value)
    pub fn add_error(&mut self, error: ErrorInfo)
}
}

set records a Change on the audit trail when message.capture_changes is true, then writes through set_nested_value (auto-creates intermediate objects/arrays, handles #-prefix escapes).

TaskOutcome

Return value of every handler:

#![allow(unused)]
fn main() {
pub enum TaskOutcome {
    Success,         // audit status 200, continue
    Status(u16),     // audit status = code; 5xx pushes TASK_STATUS_ERROR
    Skip,            // no audit entry, continue
    Halt,            // audit status 299 (HALT_STATUS_CODE), stop workflow
}
}

FunctionConfig

FunctionConfig is an enum: every built-in is a typed variant, and unknown function names deserialize into Custom { name, input }. Custom handlers typically destructure the Custom variant to access their config.

#![allow(unused)]
fn main() {
pub enum FunctionConfig {
    Map { input: MapConfig, .. },
    Validation { input: ValidationConfig, .. },
    ParseJson { input: ParseConfig, .. },
    ParseXml { input: ParseConfig, .. },
    PublishJson { input: PublishConfig, .. },
    PublishXml { input: PublishConfig, .. },
    Filter { input: FilterConfig, .. },
    Log { input: LogConfig, .. },
    HttpCall { input: HttpCallConfig, .. },
    Enrich { input: EnrichConfig, .. },
    PublishKafka { input: PublishKafkaConfig, .. },
    Custom {
        name: String,
        input: serde_json::Value,
        // #[serde(skip)] — populated by the engine at .build() with the
        // typed Self::Input for the registered handler.
        compiled_input: Option<CompiledCustomInput>,
    },
}
}

Change

Represents a single data modification recorded in the audit trail.

#![allow(unused)]
fn main() {
pub struct Change {
    pub path: Arc<str>,
    pub old_value: OwnedDataValue,
    pub new_value: OwnedDataValue,
}
}

old_value and new_value are owned (not Arc<OwnedDataValue>) — one less heap allocation per recorded mutation. Wrap them yourself if you need to share a Change across threads.

AuditTrail

Records changes made by an action. workflow_id / task_id are Arc<str> mirrors of the workflow/task ids — the engine clones them by refcount bump rather than allocating per audit entry.

#![allow(unused)]
fn main() {
pub struct AuditTrail {
    pub workflow_id: Arc<str>,
    pub task_id: Arc<str>,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
    pub status: usize,
}
}

ErrorInfo

Error information recorded in the message.

#![allow(unused)]
fn main() {
pub struct ErrorInfo {
    pub code: String,
    pub message: String,
    pub path: Option<String>,
    pub workflow_id: Option<String>,
    pub task_id: Option<String>,
    pub timestamp: Option<String>,
    pub retry_attempted: Option<bool>,
    pub retry_count: Option<u32>,
}
}

DataflowError

Main error type for the library.

#![allow(unused)]
fn main() {
use dataflow_rs::engine::error::DataflowError;
}

Variants

#![allow(unused)]
fn main() {
pub enum DataflowError {
    Validation(String),
    FunctionExecution { context: String, source: Option<Box<DataflowError>> },
    Workflow(String),
    Task(String),
    FunctionNotFound(String),
    Deserialization(String),
    Io(String),
    LogicEvaluation(String),
    Http { status: u16, message: String },
    Timeout(String),
    Unknown(String),
}
}

DataflowError::retryable() returns true for transient infrastructure failures (5xx HTTP, 429, 408, timeouts, IO) and false for data/logic/ configuration errors.

WorkflowStatus

Lifecycle status for workflows.

#![allow(unused)]
fn main() {
use dataflow_rs::WorkflowStatus;
}

Variants

#![allow(unused)]
fn main() {
pub enum WorkflowStatus {
    Active,    // Default — workflow executes normally
    Paused,    // Excluded from channel routing
    Archived,  // Permanently retired
}
}

Built-in Functions

map

Data transformation using JSONLogic.

{
    "name": "map",
    "input": {
        "mappings": [
            {
                "path": "string",
                "logic": "JSONLogic expression"
            }
        ]
    }
}

validation

Rule-based data validation.

{
    "name": "validation",
    "input": {
        "rules": [
            {
                "logic": "JSONLogic expression",
                "message": "string"
            }
        ]
    }
}

filter

Pipeline control flow — halt workflow or skip task.

{
    "name": "filter",
    "input": {
        "condition": "JSONLogic expression",
        "on_reject": "halt | skip (default: halt)"
    }
}

Returns TaskOutcome::Success (pass), TaskOutcome::Skip (no audit entry, continue), or TaskOutcome::Halt (audit status 299, stop workflow) depending on the condition and on_reject.

log

Structured logging with JSONLogic expressions.

{
    "name": "log",
    "input": {
        "level": "trace | debug | info | warn | error (default: info)",
        "message": "JSONLogic expression",
        "fields": {
            "key": "JSONLogic expression"
        }
    }
}

Always returns TaskOutcome::Success — never modifies the message.

WASM API (dataflow-wasm)

For browser/JavaScript usage.

import init, { WasmEngine, process_message } from 'dataflow-wasm';

// Initialize
await init();

// Create engine
const engine = new WasmEngine(workflowsJson);

// Process with payload string (returns Promise)
const result = await engine.process(payloadStr);

// One-off convenience function (no engine needed)
const result2 = await process_message(workflowsJson, payloadStr);

// Get rule info
const count = engine.workflow_count();
const ids = engine.workflow_ids();

Full API Documentation

For complete API documentation, run:

cargo doc --open

This generates detailed documentation from the source code comments.