Message

A Message is the data container that flows through rules, carrying data, metadata, and an audit trail.

Overview

The Message structure contains:

  • context.data - Main data payload
  • context.metadata - Message metadata (routing, source info)
  • context.temp_data - Temporary processing data
  • audit_trail - Record of all changes
  • errors - Collected errors during processing

Message Structure

use datavalue::OwnedDataValue;
use std::sync::Arc;

pub struct Message {
    // Read via accessors: id(), payload(), payload_arc(), audit_trail(),
    // errors(), capture_changes(). Mutate `errors` via add_error(...).
    // `context` is the only public field. It is the legitimate read
    // surface for tests (e.g. `message.context["data"]["x"]`); inside a
    // handler, prefer `TaskContext::set` so audit-trail changes are
    // recorded automatically.
    pub context: OwnedDataValue,          // Always an Object {data, metadata, temp_data}
    // ... encapsulated fields ...
}

The context is structured as:

{
    "data": { ... },
    "metadata": { ... },
    "temp_data": { ... }
}

Creating Messages

Basic Creation

use dataflow_rs::Message;
use serde_json::json;

// `from_value` bridges from serde_json::Value, which is handiest when you
// already have JSON literals. The payload lands on the message; the context
// starts empty with the canonical {data, metadata, temp_data} shape.
let mut message = Message::from_value(&json!({
    "name": "John",
    "email": "john@example.com"
}));

Native Construction (Zero-Conversion)

use dataflow_rs::Message;
use datavalue::OwnedDataValue;
use serde_json::json;
use std::sync::Arc;

// Wrap the payload in an Arc up front and hand it to the message directly.
let payload = Arc::new(OwnedDataValue::from(&json!({
    "name": "John"
})));
let mut message = Message::new(payload);

Builder

For richer cases, such as a caller-supplied id (for correlation) or the capture-off fast path, use Message::builder():

use dataflow_rs::Message;
use serde_json::json;

let mut message = Message::builder()
    .id("correlation-123")
    .payload_json(&json!({"name": "John"}))
    .capture_changes(false) // skip per-write Change capture
    .build();

Populating the Context

In practice you don't mutate message.context directly from Rust. Workflows populate it through the parse_json, map, and validation built-ins. Inside a custom AsyncFunctionHandler, use TaskContext::set, which records audit-trail changes automatically:

ctx.set("metadata.source", OwnedDataValue::from(&json!("api")));
ctx.set("metadata.type",   OwnedDataValue::from(&json!("user")));
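
To make the dotted-path write concrete, here is a minimal sketch of what a call such as ctx.set("metadata.source", ...) could do: walk the path, create missing intermediate objects, and record the old and new value as a Change. The Value enum, set_path, and set_and_record are hypothetical stand-ins built on the standard library only. They are not the dataflow-rs or datavalue API, and the way the real TaskContext::set treats missing intermediate keys is an assumption here.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the library's value type (NOT OwnedDataValue).
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    Text(String),
    Object(BTreeMap<String, Value>),
}

// Mirrors the shape of the documented Change struct, using the stand-in Value.
#[derive(Debug, Clone, PartialEq)]
pub struct Change {
    pub path: String,
    pub old_value: Value,
    pub new_value: Value,
}

/// Write `new_value` at a dotted path such as "metadata.source".
/// Any node on the way that is not an object is replaced by an empty object
/// (an assumption of this sketch). Returns the previous value, or Null.
pub fn set_path(node: &mut Value, path: &str, new_value: Value) -> Value {
    if !matches!(*node, Value::Object(_)) {
        *node = Value::Object(BTreeMap::new());
    }
    let map = match node {
        Value::Object(map) => map,
        _ => unreachable!("node was just turned into an object"),
    };
    match path.split_once('.') {
        // More segments remain: descend into (or create) the child node.
        Some((head, rest)) => {
            let child = map.entry(head.to_string()).or_insert(Value::Null);
            set_path(child, rest, new_value)
        }
        // Last segment: store the value and hand back what was there before.
        None => map.insert(path.to_string(), new_value).unwrap_or(Value::Null),
    }
}

/// Perform the write and append a Change describing it.
pub fn set_and_record(root: &mut Value, changes: &mut Vec<Change>, path: &str, new_value: Value) {
    let old_value = set_path(root, path, new_value.clone());
    changes.push(Change { path: path.to_string(), old_value, new_value });
}
```

Returning the previous value from set_path keeps the recording step trivial: the caller gets exactly the old_value/new_value pair a Change needs without a second lookup.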

Context Fields

data

The main data payload. This is where your primary data lives and is transformed. Workflows populate it via parse_json / map tasks; handlers read it through ctx.data(). The example below shows the read accessors:

// Inside an AsyncFunctionHandler — TaskContext::data() returns
// &OwnedDataValue (Null if missing, matching serde_json::Value index
// semantics).
let name = ctx.data().get("name");

// Outside a handler (e.g. inspecting a processed message in tests),
// read through the public `context` field:
let name = &message.context["data"]["name"];

metadata

Information about the message itself (not the data). Commonly used for:

  • Routing decisions (rule conditions)
  • Source tracking
  • Timestamps
  • Message type classification

From a handler, ctx.set("metadata.X", v) is the canonical write path. The engine also stamps metadata.processed_at and metadata.engine_version automatically on every process_message call.

temp_data

Temporary storage for intermediate processing results — useful for values threaded between tasks within the same workflow. From a handler:

ctx.set("temp_data.calculated_value", OwnedDataValue::from(&json!(42)));

// Later tasks read it via JSONLogic:
//   {"var": "temp_data.calculated_value"}

Audit Trail

By default, each task that runs appends an AuditTrail entry, and every write it makes is captured as a Change:

use chrono::{DateTime, Utc};
use datavalue::OwnedDataValue;
use std::sync::Arc;

pub struct AuditTrail {
    pub workflow_id: Arc<str>,
    pub task_id: Arc<str>,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
    pub status: usize,
}

pub struct Change {
    pub path: Arc<str>,
    pub old_value: OwnedDataValue,  // owned (not Arc): one fewer heap alloc per Change
    pub new_value: OwnedDataValue,
}

To skip per-write Change capture (bulk-pipeline fast path), build the message with capture_changes(false):

let m = Message::builder()
    .payload_json(&json!({}))
    .capture_changes(false)
    .build();

Audit-trail entries are still recorded — just with empty changes lists. The wire shape is unchanged either way.
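
The effect of the capture flag can be illustrated with a small sketch: a task always appends one audit entry, and the flag only decides whether its per-write Change records are kept. AuditSketch, AuditEntry, and run_task are hypothetical names invented for this illustration, and values are simplified to i64. None of this is the library's implementation.

```rust
use std::collections::BTreeMap;

// Simplified Change: values are plain i64 in this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct Change {
    pub path: String,
    pub old_value: Option<i64>,
    pub new_value: i64,
}

// Simplified audit entry: one per task run.
#[derive(Debug, Clone, PartialEq)]
pub struct AuditEntry {
    pub task_id: String,
    pub changes: Vec<Change>,
}

// Hypothetical stand-in for a message with an audit trail.
pub struct AuditSketch {
    values: BTreeMap<String, i64>,
    capture_changes: bool,
    audit_trail: Vec<AuditEntry>,
}

impl AuditSketch {
    pub fn new(capture_changes: bool) -> Self {
        AuditSketch { values: BTreeMap::new(), capture_changes, audit_trail: Vec::new() }
    }

    /// Apply a task's writes. The writes always take effect and an audit
    /// entry is always appended; Change records are kept only when
    /// `capture_changes` is true.
    pub fn run_task(&mut self, task_id: &str, writes: &[(&str, i64)]) {
        let mut changes = Vec::new();
        for (path, new_value) in writes {
            let old_value = self.values.insert(path.to_string(), *new_value);
            if self.capture_changes {
                changes.push(Change { path: path.to_string(), old_value, new_value: *new_value });
            }
        }
        self.audit_trail.push(AuditEntry { task_id: task_id.to_string(), changes });
    }

    pub fn audit_trail(&self) -> &[AuditEntry] {
        &self.audit_trail
    }

    pub fn value(&self, path: &str) -> Option<i64> {
        self.values.get(path).copied()
    }
}
```

With capture on, one write to "data.total" yields an entry holding one Change. With capture off, the same write yields an entry whose changes list is empty, so code that iterates the trail needs no special case.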

Accessing Audit Trail

// After processing: audit_trail() returns &[AuditTrail].
for entry in message.audit_trail() {
    println!("Workflow: {}, Task: {}", entry.workflow_id, entry.task_id);
    for change in &entry.changes {
        println!("  {} -> {} at {}", change.old_value, change.new_value, change.path);
    }
}

Error Handling

Errors are collected on the message and read through message.errors(). This channel is always populated, even when Engine::process_message returns Result::Err:

for error in message.errors() {
    println!("Error in {}/{}: {}",
        error.workflow_id.as_deref().unwrap_or("unknown"),
        error.task_id.as_deref().unwrap_or("unknown"),
        error.message
    );
}

See Error Handling for the unified-channel contract in detail.
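
As a rough illustration of the always-on channel, the sketch below records a failure on the message before returning Err, so the caller can inspect the errors either way. ErrorSketch, ErrorInfo, process, and describe are hypothetical stand-ins using only the standard library. The real engine's types and control flow may differ.

```rust
// Hypothetical error record; field names follow the loop shown above.
#[derive(Debug, Clone, PartialEq)]
pub struct ErrorInfo {
    pub workflow_id: Option<String>,
    pub task_id: Option<String>,
    pub message: String,
}

// Hypothetical stand-in for a message that collects errors.
pub struct ErrorSketch {
    errors: Vec<ErrorInfo>,
}

impl ErrorSketch {
    pub fn new() -> Self {
        ErrorSketch { errors: Vec::new() }
    }

    pub fn add_error(&mut self, error: ErrorInfo) {
        self.errors.push(error);
    }

    pub fn errors(&self) -> &[ErrorInfo] {
        &self.errors
    }
}

/// Run tasks in order. Each task is (task_id, outcome). On the first failure
/// the error is stored on the message AND returned as Err, so both channels
/// agree; later tasks are not run.
pub fn process(
    message: &mut ErrorSketch,
    workflow_id: &str,
    tasks: &[(&str, Result<(), &str>)],
) -> Result<(), String> {
    for (task_id, outcome) in tasks {
        if let Err(reason) = outcome {
            message.add_error(ErrorInfo {
                workflow_id: Some(workflow_id.to_string()),
                task_id: Some(task_id.to_string()),
                message: reason.to_string(),
            });
            return Err(format!("task {} failed", task_id));
        }
    }
    Ok(())
}

/// Format one error, falling back to "unknown" for missing ids.
pub fn describe(error: &ErrorInfo) -> String {
    format!(
        "Error in {}/{}: {}",
        error.workflow_id.as_deref().unwrap_or("unknown"),
        error.task_id.as_deref().unwrap_or("unknown"),
        error.message
    )
}
```

Keeping the ids as Option values lets the same record describe failures that happen outside any workflow or task, which is why the formatting code falls back to "unknown".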

JSONLogic Access

In rule conditions and mappings, access message fields using JSONLogic:

// Access data fields
{"var": "data.name"}
{"var": "data.user.email"}

// Access metadata
{"var": "metadata.type"}
{"var": "metadata.source"}

// Access temp_data
{"var": "temp_data.intermediate_result"}
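
The lookups above can be pictured as a walk over the context, one path segment at a time. The following is a minimal sketch of that idea, assuming a missing segment resolves to null (the usual JSONLogic var behaviour when no default is given). Value, resolve_var, and sample_context are hypothetical stand-ins, not the engine's evaluator.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the library's value type.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    Text(String),
    Object(BTreeMap<String, Value>),
}

/// Resolve a dotted path such as "data.user.email" against the context.
/// Returns Value::Null when a segment is missing or a non-object is reached
/// before the path is exhausted.
pub fn resolve_var(context: &Value, path: &str) -> Value {
    let mut current = context;
    for segment in path.split('.') {
        match current {
            Value::Object(map) => match map.get(segment) {
                Some(next) => current = next,
                None => return Value::Null,
            },
            _ => return Value::Null,
        }
    }
    current.clone()
}

/// Build a small context with the {data, metadata, temp_data} shape.
pub fn sample_context() -> Value {
    let mut user = BTreeMap::new();
    user.insert("email".to_string(), Value::Text("john@example.com".to_string()));

    let mut data = BTreeMap::new();
    data.insert("name".to_string(), Value::Text("John".to_string()));
    data.insert("user".to_string(), Value::Object(user));

    let mut metadata = BTreeMap::new();
    metadata.insert("type".to_string(), Value::Text("user".to_string()));

    let mut root = BTreeMap::new();
    root.insert("data".to_string(), Value::Object(data));
    root.insert("metadata".to_string(), Value::Object(metadata));
    root.insert("temp_data".to_string(), Value::Object(BTreeMap::new()));
    Value::Object(root)
}
```

Because the first path segment is always data, metadata, or temp_data, a rule condition never needs to know how the payload was originally shaped, only where the workflow placed the value in the context.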

Best Practices

  1. Separate Concerns

    • Use data for business data
    • Use metadata for routing and rule conditions
    • Use temp_data for intermediate results

  2. Don't Modify metadata in Tasks

    • Metadata should remain stable for routing decisions
    • When a handler does need to write it, go through ctx.set("metadata.X", v) so the change is audited

  3. Clean temp_data

    • Keep only values needed during processing in temp_data; results that should persist belong in data

  4. Check Audit Trail

    • Use the audit trail for debugging and compliance