Integration Functions
The http_call, enrich, and publish_kafka functions provide typed
configuration schemas for the three most common service-layer integration
patterns. Unlike map or validation, they do not ship with a built-in
handler — the actual I/O is provided by your application via
AsyncFunctionHandler.
Why a config schema without an implementation?
The engine itself is I/O-agnostic: it doesn’t bundle an HTTP client, a Kafka producer, or any other transport. But the shape of these integrations is predictable enough that dataflow-rs provides typed config structs so that:
- JSONLogic expressions inside the config (path_logic, body_logic, key_logic, …) are pre-compiled at engine startup, with the same fail-loud behaviour as map rules
- Misshapen config fails at Engine::new(), not at first message
- Your handler receives an already-validated HttpCallConfig / EnrichConfig / PublishKafkaConfig, so there is no per-call JSON parse
How to use them
For each integration variant you want to use, register a handler under the matching name when building the engine:
use dataflow_rs::prelude::*;
use dataflow_rs::HttpCallConfig;
use async_trait::async_trait;

struct HttpCallHandler { /* reqwest::Client, connector registry, etc. */ }

#[async_trait]
impl AsyncFunctionHandler for HttpCallHandler {
    type Input = HttpCallConfig;

    async fn execute(
        &self,
        ctx: &mut TaskContext<'_>,
        cfg: &HttpCallConfig,
    ) -> Result<TaskOutcome> {
        // Resolve cfg.connector, evaluate cfg.compiled_path_logic if set,
        // make the call, merge response into ctx via cfg.response_path…
        Ok(TaskOutcome::Success)
    }
}

let engine = Engine::builder()
    .register("http_call", HttpCallHandler { /* … */ })
    .with_workflow(workflow)
    .build()?;
Skip the registration step and any workflow that uses these variants will fail
with DataflowError::FunctionNotFound("http_call") at dispatch time.
http_call
Issue an HTTP request and optionally merge the response into the message context.
Configuration
{
  "function": {
    "name": "http_call",
    "input": {
      "connector": "user_service",
      "method": "GET",
      "path_logic": { "cat": ["/users/", {"var": "data.user_id"}] },
      "headers": { "X-Request-Id": "abc" },
      "response_path": "data.user_profile",
      "timeout_ms": 5000
    }
  }
}
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| connector | string | Yes | Named reference resolved by your service layer |
| method | string | No | GET (default), POST, PUT, PATCH, DELETE |
| path | string | No | Static request path |
| path_logic | JSONLogic | No | Computed path; pre-compiled at startup |
| headers | object | No | Static request headers |
| body | any | No | Static request body |
| body_logic | JSONLogic | No | Computed body; pre-compiled at startup |
| response_path | string | No | Dot-path to merge response into the message context |
| timeout_ms | u64 | No | Request timeout in milliseconds (default: 30000) |
Use path or path_logic, not both. Same for body / body_logic.
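The either/or rule above can be checked mechanically. The following is a minimal, std-only sketch, not the library's own validation: PathSpec, PathSource and path_source are hypothetical names, and the JSONLogic expression is kept as raw text rather than a compiled value. What happens when neither field is set is left to the handler, since the table marks both as optional.

```rust
/// Hypothetical, std-only mirror of two http_call fields. The real
/// HttpCallConfig in dataflow-rs may be shaped differently.
#[derive(Debug)]
struct PathSpec {
    path: Option<String>,
    /// Stand-in for a JSONLogic expression, kept as raw text here.
    path_logic: Option<String>,
}

/// Which of the two fields the handler should use for the request path.
#[derive(Debug, PartialEq)]
enum PathSource<'a> {
    Static(&'a str),
    Computed(&'a str),
    /// Neither field is set; how to treat this is up to the handler.
    Unset,
}

/// Picks the path source, rejecting configs that set both fields.
fn path_source(spec: &PathSpec) -> Result<PathSource<'_>, String> {
    match (spec.path.as_deref(), spec.path_logic.as_deref()) {
        (Some(_), Some(_)) => Err("set either path or path_logic, not both".to_string()),
        (Some(p), None) => Ok(PathSource::Static(p)),
        (None, Some(l)) => Ok(PathSource::Computed(l)),
        (None, None) => Ok(PathSource::Unset),
    }
}
```

The same shape would apply to body / body_logic. Running a check like this once at startup keeps the failure out of the per-message path.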
enrich
Fetch external data and merge it into the message context at a specified path.
A specialization of http_call aimed at the “look up and attach” pattern.
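One possible reading of "merge at a dot-path" is to write the response at that path, creating intermediate objects as needed and replacing whatever was there. The sketch below illustrates that reading only; the exact merge semantics are up to your handler. Node, set_at_path and set_at_dot_path are hypothetical names, and Node is a minimal stand-in for a JSON value type.

```rust
use std::collections::BTreeMap;

/// Minimal stand-in for a JSON value (hypothetical; a real handler would
/// more likely work with a full JSON value type).
#[derive(Debug, Clone, PartialEq)]
enum Node {
    Leaf(String),
    Object(BTreeMap<String, Node>),
}

/// Recursive helper: descends one path segment per call.
fn set_at_path(node: &mut Node, segments: &[&str], value: Node) {
    match segments.split_first() {
        // No segments left: this is the target slot, so overwrite it.
        None => *node = value,
        Some((first, rest)) => {
            // A leaf in the way is replaced by an empty object (assumption).
            if let Node::Leaf(_) = *node {
                *node = Node::Object(BTreeMap::new());
            }
            if let Node::Object(map) = node {
                let child = map
                    .entry((*first).to_string())
                    .or_insert_with(|| Node::Object(BTreeMap::new()));
                set_at_path(child, rest, value);
            }
        }
    }
}

/// Writes `value` at a dot-path such as "data.customer".
fn set_at_dot_path(root: &mut Node, dot_path: &str, value: Node) {
    let segments: Vec<&str> = dot_path.split('.').collect();
    set_at_path(root, &segments, value);
}
```

With an empty root object, calling set_at_dot_path with "data.customer" yields an object whose "data" key holds an object with a "customer" key. A handler that needs a deep merge rather than replacement would have to combine the old and new values at the target slot instead of overwriting.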
Configuration
{
  "function": {
    "name": "enrich",
    "input": {
      "connector": "customer_lookup",
      "method": "GET",
      "path_logic": { "cat": ["/customers/", {"var": "data.customer_id"}] },
      "merge_path": "data.customer",
      "timeout_ms": 5000,
      "on_error": "skip"
    }
  }
}
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| connector | string | Yes | Named reference resolved by your service layer |
| method | string | No | HTTP method (default GET) |
| path | string | No | Static request path |
| path_logic | JSONLogic | No | Computed request path |
| merge_path | string | Yes | Dot-path where the response is merged into the context |
| timeout_ms | u64 | No | Request timeout in milliseconds (default: 30000) |
| on_error | "fail" or "skip" | No | Behaviour on lookup failure (default: fail) |
on_error: skip is useful when enrichment is best-effort and an absent
upstream service shouldn’t fail the workflow.
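Inside a handler, the on_error setting comes down to deciding what a failed lookup turns into. The sketch below is one way to express that with std-only types. OnError, parse_on_error and apply_on_error are hypothetical names, not the fields or helpers of the real EnrichConfig.

```rust
/// Hypothetical mirror of the on_error setting ("fail" or "skip").
#[derive(Debug, Clone, Copy, PartialEq)]
enum OnError {
    Fail,
    Skip,
}

/// Parses the raw setting; an absent value falls back to Fail,
/// matching the documented default.
fn parse_on_error(raw: Option<&str>) -> Result<OnError, String> {
    match raw {
        None | Some("fail") => Ok(OnError::Fail),
        Some("skip") => Ok(OnError::Skip),
        Some(other) => Err(format!("unknown on_error value: {}", other)),
    }
}

/// Applies the policy to a lookup result:
/// Ok(Some(v)) means merge v at merge_path, Ok(None) means carry on
/// without merging, Err means fail the task.
fn apply_on_error<T, E>(lookup: Result<T, E>, policy: OnError) -> Result<Option<T>, E> {
    match (lookup, policy) {
        (Ok(value), _) => Ok(Some(value)),
        (Err(_), OnError::Skip) => Ok(None),
        (Err(err), OnError::Fail) => Err(err),
    }
}
```

Under Skip the error is dropped silently in this sketch; a production handler would probably log it before returning, so that best-effort failures remain visible.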
publish_kafka
Emit the message (or a derived value) to a Kafka topic.
Configuration
{
  "function": {
    "name": "publish_kafka",
    "input": {
      "connector": "events_cluster",
      "topic": "orders.processed",
      "key_logic": { "var": "data.order_id" },
      "value_logic": { "var": "data" }
    }
  }
}
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| connector | string | Yes | Named reference resolved by your service layer |
| topic | string | Yes | Target Kafka topic |
| key_logic | JSONLogic | No | Computed message key |
| value_logic | JSONLogic | No | Computed message value (default: serialize the message) |
The handler decides exactly how to render the produced value — for example,
sending the entire message JSON when value_logic is omitted.
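The fallback described above can be sketched as a small record-building step that runs before the producer is called. This is an illustration under assumptions: OutgoingRecord and build_record are hypothetical names, and key and value stand in for the already-evaluated results of key_logic and value_logic, rendered as strings.

```rust
/// Hypothetical record a handler might hand to its Kafka producer.
#[derive(Debug, PartialEq)]
struct OutgoingRecord {
    topic: String,
    key: Option<String>,
    value: String,
}

/// Builds the record to publish.
/// `key` and `value` are None when key_logic / value_logic are omitted;
/// `message_json` is the whole message, serialized by the caller.
fn build_record(
    topic: &str,
    key: Option<String>,
    value: Option<String>,
    message_json: &str,
) -> OutgoingRecord {
    OutgoingRecord {
        topic: topic.to_string(),
        key,
        // Fall back to the full message when value_logic was not configured.
        value: value.unwrap_or_else(|| message_json.to_string()),
    }
}
```

Keeping this step separate from the actual send makes the fallback easy to unit-test without a broker.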
Connectors
The connector field is a string that your handler resolves into a concrete
client (HTTP client + base URL, Kafka producer + cluster config, …). The
engine does not interpret it. A typical layout:
use std::collections::HashMap;

struct HttpCallHandler {
    connectors: HashMap<String, HttpConnector>, // "user_service" -> &Client + base_url
}
This separation keeps secrets out of workflow JSON and lets you swap endpoints (staging / prod) without touching rule definitions.
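To make the lookup concrete, here is a minimal, std-only sketch of resolving a connector name into a request URL. ConnectorRegistry and its methods are hypothetical, the HttpConnector shown holds only a base URL (a real one would likely also carry a client handle and credentials), and the example host name is made up.

```rust
use std::collections::HashMap;

/// Hypothetical connector entry holding only a base URL.
struct HttpConnector {
    base_url: String,
}

/// Name-keyed registry owned by the handler, never by the workflow JSON.
struct ConnectorRegistry {
    connectors: HashMap<String, HttpConnector>,
}

impl ConnectorRegistry {
    fn new() -> Self {
        ConnectorRegistry { connectors: HashMap::new() }
    }

    /// Registers (or replaces) a connector under `name`.
    fn insert(&mut self, name: &str, base_url: &str) {
        self.connectors
            .insert(name.to_string(), HttpConnector { base_url: base_url.to_string() });
    }

    /// Joins the connector's base URL with a request path,
    /// normalizing the slash between them.
    fn resolve_url(&self, connector: &str, path: &str) -> Result<String, String> {
        let entry = self
            .connectors
            .get(connector)
            .ok_or_else(|| format!("unknown connector: {}", connector))?;
        Ok(format!(
            "{}/{}",
            entry.base_url.trim_end_matches('/'),
            path.trim_start_matches('/')
        ))
    }
}
```

Swapping staging for production then means populating the registry with different base URLs at startup, while the workflow keeps referring to "user_service".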
Why typed configs matter
Compared to free-form Custom configs:
- Startup-time validation: bad config fails at Engine::new()
- Pre-compiled JSONLogic: path_logic, body_logic, key_logic, value_logic are all compiled once; the handler reads Arc<Logic> from the config and evaluates it at zero allocation cost in the hot path
- Stable shape: the same config struct is shared by every handler in the ecosystem, so handlers from different crates can be swapped without rewriting workflows