Extensibility

Orion integrates with external systems through connectors, exposes custom logic via async function handlers, and supports multiple channel protocols for different ingestion patterns.

Connectors

Connectors are named external service configurations. Secrets stay in connectors, out of your workflows.

Authentication

Three auth schemes are supported:

| Auth Type | Fields | Example |
|---|---|---|
| `bearer` | `token` | `{ "type": "bearer", "token": "sk-..." }` |
| `basic` | `username`, `password` | `{ "type": "basic", "username": "user", "password": "pass" }` |
| `apikey` | `header`, `key` | `{ "type": "apikey", "header": "X-API-Key", "key": "abc123" }` |

Header Precedence

When http_call builds a request, headers are applied in this order (later layers override earlier ones):

| Priority | Source | Example |
|---|---|---|
| 1 (lowest) | Connector default headers | `"headers": {"x-source": "orion"}` in connector config |
| 2 | Connector auth | Bearer token, Basic auth, API key |
| 3 | Default content-type | `application/json` (only when a body is present) |
| 4 (highest) | Task-level headers | `"headers": {"content-type": "text/xml"}` in the task input |

Task-level headers always win. This means a workflow developer can override content-type, authorization, or any other header set by the connector.
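For example, a task that posts XML through a connector can override the connector's default `content-type` at the task level. This is a sketch: the `path` and `body` field names and the connector name are illustrative, not a documented schema.

```json
{
  "function": {
    "name": "http_call",
    "input": {
      "connector": "payments-api",
      "method": "POST",
      "path": "/invoices",
      "headers": { "content-type": "text/xml" },
      "body": "<invoice><amount>42.00</amount></invoice>"
    }
  }
}
```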

Secret Masking

Sensitive fields (token, password, key, secret, api_key, connection_string) are automatically masked as "******" in all API responses. Secrets are stored but never exposed through the API. Workflows reference connectors by name; they never see or embed actual credentials.
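For instance, fetching a connector through the API might return something shaped like the following (illustrative response; the masking of the `token` field is the point):

```json
{
  "name": "payments-api",
  "connector_type": "http",
  "config": {
    "type": "http",
    "url": "https://api.stripe.com/v1",
    "auth": { "type": "bearer", "token": "******" }
  }
}
```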

HTTP Connector

REST API calls, webhooks, and external service integration:

```json
{
  "name": "payments-api",
  "connector_type": "http",
  "config": {
    "type": "http",
    "url": "https://api.stripe.com/v1",
    "auth": { "type": "bearer", "token": "sk-..." },
    "headers": { "x-source": "orion" },
    "retry": { "max_retries": 3, "retry_delay_ms": 1000 },
    "max_response_size": 10485760,
    "allow_private_urls": false
  }
}
```

| Field | Default | Description |
|---|---|---|
| `url` | required | Base URL for all requests |
| `method` | `""` | Default HTTP method |
| `headers` | `{}` | Default headers applied to every request |
| `auth` | `null` | Authentication config (`bearer`, `basic`, or `apikey`) |
| `retry` | 3 retries, 1000 ms | Retry with exponential backoff |
| `max_response_size` | 10 MB | Maximum response body size, to prevent OOM |
| `allow_private_urls` | `false` | Allow requests to private/internal IPs (SSRF protection) |
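A task might then call this connector like so. This is a sketch: `path_logic` is the dynamic-path mechanism described under Custom Functions, while the `output` field follows the pattern of `db_read` and is an assumption here.

```json
{
  "function": {
    "name": "http_call",
    "input": {
      "connector": "payments-api",
      "method": "GET",
      "path_logic": { "cat": ["/charges/", { "var": "data.charge_id" }] },
      "output": "data.charge"
    }
  }
}
```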

Kafka Connector

Produce to Kafka topics:

```json
{
  "name": "event-bus",
  "connector_type": "kafka",
  "config": {
    "type": "kafka",
    "brokers": ["kafka1:9092", "kafka2:9092"],
    "topic": "events",
    "group_id": "orion-producer"
  }
}
```

Use the publish_kafka task function with optional JSONLogic for dynamic keys and values:

```json
{
  "function": {
    "name": "publish_kafka",
    "input": {
      "connector": "event-bus",
      "topic": "processed-orders",
      "key_logic": { "var": "data.order_id" }
    }
  }
}
```

| Field | Required | Description |
|---|---|---|
| `connector` | Yes | Kafka connector name |
| `topic` | Yes | Target topic |
| `key_logic` | No | JSONLogic expression for the partition key |
| `value_logic` | No | JSONLogic expression for the message value (default: `message.data`) |

Kafka consumer configuration: map topics to channels in your config file:

```toml
[kafka]
enabled = true
brokers = ["localhost:9092"]
group_id = "orion"

[[kafka.topics]]
topic = "incoming-orders"
channel = "orders"
```

Async channels with protocol: "kafka" can also register topics via the API (DB-driven). Config-file and DB-driven topics are merged; duplicates are deduplicated with config-file entries taking precedence. The consumer restarts automatically on engine reload when the topic set changes.

Metadata injection: Kafka metadata is automatically injected into every message:

| Field | Description |
|---|---|
| `kafka_topic` | Source topic name |
| `kafka_key` | Message key (if present) |
| `kafka_partition` | Partition number |
| `kafka_offset` | Offset within partition |

Access these in workflows via { "var": "metadata.kafka_topic" }.
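For example, a JSONLogic condition can branch on the source topic. The expression below stands alone; where it attaches depends on your workflow definition.

```json
{ "==": [{ "var": "metadata.kafka_topic" }, "incoming-orders"] }
```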

Dead letter queue: failed messages are routed to a configurable DLQ topic:

```toml
[kafka.dlq]
enabled = true
topic = "orion-dlq"
```

Consumer settings:

| Config | Default | Description |
|---|---|---|
| `kafka.processing_timeout_ms` | 60000 | Per-message processing timeout |
| `kafka.max_inflight` | 100 | Max in-flight messages |
| `kafka.lag_poll_interval_secs` | 30 | Consumer lag polling interval |
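In the config file these map to keys under the `[kafka]` table, along the lines of the following sketch (values shown are the defaults):

```toml
[kafka]
processing_timeout_ms = 60000
max_inflight = 100
lag_poll_interval_secs = 30
```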

Database Connector (SQL)

Parameterized SQL queries against PostgreSQL, MySQL, or SQLite:

```json
{
  "name": "orders-db",
  "connector_type": "db",
  "config": {
    "type": "db",
    "connection_string": "postgres://user:pass@db-host:5432/orders",
    "driver": "postgres",
    "max_connections": 10,
    "connect_timeout_ms": 5000,
    "query_timeout_ms": 30000
  }
}
```

| Field | Default | Description |
|---|---|---|
| `connection_string` | required | Database URL (auto-masked in API responses) |
| `driver` | `"postgres"` | Driver type: `postgres`, `mysql`, or `sqlite` |
| `max_connections` | `null` | Connection pool max size |
| `connect_timeout_ms` | `null` | Connection establishment timeout |
| `query_timeout_ms` | `null` | Individual query timeout |
| `retry` | 3 retries, 1000 ms | Retry with exponential backoff |

Use db_read for SELECT (returns rows as JSON array) and db_write for INSERT/UPDATE/DELETE (returns affected count):

```json
{
  "function": {
    "name": "db_read",
    "input": {
      "connector": "orders-db",
      "query": "SELECT * FROM orders WHERE customer_id = $1",
      "params": [{ "var": "data.customer_id" }],
      "output": "data.orders"
    }
  }
}
```
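A matching `db_write` sketch: the input fields mirror `db_read`, but the `output` path for the affected-row count is an assumption.

```json
{
  "function": {
    "name": "db_write",
    "input": {
      "connector": "orders-db",
      "query": "UPDATE orders SET status = $1 WHERE id = $2",
      "params": ["shipped", { "var": "data.order_id" }],
      "output": "data.rows_affected"
    }
  }
}
```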

Cache Connector

In-memory or Redis cache for lookups, session state, and temporary storage:

```json
{
  "name": "session-cache",
  "connector_type": "cache",
  "config": {
    "type": "cache",
    "backend": "redis",
    "url": "redis://localhost:6379",
    "default_ttl_secs": 300,
    "max_connections": 10
  }
}
```

| Field | Default | Description |
|---|---|---|
| `backend` | required | `"redis"` or `"memory"` |
| `url` | required (redis) | Redis connection URL |
| `default_ttl_secs` | `null` | Default TTL for cache entries |
| `max_connections` | `null` | Connection pool max size |
| `retry` | 3 retries, 1000 ms | Retry with exponential backoff |

Use cache_read and cache_write in workflows:

```json
{
  "function": {
    "name": "cache_write",
    "input": {
      "connector": "session-cache",
      "key": "session:user123",
      "value": { "var": "data.session" },
      "ttl_secs": 3600
    }
  }
}
```
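And the corresponding `cache_read`: the `connector` and `key` fields mirror `cache_write`, while the `output` path is illustrative.

```json
{
  "function": {
    "name": "cache_read",
    "input": {
      "connector": "session-cache",
      "key": "session:user123",
      "output": "data.session"
    }
  }
}
```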

Storage Connector (S3/GCS)

S3, GCS, or local filesystem for file operations:

```json
{
  "name": "uploads",
  "connector_type": "storage",
  "config": {
    "type": "storage",
    "provider": "s3",
    "bucket": "my-uploads",
    "region": "us-east-1",
    "base_path": "/data"
  }
}
```

| Field | Default | Description |
|---|---|---|
| `provider` | required | Storage provider: `s3`, `gcs`, or `local` |
| `bucket` | `null` | Bucket or container name |
| `region` | `null` | Cloud region |
| `base_path` | `null` | Base path prefix for all operations |
| `retry` | 3 retries, 1000 ms | Retry with exponential backoff |

MongoDB Connector (NoSQL)

MongoDB document queries with BSON-to-JSON conversion:

```json
{
  "name": "analytics-db",
  "connector_type": "db",
  "config": {
    "type": "db",
    "connection_string": "mongodb://localhost:27017",
    "driver": "mongodb"
  }
}
```

Use mongo_read in workflows:

```json
{
  "function": {
    "name": "mongo_read",
    "input": {
      "connector": "analytics-db",
      "database": "analytics",
      "collection": "events",
      "filter": { "user_id": { "var": "data.user_id" } },
      "output": "data.events"
    }
  }
}
```

Custom Functions

Orion provides 8 async function handlers that can be used in workflow tasks:

| Function | Description |
|---|---|
| `http_call` | Call external APIs via HTTP connectors |
| `channel_call` | Invoke another channel's workflow in-process (no HTTP round-trip) |
| `db_read` | Execute SELECT queries against SQL connectors |
| `db_write` | Execute INSERT/UPDATE/DELETE against SQL connectors |
| `cache_read` | Read from memory or Redis cache connectors |
| `cache_write` | Write to memory or Redis cache connectors |
| `mongo_read` | Query MongoDB collections |
| `publish_kafka` | Produce messages to Kafka topics |
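For example, `channel_call` lets one workflow hand a message to another channel's workflow without an HTTP round-trip. This is a sketch only; the input field names below are assumptions, not a documented schema.

```json
{
  "function": {
    "name": "channel_call",
    "input": {
      "channel": "orders",
      "data": { "var": "data.order" },
      "output": "data.order_result"
    }
  }
}
```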

In addition to async functions, Orion includes a built-in function library for data transformation:

| Function | Description |
|---|---|
| `parse_json` | Parse raw payload into structured data |
| `parse_xml` | Parse XML payload into structured data |
| `filter` | Filter arrays using JSONLogic conditions |
| `map` | Transform data with field mappings |
| `validation` | Validate data against JSONLogic rules |
| `log` | Log data at a specified level |

JSONLogic expressions power all conditions and dynamic values. Use { "var": "data.field" } to reference data, { "cat": [...] } for string concatenation, arithmetic operators, and more. Dynamic paths (path_logic) and bodies (body_logic) let you compute URLs and request payloads from message data.
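For instance, a `path_logic` expression can combine `cat` and `var` to compute a request path from message data (an illustrative fragment, shown in isolation):

```json
{
  "path_logic": { "cat": ["/customers/", { "var": "data.customer_id" }, "/orders"] }
}
```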

Channel Protocols

Channels support three protocol modes:

REST (Sync)

REST channels define route patterns for RESTful API routing with method and path matching:

```json
{
  "name": "order-detail",
  "channel_type": "sync",
  "protocol": "rest",
  "methods": ["GET", "POST"],
  "route_pattern": "/orders/{order_id}/items/{item_id}",
  "workflow_id": "order-detail-workflow"
}
```

Path parameters are extracted and injected into the message metadata. Routes are matched by priority (descending) then specificity (segment count).
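For the channel above, a request to `/orders/42/items/7` would expose `order_id` and `item_id` through metadata, readable with expressions like the following (the exact metadata key naming is an assumption):

```json
{ "var": "metadata.order_id" }
```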

Simple HTTP (Sync)

Simple HTTP channels are matched by channel name. Requests to /api/v1/data/{channel-name} are routed directly:

```json
{
  "name": "orders",
  "channel_type": "sync",
  "protocol": "http",
  "workflow_id": "order-processing"
}
```

Kafka (Async)

Kafka channels consume from topics and process messages asynchronously:

```json
{
  "name": "kafka-orders",
  "channel_type": "async",
  "protocol": "kafka",
  "topic": "incoming-orders",
  "consumer_group": "orion-orders",
  "workflow_id": "order-processing"
}
```

DB-driven Kafka channels are automatically registered as consumers at startup and on engine reload. Add Kafka ingestion via the API without restarting Orion.