Custom Functions
Extend dataflow-rs with your own custom processing logic by implementing the AsyncFunctionHandler trait.
Overview
Custom functions allow you to:
- Add domain-specific processing logic
- Integrate with external systems
- Perform async operations (HTTP, database, etc.)
- Implement complex transformations
Implementing AsyncFunctionHandler
#![allow(unused)]
fn main() {
use async_trait::async_trait;
use dataflow_rs::engine::{
    AsyncFunctionHandler,
    FunctionConfig,
    error::Result,
    message::{Change, Message},
};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::sync::Arc;

/// A minimal custom function that sets `data.processed` to `true` in the
/// message context and records that change for the audit trail.
pub struct MyCustomFunction;

#[async_trait]
impl AsyncFunctionHandler for MyCustomFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        _datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        // Your custom logic here

        // Access input configuration (the task's `function.input` object)
        let input = &config.input;

        // Modify message data, keeping the previous value for the audit trail
        let new_value = json!(true);
        let old_value = message.context["data"]["processed"].clone();
        message.context["data"]["processed"] = new_value.clone();

        // `message.context` was written directly, so invalidate the cached
        // context; otherwise later readers may still see the old data.
        message.invalidate_context_cache();

        // Track changes for audit trail
        let changes = vec![Change {
            path: Arc::from("data.processed"),
            old_value: Arc::new(old_value),
            new_value: Arc::new(new_value),
        }];

        // Return status code and changes
        // 200 = success, 400 = validation error, 500 = execution error
        Ok((200, changes))
    }
}
}
Registering Custom Functions
#![allow(unused)]
fn main() {
use dataflow_rs::engine::AsyncFunctionHandler;
use dataflow_rs::{Engine, Workflow};
use std::collections::HashMap;

// NOTE: `MyCustomFunction` and `workflows` are assumed to be defined
// elsewhere in your application; they are not shown in this snippet.

// Registry mapping each function name (the value a task uses in
// `function.name`) to a boxed handler instance.
let mut custom_functions =
    HashMap::<String, Box<dyn AsyncFunctionHandler + Send + Sync>>::new();
custom_functions.insert(String::from("my_custom_function"), Box::new(MyCustomFunction));

// Build the engine from the workflows plus the custom function registry.
let engine = Engine::new(workflows, Some(custom_functions));
}
Using Custom Functions in Workflows
{
"id": "custom_workflow",
"tasks": [
{
"id": "custom_task",
"function": {
"name": "my_custom_function",
"input": {
"option1": "value1",
"option2": 42
}
}
}
]
}
Accessing Configuration
The `config.input` field holds the `input` object from the task's `function` definition:
#![allow(unused)]
fn main() {
// Reads typed parameters from `config.input`, falling back to a default
// when a key is absent or its value cannot be read as the expected type.
async fn execute(
    &self,
    message: &mut Message,
    config: &FunctionConfig,
    datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
    let params = &config.input;

    // "option1": read as a string, "default" otherwise.
    let option1 = match params.get("option1").and_then(|v| v.as_str()) {
        Some(text) => text,
        None => "default",
    };

    // "option2": read as an i64, 0 otherwise.
    let option2 = params.get("option2").and_then(|v| v.as_i64()).unwrap_or(0);

    // Use the parameters...
    Ok((200, vec![]))
}
}
Using DataLogic
Evaluate JSONLogic expressions using the provided datalogic instance:
#![allow(unused)]
fn main() {
// Evaluates a JSONLogic rule against the message context using the
// engine-provided `datalogic` instance.
async fn execute(
    &self,
    message: &mut Message,
    config: &FunctionConfig,
    datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
    // Shared (`Arc`) handle to the message context, used as evaluation data.
    let ctx = message.get_context_arc();

    // Compile the rule, then evaluate it; `?` propagates any compile or
    // evaluation failure to the caller.
    let rule = json!({"var": "data.input"});
    let compiled_rule = datalogic.compile(&rule)?;
    let value = datalogic.evaluate(&compiled_rule, ctx)?;

    // Use the result...
    Ok((200, vec![]))
}
}
Async Operations
Custom functions support async/await for I/O operations:
#![allow(unused)]
fn main() {
use reqwest;
pub struct HttpFetchFunction;
#[async_trait]
impl AsyncFunctionHandler for HttpFetchFunction {
async fn execute(
&self,
message: &mut Message,
config: &FunctionConfig,
_datalogic: Arc<DataLogic>,
) -> Result<(usize, Vec<Change>)> {
let url = config.input
.get("url")
.and_then(Value::as_str)
.ok_or_else(|| DataflowError::Validation("Missing url".to_string()))?;
// Make HTTP request
let response = reqwest::get(url)
.await
.map_err(|e| DataflowError::ExecutionError(e.to_string()))?;
let data: Value = response.json()
.await
.map_err(|e| DataflowError::ExecutionError(e.to_string()))?;
// Store result
let old_value = message.context["data"]["fetched"].clone();
message.context["data"]["fetched"] = data.clone();
let changes = vec![Change {
path: Arc::from("data.fetched"),
old_value: Arc::new(old_value),
new_value: Arc::new(data),
}];
Ok((200, changes))
}
}
}
Error Handling
Return appropriate errors for different failure modes:
#![allow(unused)]
fn main() {
use dataflow_rs::engine::error::DataflowError;
// Sketch only: `...` stands for the remaining `execute` parameters, and
// `some_validation_fails` / `some_operation_fails` are placeholder conditions.
async fn execute(&self, ...) -> Result<(usize, Vec<Change>)> {
// Validation error
// (problem with the input or configuration the function was given)
if some_validation_fails {
return Err(DataflowError::Validation("Invalid input".to_string()));
}
// Execution error
// (the function's own work failed, e.g. an external call)
if some_operation_fails {
return Err(DataflowError::ExecutionError("Operation failed".to_string()));
}
// Or return status codes without error
// 400 for validation failure (errors added to message)
// 500 for execution failure
Ok((200, vec![]))
}
}
Complete Example
#![allow(unused)]
fn main() {
use async_trait::async_trait;
use dataflow_rs::engine::{
    AsyncFunctionHandler, FunctionConfig,
    error::{DataflowError, Result},
    message::{Change, Message},
};
use datalogic_rs::DataLogic;
use serde_json::{json, Value};
use std::sync::Arc;

/// Calculates statistics from numeric array data.
///
/// Reads the array at `data.<field>` (where `field` comes from the task's
/// `input.field`) and writes `{count, sum, mean, min, max}` to
/// `data.statistics`. Array elements that are not numbers are skipped.
///
/// # Errors
/// Returns `DataflowError::Validation` when `field` is missing from the
/// configuration, when `data.<field>` is absent or not an array, or when the
/// array contains no numeric values.
pub struct StatisticsFunction;

#[async_trait]
impl AsyncFunctionHandler for StatisticsFunction {
    async fn execute(
        &self,
        message: &mut Message,
        config: &FunctionConfig,
        _datalogic: Arc<DataLogic>,
    ) -> Result<(usize, Vec<Change>)> {
        // Get the field to analyze
        let field = config.input
            .get("field")
            .and_then(Value::as_str)
            .ok_or_else(|| DataflowError::Validation(
                "Missing 'field' in config".to_string()
            ))?;

        // Collect the numeric elements of `data.<field>`. A missing field and
        // a non-array value are reported with the same error.
        let numbers: Vec<f64> = message.context["data"]
            .get(field)
            .and_then(Value::as_array)
            .ok_or_else(|| DataflowError::Validation(
                format!("Field '{}' is not an array", field)
            ))?
            .iter()
            .filter_map(Value::as_f64)
            .collect();

        if numbers.is_empty() {
            return Err(DataflowError::Validation(
                "No numeric values found".to_string()
            ));
        }

        // Calculate statistics. `count` stays an integer so it serializes as
        // `3`, not `3.0`; it is only converted to f64 for the mean.
        let count = numbers.len();
        let sum: f64 = numbers.iter().sum();
        let mean = sum / count as f64;
        let min = numbers.iter().copied().fold(f64::INFINITY, f64::min);
        let max = numbers.iter().copied().fold(f64::NEG_INFINITY, f64::max);

        // Build result
        let stats = json!({
            "count": count,
            "sum": sum,
            "mean": mean,
            "min": min,
            "max": max
        });

        // Store in message, then invalidate the cached context since it was
        // written directly
        let old_value = message.context["data"]["statistics"].clone();
        message.context["data"]["statistics"] = stats.clone();
        message.invalidate_context_cache();

        let changes = vec![Change {
            path: Arc::from("data.statistics"),
            old_value: Arc::new(old_value),
            new_value: Arc::new(stats),
        }];

        Ok((200, changes))
    }
}
}
Best Practices
- Track Changes - Record all modifications for audit trail
- Validate Input - Check configuration before processing
- Handle Errors - Return appropriate error types
- Invalidate Cache - Call `message.invalidate_context_cache()` after modifying `message.context`
- Document - Add clear documentation for your function
- Test - Write unit tests for your custom functions