
Performance

Dataflow-rs is designed for high-performance data processing with minimal overhead.

Architecture for Performance

Pre-compilation

All JSONLogic expressions are compiled once at engine startup:

// This compiles all logic at creation time
let engine = Engine::new(workflows, None);

// Runtime processing uses the pre-compiled logic:
// no parsing or compilation overhead per message
engine.process_message(&mut message).await?;

Benefits of Pre-compilation

  • Zero runtime parsing - No JSON parsing during message processing
  • Cached compiled logic - O(1) access to compiled expressions
  • Early validation - Invalid expressions caught at startup
  • Consistent latency - Predictable performance per message

Memory Efficiency

  • Arc-wrapped compiled logic - Shared without copying
  • Immutable workflows - Safe concurrent access
  • Context caching - Avoids repeated JSON cloning
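The sharing model behind these bullets can be sketched with plain std types. `CompiledLogic` below is a hypothetical stand-in for a compiled JSONLogic expression, not the real dataflow-rs internal; it only demonstrates the Arc-sharing pattern:

```rust
use std::sync::Arc;
use std::thread;

// Illustrative stand-in for a compiled expression.
#[derive(Debug)]
struct CompiledLogic {
    expr: String,
}

fn main() {
    let logic = Arc::new(CompiledLogic {
        expr: "{\"var\": \"data.a\"}".to_string(),
    });

    let handles: Vec<_> = (0..4)
        .map(|i| {
            // Cloning the Arc copies a pointer, not the compiled expression.
            let logic = Arc::clone(&logic);
            thread::spawn(move || format!("worker {i} read {}", logic.expr))
        })
        .collect();

    for h in handles {
        println!("{}", h.join().unwrap());
    }
}
```

Because the inner value is never mutated, no locking is needed; all workers read the same allocation.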

Benchmarking

Run the included benchmark:

cargo run --example benchmark --release

Sample Benchmark

use dataflow_rs::{Engine, Workflow, Message};
use std::time::Instant;

// Setup
let workflow = Workflow::from_json(workflow_json)?;
let engine = Engine::new(vec![workflow], None);

// Benchmark
let iterations: u32 = 10_000;
let start = Instant::now();

for _ in 0..iterations {
    let mut message = Message::new(&test_data);
    engine.process_message(&mut message).await?;
}

let elapsed = start.elapsed();
println!("Processed {} messages in {:?}", iterations, elapsed);
println!("Average: {:?} per message", elapsed / iterations);

Optimization Tips

1. Minimize Mappings

Combine related transformations:

// Less efficient: Multiple mappings
{
    "mappings": [
        {"path": "data.a", "logic": {"var": "data.source.a"}},
        {"path": "data.b", "logic": {"var": "data.source.b"}},
        {"path": "data.c", "logic": {"var": "data.source.c"}}
    ]
}

// More efficient: Single object mapping when possible
{
    "mappings": [
        {"path": "data", "logic": {"var": "data.source"}}
    ]
}

2. Use Conditions Wisely

Skip unnecessary processing with conditions:

{
    "id": "expensive_task",
    "condition": {"==": [{"var": "metadata.needs_processing"}, true]},
    "function": { ... }
}

3. Order Workflows by Frequency

Put frequently executed workflows earlier by giving them a lower priority value:

{"id": "common_workflow", "priority": 1, ...}
{"id": "rare_workflow", "priority": 100, ...}

4. Use temp_data

Store intermediate results to avoid recomputation:

{
    "mappings": [
        {
            "path": "temp_data.computed",
            "logic": {"expensive": "computation"}
        },
        {
            "path": "data.result1",
            "logic": {"var": "temp_data.computed"}
        },
        {
            "path": "data.result2",
            "logic": {"var": "temp_data.computed"}
        }
    ]
}

5. Avoid Unnecessary Validation

Validate only what’s necessary:

// Validate at system boundaries
{
    "id": "input_validation",
    "condition": {"==": [{"var": "metadata.source"}, "external"]},
    "tasks": [
        {"id": "validate", "function": {"name": "validation", ...}}
    ]
}

Concurrent Processing

Process multiple messages concurrently:

use std::sync::Arc;
use tokio::task;

let engine = Arc::new(Engine::new(workflows, None));

let handles: Vec<_> = messages.into_iter()
    .map(|mut msg| {
        let engine = Arc::clone(&engine);
        task::spawn(async move {
            engine.process_message(&mut msg).await
        })
    })
    .collect();

// Wait for all tasks, propagating both join and processing errors
for handle in handles {
    handle.await??;
}

Thread Safety

  • Engine is Send + Sync
  • Compiled logic shared via Arc
  • Each message processed independently
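The first bullet can be encoded as a compile-time check rather than trusted on faith. This is a generic sketch; the commented line shows where a project depending on dataflow-rs would plug in `Engine`:

```rust
use std::sync::Arc;

// Compile-time guard: instantiating this function with a type
// proves that type is Send + Sync, i.e. safe to share across tasks.
fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    // Compiles only because Arc<Vec<u8>> is Send + Sync.
    assert_send_sync::<Arc<Vec<u8>>>();
    // In a project using dataflow-rs, the equivalent check would be:
    // assert_send_sync::<dataflow_rs::Engine>();
    println!("Send + Sync checks passed");
}
```

If the type ever loses `Send + Sync` (for example by gaining an `Rc` field), the check fails at compile time instead of at runtime.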

Memory Considerations

Large Messages

For very large messages, consider:

  1. Streaming - Process chunks instead of entire payload
  2. Selective Loading - Load only needed fields
  3. Cleanup temp_data - Clear intermediate results when done
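The third point could be a final mapping task that overwrites temp_data. This is a sketch: it assumes the map function's input accepts a literal null result the same way other mappings accept JSONLogic output, which you should verify against your dataflow-rs version:

```json
{
    "id": "cleanup_temp",
    "function": {
        "name": "map",
        "input": {
            "mappings": [
                {"path": "temp_data", "logic": null}
            ]
        }
    }
}
```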

Many Workflows

For many workflows:

  1. Organize by Domain - Group related workflows
  2. Use Conditions - Skip irrelevant workflows early
  3. Profile - Identify bottleneck workflows

Profiling

Enable Logging

env_logger::Builder::from_env(
    env_logger::Env::default().default_filter_or("debug")
).init();

Custom Metrics

use std::time::Instant;

let start = Instant::now();
engine.process_message(&mut message).await?;
let duration = start.elapsed();

metrics::histogram!("dataflow.processing_time", duration);
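To go beyond a single histogram value, per-message latencies can be aggregated with std types only. The `process_message` call is commented out so the sketch stands alone; substitute your real workload:

```rust
use std::time::{Duration, Instant};

// Sketch: collect per-message latencies and report percentiles
// without any external metrics crate. Assumes the slice is sorted.
fn percentile(sorted: &[Duration], p: f64) -> Duration {
    let idx = ((sorted.len() as f64 - 1.0) * p).round() as usize;
    sorted[idx]
}

fn main() {
    let mut latencies: Vec<Duration> = Vec::new();
    for _ in 0..1_000 {
        let start = Instant::now();
        // engine.process_message(&mut message).await?; // real workload here
        latencies.push(start.elapsed());
    }
    latencies.sort();
    println!("p50: {:?}", percentile(&latencies, 0.50));
    println!("p99: {:?}", percentile(&latencies, 0.99));
}
```

Tail percentiles (p99) surface slow workflows that an average hides, which matters when judging the "consistent latency" claim above.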

Production Recommendations

  1. Build with --release - Debug builds are significantly slower
  2. Pre-warm - Process a few messages at startup to warm caches
  3. Monitor - Track processing times and error rates
  4. Profile - Identify slow workflows in production
  5. Scale Horizontally - Engine is stateless, scale with instances