Custom Functions

Extend dataflow-rs with your own custom processing logic by implementing the AsyncFunctionHandler trait.

Overview

Custom functions allow you to:

  • Add domain-specific processing logic
  • Integrate with external systems
  • Perform async operations (HTTP, database, etc.)
  • Implement complex transformations

The trait has three moving parts:

  • type Input — your typed config shape. The engine deserializes each task’s FunctionConfig::Custom { input } JSON into this type once at Engine::builder().build(), not per message. Misshapen config fails at startup.
  • TaskContext — handed to every call. Read the message context (ctx.data(), ctx.metadata(), ctx.temp_data(), ctx.get(path)), mutate it through ctx.set(path, value) which records audit-trail changes automatically, and append errors via ctx.add_error(...).
  • TaskOutcome — the return value: Success, Status(u16), Skip, or Halt. Replaces the magic-number usize of earlier versions.

Implementing AsyncFunctionHandler

use async_trait::async_trait;
use dataflow_rs::prelude::*;
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::json;

/// Typed config for the handler. The engine deserializes the task's
/// `FunctionConfig::Custom { input }` JSON into this struct at startup;
/// misshapen config fails there, not on first message.
#[derive(Deserialize)]
pub struct MyInput {
    target: String,
}

pub struct MyCustomFunction;

#[async_trait]
impl AsyncFunctionHandler for MyCustomFunction {
    type Input = MyInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &MyInput,
    ) -> Result<TaskOutcome> {
        // Write into the context. `ctx.set` auto-creates intermediate
        // objects/arrays and records a `Change` on the audit trail
        // when `message.capture_changes` is on.
        ctx.set(&input.target, OwnedDataValue::from(&json!(true)));
        Ok(TaskOutcome::Success)
    }
}

Three concrete things the new shape removes:

  1. No match config { Custom { input, .. } => ..., _ => Err(...) } block — input is the typed parameter directly.
  2. No hand-built Change entries — ctx.set does that.
  3. No magic Ok((200, vec![])) return — TaskOutcome::Success is self-documenting.
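
To make item 2 concrete, here is a minimal std-only sketch of what "auto-creates intermediate objects and records a Change" could look like. Everything in it (`Value`, `Change`, `ToyContext`) is a hypothetical stand-in written for this illustration; it is not the dataflow-rs implementation, and the real `Change` type may carry different fields.

```rust
use std::collections::BTreeMap;

/// Toy JSON-like value. A stand-in, not the crate's value type.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Null,
    Bool(bool),
    Object(BTreeMap<String, Value>),
}

/// One audit-trail entry (hypothetical shape).
#[derive(Debug, PartialEq)]
struct Change {
    path: String,
    old: Value,
    new: Value,
}

/// Toy context: a root value plus the changes recorded so far.
struct ToyContext {
    root: Value,
    changes: Vec<Change>,
}

impl ToyContext {
    fn new() -> Self {
        ToyContext {
            root: Value::Object(BTreeMap::new()),
            changes: Vec::new(),
        }
    }

    /// Write `new` at a dotted path, creating missing objects on the way
    /// and recording the old and new values as a `Change`.
    fn set(&mut self, path: &str, new: Value) {
        let mut node = &mut self.root;
        for key in path.split('.') {
            let current = node;
            // Anything that is not an object is replaced by an empty one.
            if !matches!(*current, Value::Object(_)) {
                *current = Value::Object(BTreeMap::new());
            }
            node = match current {
                Value::Object(map) => map.entry(key.to_string()).or_insert(Value::Null),
                _ => unreachable!("current was just made an object"),
            };
        }
        let old = std::mem::replace(node, new.clone());
        self.changes.push(Change {
            path: path.to_string(),
            old,
            new,
        });
    }

    /// Read the value at a dotted path, if every segment exists.
    fn get(&self, path: &str) -> Option<&Value> {
        let mut node = &self.root;
        for key in path.split('.') {
            node = match node {
                Value::Object(map) => map.get(key)?,
                _ => return None,
            };
        }
        Some(node)
    }
}
```

Calling `set("data.processed", Value::Bool(true))` on a fresh `ToyContext` creates the `data` object, stores the flag, and leaves one `Change` whose `old` is `Value::Null`. A handler that wrote into the tree directly would skip that bookkeeping, which is the point of routing writes through a single method.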

Registering Custom Functions

let engine = Engine::builder()
    .with_workflows(workflows)
    .register("my_custom_function", MyCustomFunction)
    .build()?;

register("name", handler) accepts any AsyncFunctionHandler and boxes it internally. The dyn-trait name (BoxedFunctionHandler) stays out of user code.

Using Custom Functions in Rules

{
    "id": "custom_rule",
    "tasks": [
        {
            "id": "custom_action",
            "function": {
                "name": "my_custom_function",
                "input": {
                    "target": "data.processed"
                }
            }
        }
    ]
}

The input shape on the wire must match your handler’s Input struct. serde does the parse at engine init time.
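
The parse-once, fail-at-startup behaviour can be modelled without the crate. The sketch below is a toy under stated assumptions: a `HashMap` stands in for the JSON `input` object, `parse_input` stands in for the serde step, and `ToyEngine` and `build` are hypothetical names that do not correspond to dataflow-rs internals. `MyInput` mirrors the struct from the handler above.

```rust
use std::collections::HashMap;

/// Typed config, mirroring the `MyInput` struct used by the handler.
#[derive(Debug, PartialEq)]
struct MyInput {
    target: String,
}

/// Hypothetical stand-in for the serde step: raw key/value pairs in,
/// typed config out, or an error naming the missing field.
fn parse_input(raw: &HashMap<String, String>) -> Result<MyInput, String> {
    match raw.get("target") {
        Some(target) => Ok(MyInput { target: target.clone() }),
        None => Err("missing field `target`".to_string()),
    }
}

/// Toy engine that stores already-parsed inputs, one per task.
#[derive(Debug)]
struct ToyEngine {
    inputs: Vec<MyInput>,
}

/// Parse every task's raw input exactly once. The first malformed task
/// aborts the build, so no message is ever processed with bad config.
fn build(tasks: &[(String, HashMap<String, String>)]) -> Result<ToyEngine, String> {
    let mut inputs = Vec::with_capacity(tasks.len());
    for (id, raw) in tasks {
        let parsed = parse_input(raw).map_err(|e| format!("task `{}`: {}", id, e))?;
        inputs.push(parsed);
    }
    Ok(ToyEngine { inputs })
}
```

With a task whose input contains `target`, `build` returns an engine holding the typed value. With a misspelled key such as `targt`, it returns an error that names the task, before any message is seen.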

Accessing Configuration

Because the engine pre-parses the JSON, configuration is just the input parameter — no extraction step. For freeform JSON, set type Input = serde_json::Value;:

use async_trait::async_trait;
use dataflow_rs::prelude::*;
use serde_json::Value;

pub struct FreeformHandler;

#[async_trait]
impl AsyncFunctionHandler for FreeformHandler {
    type Input = Value;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &Value,
    ) -> Result<TaskOutcome> {
        let option1 = input.get("option1").and_then(Value::as_str).unwrap_or("default");
        let option2 = input.get("option2").and_then(Value::as_i64).unwrap_or(0);
        // ...
        Ok(TaskOutcome::Success)
    }
}

Evaluating JSONLogic from a handler

Custom handlers can compile and evaluate ad-hoc JSONLogic using the shared datalogic engine exposed by TaskContext::datalogic():

use async_trait::async_trait;
use bumpalo::Bump;
use dataflow_rs::prelude::*;
use serde_json::json;

pub struct EvalDemo;

#[async_trait]
impl AsyncFunctionHandler for EvalDemo {
    type Input = serde_json::Value;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        _input: &serde_json::Value,
    ) -> Result<TaskOutcome> {
        // Compile the expression — Arc<Logic> so it can be cached/shared.
        let compiled = ctx
            .datalogic()
            .compile_arc(&json!({"var": "data.input"}))
            .map_err(|e| DataflowError::LogicEvaluation(e.to_string()))?;

        // Evaluate against the current message context.
        let arena = Bump::new();
        let av = ctx.message().context.to_arena(&arena);
        let result = ctx
            .datalogic()
            .evaluate(&compiled, av, &arena)
            .map_err(|e| DataflowError::LogicEvaluation(e.to_string()))?;

        // `result` is a `DataValue<'_>` borrowed from the arena.
        let _owned = result.to_owned();
        Ok(TaskOutcome::Success)
    }
}

If your handler evaluates many expressions against the same context, build the DataValue<'_> once via to_arena and reuse it.

Async Operations

The trait is async/await all the way through. Real I/O works naturally:

use async_trait::async_trait;
use dataflow_rs::prelude::*;
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::Value;

#[derive(Deserialize)]
pub struct HttpFetchInput {
    url: String,
}

pub struct HttpFetchFunction;

#[async_trait]
impl AsyncFunctionHandler for HttpFetchFunction {
    type Input = HttpFetchInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &HttpFetchInput,
    ) -> Result<TaskOutcome> {
        let response = reqwest::get(&input.url)
            .await
            .map_err(|e| DataflowError::http(0, e.to_string()))?;

        let body: Value = response
            .json()
            .await
            .map_err(|e| DataflowError::http(0, e.to_string()))?;

        ctx.set("data.fetched", OwnedDataValue::from(&body));
        Ok(TaskOutcome::Success)
    }
}

Error Handling

Return appropriate errors for different failure modes:

async fn execute(
    &self,
    ctx: &mut TaskContext<'_>,
    _input: &Self::Input,
) -> Result<TaskOutcome> {
    if some_validation_fails {
        return Err(DataflowError::Validation("Invalid input".to_string()));
    }

    if some_operation_fails {
        return Err(DataflowError::Task("Operation failed".to_string()));
    }

    if downstream_call_failed {
        return Err(DataflowError::function_execution(
            "HTTP call failed",
            Some(DataflowError::http(503, "Service Unavailable")),
        ));
    }

    // Alternatively, report an HTTP-style status without returning an Err,
    // e.g. 200 for success, 400 for validation failure, 500 for processing
    // failure. This example falls through to a 500.
    Ok(TaskOutcome::Status(500))
}

The engine routes errors and 5xx statuses through message.errors() — see Error Handling for the unified-channel contract.
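
As a rough mental model of how the four outcomes differ, the following std-only sketch classifies each one. `ToyOutcome`, `Effect`, and `classify` are hypothetical names invented here, not crate types. The only behaviour taken from this chapter is that 5xx statuses are recorded as errors, Skip means "did nothing, continue", and Halt stops the workflow. Treating every non-5xx status as "nothing to record" is an assumption, and the sketch says nothing about whether the workflow continues after a 5xx.

```rust
/// Stand-in mirroring the four outcome variants named in this chapter.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyOutcome {
    Success,
    Status(u16),
    Skip,
    Halt,
}

/// What a runner might do with an outcome (hypothetical).
#[derive(Debug, PartialEq)]
enum Effect {
    /// Nothing to record.
    Nothing,
    /// Record an error carrying the status code.
    RecordError(u16),
    /// Stop the current workflow.
    StopWorkflow,
}

fn classify(outcome: ToyOutcome) -> Effect {
    match outcome {
        ToyOutcome::Success | ToyOutcome::Skip => Effect::Nothing,
        // Only the 5xx range is treated as an error here.
        ToyOutcome::Status(code) if (500..=599).contains(&code) => Effect::RecordError(code),
        // Assumption: other status codes record nothing.
        ToyOutcome::Status(_) => Effect::Nothing,
        ToyOutcome::Halt => Effect::StopWorkflow,
    }
}
```

Under these assumptions `classify(ToyOutcome::Status(503))` yields `Effect::RecordError(503)`, while `Status(200)` and `Status(400)` yield `Effect::Nothing`.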

Complete Example

use async_trait::async_trait;
use dataflow_rs::prelude::*;
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::json;

/// Typed config for `StatisticsFunction`.
#[derive(Deserialize)]
pub struct StatisticsInput {
    /// Field inside `data` whose value is the array to summarize.
    field: String,
}

/// Calculates statistics from numeric array data.
pub struct StatisticsFunction;

#[async_trait]
impl AsyncFunctionHandler for StatisticsFunction {
    type Input = StatisticsInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &StatisticsInput,
    ) -> Result<TaskOutcome> {
        let numbers: Vec<f64> = ctx
            .data()
            .get(input.field.as_str())
            .and_then(|v| v.as_array())
            .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
            .unwrap_or_default();

        if numbers.is_empty() {
            return Err(DataflowError::Validation(format!(
                "Field '{}' has no numeric values",
                input.field
            )));
        }

        let sum: f64 = numbers.iter().sum();
        let count = numbers.len() as f64;
        let mean = sum / count;
        let min = numbers.iter().cloned().fold(f64::INFINITY, f64::min);
        let max = numbers.iter().cloned().fold(f64::NEG_INFINITY, f64::max);

        ctx.set(
            "data.statistics",
            OwnedDataValue::from(&json!({
                "count": count,
                "sum": sum,
                "mean": mean,
                "min": min,
                "max": max,
            })),
        );
        Ok(TaskOutcome::Success)
    }
}

Best Practices

  1. Use a typed Input — let serde validate at startup. Reach for serde_json::Value only when the input genuinely is freeform.
  2. Mutate via ctx.set — it auto-records the audit trail. Reaching into message.context directly bypasses change capture.
  3. Return TaskOutcome cleanly: Success for the happy path, Status(u16) for HTTP-like codes (5xx pushes a TASK_STATUS_ERROR to message.errors()), Skip for “did nothing, continue”, Halt for “stop this workflow”.
  4. Use the right error type: DataflowError::retryable looks at the variant to decide whether transient errors are worth retrying.
  5. Document — your handler’s Input struct is its contract; docstring it.
  6. Test — drive the handler with TaskContext::new(&mut message, &datalogic) and assert on the outcome and ctx.into_changes().
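
One way to keep practice 6 cheap, offered as a suggestion and not something the crate requires, is to move a handler's arithmetic into a pure helper that can be tested without building a TaskContext. The sketch below does this for the statistics example. `Stats` and `summarize` are hypothetical names introduced for illustration, and `count` is kept as a `usize` here, whereas the handler above stores it as a float.

```rust
/// Summary of a numeric slice (hypothetical helper type).
#[derive(Debug, PartialEq)]
struct Stats {
    count: usize,
    sum: f64,
    mean: f64,
    min: f64,
    max: f64,
}

/// Compute the same figures as the statistics handler, with no engine
/// types involved. Returns `None` for an empty slice so the caller can
/// decide how to report the failure.
fn summarize(numbers: &[f64]) -> Option<Stats> {
    if numbers.is_empty() {
        return None;
    }
    let count = numbers.len();
    let sum: f64 = numbers.iter().sum();
    let min = numbers.iter().copied().fold(f64::INFINITY, f64::min);
    let max = numbers.iter().copied().fold(f64::NEG_INFINITY, f64::max);
    Some(Stats {
        count,
        sum,
        mean: sum / count as f64,
        min,
        max,
    })
}
```

The handler's `execute` would then shrink to reading the array, calling `summarize`, mapping `None` to a validation error, and writing the result with `ctx.set`. Only that thin wrapper still needs a test driven through a TaskContext.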