Audit Trails

Dataflow-rs automatically tracks all data modifications for debugging, monitoring, and compliance.

Overview

Every change to message data is recorded in the audit trail:

  • What changed - Path and values (old and new)
  • When it changed - Timestamp
  • Which action - Rule (workflow) and action (task) identifiers

Audit Trail Structure

pub struct AuditTrail {
    pub workflow_id: Arc<str>,
    pub task_id: Arc<str>,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
    pub status: usize,
}

pub struct Change {
    pub path: Arc<str>,
    pub old_value: OwnedDataValue,
    pub new_value: OwnedDataValue,
}

old_value and new_value are owned values (not Arc<OwnedDataValue>), which saves one heap allocation per recorded mutation. workflow_id and task_id are Arc<str> copies of the workflow and task ids, so the engine clones them with a refcount bump rather than allocating a new string per audit entry.

status mirrors the TaskOutcome variant returned by the task: 200 for Success, the supplied code for Status(u16), and 299 (HALT_STATUS_CODE) for Halt. A TaskOutcome::Skip produces no audit entry at all.
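
The status mapping described above can be sketched as follows. This is a minimal illustration, not the engine's code: the TaskOutcome enum is a local stand-in whose variant shapes are assumed from the description (only Status(u16) and the 200/299 codes come from the text), and audit_status is a hypothetical helper.

```rust
/// Status recorded for a halted task, per the description above.
const HALT_STATUS_CODE: usize = 299;

/// Local stand-in for the engine's `TaskOutcome` (variant shapes are assumed).
#[derive(Debug, Clone, Copy, PartialEq)]
enum TaskOutcome {
    Success,
    Status(u16),
    Skip,
    Halt,
}

/// Hypothetical helper: the `status` an audit entry would carry for an
/// outcome, or `None` when no audit entry is recorded at all.
fn audit_status(outcome: TaskOutcome) -> Option<usize> {
    match outcome {
        TaskOutcome::Success => Some(200),
        TaskOutcome::Status(code) => Some(usize::from(code)),
        TaskOutcome::Halt => Some(HALT_STATUS_CODE),
        TaskOutcome::Skip => None,
    }
}
```

Returning Option makes the Skip case explicit: there is no status to report because there is no entry.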

Accessing the Audit Trail

After processing, the audit trail is available on the message:

engine.process_message(&mut message).await?;

for entry in message.audit_trail() {
    println!("Workflow: {}, Task: {}", entry.workflow_id, entry.task_id);
    println!("Timestamp: {}", entry.timestamp);

    for change in &entry.changes {
        println!("  Path: {}", change.path);
        println!("  Old: {}", change.old_value);
        println!("  New: {}", change.new_value);
    }
}

JSON Representation

In the playground output, the audit trail appears as:

{
    "audit_trail": [
        {
            "task_id": "transform_data",
            "workflow_id": "my_workflow",
            "timestamp": "2024-01-01T12:00:00Z",
            "changes": [
                {
                    "path": "data.full_name",
                    "old_value": null,
                    "new_value": "John Doe"
                },
                {
                    "path": "data.greeting",
                    "old_value": null,
                    "new_value": "Hello, John Doe!"
                }
            ]
        }
    ]
}

What Gets Tracked

Map Function

Every mapping that modifies data creates a change entry:

{
    "mappings": [
        {"path": "data.name", "logic": "John"}
    ]
}

Creates:

{
    "path": "data.name",
    "old_value": null,
    "new_value": "John"
}

Custom Functions

Custom functions don’t build Change entries by hand. TaskContext::set records them automatically when capture_changes is on, so the handler just writes the value and returns TaskOutcome::Success:

ctx.set("data.processed", OwnedDataValue::Bool(true));
Ok(TaskOutcome::Success)
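
To make the "set records the change" idea concrete, here is a simplified sketch of a context whose write method captures old and new values when capture is enabled. SketchContext and its string-valued Change are hypothetical stand-ins invented for illustration; the real TaskContext and OwnedDataValue may differ in shape and behavior.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Stand-in for a recorded mutation. Values are plain strings here, and
/// `None` plays the role of a null (previously absent) value.
#[derive(Debug, Clone, PartialEq)]
struct Change {
    path: Arc<str>,
    old_value: Option<String>,
    new_value: Option<String>,
}

/// Hypothetical, simplified task context (not the real `TaskContext`).
struct SketchContext {
    data: BTreeMap<String, String>,
    capture_changes: bool,
    changes: Vec<Change>,
}

impl SketchContext {
    fn new(capture_changes: bool) -> Self {
        Self { data: BTreeMap::new(), capture_changes, changes: Vec::new() }
    }

    /// Write a value; when capture is on, also record the old and new values.
    fn set(&mut self, path: &str, value: &str) {
        // `insert` returns the previous value, which becomes `old_value`.
        let old = self.data.insert(path.to_string(), value.to_string());
        if self.capture_changes {
            self.changes.push(Change {
                path: Arc::from(path),
                old_value: old,
                new_value: Some(value.to_string()),
            });
        }
    }
}
```

In this sketch, two writes to the same path with capture on yield two Change records, the second carrying the first write as its old value. With capture off, the write still lands but nothing is recorded.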

Validation Function

Validation is read-only, so it produces no audit trail entries.

Use Cases

Debugging

Trace exactly how data was transformed:

// Find where a value was set
for entry in message.audit_trail() {
    for change in &entry.changes {
        if change.path.as_ref() == "data.total" {
            println!("data.total set by {}/{}",
                entry.workflow_id, entry.task_id);
            println!("Changed from {} to {}",
                change.old_value, change.new_value);
        }
    }
}

Compliance

Log all changes for regulatory compliance:

// log_to_audit_system is a placeholder for your own audit sink
for entry in message.audit_trail() {
    log_to_audit_system(
        entry.timestamp,
        entry.workflow_id.clone(), // Arc<str>: refcount bump, no string copy
        entry.task_id.clone(),
        &entry.changes,
    );
}

Change Detection

Detect if specific fields were modified:

fn was_field_modified(message: &Message, field: &str) -> bool {
    message.audit_trail().iter()
        .flat_map(|e| e.changes.iter())
        .any(|c| c.path.as_ref() == field)
}

if was_field_modified(&message, "data.price") {
    // Price was changed during processing
}
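
The same traversal can answer "which fields were touched at all?" in one pass. The sketch below uses reduced stand-in types (only the fields it needs) rather than the real Message API, and modified_paths is a hypothetical helper name.

```rust
use std::collections::BTreeSet;

/// Stand-ins for the audit types, reduced to the fields this sketch needs.
struct Change {
    path: String,
}

struct AuditTrail {
    changes: Vec<Change>,
}

/// Hypothetical helper: every distinct path written during processing,
/// deduplicated and in sorted order.
fn modified_paths(trail: &[AuditTrail]) -> BTreeSet<&str> {
    trail
        .iter()
        .flat_map(|entry| entry.changes.iter())
        .map(|change| change.path.as_str())
        .collect()
}
```

A set is convenient when several fields need checking, since the trail is walked once instead of once per field.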

Rollback (Conceptual)

The audit trail can be used to implement rollback:

use datavalue::OwnedDataValue;

// Assuming entries are in execution order, the first recorded change to a
// path holds the value it had before processing started.
fn get_original_value<'a>(message: &'a Message, field: &str) -> Option<&'a OwnedDataValue> {
    message.audit_trail().iter()
        .flat_map(|e| e.changes.iter())
        .find(|c| c.path.as_ref() == field)
        .map(|c| &c.old_value)
}
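
A full rollback could replay the trail backwards, restoring each old value in turn. The sketch below is conceptual: it uses stand-in types over a flat string map, assumes entries are stored in execution order, and treats a null old value (None here) as "the path did not exist", which is an assumption about semantics rather than documented behavior.

```rust
use std::collections::BTreeMap;

/// Stand-in change record; `None` plays the role of a null old value.
struct Change {
    path: String,
    old_value: Option<String>,
}

/// Stand-in audit entry holding the changes made by one task.
struct AuditTrail {
    changes: Vec<Change>,
}

/// Hypothetical rollback: undo changes newest-first, so each path ends up
/// with the value it held before the first recorded write.
fn rollback(data: &mut BTreeMap<String, String>, trail: &[AuditTrail]) {
    for entry in trail.iter().rev() {
        for change in entry.changes.iter().rev() {
            match &change.old_value {
                // Restore the previous value.
                Some(old) => {
                    data.insert(change.path.clone(), old.clone());
                }
                // Assumed: a null old value means the path was newly created.
                None => {
                    data.remove(&change.path);
                }
            }
        }
    }
}
```

Undoing in reverse order matters when a path is written more than once: the last restore applied is the oldest one, which is the original value.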

Best Practices

  1. Track All Changes - Custom functions should make every modification through TaskContext::set so that it is recorded
  2. Use Arc for ids - workflow_id / task_id clone via refcount bump
  3. Timestamp Accuracy - Timestamps are UTC for consistency
  4. Check Audit Trail - Review audit trail during development
  5. Log for Production - Persist audit trails for production debugging
  6. Bulk Pipelines - Build the message with Message::builder().capture_changes(false).build() to skip per-write change capture in throughput-critical pipelines (audit entries are still recorded with empty changes).
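
The "refcount bump" in item 2 can be checked with the standard library alone. In this small sketch, share_id is a hypothetical helper; only std::sync::Arc is used, and nothing here depends on dataflow-rs.

```rust
use std::sync::Arc;

/// Hypothetical helper: hand out `n` ids that all share one string
/// allocation. `Arc::clone` only increments the reference count.
fn share_id(id: &Arc<str>, n: usize) -> Vec<Arc<str>> {
    (0..n).map(|_| Arc::clone(id)).collect()
}
```

For an id created with Arc::from("my_workflow"), share_id(&id, 3) raises Arc::strong_count(&id) to 4, and Arc::ptr_eq confirms that every copy points at the same allocation.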