Audit Trails
Dataflow-rs automatically tracks all data modifications for debugging, monitoring, and compliance.
Overview
Every change to message data is recorded in the audit trail:
- What changed - Path and values (old and new)
- When it changed - Timestamp
- Which action - Rule (workflow) and action (task) identifiers
Audit Trail Structure
pub struct AuditTrail {
    pub workflow_id: Arc<str>,
    pub task_id: Arc<str>,
    pub timestamp: DateTime<Utc>,
    pub changes: Vec<Change>,
    pub status: usize,
}

pub struct Change {
    pub path: Arc<str>,
    pub old_value: OwnedDataValue,
    pub new_value: OwnedDataValue,
}
old_value and new_value are owned values (not Arc<OwnedDataValue>), which saves one heap allocation per recorded mutation. workflow_id and task_id are Arc<str> copies of the workflow and task IDs, so the engine clones them with a reference-count bump instead of allocating a new string for each audit entry.
status mirrors the TaskOutcome variant returned by the task: 200 for Success, the supplied code for Status(u16), and 299 (HALT_STATUS_CODE) for Halt. TaskOutcome::Skip produces no audit entry at all.
Accessing the Audit Trail
After processing, the audit trail is available on the message:
engine.process_message(&mut message).await?;

for entry in message.audit_trail() {
    println!("Workflow: {}, Task: {}", entry.workflow_id, entry.task_id);
    println!("Timestamp: {}", entry.timestamp);
    for change in &entry.changes {
        println!(" Path: {}", change.path);
        println!(" Old: {}", change.old_value);
        println!(" New: {}", change.new_value);
    }
}
JSON Representation
In the playground output, the audit trail appears as:
{
  "audit_trail": [
    {
      "task_id": "transform_data",
      "workflow_id": "my_workflow",
      "timestamp": "2024-01-01T12:00:00Z",
      "changes": [
        {
          "path": "data.full_name",
          "old_value": null,
          "new_value": "John Doe"
        },
        {
          "path": "data.greeting",
          "old_value": null,
          "new_value": "Hello, John Doe!"
        }
      ]
    }
  ]
}
What Gets Tracked
Map Function
Every mapping that modifies data creates a change entry:
{
  "mappings": [
    {"path": "data.name", "logic": "John"}
  ]
}
Creates:
{
  "path": "data.name",
  "old_value": null,
  "new_value": "John"
}
Custom Functions
Custom functions don't build Change entries by hand: TaskContext::set records them automatically when capture_changes is enabled. The handler just writes the value and returns TaskOutcome::Success:
ctx.set("data.processed", OwnedDataValue::Bool(true));
Ok(TaskOutcome::Success)
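The recording that a set call performs can be pictured with a small model. This is a minimal sketch under stated assumptions: Ctx, its fields, and the Option<String> value model (None standing in for JSON null) are hypothetical simplifications, not the dataflow-rs TaskContext API. It only shows the idea that a write captures the previous value, and that nothing is captured when the flag is off.

```rust
use std::collections::HashMap;

// Hypothetical, simplified model for illustration only: flat string paths,
// Option<String> values (None plays the role of JSON null).
#[derive(Debug, Clone, PartialEq)]
struct Change {
    path: String,
    old_value: Option<String>,
    new_value: Option<String>,
}

// Hypothetical stand-in for a task context; not the dataflow-rs type.
struct Ctx {
    data: HashMap<String, String>,
    capture_changes: bool,
    changes: Vec<Change>,
}

impl Ctx {
    fn new(capture_changes: bool) -> Self {
        Ctx { data: HashMap::new(), capture_changes, changes: Vec::new() }
    }

    /// Writes `value` at `path`; when capture is on, also records the
    /// previous value (None if the path was unset) and the new value.
    fn set(&mut self, path: &str, value: &str) {
        let old = self.data.insert(path.to_string(), value.to_string());
        if self.capture_changes {
            self.changes.push(Change {
                path: path.to_string(),
                old_value: old,
                new_value: Some(value.to_string()),
            });
        }
    }
}

fn main() {
    let mut ctx = Ctx::new(true);
    ctx.set("data.name", "John");
    ctx.set("data.name", "Jane");
    for c in &ctx.changes {
        println!("{}: {:?} -> {:?}", c.path, c.old_value, c.new_value);
    }
}
```

The same pattern explains the Map example above: the first write to data.name finds no previous value, so old_value is null.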
Validation Function
Validation is read-only, so it produces no audit trail entries.
Use Cases
Debugging
Trace exactly how data was transformed:
// Find where a value was set
for entry in message.audit_trail() {
    for change in &entry.changes {
        if change.path.as_ref() == "data.total" {
            println!("data.total set by {}/{}",
                entry.workflow_id, entry.task_id);
            println!("Changed from {} to {}",
                change.old_value, change.new_value);
        }
    }
}
Compliance
Log all changes for regulatory compliance:
for entry in message.audit_trail() {
    // log_to_audit_system is a placeholder for your own logging sink
    log_to_audit_system(
        entry.timestamp,
        entry.workflow_id.clone(),
        entry.task_id.clone(),
        &entry.changes,
    );
}
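One way to feed an external audit system is to flatten the trail into one record per change while keeping trail order. The sketch below uses hypothetical local stand-ins for AuditTrail and Change (timestamps as pre-formatted strings, values rendered as text) so it runs without dataflow-rs; to_log_lines and the tab-separated layout are illustrative choices, not part of the library.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the audit types, kept dependency-free:
// timestamps are pre-formatted strings, values are rendered as text.
struct Change {
    path: Arc<str>,
    old_value: String,
    new_value: String,
}

struct AuditEntry {
    workflow_id: Arc<str>,
    task_id: Arc<str>,
    timestamp: String,
    changes: Vec<Change>,
}

/// Flattens the trail into one tab-separated line per change, in trail order.
fn to_log_lines(trail: &[AuditEntry]) -> Vec<String> {
    trail
        .iter()
        .flat_map(|e| {
            e.changes.iter().map(move |c| {
                format!(
                    "{}\t{}\t{}\t{}\t{}\t{}",
                    e.timestamp, e.workflow_id, e.task_id, c.path, c.old_value, c.new_value
                )
            })
        })
        .collect()
}

// Sample data matching the JSON example earlier on this page.
fn sample_trail() -> Vec<AuditEntry> {
    vec![AuditEntry {
        workflow_id: Arc::from("my_workflow"),
        task_id: Arc::from("transform_data"),
        timestamp: "2024-01-01T12:00:00Z".to_string(),
        changes: vec![
            Change { path: Arc::from("data.full_name"), old_value: "null".into(), new_value: "John Doe".into() },
            Change { path: Arc::from("data.greeting"), old_value: "null".into(), new_value: "Hello, John Doe!".into() },
        ],
    }]
}

fn main() {
    for line in to_log_lines(&sample_trail()) {
        println!("{line}");
    }
}
```

Entries whose changes list is empty (for example when capture_changes is off) yield no lines here; if every task execution must be logged, record the entry itself as well.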
Change Detection
Detect if specific fields were modified:
fn was_field_modified(message: &Message, field: &str) -> bool {
    message
        .audit_trail()
        .iter()
        .flat_map(|e| e.changes.iter())
        .any(|c| c.path.as_ref() == field)
}

if was_field_modified(&message, "data.price") {
    // Price was changed during processing
}
Rollback (Conceptual)
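The audit trail can be used to implement rollback. As a starting point, the earliest recorded change to a path holds that path's value from before processing: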
use datavalue::OwnedDataValue;

fn get_original_value<'a>(message: &'a Message, field: &str) -> Option<&'a OwnedDataValue> {
    message
        .audit_trail()
        .iter()
        .flat_map(|e| e.changes.iter())
        .find(|c| c.path.as_ref() == field)
        .map(|c| &c.old_value)
}
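A full rollback replays the trail backwards and restores each old_value. The sketch below assumes a hypothetical flat model (string paths, Option<String> values where None means absent or null) rather than the real Message and OwnedDataValue types; rollback and mk_change are illustrative helpers, not dataflow-rs functions.

```rust
use std::collections::HashMap;

// Hypothetical simplified model: flat string paths and Option<String>
// values, where None means "absent/null". new_value is omitted because
// rollback only needs the old values.
struct Change {
    path: String,
    old_value: Option<String>,
}

struct AuditEntry {
    changes: Vec<Change>,
}

/// Reverts every recorded change by restoring old values, newest first.
fn rollback(data: &mut HashMap<String, String>, trail: &[AuditEntry]) {
    for entry in trail.iter().rev() {
        for change in entry.changes.iter().rev() {
            match &change.old_value {
                Some(v) => {
                    data.insert(change.path.clone(), v.clone());
                }
                None => {
                    data.remove(&change.path);
                }
            }
        }
    }
}

fn mk_change(path: &str, old: Option<&str>) -> Change {
    Change { path: path.to_string(), old_value: old.map(str::to_string) }
}

fn main() {
    // State after processing: price was "5" before; name was written twice.
    let mut data: HashMap<String, String> = HashMap::from([
        ("data.price".to_string(), "6".to_string()),
        ("data.name".to_string(), "Jane".to_string()),
    ]);
    let trail = vec![
        AuditEntry { changes: vec![mk_change("data.name", None), mk_change("data.price", Some("5"))] },
        AuditEntry { changes: vec![mk_change("data.name", Some("John"))] },
    ];
    rollback(&mut data, &trail);
    println!("{data:?}"); // only data.price = "5" remains
}
```

Undoing newest-first matters: when a path was written more than once, the last restore wins and it carries the oldest old_value. This only works if change capture was on; with capture_changes(false) the changes lists are empty and there is nothing to replay.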
Best Practices
- Track All Changes - Custom functions should make every modification through TaskContext::set so it is recorded
- Use Arc<str> for IDs - workflow_id/task_id clone via a refcount bump
- Timestamp Accuracy - Timestamps are UTC for consistency
- Check Audit Trail - Review audit trail during development
- Log for Production - Persist audit trails for production debugging
- Bulk Pipelines - Build the message with Message::builder().capture_changes(false).build() to skip per-write change capture in throughput-critical pipelines (audit entries are still recorded, with empty changes).
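The Arc<str> point in the list above can be checked with the standard library alone. This sketch clones one id several times, as the engine does once per audit entry, and confirms that every clone points at the same allocation; share_ids is a hypothetical helper, not a dataflow-rs function.

```rust
use std::sync::Arc;

/// Hypothetical helper: hands out `n` clones of one id. Each clone only
/// increments the reference count; the string itself is not copied.
fn share_ids(id: &Arc<str>, n: usize) -> Vec<Arc<str>> {
    (0..n).map(|_| Arc::clone(id)).collect()
}

fn main() {
    // One heap allocation for the id string...
    let workflow_id: Arc<str> = Arc::from("my_workflow");

    // ...shared by every entry that clones it.
    let ids = share_ids(&workflow_id, 3);

    assert!(ids.iter().all(|id| Arc::ptr_eq(id, &workflow_id)));
    println!("strong count = {}", Arc::strong_count(&workflow_id)); // 4
}
```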