Performance
Dataflow-rs is designed for high-performance data processing with minimal overhead.
Architecture for Performance
Pre-compilation
All JSONLogic expressions are compiled once at engine startup:
#![allow(unused)]
fn main() {
    // This compiles all logic at creation time
    let engine = Engine::new(workflows, None);

    // Runtime processing uses pre-compiled logic:
    // no parsing or compilation overhead per message
    engine.process_message(&mut message).await?;
}
Benefits of Pre-compilation
- Zero runtime parsing - No JSON parsing during message processing
- Cached compiled logic - O(1) access to compiled expressions
- Early validation - Invalid expressions caught at startup
- Consistent latency - Predictable performance per message
Memory Efficiency
- Arc-wrapped compiled logic - Shared without copying
- Immutable workflows - Safe concurrent access
- Context caching - Avoids repeated JSON cloning
Benchmarking
Run the included benchmark:
cargo run --example benchmark --release
Sample Benchmark
#![allow(unused)]
fn main() {
    use dataflow_rs::{Engine, Workflow, Message};
    use std::time::Instant;

    // Setup: `workflow_json` and `test_data` are placeholders for your
    // workflow definition and a representative payload.
    let workflow = Workflow::from_json(workflow_json)?;
    let engine = Engine::new(vec![workflow], None);

    // Benchmark: the engine is reused; only the message is created per iteration.
    let iterations = 10_000;
    let start = Instant::now();

    for _ in 0..iterations {
        let mut message = Message::new(&test_data);
        engine.process_message(&mut message).await?;
    }

    let elapsed = start.elapsed();
    println!("Processed {} messages in {:?}", iterations, elapsed);
    println!("Average: {:?} per message", elapsed / iterations);
}
Optimization Tips
1. Minimize Mappings
Combine related transformations:
// Less efficient: multiple separate mappings
{
  "mappings": [
    {"path": "data.a", "logic": {"var": "data.source.a"}},
    {"path": "data.b", "logic": {"var": "data.source.b"}},
    {"path": "data.c", "logic": {"var": "data.source.c"}}
  ]
}

// More efficient: a single object mapping when possible
{
  "mappings": [
    {"path": "data", "logic": {"var": "data.source"}}
  ]
}
2. Use Conditions Wisely
Skip unnecessary processing with conditions:
{
  "id": "expensive_task",
  "condition": {"==": [{"var": "metadata.needs_processing"}, true]},
  "function": { ... }
}
3. Order Workflows by Frequency
Put frequently executed workflows earlier; a lower priority value runs first:
{"id": "common_workflow", "priority": 1, ...}
{"id": "rare_workflow", "priority": 100, ...}
4. Use temp_data
Store intermediate results to avoid recomputation:
{
  "mappings": [
    {
      "path": "temp_data.computed",
      "logic": {"expensive": "computation"}
    },
    {
      "path": "data.result1",
      "logic": {"var": "temp_data.computed"}
    },
    {
      "path": "data.result2",
      "logic": {"var": "temp_data.computed"}
    }
  ]
}
5. Avoid Unnecessary Validation
Validate only what’s necessary:
// Validate at system boundaries
{
  "id": "input_validation",
  "condition": {"==": [{"var": "metadata.source"}, "external"]},
  "tasks": [
    {"id": "validate", "function": {"name": "validation", ...}}
  ]
}
Concurrent Processing
Process multiple messages concurrently:
#![allow(unused)]
fn main() {
    use std::sync::Arc;
    use tokio::task;

    // The engine is Send + Sync, so it can be shared across tasks via Arc.
    let engine = Arc::new(Engine::new(workflows, None));

    let handles: Vec<_> = messages.into_iter()
        .map(|mut msg| {
            let engine = Arc::clone(&engine);
            // Each task owns its message, so no locking is needed.
            task::spawn(async move {
                engine.process_message(&mut msg).await
            })
        })
        .collect();

    // Wait for all tasks: the first ? surfaces panics (JoinError),
    // the second surfaces processing errors.
    for handle in handles {
        handle.await??;
    }
}
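Spawning one task per message works well for a bounded batch, but spawning without limit under sustained load can exhaust memory. Below is a minimal sketch of bounded concurrency with buffer_unordered; it assumes the futures crate is added as a dependency (dataflow-rs itself does not require it):
#![allow(unused)]
fn main() {
    use std::sync::Arc;
    use futures::stream::{self, StreamExt};

    let engine = Arc::new(Engine::new(workflows, None));

    // Process at most 64 messages concurrently.
    let mut results = stream::iter(messages)
        .map(|mut msg| {
            let engine = Arc::clone(&engine);
            async move { engine.process_message(&mut msg).await }
        })
        .buffer_unordered(64);

    while let Some(result) = results.next().await {
        result?;
    }
}
Unlike task::spawn, this drives all futures on the current task; combine it with spawned worker tasks if you also need CPU parallelism across threads.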
Thread Safety
- Engine is Send + Sync
- Compiled logic shared via Arc
- Each message processed independently
Memory Considerations
Large Messages
For very large messages, consider:
- Streaming - Process chunks instead of entire payload
- Selective Loading - Load only needed fields
- Cleanup temp_data - Clear intermediate results when done
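For the last point, intermediate results can be cleared with a final mapping that overwrites temp_data. This is a sketch, assuming a literal null is accepted as mapping logic (standard JSONLogic treats non-operator values as literals):
{
  "mappings": [
    {"path": "temp_data", "logic": null}
  ]
}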
Many Workflows
For many workflows:
- Organize by Domain - Group related workflows
- Use Conditions - Skip irrelevant workflows early (see the example below)
- Profile - Identify bottleneck workflows
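For condition-based skipping, gate the whole workflow on message metadata, following the same condition pattern as the validation example above; the domain field here is hypothetical:
{
  "id": "orders_workflow",
  "condition": {"==": [{"var": "metadata.domain"}, "orders"]},
  "tasks": [ ... ]
}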
Profiling
Enable Logging
#![allow(unused)]
fn main() {
    env_logger::Builder::from_env(
        env_logger::Env::default().default_filter_or("debug")
    ).init();
}
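Because the builder reads from the environment, the RUST_LOG variable overrides the default filter at runtime:
RUST_LOG=info cargo run --example benchmark --release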
Custom Metrics
#![allow(unused)]
fn main() {
    use std::time::Instant;

    let start = Instant::now();
    engine.process_message(&mut message).await?;
    let duration = start.elapsed();

    // Assumes the `metrics` facade crate with a recorder installed.
    metrics::histogram!("dataflow.processing_time", duration);
}
Production Recommendations
- Build with --release - Debug builds are significantly slower
- Pre-warm - Process a few messages at startup to warm caches (see the sketch below)
- Monitor - Track processing times and error rates
- Profile - Identify slow workflows in production
- Scale Horizontally - Engine is stateless, scale with instances
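A minimal pre-warm sketch using the same API as the benchmark above; sample_data is a placeholder for a representative payload:
#![allow(unused)]
fn main() {
    // Run a small number of representative messages through the engine
    // before serving traffic, so caches and the allocator are warmed up.
    for _ in 0..100 {
        let mut message = Message::new(&sample_data);
        engine.process_message(&mut message).await?;
    }
}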