Function Reference

A workflow is an ordered list of tasks, and every task invokes one built-in function with an input object:

{
  "id": "enrich",
  "name": "Look up customer",
  "function": {
    "name": "http_call",
    "input": { "connector": "crm", "path": "/customers/42", "response_path": "data.customer" }
  }
}

Functions read from and write to the data context — the JSON document that flows through the pipeline. By convention the request body is parsed into data.* by parse_json, later tasks read data.* in their JSONLogic, and the data object is what a sync channel returns. See the Workflow Reference for the context model and how condition expressions are evaluated.

Orion ships 16 functions (plus validate, an alias for validation). Eight are contributed by the dataflow-rs engine; eight are Orion handlers that talk to connectors or compose channels.

| Function | Category | Connector | Purpose |
| --- | --- | --- | --- |
| parse_json | Data | | Parse the raw payload into the data context |
| parse_xml | Data | | Parse an XML payload into the data context |
| map | Data | | Transform/reshape data with JSONLogic |
| filter | Data | | Gate the pipeline on a JSONLogic condition |
| validation | Data | | Collect validation errors from JSONLogic rules |
| log | Data | | Emit a structured log line |
| publish_json | Data | | Serialize a context field to a JSON string |
| publish_xml | Data | | Serialize a context field to an XML string |
| http_call | Connector | HTTP | Call an external API with retry + circuit breaker |
| db_read | Connector | SQL | Run a SELECT, return rows as JSON |
| db_write | Connector | SQL | Run INSERT/UPDATE/DELETE, return affected count |
| cache_read | Connector | Cache | Read a value from Redis or the in-memory cache |
| cache_write | Connector | Cache | Write a value to cache with optional TTL |
| mongo_read | Connector | MongoDB | Run find(), return documents as JSON |
| publish_kafka | Connector | Kafka | Publish a message to a Kafka topic |
| channel_call | Composition | | Invoke another channel’s workflow in-process |

Wherever an input field is described as JSONLogic, you pass a JSONLogic expression that is evaluated against the data context. A plain JSON literal (string, number, object) is also valid JSONLogic and evaluates to itself.
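
To make the "literal evaluates to itself" rule concrete, here is a minimal Python sketch of a toy evaluator. It is not the dataflow-rs evaluator: it covers only var, >, * and !!, the names evaluate, get_path and OPERATORS are hypothetical, and it assumes that any value that is not a recognised single-operator object is treated as a literal.

```python
import math

# Toy operator table; real JSONLogic defines many more operators.
OPERATORS = {
    ">": lambda args: args[0] > args[1],
    "*": lambda args: math.prod(args),
    "!!": lambda args: bool(args[0]),  # Python truthiness, a simplification
}


def get_path(context, dotted):
    """Walk a dotted path such as 'data.order.total'; None if a segment is missing."""
    node = context
    for part in dotted.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def evaluate(logic, context):
    """Evaluate a tiny JSONLogic subset against the context (illustrative only)."""
    if isinstance(logic, dict) and len(logic) == 1:
        operator, args = next(iter(logic.items()))
        if operator == "var":
            return get_path(context, args)
        if operator in OPERATORS:
            if not isinstance(args, list):
                args = [args]
            return OPERATORS[operator]([evaluate(arg, context) for arg in args])
    # Assumption: anything else (string, number, plain object) is a literal.
    return logic


context = {"data": {"order": {"total": 21}}}
print(evaluate("Order processed", context))                          # Order processed
print(evaluate({"var": "data.order.total"}, context))                # 21
print(evaluate({"*": [{"var": "data.order.total"}, 2]}, context))    # 42
```

The same distinction is why "message": "Order processed" and "message": { "var": "data.order.id" } are both acceptable wherever a field is typed as JSONLogic.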


Data functions

These come from the dataflow-rs engine. Orion does not validate their input against a schema when a workflow is created (unlike the connector functions below), so an invalid input surfaces at execution time rather than on create.

parse_json

Reads a raw value (typically the request payload) and parses it as JSON into the data context. Almost every workflow starts with this — without it, task conditions that reference data.* see an empty context.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source | string | yes | | Where to read the raw value from, e.g. "payload" |
| target | string | yes | | Field name under data; the parsed value is stored at data.{target} |

{ "name": "parse_json", "input": { "source": "payload", "target": "order" } }

parse_xml

Same input shape as parse_json, but parses an XML payload into a JSON structure at data.{target}.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source | string | yes | | Where to read the raw XML from, e.g. "payload" |
| target | string | yes | | Stored at data.{target} |

{ "name": "parse_xml", "input": { "source": "payload", "target": "order" } }

map

Applies an ordered list of JSONLogic expressions, writing each result to a dotted path in the context. The primary tool for reshaping, computing, and enriching data.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| mappings | array | yes | | Ordered list of { "path", "logic" } entries |
| mappings[].path | string | yes | | Dotted target path, e.g. "data.order.total" |
| mappings[].logic | JSONLogic | yes | | Expression whose result is written to path |

{
  "name": "map",
  "input": {
    "mappings": [
      { "path": "data.order.flagged", "logic": true },
      { "path": "data.order.total_with_tax", "logic": { "*": [{ "var": "data.order.total" }, 1.1] } }
    ]
  }
}
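
The effect of writing results to dotted paths can be sketched in Python. This is an illustration rather than the engine's code: set_path and apply_mappings are hypothetical names, each logic is a Python callable standing in for a JSONLogic expression, and the sketch assumes that missing intermediate objects are created on the way down.

```python
def set_path(context, dotted_path, value):
    """Write value at a dotted path such as 'data.order.flagged'.

    Assumption: missing intermediate objects are created as empty dicts.
    """
    *parents, leaf = dotted_path.split(".")
    node = context
    for part in parents:
        node = node.setdefault(part, {})
    node[leaf] = value


def apply_mappings(context, mappings):
    """Apply the mappings in list order and return the updated context."""
    for mapping in mappings:
        # Each callable receives the context as it stands at that point.
        set_path(context, mapping["path"], mapping["logic"](context))
    return context


context = {"data": {"order": {"total": 100}}}
apply_mappings(context, [
    {"path": "data.order.flagged", "logic": lambda ctx: True},
    {"path": "data.order.total_with_tax",
     "logic": lambda ctx: round(ctx["data"]["order"]["total"] * 1.1, 2)},
])
print(context["data"]["order"])
# {'total': 100, 'flagged': True, 'total_with_tax': 110.0}
```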

filter

Evaluates a JSONLogic condition. If it is truthy the pipeline continues; otherwise the on_reject action is taken.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| condition | JSONLogic | yes | | Evaluated against the data context |
| on_reject | string | no | "halt" | "halt" stops the whole workflow; "skip" skips only this task |

{
  "name": "filter",
  "input": {
    "condition": { ">": [{ "var": "data.order.total" }, 0] },
    "on_reject": "halt"
  }
}
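
The difference between the two on_reject values is easiest to see in a small simulation. The following Python sketch reflects one reading of the table above, not the engine's source: run_workflow and example_tasks are hypothetical, and the condition is a Python callable standing in for JSONLogic.

```python
def run_workflow(tasks, context):
    """Return the ids of the non-filter tasks that ran, in order."""
    executed = []
    for task in tasks:
        if task["function"] != "filter":
            executed.append(task["id"])
            continue
        if task["condition"](context):
            continue  # truthy: the pipeline continues
        if task.get("on_reject", "halt") == "halt":
            break  # "halt": stop the whole workflow
        # "skip": only the filter task is skipped; later tasks still run
    return executed


def example_tasks(on_reject):
    """Three-task workflow with a filter in the middle (hypothetical ids)."""
    return [
        {"id": "parse", "function": "parse_json"},
        {"id": "gate", "function": "filter", "on_reject": on_reject,
         "condition": lambda ctx: ctx["data"]["order"]["total"] > 0},
        {"id": "store", "function": "db_write"},
    ]


rejected = {"data": {"order": {"total": 0}}}
print(run_workflow(example_tasks("halt"), rejected))  # ['parse']
print(run_workflow(example_tasks("skip"), rejected))  # ['parse', 'store']
```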

validation / validate

Evaluates a list of rules. Each rule’s logic must evaluate to exactly true; any other result records the rule’s message in the response’s error list. Validation is non-destructive — it never mutates the data context. validate is an accepted alias for validation.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| rules | array | yes | | List of { "logic", "message" } rules |
| rules[].logic | JSONLogic | yes | | Must evaluate to true to pass |
| rules[].message | string | yes | | Error message recorded when the rule fails |

{
  "name": "validation",
  "input": {
    "rules": [
      { "logic": { "!!": [{ "var": "data.order.customer_id" }] }, "message": "customer_id is required" },
      { "logic": { ">": [{ "var": "data.order.total" }, 0] },        "message": "total must be positive" }
    ]
  }
}
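
The "exactly true" requirement is why the first rule above wraps the lookup in "!!": a bare var would return the customer id string, which is truthy but not true. A minimal Python sketch of that check follows; collect_errors is a hypothetical helper and the results list is evaluated by hand rather than by a JSONLogic engine.

```python
def collect_errors(rules, results):
    """Keep the message of every rule whose result is not the boolean true."""
    return [
        rule["message"]
        for rule, result in zip(rules, results)
        if result is not True  # truthy values such as "c-42" or 1 still fail
    ]


rules = [
    {"logic": {"var": "data.order.customer_id"},
     "message": "customer_id is required (bare var)"},
    {"logic": {"!!": [{"var": "data.order.customer_id"}]},
     "message": "customer_id is required"},
]
# Hand-evaluated results for data.order.customer_id == "c-42".
results = ["c-42", True]
print(collect_errors(rules, results))
# ['customer_id is required (bare var)']
```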

log

Emits a structured log line. message is a JSONLogic expression (a plain string is valid), and fields attaches additional JSONLogic-derived key/values.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| message | JSONLogic | yes | | The log message (string literal or expression) |
| level | string | no | "info" | One of trace, debug, info, warn, error |
| fields | object | no | {} | Map of name → JSONLogic expression, logged as structured fields |

{
  "name": "log",
  "input": {
    "level": "info",
    "message": "Order processed",
    "fields": { "order_id": { "var": "data.order.id" } }
  }
}

publish_json

Serializes a field inside the data context to a JSON string and stores it at another field. (It writes back into the context; it does not publish to an external system.)

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source | string | yes | | Field under data to serialize, e.g. "order" (reads data.order) |
| target | string | yes | | Field under data to receive the serialized string |
| pretty | bool | no | false | Pretty-print the JSON output |

{ "name": "publish_json", "input": { "source": "order", "target": "order_json", "pretty": true } }

publish_xml

Like publish_json, but serializes to an XML string.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source | string | yes | | Field under data to serialize |
| target | string | yes | | Field under data to receive the XML string |
| root_element | string | no | "root" | Name of the XML root element |

{ "name": "publish_xml", "input": { "source": "order", "target": "order_xml", "root_element": "Order" } }

Connector functions

These reference a connector by name — credentials and endpoints live in the connector, not the workflow. Orion validates their input at workflow create/update time and exposes the schema via GET /api/v1/admin/functions. Every connector call is wrapped in a circuit breaker.

http_call

Makes an HTTP request through an HTTP connector, with retry and circuit-breaker support. The connector supplies the base URL and auth.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | string | yes | | Name of the HTTP connector |
| method | string | no | "GET" | One of GET, POST, PUT, PATCH, DELETE |
| path | string | no | | Path appended to the connector’s base URL |
| path_logic | JSONLogic | no | | Compute the path dynamically (use instead of path) |
| headers | object | no | {} | Extra request headers (string → string) |
| body | any | no | | Static request body (serialized as JSON) |
| body_logic | JSONLogic | no | | Compute the body dynamically (use instead of body) |
| response_path | string | no | | Dotted path where the response body is written; omit to discard it |
| timeout_ms | number | no | 30000 | Per-request timeout in milliseconds |

{
  "name": "http_call",
  "input": {
    "connector": "payment-api",
    "method": "POST",
    "path": "/charge",
    "body_logic": { "var": "data.payment" },
    "response_path": "data.charge_result",
    "timeout_ms": 5000
  }
}

db_read

Runs a SELECT against a SQL connector and writes the result rows as a JSON array. Values from params are bound through placeholders: ? for SQLite/MySQL, and $1, $2, … for PostgreSQL.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | string | yes | | Name of the SQL connector |
| query | string | yes | | SELECT statement with bind placeholders |
| params | array | no | | Values bound to the placeholders, in order |
| output | string | no | "data" | Dotted path where the row array is written |

{
  "name": "db_read",
  "input": {
    "connector": "primary-db",
    "query": "SELECT id, name, tier FROM customers WHERE id = ?",
    "params": [{ "var": "data.order.customer_id" }],
    "output": "data.customer"
  }
}
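
Because the placeholder style depends on the database behind the connector, a query written with ? needs rewriting before it can run against PostgreSQL. The helper below is a hypothetical Python sketch for preparing workflow definitions, not something Orion provides, and it is deliberately naive: it would also rewrite a ? that appears inside a quoted SQL string.

```python
import itertools
import re


def to_postgres_placeholders(query):
    """Rewrite positional '?' placeholders as $1, $2, ... in order of appearance.

    Naive by design: it does not skip '?' characters inside SQL string literals.
    """
    numbers = itertools.count(1)
    return re.sub(r"\?", lambda _match: f"${next(numbers)}", query)


print(to_postgres_placeholders(
    "SELECT id, name, tier FROM customers WHERE id = ?"
))
# SELECT id, name, tier FROM customers WHERE id = $1
```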

db_write

Runs an INSERT/UPDATE/DELETE against a SQL connector and writes { "rows_affected": N }.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | string | yes | | Name of the SQL connector |
| query | string | yes | | INSERT/UPDATE/DELETE statement with bind placeholders |
| params | array | no | | Values bound to the placeholders, in order |
| output | string | no | "data" | Dotted path where { "rows_affected": N } is written |

{
  "name": "db_write",
  "input": {
    "connector": "primary-db",
    "query": "INSERT INTO orders (id, total) VALUES (?, ?)",
    "params": [{ "var": "data.order.id" }, { "var": "data.order.total" }],
    "output": "data.write_result"
  }
}
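
The two result shapes (a row array for db_read, { "rows_affected": N } for db_write) can be reproduced with Python's standard sqlite3 module. This is only an analogy for positional binding and the output shapes described above, not how Orion runs queries; the orders table and both helper functions are made up for the example.

```python
import json
import sqlite3

# In-memory SQLite database standing in for a SQL connector.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets each row be converted to a dict
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, total REAL)")


def db_write(query, params):
    """Run a write statement and report the affected row count."""
    cursor = conn.execute(query, params)
    conn.commit()
    return {"rows_affected": cursor.rowcount}


def db_read(query, params):
    """Run a SELECT and return the rows as a list of JSON-ready dicts."""
    return [dict(row) for row in conn.execute(query, params)]


# Values are bound to the ? placeholders in list order.
write_result = db_write("INSERT INTO orders (id, total) VALUES (?, ?)", ["o-1", 99.5])
rows = db_read("SELECT id, total FROM orders WHERE id = ?", ["o-1"])

print(json.dumps(write_result))  # {"rows_affected": 1}
print(json.dumps(rows))          # [{"id": "o-1", "total": 99.5}]
```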

cache_read

Reads a key from a cache connector (Redis or the built-in in-memory backend). Missing keys yield null.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | string | yes | | Name of the cache connector |
| key | string | yes | | Cache key to read |
| output | string | no | "data" | Dotted path where the value is written |

{ "name": "cache_read", "input": { "connector": "redis", "key": "rate:42", "output": "data.cached" } }

cache_write

Writes a key to a cache connector, optionally with a TTL.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | string | yes | | Name of the cache connector |
| key | string | yes | | Cache key to set |
| value | any | yes | | Value to store (non-strings are JSON-serialized) |
| ttl_secs | number | no | no expiry | Time-to-live in seconds |

{ "name": "cache_write", "input": { "connector": "redis", "key": "rate:42", "value": 1, "ttl_secs": 60 } }
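
The cache semantics described for cache_read and cache_write (missing keys yield null, non-strings are JSON-serialized, optional TTL) can be modelled with a small in-memory class. This Python sketch is not Orion's in-memory backend: MemoryCache and its injectable clock are hypothetical, and because this page does not say whether a read parses the stored JSON back, the sketch simply returns the stored string.

```python
import json
import time


class MemoryCache:
    """Toy TTL cache; `clock` is injectable so expiry can be tested without sleeping."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._entries = {}  # key -> (stored string, expiry time or None)

    def write(self, key, value, ttl_secs=None):
        # Non-string values are stored as their JSON serialization.
        stored = value if isinstance(value, str) else json.dumps(value)
        expires_at = None if ttl_secs is None else self._clock() + ttl_secs
        self._entries[key] = (stored, expires_at)

    def read(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None  # missing key
        stored, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            del self._entries[key]  # expired entries behave like missing keys
            return None
        return stored


now = [0.0]  # fake clock, in seconds
cache = MemoryCache(clock=lambda: now[0])
cache.write("rate:42", 1, ttl_secs=60)
print(cache.read("rate:42"))  # 1  (the string "1")
now[0] = 61.0
print(cache.read("rate:42"))  # None
```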

mongo_read

Runs a find() against a MongoDB connector and writes the matched documents as a JSON array.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | string | yes | | Name of the MongoDB connector |
| database | string | yes | | Database name |
| collection | string | yes | | Collection name |
| filter | object | no | {} | MongoDB find filter document |
| output | string | no | "data" | Dotted path where matched documents are written |

{
  "name": "mongo_read",
  "input": {
    "connector": "mongo",
    "database": "shop",
    "collection": "customers",
    "filter": { "tier": "vip" },
    "output": "data.vips"
  }
}

publish_kafka

Publishes a message to a Kafka topic through a Kafka connector. Requires Kafka to be enabled in config. If value_logic is omitted, the full data context is published.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | string | yes | | Name of the Kafka connector |
| topic | string | yes | | Target topic |
| key_logic | JSONLogic | no | | Expression that derives the message key |
| value_logic | JSONLogic | no | full data | Expression that derives the message value |

{
  "name": "publish_kafka",
  "input": {
    "connector": "events",
    "topic": "order.placed",
    "key_logic": { "var": "data.order.id" },
    "value_logic": { "var": "data.order" }
  }
}

Composition functions

channel_call

Invokes another channel’s workflow in-process — no network hop. The called channel keeps its own versioning and governance. Cycle detection and a max call depth prevent runaway recursion. Provide exactly one of channel/channel_logic and at most one of data/data_logic.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| channel | string | one of channel/channel_logic | | Static target channel name |
| channel_logic | JSONLogic | one of channel/channel_logic | | Expression that resolves to the target channel name |
| data | any | no | request payload | Static payload passed to the target channel |
| data_logic | JSONLogic | no | | Expression that derives the payload |
| response_path | string | no | "data" | Dotted path where the called channel’s response is stored |
| timeout_ms | number | no | from config | Per-call timeout in milliseconds |

{
  "name": "channel_call",
  "input": {
    "channel": "customer-lookup",
    "data_logic": { "var": "data.order.customer_id" },
    "response_path": "data.customer"
  }
}
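
The input rule and the recursion guards described for channel_call can be sketched as two small checks. The Python below is illustrative only: check_input, enter_channel and CallGuardError are hypothetical names, the error strings are invented, and max_depth=8 is a placeholder rather than Orion's configured limit.

```python
def check_input(task_input):
    """Return rule violations for a channel_call input object."""
    errors = []
    # Exactly one of channel / channel_logic must be present.
    if ("channel" in task_input) == ("channel_logic" in task_input):
        errors.append("provide exactly one of channel/channel_logic")
    # data and data_logic are both optional but mutually exclusive.
    if "data" in task_input and "data_logic" in task_input:
        errors.append("provide at most one of data/data_logic")
    return errors


class CallGuardError(Exception):
    """Raised when a nested call would loop or nest too deeply."""


def enter_channel(call_stack, target, max_depth=8):
    """Return the new call stack, or raise if the call is not allowed.

    max_depth=8 is a hypothetical value chosen for the example.
    """
    if target in call_stack:
        raise CallGuardError("cycle: " + " -> ".join(call_stack + [target]))
    if len(call_stack) >= max_depth:
        raise CallGuardError(f"max call depth {max_depth} exceeded")
    return call_stack + [target]


print(check_input({"channel": "customer-lookup",
                   "data_logic": {"var": "data.order.customer_id"}}))  # []
stack = enter_channel(["orders"], "customer-lookup")
print(stack)  # ['orders', 'customer-lookup']
```

Calling enter_channel(stack, "orders") at this point would raise CallGuardError, since orders is already on the stack.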

Inspecting schemas at runtime

GET /api/v1/admin/functions returns the live input schema for the connector and composition functions (the data functions are provided by dataflow-rs and are not catalogued there). The Orion CLI MCP server surfaces the same schemas to AI assistants so generated workflows use correct field names.
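
A client for this endpoint can be sketched with Python's standard library. Only the path /api/v1/admin/functions comes from this page; the base URL is a placeholder, the response is assumed to be JSON, and any authentication the admin API may require is not covered here.

```python
import json
import urllib.request


def functions_url(base_url):
    """Join a base URL (hypothetical, e.g. http://localhost:8080) with the endpoint path."""
    return base_url.rstrip("/") + "/api/v1/admin/functions"


def fetch_function_schemas(base_url, timeout=5.0):
    """GET the function schemas and parse the body, assuming it is JSON."""
    request = urllib.request.Request(
        functions_url(base_url),
        headers={"Accept": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

For example, fetch_function_schemas("http://localhost:8080") would issue the request against a locally running instance, if one is listening on that (assumed) address.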