
Introduction

Dataflow-rs

Dataflow-rs is a high-performance workflow engine for building data processing pipelines in Rust with zero-overhead JSONLogic evaluation.

Whether you’re building REST APIs, processing Kafka streams, or creating sophisticated data transformation pipelines, Dataflow-rs provides enterprise-grade performance with minimal complexity.

Key Features

  • Zero Runtime Compilation - All JSONLogic expressions pre-compiled at startup for optimal performance
  • Modular Architecture - Clear separation between compilation and execution phases
  • Dynamic Workflows - Use JSONLogic to control workflow execution based on your data
  • Extensible - Easily add custom processing steps (tasks) to the engine
  • Built-in Functions - Thread-safe implementations of data mapping and validation
  • Resilient - Built-in error handling and retry mechanisms for transient failures
  • Auditing - Track all changes to your data as it moves through the pipeline

Try It Now

Experience the power of dataflow-rs directly in your browser. Define a workflow and message, then see the processing result instantly.

How It Works

  1. Define Workflows - Create JSON-based workflow definitions with tasks
  2. Create an Engine - Initialize the engine with your workflows (compiled once at startup)
  3. Process Messages - Send messages through the engine for processing
  4. Get Results - Receive transformed data with full audit trail
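
The same four steps, condensed into a small runnable sketch (the workflow definition and input data here are illustrative; the Quick Start chapter walks through a fuller version):

use dataflow_rs::{Engine, Workflow, Message};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Define a workflow (JSON definition containing tasks)
    let workflow = Workflow::from_json(r#"{
        "id": "example",
        "name": "Example",
        "tasks": [{
            "id": "copy_name",
            "function": {
                "name": "map",
                "input": { "mappings": [
                    { "path": "data.copy", "logic": {"var": "data.name"} }
                ]}
            }
        }]
    }"#)?;

    // 2. Create an engine (all JSONLogic is compiled here, once)
    let engine = Engine::new(vec![workflow], None);

    // 3. Process a message
    let mut message = Message::new(&json!({ "name": "World" }));
    engine.process_message(&mut message).await?;

    // 4. Get results: transformed data plus an audit trail of changes
    println!("{}", serde_json::to_string_pretty(&message.context["data"])?);
    println!("{} audit trail entries", message.audit_trail.len());
    Ok(())
}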


Playground

Try dataflow-rs directly in your browser. Define workflows, create messages, and see the processing results in real-time.

How to Use

  1. Select an Example - Choose from the dropdown or write your own
  2. Edit Workflows - Modify the workflow JSON on the left panel
  3. Edit Message - Customize the input message on the right panel
  4. Process - Click “Process Message” or press Ctrl+Enter
  5. View Results - See the processed output with data, metadata, and audit trail

Tips

  • JSONLogic - Use JSONLogic expressions in your workflows for dynamic data access and transformation
  • Multiple Tasks - Add multiple tasks to a workflow for sequential processing
  • Multiple Workflows - Define multiple workflows that execute in priority order
  • Conditions - Add conditions to tasks or workflows to control when they execute
  • Audit Trail - The output shows all changes made during processing

Installation

Add dataflow-rs to your Rust project using Cargo.

Requirements

  • Rust 1.70 or later
  • Cargo (comes with Rust)

Add to Cargo.toml

[dependencies]
dataflow-rs = "2.0"
serde_json = "1.0"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros"] }

Verify Installation

Create a simple test to verify the installation:

use dataflow_rs::Engine;

fn main() {
    // Create an empty engine
    let engine = Engine::new(vec![], None);
    println!("Engine created with {} workflows", engine.workflows().len());
}

Run with:

cargo run

You should see:

Engine created with 0 workflows

Optional Dependencies

Depending on your use case, you may want to add:

[dependencies]
# For async operations
async-trait = "0.1"

# For custom error handling
thiserror = "2.0"

# For logging
log = "0.4"
env_logger = "0.11"
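
If you add the logging crates, a typical setup (a minimal sketch; nothing here is required by dataflow-rs) is to initialize env_logger at startup so log output from your application is visible:

use dataflow_rs::Engine;

fn main() {
    // Reads the RUST_LOG environment variable, e.g. RUST_LOG=info cargo run
    env_logger::init();

    let engine = Engine::new(vec![], None);
    log::info!("Engine ready with {} workflows", engine.workflows().len());
}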


Quick Start

Build your first data processing workflow in minutes.

Create a Simple Workflow

Workflows are defined in JSON and consist of tasks that process data sequentially.

use dataflow_rs::{Engine, Workflow, Message};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define a workflow that transforms data
    let workflow_json = r#"{
        "id": "greeting_workflow",
        "name": "Greeting Workflow",
        "tasks": [
            {
                "id": "create_greeting",
                "name": "Create Greeting",
                "function": {
                    "name": "map",
                    "input": {
                        "mappings": [
                            {
                                "path": "data.greeting",
                                "logic": { "cat": ["Hello, ", {"var": "data.name"}, "!"] }
                            }
                        ]
                    }
                }
            }
        ]
    }"#;

    // Parse the workflow
    let workflow = Workflow::from_json(workflow_json)?;

    // Create the engine (compiles all logic at startup)
    let engine = Engine::new(vec![workflow], None);

    // Create a message to process
    let mut message = Message::new(&json!({
        "name": "World"
    }));

    // Process the message
    engine.process_message(&mut message).await?;

    // Print the result
    println!("Result: {}", serde_json::to_string_pretty(&message.context)?);

    Ok(())
}


Understanding the Code

  1. Workflow Definition - JSON structure defining tasks to execute
  2. Engine Creation - Compiles all JSONLogic expressions at startup
  3. Message Creation - Input data wrapped in a Message structure
  4. Processing - Engine runs the message through all matching workflows
  5. Result - Modified message with transformed data and audit trail

Add Validation

Extend your workflow with data validation:

{
    "id": "validated_workflow",
    "name": "Validated Workflow",
    "tasks": [
        {
            "id": "validate_input",
            "name": "Validate Input",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [
                        {
                            "condition": { "!!": {"var": "data.name"} },
                            "error_message": "Name is required"
                        }
                    ]
                }
            }
        },
        {
            "id": "create_greeting",
            "name": "Create Greeting",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {
                            "path": "data.greeting",
                            "logic": { "cat": ["Hello, ", {"var": "data.name"}, "!"] }
                        }
                    ]
                }
            }
        }
    ]
}


Basic Concepts

Understanding the core components of dataflow-rs.

Architecture Overview

Dataflow-rs follows a two-phase architecture:

  1. Compilation Phase (Startup) - All JSONLogic expressions are compiled once
  2. Execution Phase (Runtime) - Messages are processed using compiled logic
┌─────────────────────────────────────────────────────────────┐
│                    Compilation Phase                         │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────┐  │
│  │ Workflows│ -> │ LogicCompiler│ -> │ Compiled Cache   │  │
│  └──────────┘    └──────────────┘    └──────────────────┘  │
└─────────────────────────────────────────────────────────────┘
                              │
                              v
┌─────────────────────────────────────────────────────────────┐
│                    Execution Phase                           │
│  ┌─────────┐    ┌────────┐    ┌──────────────────────────┐ │
│  │ Message │ -> │ Engine │ -> │ Processed Message        │ │
│  └─────────┘    └────────┘    │ (data + audit trail)     │ │
│                               └──────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

Core Components

Engine

The central orchestrator that processes messages through workflows.

#![allow(unused)]
fn main() {
use dataflow_rs::Engine;

// Create engine with workflows (compiled at creation)
let engine = Engine::new(workflows, custom_functions);

// Process messages (uses pre-compiled logic)
engine.process_message(&mut message).await?;
}

Workflow

A collection of tasks executed sequentially. Workflows can have:

  • Priority - Determines execution order (lower = first)
  • Conditions - JSONLogic expression to control when workflow runs
{
    "id": "my_workflow",
    "name": "My Workflow",
    "priority": 1,
    "condition": { "==": [{"var": "metadata.type"}, "user"] },
    "tasks": [...]
}

Task

An individual processing unit within a workflow. Tasks can:

  • Execute built-in functions (map, validation)
  • Execute custom functions
  • Have conditions for conditional execution
{
    "id": "transform_data",
    "name": "Transform Data",
    "condition": { "!!": {"var": "data.name"} },
    "function": {
        "name": "map",
        "input": { ... }
    }
}

Message

The data structure that flows through workflows. Contains:

  • context.data - Main data payload
  • context.metadata - Message metadata
  • context.temp_data - Temporary processing data
  • audit_trail - Change history
  • errors - Collected errors
#![allow(unused)]
fn main() {
use dataflow_rs::Message;
use serde_json::json;

let mut message = Message::new(&json!({
    "name": "John",
    "email": "john@example.com"
}));

// Access after processing
println!("Data: {:?}", message.context["data"]);
println!("Audit: {:?}", message.audit_trail);
}

Data Flow

  1. Input - Message created with initial data
  2. Workflow Selection - Engine selects matching workflows by condition
  3. Task Execution - Tasks run sequentially within each workflow
  4. Output - Message contains transformed data and audit trail
Message (input)
    │
    v
┌─────────────────────────────────────────┐
│ Workflow 1 (priority: 1)                │
│   Task 1 -> Task 2 -> Task 3            │
└─────────────────────────────────────────┘
    │
    v
┌─────────────────────────────────────────┐
│ Workflow 2 (priority: 2)                │
│   Task 1 -> Task 2                      │
└─────────────────────────────────────────┘
    │
    v
Message (output with audit trail)

JSONLogic

Dataflow-rs uses JSONLogic for:

  • Conditions - Control when workflows/tasks execute
  • Data Access - Read values from message context
  • Transformations - Transform and combine data

Common operations:

// Access data
{"var": "data.name"}

// String concatenation
{"cat": ["Hello, ", {"var": "data.name"}]}

// Conditionals
{"if": [{"var": "data.premium"}, "VIP", "Standard"]}

// Comparisons
{"==": [{"var": "metadata.type"}, "user"]}


Core Concepts Overview

Dataflow-rs is built around a small set of core concepts that work together to process data efficiently.

The Big Picture

┌─────────────────────────────────────────────────────────────────────┐
│                           Engine                                     │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                    Compiled Logic Cache                        │  │
│  │  (All JSONLogic pre-compiled at startup)                       │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                 │
│  │ Workflow 1  │  │ Workflow 2  │  │ Workflow N  │                 │
│  │ ┌─────────┐ │  │ ┌─────────┐ │  │ ┌─────────┐ │                 │
│  │ │ Task 1  │ │  │ │ Task 1  │ │  │ │ Task 1  │ │                 │
│  │ │ Task 2  │ │  │ │ Task 2  │ │  │ │ Task 2  │ │                 │
│  │ │ ...     │ │  │ │ ...     │ │  │ │ ...     │ │                 │
│  │ └─────────┘ │  │ └─────────┘ │  │ └─────────┘ │                 │
│  └─────────────┘  └─────────────┘  └─────────────┘                 │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              v
┌─────────────────────────────────────────────────────────────────────┐
│                          Message                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌────────────┐ │
│  │    data     │  │  metadata   │  │  temp_data  │  │ audit_trail│ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

Component Summary

Component | Purpose                    | Key Features
----------|----------------------------|-----------------------------------------
Engine    | Orchestrates processing    | Pre-compiled logic, workflow management
Workflow  | Groups related tasks       | Priority ordering, conditions
Task      | Individual processing unit | Built-in or custom functions
Message   | Data container             | Data, metadata, audit trail

Processing Flow

  1. Engine Initialization

    • Parse workflow definitions
    • Compile all JSONLogic expressions
    • Store in indexed cache
  2. Message Processing

    • Create message with input data
    • Engine evaluates workflow conditions
    • Matching workflows execute in priority order
  3. Task Execution

    • Tasks run sequentially within workflows
    • Each task can modify message data
    • Changes recorded in audit trail
  4. Result

    • Message contains transformed data
    • Audit trail shows all modifications
    • Errors collected (if any)

Key Design Principles

Pre-compilation

All JSONLogic expressions are compiled once at engine creation. This eliminates runtime parsing overhead and ensures consistent, predictable performance.
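
In practice this means the engine is created once and then reused for every message; only message processing happens per request. A minimal sketch (the file name and loop inputs are illustrative):

use dataflow_rs::{Engine, Message, Workflow};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load and compile once, at startup.
    let workflow = Workflow::from_file("workflows/my_workflow.json")?;
    let engine = Engine::new(vec![workflow], None);

    // Every message reuses the pre-compiled logic; no parsing happens here.
    for name in ["Alice", "Bob", "Carol"] {
        let mut message = Message::new(&json!({ "name": name }));
        engine.process_message(&mut message).await?;
    }
    Ok(())
}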

Immutability

Workflows are immutable after engine creation. This enables safe concurrent processing and eliminates race conditions.

Separation of Concerns

  • LogicCompiler handles all compilation
  • InternalExecutor executes built-in functions
  • Engine orchestrates the flow

Audit Trail

Every data modification is recorded, providing complete visibility into processing steps for debugging and compliance.

Detailed Documentation

  • Engine - The central orchestrator
  • Workflow - Task collections with conditions
  • Task - Individual processing units
  • Message - Data container with audit trail
  • Error Handling - Managing failures gracefully

Engine

The Engine is the central component that orchestrates message processing through workflows.

Overview

The Engine is responsible for:

  • Compiling all JSONLogic expressions at initialization
  • Managing workflow execution order
  • Processing messages through matching workflows
  • Coordinating task execution

Creating an Engine

#![allow(unused)]
fn main() {
use dataflow_rs::{Engine, Workflow};
use std::collections::HashMap;

// Parse workflows from JSON
let workflow1 = Workflow::from_json(r#"{
    "id": "workflow1",
    "name": "First Workflow",
    "priority": 1,
    "tasks": [...]
}"#)?;

let workflow2 = Workflow::from_json(r#"{
    "id": "workflow2",
    "name": "Second Workflow",
    "priority": 2,
    "tasks": [...]
}"#)?;

// Create engine with workflows
let engine = Engine::new(
    vec![workflow1, workflow2],
    None  // Optional custom functions
);

// Engine is now ready - all logic compiled
println!("Loaded {} workflows", engine.workflows().len());
}

Processing Messages

#![allow(unused)]
fn main() {
use dataflow_rs::Message;
use serde_json::json;

// Create a message
let mut message = Message::new(&json!({
    "user": "john",
    "action": "login"
}));

// Process through all matching workflows
engine.process_message(&mut message).await?;

// Access results
println!("Processed data: {:?}", message.context["data"]);
println!("Audit trail: {:?}", message.audit_trail);
}

Workflow Execution Order

Workflows execute in priority order (lowest priority number first):

#![allow(unused)]
fn main() {
// Priority 1 executes first
let high_priority = Workflow::from_json(r#"{
    "id": "high",
    "priority": 1,
    "tasks": [...]
}"#)?;

// Priority 10 executes later
let low_priority = Workflow::from_json(r#"{
    "id": "low",
    "priority": 10,
    "tasks": [...]
}"#)?;
}

Workflow Conditions

Workflows can have conditions that determine if they should execute:

{
    "id": "user_workflow",
    "name": "User Workflow",
    "condition": { "==": [{"var": "metadata.type"}, "user"] },
    "tasks": [...]
}

The workflow only executes if the condition evaluates to true.
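
Because the condition reads metadata.type, the incoming message must carry that metadata for the workflow to run. A minimal runnable sketch (the task in this workflow is illustrative):

use dataflow_rs::{Engine, Message, Workflow};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let workflow = Workflow::from_json(r#"{
        "id": "user_workflow",
        "name": "User Workflow",
        "condition": { "==": [{"var": "metadata.type"}, "user"] },
        "tasks": [{
            "id": "tag",
            "function": {
                "name": "map",
                "input": { "mappings": [
                    { "path": "data.tagged", "logic": true }
                ]}
            }
        }]
    }"#)?;
    let engine = Engine::new(vec![workflow], None);

    let mut message = Message::new(&json!({ "name": "John" }));
    // Populate the metadata that the workflow condition inspects.
    message.context["metadata"] = json!({ "type": "user" });

    engine.process_message(&mut message).await?;
    // The condition matched, so the map task ran and set data.tagged.
    println!("tagged = {}", message.context["data"]["tagged"]);
    Ok(())
}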

Custom Functions

Register custom functions when creating the engine:

#![allow(unused)]
fn main() {
use dataflow_rs::engine::AsyncFunctionHandler;
use std::collections::HashMap;

let mut custom_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>> = HashMap::new();
custom_functions.insert("my_function".to_string(), Box::new(MyCustomFunction));

let engine = Engine::new(workflows, Some(custom_functions));
}

Thread Safety

The Engine is designed for concurrent use:

  • Workflows are immutable after creation
  • Compiled logic is shared via Arc
  • Each message is processed independently
#![allow(unused)]
fn main() {
use std::sync::Arc;
use tokio::task;

let engine = Arc::new(Engine::new(workflows, None));

// Process multiple messages concurrently
let handles: Vec<_> = messages.into_iter().map(|mut msg| {
    let engine = Arc::clone(&engine);
    task::spawn(async move {
        engine.process_message(&mut msg).await
    })
}).collect();

// Wait for all to complete
for handle in handles {
    handle.await??;
}
}

API Reference

Engine::new(workflows, custom_functions)

Creates a new engine with the given workflows and optional custom functions.

  • workflows: Vec<Workflow> - Workflows to register
  • custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler>>> - Custom function implementations

engine.process_message(&mut message)

Processes a message through all matching workflows.

  • Returns Result<()> - Ok if processing succeeded
  • Message is modified in place with results and audit trail

engine.workflows()

Returns a reference to the registered workflows.

#![allow(unused)]
fn main() {
let workflow_ids: Vec<&String> = engine.workflows().keys().collect();
}

Workflow

A Workflow is a collection of tasks that execute sequentially to process data.

Overview

Workflows provide:

  • Task Organization - Group related processing steps
  • Priority Ordering - Control execution order across workflows
  • Conditional Execution - Only run when conditions are met
  • Error Handling - Continue or stop on errors

Workflow Structure

{
    "id": "user_processor",
    "name": "User Processor",
    "priority": 1,
    "condition": { "==": [{"var": "metadata.type"}, "user"] },
    "continue_on_error": false,
    "tasks": [
        {
            "id": "validate_user",
            "name": "Validate User",
            "function": { ... }
        },
        {
            "id": "enrich_user",
            "name": "Enrich User Data",
            "function": { ... }
        }
    ]
}

Fields

Field             | Type      | Required | Description
------------------|-----------|----------|------------------------------------------
id                | string    | Yes      | Unique workflow identifier
name              | string    | No       | Human-readable name
priority          | number    | No       | Execution order (default: 0)
condition         | JSONLogic | No       | When to execute workflow
continue_on_error | boolean   | No       | Continue on task failure (default: false)
tasks             | array     | Yes      | Tasks to execute

Creating Workflows

From JSON String

#![allow(unused)]
fn main() {
use dataflow_rs::Workflow;

let workflow = Workflow::from_json(r#"{
    "id": "my_workflow",
    "name": "My Workflow",
    "tasks": [...]
}"#)?;
}

From File

#![allow(unused)]
fn main() {
let workflow = Workflow::from_file("workflows/my_workflow.json")?;
}
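
To load every definition in a directory at startup, combine Workflow::from_file with the standard library (a minimal sketch; the workflows/ directory name is illustrative):

use dataflow_rs::{Engine, Workflow};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut workflows = Vec::new();

    // Collect every .json definition in the workflows/ directory.
    for entry in std::fs::read_dir("workflows")? {
        let path = entry?.path();
        if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
            workflows.push(Workflow::from_file(path.to_str().unwrap())?);
        }
    }

    let engine = Engine::new(workflows, None);
    println!("Loaded {} workflows", engine.workflows().len());
    Ok(())
}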

Priority Ordering

Workflows execute in priority order (lowest first):

// Executes first (priority 1)
{
    "id": "validation",
    "priority": 1,
    "tasks": [...]
}

// Executes second (priority 2)
{
    "id": "transformation",
    "priority": 2,
    "tasks": [...]
}

// Executes last (priority 10)
{
    "id": "notification",
    "priority": 10,
    "tasks": [...]
}

Conditional Execution

Use JSONLogic conditions to control when workflows run:

{
    "id": "premium_user_workflow",
    "condition": {
        "and": [
            {"==": [{"var": "metadata.type"}, "user"]},
            {"==": [{"var": "data.premium"}, true]}
        ]
    },
    "tasks": [...]
}

Common Condition Patterns

// Match metadata type
{"==": [{"var": "metadata.type"}, "order"]}

// Check data exists
{"!!": {"var": "data.email"}}

// Multiple conditions
{"and": [
    {">=": [{"var": "data.amount"}, 100]},
    {"==": [{"var": "data.currency"}, "USD"]}
]}

// Either condition
{"or": [
    {"==": [{"var": "metadata.source"}, "api"]},
    {"==": [{"var": "metadata.source"}, "webhook"]}
]}

Error Handling

Stop on Error (Default)

{
    "id": "strict_workflow",
    "continue_on_error": false,
    "tasks": [...]
}

If any task fails, the workflow stops and the error is recorded.

Continue on Error

{
    "id": "resilient_workflow",
    "continue_on_error": true,
    "tasks": [...]
}

Tasks continue executing even if previous tasks fail. Errors are collected in message.errors.

Task Dependencies

Tasks within a workflow execute sequentially, allowing later tasks to depend on earlier results:

{
    "id": "pipeline",
    "tasks": [
        {
            "id": "fetch_data",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.fetched", "logic": {"var": "data.source"}}
                    ]
                }
            }
        },
        {
            "id": "process_data",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.result", "logic": {"var": "temp_data.fetched"}}
                    ]
                }
            }
        }
    ]
}


Task

A Task is an individual processing unit within a workflow that executes a function.

Overview

Tasks are the building blocks of workflows. Each task:

  • Executes a single function (built-in or custom)
  • Can have a condition for conditional execution
  • Can modify message data
  • Records changes in the audit trail

Task Structure

{
    "id": "transform_data",
    "name": "Transform Data",
    "condition": { "!!": {"var": "data.input"} },
    "continue_on_error": false,
    "function": {
        "name": "map",
        "input": {
            "mappings": [
                {
                    "path": "data.output",
                    "logic": {"var": "data.input"}
                }
            ]
        }
    }
}

Fields

Field             | Type      | Required | Description
------------------|-----------|----------|----------------------------------------
id                | string    | Yes      | Unique task identifier within workflow
name              | string    | No       | Human-readable name
condition         | JSONLogic | No       | When to execute task
continue_on_error | boolean   | No       | Continue workflow on failure
function          | object    | Yes      | Function to execute

Function Configuration

The function object specifies what the task does:

{
    "function": {
        "name": "function_name",
        "input": { ... }
    }
}

Built-in Functions

Function   | Purpose
-----------|-------------------------------------------
map        | Data transformation and field mapping
validation | Data validation with custom error messages

Custom Functions

Register custom functions when creating the engine:

#![allow(unused)]
fn main() {
let engine = Engine::new(workflows, Some(custom_functions));
}

Then reference them by name in tasks:

{
    "function": {
        "name": "my_custom_function",
        "input": { ... }
    }
}

Conditional Execution

Tasks can have conditions that determine if they should run:

{
    "id": "premium_greeting",
    "condition": { "==": [{"var": "data.tier"}, "premium"] },
    "function": {
        "name": "map",
        "input": {
            "mappings": [
                {"path": "data.greeting", "logic": "Welcome, VIP member!"}
            ]
        }
    }
}

Common Patterns

// Only if field exists
{"!!": {"var": "data.email"}}

// Only if field equals value
{"==": [{"var": "data.status"}, "active"]}

// Only if numeric condition
{">=": [{"var": "data.amount"}, 100]}

// Combine conditions
{"and": [
    {"!!": {"var": "data.email"}},
    {"==": [{"var": "data.verified"}, true]}
]}

Error Handling

Task-Level Error Handling

{
    "id": "optional_task",
    "continue_on_error": true,
    "function": { ... }
}

When continue_on_error is true:

  • Task errors are recorded in message.errors
  • Workflow continues to the next task
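
A small helper (a sketch; the function name is illustrative) can then tell you whether a specific optional task failed by scanning the collected errors:

use dataflow_rs::Message;

/// Returns true if any collected error was reported by the given task.
fn task_failed(message: &Message, task_id: &str) -> bool {
    message.errors.iter()
        .any(|error| error.task_id.as_deref() == Some(task_id))
}

// After processing:
// if task_failed(&message, "optional_task") { /* fall back or log */ }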

Workflow-Level Error Handling

The workflow’s continue_on_error setting applies to all tasks unless overridden.

Sequential Execution

Tasks execute in order within a workflow. Later tasks can use results from earlier tasks:

{
    "tasks": [
        {
            "id": "step1",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.intermediate", "logic": {"var": "data.raw"}}
                    ]
                }
            }
        },
        {
            "id": "step2",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.final", "logic": {"var": "temp_data.intermediate"}}
                    ]
                }
            }
        }
    ]
}


Best Practices

  1. Unique IDs - Use descriptive, unique IDs for debugging
  2. Single Responsibility - Each task should do one thing well
  3. Use temp_data - Store intermediate results in temp_data
  4. Conditions - Add conditions to skip unnecessary processing
  5. Error Handling - Use continue_on_error for optional tasks

Message

A Message is the data container that flows through workflows, carrying data, metadata, and an audit trail.

Overview

The Message structure contains:

  • context.data - Main data payload
  • context.metadata - Message metadata (routing, source info)
  • context.temp_data - Temporary processing data
  • audit_trail - Record of all changes
  • errors - Collected errors during processing

Message Structure

#![allow(unused)]
fn main() {
pub struct Message {
    pub id: Uuid,
    pub payload: Arc<Value>,
    pub context: Value,       // Contains data, metadata, temp_data
    pub audit_trail: Vec<AuditTrail>,
    pub errors: Vec<ErrorInfo>,
}
}

The context is structured as:

{
    "data": { ... },
    "metadata": { ... },
    "temp_data": { ... }
}

Creating Messages

Basic Creation

#![allow(unused)]
fn main() {
use dataflow_rs::Message;
use serde_json::json;

let mut message = Message::new(&json!({
    "name": "John",
    "email": "john@example.com"
}));
}

With Metadata

#![allow(unused)]
fn main() {
let mut message = Message::new(&json!({
    "name": "John"
}));

message.context["metadata"] = json!({
    "source": "api",
    "type": "user",
    "timestamp": "2024-01-01T00:00:00Z"
});
}

From Full Context

#![allow(unused)]
fn main() {
let mut message = Message::from_value(&json!({
    "data": {
        "name": "John"
    },
    "metadata": {
        "source": "api"
    },
    "temp_data": {}
}));
}

Context Fields

data

The main data payload. This is where your primary data lives and is transformed.

#![allow(unused)]
fn main() {
// Set data
message.context["data"]["name"] = json!("John");

// Read data
let name = &message.context["data"]["name"];
}

metadata

Information about the message itself (not the data). Commonly used for:

  • Routing decisions (workflow conditions)
  • Source tracking
  • Timestamps
  • Message type classification
#![allow(unused)]
fn main() {
message.context["metadata"] = json!({
    "type": "user",
    "source": "webhook",
    "received_at": "2024-01-01T00:00:00Z"
});
}

temp_data

Temporary storage for intermediate processing results. Cleared between processing runs if needed.

#![allow(unused)]
fn main() {
// Store intermediate result
message.context["temp_data"]["calculated_value"] = json!(42);

// Use in later task
// {"var": "temp_data.calculated_value"}
}

Audit Trail

Every modification to message data is recorded:

#![allow(unused)]
fn main() {
#[derive(Debug)]
pub struct AuditTrail {
    pub task_id: String,
    pub workflow_id: String,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
}

pub struct Change {
    pub path: Arc<str>,
    pub old_value: Arc<Value>,
    pub new_value: Arc<Value>,
}
}

Accessing Audit Trail

#![allow(unused)]
fn main() {
// After processing
for entry in &message.audit_trail {
    println!("Workflow: {}, Task: {}", entry.workflow_id, entry.task_id);
    for change in &entry.changes {
        println!("  {} -> {} at {}", change.old_value, change.new_value, change.path);
    }
}
}

Error Handling

Errors are collected in message.errors:

#![allow(unused)]
fn main() {
for error in &message.errors {
    println!("Error in {}/{}: {}",
        error.workflow_id.as_deref().unwrap_or("unknown"),
        error.task_id.as_deref().unwrap_or("unknown"),
        error.message
    );
}
}

JSONLogic Access

In workflow conditions and mappings, access message fields using JSONLogic:

// Access data fields
{"var": "data.name"}
{"var": "data.user.email"}

// Access metadata
{"var": "metadata.type"}
{"var": "metadata.source"}

// Access temp_data
{"var": "temp_data.intermediate_result"}


Best Practices

  1. Separate Concerns

    • Use data for business data
    • Use metadata for routing and tracking
    • Use temp_data for intermediate results
  2. Don’t Modify metadata in Tasks

    • Metadata should remain stable for routing decisions
  3. Clean temp_data

    • Use temp_data for values only needed during processing
  4. Check Audit Trail

    • Use the audit trail for debugging and compliance

Error Handling

Dataflow-rs provides flexible error handling at multiple levels to build resilient data pipelines.

Error Levels

Errors can be handled at three levels:

  1. Task Level - Individual task error handling
  2. Workflow Level - Workflow-wide error policy
  3. Engine Level - Processing errors

Task-Level Error Handling

Stop on Error (Default)

{
    "id": "critical_task",
    "continue_on_error": false,
    "function": { ... }
}

If the task fails:

  • Error is recorded in message.errors
  • Workflow execution stops
  • No further tasks execute

Continue on Error

{
    "id": "optional_task",
    "continue_on_error": true,
    "function": { ... }
}

If the task fails:

  • Error is recorded in message.errors
  • Workflow continues to next task

Workflow-Level Error Handling

The workflow’s continue_on_error applies to all tasks by default:

{
    "id": "resilient_workflow",
    "continue_on_error": true,
    "tasks": [
        {"id": "task1", "function": { ... }},
        {"id": "task2", "function": { ... }},
        {"id": "task3", "function": { ... }}
    ]
}

All tasks will continue even if earlier tasks fail.

Override at Task Level

{
    "id": "mixed_workflow",
    "continue_on_error": true,
    "tasks": [
        {"id": "optional_task", "function": { ... }},
        {
            "id": "critical_task",
            "continue_on_error": false,
            "function": { ... }
        }
    ]
}

Accessing Errors

After processing, check message.errors:

#![allow(unused)]
fn main() {
engine.process_message(&mut message).await?;

if !message.errors.is_empty() {
    for error in &message.errors {
        println!("Error: {} in {}/{}",
            error.message,
            error.workflow_id.as_deref().unwrap_or("unknown"),
            error.task_id.as_deref().unwrap_or("unknown")
        );
    }
}
}

Error Types

Validation Errors

Generated by the validation function when rules fail:

{
    "function": {
        "name": "validation",
        "input": {
            "rules": [
                {
                    "condition": {"!!": {"var": "data.email"}},
                    "error_message": "Email is required"
                }
            ]
        }
    }
}

Execution Errors

Generated when function execution fails:

  • JSONLogic evaluation errors
  • Data type mismatches
  • Missing required fields

Custom Function Errors

Return errors from custom functions:

#![allow(unused)]
fn main() {
impl AsyncFunctionHandler for MyFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        if some_condition {
            return Err(DataflowError::ExecutionError(
                "Custom error message".to_string()
            ));
        }
        Ok((200, vec![]))
    }
}
}

Error Recovery Patterns

Fallback Values

Use conditions to provide fallback values:

{
    "tasks": [
        {
            "id": "try_primary",
            "continue_on_error": true,
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.result", "logic": {"var": "data.primary"}}
                    ]
                }
            }
        },
        {
            "id": "use_fallback",
            "condition": {"!": {"var": "temp_data.result"}},
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.result", "logic": "default_value"}
                    ]
                }
            }
        }
    ]
}

Validation Before Processing

Validate data before critical operations:

{
    "tasks": [
        {
            "id": "validate",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [
                        {"condition": {"!!": {"var": "data.required_field"}}, "error_message": "Required field missing"}
                    ]
                }
            }
        },
        {
            "id": "process",
            "function": { ... }
        }
    ]
}

If validation fails, the workflow stops before processing.


Best Practices

  1. Validate Early

    • Add validation tasks at the start of workflows
    • Fail fast on invalid data
  2. Use continue_on_error Wisely

    • Only for truly optional operations
    • Critical operations should stop on error
  3. Check Errors

    • Always check message.errors after processing
    • Log errors for monitoring
  4. Provide Context

    • Include meaningful error messages
    • Include field paths in validation errors

Built-in Functions Overview

Dataflow-rs comes with built-in functions for common data processing tasks.

Available Functions

Function   | Purpose                               | Modifies Data
-----------|---------------------------------------|---------------
map        | Data transformation and field mapping | Yes
validation | Rule-based data validation            | No (read-only)

Common Patterns

Data Transformation Pipeline

{
    "tasks": [
        {
            "id": "validate",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [
                        {"logic": {"!!": {"var": "data.email"}}, "message": "Email required"}
                    ]
                }
            }
        },
        {
            "id": "transform",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.processed", "logic": true}
                    ]
                }
            }
        }
    ]
}

Conditional Transformation

{
    "tasks": [
        {
            "id": "conditional_map",
            "condition": {"==": [{"var": "metadata.type"}, "premium"]},
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.discount", "logic": 20}
                    ]
                }
            }
        }
    ]
}

Function Configuration

All functions use this structure:

{
    "function": {
        "name": "function_name",
        "input": {
            // Function-specific configuration
        }
    }
}

Custom Functions

For operations beyond built-in functions, implement the AsyncFunctionHandler trait. See Custom Functions.


Map Function

The map function transforms and reorganizes data using JSONLogic expressions.

Overview

The map function:

  • Evaluates JSONLogic expressions against message context
  • Assigns results to specified paths
  • Supports nested path creation
  • Tracks changes for audit trail

Basic Usage

{
    "function": {
        "name": "map",
        "input": {
            "mappings": [
                {
                    "path": "data.full_name",
                    "logic": {"cat": [{"var": "data.first_name"}, " ", {"var": "data.last_name"}]}
                }
            ]
        }
    }
}

Configuration

Field    | Type  | Required | Description
---------|-------|----------|---------------------------
mappings | array | Yes      | List of mapping operations

Mapping Object

Field | Type      | Required | Description
------|-----------|----------|--------------------------------------
path  | string    | Yes      | Target path (e.g., "data.user.name")
logic | JSONLogic | Yes      | Expression to evaluate

Path Syntax

Dot Notation

Access and create nested structures:

{"path": "data.user.profile.name", "logic": "John"}

Creates: {"data": {"user": {"profile": {"name": "John"}}}}

Numeric Field Names

Use # prefix for numeric keys:

{"path": "data.items.#0", "logic": "first item"}

Creates: {"data": {"items": {"0": "first item"}}}

Root Field Assignment

Assigning to root fields (data, metadata, temp_data) merges objects:

{"path": "data", "logic": {"new_field": "value"}}

Merges into existing data rather than replacing it.

JSONLogic Expressions

Copy Value

{"path": "data.copy", "logic": {"var": "data.original"}}

Static Value

{"path": "data.status", "logic": "active"}

String Concatenation

{
    "path": "data.greeting",
    "logic": {"cat": ["Hello, ", {"var": "data.name"}, "!"]}
}

Conditional Value

{
    "path": "data.tier",
    "logic": {"if": [
        {">=": [{"var": "data.points"}, 1000]}, "gold",
        {">=": [{"var": "data.points"}, 500]}, "silver",
        "bronze"
    ]}
}

Arithmetic

{
    "path": "data.total",
    "logic": {"*": [{"var": "data.price"}, {"var": "data.quantity"}]}
}

Array Operations

{
    "path": "data.count",
    "logic": {"reduce": [
        {"var": "data.items"},
        {"+": [{"var": "accumulator"}, 1]},
        0
    ]}
}

Null Handling

If a JSONLogic expression evaluates to null, the mapping is skipped:

// If data.optional doesn't exist, this mapping is skipped
{"path": "data.copy", "logic": {"var": "data.optional"}}

Sequential Mappings

Mappings execute in order, allowing later mappings to use earlier results:

{
    "mappings": [
        {
            "path": "temp_data.full_name",
            "logic": {"cat": [{"var": "data.first"}, " ", {"var": "data.last"}]}
        },
        {
            "path": "data.greeting",
            "logic": {"cat": ["Hello, ", {"var": "temp_data.full_name"}]}
        }
    ]
}


Common Patterns

Copy Between Contexts

// Copy from data to metadata
{"path": "metadata.user_id", "logic": {"var": "data.id"}}

// Copy from data to temp_data
{"path": "temp_data.original", "logic": {"var": "data.value"}}

Default Values

{
    "path": "data.name",
    "logic": {"if": [
        {"!!": {"var": "data.name"}},
        {"var": "data.name"},
        "Unknown"
    ]}
}

Computed Fields

{
    "path": "data.subtotal",
    "logic": {"*": [{"var": "data.price"}, {"var": "data.quantity"}]}
}

Best Practices

  1. Use temp_data - Store intermediate results in temp_data
  2. Order Matters - Place dependencies before dependent mappings
  3. Check for Null - Handle missing fields with if or !! checks
  4. Merge Root Fields - Use root assignment to merge, not replace

Validation Function

The validation function evaluates rules against message data and collects validation errors.

Overview

The validation function:

  • Evaluates JSONLogic rules against message context
  • Collects errors for failed validations
  • Is read-only (doesn’t modify message data)
  • Returns status 200 (pass) or 400 (fail)

Basic Usage

{
    "function": {
        "name": "validation",
        "input": {
            "rules": [
                {
                    "logic": {"!!": {"var": "data.email"}},
                    "message": "Email is required"
                },
                {
                    "logic": {">": [{"var": "data.age"}, 0]},
                    "message": "Age must be positive"
                }
            ]
        }
    }
}

Configuration

Field | Type  | Required | Description
------|-------|----------|--------------------------
rules | array | Yes      | List of validation rules

Rule Object

Field   | Type      | Required | Description
--------|-----------|----------|----------------------------------------------
logic   | JSONLogic | Yes      | Expression that must evaluate to true
message | string    | No       | Error message (default: "Validation failed")

How Validation Works

  1. Each rule’s logic is evaluated against the message context
  2. If the result is exactly true, the rule passes
  3. Any other result (false, null, etc.) is a failure
  4. Failed rules add errors to message.errors
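
Putting those steps together, a minimal runnable sketch (the workflow and input below are illustrative) in which the second rule fails and its error is collected:

use dataflow_rs::{Engine, Message, Workflow};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let workflow = Workflow::from_json(r#"{
        "id": "validate_user",
        "name": "Validate User",
        "tasks": [{
            "id": "check_fields",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [
                        { "logic": {"!!": {"var": "data.name"}},  "message": "Name is required" },
                        { "logic": {"!!": {"var": "data.email"}}, "message": "Email is required" }
                    ]
                }
            }
        }]
    }"#)?;
    let engine = Engine::new(vec![workflow], None);

    // "email" is missing, so the second rule fails.
    let mut message = Message::new(&json!({ "name": "John" }));
    if let Err(e) = engine.process_message(&mut message).await {
        println!("engine returned an error: {}", e);
    }

    for error in &message.errors {
        println!("{}: {}", error.code, error.message);
    }
    Ok(())
}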

Common Validation Patterns

Required Field

{
    "logic": {"!!": {"var": "data.email"}},
    "message": "Email is required"
}

Numeric Range

{
    "logic": {"and": [
        {">=": [{"var": "data.age"}, 18]},
        {"<=": [{"var": "data.age"}, 120]}
    ]},
    "message": "Age must be between 18 and 120"
}

String Length

{
    "logic": {">=": [
        {"strlen": {"var": "data.password"}},
        8
    ]},
    "message": "Password must be at least 8 characters"
}

Pattern Matching (with Regex)

{
    "logic": {"regex_match": [
        {"var": "data.email"},
        "^[^@]+@[^@]+\\.[^@]+$"
    ]},
    "message": "Invalid email format"
}

Conditional Required

{
    "logic": {"or": [
        {"!": {"var": "data.is_business"}},
        {"!!": {"var": "data.company_name"}}
    ]},
    "message": "Company name required for business accounts"
}

Value in List

{
    "logic": {"in": [
        {"var": "data.status"},
        ["active", "pending", "suspended"]
    ]},
    "message": "Invalid status value"
}

Multiple Rules

All rules are evaluated, collecting all errors:

{
    "rules": [
        {
            "logic": {"!!": {"var": "data.name"}},
            "message": "Name is required"
        },
        {
            "logic": {"!!": {"var": "data.email"}},
            "message": "Email is required"
        },
        {
            "logic": {">": [{"var": "data.amount"}, 0]},
            "message": "Amount must be positive"
        }
    ]
}

Accessing Errors

After processing, check message.errors:

#![allow(unused)]
fn main() {
for error in &message.errors {
    println!("{}: {}", error.code, error.message);
}
}

Error structure:

  • code: “VALIDATION_ERROR” for failed rules
  • message: The error message from the rule


Validation with Continue on Error

Combine validation with data transformation:

{
    "id": "validated_transform",
    "continue_on_error": true,
    "tasks": [
        {
            "id": "validate",
            "function": {
                "name": "validation",
                "input": {
                    "rules": [...]
                }
            }
        },
        {
            "id": "transform",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [...]
                }
            }
        }
    ]
}

With continue_on_error: true, transformation proceeds even if validation fails.

Stop on Validation Failure

For strict validation (stop on failure):

{
    "continue_on_error": false,
    "tasks": [
        {
            "id": "validate",
            "function": {"name": "validation", "input": {...}}
        },
        {
            "id": "process",
            "function": {"name": "map", "input": {...}}
        }
    ]
}

If validation fails, subsequent tasks are skipped.

Best Practices

  1. Validate Early - Add validation as the first task
  2. Clear Messages - Write specific, actionable error messages
  3. Check All Rules - Validation evaluates all rules (doesn’t short-circuit)
  4. Use continue_on_error - Decide if processing should continue on failure
  5. Handle Errors - Always check message.errors after processing

Custom Functions

Extend dataflow-rs with your own custom processing logic by implementing the AsyncFunctionHandler trait.

Overview

Custom functions allow you to:

  • Add domain-specific processing logic
  • Integrate with external systems
  • Perform async operations (HTTP, database, etc.)
  • Implement complex transformations

Implementing AsyncFunctionHandler

#![allow(unused)]
fn main() {
use async_trait::async_trait;
use dataflow_rs::engine::{
    AsyncFunctionHandler,
    FunctionConfig,
    error::Result,
    message::{Change, Message}
};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::sync::Arc;

pub struct MyCustomFunction;

#[async_trait]
impl AsyncFunctionHandler for MyCustomFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        // Your custom logic here

        // Access input configuration
        let input = &config.input;

        // Modify message data
        let old_value = message.context["data"]["processed"].clone();
        message.context["data"]["processed"] = json!(true);

        // Track changes for audit trail
        let changes = vec![Change {
            path: Arc::from("data.processed"),
            old_value: Arc::new(old_value),
            new_value: Arc::new(json!(true)),
        }];

        // Return status code and changes
        // 200 = success, 400 = validation error, 500 = execution error
        Ok((200, changes))
    }
}
}

Registering Custom Functions

#![allow(unused)]
fn main() {
use std::collections::HashMap;
use dataflow_rs::{Engine, Workflow};
use dataflow_rs::engine::AsyncFunctionHandler;

// Create custom functions map
let mut custom_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>> =
    HashMap::new();

// Register your function
custom_functions.insert(
    "my_custom_function".to_string(),
    Box::new(MyCustomFunction)
);

// Create engine with custom functions
let engine = Engine::new(workflows, Some(custom_functions));
}

Using Custom Functions in Workflows

{
    "id": "custom_workflow",
    "tasks": [
        {
            "id": "custom_task",
            "function": {
                "name": "my_custom_function",
                "input": {
                    "option1": "value1",
                    "option2": 42
                }
            }
        }
    ]
}

Accessing Configuration

The config.input contains the function’s input configuration:

#![allow(unused)]
fn main() {
async fn execute(
    &self,
    message: &mut Message,
    config: &FunctionConfig,
    datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
    // Access input parameters
    let option1 = config.input
        .get("option1")
        .and_then(Value::as_str)
        .unwrap_or("default");

    let option2 = config.input
        .get("option2")
        .and_then(Value::as_i64)
        .unwrap_or(0);

    // Use the parameters...
    Ok((200, vec![]))
}
}

Using DataLogic

Evaluate JSONLogic expressions using the provided datalogic instance:

#![allow(unused)]
fn main() {
async fn execute(
    &self,
    message: &mut Message,
    config: &FunctionConfig,
    datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
    // Get the context for evaluation
    let context_arc = message.get_context_arc();

    // Compile and evaluate a JSONLogic expression
    let logic = json!({"var": "data.input"});
    let compiled = datalogic.compile(&logic)?;
    let result = datalogic.evaluate(&compiled, context_arc)?;

    // Use the result...
    Ok((200, vec![]))
}
}

Async Operations

Custom functions support async/await for I/O operations:

#![allow(unused)]
fn main() {
use async_trait::async_trait;
use dataflow_rs::engine::{
    AsyncFunctionHandler, FunctionConfig,
    error::{DataflowError, Result},
    message::{Change, Message}
};
use datalogic_rs::DataLogic;
use reqwest;
use serde_json::Value;
use std::sync::Arc;

pub struct HttpFetchFunction;

#[async_trait]
impl AsyncFunctionHandler for HttpFetchFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        _datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        let url = config.input
            .get("url")
            .and_then(Value::as_str)
            .ok_or_else(|| DataflowError::Validation("Missing url".to_string()))?;

        // Make HTTP request
        let response = reqwest::get(url)
            .await
            .map_err(|e| DataflowError::ExecutionError(e.to_string()))?;

        let data: Value = response.json()
            .await
            .map_err(|e| DataflowError::ExecutionError(e.to_string()))?;

        // Store result
        let old_value = message.context["data"]["fetched"].clone();
        message.context["data"]["fetched"] = data.clone();

        let changes = vec![Change {
            path: Arc::from("data.fetched"),
            old_value: Arc::new(old_value),
            new_value: Arc::new(data),
        }];

        Ok((200, changes))
    }
}
}

Error Handling

Return appropriate errors for different failure modes:

#![allow(unused)]
fn main() {
use dataflow_rs::engine::error::DataflowError;

async fn execute(&self, ...) -> Result<(usize, Vec<Change>)> {
    // Validation error
    if some_validation_fails {
        return Err(DataflowError::Validation("Invalid input".to_string()));
    }

    // Execution error
    if some_operation_fails {
        return Err(DataflowError::ExecutionError("Operation failed".to_string()));
    }

    // Or return status codes without error
    // 400 for validation failure (errors added to message)
    // 500 for execution failure
    Ok((200, vec![]))
}
}

Complete Example

#![allow(unused)]
fn main() {
use async_trait::async_trait;
use dataflow_rs::engine::{
    AsyncFunctionHandler, FunctionConfig,
    error::{DataflowError, Result},
    message::{Change, Message}
};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::sync::Arc;

/// Calculates statistics from numeric array data
pub struct StatisticsFunction;

#[async_trait]
impl AsyncFunctionHandler for StatisticsFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        _datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        // Get the field to analyze
        let field = config.input
            .get("field")
            .and_then(Value::as_str)
            .ok_or_else(|| DataflowError::Validation(
                "Missing 'field' in config".to_string()
            ))?;

        // Get the array from message
        let data = message.context["data"]
            .get(field)
            .and_then(Value::as_array)
            .ok_or_else(|| DataflowError::Validation(
                format!("Field '{}' is not an array", field)
            ))?;

        // Calculate statistics
        let numbers: Vec<f64> = data.iter()
            .filter_map(|v| v.as_f64())
            .collect();

        if numbers.is_empty() {
            return Err(DataflowError::Validation(
                "No numeric values found".to_string()
            ));
        }

        let sum: f64 = numbers.iter().sum();
        let count = numbers.len() as f64;
        let mean = sum / count;
        let min = numbers.iter().cloned().fold(f64::INFINITY, f64::min);
        let max = numbers.iter().cloned().fold(f64::NEG_INFINITY, f64::max);

        // Build result
        let stats = json!({
            "count": count,
            "sum": sum,
            "mean": mean,
            "min": min,
            "max": max
        });

        // Store in message
        let old_value = message.context["data"]["statistics"].clone();
        message.context["data"]["statistics"] = stats.clone();
        message.invalidate_context_cache();

        let changes = vec![Change {
            path: Arc::from("data.statistics"),
            old_value: Arc::new(old_value),
            new_value: Arc::new(stats),
        }];

        Ok((200, changes))
    }
}
}

Best Practices

  1. Track Changes - Record all modifications for audit trail
  2. Validate Input - Check configuration before processing
  3. Handle Errors - Return appropriate error types
  4. Invalidate Cache - Call message.invalidate_context_cache() after modifications
  5. Document - Add clear documentation for your function
  6. Test - Write unit tests for your custom functions

JSONLogic

Dataflow-rs uses JSONLogic for conditions and data transformations.

Overview

JSONLogic is a way to write rules as JSON. It’s used in dataflow-rs for:

  • Workflow Conditions - Control when workflows execute
  • Task Conditions - Control when tasks execute
  • Map Function - Transform and copy data

Basic Syntax

JSONLogic operations are objects with a single key (the operator) and value (the arguments):

{"operator": [argument1, argument2, ...]}

Data Access

var - Access Data

// Access top-level field
{"var": "data.name"}

// Access nested field
{"var": "data.user.profile.email"}

// Access array element
{"var": "data.items.0"}

// Default value if missing
{"var": ["data.optional", "default value"]}

Context Structure

In dataflow-rs, the context available to JSONLogic is:

{
    "data": { ... },
    "metadata": { ... },
    "temp_data": { ... }
}

Access fields with:

{"var": "data.field"}
{"var": "metadata.type"}
{"var": "temp_data.intermediate"}

Comparison Operators

Equality

{"==": [{"var": "data.status"}, "active"]}
{"===": [{"var": "data.count"}, 0]}  // Strict equality
{"!=": [{"var": "data.status"}, "deleted"]}
{"!==": [{"var": "data.count"}, null]}  // Strict inequality

Numeric Comparisons

{">": [{"var": "data.age"}, 18]}
{">=": [{"var": "data.score"}, 60]}
{"<": [{"var": "data.price"}, 100]}
{"<=": [{"var": "data.quantity"}, 10]}

Between

{"<=": [1, {"var": "data.x"}, 10]}  // 1 <= x <= 10
{"<": [1, {"var": "data.x"}, 10]}   // 1 < x < 10

Boolean Logic

and, or, not

{"and": [
    {">=": [{"var": "data.age"}, 18]},
    {"==": [{"var": "data.verified"}, true]}
]}

{"or": [
    {"==": [{"var": "data.status"}, "active"]},
    {"==": [{"var": "data.status"}, "pending"]}
]}

{"!": {"var": "data.disabled"}}

Truthy/Falsy

// Check if value is truthy (not null, false, 0, "")
{"!!": {"var": "data.email"}}

// Check if value is falsy
{"!": {"var": "data.deleted"}}

Conditionals

if-then-else

{"if": [
    {">=": [{"var": "data.score"}, 90]}, "A",
    {">=": [{"var": "data.score"}, 80]}, "B",
    {">=": [{"var": "data.score"}, 70]}, "C",
    "F"
]}

Ternary

{"if": [
    {"var": "data.premium"},
    "VIP Customer",
    "Standard Customer"
]}

String Operations

cat - Concatenation

{"cat": ["Hello, ", {"var": "data.name"}, "!"]}

substr - Substring

{"substr": [{"var": "data.text"}, 0, 10]}  // First 10 characters
{"substr": [{"var": "data.text"}, -5]}     // Last 5 characters

in - Contains

// Check if substring exists
{"in": ["@", {"var": "data.email"}]}

// Check if value in array
{"in": [{"var": "data.status"}, ["active", "pending"]]}

Numeric Operations

Arithmetic

{"+": [{"var": "data.a"}, {"var": "data.b"}]}
{"-": [{"var": "data.total"}, {"var": "data.discount"}]}
{"*": [{"var": "data.price"}, {"var": "data.quantity"}]}
{"/": [{"var": "data.total"}, {"var": "data.count"}]}
{"%": [{"var": "data.n"}, 2]}  // Modulo

Min/Max

{"min": [{"var": "data.a"}, {"var": "data.b"}, 100]}
{"max": [{"var": "data.x"}, 0]}  // Ensure non-negative

Array Operations

merge - Combine Arrays

{"merge": [
    {"var": "data.list1"},
    {"var": "data.list2"}
]}

map - Transform Array

{"map": [
    {"var": "data.items"},
    {"*": [{"var": ""}, 2]}  // Double each item
]}

filter - Filter Array

{"filter": [
    {"var": "data.items"},
    {">=": [{"var": ""}, 10]}  // Items >= 10
]}

reduce - Aggregate Array

{"reduce": [
    {"var": "data.items"},
    {"+": [{"var": "accumulator"}, {"var": "current"}]},
    0  // Initial value
]}

all/some/none

{"all": [{"var": "data.items"}, {">=": [{"var": ""}, 0]}]}
{"some": [{"var": "data.items"}, {"==": [{"var": ""}, "special"]}]}
{"none": [{"var": "data.items"}, {"<": [{"var": ""}, 0]}]}


Common Patterns

Safe Field Access

// Default to empty string
{"var": ["data.optional", ""]}

// Check existence first
{"if": [
    {"!!": {"var": "data.optional"}},
    {"var": "data.optional"},
    "default"
]}

Null Coalescing

{"if": [
    {"!!": {"var": "data.primary"}},
    {"var": "data.primary"},
    {"var": "data.fallback"}
]}

Type Checking

// Check if string
{"===": [{"typeof": {"var": "data.field"}}, "string"]}

// Check if array
{"===": [{"typeof": {"var": "data.items"}}, "array"]}

Best Practices

  1. Use var Defaults - Provide defaults for optional fields
  2. Check Existence - Use !! to verify field exists before use
  3. Keep It Simple - Complex logic may be better in custom functions
  4. Test Expressions - Use the playground to test JSONLogic before deploying

Audit Trails

Dataflow-rs automatically tracks all data modifications for debugging, monitoring, and compliance.

Overview

Every change to message data is recorded in the audit trail:

  • What changed - Path and values (old and new)
  • When it changed - Timestamp
  • Which task - Workflow and task identifiers

Audit Trail Structure

#![allow(unused)]
fn main() {
pub struct AuditTrail {
    pub task_id: String,
    pub workflow_id: String,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
}

pub struct Change {
    pub path: Arc<str>,
    pub old_value: Arc<Value>,
    pub new_value: Arc<Value>,
}
}

Accessing the Audit Trail

After processing, the audit trail is available on the message:

#![allow(unused)]
fn main() {
engine.process_message(&mut message).await?;

for entry in &message.audit_trail {
    println!("Workflow: {}, Task: {}", entry.workflow_id, entry.task_id);
    println!("Timestamp: {}", entry.timestamp);

    for change in &entry.changes {
        println!("  Path: {}", change.path);
        println!("  Old: {}", change.old_value);
        println!("  New: {}", change.new_value);
    }
}
}

JSON Representation

In the playground output, the audit trail appears as:

{
    "audit_trail": [
        {
            "task_id": "transform_data",
            "workflow_id": "my_workflow",
            "timestamp": "2024-01-01T12:00:00Z",
            "changes": [
                {
                    "path": "data.full_name",
                    "old_value": null,
                    "new_value": "John Doe"
                },
                {
                    "path": "data.greeting",
                    "old_value": null,
                    "new_value": "Hello, John Doe!"
                }
            ]
        }
    ]
}

What Gets Tracked

Map Function

Every mapping that modifies data creates a change entry:

{
    "mappings": [
        {"path": "data.name", "logic": "John"}
    ]
}

Creates:

{
    "path": "data.name",
    "old_value": null,
    "new_value": "John"
}

Custom Functions

Custom functions should track changes for proper auditing:

#![allow(unused)]
fn main() {
// Inside a custom function's execute(): record each modification so it
// appears in the audit trail, then return the changes with a status code.
let changes = vec![Change {
    path: Arc::from("data.processed"),
    old_value: Arc::new(old_value),   // value before the modification
    new_value: Arc::new(new_value),   // value after the modification
}];

Ok((200, changes))
}

Validation Function

Validation is read-only, so it produces no audit trail entries.

Try It

Notice the audit trail shows each step’s changes.

Use Cases

Debugging

Trace exactly how data was transformed:

#![allow(unused)]
fn main() {
// Find where a value was set
for entry in &message.audit_trail {
    for change in &entry.changes {
        if change.path.as_ref() == "data.total" {
            println!("data.total set by {}/{}",
                entry.workflow_id, entry.task_id);
            println!("Changed from {} to {}",
                change.old_value, change.new_value);
        }
    }
}
}

Compliance

Log all changes for regulatory compliance:

#![allow(unused)]
fn main() {
for entry in &message.audit_trail {
    log_to_audit_system(
        entry.timestamp,
        entry.workflow_id.clone(),
        entry.task_id.clone(),
        &entry.changes
    );
}
}

Change Detection

Detect if specific fields were modified:

#![allow(unused)]
fn main() {
fn was_field_modified(message: &Message, field: &str) -> bool {
    message.audit_trail.iter()
        .flat_map(|e| e.changes.iter())
        .any(|c| c.path.as_ref() == field)
}

if was_field_modified(&message, "data.price") {
    // Price was changed during processing
}
}

Rollback (Conceptual)

The audit trail can be used to implement rollback:

#![allow(unused)]
fn main() {
fn get_original_value(message: &Message, field: &str) -> Option<&Value> {
    message.audit_trail.iter()
        .flat_map(|e| e.changes.iter())
        .find(|c| c.path.as_ref() == field)
        .map(|c| c.old_value.as_ref())
}
}

Best Practices

  1. Track All Changes - Custom functions should record all modifications
  2. Use Arc - Use Arc<str> and Arc<Value> for efficient sharing
  3. Timestamp Accuracy - Timestamps are UTC for consistency
  4. Check Audit Trail - Review audit trail during development
  5. Log for Production - Persist audit trails for production debugging

Performance

Dataflow-rs is designed for high-performance data processing with minimal overhead.

Architecture for Performance

Pre-compilation

All JSONLogic expressions are compiled once at engine startup:

#![allow(unused)]
fn main() {
// This compiles all logic at creation time
let engine = Engine::new(workflows, None);

// Runtime processing uses pre-compiled logic
// No parsing or compilation overhead
engine.process_message(&mut message).await?;
}

Benefits of Pre-compilation

  • Zero runtime parsing - No JSON parsing during message processing
  • Cached compiled logic - O(1) access to compiled expressions
  • Early validation - Invalid expressions caught at startup
  • Consistent latency - Predictable performance per message

Memory Efficiency

  • Arc-wrapped compiled logic - Shared without copying
  • Immutable workflows - Safe concurrent access
  • Context caching - Avoids repeated JSON cloning

Benchmarking

Run the included benchmark:

cargo run --example benchmark --release

Sample Benchmark

#![allow(unused)]
fn main() {
use dataflow_rs::{Engine, Workflow, Message};
use std::time::Instant;

// Setup
let workflow = Workflow::from_json(workflow_json)?;
let engine = Engine::new(vec![workflow], None);

// Benchmark
let iterations = 10_000;
let start = Instant::now();

for _ in 0..iterations {
    let mut message = Message::new(&test_data);
    engine.process_message(&mut message).await?;
}

let elapsed = start.elapsed();
println!("Processed {} messages in {:?}", iterations, elapsed);
println!("Average: {:?} per message", elapsed / iterations);
}

Optimization Tips

1. Minimize Mappings

Combine related transformations:

// Less efficient: Multiple mappings
{
    "mappings": [
        {"path": "data.a", "logic": {"var": "data.source.a"}},
        {"path": "data.b", "logic": {"var": "data.source.b"}},
        {"path": "data.c", "logic": {"var": "data.source.c"}}
    ]
}

// More efficient: Single object mapping when possible
{
    "mappings": [
        {"path": "data", "logic": {"var": "data.source"}}
    ]
}

2. Use Conditions Wisely

Skip unnecessary processing with conditions:

{
    "id": "expensive_task",
    "condition": {"==": [{"var": "metadata.needs_processing"}, true]},
    "function": { ... }
}

3. Order Workflows by Frequency

Put frequently executed workflows first by giving them a lower priority value, since workflows run in ascending priority order:

{"id": "common_workflow", "priority": 1, ...}
{"id": "rare_workflow", "priority": 100, ...}

4. Use temp_data

Store intermediate results to avoid recomputation:

{
    "mappings": [
        {
            "path": "temp_data.computed",
            "logic": {"expensive": "computation"}
        },
        {
            "path": "data.result1",
            "logic": {"var": "temp_data.computed"}
        },
        {
            "path": "data.result2",
            "logic": {"var": "temp_data.computed"}
        }
    ]
}

5. Avoid Unnecessary Validation

Validate only what’s necessary:

// Validate at system boundaries
{
    "id": "input_validation",
    "condition": {"==": [{"var": "metadata.source"}, "external"]},
    "tasks": [
        {"id": "validate", "function": {"name": "validation", ...}}
    ]
}

Concurrent Processing

Process multiple messages concurrently:

#![allow(unused)]
fn main() {
use std::sync::Arc;
use tokio::task;

let engine = Arc::new(Engine::new(workflows, None));

let handles: Vec<_> = messages.into_iter()
    .map(|mut msg| {
        let engine = Arc::clone(&engine);
        task::spawn(async move {
            // `msg` is moved into the task and dropped when it finishes;
            // return it from the task if the processed result is needed.
            engine.process_message(&mut msg).await
        })
    })
    .collect();

// Wait for all
for handle in handles {
    handle.await??;
}
}

Thread Safety

  • Engine is Send + Sync
  • Compiled logic shared via Arc
  • Each message processed independently

Memory Considerations

Large Messages

For very large messages, consider:

  1. Streaming - Process chunks instead of entire payload
  2. Selective Loading - Load only needed fields
  3. Cleanup temp_data - Clear intermediate results when done

Many Workflows

For many workflows:

  1. Organize by Domain - Group related workflows
  2. Use Conditions - Skip irrelevant workflows early
  3. Profile - Identify bottleneck workflows

Profiling

Enable Logging

#![allow(unused)]
fn main() {
env_logger::Builder::from_env(
    env_logger::Env::default().default_filter_or("debug")
).init();
}

Custom Metrics

#![allow(unused)]
fn main() {
use std::time::Instant;

let start = Instant::now();
engine.process_message(&mut message).await?;
let duration = start.elapsed();

metrics::histogram!("dataflow.processing_time", duration);
}

Production Recommendations

  1. Build with --release - Debug builds are significantly slower
  2. Pre-warm - Process a few messages at startup to warm caches (see the sketch after this list)
  3. Monitor - Track processing times and error rates
  4. Profile - Identify slow workflows in production
  5. Scale Horizontally - Engine is stateless, scale with instances
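
For the pre-warm step, a minimal sketch, assuming a representative sample payload is available at startup and that DataflowError lives under dataflow_rs::engine::error as shown in the API reference below:

use dataflow_rs::{Engine, Message};
use dataflow_rs::engine::error::DataflowError;
use serde_json::json;

// Run a handful of representative messages through the engine at startup so
// caches and lazily initialized state are primed before real traffic arrives.
// The payload shape here is illustrative; use something close to production input.
async fn prewarm(engine: &Engine) -> Result<(), DataflowError> {
    let sample_payload = json!({"name": "warmup", "status": "active"});
    for _ in 0..10 {
        let mut warmup = Message::new(&sample_payload);
        engine.process_message(&mut warmup).await?;
    }
    Ok(())
}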

API Reference

Quick reference for the main dataflow-rs types and methods.

Engine

The central component that processes messages through workflows.

#![allow(unused)]
fn main() {
use dataflow_rs::Engine;
}

Constructor

#![allow(unused)]
fn main() {
pub fn new(
    workflows: Vec<Workflow>,
    custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>>
) -> Engine
}

Creates a new engine with the given workflows. All JSONLogic is compiled at creation.
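
Custom functions are registered through the second argument. A sketch, where MyEnrichFunction is a hypothetical type implementing the AsyncFunctionHandler trait described later on this page:

use std::collections::HashMap;
use dataflow_rs::Engine;
use dataflow_rs::engine::AsyncFunctionHandler;

// Map of function name (as referenced in task definitions) to handler.
let mut custom: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>> = HashMap::new();
custom.insert("enrich".to_string(), Box::new(MyEnrichFunction));

// All JSONLogic in `workflows` is compiled here, at construction time.
let engine = Engine::new(workflows, Some(custom));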

Methods

#![allow(unused)]
fn main() {
// Process a message through all matching workflows
pub async fn process_message(&self, message: &mut Message) -> Result<()>

// Get registered workflows
pub fn workflows(&self) -> &HashMap<String, Workflow>
}

Workflow

A collection of tasks with optional conditions and priority.

#![allow(unused)]
fn main() {
use dataflow_rs::Workflow;
}

Constructors

#![allow(unused)]
fn main() {
// Parse from JSON string
pub fn from_json(json: &str) -> Result<Workflow>

// Load from file
pub fn from_file(path: &str) -> Result<Workflow>
}

JSON Schema

{
    "id": "string (required)",
    "name": "string (optional)",
    "priority": "number (optional, default: 0)",
    "condition": "JSONLogic (optional)",
    "continue_on_error": "boolean (optional, default: false)",
    "tasks": "array of Task (required)"
}

Task

An individual processing unit within a workflow.

JSON Schema

{
    "id": "string (required)",
    "name": "string (optional)",
    "condition": "JSONLogic (optional)",
    "continue_on_error": "boolean (optional)",
    "function": {
        "name": "string (required)",
        "input": "object (required)"
    }
}
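
Putting the two schemas together, a minimal workflow definition might look like this (the ids, condition, and mapping are illustrative):

{
    "id": "enrich_user",
    "name": "Enrich user data",
    "priority": 10,
    "condition": {"==": [{"var": "metadata.source"}, "api"]},
    "tasks": [
        {
            "id": "add_greeting",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {
                            "path": "data.greeting",
                            "logic": {"cat": ["Hello, ", {"var": "data.name"}, "!"]}
                        }
                    ]
                }
            }
        }
    ]
}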

Message

The data container that flows through workflows.

#![allow(unused)]
fn main() {
use dataflow_rs::Message;
}

Constructors

#![allow(unused)]
fn main() {
// Create from data (goes to context.data)
pub fn new(data: &Value) -> Message

// Create from full context value
pub fn from_value(context: &Value) -> Message
}
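
A short sketch of the difference between the two constructors (payload values are illustrative):

use dataflow_rs::Message;
use serde_json::json;

// new(): the value becomes context.data.
let from_data = Message::new(&json!({"name": "John", "age": 30}));

// from_value(): the value is used as the full context,
// so data, metadata, and temp_data can all be supplied up front.
let from_context = Message::from_value(&json!({
    "data": {"name": "John"},
    "metadata": {"source": "api"},
    "temp_data": {}
}));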

Fields

#![allow(unused)]
fn main() {
pub struct Message {
    pub id: Uuid,
    pub payload: Arc<Value>,
    pub context: Value,  // Contains data, metadata, temp_data
    pub audit_trail: Vec<AuditTrail>,
    pub errors: Vec<ErrorInfo>,
}
}

Methods

#![allow(unused)]
fn main() {
// Get context as Arc for efficient sharing
pub fn get_context_arc(&mut self) -> Arc<Value>

// Invalidate context cache after modifications
pub fn invalidate_context_cache(&mut self)
}

AsyncFunctionHandler

Trait for implementing custom functions.

#![allow(unused)]
fn main() {
use dataflow_rs::engine::AsyncFunctionHandler;
}

Trait Definition

#![allow(unused)]
fn main() {
#[async_trait]
pub trait AsyncFunctionHandler: Send + Sync {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)>;
}
}

Return Value

  • usize - Status code (200 = success, 400 = validation error, 500 = execution error)
  • Vec<Change> - List of changes for audit trail
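
A minimal implementation sketch using the types documented on this page. The struct name, the data.processed_at field, and the fixed timestamp are illustrative, and the exact import paths for Change, FunctionConfig, and DataLogic may differ (check cargo doc --open):

use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{json, Value};

// A hypothetical custom function that stamps a processing marker onto the data.
struct TimestampFunction;

#[async_trait]
impl AsyncFunctionHandler for TimestampFunction {
    async fn execute(
        &self,
        message: &mut Message,
        _config: &FunctionConfig,
        _datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        // Capture the old value before overwriting it.
        let old_value = message.context["data"]["processed_at"].clone();

        // Illustrative fixed timestamp; a real handler would compute one.
        let new_value: Value = json!("2024-01-01T12:00:00Z");

        // Modify the context directly, then invalidate the cached Arc copy.
        message.context["data"]["processed_at"] = new_value.clone();
        message.invalidate_context_cache();

        // Report the modification so it appears in the audit trail.
        let changes = vec![Change {
            path: Arc::from("data.processed_at"),
            old_value: Arc::new(old_value),
            new_value: Arc::new(new_value),
        }];

        // 200 = success.
        Ok((200, changes))
    }
}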

FunctionConfig

Configuration for function execution.

#![allow(unused)]
fn main() {
pub struct FunctionConfig {
    pub name: String,
    pub input: Value,
}
}

Change

Represents a single data modification.

#![allow(unused)]
fn main() {
pub struct Change {
    pub path: Arc<str>,
    pub old_value: Arc<Value>,
    pub new_value: Arc<Value>,
}
}

AuditTrail

Records changes made by a task.

#![allow(unused)]
fn main() {
pub struct AuditTrail {
    pub task_id: String,
    pub workflow_id: String,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
}
}

ErrorInfo

Error information recorded in the message.

#![allow(unused)]
fn main() {
pub struct ErrorInfo {
    pub code: String,
    pub message: String,
    pub workflow_id: Option<String>,
    pub task_id: Option<String>,
}
}
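
Errors recorded on a message can be inspected after processing, for example:

// Print any errors that tasks or workflows recorded on the message.
for err in &message.errors {
    println!(
        "[{}] {} (workflow: {:?}, task: {:?})",
        err.code, err.message, err.workflow_id, err.task_id
    );
}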

DataflowError

Main error type for the library.

#![allow(unused)]
fn main() {
use dataflow_rs::engine::error::DataflowError;
}

Variants

#![allow(unused)]
fn main() {
pub enum DataflowError {
    Validation(String),
    Execution(String),
    Logic(String),
    Io(String),
    // ... other variants
}
}
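
A sketch of handling the error returned by process_message, assuming the library's Result alias uses DataflowError as its error type and that the enum derives Debug:

use dataflow_rs::engine::error::DataflowError;

// Handle the outcome of processing a single message.
match engine.process_message(&mut message).await {
    Ok(()) => println!("processed successfully"),
    Err(DataflowError::Validation(reason)) => eprintln!("validation failed: {reason}"),
    Err(DataflowError::Execution(reason)) => eprintln!("task execution failed: {reason}"),
    Err(other) => eprintln!("processing error: {other:?}"),
}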

Built-in Functions

map

Data transformation using JSONLogic.

{
    "name": "map",
    "input": {
        "mappings": [
            {
                "path": "string",
                "logic": "JSONLogic expression"
            }
        ]
    }
}

validation

Rule-based data validation.

{
    "name": "validation",
    "input": {
        "rules": [
            {
                "logic": "JSONLogic expression",
                "message": "string"
            }
        ]
    }
}
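
A concrete input following this schema (the rules themselves are illustrative):

{
    "name": "validation",
    "input": {
        "rules": [
            {
                "logic": {"!!": {"var": "data.email"}},
                "message": "email is required"
            },
            {
                "logic": {">=": [{"var": ["data.age", 0]}, 18]},
                "message": "age must be at least 18"
            }
        ]
    }
}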

WASM API (dataflow-wasm)

For browser/JavaScript usage.

import init, { WasmEngine, create_message } from 'dataflow-wasm';

// Initialize
await init();

// Create engine
const engine = new WasmEngine(workflowsJson);

// Create message
const message = create_message(dataJson, metadataJson);

// Process (returns Promise)
const result = await engine.process(message);

// Get workflow info
const count = engine.workflow_count();
const ids = engine.workflow_ids();

Full API Documentation

For complete API documentation, run:

cargo doc --open

This generates detailed documentation from the source code comments.