Audit Trails
Dataflow-rs automatically tracks all data modifications for debugging, monitoring, and compliance.
Overview
Every change to message data is recorded in the audit trail:
- What changed - Path and values (old and new)
- When it changed - Timestamp
- Which task - Workflow and task identifiers
Audit Trail Structure
#![allow(unused)]
fn main() {
pub struct AuditTrail {
pub task_id: String,
pub workflow_id: String,
pub timestamp: DateTime<Utc>,
pub changes: Vec<Change>,
}
pub struct Change {
pub path: Arc<str>,
pub old_value: Arc<Value>,
pub new_value: Arc<Value>,
}
}
Accessing the Audit Trail
After processing, the audit trail is available on the message:
#![allow(unused)]
fn main() {
engine.process_message(&mut message).await?;
for entry in &message.audit_trail {
println!("Workflow: {}, Task: {}", entry.workflow_id, entry.task_id);
println!("Timestamp: {}", entry.timestamp);
for change in &entry.changes {
println!(" Path: {}", change.path);
println!(" Old: {}", change.old_value);
println!(" New: {}", change.new_value);
}
}
}
JSON Representation
In the playground output, the audit trail appears as:
{
"audit_trail": [
{
"task_id": "transform_data",
"workflow_id": "my_workflow",
"timestamp": "2024-01-01T12:00:00Z",
"changes": [
{
"path": "data.full_name",
"old_value": null,
"new_value": "John Doe"
},
{
"path": "data.greeting",
"old_value": null,
"new_value": "Hello, John Doe!"
}
]
}
]
}
What Gets Tracked
Map Function
Every mapping that modifies data creates a change entry:
{
"mappings": [
{"path": "data.name", "logic": "John"}
]
}
Creates:
{
"path": "data.name",
"old_value": null,
"new_value": "John"
}
Custom Functions
Custom functions should track changes for proper auditing:
#![allow(unused)]
fn main() {
let changes = vec![Change {
path: Arc::from("data.processed"),
old_value: Arc::new(old_value),
new_value: Arc::new(new_value),
}];
Ok((200, changes))
}
Validation Function
Validation is read-only, so it produces no audit trail entries.
Try It
Notice the audit trail shows each step’s changes.
Use Cases
Debugging
Trace exactly how data was transformed:
#![allow(unused)]
fn main() {
// Find where a value was set
for entry in &message.audit_trail {
for change in &entry.changes {
if change.path.as_ref() == "data.total" {
println!("data.total set by {}/{}",
entry.workflow_id, entry.task_id);
println!("Changed from {} to {}",
change.old_value, change.new_value);
}
}
}
}
Compliance
Log all changes for regulatory compliance:
#![allow(unused)]
fn main() {
for entry in &message.audit_trail {
log_to_audit_system(
entry.timestamp,
entry.workflow_id.clone(),
entry.task_id.clone(),
&entry.changes
);
}
}
Change Detection
Detect if specific fields were modified:
#![allow(unused)]
fn main() {
fn was_field_modified(message: &Message, field: &str) -> bool {
message.audit_trail.iter()
.flat_map(|e| e.changes.iter())
.any(|c| c.path.as_ref() == field)
}
if was_field_modified(&message, "data.price") {
// Price was changed during processing
}
}
Rollback (Conceptual)
The audit trail can be used to implement rollback:
#![allow(unused)]
fn main() {
fn get_original_value(message: &Message, field: &str) -> Option<&Value> {
message.audit_trail.iter()
.flat_map(|e| e.changes.iter())
.find(|c| c.path.as_ref() == field)
.map(|c| c.old_value.as_ref())
}
}
Best Practices
- Track All Changes - Custom functions should record all modifications
- Use Arc - Use
Arc<str>andArc<Value>for efficient sharing - Timestamp Accuracy - Timestamps are UTC for consistency
- Check Audit Trail - Review audit trail during development
- Log for Production - Persist audit trails for production debugging