Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

API Reference

Quick reference for the main dataflow-rs types and methods.

Type Aliases

Dataflow-rs provides rules-engine aliases alongside the original workflow terminology:

Rules EngineWorkflow EngineImport
RulesEngineEngineuse dataflow_rs::RulesEngine;
RuleWorkflowuse dataflow_rs::Rule;
ActionTaskuse dataflow_rs::Action;

Both names refer to the same types — use whichever fits your mental model.

Engine (RulesEngine)

The central component that evaluates rules and processes messages.

#![allow(unused)]
fn main() {
use dataflow_rs::Engine;  // or: use dataflow_rs::RulesEngine;
}

Constructors

#![allow(unused)]
fn main() {
// Recommended path — fluent builder.
pub fn builder() -> EngineBuilder

// Lower-level entry. Use HashMap::new() for the no-handler case.
pub fn new(
    workflows: Vec<Workflow>,
    custom_functions: HashMap<String, BoxedFunctionHandler>,
) -> Result<Engine>
}

EngineBuilder (#[must_use]) chains .register("name", handler), .register_boxed(name, boxed), .with_workflow(w), .with_workflows(iter), then .build() -> Result<Engine>. All JSONLogic is compiled and Custom inputs are pre-parsed into their typed Self::Input at .build() — config-shape errors fail there, not on first message.

Methods

#![allow(unused)]
fn main() {
// Process a message through all matching rules
pub async fn process_message(&self, message: &mut Message) -> Result<()>

// Process with execution trace for debugging
pub async fn process_message_with_trace(&self, message: &mut Message) -> Result<ExecutionTrace>

// Process only workflows on a specific channel (O(1) lookup)
pub async fn process_message_for_channel(&self, channel: &str, message: &mut Message) -> Result<()>

// Channel routing with execution trace
pub async fn process_message_for_channel_with_trace(&self, channel: &str, message: &mut Message) -> Result<ExecutionTrace>

// Get registered rules (sorted by priority)
pub fn workflows(&self) -> &Arc<Vec<Workflow>>

// Find a workflow by ID
pub fn workflow_by_id(&self, id: &str) -> Option<&Workflow>

// Create a new engine with different workflows, preserving custom functions
pub fn with_new_workflows(&self, workflows: Vec<Workflow>) -> Self
}

Workflow (Rule)

A collection of actions with optional conditions and priority.

#![allow(unused)]
fn main() {
use dataflow_rs::Workflow;  // or: use dataflow_rs::Rule;
}

Constructors

#![allow(unused)]
fn main() {
// Parse from JSON string
pub fn from_json(json: &str) -> Result<Workflow>

// Load from file
pub fn from_file(path: &str) -> Result<Workflow>

// Convenience constructor for rules-engine pattern
pub fn rule(id: &str, name: &str, condition: Value, tasks: Vec<Task>) -> Self
}

JSON Schema

{
    "id": "string (required)",
    "name": "string (optional)",
    "priority": "number (optional, default: 0)",
    "condition": "JSONLogic (optional, evaluated against full context)",
    "continue_on_error": "boolean (optional, default: false)",
    "tasks": "array of Task (required)",
    "channel": "string (optional, default: 'default')",
    "version": "number (optional, default: 1)",
    "status": "'active' | 'paused' | 'archived' (optional, default: 'active')",
    "tags": "array of string (optional, default: [])",
    "created_at": "ISO 8601 datetime (optional)",
    "updated_at": "ISO 8601 datetime (optional)"
}

Task (Action)

An individual processing unit within a rule.

#![allow(unused)]
fn main() {
use dataflow_rs::Task;  // or: use dataflow_rs::Action;
}

Constructor

#![allow(unused)]
fn main() {
// Convenience constructor for rules-engine pattern
pub fn action(id: &str, name: &str, function: FunctionConfig) -> Self
}

JSON Schema

{
    "id": "string (required)",
    "name": "string (optional)",
    "condition": "JSONLogic (optional, evaluated against full context)",
    "continue_on_error": "boolean (optional)",
    "function": {
        "name": "string (required)",
        "input": "object (required)"
    }
}

Message

The data container that flows through rules. The context tree is held as datavalue::OwnedDataValue (not serde_json::Value) so the JSONLogic evaluator can borrow it into its arena without a serde_json round-trip.

#![allow(unused)]
fn main() {
use dataflow_rs::Message;
use datavalue::OwnedDataValue;
use std::sync::Arc;
}

Constructors

#![allow(unused)]
fn main() {
// Fluent builder — recommended path for richer cases (custom id,
// capture_changes off, etc.).
pub fn builder() -> MessageBuilder

// Native zero-conversion entry point — perf path.
pub fn new(payload: Arc<OwnedDataValue>) -> Message

// Convenience: bridge from a serde_json::Value payload.
pub fn from_value(payload: &serde_json::Value) -> Message
}

MessageBuilder (#[must_use]) chains .id(...), .payload(Arc<OwnedDataValue>) / .payload_json(&serde_json::Value), .capture_changes(bool), then .build() -> Message.

Structure

#![allow(unused)]
fn main() {
pub struct Message {
    pub context: OwnedDataValue,    // Always Object {data, metadata, temp_data}
    // ... encapsulated fields ...
}
}

context is the only pub field — it’s the legitimate read surface (tests do message.context["data"]["x"] lookups). Every other field is read via accessors and mutated via add_error (errors) or TaskContext::set (context) so audit-trail changes are recorded.

Methods

#![allow(unused)]
fn main() {
// Identity + payload
pub fn id(&self) -> &str
pub fn payload(&self) -> &OwnedDataValue
pub fn payload_arc(&self) -> &Arc<OwnedDataValue>

// Context accessors
pub fn data(&self) -> &OwnedDataValue
pub fn metadata(&self) -> &OwnedDataValue
pub fn temp_data(&self) -> &OwnedDataValue

// Error + audit views
pub fn errors(&self) -> &[ErrorInfo]
pub fn audit_trail(&self) -> &[AuditTrail]
pub fn capture_changes(&self) -> bool

// Mutation (constructive)
pub fn add_error(&mut self, error: ErrorInfo)

// Predicates
pub fn has_errors(&self) -> bool
}

Inside a custom AsyncFunctionHandler, mutate the context via TaskContext::set — it records audit-trail changes automatically.

AsyncFunctionHandler

Trait for implementing custom action handlers. See Custom Functions for the full walk-through.

#![allow(unused)]
fn main() {
use dataflow_rs::prelude::*;
}

Trait Definition

#![allow(unused)]
fn main() {
use serde::de::DeserializeOwned;

#[async_trait]
pub trait AsyncFunctionHandler: Send + Sync + 'static {
    /// Typed configuration shape for this handler. Use
    /// `serde_json::Value` for freeform JSON.
    type Input: DeserializeOwned + Send + Sync + 'static;

    /// Parse the raw `FunctionConfig::Custom { input }` JSON into
    /// `Self::Input`. Default impl uses `serde_json::from_value`;
    /// override only for custom validation beyond what serde provides.
    fn parse_input(input: &serde_json::Value) -> Result<Self::Input> { ... }

    /// Execute the handler.
    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &Self::Input,
    ) -> Result<TaskOutcome>;
}
}

The engine pre-parses each FunctionConfig::Custom { input } JSON into the registered handler’s typed Self::Input at Engine::builder().build() (or Engine::new) — config-shape errors fail there, not on first message.

Boxing

#![allow(unused)]
fn main() {
pub type BoxedFunctionHandler = Box<dyn DynAsyncFunctionHandler + Send + Sync>;
}

Stored in the engine’s registry. Users construct these via Box::new(handler) (or via Engine::builder().register("name", handler)) — the dyn-trait plumbing stays out of user code.

TaskContext

Per-call context handed to every AsyncFunctionHandler::execute call.

#![allow(unused)]
fn main() {
pub struct TaskContext<'a> { /* ... */ }

impl<'a> TaskContext<'a> {
    // Read accessors
    pub fn message(&self) -> &Message
    pub fn message_mut(&mut self) -> &mut Message
    pub fn datalogic(&self) -> &Arc<datalogic_rs::Engine>
    pub fn data(&self) -> &OwnedDataValue
    pub fn metadata(&self) -> &OwnedDataValue
    pub fn temp_data(&self) -> &OwnedDataValue
    pub fn get(&self, path: &str) -> Option<&OwnedDataValue>

    // Audit-trail-aware mutation
    pub fn set(&mut self, path: &str, value: OwnedDataValue)
    pub fn set_json(&mut self, path: &str, value: &serde_json::Value)
    pub fn add_error(&mut self, error: ErrorInfo)
}
}

set records a Change on the audit trail when message.capture_changes is true, then writes through set_nested_value (auto-creates intermediate objects/arrays, handles #-prefix escapes).

TaskOutcome

Return value of every handler:

#![allow(unused)]
fn main() {
pub enum TaskOutcome {
    Success,         // audit status 200, continue
    Status(u16),     // audit status = code; 5xx pushes TASK_STATUS_ERROR
    Skip,            // no audit entry, continue
    Halt,            // audit status 299 (HALT_STATUS_CODE), stop workflow
}
}

FunctionConfig

FunctionConfig is an enum: every built-in is a typed variant, and unknown function names deserialize into Custom { name, input }. Custom handlers typically destructure the Custom variant to access their config.

#![allow(unused)]
fn main() {
pub enum FunctionConfig {
    Map { input: MapConfig, .. },
    Validation { input: ValidationConfig, .. },
    ParseJson { input: ParseConfig, .. },
    ParseXml { input: ParseConfig, .. },
    PublishJson { input: PublishConfig, .. },
    PublishXml { input: PublishConfig, .. },
    Filter { input: FilterConfig, .. },
    Log { input: LogConfig, .. },
    HttpCall { input: HttpCallConfig, .. },
    Enrich { input: EnrichConfig, .. },
    PublishKafka { input: PublishKafkaConfig, .. },
    Custom {
        name: String,
        input: serde_json::Value,
        // #[serde(skip)] — populated by the engine at .build() with the
        // typed Self::Input for the registered handler.
        compiled_input: Option<CompiledCustomInput>,
    },
}
}

Change

Represents a single data modification recorded in the audit trail.

#![allow(unused)]
fn main() {
pub struct Change {
    pub path: Arc<str>,
    pub old_value: OwnedDataValue,
    pub new_value: OwnedDataValue,
}
}

old_value and new_value are owned (not Arc<OwnedDataValue>) — one less heap allocation per recorded mutation. Wrap them yourself if you need to share a Change across threads.

AuditTrail

Records changes made by an action. workflow_id / task_id are Arc<str> mirrors of the workflow/task ids — the engine clones them by refcount bump rather than allocating per audit entry.

#![allow(unused)]
fn main() {
pub struct AuditTrail {
    pub workflow_id: Arc<str>,
    pub task_id: Arc<str>,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
    pub status: usize,
}
}

ErrorInfo

Error information recorded in the message.

#![allow(unused)]
fn main() {
pub struct ErrorInfo {
    pub code: String,
    pub message: String,
    pub path: Option<String>,
    pub workflow_id: Option<String>,
    pub task_id: Option<String>,
    pub timestamp: Option<String>,
    pub retry_attempted: Option<bool>,
    pub retry_count: Option<u32>,
}
}

DataflowError

Main error type for the library.

#![allow(unused)]
fn main() {
use dataflow_rs::engine::error::DataflowError;
}

Variants

#![allow(unused)]
fn main() {
pub enum DataflowError {
    Validation(String),
    FunctionExecution { context: String, source: Option<Box<DataflowError>> },
    Workflow(String),
    Task(String),
    FunctionNotFound(String),
    Deserialization(String),
    Io(String),
    LogicEvaluation(String),
    Http { status: u16, message: String },
    Timeout(String),
    Unknown(String),
}
}

DataflowError::retryable() returns true for transient infrastructure failures (5xx HTTP, 429, 408, timeouts, IO) and false for data/logic/ configuration errors.

WorkflowStatus

Lifecycle status for workflows.

#![allow(unused)]
fn main() {
use dataflow_rs::WorkflowStatus;
}

Variants

#![allow(unused)]
fn main() {
pub enum WorkflowStatus {
    Active,    // Default — workflow executes normally
    Paused,    // Excluded from channel routing
    Archived,  // Permanently retired
}
}

Built-in Functions

map

Data transformation using JSONLogic.

{
    "name": "map",
    "input": {
        "mappings": [
            {
                "path": "string",
                "logic": "JSONLogic expression"
            }
        ]
    }
}

validation

Rule-based data validation.

{
    "name": "validation",
    "input": {
        "rules": [
            {
                "logic": "JSONLogic expression",
                "message": "string"
            }
        ]
    }
}

filter

Pipeline control flow — halt workflow or skip task.

{
    "name": "filter",
    "input": {
        "condition": "JSONLogic expression",
        "on_reject": "halt | skip (default: halt)"
    }
}

Returns TaskOutcome::Success (pass), TaskOutcome::Skip (no audit entry, continue), or TaskOutcome::Halt (audit status 299, stop workflow) depending on the condition and on_reject.

log

Structured logging with JSONLogic expressions.

{
    "name": "log",
    "input": {
        "level": "trace | debug | info | warn | error (default: info)",
        "message": "JSONLogic expression",
        "fields": {
            "key": "JSONLogic expression"
        }
    }
}

Always returns TaskOutcome::Success — never modifies the message.

WASM API (dataflow-wasm)

For browser/JavaScript usage.

import init, { WasmEngine, process_message } from 'dataflow-wasm';

// Initialize
await init();

// Create engine
const engine = new WasmEngine(workflowsJson);

// Process with payload string (returns Promise)
const result = await engine.process(payloadStr);

// One-off convenience function (no engine needed)
const result2 = await process_message(workflowsJson, payloadStr);

// Get rule info
const count = engine.workflow_count();
const ids = engine.workflow_ids();

Full API Documentation

For complete API documentation, run:

cargo doc --open

This generates detailed documentation from the source code comments.