Custom Functions
Extend dataflow-rs with your own custom processing logic by implementing
the AsyncFunctionHandler trait.
Overview
Custom functions allow you to:
- Add domain-specific processing logic
- Integrate with external systems
- Perform async operations (HTTP, database, etc.)
- Implement complex transformations
The trait has three moving parts:
- type Input: your typed config shape. The engine deserializes each task's FunctionConfig::Custom { input } JSON into this type once at Engine::builder().build(), not per message. Misshapen config fails at startup.
- TaskContext: handed to every call. Read the message context (ctx.data(), ctx.metadata(), ctx.temp_data(), ctx.get(path)), mutate it through ctx.set(path, value), which records audit-trail changes automatically, and append errors via ctx.add_error(...).
- TaskOutcome: the return value, one of Success, Status(u16), Skip, or Halt. Replaces the magic-number usize of earlier versions.
Implementing AsyncFunctionHandler
use async_trait::async_trait;
use dataflow_rs::prelude::*;
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::json;

/// Typed config for the handler. The engine deserializes the task's
/// `FunctionConfig::Custom { input }` JSON into this struct at startup;
/// misshapen config fails there, not on first message.
#[derive(Deserialize)]
pub struct MyInput {
    target: String,
}

pub struct MyCustomFunction;

#[async_trait]
impl AsyncFunctionHandler for MyCustomFunction {
    type Input = MyInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &MyInput,
    ) -> Result<TaskOutcome> {
        // Write into the context. `ctx.set` auto-creates intermediate
        // objects/arrays and records a `Change` on the audit trail
        // when `message.capture_changes` is on.
        ctx.set(&input.target, OwnedDataValue::from(&json!(true)));
        Ok(TaskOutcome::Success)
    }
}
Three concrete things the new shape removes:
- No match config { Custom { input, .. } => ..., _ => Err(...) } block: input is the typed parameter directly.
- No hand-built Change entries: ctx.set does that.
- No magic Ok((200, vec![])) return: TaskOutcome::Success is self-documenting.
Registering Custom Functions
let engine = Engine::builder()
    .with_workflows(workflows)
    .register("my_custom_function", MyCustomFunction)
    .build()?;
register("name", handler) accepts any AsyncFunctionHandler and boxes
it internally. The dyn-trait name (BoxedFunctionHandler) stays out of
user code.
Using Custom Functions in Rules
{
    "id": "custom_rule",
    "tasks": [
        {
            "id": "custom_action",
            "function": {
                "name": "my_custom_function",
                "input": {
                    "target": "data.processed"
                }
            }
        }
    ]
}
The input shape on the wire must match your handler’s Input struct.
serde does the parse at engine init time.
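The parse-once, fail-at-startup behavior described above can be illustrated without the crate. The following is a minimal sketch using only the standard library: RawInput, TargetInput, parse_input, and build_all are hypothetical names invented for this illustration, and a string map stands in for the JSON input object. It is not how dataflow-rs or serde implement the step, only a model of the contract.

```rust
use std::collections::HashMap;

/// Hypothetical typed config, standing in for a handler's `Input` struct.
#[derive(Debug, PartialEq)]
pub struct TargetInput {
    pub target: String,
}

/// Hypothetical raw task config (field name -> value), standing in for
/// the JSON `input` object of a task.
pub type RawInput = HashMap<String, String>;

/// Convert one raw config into the typed shape, or explain what is missing.
pub fn parse_input(task_id: &str, raw: &RawInput) -> Result<TargetInput, String> {
    let target = raw
        .get("target")
        .ok_or_else(|| format!("task '{task_id}': missing field 'target'"))?;
    Ok(TargetInput {
        target: target.clone(),
    })
}

/// Parse every task up front. The first misshapen config aborts the whole
/// build, so no message is processed against a partially valid setup.
pub fn build_all(tasks: &[(&str, RawInput)]) -> Result<Vec<TargetInput>, String> {
    tasks.iter().map(|(id, raw)| parse_input(id, raw)).collect()
}
```

With this shape, a task whose input lacks target makes build_all return an Err before any message exists, which mirrors the "fails at startup, not on first message" guarantee.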
Accessing Configuration
Because the engine pre-parses the JSON, configuration is just the
input parameter — no extraction step. For freeform JSON, set
type Input = serde_json::Value;:
use async_trait::async_trait;
use dataflow_rs::prelude::*;
use serde_json::Value;

pub struct FreeformHandler;

#[async_trait]
impl AsyncFunctionHandler for FreeformHandler {
    type Input = Value;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &Value,
    ) -> Result<TaskOutcome> {
        // Missing or wrongly typed options fall back to defaults.
        let option1 = input.get("option1").and_then(Value::as_str).unwrap_or("default");
        let option2 = input.get("option2").and_then(Value::as_i64).unwrap_or(0);
        // ...
        Ok(TaskOutcome::Success)
    }
}
Evaluating JSONLogic from a handler
Custom handlers can compile and evaluate ad-hoc JSONLogic using the
shared datalogic engine exposed by TaskContext::datalogic():
use async_trait::async_trait;
use bumpalo::Bump;
use dataflow_rs::prelude::*;
use serde_json::json;

pub struct EvalDemo;

#[async_trait]
impl AsyncFunctionHandler for EvalDemo {
    type Input = serde_json::Value;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        _input: &serde_json::Value,
    ) -> Result<TaskOutcome> {
        // Compile the expression into an Arc<Logic> so it can be cached/shared.
        let compiled = ctx
            .datalogic()
            .compile_arc(&json!({"var": "data.input"}))
            .map_err(|e| DataflowError::LogicEvaluation(e.to_string()))?;

        // Evaluate against the current message context.
        let arena = Bump::new();
        let av = ctx.message().context.to_arena(&arena);
        let result = ctx
            .datalogic()
            .evaluate(&compiled, av, &arena)
            .map_err(|e| DataflowError::LogicEvaluation(e.to_string()))?;

        // `result` is a `DataValue<'_>` borrowed from the arena.
        let _owned = result.to_owned();
        Ok(TaskOutcome::Success)
    }
}
If your handler evaluates many expressions against the same context,
build the DataValue<'_> once via to_arena and reuse it.
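The same reuse idea applies to compiled expressions, which the example above notes can be cached and shared through an Arc. Below is a minimal, crate-independent sketch of that pattern. Compiled and CompileCache are hypothetical stand-ins invented here, and the "compilation" is just a string copy; a real handler would store whatever compile_arc returns instead.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a compiled expression.
#[derive(Debug)]
pub struct Compiled {
    pub source: String,
}

/// Caches compiled expressions by their source text so each distinct
/// expression is compiled at most once and then shared via `Arc`.
#[derive(Default)]
pub struct CompileCache {
    entries: Mutex<HashMap<String, Arc<Compiled>>>,
    compiles: AtomicUsize,
}

impl CompileCache {
    /// Return the cached entry for `source`, compiling it on first use.
    pub fn get_or_compile(&self, source: &str) -> Arc<Compiled> {
        let mut entries = self.entries.lock().unwrap();
        if let Some(hit) = entries.get(source) {
            return Arc::clone(hit);
        }
        // Cache miss: "compile" (here, just copy the text) and remember it.
        self.compiles.fetch_add(1, Ordering::Relaxed);
        let compiled = Arc::new(Compiled {
            source: source.to_string(),
        });
        entries.insert(source.to_string(), Arc::clone(&compiled));
        compiled
    }

    /// Number of cache misses so far, useful for asserting reuse in tests.
    pub fn compile_count(&self) -> usize {
        self.compiles.load(Ordering::Relaxed)
    }
}
```

A handler could hold such a cache as a field, since execute takes &self. Whether this is worthwhile depends on how expensive compilation is relative to the lock.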
Async Operations
The trait is async/await all the way through. Real I/O works naturally:
use async_trait::async_trait;
use dataflow_rs::prelude::*;
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::Value;

#[derive(Deserialize)]
pub struct HttpFetchInput {
    url: String,
}

pub struct HttpFetchFunction;

#[async_trait]
impl AsyncFunctionHandler for HttpFetchFunction {
    type Input = HttpFetchInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &HttpFetchInput,
    ) -> Result<TaskOutcome> {
        let response = reqwest::get(&input.url)
            .await
            .map_err(|e| DataflowError::http(0, e.to_string()))?;

        let body: Value = response
            .json()
            .await
            .map_err(|e| DataflowError::http(0, e.to_string()))?;

        ctx.set("data.fetched", OwnedDataValue::from(&body));
        Ok(TaskOutcome::Success)
    }
}
Error Handling
Return appropriate errors for different failure modes:
async fn execute(
    &self,
    ctx: &mut TaskContext<'_>,
    _input: &Self::Input,
) -> Result<TaskOutcome> {
    if some_validation_fails {
        return Err(DataflowError::Validation("Invalid input".to_string()));
    }

    if some_operation_fails {
        return Err(DataflowError::Task("Operation failed".to_string()));
    }

    if downstream_call_failed {
        return Err(DataflowError::function_execution(
            "HTTP call failed",
            Some(DataflowError::http(503, "Service Unavailable")),
        ));
    }

    // Or return a status code for an HTTP-style outcome that isn't an Err:
    // 200 for success, 400 for validation failure, 500 for processing failure.
    Ok(TaskOutcome::Status(500))
}
The engine routes errors and 5xx statuses through message.errors() —
see Error Handling for the
unified-channel contract.
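To make the status convention concrete, here is a small standalone sketch. Outcome is a local stand-in that mirrors the four variants named on this page; it is not the crate's TaskOutcome definition. Failure, to_outcome, and is_server_error are hypothetical helpers, and the sketch models only the 5xx rule stated above, not how the engine treats other codes.

```rust
/// Local stand-in mirroring the four variants named on this page.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Outcome {
    Success,
    Status(u16),
    Skip,
    Halt,
}

/// Hypothetical failure categories a handler might distinguish.
#[derive(Debug, Clone, Copy)]
pub enum Failure {
    Validation,
    Processing,
}

/// Map a handler result to the HTTP-style codes suggested above:
/// 200 for success, 400 for validation failure, 500 for processing failure.
pub fn to_outcome(result: Result<(), Failure>) -> Outcome {
    match result {
        Ok(()) => Outcome::Status(200),
        Err(Failure::Validation) => Outcome::Status(400),
        Err(Failure::Processing) => Outcome::Status(500),
    }
}

/// True only for 5xx statuses, the range this page says is reported
/// through `message.errors()`.
pub fn is_server_error(outcome: Outcome) -> bool {
    matches!(outcome, Outcome::Status(code) if (500..600).contains(&code))
}
```

Keeping the mapping in one function means the choice between returning an Err and returning a Status is made in a single place per handler.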
Complete Example
use async_trait::async_trait;
use dataflow_rs::prelude::*;
use datavalue::OwnedDataValue;
use serde::Deserialize;
use serde_json::json;

#[derive(Deserialize)]
pub struct StatisticsInput {
    /// Field inside `data` whose value is the array to summarize.
    field: String,
}

/// Calculates statistics from numeric array data
pub struct StatisticsFunction;

#[async_trait]
impl AsyncFunctionHandler for StatisticsFunction {
    type Input = StatisticsInput;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        input: &StatisticsInput,
    ) -> Result<TaskOutcome> {
        // Collect the numeric entries; non-numeric values are skipped.
        let numbers: Vec<f64> = ctx
            .data()
            .get(input.field.as_str())
            .and_then(|v| v.as_array())
            .map(|arr| arr.iter().filter_map(|v| v.as_f64()).collect())
            .unwrap_or_default();

        if numbers.is_empty() {
            return Err(DataflowError::Validation(format!(
                "Field '{}' has no numeric values",
                input.field
            )));
        }

        let sum: f64 = numbers.iter().sum();
        let count = numbers.len() as f64;
        let mean = sum / count;
        let min = numbers.iter().cloned().fold(f64::INFINITY, f64::min);
        let max = numbers.iter().cloned().fold(f64::NEG_INFINITY, f64::max);

        ctx.set(
            "data.statistics",
            OwnedDataValue::from(&json!({
                "count": count,
                "sum": sum,
                "mean": mean,
                "min": min,
                "max": max,
            })),
        );

        Ok(TaskOutcome::Success)
    }
}
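One way to keep a handler like this easy to test is to move the arithmetic into a pure function that needs no engine or context. The sketch below does that with the standard library only. Summary and summarize are hypothetical names introduced for this illustration; the field names mirror the keys the handler writes to data.statistics.

```rust
/// Summary of a numeric slice. Field names mirror the keys written to
/// `data.statistics` by the handler above.
#[derive(Debug, PartialEq)]
pub struct Summary {
    pub count: f64,
    pub sum: f64,
    pub mean: f64,
    pub min: f64,
    pub max: f64,
}

/// Compute the summary, or return `None` for an empty slice (the case
/// the handler maps to a validation error).
pub fn summarize(numbers: &[f64]) -> Option<Summary> {
    if numbers.is_empty() {
        return None;
    }
    let sum: f64 = numbers.iter().sum();
    let count = numbers.len() as f64;
    Some(Summary {
        count,
        sum,
        mean: sum / count,
        // Folding from +/- infinity yields the true extremes of a non-empty slice.
        min: numbers.iter().copied().fold(f64::INFINITY, f64::min),
        max: numbers.iter().copied().fold(f64::NEG_INFINITY, f64::max),
    })
}
```

The handler's execute body would then shrink to extracting the numbers, calling summarize, and writing the result through ctx.set, while the numeric logic is covered by plain unit tests.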
Best Practices
- Use a typed Input: let serde validate at startup. Reach for serde_json::Value only when the input genuinely is freeform.
- Mutate via ctx.set: it auto-records the audit trail. Reaching into message.context directly bypasses change capture.
- Return TaskOutcome cleanly: Success for the happy path, Status(u16) for HTTP-like codes (5xx pushes a TASK_STATUS_ERROR to message.errors()), Skip for "did nothing, continue", Halt for "stop this workflow".
- Use the right error type: DataflowError::retryable looks at the variant to decide whether transient errors are worth retrying.
- Document: your handler's Input struct is its contract; docstring it.
- Test: drive the handler with TaskContext::new(&mut message, &datalogic) and assert on the outcome and ctx.into_changes().