Engine

The Engine is the central component that orchestrates message processing through workflows.

Overview

The Engine is responsible for:

  • Compiling all JSONLogic expressions at initialization
  • Managing workflow execution order
  • Processing messages through matching workflows
  • Coordinating task execution

Creating an Engine

use dataflow_rs::{Engine, Workflow};
use std::collections::HashMap;

// Parse workflows from JSON
let workflow1 = Workflow::from_json(r#"{
    "id": "workflow1",
    "name": "First Workflow",
    "priority": 1,
    "tasks": [...]
}"#)?;

let workflow2 = Workflow::from_json(r#"{
    "id": "workflow2",
    "name": "Second Workflow",
    "priority": 2,
    "tasks": [...]
}"#)?;

// Create engine with workflows
let engine = Engine::new(
    vec![workflow1, workflow2],
    None  // Optional custom functions
);

// Engine is now ready - all logic compiled
println!("Loaded {} workflows", engine.workflows().len());

Processing Messages

use dataflow_rs::Message;
use serde_json::json;

// Create a message
let mut message = Message::new(&json!({
    "user": "john",
    "action": "login"
}));

// Process through all matching workflows
engine.process_message(&mut message).await?;

// Access results
println!("Processed data: {:?}", message.context["data"]);
println!("Audit trail: {:?}", message.audit_trail);

Workflow Execution Order

Workflows execute in priority order (lowest priority number first):

// Priority 1 executes first
let high_priority = Workflow::from_json(r#"{
    "id": "high",
    "priority": 1,
    "tasks": [...]
}"#)?;

// Priority 10 executes later
let low_priority = Workflow::from_json(r#"{
    "id": "low",
    "priority": 10,
    "tasks": [...]
}"#)?;
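
Since managing workflow execution order is the engine's responsibility, the order in which workflows are passed to Engine::new should not affect execution order; a minimal sketch under that assumption:

// high_priority (priority 1) still runs before low_priority (priority 10),
// even though it is registered second
let engine = Engine::new(vec![low_priority, high_priority], None);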

Workflow Conditions

Workflows can declare a condition that determines whether they should execute for a given message:

{
    "id": "user_workflow",
    "name": "User Workflow",
    "condition": { "==": [{"var": "metadata.type"}, "user"] },
    "tasks": [...]
}

The workflow executes only if the condition evaluates to true.
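
The same conditional workflow can be built from Rust; a minimal sketch (tasks are left empty here for brevity, and metadata.type is assumed to be populated on the message before processing):

use dataflow_rs::Workflow;

// Only executes for messages whose metadata.type equals "user"
let user_workflow = Workflow::from_json(r#"{
    "id": "user_workflow",
    "name": "User Workflow",
    "condition": { "==": [{"var": "metadata.type"}, "user"] },
    "tasks": []
}"#)?;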

Custom Functions

Register custom functions when creating the engine:

use dataflow_rs::engine::AsyncFunctionHandler;
use std::collections::HashMap;

let mut custom_functions: HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>> = HashMap::new();
// `MyCustomFunction` is a user-defined type implementing `AsyncFunctionHandler` (not shown here)
custom_functions.insert("my_function".to_string(), Box::new(MyCustomFunction));

let engine = Engine::new(workflows, Some(custom_functions));

Thread Safety

The Engine is designed for concurrent use:

  • Workflows are immutable after creation
  • Compiled logic is shared via Arc
  • Each message is processed independently

use std::sync::Arc;
use tokio::task;

let engine = Arc::new(Engine::new(workflows, None));

// Process multiple messages concurrently (`messages` is assumed to be a Vec<Message>)
let handles: Vec<_> = messages.into_iter().map(|mut msg| {
    let engine = Arc::clone(&engine);
    task::spawn(async move {
        engine.process_message(&mut msg).await
    })
}).collect();

// Wait for all to complete
for handle in handles {
    handle.await??;
}

API Reference

Engine::new(workflows, custom_functions)

Creates a new engine with the given workflows and optional custom functions.

  • workflows: Vec<Workflow> - Workflows to register
  • custom_functions: Option<HashMap<String, Box<dyn AsyncFunctionHandler + Send + Sync>>> - Custom function implementations

engine.process_message(&mut message)

Processes a message through all matching workflows.

  • Returns Result<()> - Ok if processing succeeded; see the sketch below for explicit error handling
  • Message is modified in place with results and audit trail
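
A minimal sketch of handling the result explicitly instead of propagating it with ?; the error's concrete type and display output depend on the crate's error definitions:

match engine.process_message(&mut message).await {
    Ok(()) => println!("Processed data: {:?}", message.context["data"]),
    Err(e) => eprintln!("Workflow processing failed: {e}"),
}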

engine.workflows()

Returns a reference to the registered workflows.

let workflow_ids: Vec<&String> = engine.workflows().keys().collect();