Introduction

Orion is a declarative services runtime written in Rust. Instead of writing, deploying, and operating a microservice for every piece of business logic, you declare what the service should do, and Orion runs it. Architectural governance — observability, rate limiting, circuit breakers, versioning, input validation, and more — is built in.

AI generates the workflows; Orion provides the governance. Every service gets health checks, metrics, retries, and error handling, regardless of how the workflow was created.

Replace a sprawl of single-purpose microservices with one runtime: each channel and workflow is independently versioned, testable, and deployable, but they share a single binary and a single set of built-in production features. This site is the deep reference and how-to guide — new here? Install Orion and ship your first service in a couple of minutes.

From zero to a live, governed service: business logic as JSON, deployed over plain HTTP.

Three Primitives

You build services in Orion with three things:

┌─────────────┐       ┌──────────────┐       ┌─────────────┐
│   Channel   │──────▶│   Workflow   │──────▶│  Connector  │
│  (endpoint) │       │   (logic)    │       │  (external) │
└─────────────┘       └──────────────┘       └─────────────┘

| Primitive | What it is | Example |
| --- | --- | --- |
| Channel | A service endpoint: sync (REST, HTTP) or async (Kafka) | POST /orders, GET /users/{id}, Kafka topic order.placed |
| Workflow | A pipeline of tasks that defines what the service does | Parse → validate → enrich → transform → respond |
| Connector | A named connection to an external system, with auth and retries | Stripe API, PostgreSQL, Redis, Kafka cluster |

Design-time: define channels, build workflows, configure connectors, test with dry-run, manage versions — all through the admin API. Runtime: Orion routes traffic to channels, executes workflows, calls connectors, and handles observability automatically.

Start here

  • CLI Setup — install Orion and ship your first service in a couple of minutes.
  • MCP Server Setup — give an AI assistant full Orion context so it generates valid workflows.
  • Use Cases & Patterns — complete, tested examples for classification, transformation, routing, and CI/CD.

Build workflows

  • Workflow Reference — the workflow & task JSON schema, conditions, error handling, lifecycle, and rollout.
  • Function Reference — every built-in task function and its exact input schema.
  • Admin API & Data API — the full REST surface for managing and calling services.
  • Configuration — config file, environment variables, database backends, and deployment.

How it works

Architecture Overview

Three Primitives

Services in Orion are composed from three building blocks:

| Primitive | Role | Examples |
| --- | --- | --- |
| Channel | Service endpoint: sync (REST, HTTP) or async (Kafka) | POST /orders, GET /users/{id}, Kafka topic order.placed |
| Workflow | Pipeline of tasks that defines what the service does | Parse → validate → enrich → transform → respond |
| Connector | Named connection to an external system with auth and retries | Stripe API, PostgreSQL, Redis, Kafka cluster |

Channels receive traffic. Workflows process it. Connectors reach out to external systems. Everything else (rate limiting, metrics, circuit breakers, versioning) is handled by the platform.

Deployment Topology

Before Orion

Every piece of business logic is its own service to build, deploy, and operate, each with its own infrastructure stack:

4 services x (code + Dockerfile + CI pipeline + health checks + metrics agent + log agent + sidecar proxy + scaling policy + secret config + canary rollout) = dozens of components to build, wire, and keep running.

After Orion

One Orion instance replaces all four:

No API gateway needed. Governance is built in. One binary to deploy.

The best of both worlds: each channel and workflow is independently versioned, testable, and deployable. The modularity of microservices with the operational simplicity of a monolith.

Deploy Anywhere

Single binary. SQLite by default, no database to provision, no runtime dependencies. Need more scale? Swap to PostgreSQL or MySQL by changing storage.url. No rebuild needed.

Same channel definitions work in any topology: run everything in one instance, split channels across instances with include/exclude filters, or deploy as sidecars.

Request Processing Flow

  1. Route Resolution: REST pattern matching finds the channel, or falls back to name lookup
  2. Channel Registry: enforces deduplication, rate limits, input validation, backpressure, and checks the response cache
  3. Engine: the workflow engine sits behind a double-Arc (Arc<RwLock<Arc<Engine>>>) allowing zero-downtime swaps
  4. Workflow Matcher: evaluates JSONLogic conditions and rollout percentages to pick the right workflow
  5. Task Pipeline: executes functions in order (parse, map, filter, http_call, db_read, etc.)

Sync and Async

Sync     POST /api/v1/data/{channel}         → immediate response
Async    POST /api/v1/data/{channel}/async   → returns trace_id, poll later

REST     GET /api/v1/data/orders/{id}        → matched by route pattern
Kafka    topic: order.placed                 → consumed automatically

Sync channels respond immediately. Async channels return a trace ID; poll GET /api/v1/data/traces/{id} for results. Kafka channels consume from topics configured in the DB or config file.

Bridging is a pattern, not a feature. A sync workflow can call publish_kafka and return 202; an async channel picks the message up from there.

Service Composition

Most platforms require HTTP calls between services, adding latency, failure modes, and serialization overhead. Orion’s channel_call invokes another channel’s workflow in-process with zero network round-trip:

POST /orders (order-processing workflow)
  ├── parse_json         → extract order data
  ├── channel_call       → "inventory-check" channel (in-process)
  ├── channel_call       → "customer-lookup" channel (in-process)
  ├── map                → compute pricing with enriched data
  └── publish_json       → return combined result

Each composed channel has its own workflow, versioning, and governance, but calls between them are function calls, not network hops. Cycle detection prevents infinite recursion.
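The cycle check mentioned above can be pictured as a depth-first search over the graph of channel_call edges. The sketch below is illustrative only and is not Orion's implementation: the function name `find_call_cycle` and the input shape (a mapping from each channel to the channels its workflow calls) are assumptions made for this example.

```python
from typing import Dict, List, Optional


def find_call_cycle(calls: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one cycle of channel names if the call graph has one, else None.

    `calls` maps a channel name to the channels its workflow invokes
    via channel_call (hypothetical input shape).
    """
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {name: WHITE for name in calls}
    path: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        color[node] = GREY
        path.append(node)
        for callee in calls.get(node, []):
            state = color.get(callee, WHITE)
            if state == GREY:
                # callee is already on the current path, so this edge closes a cycle
                return path[path.index(callee):] + [callee]
            if state == WHITE:
                found = visit(callee)
                if found:
                    return found
        path.pop()
        color[node] = BLACK
        return None

    for name in list(calls):
        if color[name] == WHITE:
            found = visit(name)
            if found:
                return found
    return None
```

Running a check like this when a workflow is saved, rather than at request time, would reject a cyclic composition before it can receive traffic.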

Built-in Task Functions

Two sources contribute task functions, all compiled into every binary:

From dataflow-rs 3.0 (workflow primitives):

| Function | Description |
| --- | --- |
| parse_json | Parse payload into the data context |
| parse_xml | Parse XML payloads into structured JSON |
| filter | Allow or halt processing based on JSONLogic conditions |
| map | Transform and reshape JSON using JSONLogic expressions |
| validation | Enforce required fields and constraints |
| publish_json / publish_xml | Serialize data to JSON or XML output |
| log | Emit structured log entries |

From Orion (connector- and channel-backed handlers):

| Function | Description |
| --- | --- |
| http_call | Invoke downstream APIs via an HTTP connector |
| channel_call | Invoke another channel’s workflow in-process |
| db_read / db_write | Execute SQL queries against a SQL connector, return rows / affected count |
| cache_read / cache_write | Read/write to an in-memory or Redis cache connector |
| mongo_read | Query MongoDB collections via a MongoDB connector |
| publish_kafka | Publish messages via a Kafka connector |

The Orion handlers expose machine-readable input schemas at GET /api/v1/admin/functions. Workflow create and update calls validate function.input against those schemas and report field-pathed FieldErrors before the workflow can be activated.

Architectural Characteristics

Orion provides production-grade capabilities across eight architectural dimensions. Each subcategory below links to its detailed documentation.

C Creational · S Structural · B Behavioral

Observability — S

| Area | Capabilities |
| --- | --- |
| Structured Logging | JSON & pretty-print formats · Configurable log levels · Per-request context · Per-crate filtering |
| Prometheus Metrics | Request counters & error rates · Latency histograms · Circuit breaker metrics · Rate limit rejections |
| Distributed Tracing | W3C Trace Context · OpenTelemetry OTLP export · Configurable sampling rate · Per-task span tracking |
| Health Monitoring | Component-level health checks · Automatic degradation · Request ID propagation · Kubernetes liveness & readiness probes |

Resilience — S

| Area | Capabilities |
| --- | --- |
| Circuit Breakers | Lock-free state machine · Per-connector isolation · Auto-recovery after cooldown · Admin API to inspect & reset |
| Retry & Backoff | Exponential backoff (capped 60 s) · Configurable max retries · Retryable error detection |
| Timeouts | Per-channel enforcement · Workflow execution limits · Per-connector query timeout |
| Fault Tolerance | Graceful shutdown (SIGTERM/SIGINT) · Connection draining · Dead letter queue with retry · Panic recovery middleware |

Security — B

| Area | Capabilities |
| --- | --- |
| Secret Management | Auto-masked API responses · Credential isolation via connectors |
| Input Validation | Per-channel JSONLogic rules · Payload size limits · Header & query param access |
| Network Security | SSRF protection (private IP blocking) · TLS/HTTPS support · Security headers (CSP, X-Frame-Options) |
| Access Control | Admin API authentication · Per-channel CORS enforcement · Origin allowlist |
| Data Safety | Parameterized SQL queries · Injection protection · URL validation |

Scalability — C

| Area | Capabilities |
| --- | --- |
| Rate Limiting | Token bucket algorithm · Per-client keying via JSONLogic · Platform & per-channel limits |
| Backpressure | Semaphore concurrency limits · 503 load shedding · Per-channel configuration |
| Async Processing | Multi-worker trace queue · Bounded buffer channels · DLQ retry processor |
| Horizontal Scaling | Stateless instances · Channel include/exclude filters · Multi-database backends |

Deployability — C

| Area | Capabilities |
| --- | --- |
| Packaging | Single binary · SQLite, PostgreSQL, MySQL · Minimal footprint |
| Containerization | Multi-stage Docker build · Non-root execution · Built-in health probes |
| Configuration | TOML + env var overrides · Sensible defaults · Runtime configuration |
| Distribution | Homebrew tap · Shell & PowerShell installers · Multi-platform binaries |

Extensibility — S

| Area | Capabilities |
| --- | --- |
| Connectors | HTTP & Webhooks · Kafka pub/sub · Database (SQL) · Cache (Memory & Redis) · Storage (S3/GCS) · MongoDB (NoSQL) |
| Custom Functions | Async function handlers · Built-in function library · JSONLogic expressions |
| Channel Protocols | REST with route matching (sync) · Simple HTTP (sync) · Kafka (async) |

Availability — C

| Area | Capabilities |
| --- | --- |
| Hot-Reload | Zero-downtime engine swap · Channel registry rebuild · Kafka consumer restart |
| Canary Rollouts | Percentage-based traffic split · Gradual migration · Instant rollback |
| Versioning | Draft / Active / Archived lifecycle · Multi-version history · Workflow import & export |
| Performance | Response caching · Request deduplication · Connection pool caching |

Maintainability — B

| Area | Capabilities |
| --- | --- |
| Admin APIs | Full CRUD for all entities · Version management · Engine control · OpenAPI / Swagger UI |
| CI/CD Integration | Bulk import & export · Pre-deploy validation · GitOps-friendly |
| Testing | Dry-run execution · Workflow validation · Step-by-step traces |
| Operations | Audit logging · Database backup & restore · Config validation CLI |

Observability

Orion provides structured logging, Prometheus metrics, distributed tracing, and health monitoring out of the box. No sidecars, no agents. Everything runs inside the single binary.

Structured Logging

Orion emits structured logs in JSON or pretty-printed format, configurable at runtime:

[logging]
level = "info"        # trace, debug, info, warn, error
format = "pretty"     # pretty or json

JSON format is recommended for production. It integrates directly with log aggregators like Loki, Datadog, or CloudWatch:

ORION_LOGGING__FORMAT=json
ORION_LOGGING__LEVEL=info

Per-crate filtering with RUST_LOG gives fine-grained control:

RUST_LOG=orion=debug,tower_http=warn,sqlx=warn

| Level | Usage |
| --- | --- |
| error | Failures that need attention |
| warn | Degraded behavior (circuit breakers, retries) |
| info | Request lifecycle, engine reloads, startup/shutdown |
| debug | Detailed processing, SQL queries, connector calls |
| trace | Fine-grained internal state |

Every request carries a UUID x-request-id header. Pass your own or let Orion generate one. The ID propagates through logs and responses for end-to-end correlation.

Prometheus Metrics

Enable metrics and scrape at GET /metrics (Prometheus text format):

[metrics]
enabled = true

| Metric | Type | Labels | Description |
| --- | --- | --- | --- |
| messages_total | Counter | channel, status | Total messages processed |
| message_duration_seconds | Histogram | channel | Processing latency |
| active_workflows | Gauge | | Workflows loaded in engine |
| errors_total | Counter | type | Errors encountered |
| http_requests_total | Counter | method, path, status | HTTP requests served |
| http_request_duration_seconds | Histogram | method, path, status | HTTP request latency |
| db_query_duration_seconds | Histogram | operation | Database query latency |
| engine_reloads_total | Counter | status | Engine reload events |
| engine_reload_duration_seconds | Histogram | | Engine reload latency |
| circuit_breaker_trips_total | Counter | connector, channel | Circuit breaker trip events |
| circuit_breaker_rejections_total | Counter | connector, channel | Requests rejected by open breakers |
| channel_executions_total | Counter | channel | Channel invocations |
| rate_limit_rejections_total | Counter | client | Rate-limited requests |

Distributed Tracing

Enable OpenTelemetry trace export with OTLP gRPC:

[tracing]
enabled = true
otlp_endpoint = "http://localhost:4317"
service_name = "orion"
sample_rate = 1.0    # 0.0 (none) to 1.0 (all)

  • W3C Trace Context extraction and propagation: incoming traceparent headers are respected
  • Per-request spans with channel, workflow, and task attributes
  • OTLP gRPC export to Jaeger, Tempo, or any compatible collector
  • Configurable sampling rate for production use
  • Trace context injected into outbound http_call requests for full distributed traces
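To make the propagation bullets concrete, here is a small sketch of reading a W3C traceparent header and deriving the header for an outbound call. It follows the version 00 layout from the W3C Trace Context specification (32 hex trace ID, 16 hex parent ID, 2 hex flags). The function names are made up for this example, and treating a request without a header as sampled is an assumption; Orion's own sampling decision is governed by sample_rate.

```python
import re
import secrets
from typing import Optional, Tuple

# version "00": 32 hex trace-id, 16 hex parent-id, 2 hex flags (lowercase hex)
_TRACEPARENT = re.compile(r"00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})")


def parse_traceparent(value: str) -> Optional[Tuple[str, str, bool]]:
    """Return (trace_id, parent_id, sampled), or None if the header is invalid."""
    match = _TRACEPARENT.fullmatch(value.strip())
    if match is None:
        return None
    trace_id, parent_id, flags = match.groups()
    if trace_id == "0" * 32 or parent_id == "0" * 16:
        return None  # all-zero IDs are invalid per the specification
    sampled = bool(int(flags, 16) & 0x01)
    return trace_id, parent_id, sampled


def child_traceparent(incoming: Optional[str]) -> str:
    """Keep the incoming trace ID and mint a new span ID for the outbound call."""
    parsed = parse_traceparent(incoming) if incoming else None
    if parsed:
        trace_id, _, sampled = parsed
    else:
        # No usable header: start a new trace (sampled=True is an assumption)
        trace_id, sampled = secrets.token_hex(16), True
    span_id = secrets.token_hex(8)
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"
```

Because the trace ID is carried over unchanged, spans emitted by the downstream service join the same trace in the collector.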

Health Monitoring

Orion exposes three health endpoints for different operational needs.

Component health: GET /health returns component-level status with automatic degradation detection:

{
  "status": "ok",
  "version": "0.2.0",
  "uptime_seconds": 3600,
  "workflows_loaded": 42,
  "components": {
    "database": "ok",
    "engine": "ok"
  }
}

The health check tests the database with SELECT 1 and verifies engine availability with a configurable lock timeout. If either check fails, the endpoint returns 503 Service Unavailable with "status": "degraded".

Kubernetes probes:

| Endpoint | Purpose | Behavior |
| --- | --- | --- |
| GET /healthz | Liveness probe | Always returns 200. If the process is running, it’s alive |
| GET /readyz | Readiness probe | Returns 200 only when DB is reachable, engine is loaded, and startup is complete; 503 otherwise |

livenessProbe:
  httpGet:
    path: /healthz
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /readyz
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5

Engine status: GET /api/v1/admin/engine/status returns a detailed breakdown:

{
  "version": "0.2.0",
  "uptime_seconds": 3600,
  "workflows_count": 42,
  "active_workflows": 38,
  "channels": ["orders", "events", "alerts"]
}

Resilience

Orion protects your services from cascading failures, transient errors, and overload with circuit breakers, automatic retries, timeouts, and graceful degradation, all built into the runtime.

Circuit Breakers

Every connector gets automatic circuit breaker protection. When failures exceed a threshold, the breaker opens and short-circuits requests to prevent cascading failures.

| State | Behavior |
| --- | --- |
| Closed | Normal operation; requests flow through |
| Open | Requests rejected immediately (503) after failure threshold exceeded |
| Half-Open | After cooldown, one probe request allowed to test recovery |

The circuit breaker uses a lock-free state machine with per-connector isolation. Configure globally:

[engine.circuit_breaker]
enabled = true
failure_threshold = 5          # Failures before tripping the breaker
recovery_timeout_secs = 30     # Cooldown before half-open probe
max_breakers = 10000           # Max breaker instances (LRU eviction)
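The three states and the two settings above can be sketched as a small state machine. This is a simplified, single-threaded illustration, not Orion's lock-free implementation. The class and method names are hypothetical, and counting consecutive failures (reset on any success) is an assumption about how failure_threshold is applied.

```python
import time
from typing import Callable


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout_secs: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout_secs = recovery_timeout_secs
        self._clock = clock
        self.state = "closed"
        self._failures = 0
        self._opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if self._clock() - self._opened_at >= self.recovery_timeout_secs:
                self.state = "half_open"  # cooldown elapsed: let one probe through
                return True
            return False  # still cooling down: reject immediately
        return False  # half_open: a probe is already in flight

    def record_success(self) -> None:
        self.state = "closed"
        self._failures = 0

    def record_failure(self) -> None:
        if self.state == "half_open":
            self._trip()  # the probe failed: reopen and restart the cooldown
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self) -> None:
        self.state = "open"
        self._failures = 0
        self._opened_at = self._clock()
```

The injectable clock is only there so the transitions can be exercised in a test without sleeping through the cooldown.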

Inspect and reset breakers via the admin API:

# List all circuit breaker states
curl -s http://localhost:8080/api/v1/admin/connectors/circuit-breakers

# Reset a specific breaker
curl -s -X POST http://localhost:8080/api/v1/admin/connectors/circuit-breakers/{key}

Retry & Backoff

All HTTP connectors support automatic retries with exponential backoff, capped at 60 seconds:

{
  "retry": {
    "max_retries": 5,
    "retry_delay_ms": 500
  }
}

Delay doubles on each retry: 500ms → 1s → 2s → 4s → … → capped at 60s.

Retries are configured per-connector, so each external service can have its own retry policy. The mechanism automatically detects retryable errors (network failures, 5xx responses) and skips non-retryable ones (4xx client errors).

All connector types (HTTP, DB, Cache, MongoDB, Storage) support the same retry configuration.
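The doubling schedule and the retryable/non-retryable split described above work out as follows. This is a sketch of the arithmetic only: the function names are illustrative, and representing a network failure as a missing status code is a convention chosen for this example.

```python
from typing import List, Optional

MAX_DELAY_MS = 60_000  # the 60-second cap stated above


def backoff_delays(max_retries: int, retry_delay_ms: int) -> List[int]:
    """Delay in milliseconds before each retry: the base delay doubled per attempt, capped."""
    return [min(retry_delay_ms * 2 ** attempt, MAX_DELAY_MS)
            for attempt in range(max_retries)]


def is_retryable(status: Optional[int]) -> bool:
    """None stands for a network failure (no response); 5xx is retryable, 4xx is not."""
    if status is None:
        return True
    return 500 <= status <= 599
```

With the configuration shown earlier (max_retries 5, retry_delay_ms 500), `backoff_delays(5, 500)` gives 500, 1000, 2000, 4000, and 8000 ms, so the cap only comes into play from the eighth retry onward.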

Timeouts

Timeouts are enforced at multiple levels to prevent runaway requests:

Per-channel timeout: set in the channel’s config_json to limit workflow execution time:

{
  "timeout_ms": 5000
}

If the workflow exceeds this limit, the request returns 504 Gateway Timeout.

Per-connector query timeout: for database connectors:

{
  "query_timeout_ms": 30000,
  "connect_timeout_ms": 5000
}

Global HTTP timeout: for the shared HTTP client used by http_call:

[engine]
global_http_timeout_secs = 30

Engine lock timeouts: prevent health checks and reloads from blocking indefinitely:

[engine]
health_check_timeout_secs = 2
reload_timeout_secs = 10

Fault Tolerance

Graceful shutdown: Orion handles SIGTERM and SIGINT with a controlled shutdown sequence:

  1. HTTP server stops accepting new connections
  2. In-flight requests drain (configurable via shutdown_drain_secs, default 30s)
  3. Kafka consumer (if enabled) is signaled to stop
  4. Trace cleanup task is stopped
  5. DLQ retry consumer is stopped
  6. Async trace queue drains with timeout
  7. OpenTelemetry spans are flushed (if enabled)
  8. Process exits

Dead letter queue: failed async traces are stored in the trace_dlq database table with automatic retry:

[queue]
dlq_retry_enabled = true
dlq_max_retries = 5
dlq_poll_interval_secs = 30

For Kafka, failed messages can also be routed to a configurable DLQ topic:

[kafka.dlq]
enabled = true
topic = "orion-dlq"

Fault-tolerant pipelines: set continue_on_error: true on a workflow to keep the task pipeline running even if individual tasks fail. Errors are collected in the response rather than halting execution:

{
  "status": "ok",
  "data": { "req": { "action": "test-call" } },
  "errors": [
    { "code": "TASK_ERROR", "task_id": "call", "message": "HTTP request failed..." }
  ]
}

Panic recovery: the outermost middleware layer (CatchPanicLayer) catches panics in any handler, returning a 500 response instead of crashing the process.

Security

Orion enforces security at every layer: secrets are isolated in connectors, inputs are validated before processing, network requests are checked for SSRF, and admin endpoints are protected by authentication.

Secret Management

Sensitive fields are automatically masked in all API responses. Fields named token, password, key, secret, api_key, and connection_string are returned as "******". Secrets are stored but never exposed through the API.

# Create a connector with real credentials
curl -s -X POST http://localhost:8080/api/v1/admin/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "payments-api",
    "connector_type": "http",
    "config": {
      "type": "http",
      "url": "https://api.stripe.com/v1",
      "auth": { "type": "bearer", "token": "sk-live-secret-token" }
    }
  }'

# Read it back (secrets are masked)
curl -s http://localhost:8080/api/v1/admin/connectors/<id>
# auth.token → "******"

Workflows reference connectors by name ("connector": "payments-api"). They never see or embed actual credentials. This means AI-generated workflows can be safely created and shared without risk of credential exposure.
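As an illustration of the masking rule, the sketch below walks a connector document and replaces the listed field names with the mask. Matching on exact field names at any nesting depth is an assumption made for this example; the function name is hypothetical and this is not Orion's code.

```python
from typing import Any

SENSITIVE_FIELDS = {"token", "password", "key", "secret", "api_key", "connection_string"}
MASK = "******"


def mask_secrets(value: Any) -> Any:
    """Return a copy of `value` with sensitive fields replaced by the mask."""
    if isinstance(value, dict):
        return {
            name: MASK if name in SENSITIVE_FIELDS else mask_secrets(item)
            for name, item in value.items()
        }
    if isinstance(value, list):
        return [mask_secrets(item) for item in value]
    return value  # scalars pass through unchanged
```

The function builds a new structure instead of mutating its input, which mirrors the behavior described above: the stored secret stays intact and only the API response is masked.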

Input Validation

Each channel can define JSONLogic validation rules evaluated against incoming requests before workflow execution:

{
  "validation_logic": {
    "and": [
      { "!!": [{ "var": "data.order_id" }] },
      { ">": [{ "var": "data.amount" }, 0] }
    ]
  }
}

If validation fails, the request is rejected with 400 Bad Request before any workflow logic runs.

Validation rules have access to:

  • data.*: request body fields
  • headers.*: HTTP headers
  • query.*: query string parameters
  • path.*: path parameters (for REST channels)
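To show how a rule like the one above evaluates against a request, here is a deliberately tiny evaluator that handles only the four operators that rule uses (var, !!, >, and). It is a sketch and not a JSONLogic implementation: real JSONLogic has many more operators and slightly different truthiness and short-circuit rules, and returning false for a comparison against a missing value is an assumption.

```python
from typing import Any


def get_var(context: dict, path: str) -> Any:
    """Resolve a dotted path such as "data.order_id"; missing keys yield None."""
    current: Any = context
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def evaluate(rule: Any, context: dict) -> Any:
    """Evaluate a rule built from var, !!, > and and; anything else is rejected."""
    if not isinstance(rule, dict):
        return rule  # literal value such as 0
    (op, args), = rule.items()  # a rule object has exactly one operator key
    if op == "var":
        return get_var(context, args)
    values = [evaluate(arg, context) for arg in args]
    if op == "!!":
        return bool(values[0])
    if op == ">":
        left, right = values
        return left is not None and right is not None and left > right
    if op == "and":
        return all(bool(value) for value in values)
    raise ValueError(f"unsupported operator: {op}")
```

Given the context `{"data": {"order_id": "ORD-123", "amount": 25}}` the example rule passes. Dropping order_id or setting amount to 0 makes it fail, which is the case where the channel would answer 400 Bad Request.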

Payload size limits are enforced globally to prevent oversized requests:

[ingest]
max_payload_size = 1048576   # 1 MB

Network Security

SSRF protection: HTTP connectors validate URLs to prevent Server-Side Request Forgery. By default, requests to private/internal IP addresses (RFC 1918, loopback, link-local) are blocked:

{
  "name": "external-api",
  "connector_type": "http",
  "config": {
    "type": "http",
    "url": "https://api.example.com",
    "allow_private_urls": false
  }
}

Set allow_private_urls: true only when calling internal services.
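The address classes named above (RFC 1918, loopback, link-local) can be checked with Python's standard ipaddress module. The sketch below only inspects URLs whose host is a literal IP address. A real SSRF guard must also resolve DNS names and test every resolved address, which is left out here. The function name is hypothetical and this is not Orion's validation code.

```python
import ipaddress
from urllib.parse import urlsplit


def is_blocked_url(url: str, allow_private_urls: bool = False) -> bool:
    """Return True when the URL targets a private, loopback, or link-local IP literal."""
    if allow_private_urls:
        return False
    host = urlsplit(url).hostname
    if host is None:
        return True  # no host to validate: reject
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        # A DNS name, not an IP literal. This sketch lets it through;
        # a production check would resolve it and test each address.
        return False
    return addr.is_private or addr.is_loopback or addr.is_link_local
```

The link-local range matters in cloud environments because it contains the instance metadata address 169.254.169.254.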

TLS/HTTPS: enable TLS termination in the server:

[server.tls]
enabled = true
cert_path = "cert.pem"
key_path = "key.pem"

Security headers: set on all responses:

| Header | Value |
| --- | --- |
| X-Content-Type-Options | nosniff |
| X-Frame-Options | DENY |
| Content-Security-Policy | default-src 'none'; frame-ancestors 'none' |
| Referrer-Policy | strict-origin-when-cross-origin |
| Permissions-Policy | camera=(), microphone=(), geolocation=() |
| Strict-Transport-Security | Set when TLS is enabled |

Access Control

Admin API authentication: protect admin endpoints with a bearer token or an API key:

[admin_auth]
enabled = true
api_keys = ["your-secret-key"]   # Any number of accepted keys; any match authorises a request
# header = "Authorization"       # Bearer format (default)
# header = "X-API-Key"           # Raw key format

When header is "Authorization", the key is expected as Bearer <key>. For any other header name, the raw key value is matched directly.

# Bearer token
curl -H "Authorization: Bearer your-secret-key" \
  http://localhost:8080/api/v1/admin/workflows

# API key via custom header
curl -H "X-API-Key: your-secret-key" \
  http://localhost:8080/api/v1/admin/workflows
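The matching rule described above (Bearer format for Authorization, raw value for any other header) can be sketched like this. The function is illustrative rather than Orion's code, and the constant-time comparison via hmac.compare_digest is a choice made for this example, not something the documentation states.

```python
import hmac
from typing import Iterable, Mapping


def is_authorized(headers: Mapping[str, str], api_keys: Iterable[str],
                  header: str = "Authorization") -> bool:
    """Check the configured header against the accepted keys; any match authorizes."""
    # HTTP header names are case-insensitive
    lowered = {name.lower(): value for name, value in headers.items()}
    presented = lowered.get(header.lower())
    if presented is None:
        return False
    if header.lower() == "authorization":
        prefix = "Bearer "
        if not presented.startswith(prefix):
            return False
        presented = presented[len(prefix):]
    # compare_digest avoids leaking how many leading characters matched
    return any(hmac.compare_digest(presented.encode(), key.encode())
               for key in api_keys)
```

Accepting several keys at once allows key rotation: add the new key, move clients over, then remove the old one.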

Per-channel CORS: configure allowed origins per channel in config_json:

{
  "cors": {
    "allowed_origins": ["https://app.example.com", "https://admin.example.com"]
  }
}

Global CORS defaults are configured in the server config:

[cors]
allowed_origins = ["*"]    # Global default

Data Safety

Parameterized SQL queries: the db_read and db_write functions use parameterized queries to prevent SQL injection:

{
  "function": {
    "name": "db_read",
    "input": {
      "connector": "orders-db",
      "query": "SELECT * FROM orders WHERE customer_id = $1",
      "params": [{ "var": "data.customer_id" }],
      "output": "data.orders"
    }
  }
}

Values are always passed as parameters, never interpolated into SQL strings.
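The difference between binding a value and interpolating it is easy to see with Python's built-in sqlite3 module. This is a standalone illustration and does not go through Orion: the table contents are made up, and sqlite3 uses ? placeholders where the example above uses $1.

```python
import sqlite3

# In-memory database with two sample rows (illustrative data only)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id TEXT)")
conn.executemany("INSERT INTO orders (customer_id) VALUES (?)", [("c-1",), ("c-2",)])


def orders_for(customer_id: str) -> list:
    # The value is bound as a parameter; it is never spliced into the SQL text.
    return conn.execute(
        "SELECT id, customer_id FROM orders WHERE customer_id = ?",
        (customer_id,),
    ).fetchall()
```

A classic injection string such as `c-1' OR '1'='1` is treated as a literal customer ID here and matches no rows, whereas string interpolation would have turned it into a condition that matches every row.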

URL validation: connector URLs are validated at creation time. Combined with SSRF protection, this prevents workflows from making requests to unexpected destinations.

Injection protection: JSONLogic expressions are evaluated in a sandboxed environment. User-supplied data cannot escape the data context or execute arbitrary code.

Scalability

Orion handles high-throughput workloads with token-bucket rate limiting, semaphore-based backpressure, async processing queues, and stateless horizontal scaling, all configurable per channel.

Rate Limiting

Rate limiting operates at two levels: platform-wide (all requests) and per-channel (individual service endpoints).

Platform-level: enable in config:

[rate_limit]
enabled = true
default_rps = 100
default_burst = 50

[rate_limit.endpoints]
admin_rps = 50
data_rps = 200

Per-channel: configure in the channel’s config_json:

{
  "rate_limit": {
    "requests_per_second": 100,
    "burst": 50
  }
}

Rate limiting uses the token bucket algorithm: tokens replenish at the configured rate, and burst allows short spikes above the steady-state limit. When the bucket is empty, requests receive 429 Too Many Requests.
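A minimal token bucket looks like the sketch below. It assumes that burst is the bucket capacity and that the bucket starts full. Orion's exact semantics for the two settings may differ, and the class name is illustrative.

```python
import time
from typing import Callable


class TokenBucket:
    def __init__(self, requests_per_second: float, burst: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = requests_per_second
        self.capacity = float(burst)  # assumption: burst is the bucket capacity
        self.tokens = float(burst)    # assumption: the bucket starts full
        self._clock = clock
        self._last = clock()

    def try_acquire(self) -> bool:
        now = self._clock()
        # Refill in proportion to elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + (now - self._last) * self.rate)
        self._last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # empty bucket: the caller would answer 429 Too Many Requests
```

Refilling lazily on each call means no background timer is needed; the bucket only does work when a request arrives.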

Per-client keying: use JSONLogic to compute rate limit keys from request data, enabling per-user or per-tenant limits:

{
  "rate_limit": {
    "requests_per_second": 10,
    "burst": 5,
    "key_logic": { "var": "headers.x-api-key" }
  }
}

Rate limiter state is per-instance (in-memory). In multi-instance deployments, divide the configured RPS by the number of instances to approximate global limits, or use sticky sessions at the load balancer.

Backpressure

Semaphore-based concurrency limits prevent any single channel from overwhelming the system:

{
  "backpressure": {
    "max_concurrent": 200
  }
}

When all semaphore permits are taken, additional requests receive 503 Service Unavailable immediately. This is load shedding: the system rejects excess load rather than queuing it unboundedly, which protects latency for the requests that are admitted.

Each channel has its own independent backpressure semaphore, so a spike in one channel doesn’t affect others.
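The shed-instead-of-queue behavior comes down to a non-blocking semaphore acquire. The sketch below uses Python threads for illustration; Orion itself is async Rust, and the class name and return shape here are hypothetical.

```python
import threading
from typing import Callable, Tuple


class ChannelGate:
    """One gate per channel, holding max_concurrent permits."""

    def __init__(self, max_concurrent: int) -> None:
        self._permits = threading.BoundedSemaphore(max_concurrent)

    def handle(self, work: Callable[[], str]) -> Tuple[int, str]:
        # Non-blocking acquire: if no permit is free, shed the request at once
        if not self._permits.acquire(blocking=False):
            return 503, "Service Unavailable"
        try:
            return 200, work()
        finally:
            self._permits.release()  # always return the permit, even on error
```

Because each channel would own its gate, exhausting one channel's permits leaves the others untouched.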

Async Processing

For workloads that don’t need immediate responses, Orion supports async processing via a bounded trace queue:

# Submit for async processing (returns immediately with a trace ID)
curl -s -X POST http://localhost:8080/api/v1/data/orders/async \
  -H "Content-Type: application/json" \
  -d '{ "data": { "order_id": "ORD-123" } }'

# Poll for the result
curl -s http://localhost:8080/api/v1/data/traces/{trace-id}
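The same submit-then-poll flow can be scripted. The sketch below uses only the Python standard library and the two endpoints shown above. The response field name trace_id and the shape of the trace document are assumptions, so the completion check is passed in by the caller instead of being hard-coded.

```python
import json
import time
import urllib.request
from typing import Any, Callable, Dict

BASE_URL = "http://localhost:8080"


def http_json(method: str, url: str, body: Any = None) -> Dict[str, Any]:
    """Send a JSON request and decode the JSON response."""
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(url, data=data, method=method,
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def submit_and_wait(channel: str, payload: Dict[str, Any],
                    is_finished: Callable[[Dict[str, Any]], bool],
                    request: Callable[..., Dict[str, Any]] = http_json,
                    attempts: int = 10,
                    interval_secs: float = 1.0) -> Dict[str, Any]:
    """Submit to the async endpoint, then poll the trace until is_finished says so."""
    accepted = request("POST", f"{BASE_URL}/api/v1/data/{channel}/async", payload)
    trace_id = accepted["trace_id"]  # assumed response field name
    for _ in range(attempts):
        trace = request("GET", f"{BASE_URL}/api/v1/data/traces/{trace_id}")
        if is_finished(trace):
            return trace
        time.sleep(interval_secs)
    raise TimeoutError(f"trace {trace_id} not finished after {attempts} polls")
```

The `request` parameter exists so the flow can be exercised against a stub without a running Orion instance.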

The queue is backed by tokio::sync::mpsc channels with configurable concurrency:

[queue]
workers = 4                       # Concurrent trace workers
buffer_size = 1000                # Channel buffer for pending traces
processing_timeout_ms = 60000     # Per-trace processing timeout
max_result_size_bytes = 1048576   # Max size of trace result (1 MB)
max_queue_memory_bytes = 104857600  # Max memory for queued traces (100 MB)

Failed traces go to the dead letter queue with automatic retry:

[queue]
dlq_retry_enabled = true
dlq_max_retries = 5
dlq_poll_interval_secs = 30

Completed traces are cleaned up automatically based on retention policy:

[queue]
trace_retention_hours = 72
trace_cleanup_interval_secs = 3600

Horizontal Scaling

Orion is designed for single-instance simplicity with multi-instance capability. Each instance is stateless; all persistent data lives in the shared database.

What works across instances:

| Component | How It Works |
| --- | --- |
| Database | All instances share the same database (PostgreSQL or MySQL recommended) |
| Kafka consumers | Consumer groups handle partition assignment automatically |
| Traces | Stored in the shared database; queries return consistent results |
| Workflows & Channels | Definitions live in the database; all instances load the same set |
| Audit logs | Stored in the shared database regardless of which instance handles the request |

Per-Instance State

The following components use in-memory state that is local to each instance:

| Component | Impact | Workaround |
| --- | --- | --- |
| Rate Limiting | 3 instances at 100 RPS = 300 RPS effective global limit | Sticky sessions; divide configured RPS by instance count |
| Request Deduplication | Same idempotency key on two instances → processed twice | Sticky sessions, or Redis-backed dedup store |
| Response Caching | Lower cache hit rates (each instance has a cold cache) | Sticky sessions, or Redis-backed cache connector |
| Circuit Breakers | One instance may trip while others keep sending | Acceptable; monitor /health on each instance |
| Engine State | POST /admin/engine/reload only reloads the receiving instance | Script reload to hit all instances (see below) |

Reload all instances:

for host in $INSTANCE_HOSTS; do
  curl -X POST "http://$host:8080/api/v1/admin/engine/reload" \
    -H "Authorization: Bearer $API_KEY"
done

Alternatively, use a rolling restart strategy with your orchestrator (e.g., Kubernetes rolling deployment).

Topology Control

Use channel include/exclude filters to run different Orion instances for different channel groups:

# Instance A: order processing
[channels]
include = ["orders.*", "payments.*"]

# Instance B: analytics and reporting
[channels]
include = ["analytics.*", "reports.*"]

This enables microservice-style deployment where each instance handles a subset of channels, all sharing the same database.

Database Backend Recommendations

| Backend | Single Instance | Multiple Instances | Notes |
| --- | --- | --- | --- |
| SQLite | Recommended | Not recommended | WAL mode supports concurrent reads but only one writer. File-based, cannot be shared across hosts. |
| PostgreSQL | Supported | Recommended | Full multi-connection support. Use connection pooling (PgBouncer) for many instances. |
| MySQL | Supported | Supported | Ensure READ-COMMITTED isolation for best concurrency. |

For multi-instance deployments, use PostgreSQL with connection pooling (PgBouncer). Script engine reloads to broadcast to all instances after workflow or channel changes.

Deployability

Orion ships as a single binary with embedded migrations, sensible defaults, and multiple installation methods. No runtime dependencies. You’re up and running immediately.

Packaging

Orion compiles into a single binary (orion-server) with everything built in:

  • All three database backends (SQLite, PostgreSQL, MySQL) with embedded migrations
  • Kafka producer and consumer
  • OpenTelemetry trace export
  • TLS termination
  • Swagger UI at /docs

No feature flags, no plugins, no shared libraries to manage. The binary auto-detects the database backend from the URL scheme at startup.

| Capability | Configuration | Default |
| --- | --- | --- |
| Database backend | storage.url scheme | SQLite |
| Kafka | kafka.enabled | Disabled |
| OpenTelemetry | tracing.enabled | Disabled |
| TLS/HTTPS | server.tls.enabled | Disabled |
| Swagger UI | Always at /docs | Enabled |
| Metrics | metrics.enabled | Disabled |

With the default SQLite backend, there are zero external dependencies.

Containerization

Orion uses a multi-stage Docker build for minimal image size:

docker build -t orion .

The Dockerfile uses rust:1.93-slim for building and debian:trixie-slim for the runtime image. Key characteristics:

  • Non-root execution: the container runs as a non-root user
  • Built-in health probes: /healthz (liveness) and /readyz (readiness) are available without additional configuration

Docker with SQLite persistence: SQLite stores data in a local file. Without a persistent volume, data is lost when the container restarts:

docker run -p 8080:8080 \
  -v orion-data:/app/data \
  -e ORION_STORAGE__URL=sqlite:/app/data/orion.db \
  orion

With Docker Compose:

services:
  orion:
    image: orion
    ports:
      - "8080:8080"
    environment:
      ORION_STORAGE__URL: sqlite:/app/data/orion.db
    volumes:
      - orion-data:/app/data

volumes:
  orion-data:

For production, PostgreSQL or MySQL is recommended. Point ORION_STORAGE__URL to your database server.

Configuration

All settings have sensible defaults, so you can run Orion with no config file at all: orion-server just works.

TOML config file: pass with -c:

orion-server -c config.toml

Environment variable overrides: use double-underscore nesting to override any setting:

ORION_SERVER__PORT=9090
ORION_STORAGE__URL="postgres://user:pass@localhost/orion"
ORION_KAFKA__ENABLED=true
ORION_LOGGING__FORMAT=json
ORION_ADMIN_AUTH__ENABLED=true
ORION_ADMIN_AUTH__API_KEY="your-secret-key"

Environment variables take precedence over the config file, making it easy to customize per environment without changing files.
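The double-underscore convention maps each variable onto a path in the config tree: ORION_SERVER__PORT addresses server.port, and ORION_ADMIN_AUTH__ENABLED addresses admin_auth.enabled. The sketch below shows that mapping and the precedence rule. It is an illustration and not Orion's loader: the function names are hypothetical, and values are left as strings because type coercion is out of scope here.

```python
from typing import Any, Dict, Mapping

PREFIX = "ORION_"


def env_overrides(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Turn ORION_SECTION__KEY=value pairs into a nested dict of overrides."""
    overrides: Dict[str, Any] = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue  # unrelated variables such as PATH are ignored
        path = name[len(PREFIX):].lower().split("__")
        node = overrides
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value  # kept as a string in this sketch
    return overrides


def merge(base: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Overlay overrides on the file config; environment values win."""
    merged = dict(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge(merged[key], value)
        else:
            merged[key] = value
    return merged
```

Note that a single underscore stays inside a name (admin_auth), while a double underscore separates nesting levels.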

Runtime configuration: channels carry their own runtime config (rate limits, timeouts, CORS, validation) via config_json, changeable through the admin API without restarts.

Distribution

Orion is available through multiple installation methods:

Homebrew (macOS and Linux):

brew install GoPlasmatic/tap/orion-server

Shell installer (Linux/macOS):

curl --proto '=https' --tlsv1.2 -LsSf https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.sh | sh

PowerShell installer (Windows):

powershell -ExecutionPolicy ByPass -c "irm https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.ps1 | iex"

Docker:

docker run -p 8080:8080 ghcr.io/goplasmatic/orion:latest

From source:

cargo install --git https://github.com/GoPlasmatic/Orion

Multi-platform binaries are published for Linux (x86_64, aarch64), macOS (x86_64, aarch64), and Windows (x86_64).

Extensibility

Orion integrates with external systems through connectors, exposes custom logic via async function handlers, and supports multiple channel protocols for different ingestion patterns.

Connectors

Connectors are named external service configurations. Secrets stay in connectors, out of your workflows.

Authentication

Three auth schemes are supported:

| Auth Type | Fields | Example |
| --- | --- | --- |
| bearer | token | { "type": "bearer", "token": "sk-..." } |
| basic | username, password | { "type": "basic", "username": "user", "password": "pass" } |
| apikey | header, key | { "type": "apikey", "header": "X-API-Key", "key": "abc123" } |

Header Precedence

When http_call builds a request, headers are applied in this order (later layers override earlier ones):

| Priority | Source | Example |
| --- | --- | --- |
| 1 (lowest) | Connector default headers | "headers": {"x-source": "orion"} in connector config |
| 2 | Connector auth | Bearer token, Basic auth, API key |
| 3 | Default content-type | application/json (only when a body is present) |
| 4 (highest) | Task-level headers | "headers": {"content-type": "text/xml"} in the task input |

Task-level headers always win. This means a workflow developer can override content-type, authorization, or any other header set by the connector.
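The layering can be modelled as successive dictionary updates. This is a minimal Python sketch of the precedence rule only, not Orion's request builder: build_headers and auth_headers are hypothetical names, header names are lowercased here so that case differences cannot create duplicates, and the auth-to-header mapping follows the usual HTTP conventions for bearer and basic auth.

```python
import base64


def auth_headers(auth):
    """Translate a connector auth block into request headers (lowercased names)."""
    if auth is None:
        return {}
    if auth["type"] == "bearer":
        return {"authorization": "Bearer " + auth["token"]}
    if auth["type"] == "basic":
        raw = f"{auth['username']}:{auth['password']}".encode()
        return {"authorization": "Basic " + base64.b64encode(raw).decode()}
    if auth["type"] == "apikey":
        return {auth["header"].lower(): auth["key"]}
    raise ValueError(f"unknown auth type: {auth['type']}")


def build_headers(connector_headers, auth, has_body, task_headers):
    """Apply the four layers in order; each update overrides the ones before it."""
    headers = {}
    headers.update({k.lower(): v for k, v in connector_headers.items()})  # 1 (lowest)
    headers.update(auth_headers(auth))  # 2
    if has_body:
        headers["content-type"] = "application/json"  # 3
    headers.update({k.lower(): v for k, v in task_headers.items()})  # 4 (highest)
    return headers


result = build_headers(
    {"x-source": "orion"},
    {"type": "bearer", "token": "sk-test"},
    True,
    {"Content-Type": "text/xml"},
)
print(result)
```

In this example the task-level content-type replaces the default application/json, while the connector's x-source and bearer token pass through untouched.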

Secret Masking

Sensitive fields (token, password, key, secret, api_key, connection_string) are automatically masked as "******" in all API responses. Secrets are stored but never exposed through the API. Workflows reference connectors by name; they never see or embed actual credentials.
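As an illustration of the masking rule, the following Python sketch walks a connector document and replaces the values of the listed field names. It is an assumption-laden model, not Orion's code: mask_secrets is a hypothetical name, and it assumes matching is by exact key name at any nesting depth.

```python
SENSITIVE_KEYS = {"token", "password", "key", "secret", "api_key", "connection_string"}
MASK = "******"


def mask_secrets(value):
    """Return a copy of value with every sensitive field replaced by the mask."""
    if isinstance(value, dict):
        return {
            k: MASK if k in SENSITIVE_KEYS else mask_secrets(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [mask_secrets(item) for item in value]
    return value


connector = {
    "name": "payments-api",
    "config": {
        "url": "https://api.stripe.com/v1",
        "auth": {"type": "bearer", "token": "sk-live-example"},
    },
}
masked = mask_secrets(connector)
print(masked["config"]["auth"])
```

The stored connector is left unchanged; only the copy returned to the API caller is masked.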

Pulling Secrets from the Environment

Any string field inside a connector’s config may use an env://VAR_NAME reference instead of a literal value. The resolver runs once when the connector is loaded:

{
  "name": "payments-api",
  "connector_type": "http",
  "config": {
    "type": "http",
    "url": "https://api.stripe.com/v1",
    "auth": { "type": "bearer", "token": "env://STRIPE_API_KEY" }
  }
}

If STRIPE_API_KEY is not set in the process environment, startup (or the create/update call) fails with a structured error pointing at the field. Because the value is read from the environment, production credentials never have to be POSTed to the admin API or stored in the database. The same env:// scheme works on every string field in every connector type.
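A resolver of this kind can be sketched in a few lines of Python. The names resolve_env_refs and MissingEnvVar are hypothetical, and the dotted field-path format in the error is an assumption; the source only says the error points at the field.

```python
ENV_SCHEME = "env://"


class MissingEnvVar(Exception):
    """Raised when an env:// reference names a variable that is not set."""

    def __init__(self, field, var_name):
        super().__init__(f"{field}: environment variable {var_name} is not set")
        self.field = field
        self.var_name = var_name


def resolve_env_refs(value, environ, path="config"):
    """Replace every env://VAR string with environ[VAR], tracking the field path."""
    if isinstance(value, dict):
        return {k: resolve_env_refs(v, environ, f"{path}.{k}") for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_env_refs(v, environ, f"{path}[{i}]") for i, v in enumerate(value)]
    if isinstance(value, str) and value.startswith(ENV_SCHEME):
        var_name = value[len(ENV_SCHEME):]
        if var_name not in environ:
            raise MissingEnvVar(path, var_name)
        return environ[var_name]
    return value


config = {
    "type": "http",
    "url": "https://api.stripe.com/v1",
    "auth": {"type": "bearer", "token": "env://STRIPE_API_KEY"},
}
resolved = resolve_env_refs(config, {"STRIPE_API_KEY": "sk-from-env"})
print(resolved["auth"]["token"])
```

Calling resolve_env_refs(config, {}) raises MissingEnvVar with field set to config.auth.token, which mirrors the fail-fast behaviour described above.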

HTTP Connector

REST API calls, webhooks, and external service integration:

{
  "name": "payments-api",
  "connector_type": "http",
  "config": {
    "type": "http",
    "url": "https://api.stripe.com/v1",
    "auth": { "type": "bearer", "token": "sk-..." },
    "headers": { "x-source": "orion" },
    "retry": { "max_retries": 3, "retry_delay_ms": 1000 },
    "max_response_size": 10485760,
    "allow_private_urls": false
  }
}

| Field | Default | Description |
| --- | --- | --- |
| url | required | Base URL for all requests |
| method | "" | Default HTTP method |
| headers | {} | Default headers applied to every request |
| auth | null | Authentication config (bearer, basic, or apikey) |
| retry | 3 retries, 1000ms | Retry with exponential backoff |
| max_response_size | 10 MB | Maximum response body size to prevent OOM |
| allow_private_urls | false | Allow requests to private/internal IPs (SSRF protection) |
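To make the retry defaults concrete, here is a small Python sketch of an exponential backoff schedule. The source states only that retries use exponential backoff; the doubling factor, the absence of jitter, and the absence of a delay cap are all assumptions of this sketch, and backoff_delays_ms is a hypothetical name.

```python
def backoff_delays_ms(max_retries=3, retry_delay_ms=1000, factor=2):
    """Delay before each retry attempt, growing by factor each time (assumed factor)."""
    return [retry_delay_ms * factor ** attempt for attempt in range(max_retries)]


print(backoff_delays_ms())  # [1000, 2000, 4000]
```

Under these assumptions the default settings add up to 7 seconds of waiting before the call is finally reported as failed.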

Kafka Connector

Produce to Kafka topics:

{
  "name": "event-bus",
  "connector_type": "kafka",
  "config": {
    "type": "kafka",
    "brokers": ["kafka1:9092", "kafka2:9092"],
    "topic": "events",
    "group_id": "orion-producer"
  }
}

Use the publish_kafka task function with optional JSONLogic for dynamic keys and values:

{
  "function": {
    "name": "publish_kafka",
    "input": {
      "connector": "event-bus",
      "topic": "processed-orders",
      "key_logic": { "var": "data.order_id" }
    }
  }
}

| Field | Required | Description |
| --- | --- | --- |
| connector | Yes | Kafka connector name |
| topic | Yes | Target topic |
| key_logic | No | JSONLogic expression for partition key |
| value_logic | No | JSONLogic expression for message value (default: message.data) |

Kafka consumer configuration: map topics to channels in your config file:

[kafka]
enabled = true
brokers = ["localhost:9092"]
group_id = "orion"

[[kafka.topics]]
topic = "incoming-orders"
channel = "orders"

Async channels with protocol: "kafka" can also register topics via the API (DB-driven). Config-file and DB-driven topics are merged; duplicates are deduplicated with config-file entries taking precedence. The consumer restarts automatically on engine reload when the topic set changes.
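The merge rule can be sketched as follows. This Python model is illustrative only: merge_topics and needs_restart are hypothetical names, and it assumes each entry maps one topic to one channel, as in the [[kafka.topics]] example.

```python
def merge_topics(config_topics, db_topics):
    """Merge topic-to-channel mappings; config-file entries win on duplicates."""
    merged = {}
    for entry in db_topics:
        merged[entry["topic"]] = entry["channel"]
    for entry in config_topics:
        merged[entry["topic"]] = entry["channel"]  # overrides any DB-driven entry
    return merged


def needs_restart(old_mapping, new_mapping):
    """The consumer restarts only when the set of subscribed topics changes."""
    return set(old_mapping) != set(new_mapping)


config_topics = [{"topic": "incoming-orders", "channel": "orders"}]
db_topics = [
    {"topic": "incoming-orders", "channel": "kafka-orders"},
    {"topic": "refunds", "channel": "kafka-refunds"},
]
merged = merge_topics(config_topics, db_topics)
print(merged)
```

Here incoming-orders stays mapped to the config-file channel orders, and refunds is added from the database.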

Metadata injection: Kafka metadata is automatically injected into every message:

| Field | Description |
| --- | --- |
| kafka_topic | Source topic name |
| kafka_key | Message key (if present) |
| kafka_partition | Partition number |
| kafka_offset | Offset within partition |

Access these in workflows via { "var": "metadata.kafka_topic" }.

Dead letter queue: failed messages are routed to a configurable DLQ topic:

[kafka.dlq]
enabled = true
topic = "orion-dlq"

Consumer settings:

| Config | Default | Description |
| --- | --- | --- |
| kafka.processing_timeout_ms | 60000 | Per-message processing timeout |
| kafka.max_inflight | 100 | Max in-flight messages |
| kafka.lag_poll_interval_secs | 30 | Consumer lag polling interval |

Database Connector (SQL)

Parameterized SQL queries against PostgreSQL, MySQL, or SQLite:

{
  "name": "orders-db",
  "connector_type": "db",
  "config": {
    "type": "db",
    "connection_string": "postgres://user:pass@db-host:5432/orders",
    "driver": "postgres",
    "max_connections": 10,
    "connect_timeout_ms": 5000,
    "query_timeout_ms": 30000
  }
}

| Field | Default | Description |
| --- | --- | --- |
| connection_string | required | Database URL (auto-masked in API responses) |
| driver | "postgres" | Driver type: postgres, mysql, or sqlite |
| max_connections | null | Connection pool max size |
| connect_timeout_ms | null | Connection establishment timeout |
| query_timeout_ms | null | Individual query timeout |
| retry | 3 retries, 1000ms | Retry with exponential backoff |

Use db_read for SELECT (returns rows as JSON array) and db_write for INSERT/UPDATE/DELETE (returns affected count):

{
  "function": {
    "name": "db_read",
    "input": {
      "connector": "orders-db",
      "query": "SELECT * FROM orders WHERE customer_id = $1",
      "params": [{ "var": "data.customer_id" }],
      "output": "data.orders"
    }
  }
}

Cache Connector

In-memory or Redis cache for lookups, session state, and temporary storage:

{
  "name": "session-cache",
  "connector_type": "cache",
  "config": {
    "type": "cache",
    "backend": "redis",
    "url": "redis://localhost:6379",
    "default_ttl_secs": 300,
    "max_connections": 10
  }
}

| Field | Default | Description |
| --- | --- | --- |
| backend | required | "redis" or "memory" |
| url | required (redis) | Redis connection URL |
| default_ttl_secs | null | Default TTL for cache entries |
| max_connections | null | Connection pool max size |
| retry | 3 retries, 1000ms | Retry with exponential backoff |

Use cache_read and cache_write in workflows:

{
  "function": {
    "name": "cache_write",
    "input": {
      "connector": "session-cache",
      "key": "session:user123",
      "value": { "var": "data.session" },
      "ttl_secs": 3600
    }
  }
}

Storage Connector (S3/GCS)

S3, GCS, or local filesystem for file operations:

{
  "name": "uploads",
  "connector_type": "storage",
  "config": {
    "type": "storage",
    "provider": "s3",
    "bucket": "my-uploads",
    "region": "us-east-1",
    "base_path": "/data"
  }
}

| Field | Default | Description |
| --- | --- | --- |
| provider | required | Storage provider: s3, gcs, local |
| bucket | null | Bucket or container name |
| region | null | Cloud region |
| base_path | null | Base path prefix for all operations |
| retry | 3 retries, 1000ms | Retry with exponential backoff |

MongoDB Connector (NoSQL)

MongoDB document queries with BSON-to-JSON conversion:

{
  "name": "analytics-db",
  "connector_type": "db",
  "config": {
    "type": "db",
    "connection_string": "mongodb://localhost:27017",
    "driver": "mongodb"
  }
}

Use mongo_read in workflows:

{
  "function": {
    "name": "mongo_read",
    "input": {
      "connector": "analytics-db",
      "database": "analytics",
      "collection": "events",
      "filter": { "user_id": { "var": "data.user_id" } },
      "output": "data.events"
    }
  }
}

Custom Functions

Orion provides 8 async function handlers that can be used in workflow tasks:

| Function | Description |
| --- | --- |
| http_call | Call external APIs via HTTP connectors |
| channel_call | Invoke another channel’s workflow in-process (no HTTP round-trip) |
| db_read | Execute SELECT queries against SQL connectors |
| db_write | Execute INSERT/UPDATE/DELETE against SQL connectors |
| cache_read | Read from memory or Redis cache connectors |
| cache_write | Write to memory or Redis cache connectors |
| mongo_read | Query MongoDB collections |
| publish_kafka | Produce messages to Kafka topics |

In addition to the Orion-specific handlers, the dataflow-rs 3.0 engine contributes a built-in function library for parsing, transformation, and output:

| Function | Description |
| --- | --- |
| parse_json | Parse raw payload into structured data |
| parse_xml | Parse XML payload into structured data |
| filter | Filter arrays using JSONLogic conditions |
| map | Transform data with field mappings |
| validation | Validate data against JSONLogic rules |
| publish_json | Serialize the data context into a JSON response body |
| publish_xml | Serialize the data context into an XML response body |
| log | Log data at a specified level |

The Orion-specific handlers have machine-readable input schemas surfaced at GET /api/v1/admin/functions; workflow create/update calls validate function.input against those schemas with field-pathed errors before the workflow can be activated.

JSONLogic expressions power all conditions and dynamic values. Use { "var": "data.field" } to reference data, { "cat": [...] } for string concatenation, arithmetic operators, and more. Dynamic paths (path_logic) and bodies (body_logic) let you compute URLs and request payloads from message data. Under the hood, datalogic-rs 5 compiles each JSONLogic expression once at engine-construction time and evaluates it via arena-mode dispatch, so requests pay only for evaluation and never for parsing or compilation.

Channel Protocols

Channels support three protocol modes:

REST (Sync)

REST channels define route patterns for RESTful API routing with method and path matching:

{
  "name": "order-detail",
  "channel_type": "sync",
  "protocol": "rest",
  "methods": ["GET", "POST"],
  "route_pattern": "/orders/{order_id}/items/{item_id}",
  "workflow_id": "order-detail-workflow"
}

Path parameters are extracted and injected into the message metadata. Routes are matched by priority (descending) then specificity (segment count).

Simple HTTP (Sync)

Simple HTTP channels are matched by channel name. Requests to /api/v1/data/{channel-name} are routed directly:

{
  "name": "orders",
  "channel_type": "sync",
  "protocol": "http",
  "workflow_id": "order-processing"
}

Kafka (Async)

Kafka channels consume from topics and process messages asynchronously:

{
  "name": "kafka-orders",
  "channel_type": "async",
  "protocol": "kafka",
  "topic": "incoming-orders",
  "consumer_group": "orion-orders",
  "workflow_id": "order-processing"
}

DB-driven Kafka channels are automatically registered as consumers at startup and on engine reload. Add Kafka ingestion via the API without restarting Orion.

Availability

Orion supports zero-downtime engine reloads, percentage-based canary rollouts, full version lifecycle management, and response caching, enabling continuous delivery without service interruptions.

Hot-Reload

The engine is held in memory as Arc<RwLock<Arc<Engine>>>. A reload swaps the inner Arc<Engine> while existing readers continue using the old one. Zero dropped requests.
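The swap pattern can be illustrated outside Rust as well. The Python analogue below is a sketch, not Orion's implementation: Engine and EngineHolder are hypothetical names, an ordinary object reference plays the role of the inner Arc<Engine>, and a plain threading.Lock stands in for the RwLock because the Python standard library has no reader-writer lock.

```python
import threading


class Engine:
    def __init__(self, version):
        self.version = version


class EngineHolder:
    """Holds the current engine; readers take a snapshot, reloads swap the reference."""

    def __init__(self, engine):
        self._lock = threading.Lock()
        self._engine = engine

    def current(self):
        # The lock is held only long enough to copy the reference.
        with self._lock:
            return self._engine

    def reload(self, new_engine):
        # Requests that already took a snapshot keep using the old engine.
        with self._lock:
            self._engine = new_engine


holder = EngineHolder(Engine(version=1))
in_flight = holder.current()        # a request starts on engine v1
holder.reload(Engine(version=2))    # reload happens mid-request
print(in_flight.version, holder.current().version)
```

The in-flight request finishes on the engine it started with, and the old engine is released once the last snapshot is dropped.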

Trigger a reload:

curl -s -X POST http://localhost:8080/api/v1/admin/engine/reload

A reload performs three operations atomically:

  1. Engine swap: rebuilds the engine from all active workflows and channels in the database
  2. Channel registry rebuild: reconstructs the route table, validation logic, rate limiters, backpressure semaphores, dedup stores, and response caches
  3. Kafka consumer restart: if the topic set changed, the Kafka consumer is stopped and restarted with the new topics

Reloads are triggered automatically on status changes (activate/archive) and deletes. Creating or updating a draft does not trigger a reload.

In multi-instance deployments, reload only affects the instance that receives the request. Script the reload to broadcast to all instances:

for host in $INSTANCE_HOSTS; do
  curl -X POST "http://$host:8080/api/v1/admin/engine/reload" \
    -H "Authorization: Bearer $API_KEY"
done

Canary Rollouts

Control traffic exposure for active workflows with rollout percentages:

# Activate a workflow at 10% rollout
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/status \
  -H "Content-Type: application/json" \
  -d '{"status": "active", "rollout_percentage": 10}'

# Increase rollout to 50%
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/rollout \
  -H "Content-Type: application/json" \
  -d '{"rollout_percentage": 50}'

# Full rollout
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/rollout \
  -H "Content-Type: application/json" \
  -d '{"rollout_percentage": 100}'

The rollout percentage determines the probability that incoming requests are matched to this workflow. This enables:

  • Gradual migration: slowly ramp traffic from 0% to 100%
  • A/B testing: run two workflow versions at different percentages
  • Instant rollback: set rollout to 0% or archive the workflow
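One way to picture a percentage rollout is an independent random draw per request. The Python sketch below assumes exactly that; the source says only that the percentage is a probability, so whether Orion draws per request or uses some sticky scheme is not specified, and takes_request is a hypothetical name.

```python
import random


def takes_request(rollout_percentage, rng):
    """True if this request should be matched to the workflow."""
    # rng.random() is in [0, 1), so 0 never matches and 100 always matches.
    return rng.random() * 100 < rollout_percentage


rng = random.Random(42)  # seeded only to make the demonstration repeatable
hits = sum(takes_request(10, rng) for _ in range(10_000))
print(f"{hits} of 10000 requests matched at 10% rollout")
```

With a 10% rollout roughly one request in ten is matched; the exact count varies from run to run when the generator is not seeded.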

Versioning

Both workflows and channels follow a draft → active → archived lifecycle with automatic version tracking:

# Create (starts as draft, version 1)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows \
  -H "Content-Type: application/json" \
  -d '{ "name": "Order Processor", ... }'

# Update (only drafts can be updated)
curl -s -X PUT http://localhost:8080/api/v1/admin/workflows/<id> \
  -H "Content-Type: application/json" -d '{ ... }'

# Activate (loads into engine)
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/status \
  -H "Content-Type: application/json" -d '{"status": "active"}'

# Create new version (new draft from active)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/<id>/versions

# Archive (removes from engine)
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/status \
  -H "Content-Type: application/json" -d '{"status": "archived"}'

All versions are stored with incrementing version numbers. List the version history:

curl -s http://localhost:8080/api/v1/admin/workflows/<id>/versions

Import and export: bulk operations for GitOps and migration:

# Export workflows (as JSON)
curl -s "http://localhost:8080/api/v1/admin/workflows/export?status=active"

# Import workflows (created as drafts)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/import \
  -H "Content-Type: application/json" -d @workflows.json

Performance

Response caching: cache responses for identical requests to reduce redundant workflow execution:

{
  "cache": {
    "enabled": true,
    "ttl_secs": 60,
    "cache_key_fields": ["data.user_id", "data.action"]
  }
}

Cache keys are computed from the specified fields. Cached responses are returned directly without executing the workflow. The cache backend is in-memory by default; Redis-backed caching is available via a cache connector.
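A key derivation along these lines can be sketched in Python. The hashing scheme is an assumption made for illustration (the source does not say how keys are encoded), and cache_key and lookup are hypothetical names.

```python
import hashlib
import json


def lookup(message, dotted_path):
    """Follow a dotted path such as data.user_id; return None if any part is missing."""
    node = message
    for part in dotted_path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def cache_key(channel, message, fields):
    """Hash only the configured fields, so unrelated fields do not affect the key."""
    selected = [[field, lookup(message, field)] for field in fields]
    canonical = json.dumps([channel, selected], sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()


fields = ["data.user_id", "data.action"]
first = cache_key("orders", {"data": {"user_id": "u1", "action": "view", "ts": 1}}, fields)
second = cache_key("orders", {"data": {"user_id": "u1", "action": "view", "ts": 2}}, fields)
third = cache_key("orders", {"data": {"user_id": "u1", "action": "buy", "ts": 1}}, fields)
print(first == second, first == third)
```

The first two requests differ only in ts, which is not a key field, so they share a cache entry; changing action produces a different key.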

Request deduplication: prevent duplicate processing using idempotency keys:

{
  "deduplication": {
    "header": "Idempotency-Key",
    "retention_secs": 300
  }
}

When a request with the same idempotency key arrives within the retention window, it returns 409 Conflict instead of re-processing.
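The retention window can be modelled with a small in-memory store. This Python sketch is illustrative: DedupStore is a hypothetical name, the clock is injected so the example runs without waiting, and it assumes the window is measured from the first time a key is seen and is not extended by later duplicates.

```python
import time


class DedupStore:
    """Remembers idempotency keys for retention_secs seconds."""

    def __init__(self, retention_secs, clock=time.monotonic):
        self.retention_secs = retention_secs
        self.clock = clock
        self._first_seen = {}

    def is_duplicate(self, idempotency_key):
        """True means the caller should answer 409 Conflict instead of processing."""
        now = self.clock()
        # Forget keys whose retention window has passed.
        self._first_seen = {
            key: seen for key, seen in self._first_seen.items()
            if now - seen < self.retention_secs
        }
        if idempotency_key in self._first_seen:
            return True
        self._first_seen[idempotency_key] = now
        return False


fake_now = [0.0]
store = DedupStore(retention_secs=300, clock=lambda: fake_now[0])
first = store.is_duplicate("abc")    # new key, processed
second = store.is_duplicate("abc")   # repeat inside the window
fake_now[0] = 301.0
third = store.is_duplicate("abc")    # window expired, processed again
print(first, second, third)
```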

Connection pool caching: external database and MongoDB connector pools are cached and reused across requests, with configurable pool sizes and idle timeouts:

[engine]
max_pool_cache_entries = 100
cache_cleanup_interval_secs = 60

Maintainability

Orion provides comprehensive admin APIs, CI/CD integration patterns, dry-run testing, and operational tools for managing services in production.

Admin APIs

Full CRUD operations for all entities through a RESTful admin API:

| Resource | Endpoints |
| --- | --- |
| Workflows | Create, read, update, delete, status management, versioning, rollout, dry-run test, import/export, validate |
| Channels | Create, read, update, delete, status management, versioning |
| Connectors | Create, read, update, delete, reload, circuit breaker inspection/reset |
| Engine | Status, hot-reload |
| Audit logs | List with filtering by action and resource type |
| Backup | Database export and restore |

Version management: both workflows and channels support the draft → active → archived lifecycle. Filter by status:

curl -s "http://localhost:8080/api/v1/admin/workflows?status=active"
curl -s "http://localhost:8080/api/v1/admin/channels?status=draft"

Engine control:

# Check engine status
curl -s http://localhost:8080/api/v1/admin/engine/status

# Hot-reload after changes
curl -s -X POST http://localhost:8080/api/v1/admin/engine/reload

OpenAPI / Swagger UI: interactive API documentation is always available at /docs, and the OpenAPI 3.0 spec at /api/v1/openapi.json.

CI/CD Integration

Orion workflows are JSON files that version, diff, and review like any other config.

Bulk import and export:

# Export active workflows
curl -s "http://localhost:8080/api/v1/admin/workflows/export?status=active" -o workflows.json

# Import workflows (created as drafts)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/import \
  -H "Content-Type: application/json" -d @workflows.json

Pre-deploy validation: validate workflow structure without creating:

curl -s -X POST http://localhost:8080/api/v1/admin/workflows/validate \
  -H "Content-Type: application/json" -d @workflow.json

GitOps pipeline: a typical CI/CD flow:

AI generates workflow → commit as JSON → CI validates & dry-runs → review → import → activate

GitHub Actions example:

name: Validate Workflows
on:
  pull_request:
    paths: ['workflows/**/*.json']

jobs:
  validate:
    runs-on: ubuntu-latest
    services:
      orion:
        image: ghcr.io/goplasmatic/orion:latest
        ports: ['8080:8080']
    steps:
      - uses: actions/checkout@v4
      - name: Import and test workflows
        run: |
          # bash only recurses on ** when globstar is enabled
          shopt -s globstar nullglob
          for file in workflows/**/*.json; do
            curl -sf -X POST http://localhost:8080/api/v1/admin/workflows \
              -H "Content-Type: application/json" -d @"$file"
          done

Tag-based organization: tag workflows for filtering:

{ "tags": ["fraud", "high-priority", "v2"] }

curl -s "http://localhost:8080/api/v1/admin/workflows?tag=fraud"

Testing

Dry-run execution: test a workflow against sample data without activating it:

curl -s -X POST http://localhost:8080/api/v1/admin/workflows/<id>/test \
  -H "Content-Type: application/json" \
  -d '{"data": {"amount": 50000, "currency": "USD"}}'

The response includes a full execution trace showing which tasks ran and which were skipped:

{
  "matched": true,
  "trace": {
    "steps": [
      { "task_id": "parse", "result": "executed" },
      { "task_id": "high_risk", "result": "executed" },
      { "task_id": "normal_risk", "result": "skipped" }
    ]
  },
  "output": {
    "txn": { "amount": 50000, "risk_level": "high", "requires_review": true }
  }
}

Workflow validation: check that a workflow definition is structurally valid:

curl -s -X POST http://localhost:8080/api/v1/admin/workflows/validate \
  -H "Content-Type: application/json" -d @workflow.json

Step-by-step traces: async traces record the full execution path and can be retrieved for debugging:

# Submit async request
curl -s -X POST http://localhost:8080/api/v1/data/orders/async \
  -H "Content-Type: application/json" -d '{ "data": { "order_id": "ORD-123" } }'

# Get trace with execution details
curl -s http://localhost:8080/api/v1/data/traces/{trace-id}

Operations

Audit logging: all admin actions are recorded for compliance and debugging:

curl -s http://localhost:8080/api/v1/admin/audit-logs
curl -s "http://localhost:8080/api/v1/admin/audit-logs?action=activate&resource_type=workflow"

Each entry captures: principal, action, resource type, resource ID, details, and timestamp.

Database backup and restore:

# Export backup
curl -s -X POST http://localhost:8080/api/v1/admin/backup -o backup.json

# Restore from backup
curl -s -X POST http://localhost:8080/api/v1/admin/restore \
  -H "Content-Type: application/json" -d @backup.json

Config validation CLI: validate your configuration without starting the server:

orion-server validate-config
orion-server validate-config -c config.toml

Database migrations: run or preview pending migrations:

orion-server migrate              # Run migrations
orion-server migrate --dry-run    # Preview pending migrations

Admin API

All admin endpoints are under /api/v1/admin/. When admin authentication is enabled, requests must include a valid bearer token or API key.

Channels

| Method | Path | Description |
| --- | --- | --- |
| POST | /api/v1/admin/channels | Create channel (as draft) |
| GET | /api/v1/admin/channels | List channels. Filter with ?status=, ?channel_type=, ?protocol= |
| GET | /api/v1/admin/channels/{id} | Get channel by ID |
| PUT | /api/v1/admin/channels/{id} | Update draft channel |
| DELETE | /api/v1/admin/channels/{id} | Delete channel (all versions) |
| PATCH | /api/v1/admin/channels/{id}/status | Change status (active/archived) |
| GET | /api/v1/admin/channels/{id}/versions | List channel version history |
| POST | /api/v1/admin/channels/{id}/versions | Create new draft version from active channel |
| POST | /api/v1/admin/channels/import | Bulk import channels (as drafts). ?dry_run=true validates without writing |

Workflows

| Method | Path | Description |
| --- | --- | --- |
| POST | /api/v1/admin/workflows | Create workflow (as draft; optional id field for custom IDs) |
| GET | /api/v1/admin/workflows | List workflows. Filter with ?tag=, ?status= |
| GET | /api/v1/admin/workflows/{id} | Get workflow by ID |
| PUT | /api/v1/admin/workflows/{id} | Update draft workflow |
| DELETE | /api/v1/admin/workflows/{id} | Delete workflow (all versions) |
| PATCH | /api/v1/admin/workflows/{id}/status | Change status (active/archived) |
| GET | /api/v1/admin/workflows/{id}/versions | List workflow version history |
| POST | /api/v1/admin/workflows/{id}/versions | Create new draft version from active workflow |
| PATCH | /api/v1/admin/workflows/{id}/rollout | Update rollout percentage |
| POST | /api/v1/admin/workflows/{id}/test | Dry-run on sample payload |
| POST | /api/v1/admin/workflows/import | Bulk import workflows (as drafts). ?dry_run=true validates without writing |
| GET | /api/v1/admin/workflows/export | Export workflows. Filter with ?tag=, ?status= |
| POST | /api/v1/admin/workflows/validate | Validate workflow definition |

Connectors

| Method | Path | Description |
| --- | --- | --- |
| POST | /api/v1/admin/connectors | Create connector. String fields may use env://VAR_NAME to pull values from the process environment |
| GET | /api/v1/admin/connectors | List connectors (secrets masked) |
| GET | /api/v1/admin/connectors/{id} | Get connector by ID (secrets masked) |
| PUT | /api/v1/admin/connectors/{id} | Update connector |
| DELETE | /api/v1/admin/connectors/{id} | Delete connector |
| POST | /api/v1/admin/connectors/import | Bulk import connectors. ?dry_run=true validates without writing |
| POST | /api/v1/admin/connectors/reload | Reload all connectors from DB |
| GET | /api/v1/admin/connectors/circuit-breakers | List circuit breaker states |
| POST | /api/v1/admin/connectors/circuit-breakers/{key} | Reset a circuit breaker |

Engine

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/v1/admin/engine/status | Engine status (version, uptime, workflows count, channels) |
| POST | /api/v1/admin/engine/reload | Hot-reload channels and workflows |

Functions

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/v1/admin/functions | List every task function with its input-field schema (category, type, required flag, description). Used by CLI tools and IDEs for autocompletion and by workflow validators to give field-pathed errors |

Audit Logs

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/v1/admin/audit-logs | List audit log entries. Filter with ?action=, ?resource_type= |

Backups

| Method | Path | Description |
| --- | --- | --- |
| POST | /api/v1/admin/backups | Create a database backup (SQLite only: VACUUM INTO a timestamped file in storage.backup_dir) |
| GET | /api/v1/admin/backups | List backup files currently in storage.backup_dir |

Lifecycle

Both channels and workflows follow a draft → active → archived lifecycle:

  1. Create: entities are created as draft (not loaded into the engine)
  2. Update: only draft versions can be updated via PUT
  3. Activate: PATCH /status with {"status": "active"} loads the entity into the engine
  4. New version: POST /versions creates a new draft version from the active entity
  5. Archive: PATCH /status with {"status": "archived"} removes from the engine

A channel links to a workflow via workflow_id. Activating a channel makes it available for data processing; activating a workflow makes its logic available to the engine.

Authentication

Admin API endpoints support bearer token or API key authentication when enabled:

# Bearer token (default header: Authorization)
curl -H "Authorization: Bearer your-secret-key" \
  http://localhost:8080/api/v1/admin/workflows

# API key via custom header
curl -H "X-API-Key: your-secret-key" \
  http://localhost:8080/api/v1/admin/workflows

Configure this in the [admin_auth] section of the config file, or set the ORION_ADMIN_AUTH__ENABLED=true environment variable.

Error Response Format

All error responses follow a consistent structure:

{
  "error": {
    "code": "NOT_FOUND",
    "message": "Workflow with id '...' not found"
  }
}

| Code | HTTP Status | Description |
| --- | --- | --- |
| NOT_FOUND | 404 | Resource not found |
| BAD_REQUEST | 400 | Invalid input |
| UNAUTHORIZED | 401 | Missing or invalid credentials |
| FORBIDDEN | 403 | Access denied |
| CONFLICT | 409 | Duplicate or conflicting state |
| RATE_LIMITED | 429 | Too many requests |
| TIMEOUT | 504 | Workflow execution exceeded timeout |
| SERVICE_UNAVAILABLE | 503 | Backpressure or circuit breaker open |
| UNSUPPORTED_MEDIA_TYPE | 415 | Invalid content type |
| INTERNAL_ERROR | 500 | Internal server error |

When a workflow, channel, or connector fails strict validation on create/update, the envelope is extended with a details array of field-pathed errors. The array is omitted for single-message errors, so v0.1 clients are not broken:

{
  "error": {
    "code": "BAD_REQUEST",
    "message": "Workflow validation failed",
    "details": [
      { "field": "tasks[0].function.input.connector", "message": "is required" },
      { "field": "tasks[2].function.input.method",    "message": "expected string, got number" }
    ]
  }
}

The field path mirrors the JSON structure the API received, so editors can jump straight to the failing key. The same envelope is returned by POST /workflows/validate, POST /workflows/{id}/test, and the orion-server lint / dry-run CLI subcommands.
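A client can treat details as optional and print one line per failing field. The helper below is a hypothetical Python sketch (format_error is not part of any Orion tooling) that works for both the single-message and the extended envelope.

```python
def format_error(envelope):
    """Turn an error envelope into printable lines; details is optional."""
    error = envelope["error"]
    lines = [f"{error['code']}: {error['message']}"]
    for detail in error.get("details", []):
        lines.append(f"  {detail['field']}: {detail['message']}")
    return lines


envelope = {
    "error": {
        "code": "BAD_REQUEST",
        "message": "Workflow validation failed",
        "details": [
            {"field": "tasks[0].function.input.connector", "message": "is required"},
            {"field": "tasks[2].function.input.method", "message": "expected string, got number"},
        ],
    }
}
for line in format_error(envelope):
    print(line)
```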

Data API

The data API handles runtime request processing: routing messages to channels, executing workflows, and returning results.

Endpoints

| Method | Path | Description |
| --- | --- | --- |
| POST | /api/v1/data/{channel} | Process message synchronously (simple channel name) |
| POST | /api/v1/data/{channel}/async | Submit for async processing (returns trace ID) |
| ANY | /api/v1/data/{path...} | REST route matching: method + path matched against channel route patterns |
| ANY | /api/v1/data/{path...}/async | Async submission via REST route matching |
| GET | /api/v1/data/traces | List traces. Filter with ?status=, ?channel=, ?mode= |
| GET | /api/v1/data/traces/{id} | Poll async trace result |

Route Resolution

When a request arrives at /api/v1/data/{path}, Orion resolves the target channel in this order:

  1. Async check: strip trailing /async suffix (switches to async mode)
  2. REST route table: match HTTP method + path against channel route_pattern values (e.g., GET /orders/{order_id})
  3. Channel name fallback: direct lookup by single path segment (e.g., /api/v1/data/orders → channel named orders)

REST routes are matched by priority (descending) then specificity (segment count). Path parameters are extracted and injected into the message metadata.
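The three resolution steps can be sketched as a small Python function. This is a model of the documented order, not Orion's router: the function names are hypothetical, path is the part of the URL after /api/v1/data, a numeric priority field on the channel is assumed (the channel examples in this guide do not show one), and "specificity" is read as "more segments first".

```python
def split_segments(path):
    return [seg for seg in path.split("/") if seg]


def match_pattern(pattern, path):
    """Return the extracted path params if path fits pattern, else None."""
    pattern_segs, path_segs = split_segments(pattern), split_segments(path)
    if len(pattern_segs) != len(path_segs):
        return None
    params = {}
    for expected, actual in zip(pattern_segs, path_segs):
        if expected.startswith("{") and expected.endswith("}"):
            params[expected[1:-1]] = actual
        elif expected != actual:
            return None
    return params


def resolve(method, path, rest_channels, channel_names):
    """Return (channel name, mode, path params) or None if nothing matches."""
    mode = "sync"
    segments = split_segments(path)
    if segments and segments[-1] == "async":  # step 1: async check
        segments, mode = segments[:-1], "async"
    path = "/" + "/".join(segments)
    ordered = sorted(
        rest_channels,
        key=lambda ch: (-ch.get("priority", 0), -len(split_segments(ch["route_pattern"]))),
    )
    for channel in ordered:  # step 2: REST route table
        if method not in channel["methods"]:
            continue
        params = match_pattern(channel["route_pattern"], path)
        if params is not None:
            return channel["name"], mode, params
    if len(segments) == 1 and segments[0] in channel_names:  # step 3: name fallback
        return segments[0], mode, {}
    return None


rest = [{
    "name": "order-detail",
    "methods": ["GET", "POST"],
    "route_pattern": "/orders/{order_id}/items/{item_id}",
}]
print(resolve("GET", "/orders/ORD-123/items/ITEM-1", rest, {"orders"}))
print(resolve("POST", "/orders/async", rest, {"orders"}))
```

The first call matches the REST pattern and extracts both path parameters; the second strips /async and falls back to the channel named orders.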

Synchronous Processing

Send a POST to the channel name or a matching REST route:

# By channel name
curl -s -X POST http://localhost:8080/api/v1/data/orders \
  -H "Content-Type: application/json" \
  -d '{ "data": { "order_id": "ORD-123", "total": 25000 } }'

# By REST route pattern
curl -s -X GET http://localhost:8080/api/v1/data/orders/ORD-123/items/ITEM-1

Response:

{
  "status": "ok",
  "data": {
    "order": { "order_id": "ORD-123", "total": 25000, "flagged": true }
  },
  "errors": []
}

Per-request profiling

Add X-Orion-Profile: 1 (or ?profile=1) to the request and the response gains a _orion.profile block that breaks the request down by phase. Profiling is opt-in per request, so you only pay the cost on the requests you care about, and tracing.profile_enabled in the config enables or disables the feature entirely. Debug output always sits under the _orion namespace, so workflow-produced output keys can never collide with future debug fields.

{
  "status": "ok",
  "data": { ... },
  "errors": [],
  "_orion": {
    "profile": {
      "version": 1,
      "engine_lock_ms": 0.04,
      "workflow_ms": 6.71,
      "tasks": [
        { "id": "parse",    "ms": 0.18 },
        { "id": "validate", "ms": 0.42 },
        { "id": "enrich",   "ms": 4.91 }
      ]
    }
  }
}
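A client can use the profile block to find the slowest step. The Python helper below is a hypothetical sketch that relies only on the fields shown in the sample response above and returns None when profiling was not requested.

```python
def slowest_task(response):
    """Return (task id, ms) for the slowest profiled task, or None without a profile."""
    profile = response.get("_orion", {}).get("profile")
    if not profile or not profile.get("tasks"):
        return None
    worst = max(profile["tasks"], key=lambda task: task["ms"])
    return worst["id"], worst["ms"]


response = {
    "status": "ok",
    "errors": [],
    "_orion": {
        "profile": {
            "version": 1,
            "engine_lock_ms": 0.04,
            "workflow_ms": 6.71,
            "tasks": [
                {"id": "parse", "ms": 0.18},
                {"id": "validate", "ms": 0.42},
                {"id": "enrich", "ms": 4.91},
            ],
        }
    },
}
print(slowest_task(response))  # ('enrich', 4.91)
```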

Asynchronous Processing

Append /async to submit for background processing:

curl -s -X POST http://localhost:8080/api/v1/data/orders/async \
  -H "Content-Type: application/json" \
  -d '{ "data": { "order_id": "ORD-456" } }'

The response returns immediately with a trace ID:

{
  "trace_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "pending"
}

Poll for the result:

curl -s http://localhost:8080/api/v1/data/traces/550e8400-e29b-41d4-a716-446655440000

Trace statuses: pending → completed or failed.
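Polling usually means repeating the GET until the trace leaves the pending state. The Python sketch below keeps the HTTP call out of the loop: fetch_trace is a caller-supplied function (for example, a wrapper around GET /api/v1/data/traces/{id}), and wait_for_trace, the retry limits, and the assumption that the trace body carries a top-level status field are all illustrative.

```python
import time


def wait_for_trace(fetch_trace, trace_id, interval_secs=0.5, max_attempts=20, sleep=time.sleep):
    """Poll until the trace is completed or failed; raise if it stays pending."""
    for _ in range(max_attempts):
        trace = fetch_trace(trace_id)
        if trace["status"] in ("completed", "failed"):
            return trace
        sleep(interval_secs)
    raise TimeoutError(f"trace {trace_id} still pending after {max_attempts} polls")


# Stand-in for the HTTP call: pending twice, then completed.
replies = iter([{"status": "pending"}, {"status": "pending"}, {"status": "completed"}])
result = wait_for_trace(lambda trace_id: next(replies), "example-trace-id", sleep=lambda secs: None)
print(result["status"])
```

Bounding the number of attempts keeps a stuck trace from blocking the caller indefinitely.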

Trace Endpoints

List and filter traces:

# List all traces
curl -s http://localhost:8080/api/v1/data/traces

# Filter by channel and status
curl -s "http://localhost:8080/api/v1/data/traces?channel=orders&status=completed"

# Filter by mode
curl -s "http://localhost:8080/api/v1/data/traces?mode=async"

Get a specific trace:

curl -s http://localhost:8080/api/v1/data/traces/{trace-id}

Operational Endpoints

| Method | Path | Description |
| --- | --- | --- |
| GET | /health | Health check (200 OK / 503 degraded). Checks DB, engine, uptime |
| GET | /healthz | Kubernetes liveness probe. Always returns 200 |
| GET | /readyz | Kubernetes readiness probe. 503 if DB, engine, or startup not ready |
| GET | /metrics | Prometheus metrics (when enabled) |
| GET | /docs | Swagger UI |
| GET | /api/v1/openapi.json | OpenAPI 3.0 specification |

Workflow Reference

A workflow is a versioned, JSON-defined pipeline of tasks. A channel links to a workflow by workflow_id; when a request arrives, Orion matches an active workflow, runs its tasks in order, and returns the resulting data context.

This page is the authoritative reference for the workflow JSON shape, the data context model, conditions, and the draft → active lifecycle. For the per-task function.input schemas, see the Function Reference.

The workflow object

Send this shape to POST /api/v1/admin/workflows (and PUT .../{id} to update the draft). Fields marked server-managed are set by Orion and returned in responses — you don’t send them on create.

| Field | Type | Required | Default | Notes |
| --- | --- | --- | --- | --- |
| workflow_id | string | no | auto (UUID v4) | Stable identifier. ≤128 chars, alphanumeric plus ., -, _, must start alphanumeric |
| name | string | yes | | Human-readable name. ≤255 chars, non-empty |
| description | string | no | | ≤2048 chars |
| priority | integer | no | 0 | Match order: higher-priority workflows are evaluated first (see Matching) |
| condition | JSONLogic | no | true | Whether the workflow matches a request (see Conditions) |
| tasks | array | yes | | Ordered, non-empty list of task objects |
| tags | string[] | no | [] | Free-form labels for filtering |
| continue_on_error | bool | no | false | If true, a failing task does not halt the pipeline (see Error handling) |
| version | integer | server-managed | 1 | Increments per saved version of a workflow_id |
| status | string | server-managed | draft | One of draft, active, archived |
| rollout_percentage | integer | server-managed | 100 | Share of traffic when activated (see Rollout) |
| created_at / updated_at | string | server-managed | | RFC 3339 timestamps |
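The workflow_id constraints translate directly into a regular expression, which is handy for checking IDs in CI before calling the API. This Python sketch assumes "alphanumeric" means ASCII letters and digits; is_valid_workflow_id is a hypothetical helper, and the server remains the authority on what it accepts.

```python
import re

# First character alphanumeric, then up to 127 more of: letters, digits, ".", "_", "-"
WORKFLOW_ID = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]{0,127}")


def is_valid_workflow_id(value):
    """Check the documented shape: at most 128 chars, starting alphanumeric."""
    return WORKFLOW_ID.fullmatch(value) is not None


for candidate in ["high-value-order", "orders.v2_final", "-leading-dash", "has space"]:
    print(candidate, is_valid_workflow_id(candidate))
```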

Responses wrap the resource in a data envelope:

{ "data": { "workflow_id": "high-value-order", "version": 1, "status": "draft", "...": "..." } }

Validation failures return 400 with a structured error envelope — see the Admin API for the FieldError format.
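
The field constraints in the table above can be checked on the client before a create call. The following is a minimal Python sketch, not Orion's validator: precheck_workflow is a hypothetical helper, "alphanumeric" is assumed to mean ASCII letters and digits, and the server's 400 response remains the source of truth.

```python
import re

# Assumption: "alphanumeric" means ASCII letters and digits.
# First char alphanumeric, then up to 127 more from [A-Za-z0-9._-] (<=128 total).
WORKFLOW_ID_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]{0,127}")


def precheck_workflow(wf: dict) -> list:
    """Return a list of problems found before sending the workflow to Orion."""
    problems = []

    wf_id = wf.get("workflow_id")  # optional: Orion generates a UUID when omitted
    if wf_id is not None and not (
        isinstance(wf_id, str) and WORKFLOW_ID_RE.fullmatch(wf_id)
    ):
        problems.append("workflow_id: <=128 chars, start alphanumeric, only [A-Za-z0-9._-]")

    name = wf.get("name")
    if not isinstance(name, str) or not name or len(name) > 255:
        problems.append("name: required, non-empty, <=255 chars")

    if len(wf.get("description", "")) > 2048:
        problems.append("description: <=2048 chars")

    tasks = wf.get("tasks")
    if not isinstance(tasks, list) or not tasks:
        problems.append("tasks: required, non-empty array")

    return problems
```

An empty list means the document passes these local checks; it does not guarantee that the server accepts it, since task and function inputs are validated separately.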

Tasks

Each entry in tasks is a single step in the pipeline:

| Field | Type | Required | Notes |
|---|---|---|---|
| id | string | yes | Unique within the workflow; used in tracing |
| name | string | yes | Human-readable label |
| function | object | yes | The function to run (see below) |
| condition | JSONLogic | no | If present and falsy, this task is skipped |

The function object names a built-in function and supplies its input:

| Field | Type | Required | Notes |
|---|---|---|---|
| name | string | yes | One of the 16 built-in functions |
| input | object | depends | Function-specific parameters. Connector functions are schema-validated on create |

{
  "id": "flag",
  "name": "Flag high-value order",
  "condition": { ">": [{ "var": "data.order.total" }, 10000] },
  "function": {
    "name": "map",
    "input": { "mappings": [{ "path": "data.order.flagged", "logic": true }] }
  }
}

The data context

Tasks share a single JSON document, the data context, with two top-level areas your JSONLogic can read:

  • data — the working document. The request body is parsed into it by parse_json (or parse_xml), tasks read and write data.*, and for a sync channel the final data object is what’s returned to the caller.
  • metadata — request context such as headers, query params, and path params, available to conditions and validation.

Most functions write their result to a dotted output path (e.g. response_path, output, or a map mapping path), which is created inside the context if it doesn’t exist.
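
To make "created inside the context if it doesn't exist" concrete, here is a small Python sketch of a dotted-path write. set_path is a hypothetical helper that models the described behavior on a plain dict; it is not Orion's implementation and assumes every intermediate value is an object.

```python
def set_path(context: dict, dotted: str, value) -> None:
    """Write value at a dotted path, creating intermediate objects as needed."""
    *parents, leaf = dotted.split(".")
    node = context
    for key in parents:
        # setdefault returns the existing child or inserts an empty object
        node = node.setdefault(key, {})
    node[leaf] = value


context = {"data": {"order": {"total": 12000}}, "metadata": {}}
set_path(context, "data.order.flagged", True)   # extends an existing object
set_path(context, "data.customer.tier", "vip")  # "customer" did not exist yet
```

After both calls, data.order holds total and flagged, and data.customer has been created with a single tier field.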

The parse-then-process pattern. A workflow that reads request data should start with parse_json; otherwise conditions referencing data.* see an empty context.

{
  "tasks": [
    { "id": "parse",   "name": "Parse",   "function": { "name": "parse_json", "input": { "source": "payload", "target": "order" } } },
    { "id": "process", "name": "Process", "condition": { ">": [{ "var": "data.order.total" }, 100] }, "function": { "name": "map", "input": { "mappings": [] } } }
  ]
}

Conditions

Conditions are JSONLogic expressions, evaluated by datalogic-rs and compiled once at engine build time. They appear at two levels:

  • Workflow-level condition — decides whether the whole workflow matches a request. Defaults to true (always matches). If multiple active workflows are bound to a channel, the first match wins (see Matching).
  • Task-level condition — decides whether that task runs within a matched workflow. Use it for branching inside a pipeline.

Common operators (see the JSONLogic spec for the full set):

| Operator | Example | Meaning |
|---|---|---|
| var | { "var": "data.order.total" } | Read a value from the context |
| == / != | { "==": [{ "var": "data.type" }, "order"] } | Loose equality |
| > >= < <= | { ">": [{ "var": "data.order.total" }, 10000] } | Comparison |
| and / or / ! | { "and": [a, b] } | Boolean logic |
| !! | { "!!": [{ "var": "data.order.id" }] } | Truthiness (e.g. "is present") |
| if | { "if": [cond, then, else] } | Conditional value |
| in | { "in": [{ "var": "data.tier" }, ["vip", "premium"]] } | Membership |
| cat | { "cat": ["Order #", { "var": "data.order.id" }] } | String concatenation |
| + - * / % | { "*": [{ "var": "data.qty" }, 1.1] } | Arithmetic |
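
To show how such expressions reduce to values, the sketch below evaluates a small subset of the operators in Python. It is an illustration only, not datalogic-rs: evaluate and get_var are hypothetical names, equality is strict rather than loose, and/or are evaluated eagerly and return booleans, and only the string form of var is handled.

```python
def get_var(ctx, path):
    """Resolve a dotted path; a missing key yields None."""
    node = ctx
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


OPS = {
    "==": lambda a, b: a == b,   # strict here; real JSONLogic "==" is loose
    "!=": lambda a, b: a != b,
    ">": lambda a, b: a is not None and a > b,
    "and": lambda *v: all(v),
    "or": lambda *v: any(v),
    "!": lambda a: not a,
    "!!": lambda a: bool(a),
    "in": lambda needle, haystack: needle in haystack,
    "cat": lambda *v: "".join(str(x) for x in v),
}


def evaluate(expr, ctx):
    # Anything that is not a single-key object is a literal and evaluates to itself.
    if not (isinstance(expr, dict) and len(expr) == 1):
        return expr
    op, args = next(iter(expr.items()))
    if op == "var":
        return get_var(ctx, args)
    if not isinstance(args, list):
        args = [args]
    if op == "if":
        cond, then, otherwise = args
        return evaluate(then if evaluate(cond, ctx) else otherwise, ctx)
    return OPS[op](*[evaluate(a, ctx) for a in args])


ctx = {"data": {"order": {"id": "A1", "total": 12000}, "tier": "vip"}}
flagged = evaluate({">": [{"var": "data.order.total"}, 10000]}, ctx)
label = evaluate({"cat": ["Order #", {"var": "data.order.id"}]}, ctx)
```

With this context, flagged is True and label is "Order #A1"; a var that points at a missing field resolves to None, so "!!" on it is False.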

Error handling

By default the pipeline halts on the first task that errors, and the error is returned to the caller. Set continue_on_error: true on the workflow to keep running subsequent tasks and collect errors instead. The filter function offers finer control: on_reject: "halt" stops the workflow, while on_reject: "skip" skips only the current task.

For async channels, a task failure routes the trace to the Dead Letter Queue for automatic retry — see Resilience.
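
The difference between the default halt behavior and continue_on_error can be sketched in a few lines of Python. This models the semantics described above with plain callables; run_pipeline and the shape of the collected error records are hypothetical, not Orion's response format.

```python
def run_pipeline(tasks, data, continue_on_error=False):
    """tasks: list of (task_id, callable). Each callable mutates data or raises."""
    errors = []
    for task_id, fn in tasks:
        try:
            fn(data)
        except Exception as exc:
            errors.append({"task": task_id, "error": str(exc)})
            if not continue_on_error:
                break  # default: halt on the first failing task
    return data, errors


def fail(_data):
    raise ValueError("upstream unavailable")


tasks = [
    ("enrich", fail),
    ("flag", lambda d: d.update(flagged=True)),
]

halted, halted_errors = run_pipeline(tasks, {})
kept_going, collected = run_pipeline(tasks, {}, continue_on_error=True)
```

In the first run the "flag" task never executes; in the second it runs after the failure and the error from "enrich" is collected alongside the result.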

Lifecycle and versioning

Each workflow_id has one or more versions, identified by the composite key (workflow_id, version). Status moves in one direction:

draft ──activate──▶ active ──archive──▶ archived
  • draft — editable; not served. Only one draft per workflow_id may exist at a time. Creating a workflow starts it as a draft.
  • active — served; immutable. To change an active workflow, create a new draft version, edit it, and activate it.
  • archived — retired; kept for history and instant rollback.

Endpoints (see the Admin API for full details):

| Action | Endpoint |
|---|---|
| Validate without saving | POST /api/v1/admin/workflows/validate |
| Create (as draft) | POST /api/v1/admin/workflows |
| New draft version of an existing id | POST /api/v1/admin/workflows/{id}/versions |
| Dry-run against sample data | POST /api/v1/admin/workflows/{id}/test |
| Change status | PATCH /api/v1/admin/workflows/{id}/status |
| Adjust rollout | PATCH /api/v1/admin/workflows/{id}/rollout |

Matching

When a channel resolves to its workflows, Orion evaluates active workflows in descending priority, then runs the first whose condition is truthy. Give a catch-all workflow a low priority and specific ones a higher priority to layer behavior.
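
The matching rule can be sketched in Python as a sort followed by a first-match scan. Conditions are plain callables standing in for compiled JSONLogic, match_workflow is a hypothetical name, and tie-breaking between equal priorities is an assumption (input order is kept), since the text does not specify it.

```python
def match_workflow(workflows, context):
    """Return the first active workflow, by descending priority, whose condition holds."""
    active = [w for w in workflows if w["status"] == "active"]
    # sorted() is stable, so workflows with equal priority keep their input order
    for wf in sorted(active, key=lambda w: w.get("priority", 0), reverse=True):
        if wf["condition"](context):
            return wf
    return None


workflows = [
    {"workflow_id": "catch-all", "priority": 0, "status": "active",
     "condition": lambda c: True},
    {"workflow_id": "high-value-order", "priority": 10, "status": "active",
     "condition": lambda c: c["data"]["order"]["total"] > 10000},
    {"workflow_id": "draft-one", "priority": 99, "status": "draft",
     "condition": lambda c: True},  # drafts are never served
]

big = match_workflow(workflows, {"data": {"order": {"total": 12000}}})
small = match_workflow(workflows, {"data": {"order": {"total": 50}}})
```

The large order matches high-value-order, while the small one falls through to the low-priority catch-all; the draft is ignored despite its high priority.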

Rollout

rollout_percentage (1–100) enables canary releases across versions. Activating a new version at, say, 25 directs ~25% of traffic to it and the remainder to the previously active version; traffic is bucketed by a stable hash of the request so a given caller is routed consistently. Promote by raising the percentage to 100 (which archives the older active version), or roll back instantly by re-activating a previous version.

# Activate a new version to 10% of traffic
curl -X PATCH http://localhost:8080/api/v1/admin/workflows/high-value-order/status \
  -H "Content-Type: application/json" -d '{ "status": "active", "rollout_percentage": 10 }'

# Ramp up later
curl -X PATCH http://localhost:8080/api/v1/admin/workflows/high-value-order/rollout \
  -H "Content-Type: application/json" -d '{ "rollout_percentage": 50 }'
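
The "stable hash of the request" bucketing described above can be illustrated with a short Python sketch. The hash function and the request attributes Orion actually hashes are not specified here, so SHA-256 over a caller-supplied key and 100 buckets are assumptions, and bucket, pick_version, and request_key are hypothetical names.

```python
import hashlib


def bucket(request_key: str) -> int:
    """Map a request key to a stable bucket in the range 0..99."""
    digest = hashlib.sha256(request_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % 100


def pick_version(request_key: str, new_version: int, old_version: int,
                 rollout_percentage: int) -> int:
    """Send roughly rollout_percentage% of keys to new_version, the rest to old_version."""
    return new_version if bucket(request_key) < rollout_percentage else old_version


keys = [f"caller-{i}" for i in range(10000)]
share = sum(pick_version(k, 2, 1, 25) == 2 for k in keys) / len(keys)
```

Because the bucket depends only on the key, the same caller lands on the same version for a given percentage, and raising the percentage only moves callers from the old version to the new one, never back.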

Complete example

{
  "workflow_id": "high-value-order",
  "name": "High-Value Order",
  "description": "Flag orders over $10,000 for manual review",
  "priority": 10,
  "condition": { "==": [{ "var": "metadata.headers.x-source" }, "checkout"] },
  "tasks": [
    {
      "id": "parse",
      "name": "Parse payload",
      "function": { "name": "parse_json", "input": { "source": "payload", "target": "order" } }
    },
    {
      "id": "validate",
      "name": "Validate order",
      "function": {
        "name": "validation",
        "input": { "rules": [
          { "logic": { "!!": [{ "var": "data.order.id" }] }, "message": "order id is required" },
          { "logic": { ">": [{ "var": "data.order.total" }, 0] }, "message": "total must be positive" }
        ]}
      }
    },
    {
      "id": "flag",
      "name": "Flag for review",
      "condition": { ">": [{ "var": "data.order.total" }, 10000] },
      "function": {
        "name": "map",
        "input": { "mappings": [
          { "path": "data.order.flagged", "logic": true },
          { "path": "data.order.alert", "logic": { "cat": ["High-value order: $", { "var": "data.order.total" }] } }
        ]}
      }
    }
  ],
  "tags": ["orders", "risk"],
  "continue_on_error": false
}

See Use Cases & Patterns for complete, tested workflows, and the Function Reference for every function’s input schema.

Function Reference

A workflow is an ordered list of tasks, and every task invokes one built-in function with an input object:

{
  "id": "enrich",
  "name": "Look up customer",
  "function": {
    "name": "http_call",
    "input": { "connector": "crm", "path": "/customers/42", "response_path": "data.customer" }
  }
}

Functions read from and write to the data context — the JSON document that flows through the pipeline. By convention the request body is parsed into data.* by parse_json, later tasks read data.* in their JSONLogic, and the data object is what a sync channel returns. See the Workflow Reference for the context model and how condition expressions are evaluated.

Orion ships 16 functions (plus validate, an alias for validation). Eight are contributed by the dataflow-rs engine; eight are Orion handlers that talk to connectors or compose channels.

| Function | Category | Connector | Purpose |
|---|---|---|---|
| parse_json | Data | | Parse the raw payload into the data context |
| parse_xml | Data | | Parse an XML payload into the data context |
| map | Data | | Transform/reshape data with JSONLogic |
| filter | Data | | Gate the pipeline on a JSONLogic condition |
| validation | Data | | Collect validation errors from JSONLogic rules |
| log | Data | | Emit a structured log line |
| publish_json | Data | | Serialize a context field to a JSON string |
| publish_xml | Data | | Serialize a context field to an XML string |
| http_call | Connector | HTTP | Call an external API with retry + circuit breaker |
| db_read | Connector | SQL | Run a SELECT, return rows as JSON |
| db_write | Connector | SQL | Run INSERT/UPDATE/DELETE, return affected count |
| cache_read | Connector | Cache | Read a value from Redis or the in-memory cache |
| cache_write | Connector | Cache | Write a value to cache with optional TTL |
| mongo_read | Connector | MongoDB | Run find(), return documents as JSON |
| publish_kafka | Connector | Kafka | Publish a message to a Kafka topic |
| channel_call | Composition | | Invoke another channel's workflow in-process |

Wherever an input field is described as JSONLogic, you pass a JSONLogic expression that is evaluated against the data context. A plain JSON literal (string, number, object) is also valid JSONLogic and evaluates to itself.


Data functions

These come from the dataflow-rs engine. Orion does not input-schema-validate them at workflow-create time (unlike the connector functions below), so an invalid input here surfaces at execution time rather than on create.

parse_json

Reads a raw value (typically the request payload) and parses it as JSON into the data context. Almost every workflow starts with this — without it, task conditions that reference data.* see an empty context.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| source | string | yes | | Where to read the raw value from, e.g. "payload" |
| target | string | yes | | Field name under data; the parsed value is stored at data.{target} |

{ "name": "parse_json", "input": { "source": "payload", "target": "order" } }

parse_xml

Same input shape as parse_json, but parses an XML payload into a JSON structure at data.{target}.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| source | string | yes | | Where to read the raw XML from, e.g. "payload" |
| target | string | yes | | Stored at data.{target} |

{ "name": "parse_xml", "input": { "source": "payload", "target": "order" } }

map

Applies an ordered list of JSONLogic expressions, writing each result to a dotted path in the context. The primary tool for reshaping, computing, and enriching data.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| mappings | array | yes | | Ordered list of { "path", "logic" } entries |
| mappings[].path | string | yes | | Dotted target path, e.g. "data.order.total" |
| mappings[].logic | JSONLogic | yes | | Expression whose result is written to path |

{
  "name": "map",
  "input": {
    "mappings": [
      { "path": "data.order.flagged", "logic": true },
      { "path": "data.order.total_with_tax", "logic": { "*": [{ "var": "data.order.total" }, 1.1] } }
    ]
  }
}

filter

Evaluates a JSONLogic condition. If it is truthy the pipeline continues; otherwise the on_reject action is taken.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| condition | JSONLogic | yes | | Evaluated against the data context |
| on_reject | string | no | "halt" | "halt" stops the whole workflow; "skip" skips only this task |

{
  "name": "filter",
  "input": {
    "condition": { ">": [{ "var": "data.order.total" }, 0] },
    "on_reject": "halt"
  }
}

validation / validate

Evaluates a list of rules. Each rule’s logic must evaluate to exactly true; any other result records the rule’s message in the response’s error list. Validation is non-destructive — it never mutates the data context. validate is an accepted alias for validation.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| rules | array | yes | | List of { "logic", "message" } rules |
| rules[].logic | JSONLogic | yes | | Must evaluate to true to pass |
| rules[].message | string | yes | | Error message recorded when the rule fails |

{
  "name": "validation",
  "input": {
    "rules": [
      { "logic": { "!!": [{ "var": "data.order.customer_id" }] }, "message": "customer_id is required" },
      { "logic": { ">": [{ "var": "data.order.total" }, 0] },        "message": "total must be positive" }
    ]
  }
}
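
The "exactly true" rule is easy to miss, so here is a Python sketch of the semantics described above. Checks are plain callables standing in for JSONLogic, and run_validation and the third rule are hypothetical; the sketch only models the pass/fail logic and the non-destructive behavior.

```python
import copy


def run_validation(rules, data):
    """rules: list of (check, message). Returns the messages of all failed rules."""
    snapshot = copy.deepcopy(data)  # rules see a copy, so the context is never mutated
    # A rule passes only when its result is exactly True, not merely truthy.
    return [message for check, message in rules if check(snapshot) is not True]


rules = [
    (lambda d: bool(d["order"].get("customer_id")), "customer_id is required"),
    (lambda d: d["order"].get("total", 0) > 0, "total must be positive"),
    (lambda d: d["order"].get("total"), "rule returned a number, not true"),
]

data = {"order": {"total": 50}}
errors = run_validation(rules, data)
```

The second rule passes. The first fails because customer_id is absent, and the third fails even though 50 is truthy, which is why the documented examples wrap presence checks in "!!".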

log

Emits a structured log line. message is a JSONLogic expression (a plain string is valid), and fields attaches additional JSONLogic-derived key/values.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| message | JSONLogic | yes | | The log message (string literal or expression) |
| level | string | no | "info" | One of trace, debug, info, warn, error |
| fields | object | no | {} | Map of name → JSONLogic expression, logged as structured fields |

{
  "name": "log",
  "input": {
    "level": "info",
    "message": "Order processed",
    "fields": { "order_id": { "var": "data.order.id" } }
  }
}

publish_json

Serializes a field inside the data context to a JSON string and stores it at another field. (It writes back into the context; it does not publish to an external system.)

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| source | string | yes | | Field under data to serialize, e.g. "order" (reads data.order) |
| target | string | yes | | Field under data to receive the serialized string |
| pretty | bool | no | false | Pretty-print the JSON output |

{ "name": "publish_json", "input": { "source": "order", "target": "order_json", "pretty": true } }

publish_xml

Like publish_json, but serializes to an XML string.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| source | string | yes | | Field under data to serialize |
| target | string | yes | | Field under data to receive the XML string |
| root_element | string | no | "root" | Name of the XML root element |

{ "name": "publish_xml", "input": { "source": "order", "target": "order_xml", "root_element": "Order" } }

Connector functions

These reference a connector by name — credentials and endpoints live in the connector, not the workflow. Orion validates their input at workflow create/update time and exposes the schema via GET /api/v1/admin/functions. Every connector call is wrapped in a circuit breaker.
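
For readers unfamiliar with the pattern, the following Python sketch shows what wrapping a call in a circuit breaker generally means. It is a generic illustration, not Orion's breaker: the CircuitBreaker class, the failure threshold, and the reset window are all assumptions chosen for the example.

```python
import time


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_after_secs=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after_secs = reset_after_secs
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after_secs:
                raise RuntimeError("circuit open: call rejected without reaching the connector")
            self.opened_at = None  # half-open: allow one trial call through
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # trip (or re-trip) the breaker
            raise
        self.failures = 0  # any success closes the circuit fully
        return result
```

While the circuit is open, calls fail fast instead of waiting on a struggling dependency; after the reset window a single trial call decides whether the circuit closes again.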

http_call

Makes an HTTP request through an HTTP connector, with retry and circuit-breaker support. The connector supplies the base URL and auth.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | string | yes | | Name of the HTTP connector |
| method | string | no | "GET" | One of GET, POST, PUT, PATCH, DELETE |
| path | string | no | | Path appended to the connector's base URL |
| path_logic | JSONLogic | no | | Compute the path dynamically (use instead of path) |
| headers | object | no | {} | Extra request headers (string → string) |
| body | any | no | | Static request body (serialized as JSON) |
| body_logic | JSONLogic | no | | Compute the body dynamically (use instead of body) |
| response_path | string | no | | Dotted path where the response body is written; omit to discard it |
| timeout_ms | number | no | 30000 | Per-request timeout in milliseconds |

{
  "name": "http_call",
  "input": {
    "connector": "payment-api",
    "method": "POST",
    "path": "/charge",
    "body_logic": { "var": "data.payment" },
    "response_path": "data.charge_result",
    "timeout_ms": 5000
  }
}

db_read

Runs a SELECT against a SQL connector and writes the result rows as a JSON array. Use placeholders bound from params: ? for SQLite/MySQL, and $1, $2, … for PostgreSQL.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | string | yes | | Name of the SQL connector |
| query | string | yes | | SELECT statement with bind placeholders |
| params | array | no | | Values bound to the placeholders, in order |
| output | string | no | "data" | Dotted path where the row array is written |

{
  "name": "db_read",
  "input": {
    "connector": "primary-db",
    "query": "SELECT id, name, tier FROM customers WHERE id = ?",
    "params": [{ "var": "data.order.customer_id" }],
    "output": "data.customer"
  }
}
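
Positional binding and "rows as a JSON array" can be tried locally with Python's built-in sqlite3 module, which uses the same ? placeholder style. This is only an analogy for what db_read does: the db_read helper below is hypothetical and runs against an in-memory SQLite database, not through an Orion connector.

```python
import sqlite3


def db_read(conn: sqlite3.Connection, query: str, params=()):
    """Run a SELECT with bound parameters and return rows as a list of dicts."""
    conn.row_factory = sqlite3.Row          # rows become addressable by column name
    cursor = conn.execute(query, params)    # params bind to the ? placeholders in order
    return [dict(row) for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, tier TEXT)")
conn.execute("INSERT INTO customers VALUES (?, ?, ?)", (42, "Ada", "vip"))

rows = db_read(conn, "SELECT id, name, tier FROM customers WHERE id = ?", [42])
```

Binding values through params instead of concatenating them into the query string keeps user-supplied data out of the SQL text. Note that the result is always an array, even when a single row matches.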

db_write

Runs an INSERT/UPDATE/DELETE against a SQL connector and writes { "rows_affected": N }.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | string | yes | | Name of the SQL connector |
| query | string | yes | | INSERT/UPDATE/DELETE statement with bind placeholders |
| params | array | no | | Values bound to the placeholders, in order |
| output | string | no | "data" | Dotted path where { "rows_affected": N } is written |

{
  "name": "db_write",
  "input": {
    "connector": "primary-db",
    "query": "INSERT INTO orders (id, total) VALUES (?, ?)",
    "params": [{ "var": "data.order.id" }, { "var": "data.order.total" }],
    "output": "data.write_result"
  }
}

cache_read

Reads a key from a cache connector (Redis or the built-in in-memory backend). Missing keys yield null.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | string | yes | | Name of the cache connector |
| key | string | yes | | Cache key to read |
| output | string | no | "data" | Dotted path where the value is written |

{ "name": "cache_read", "input": { "connector": "redis", "key": "rate:42", "output": "data.cached" } }

cache_write

Writes a key to a cache connector, optionally with a TTL.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | string | yes | | Name of the cache connector |
| key | string | yes | | Cache key to set |
| value | any | yes | | Value to store (non-strings are JSON-serialized) |
| ttl_secs | number | no | no expiry | Time-to-live in seconds |

{ "name": "cache_write", "input": { "connector": "redis", "key": "rate:42", "value": 1, "ttl_secs": 60 } }
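
The cache semantics described for cache_read and cache_write (missing keys yield null, non-strings are JSON-serialized, optional TTL) can be modeled with a small in-memory class. This is a Python sketch under stated assumptions, not Orion's in-memory backend: MemoryCache is a hypothetical name, and returning the stored string as-is on read is an assumption.

```python
import json
import time


class MemoryCache:
    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.store = {}  # key -> (stored string, expiry time or None)

    def write(self, key, value, ttl_secs=None):
        stored = value if isinstance(value, str) else json.dumps(value)
        expires_at = None if ttl_secs is None else self.clock() + ttl_secs
        self.store[key] = (stored, expires_at)

    def read(self, key):
        entry = self.store.get(key)
        if entry is None:
            return None  # missing key
        stored, expires_at = entry
        if expires_at is not None and self.clock() >= expires_at:
            del self.store[key]  # lazily evict the expired entry
            return None
        return stored
```

Injecting the clock makes expiry testable without sleeping; a key written without ttl_secs never expires.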

mongo_read

Runs a find() against a MongoDB connector and writes the matched documents as a JSON array.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | string | yes | | Name of the MongoDB connector |
| database | string | yes | | Database name |
| collection | string | yes | | Collection name |
| filter | object | no | {} | MongoDB find filter document |
| output | string | no | "data" | Dotted path where matched documents are written |

{
  "name": "mongo_read",
  "input": {
    "connector": "mongo",
    "database": "shop",
    "collection": "customers",
    "filter": { "tier": "vip" },
    "output": "data.vips"
  }
}

publish_kafka

Publishes a message to a Kafka topic through a Kafka connector. Requires Kafka to be enabled in config. If value_logic is omitted, the full data context is published.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | string | yes | | Name of the Kafka connector |
| topic | string | yes | | Target topic |
| key_logic | JSONLogic | no | | Expression that derives the message key |
| value_logic | JSONLogic | no | full data | Expression that derives the message value |

{
  "name": "publish_kafka",
  "input": {
    "connector": "events",
    "topic": "order.placed",
    "key_logic": { "var": "data.order.id" },
    "value_logic": { "var": "data.order" }
  }
}

Composition functions

channel_call

Invokes another channel’s workflow in-process — no network hop. The called channel keeps its own versioning and governance. Cycle detection and a max call depth prevent runaway recursion. Provide exactly one of channel/channel_logic and at most one of data/data_logic.

| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| channel | string | one of channel/channel_logic | | Static target channel name |
| channel_logic | JSONLogic | one of channel/channel_logic | | Expression that resolves to the target channel name |
| data | any | no | request payload | Static payload passed to the target channel |
| data_logic | JSONLogic | no | | Expression that derives the payload |
| response_path | string | no | "data" | Dotted path where the called channel's response is stored |
| timeout_ms | number | no | from config | Per-call timeout in milliseconds |

{
  "name": "channel_call",
  "input": {
    "channel": "customer-lookup",
    "data_logic": { "var": "data.order.customer_id" },
    "response_path": "data.customer"
  }
}
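
Cycle detection and a depth limit for in-process calls can be sketched by threading a call stack through each invocation. The Python below is an illustration of the idea only: call_channel, the handler signature, and the MAX_DEPTH value are hypothetical, and Orion's actual limit comes from its own configuration.

```python
MAX_DEPTH = 8  # hypothetical limit, chosen for the example


def call_channel(channels, name, payload, stack=()):
    """Invoke a channel handler, refusing cycles and overly deep call chains."""
    if name in stack:
        raise RuntimeError("cycle detected: " + " -> ".join(stack + (name,)))
    if len(stack) >= MAX_DEPTH:
        raise RuntimeError("max call depth exceeded")
    handler = channels[name]
    # The handler receives a call function that carries the extended stack.
    nested = lambda target, data: call_channel(channels, target, data, stack + (name,))
    return handler(payload, nested)


channels = {
    "orders": lambda p, call: {"customer": call("customer-lookup", p["customer_id"])},
    "customer-lookup": lambda p, call: {"id": p, "tier": "vip"},
    "loop-a": lambda p, call: call("loop-b", p),
    "loop-b": lambda p, call: call("loop-a", p),
}

result = call_channel(channels, "orders", {"customer_id": 42})
```

Calling "orders" composes the lookup channel without a network hop, while calling "loop-a" fails as soon as the chain tries to re-enter a channel already on the stack.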

Inspecting schemas at runtime

GET /api/v1/admin/functions returns the live input schema for the connector and composition functions (the data functions are provided by dataflow-rs and are not catalogued there). The Orion CLI MCP server surfaces the same schemas to AI assistants so generated workflows use correct field names.

CLI Setup

Get Orion running on your machine in under a minute.

Installation

Choose your preferred method:

Homebrew (macOS and Linux):

brew install GoPlasmatic/tap/orion-server

Shell installer (Linux/macOS):

curl --proto '=https' --tlsv1.2 -LsSf https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.sh | sh

PowerShell (Windows):

powershell -ExecutionPolicy ByPass -c "irm https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.ps1 | iex"

Docker:

docker run -p 8080:8080 ghcr.io/goplasmatic/orion:latest

From source (requires Rust 1.88+):

cargo install --git https://github.com/GoPlasmatic/Orion

First Run

Start Orion with default settings (SQLite, port 8080):

orion-server

Verify it’s running:

curl -s http://localhost:8080/health
{
  "status": "ok",
  "version": "0.2.0",
  "uptime_seconds": 5,
  "workflows_loaded": 0,
  "components": {
    "database": "ok",
    "engine": "ok"
  }
}

Swagger UI is available at http://localhost:8080/docs.

Configuration

Create a config file for custom settings:

orion-server -c config.toml

Or use environment variables for individual overrides:

ORION_SERVER__PORT=9090 \
ORION_LOGGING__FORMAT=json \
orion-server

Common configuration scenarios:

# Use PostgreSQL instead of SQLite
ORION_STORAGE__URL="postgres://user:pass@localhost/orion" orion-server

# Enable admin authentication
ORION_ADMIN_AUTH__ENABLED=true \
ORION_ADMIN_AUTH__API_KEY="your-secret-key" \
orion-server

# Enable metrics and tracing
ORION_METRICS__ENABLED=true \
ORION_TRACING__ENABLED=true \
ORION_TRACING__OTLP_ENDPOINT="http://localhost:4317" \
orion-server
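
The variable names above suggest a naming scheme: an ORION_ prefix, with a double underscore separating the config section from the key (for example, ORION_ADMIN_AUTH__API_KEY corresponds to admin_auth.api_key). The Python sketch below spells out that mapping as an assumption inferred from these examples; env_to_config is a hypothetical helper, and Orion's own loader may also coerce types, which is not modeled here.

```python
def env_to_config(env: dict, prefix: str = "ORION_") -> dict:
    """Turn ORION_SECTION__KEY=value pairs into {"section": {"key": value}}."""
    config = {}
    for name, value in env.items():
        if not name.startswith(prefix):
            continue  # unrelated environment variable
        *sections, key = name[len(prefix):].lower().split("__")
        node = config
        for section in sections:
            node = node.setdefault(section, {})
        node[key] = value  # values stay strings in this sketch
    return config


overrides = env_to_config({
    "ORION_SERVER__PORT": "9090",
    "ORION_ADMIN_AUTH__API_KEY": "your-secret-key",
    "PATH": "/usr/bin",
})
```

Here overrides contains a server section with port and an admin_auth section with api_key, mirroring the layout a config.toml would use.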

Validate a config file without starting the server:

orion-server validate-config -c config.toml

Create Your First Service

1. Create a workflow:

curl -s -X POST http://localhost:8080/api/v1/admin/workflows \
  -H "Content-Type: application/json" \
  -d '{
    "workflow_id": "hello-world",
    "name": "Hello World",
    "condition": true,
    "tasks": [
      { "id": "parse", "name": "Parse", "function": {
          "name": "parse_json", "input": { "source": "payload", "target": "req" }
      }},
      { "id": "greet", "name": "Greet", "function": {
          "name": "map", "input": { "mappings": [
            { "path": "data.req.greeting", "logic": {
              "cat": ["Hello, ", { "var": "data.req.name" }, "!"]
            }}
          ]}
      }}
    ]
  }'

2. Activate the workflow:

curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/hello-world/status \
  -H "Content-Type: application/json" -d '{"status": "active"}'

3. Create and activate a channel:

curl -s -X POST http://localhost:8080/api/v1/admin/channels \
  -H "Content-Type: application/json" \
  -d '{ "channel_id": "hello", "name": "hello", "channel_type": "sync",
        "protocol": "rest", "route_pattern": "/hello",
        "methods": ["POST"], "workflow_id": "hello-world" }'

curl -s -X PATCH http://localhost:8080/api/v1/admin/channels/hello/status \
  -H "Content-Type: application/json" -d '{"status": "active"}'

4. Test it:

curl -s -X POST http://localhost:8080/api/v1/data/hello \
  -H "Content-Type: application/json" \
  -d '{ "data": { "name": "World" } }'
{
  "status": "ok",
  "data": { "req": { "name": "World", "greeting": "Hello, World!" } },
  "errors": []
}

Orion CLI

The Orion CLI provides a command-line interface and MCP server for managing Orion. No curl commands needed.

Homebrew (macOS and Linux):

brew install GoPlasmatic/tap/orion-cli

Shell installer (Linux/macOS):

curl --proto '=https' --tlsv1.2 -LsSf https://github.com/GoPlasmatic/Orion-cli/releases/latest/download/orion-cli-installer.sh | sh

PowerShell (Windows):

powershell -ExecutionPolicy ByPass -c "irm https://github.com/GoPlasmatic/Orion-cli/releases/latest/download/orion-cli-installer.ps1 | iex"

From source (requires Rust 1.88+):

cargo install --git https://github.com/GoPlasmatic/Orion-cli

Usage:

orion-cli config set-server http://localhost:8080
orion-cli health
orion-cli workflows list
orion-cli channels list
orion-cli send hello -d '{ "data": { "name": "World" } }'

The CLI covers the full lifecycle in one tool: create, activate, dry-run, then send live data. Dry-run testing and live traffic flow through the same workflow.

See the CLI reference for the full command list, or set up the MCP Server for AI assistant integration.

MCP Server Setup

Orion includes an MCP (Model Context Protocol) server that lets AI assistants like Claude manage workflows, channels, and connectors through natural language. The MCP server wraps the Orion admin API, giving AI agents full control over your Orion instance.

Demo recording: a real stdio JSON-RPC session (handshake, tool discovery, then a live tool call).

Prerequisites

  • A running Orion instance (see CLI Setup)
  • An MCP-compatible client (Claude Code, Claude Desktop, or other MCP clients)

Configuration

Add the Orion MCP server to your MCP client configuration.

Claude Code: add to your .claude/settings.json or project-level .mcp.json:

{
  "mcpServers": {
    "orion": {
      "command": "orion-cli",
      "args": ["mcp", "serve"],
      "env": {
        "ORION_SERVER_URL": "http://localhost:8080"
      }
    }
  }
}

Claude Desktop: add to your Claude Desktop configuration file:

{
  "mcpServers": {
    "orion": {
      "command": "orion-cli",
      "args": ["mcp", "serve"],
      "env": {
        "ORION_SERVER_URL": "http://localhost:8080"
      }
    }
  }
}

If admin authentication is enabled on your Orion instance, include the API key:

{
  "env": {
    "ORION_SERVER_URL": "http://localhost:8080",
    "ORION_API_KEY": "your-secret-key"
  }
}

Available Tools

The MCP server exposes 46 tools covering the full Orion API. Most tool names follow a <resource>_<action> convention; a few, such as get_metrics, do not.

| Category | Tools |
|---|---|
| Health | health_check |
| Engine | engine_status, engine_reload |
| Workflows | workflows_list, workflows_get, workflows_create, workflows_update, workflows_delete, workflows_activate, workflows_archive, workflows_test, workflows_validate, workflows_rollout, workflows_versions, workflows_create_version, workflows_export, workflows_import |
| Channels | channels_list, channels_get, channels_create, channels_update, channels_delete, channels_activate, channels_archive, channels_versions, channels_create_version, channels_import |
| Connectors | connectors_list, connectors_get, connectors_create, connectors_update, connectors_delete, connectors_enable, connectors_disable, connectors_import |
| Circuit Breakers | circuit_breakers_list, circuit_breaker_reset |
| Data | data_send_sync, data_send_async |
| Traces | traces_list, traces_get |
| Functions | functions_list |
| Audit Logs | audit_logs_list |
| Backups | backups_create, backups_list |
| Metrics | get_metrics |
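
Under the hood, an MCP client reaches these tools with JSON-RPC 2.0 messages over stdio. The Python sketch below builds the message sequence for a handshake, tool discovery, and one tool call, using the standard MCP method names. The protocol version string, the client name, and the assumption that health_check takes no arguments are illustrative; rpc is a hypothetical helper.

```python
import json


def rpc(method, params=None, msg_id=None):
    """Build one JSON-RPC 2.0 message; omit msg_id for notifications."""
    msg = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        msg["params"] = params
    if msg_id is not None:
        msg["id"] = msg_id
    return msg


session = [
    rpc("initialize", {
        "protocolVersion": "2024-11-05",  # assumption: one published MCP revision
        "capabilities": {},
        "clientInfo": {"name": "example-client", "version": "0.0.0"},
    }, msg_id=1),
    rpc("notifications/initialized"),      # notification: no id, no response
    rpc("tools/list", msg_id=2),           # discover the tools in the table above
    rpc("tools/call", {"name": "health_check", "arguments": {}}, msg_id=3),
]

for message in session:
    print(json.dumps(message))  # stdio transport: one JSON message per line
```

In practice an MCP client such as Claude Code sends these messages for you; the sketch is only meant to show what "tool discovery" and "tool call" look like on the wire.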

Usage Example

Once configured, you can use natural language to manage Orion:

“Create a workflow that parses incoming orders, flags any over $10,000, and adds a risk level field. Then create a channel called ‘orders’ that uses it.”

The AI assistant will use the MCP tools to:

  1. Create the workflow via workflows_create
  2. Test it with sample data via workflows_test
  3. Activate it via workflows_activate
  4. Create the channel via channels_create
  5. Activate the channel via channels_activate, then apply with engine_reload

Troubleshooting

MCP server not connecting:

  • Verify Orion is running: curl http://localhost:8080/health
  • Check the ORION_SERVER_URL environment variable is set correctly
  • Ensure orion-cli is in your PATH

Authentication errors:

  • Verify ORION_API_KEY matches your Orion instance’s admin_auth.api_key
  • Check that admin auth is enabled on the Orion instance if you’re passing a key

Use Cases & Patterns

Real-world examples showing how AI generates Orion workflows from natural language. Every example follows the same pattern: describe what you need → AI generates the workflow → create a channel → send data → get results.

E-Commerce Order Classification

Classify orders into tiers and compute discounts.

AI prompt:

Create a workflow for the "orders" channel that:
1. Parses the payload into "order"
2. Assigns tiers based on amount:
   - VIP: amount >= 500, discount 15%
   - Premium: amount 100-500, discount 5%
   - Standard: amount < 100, no discount

Generated workflow:

{
  "name": "Order Classification",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse Payload", "function": {
        "name": "parse_json", "input": { "source": "payload", "target": "order" }
    }},
    { "id": "vip_tier", "name": "Set VIP Tier",
      "condition": { ">=": [{ "var": "data.order.amount" }, 500] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.order.tier", "logic": "vip" },
        { "path": "data.order.discount_pct", "logic": 15 }
      ]}}
    },
    { "id": "premium_tier", "name": "Set Premium Tier",
      "condition": { "and": [
        { ">=": [{ "var": "data.order.amount" }, 100] },
        { "<": [{ "var": "data.order.amount" }, 500] }
      ]},
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.order.tier", "logic": "premium" },
        { "path": "data.order.discount_pct", "logic": 5 }
      ]}}
    },
    { "id": "standard_tier", "name": "Set Standard Tier",
      "condition": { "<": [{ "var": "data.order.amount" }, 100] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.order.tier", "logic": "standard" },
        { "path": "data.order.discount_pct", "logic": 0 }
      ]}}
    }
  ]
}

Send data:

curl -s -X POST http://localhost:8080/api/v1/data/orders \
  -H "Content-Type: application/json" \
  -d '{ "data": { "amount": 750, "product": "Diamond Ring" } }'

Response:

{
  "status": "ok",
  "data": {
    "order": { "amount": 750, "product": "Diamond Ring", "tier": "vip", "discount_pct": 15 }
  },
  "errors": []
}

Key patterns: Task-level conditions, computed output fields.
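
To sanity-check the tier boundaries before sending traffic, the three task conditions can be mirrored in plain Python. This sketch is a hypothetical stand-in for the workflow, not generated by Orion: classify applies the same three independent conditions, in the same order, to a copy of the order.

```python
def classify(order: dict) -> dict:
    """Mirror the vip/premium/standard task conditions from the workflow above."""
    amount = order["amount"]
    result = dict(order)
    # Each "if" stands for one task-level condition; they are evaluated independently.
    if amount >= 500:
        result.update(tier="vip", discount_pct=15)
    if 100 <= amount < 500:
        result.update(tier="premium", discount_pct=5)
    if amount < 100:
        result.update(tier="standard", discount_pct=0)
    return result


vip = classify({"amount": 750, "product": "Diamond Ring"})
```

Because the three ranges are mutually exclusive and cover every amount, exactly one branch fires; 500 lands in vip and 100 in premium, which matches the workflow's >= comparisons.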

IoT Sensor Alert Classification

Classify sensor readings into severity levels using range-based conditions.

AI prompt:

Create a workflow for the "sensors" channel that classifies temperature readings:
- Critical: temperature > 90 or below 0, set alert flag
- Warning: temperature 70-90, set alert flag
- Normal: temperature 0-70, no alert
Parse the payload into "reading" and set severity and alert fields.

Generated workflow:

{
  "name": "Sensor Alert Pipeline",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse Payload", "function": {
        "name": "parse_json", "input": { "source": "payload", "target": "reading" }
    }},
    { "id": "critical", "name": "Mark Critical",
      "condition": { "or": [
        { ">": [{ "var": "data.reading.temperature" }, 90] },
        { "<": [{ "var": "data.reading.temperature" }, 0] }
      ]},
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.reading.severity", "logic": "critical" },
        { "path": "data.reading.alert", "logic": true }
      ]}}
    },
    { "id": "warning", "name": "Mark Warning",
      "condition": { "and": [
        { ">": [{ "var": "data.reading.temperature" }, 70] },
        { "<=": [{ "var": "data.reading.temperature" }, 90] }
      ]},
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.reading.severity", "logic": "warning" },
        { "path": "data.reading.alert", "logic": true }
      ]}}
    },
    { "id": "normal", "name": "Mark Normal",
      "condition": { "and": [
        { ">=": [{ "var": "data.reading.temperature" }, 0] },
        { "<=": [{ "var": "data.reading.temperature" }, 70] }
      ]},
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.reading.severity", "logic": "normal" },
        { "path": "data.reading.alert", "logic": false }
      ]}}
    }
  ]
}

Send data:

curl -s -X POST http://localhost:8080/api/v1/data/sensors \
  -H "Content-Type: application/json" \
  -d '{ "data": { "temperature": 80, "sensor_id": "SENSOR-42" } }'

Response:

{
  "status": "ok",
  "data": {
    "reading": { "temperature": 80, "sensor_id": "SENSOR-42", "severity": "warning", "alert": true }
  },
  "errors": []
}

| Input | Severity | Alert |
|---|---|---|
| temperature: 45 | normal | false |
| temperature: 80 | warning | true |
| temperature: 95 | critical | true |
| temperature: -5 | critical | true |

Key patterns: Range-based classification with and/or conditions, boolean flags.

Webhook Payload Transformation

Normalize incoming webhook payloads from different providers into a consistent internal schema.

AI prompt:

Create a workflow for the "webhooks" channel that normalizes webhook payloads from any provider:
- Map "origin" to "source"
- Map "type" to "event_type"
- Map "body" to "payload"
- Add a "processed" flag set to true
Output should be under data.normalized.

Generated workflow:

{
  "name": "Webhook Transform Pipeline",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse Payload", "function": {
        "name": "parse_json", "input": { "source": "payload", "target": "event" }
    }},
    { "id": "normalize", "name": "Normalize Schema", "function": {
        "name": "map", "input": { "mappings": [
          { "path": "data.normalized.source", "logic": { "var": "data.event.origin" } },
          { "path": "data.normalized.event_type", "logic": { "var": "data.event.type" } },
          { "path": "data.normalized.payload", "logic": { "var": "data.event.body" } },
          { "path": "data.normalized.processed", "logic": true }
        ]}
    }}
  ]
}

Send data:

curl -s -X POST http://localhost:8080/api/v1/data/webhooks \
  -H "Content-Type: application/json" \
  -d '{ "data": { "origin": "github", "type": "push", "body": {"ref": "refs/heads/main"} } }'

Response:

{
  "status": "ok",
  "data": {
    "normalized": { "source": "github", "event_type": "push", "payload": {"ref": "refs/heads/main"}, "processed": true }
  },
  "errors": []
}

Missing optional fields resolve to null instead of raising errors, which makes the pipeline safe for partial payloads from different webhook providers.

Key patterns: Schema mapping with var, null-safe field access, static enrichment.
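
The null-safe behaviour can be pictured with a short Python sketch. It only illustrates the idea of a dotted-path lookup that returns `None` for a missing segment; `get_path` and `normalize` are hypothetical helpers, not Orion's implementation.

```python
def get_path(context, dotted_path):
    """Walk a dotted path; return None when any segment is missing."""
    current = context
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def normalize(event):
    """Apply the four mappings from the workflow to a parsed event."""
    context = {"data": {"event": event}}
    return {
        "source": get_path(context, "data.event.origin"),
        "event_type": get_path(context, "data.event.type"),
        "payload": get_path(context, "data.event.body"),
        "processed": True,
    }


# A partial payload with no "body" still normalizes; "payload" becomes None.
print(normalize({"origin": "github", "type": "push"}))
```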

Notification Routing

Route notifications to different delivery channels based on severity.

AI prompt:

Create a workflow for the "notifications" channel that routes by severity:
- Log all notifications
- Send email for anything except "low" severity
- Send SMS only for "high" and "critical" severity
Parse the payload into "notification".

Generated workflow:

{
  "name": "Notification Router",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse Payload", "function": {
        "name": "parse_json", "input": { "source": "payload", "target": "notification" }
    }},
    { "id": "log_all", "name": "Log All Notifications", "function": {
        "name": "map", "input": { "mappings": [
          { "path": "data.notification.logged", "logic": true }
        ]}
    }},
    { "id": "email", "name": "Send Email",
      "condition": { "!=": [{ "var": "data.notification.severity" }, "low"] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.notification.email_sent", "logic": true }
      ]}}
    },
    { "id": "sms", "name": "Send SMS for High/Critical",
      "condition": { "in": [{ "var": "data.notification.severity" }, ["high", "critical"]] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.notification.sms_sent", "logic": true }
      ]}}
    }
  ]
}

Send data:

curl -s -X POST http://localhost:8080/api/v1/data/notifications \
  -H "Content-Type: application/json" \
  -d '{ "data": { "message": "Disk usage at 92%", "severity": "high" } }'

Response:

{
  "status": "ok",
  "data": {
    "notification": { "message": "Disk usage at 92%", "severity": "high", "logged": true, "email_sent": true, "sms_sent": true }
  },
  "errors": []
}

| Severity | Logged | Email | SMS |
| --- | --- | --- | --- |
| low | yes | no | no |
| medium | yes | yes | no |
| high | yes | yes | yes |
| critical | yes | yes | yes |

In production, replace the map tasks with http_call tasks pointing to your email and SMS connectors.

Key patterns: Task-level condition gating, in operator for set membership, progressive pipeline.
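
To see how the two task conditions gate delivery, here is a minimal Python sketch that evaluates them with a tiny JSONLogic subset (`var`, `!=`, `in`). The `evaluate` and `route` functions are hypothetical and cover only what this workflow uses; Orion's own evaluator may differ in edge cases.

```python
def evaluate(rule, context):
    """Evaluate a tiny subset of JSONLogic: var, !=, and in."""
    if not isinstance(rule, dict):
        return rule  # literals (strings, lists, booleans) evaluate to themselves
    operator, args = next(iter(rule.items()))
    if operator == "var":
        value = context
        for key in args.split("."):
            value = value.get(key) if isinstance(value, dict) else None
        return value
    left, right = (evaluate(arg, context) for arg in args)
    if operator == "!=":
        return left != right
    if operator == "in":
        return left in right
    raise ValueError(f"unsupported operator: {operator}")


# The two task conditions, copied from the workflow above.
EMAIL = {"!=": [{"var": "data.notification.severity"}, "low"]}
SMS = {"in": [{"var": "data.notification.severity"}, ["high", "critical"]]}


def route(severity):
    """Return which delivery steps would run for a given severity."""
    context = {"data": {"notification": {"severity": severity}}}
    return {"logged": True, "email": evaluate(EMAIL, context), "sms": evaluate(SMS, context)}


for severity in ("low", "medium", "high", "critical"):
    print(severity, route(severity))
```

In this sketch a notification with no severity field still satisfies the `!=` condition, so it would be emailed. It is worth confirming that case with a dry-run and adding an explicit check if it is not intended.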

Compliance Risk Classification

Classify transactions by risk level and use dry-run testing to verify workflows before activating them.

AI prompt:

Create a workflow for the "compliance" channel that classifies transaction risk:
- High risk: amount > 10000, requires manual review
- Normal risk: amount <= 10000, no review needed
Parse the payload into "txn".

Generated workflow:

{
  "name": "Risk Classifier",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse Payload", "function": {
        "name": "parse_json", "input": { "source": "payload", "target": "txn" }
    }},
    { "id": "high_risk", "name": "Flag High Risk",
      "condition": { ">": [{ "var": "data.txn.amount" }, 10000] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.txn.risk_level", "logic": "high" },
        { "path": "data.txn.requires_review", "logic": true }
      ]}}
    },
    { "id": "normal_risk", "name": "Normal Risk",
      "condition": { "<=": [{ "var": "data.txn.amount" }, 10000] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.txn.risk_level", "logic": "normal" },
        { "path": "data.txn.requires_review", "logic": false }
      ]}}
    }
  ]
}

Dry-run before going live:

curl -s -X POST http://localhost:8080/api/v1/admin/workflows/<workflow-id>/test \
  -H "Content-Type: application/json" \
  -d '{"data": {"amount": 50000, "currency": "USD"}}'

Response:

{
  "matched": true,
  "trace": { "steps": [
    { "task_id": "parse", "result": "executed" },
    { "task_id": "high_risk", "result": "executed" },
    { "task_id": "normal_risk", "result": "skipped" }
  ]},
  "output": { "txn": { "amount": 50000, "currency": "USD", "risk_level": "high", "requires_review": true } }
}

The trace shows exactly which tasks ran and which were skipped. Verify the logic is correct before a single real transaction flows through.

Key patterns: Dry-run verification, execution trace inspection, regulatory workflow.
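
Trace inspection is easy to automate. The sketch below checks a dry-run response against the task results you expect; it assumes the response shape shown above (`trace.steps` with `task_id` and `result`), and `check_trace` is a hypothetical helper.

```python
def check_trace(response, expected):
    """Compare a dry-run trace against expected per-task results.

    `expected` maps task_id to "executed" or "skipped". Returns a list of
    human-readable mismatches; an empty list means the trace matched.
    """
    actual = {step["task_id"]: step["result"] for step in response["trace"]["steps"]}
    problems = []
    for task_id, want in expected.items():
        got = actual.get(task_id, "missing")
        if got != want:
            problems.append(f"{task_id}: expected {want}, got {got}")
    return problems


# The dry-run response from the example above, as a parsed dict.
response = {
    "matched": True,
    "trace": {"steps": [
        {"task_id": "parse", "result": "executed"},
        {"task_id": "high_risk", "result": "executed"},
        {"task_id": "normal_risk", "result": "skipped"},
    ]},
}

print(check_trace(response, {"high_risk": "executed", "normal_risk": "skipped"}))
```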

AI Workflow & CI/CD

AI writes workflows, not services. Instead of generating microservices that need their own governance, LLMs generate Orion workflows: constrained JSON that the platform validates, versions, and monitors automatically.

Prompt Templates

Structure your LLM prompts to produce valid Orion workflows. Here’s a reusable system prompt:

You generate Orion workflows in JSON format. Workflows have:
- name, condition (JSONLogic or true), continue_on_error (optional boolean)
- tasks: array of { id, name, condition (optional, JSONLogic), function: { name, input } }
- Every workflow starts with a parse_json task: { "name": "parse_json", "input": { "source": "payload", "target": "<entity>" } }
- Use "map" function with "mappings" array for transforms. Each mapping has "path" (dot notation) and "logic" (value or JSONLogic).
- Use "http_call" with "connector" (by name) for external API calls. Do not embed URLs or credentials in workflows.
- Use "channel_call" with "channel" (by name) for in-process inter-channel invocation.
- Task conditions use { "var": "data.<entity>.<field>" } to reference parsed data.

Output ONLY the JSON workflow. No explanation.
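
LLMs sometimes wrap their answer in prose despite the last instruction, so a cheap local pre-check before calling the admin API can save a round trip. The sketch below tests a few rules taken from the prompt; `precheck` is a hypothetical helper and does not replace Orion's own validation.

```python
import json


def precheck(llm_output):
    """Return a list of problems found in raw LLM output; empty means none found."""
    try:
        workflow = json.loads(llm_output)
    except json.JSONDecodeError as error:
        return [f"not valid JSON: {error.msg}"]
    if not isinstance(workflow, dict):
        return ["top-level value is not an object"]
    problems = []
    for field in ("name", "tasks"):
        if field not in workflow:
            problems.append(f"missing field: {field}")
    # The prompt requires every workflow to start with a parse_json task.
    tasks = workflow.get("tasks")
    first = tasks[0] if isinstance(tasks, list) and tasks else None
    function = first.get("function") if isinstance(first, dict) else None
    if not isinstance(function, dict) or function.get("name") != "parse_json":
        problems.append("first task is not parse_json")
    # The prompt forbids embedded URLs; connectors are referenced by name.
    if "http://" in llm_output or "https://" in llm_output:
        problems.append("embedded URL found; reference a connector by name instead")
    return problems


good = '{"name": "Demo", "condition": true, "tasks": [{"id": "parse", "function": {"name": "parse_json", "input": {"source": "payload", "target": "order"}}}]}'
print(precheck(good))
print(precheck("Sure! Here is your workflow: {...}"))
```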

Validation Pipeline

Every AI-generated workflow should go through this pipeline before reaching production:

  1. Generate: use your LLM with the prompt template above
  2. Validate: POST /api/v1/admin/workflows/validate to check structure
  3. Create as draft: POST /api/v1/admin/workflows (workflows are created as drafts by default, not loaded into the engine)
  4. Dry-run: POST /api/v1/admin/workflows/{id}/test with representative test data
  5. Check the trace: verify the right tasks ran, the right ones were skipped, and output matches expectations
  6. Activate: PATCH /api/v1/admin/workflows/{id}/status with "status": "active"
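
The steps above can be sketched as a short Python script. The endpoint paths come from the list; everything else is an assumption made for illustration: that the validate endpoint accepts the workflow JSON as its body, that you already know the workflow ID (how the create response reports it is not shown here), and the `pipeline_requests` and `send` helper names.

```python
import json
import urllib.request


def pipeline_requests(workflow, workflow_id, test_payload):
    """Return the admin API calls behind steps 2, 3, 4, and 6 as (method, path, body)."""
    base = "/api/v1/admin/workflows"
    return [
        ("POST", f"{base}/validate", workflow),                # 2. validate structure
        ("POST", base, workflow),                              # 3. create as draft
        ("POST", f"{base}/{workflow_id}/test", test_payload),  # 4. dry-run
        ("PATCH", f"{base}/{workflow_id}/status", {"status": "active"}),  # 6. activate
    ]


def send(base_url, method, path, body):
    """Send one JSON request and return the decoded JSON response."""
    request = urllib.request.Request(
        base_url + path,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method=method,
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


steps = pipeline_requests({"name": "Risk Classifier"}, "<workflow-id>", {"data": {"amount": 50000}})
for method, path, _body in steps:
    print(method, path)
```

Step 5 (checking the trace) happens between the dry-run and the activation call: inspect the dry-run response and only send the final PATCH when the trace looks right.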

CI/CD Pipeline

Integrate AI workflow generation into your deployment pipeline. Workflows are JSON files that version, diff, and review like any other config.

AI generates workflow → commit as JSON → CI runs dry-run → review → import

GitHub Actions example:

name: Validate AI Workflows
on:
  pull_request:
    paths: ['workflows/**/*.json']
  # The deploy step below only runs on pushes to main, so trigger on those too
  push:
    branches: [main]
    paths: ['workflows/**/*.json']

jobs:
  validate:
    runs-on: ubuntu-latest
    services:
      orion:
        image: ghcr.io/goplasmatic/orion:latest
        ports: ['8080:8080']
    steps:
      - uses: actions/checkout@v4

      - name: Import workflows (as drafts)
        run: |
          # Top-level files only, so test cases under workflows/tests/ are not imported
          for file in workflows/*.json; do
            # --fail makes curl exit non-zero on HTTP errors, which fails the step
            curl -sS --fail -X POST http://localhost:8080/api/v1/admin/workflows \
              -H "Content-Type: application/json" \
              -d @"$file"
          done

      - name: Dry-run test cases
        run: |
          for test in workflows/tests/*.json; do
            WORKFLOW_ID=$(jq -r '.workflow_id' "$test")
            DATA=$(jq -c '.data' "$test")
            # -S sorts object keys so the comparison ignores key order
            EXPECTED=$(jq -cS '.expected_output' "$test")

            RESULT=$(curl -sS --fail -X POST \
              "http://localhost:8080/api/v1/admin/workflows/${WORKFLOW_ID}/test" \
              -H "Content-Type: application/json" \
              -d "$DATA")

            OUTPUT=$(echo "$RESULT" | jq -cS '.output')
            if [ "$OUTPUT" != "$EXPECTED" ]; then
              echo "FAIL: $test"
              echo "Expected: $EXPECTED"
              echo "Got: $OUTPUT"
              exit 1
            fi
          done

      - name: Deploy to production
        if: github.event_name == 'push' && github.ref == 'refs/heads/main'
        run: |
          for file in workflows/*.json; do
            curl -sS --fail -X POST "${{ secrets.ORION_URL }}/api/v1/admin/workflows" \
              -H "Content-Type: application/json" \
              -d @"$file"
          done

Test case format. Store test cases alongside the workflows they exercise:

workflows/
  fraud-detection.json       # The workflow
  tests/
    fraud-high-risk.json     # Test case
    fraud-clear.json         # Test case

Each test case:

{
  "workflow_id": "fraud-detection",
  "data": { "data": { "amount": 15000, "country": "US" } },
  "expected_output": { "order": { "amount": 15000, "risk": "high", "requires_review": true } }
}
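
Comparing parsed JSON rather than serialized text keeps a test case from failing on key order alone. Here is a minimal Python sketch of a runner for this test case format; `run_case` and the `dry_run` callable are hypothetical, and in CI `dry_run` would wrap the POST to the /test endpoint.

```python
import json


def run_case(case, dry_run):
    """Run one test case and report whether the output matched.

    `dry_run` is a hypothetical callable (workflow_id, body) -> parsed JSON
    response from the dry-run endpoint.
    """
    response = dry_run(case["workflow_id"], case["data"])
    # Comparing parsed values means object key order cannot cause a failure.
    return response.get("output") == case["expected_output"]


case = json.loads("""
{
  "workflow_id": "fraud-detection",
  "data": { "data": { "amount": 15000, "country": "US" } },
  "expected_output": { "order": { "amount": 15000, "risk": "high", "requires_review": true } }
}
""")


def fake_dry_run(workflow_id, body):
    """Stand-in for the HTTP call; returns the same keys in a different order."""
    return {"matched": True, "output": {"order": {"requires_review": True, "risk": "high", "amount": 15000}}}


print(run_case(case, fake_dry_run))
```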

Safety Guardrails

AI-generated workflows get the same governance as hand-written ones:

  • Version history: every workflow change is recorded. Roll back if an AI-generated workflow misbehaves.
  • Draft status: workflows are created as draft by default and are not loaded into the engine until explicitly activated.
  • Dry-run before activate: test with representative data and inspect the full execution trace.
  • Audit trail: every workflow version is recorded in the workflows table with incrementing version numbers.
  • Connectors isolate secrets: AI generates workflows that reference connector names, never credentials.

Common Workflow Patterns

The parse-then-process pattern

Every workflow that reads input data must start with parse_json. Without it, task conditions that reference data.X see an empty context.

{
  "tasks": [
    { "id": "parse", "function": { "name": "parse_json", "input": { "source": "payload", "target": "order" } } },
    { "id": "process", "condition": { ">": [{ "var": "data.order.total" }, 100] }, "function": { "..." : "..." } }
  ]
}

Task-level vs workflow-level conditions

  • Workflow-level condition: determines whether the entire workflow matches. Set to true for workflows that always run.
  • Task-level condition: determines whether a specific task within a matched workflow executes. Use for branching logic within a pipeline.

{
  "condition": true,
  "tasks": [
    { "id": "always", "function": { "..." : "..." } },
    { "id": "conditional", "condition": { ">": [{ "var": "data.amount" }, 500] }, "function": { "..." : "..." } }
  ]
}

External API calls with connectors

Keep credentials in connectors, reference them by name in workflows:

{
  "tasks": [
    { "id": "parse", "function": { "name": "parse_json", "input": { "source": "payload", "target": "event" } } },
    { "id": "notify", "function": { "name": "http_call", "input": {
        "connector": "slack-webhook",
        "method": "POST",
        "body_logic": { "var": "data.event" }
    }}}
  ]
}

Inter-channel composition with channel_call

Invoke another channel’s workflow in-process for service composition:

{
  "tasks": [
    { "id": "parse", "function": { "name": "parse_json", "input": { "source": "payload", "target": "order" } } },
    { "id": "enrich", "function": { "name": "channel_call", "input": {
        "channel": "customer-lookup",
        "data_logic": { "var": "data.order.customer_id" },
        "response_path": "data.customer"
    }}},
    { "id": "process", "condition": { "==": [{ "var": "data.customer.tier" }, "vip"] }, "function": { "..." : "..." } }
  ]
}

Config Reference

All settings have sensible defaults, so you can run Orion with no config file at all: orion-server just works.

CLI Commands

orion-server                                       # Start the server (default)
orion-server -c config.toml                        # Start with a config file
orion-server validate-config                       # Validate config without starting
orion-server validate-config -c config.toml        # Validate a specific config file
orion-server migrate                               # Run database migrations
orion-server migrate --dry-run                     # Preview pending migrations
orion-server lint path/to/workflow.json            # Strict-validate a workflow JSON file
orion-server dry-run -w workflow.json -i input.json # Execute a workflow against a sample payload
orion-server test-connectivity                     # Probe DB (and Kafka if enabled)

All subcommands honour ${VAR} / ${VAR:-default} substitution in the loaded config file, so the same config.toml can be reused across environments.

Database Backend

The database backend is selected at runtime from the storage.url scheme. No rebuild needed:

| Backend | URL Format | Example |
| --- | --- | --- |
| SQLite | sqlite: | sqlite:orion.db or sqlite::memory: |
| PostgreSQL | postgres:// | postgres://user:pass@host/db |
| MySQL | mysql:// | mysql://user:pass@host/db |

# SQLite (default)
orion-server

# PostgreSQL
ORION_STORAGE__URL="postgres://user:pass@localhost/orion" orion-server

# MySQL
ORION_STORAGE__URL="mysql://user:pass@localhost/orion" orion-server

Migrations for all backends are embedded in the binary and the correct set is selected automatically at startup.
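
Scheme-based selection amounts to a prefix check on storage.url. The sketch below models it in Python for the three formats listed in the table; `backend_for` is a hypothetical helper, and whether Orion accepts other spellings (such as `postgresql://`) is not covered here.

```python
def backend_for(url):
    """Map a storage URL to a backend name based on its scheme prefix."""
    prefixes = (
        ("sqlite:", "SQLite"),
        ("postgres://", "PostgreSQL"),
        ("mysql://", "MySQL"),
    )
    for prefix, backend in prefixes:
        if url.startswith(prefix):
            return backend
    raise ValueError(f"unsupported storage URL scheme: {url!r}")


for url in ("sqlite:orion.db", "sqlite::memory:", "postgres://user:pass@host/db", "mysql://user:pass@host/db"):
    print(url, "->", backend_for(url))
```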

Complete Config File

[server]
host = "0.0.0.0"
port = 8080
# shutdown_drain_secs = 30     # HTTP connection drain timeout on shutdown

# [server.tls]
# enabled = false
# cert_path = "cert.pem"
# key_path = "key.pem"

[storage]
url = "sqlite:orion.db"       # Database URL (sqlite:, postgres://, mysql://)
# max_connections = 25          # Connection pool max connections
# min_connections = 5           # Connection pool min connections
# busy_timeout_ms = 5000        # SQLite busy timeout (ignored for other backends)
# acquire_timeout_secs = 5      # Connection pool acquire timeout
# idle_timeout_secs = 0         # Connection idle timeout (0 = no timeout)
# backup_dir = "./backups"      # Directory where POST /api/v1/admin/backups writes files (SQLite only)

[ingest]
max_payload_size = 1048576     # Maximum payload size in bytes (1 MB)

[engine]
# health_check_timeout_secs = 2   # Timeout for engine read lock in health checks
# reload_timeout_secs = 10        # Timeout for engine write lock during reload
# max_channel_call_depth = 10     # Max recursion depth for channel_call
# default_channel_call_timeout_ms = 30000  # Default timeout for channel_call
# global_http_timeout_secs = 30   # Global timeout for HTTP client
# max_pool_cache_entries = 100    # Max cached connection pools for external connectors
# cache_cleanup_interval_secs = 60  # Cleanup interval for idle connection pools

# [engine.circuit_breaker]
# enabled = false                 # Enable circuit breakers for connectors
# failure_threshold = 5           # Failures before tripping the breaker
# recovery_timeout_secs = 30      # Cooldown before half-open probe
# max_breakers = 10000            # Max circuit breaker instances (LRU eviction)

[queue]
workers = 4                    # Concurrent async trace workers
buffer_size = 1000             # Channel buffer for pending traces
# shutdown_timeout_secs = 30    # Timeout for in-flight jobs during shutdown
# trace_retention_hours = 72    # How long to keep completed traces (0 = disabled)
# trace_cleanup_interval_secs = 3600  # Cleanup check interval
# processing_timeout_ms = 60000  # Per-trace processing timeout
# max_result_size_bytes = 1048576  # Max size of trace result (1 MB)
# max_queue_memory_bytes = 104857600  # Max memory for queued traces (100 MB)
# dlq_retry_enabled = true       # Enable dead letter queue retry
# dlq_max_retries = 5            # Max retry attempts for DLQ entries
# dlq_poll_interval_secs = 30    # DLQ retry poll interval

[rate_limit]
# enabled = false               # Enable platform-level rate limiting
# default_rps = 100             # Default requests per second
# default_burst = 50            # Default burst allowance

# [rate_limit.endpoints]
# admin_rps = 50                # Rate limit for admin routes
# data_rps = 200                # Rate limit for data routes

[admin_auth]
# enabled = false                          # Require authentication for admin endpoints
# api_keys = ["key-1", "key-2"]            # Accepted bearer tokens / API keys (any matching key authorises a request)
# header = "Authorization"                 # "Authorization" = Bearer format, other = raw key

[cors]
# allowed_origins = ["*"]       # Global CORS allowed origins

[kafka]
enabled = false
brokers = ["localhost:9092"]
group_id = "orion"
# processing_timeout_ms = 60000  # Per-message processing timeout
# max_inflight = 100             # Max in-flight messages
# lag_poll_interval_secs = 30    # Consumer lag poll interval

[[kafka.topics]]               # Map Kafka topics to channels
topic = "incoming-orders"
channel = "orders"

[kafka.dlq]
enabled = false
topic = "orion-dlq"

[logging]
level = "info"                 # trace, debug, info, warn, error
format = "pretty"              # pretty or json

[metrics]
enabled = false

[tracing]
# enabled = false                   # Enable OpenTelemetry trace export
# otlp_endpoint = "http://localhost:4317"  # OTLP gRPC endpoint
# service_name = "orion"            # Service name in traces
# sample_rate = 1.0                 # 0.0 (none) to 1.0 (all)
# debug_profile_enabled = false     # Allow per-request `X-Orion-Profile: 1` to attach `_orion.profile` to responses

[tracing.storage]
# Persistence policy for engine traces (rows in the `traces` table, surfaced via /api/v1/data/traces).
# Unrelated to the OpenTelemetry export above. Per-channel `config.tracing` overrides this default.
#   sync   — write inline before responding (strongest durability)
#   async  — enqueue to a bounded background queue (one DB write per task)
#   batch  — bounded queue, workers commit batch_size rows in one TX (highest throughput)
#   off    — skip persistence entirely
# mode = "sync"
# sample_rate = 1.0                  # 0.0–1.0 fraction of traces persisted
# errors_only = false                # Only persist traces that ended in an error
# max_pending = 10000                # Queue depth (async + batch modes)

[channels]                          # Control which channels this instance loads
# include = ["orders.*", "payments.*"]   # Glob patterns to include (empty = all)
# exclude = ["analytics.*"]              # Glob patterns to exclude
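
The `[channels]` include and exclude lists at the end of the config file can be reasoned about with a small Python sketch. It uses the standard library's `fnmatchcase` as a stand-in for glob matching and assumes that exclude wins over include; neither detail is confirmed for Orion, and `should_load` is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def should_load(channel, include=(), exclude=()):
    """Apply include/exclude glob lists; an empty include list means all."""
    included = not include or any(fnmatchcase(channel, pattern) for pattern in include)
    # Assumption: a channel matching any exclude pattern is dropped even if included.
    excluded = any(fnmatchcase(channel, pattern) for pattern in exclude)
    return included and not excluded


include = ["orders.*", "payments.*"]
exclude = ["analytics.*"]
for name in ("orders.create", "payments.refund", "analytics.daily", "users.get"):
    print(name, should_load(name, include, exclude))
```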

Environment Variable Substitution

Values in config.toml may reference process environment variables with ${VAR} (required; fails fast if unset) or ${VAR:-default} (optional, with a fallback). Use $$ to escape a literal $. The same substitution also runs against connector config_json blobs at startup, so secrets can stay out of the database. The complementary env://VAR_NAME resolver runs after JSON parsing on connector string fields: ${VAR} rewrites text, while env:// rewrites parsed values.
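
The three substitution forms can be modelled in a few lines of Python. This is a sketch of the rules as described, not Orion's implementation: `substitute` is a hypothetical function, and it assumes a variable that is set but empty counts as set.

```python
import re

# Matches an escaped "$$", or "${NAME}" with an optional ":-default" part.
_TOKEN = re.compile(r"\$\$|\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def substitute(text, env):
    """Expand ${VAR}, ${VAR:-default}, and $$ using the mapping `env`."""
    def replace(match):
        if match.group(0) == "$$":
            return "$"  # escaped literal dollar sign
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        # Required variable with no fallback: fail fast.
        raise KeyError(f"required environment variable {name} is not set")
    return _TOKEN.sub(replace, text)


print(substitute("port = ${PORT:-8080}", {}))
print(substitute('url = "${DB_URL}"', {"DB_URL": "sqlite:orion.db"}))
print(substitute("price = $$5", {}))
```

Passing `os.environ` as `env` applies the same rules to the real process environment.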

Environment Variable Overrides

Override any setting with environment variables using double-underscore nesting:

ORION_SERVER__PORT=9090
ORION_STORAGE__URL="postgres://user:pass@localhost/orion"
ORION_KAFKA__ENABLED=true
ORION_LOGGING__FORMAT=json
ORION_RATE_LIMIT__ENABLED=true
ORION_ADMIN_AUTH__ENABLED=true
ORION_ADMIN_AUTH__API_KEYS="key-1,key-2"   # comma-separated; merged into admin_auth.api_keys
ORION_TRACING__ENABLED=true
ORION_TRACING__OTLP_ENDPOINT="http://jaeger:4317"
ORION_METRICS__ENABLED=true

Environment variables take precedence over config file values.
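
Double-underscore nesting maps each variable name to a path in the config tree. The Python sketch below shows that mapping and the precedence rule. It is illustrative only: `override_path` and `apply_overrides` are hypothetical, and values are left as strings because type coercion and the comma-separated list handling are not modelled.

```python
import copy


def override_path(name, prefix="ORION_"):
    """Turn ORION_SECTION__KEY into a lowercase config path, or None if unrelated."""
    if not name.startswith(prefix):
        return None
    return [part.lower() for part in name[len(prefix):].split("__")]


def apply_overrides(config, env, prefix="ORION_"):
    """Return a copy of `config` with matching environment overrides applied."""
    merged = copy.deepcopy(config)
    for name, value in env.items():
        path = override_path(name, prefix)
        if path is None:
            continue
        node = merged
        for key in path[:-1]:
            node = node.setdefault(key, {})
        node[path[-1]] = value  # environment value wins over the file value
    return merged


file_config = {"server": {"host": "0.0.0.0", "port": 8080}}
env = {"ORION_SERVER__PORT": "9090", "ORION_RATE_LIMIT__ENABLED": "true", "PATH": "/usr/bin"}
print(apply_overrides(file_config, env))
```

A single underscore stays inside a name, so ORION_RATE_LIMIT__ENABLED targets rate_limit.enabled rather than rate.limit.enabled.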

Built-in Capabilities

All capabilities are compiled into a single binary and controlled at runtime:

| Capability | Configuration | Default |
| --- | --- | --- |
| Database backend | storage.url scheme | SQLite |
| Kafka | kafka.enabled | Disabled |
| OpenTelemetry | tracing.enabled | Disabled |
| TLS/HTTPS | server.tls.enabled | Disabled |
| Swagger UI | Always at /docs | Enabled |
| SQL connectors | db_read/db_write functions | Always available |
| Redis cache | cache_read/cache_write with Redis backend | Always available |
| MongoDB connector | mongo_read function | Always available |
| Rate limiting | rate_limit.enabled | Disabled |
| Metrics | metrics.enabled | Disabled |

Production Checklist

  • Mount a persistent volume for orion.db (SQLite) or configure storage.url for PostgreSQL/MySQL
  • Enable admin API authentication: ORION_ADMIN_AUTH__ENABLED=true
  • Set structured logging: ORION_LOGGING__FORMAT=json
  • Enable Prometheus metrics: ORION_METRICS__ENABLED=true
  • Configure rate limiting for traffic protection (platform-level and per-channel)
  • Use per-crate log filtering: RUST_LOG=orion=info
  • Enable OpenTelemetry: ORION_TRACING__ENABLED=true for distributed tracing
  • Enable TLS via the server.tls section
  • Enable circuit breakers: engine.circuit_breaker.enabled = true
  • Set trace retention: queue.trace_retention_hours for trace data lifecycle management