Introduction

Orion is a declarative services runtime written in Rust. Instead of writing, deploying, and operating a microservice for every piece of business logic, you declare what the service should do, and Orion runs it. Architectural governance — observability, rate limiting, circuit breakers, versioning, input validation, and more — is built in.

AI generates workflows, Orion provides the governance. The platform guarantees that every service gets health checks, metrics, retries, and error handling, regardless of how the workflow was created.

Is Orion Right for You?

| If you need to… | Orion? | Why |
|---|---|---|
| Turn business rules into live REST/Kafka services | Yes | Define logic as JSON workflows, deploy with one API call |
| Let AI generate and manage business logic | Yes | Built-in validation, dry-run testing, and draft-before-activate safety |
| Replace a handful of single-purpose microservices | Yes | One instance handles many channels, governance included |
| Use a rule engine like Drools | Not quite | Orion uses JSONLogic. Lightweight and AI-friendly, but not a full RETE-based rule engine |
| Embed a workflow engine library in your app | No | Orion is a standalone runtime. For an embeddable engine, see dataflow-rs |
| Orchestrate long-running jobs (hours/days) | No | Use Temporal or Airflow. Orion is optimized for request-response and event processing |
| Run a full API gateway with plugin ecosystem | No | Use Kong or Envoy. Orion focuses on service logic, not proxy features |

Three Primitives

You build services in Orion with three things:

┌─────────────┐       ┌──────────────┐       ┌─────────────┐
│   Channel   │──────▶│   Workflow   │──────▶│  Connector  │
│  (endpoint) │       │   (logic)    │       │  (external) │
└─────────────┘       └──────────────┘       └─────────────┘

| Primitive | What it is | Example |
|---|---|---|
| Channel | A service endpoint: sync (REST, HTTP) or async (Kafka) | POST /orders, GET /users/{id}, Kafka topic order.placed |
| Workflow | A pipeline of tasks that defines what the service does | Parse → validate → enrich → transform → respond |
| Connector | A named connection to an external system, with auth and retries | Stripe API, PostgreSQL, Redis, Kafka cluster |

Design-time: define channels, build workflows, configure connectors, test with dry-run, manage versions, all through the admin API.

Runtime: Orion routes traffic to channels, executes workflows, calls connectors, and handles observability automatically.

Your First Service in 2 Minutes

No code. No Dockerfile. No CI pipeline.

1. Start Orion

brew install GoPlasmatic/tap/orion-server   # or: curl installer, cargo install
orion-server

2. Create a workflow (AI-generated or hand-written)

curl -s -X POST http://localhost:8080/api/v1/admin/workflows \
  -H "Content-Type: application/json" \
  -d '{
    "workflow_id": "high-value-order",
    "name": "High-Value Order",
    "condition": true,
    "tasks": [
      { "id": "parse", "name": "Parse payload", "function": {
          "name": "parse_json",
          "input": { "source": "payload", "target": "order" }
      }},
      { "id": "flag", "name": "Flag order",
        "condition": { ">": [{ "var": "data.order.total" }, 10000] },
        "function": {
          "name": "map",
          "input": { "mappings": [
            { "path": "data.order.flagged", "logic": true },
            { "path": "data.order.alert", "logic": {
              "cat": ["High-value order: $", { "var": "data.order.total" }]
            }}
          ]}
      }}
    ]
  }'

# Activate it
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/high-value-order/status \
  -H "Content-Type: application/json" -d '{"status": "active"}'

3. Create a channel (the service endpoint)

curl -s -X POST http://localhost:8080/api/v1/admin/channels \
  -H "Content-Type: application/json" \
  -d '{ "channel_id": "orders", "name": "orders", "channel_type": "sync",
        "protocol": "http", "route_pattern": "/orders",
        "methods": ["POST"], "workflow_id": "high-value-order" }'

# Activate
curl -s -X PATCH http://localhost:8080/api/v1/admin/channels/orders/status \
  -H "Content-Type: application/json" -d '{"status": "active"}'

4. Send a request — your service is live

curl -s -X POST http://localhost:8080/api/v1/data/orders \
  -H "Content-Type: application/json" \
  -d '{ "data": { "order_id": "ORD-9182", "total": 25000 } }'

# Response:
{
  "status": "ok",
  "data": {
    "order": {
      "order_id": "ORD-9182",
      "total": 25000,
      "flagged": true,
      "alert": "High-value order: $25000"
    }
  }
}

That’s it. Rate limiting, metrics, health checks, and request tracing are already active. Change the threshold? One API call. No rebuild, no redeploy, no restart.

What’s Built In

Every channel gets production-grade features without writing a line of code:

| Feature | What it does | Configuration |
|---|---|---|
| Rate limiting | Throttle requests per client or globally | requests_per_second, burst, JSONLogic key |
| Timeouts | Cancel slow workflows, return 504 | timeout_ms per channel |
| Input validation | Reject bad requests at the boundary | JSONLogic with headers, query, path access |
| Backpressure | Shed load when overwhelmed, return 503 | max_concurrent (semaphore-based) |
| CORS | Control browser cross-origin access | allowed_origins per channel |
| Circuit breakers | Stop cascading failures to external services | Automatic per connector, admin API to inspect/reset |
| Versioning | Draft → active → archived lifecycle | Automatic version history, rollout percentages |
| Observability | Prometheus metrics, structured logs, distributed tracing | Always on, zero configuration |
| Health checks | Component-level status with degradation detection | GET /health, automatic |
| Deduplication | Prevent duplicate processing via idempotency keys | Idempotency-Key header, configurable window |
| Response caching | Cache responses for identical requests | TTL-based, configurable cache key fields |

Performance

7K+ workflow requests/sec on a single instance (Apple M2 Pro, release build, 50 concurrent connections):

| Scenario | Req/sec | Avg Latency | P99 Latency |
|---|---|---|---|
| Simple workflow (1 task) | 7,417 | 6.70 ms | 16.80 ms |
| Complex workflow (4 tasks) | 7,044 | 7.00 ms | 23.50 ms |
| 12 workflows on one channel | 6,894 | 7.20 ms | 17.30 ms |

Pre-compiled JSONLogic, zero-downtime hot-reload, lock-free reads, SQLite WAL mode, async-first on Tokio.

Architecture Overview

Three Primitives

Services in Orion are composed from three building blocks:

| Primitive | Role | Examples |
|---|---|---|
| Channel | Service endpoint: sync (REST, HTTP) or async (Kafka) | POST /orders, GET /users/{id}, Kafka topic order.placed |
| Workflow | Pipeline of tasks that defines what the service does | Parse → validate → enrich → transform → respond |
| Connector | Named connection to an external system with auth and retries | Stripe API, PostgreSQL, Redis, Kafka cluster |

Channels receive traffic. Workflows process it. Connectors reach out to external systems. Everything else (rate limiting, metrics, circuit breakers, versioning) is handled by the platform.

Deployment Topology

Before Orion

Every piece of business logic is its own service to build, deploy, and operate, each with its own infrastructure stack:

4 services x (code + Dockerfile + CI pipeline + health checks + metrics agent + log agent + sidecar proxy + scaling policy + secret config + canary rollout) = dozens of components to build, wire, and keep running.

After Orion

One Orion instance replaces all four:

No API gateway needed. Governance is built in. One binary to deploy.

The best of both worlds: each channel and workflow is independently versioned, testable, and deployable. The modularity of microservices with the operational simplicity of a monolith.

Deploy Anywhere

Single binary. SQLite by default, no database to provision, no runtime dependencies. Need more scale? Swap to PostgreSQL or MySQL by changing storage.url. No rebuild needed.

Same channel definitions work in any topology: run everything in one instance, split channels across instances with include/exclude filters, or deploy as sidecars.

Request Processing Flow

  1. Route Resolution: REST pattern matching finds the channel, or falls back to name lookup
  2. Channel Registry: enforces deduplication, rate limits, input validation, backpressure, and checks the response cache
  3. Engine: the workflow engine sits behind a double-Arc (Arc<RwLock<Arc<Engine>>>) allowing zero-downtime swaps
  4. Workflow Matcher: evaluates JSONLogic conditions and rollout percentages to pick the right workflow
  5. Task Pipeline: executes functions in order (parse, map, filter, http_call, db_read, etc.)

Sync and Async

Sync     POST /api/v1/data/{channel}         → immediate response
Async    POST /api/v1/data/{channel}/async   → returns trace_id, poll later

REST     GET /api/v1/data/orders/{id}        → matched by route pattern
Kafka    topic: order.placed                 → consumed automatically

Sync channels respond immediately. Async channels return a trace ID; poll GET /api/v1/data/traces/{id} for results. Kafka channels consume from topics configured in the DB or config file.

Bridging is a pattern, not a feature. A sync workflow can publish_kafka and return 202. An async channel picks it up from there.

Service Composition

Most platforms require HTTP calls between services, adding latency, failure modes, and serialization overhead. Orion’s channel_call invokes another channel’s workflow in-process with zero network round-trip:

POST /orders (order-processing workflow)
  ├── parse_json         → extract order data
  ├── channel_call       → "inventory-check" channel (in-process)
  ├── channel_call       → "customer-lookup" channel (in-process)
  ├── map                → compute pricing with enriched data
  └── publish_json       → return combined result

Each composed channel has its own workflow, versioning, and governance, but calls between them are function calls, not network hops. Cycle detection prevents infinite recursion.
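As an illustration of the mechanism, the sketch below models in-process composition with cycle detection. This is not Orion's implementation; the channel names and workflow lambdas are hypothetical, and it only shows how a nested channel_call can be an ordinary function call guarded by a call stack.

```python
# Illustrative sketch (not Orion's code): in-process channel composition.
# A nested channel_call is a plain function call; the call stack is
# checked to detect and reject cycles before they recurse forever.

def run_channel(channels, name, data, stack=()):
    if name in stack:
        raise RuntimeError("channel_call cycle: " + " -> ".join(stack + (name,)))
    workflow = channels[name]
    call = lambda target, d: run_channel(channels, target, d, stack + (name,))
    return workflow(data, call)

channels = {
    # Hypothetical workflows: each takes (data, call) and returns data.
    "inventory-check": lambda data, call: {**data, "in_stock": True},
    "order-processing": lambda data, call: call("inventory-check", data),
}
result = run_channel(channels, "order-processing", {"order_id": "ORD-1"})

channels["loop"] = lambda data, call: call("loop", data)  # deliberate cycle
try:
    run_channel(channels, "loop", {})
    cycle_detected = False
except RuntimeError:
    cycle_detected = True
```

The composed call enriches the order with no network hop, and the self-referencing channel is rejected instead of recursing.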

Built-in Task Functions

| Function | Description |
|---|---|
| parse_json | Parse payload into the data context |
| parse_xml | Parse XML payloads into structured JSON |
| filter | Allow or halt processing based on JSONLogic conditions |
| map | Transform and reshape JSON using JSONLogic expressions |
| validation | Enforce required fields and constraints |
| http_call | Invoke downstream APIs via connectors |
| channel_call | Invoke another channel’s workflow in-process |
| db_read / db_write | Execute SQL queries, return rows/affected count |
| cache_read / cache_write | Read/write to in-memory or Redis cache |
| mongo_read | Query MongoDB collections |
| publish_json / publish_xml | Serialize data to JSON or XML output |
| publish_kafka | Publish messages to Kafka topics |
| log | Emit structured log entries |

Architectural Characteristics

Orion provides production-grade capabilities across eight architectural dimensions. Each subcategory below links to its detailed documentation.

C Creational · S Structural · B Behavioral


Observability — S

| Area | Capabilities |
|---|---|
| Structured Logging | JSON & pretty-print formats · Configurable log levels · Per-request context · Per-crate filtering |
| Prometheus Metrics | Request counters & error rates · Latency histograms · Circuit breaker metrics · Rate limit rejections |
| Distributed Tracing | W3C Trace Context · OpenTelemetry OTLP export · Configurable sampling rate · Per-task span tracking |
| Health Monitoring | Component-level health checks · Automatic degradation · Request ID propagation · Kubernetes liveness & readiness probes |

Resilience — S

| Area | Capabilities |
|---|---|
| Circuit Breakers | Lock-free state machine · Per-connector isolation · Auto-recovery after cooldown · Admin API to inspect & reset |
| Retry & Backoff | Exponential backoff (capped 60 s) · Configurable max retries · Retryable error detection |
| Timeouts | Per-channel enforcement · Workflow execution limits · Per-connector query timeout |
| Fault Tolerance | Graceful shutdown (SIGTERM/SIGINT) · Connection draining · Dead letter queue with retry · Panic recovery middleware |

Security — B

| Area | Capabilities |
|---|---|
| Secret Management | Auto-masked API responses · Credential isolation via connectors |
| Input Validation | Per-channel JSONLogic rules · Payload size limits · Header & query param access |
| Network Security | SSRF protection (private IP blocking) · TLS/HTTPS support · Security headers (CSP, X-Frame-Options) |
| Access Control | Admin API authentication · Per-channel CORS enforcement · Origin allowlist |
| Data Safety | Parameterized SQL queries · Injection protection · URL validation |

Scalability — C

| Area | Capabilities |
|---|---|
| Rate Limiting | Token bucket algorithm · Per-client keying via JSONLogic · Platform & per-channel limits |
| Backpressure | Semaphore concurrency limits · 503 load shedding · Per-channel configuration |
| Async Processing | Multi-worker trace queue · Bounded buffer channels · DLQ retry processor |
| Horizontal Scaling | Stateless instances · Channel include/exclude filters · Multi-database backends |

Deployability — C

| Area | Capabilities |
|---|---|
| Packaging | Single binary · SQLite, PostgreSQL, MySQL · Minimal footprint |
| Containerization | Multi-stage Docker build · Non-root execution · Built-in health probes |
| Configuration | TOML + env var overrides · Sensible defaults · Runtime configuration |
| Distribution | Homebrew tap · Shell & PowerShell installers · Multi-platform binaries |

Extensibility — S

| Area | Capabilities |
|---|---|
| Connectors | HTTP & Webhooks · Kafka pub/sub · Database (SQL) · Cache (Memory & Redis) · Storage (S3/GCS) · MongoDB (NoSQL) |
| Custom Functions | Async function handlers · Built-in function library · JSONLogic expressions |
| Channel Protocols | REST with route matching (sync) · Simple HTTP (sync) · Kafka (async) |

Availability — C

| Area | Capabilities |
|---|---|
| Hot-Reload | Zero-downtime engine swap · Channel registry rebuild · Kafka consumer restart |
| Canary Rollouts | Percentage-based traffic split · Gradual migration · Instant rollback |
| Versioning | Draft / Active / Archived lifecycle · Multi-version history · Workflow import & export |
| Performance | Response caching · Request deduplication · Connection pool caching |

Maintainability — B

| Area | Capabilities |
|---|---|
| Admin APIs | Full CRUD for all entities · Version management · Engine control · OpenAPI / Swagger UI |
| CI/CD Integration | Bulk import & export · Pre-deploy validation · GitOps-friendly |
| Testing | Dry-run execution · Workflow validation · Step-by-step traces |
| Operations | Audit logging · Database backup & restore · Config validation CLI |

Observability

Orion provides structured logging, Prometheus metrics, distributed tracing, and health monitoring out of the box. No sidecars, no agents. Everything runs inside the single binary.

Structured Logging

Orion emits structured logs in JSON or pretty-printed format, configurable at runtime:

[logging]
level = "info"        # trace, debug, info, warn, error
format = "pretty"     # pretty or json

JSON format is recommended for production. It integrates directly with log aggregators like Loki, Datadog, or CloudWatch:

ORION_LOGGING__FORMAT=json
ORION_LOGGING__LEVEL=info

Per-crate filtering with RUST_LOG gives fine-grained control:

RUST_LOG=orion=debug,tower_http=warn,sqlx=warn

| Level | Usage |
|---|---|
| error | Failures that need attention |
| warn | Degraded behavior (circuit breakers, retries) |
| info | Request lifecycle, engine reloads, startup/shutdown |
| debug | Detailed processing, SQL queries, connector calls |
| trace | Fine-grained internal state |

Every request carries a UUID x-request-id header. Pass your own or let Orion generate one. The ID propagates through logs and responses for end-to-end correlation.

Prometheus Metrics

Enable metrics and scrape at GET /metrics (Prometheus text format):

[metrics]
enabled = true

| Metric | Type | Labels | Description |
|---|---|---|---|
| messages_total | Counter | channel, status | Total messages processed |
| message_duration_seconds | Histogram | channel | Processing latency |
| active_workflows | Gauge | (none) | Workflows loaded in engine |
| errors_total | Counter | type | Errors encountered |
| http_requests_total | Counter | method, path, status | HTTP requests served |
| http_request_duration_seconds | Histogram | method, path, status | HTTP request latency |
| db_query_duration_seconds | Histogram | operation | Database query latency |
| engine_reloads_total | Counter | status | Engine reload events |
| engine_reload_duration_seconds | Histogram | (none) | Engine reload latency |
| circuit_breaker_trips_total | Counter | connector, channel | Circuit breaker trip events |
| circuit_breaker_rejections_total | Counter | connector, channel | Requests rejected by open breakers |
| channel_executions_total | Counter | channel | Channel invocations |
| rate_limit_rejections_total | Counter | client | Rate-limited requests |

Distributed Tracing

Enable OpenTelemetry trace export with OTLP gRPC:

[tracing]
enabled = true
otlp_endpoint = "http://localhost:4317"
service_name = "orion"
sample_rate = 1.0    # 0.0 (none) to 1.0 (all)

  • W3C Trace Context extraction and propagation: incoming traceparent headers are respected
  • Per-request spans with channel, workflow, and task attributes
  • OTLP gRPC export to Jaeger, Tempo, or any compatible collector
  • Configurable sampling rate for production use
  • Trace context injected into outbound http_call requests for full distributed traces

Health Monitoring

Orion exposes three health endpoints for different operational needs.

Component health: GET /health returns component-level status with automatic degradation detection:

{
  "status": "ok",
  "version": "0.1.0",
  "uptime_seconds": 3600,
  "workflows_loaded": 42,
  "components": {
    "database": "ok",
    "engine": "ok"
  }
}

The health check tests the database with SELECT 1 and verifies engine availability with a configurable lock timeout. If either check fails, the endpoint returns 503 Service Unavailable with "status": "degraded".
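The aggregation rule can be sketched in a few lines. This is an illustrative model of the behavior described above, not Orion's code; the component names and response shape mirror the example response.

```python
# Sketch of degradation detection: overall status is "ok" with HTTP 200
# only when every component check passes; any failing component yields
# 503 with "status": "degraded". Illustrative only.

def health_response(components):
    healthy = all(status == "ok" for status in components.values())
    return (200 if healthy else 503), {
        "status": "ok" if healthy else "degraded",
        "components": components,
    }

code_ok, body_ok = health_response({"database": "ok", "engine": "ok"})
code_bad, body_bad = health_response({"database": "error", "engine": "ok"})
```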

Kubernetes probes:

| Endpoint | Purpose | Behavior |
|---|---|---|
| GET /healthz | Liveness probe | Always returns 200. If the process is running, it’s alive |
| GET /readyz | Readiness probe | Returns 200 only when DB is reachable, engine is loaded, and startup is complete; 503 otherwise |

Example probe configuration:

livenessProbe:
  httpGet:
    path: /healthz
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /readyz
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5

Engine status: GET /api/v1/admin/engine/status returns a detailed breakdown:

{
  "version": "0.1.0",
  "uptime_seconds": 3600,
  "workflows_count": 42,
  "active_workflows": 38,
  "channels": ["orders", "events", "alerts"]
}

Resilience

Orion protects your services from cascading failures, transient errors, and overload with circuit breakers, automatic retries, timeouts, and graceful degradation, all built into the runtime.

Circuit Breakers

Every connector gets automatic circuit breaker protection. When failures exceed a threshold, the breaker opens and short-circuits requests to prevent cascading failures.

| State | Behavior |
|---|---|
| Closed | Normal operation; requests flow through |
| Open | Requests rejected immediately (503) after failure threshold exceeded |
| Half-Open | After cooldown, one probe request allowed to test recovery |
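The closed/open/half-open transitions can be sketched as a small state machine. This is a simplified Python model of the behavior described here, not Orion's lock-free implementation; the fake-clock demonstration uses the documented defaults.

```python
import time

# Simplified circuit breaker sketch: Closed -> Open after
# `failure_threshold` consecutive failures; after `recovery_timeout`
# seconds, one Half-Open probe is allowed. Not Orion's implementation.

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def allow(self):
        if self.state == "open":
            if self.clock() - self.opened_at >= self.recovery_timeout:
                self.state = "half-open"   # allow a single probe request
                return True
            return False                   # reject immediately (503)
        return True

    def record(self, success):
        if success:
            self.failures = 0
            self.state = "closed"
        else:
            self.failures += 1
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = self.clock()

# Demonstration with a fake clock we can advance manually.
now = [0.0]
cb = CircuitBreaker(failure_threshold=2, recovery_timeout=30.0, clock=lambda: now[0])
cb.record(False)
cb.record(False)           # threshold reached: breaker opens
rejected = not cb.allow()  # open: rejected without calling the connector
now[0] = 31.0              # cooldown elapsed
probed = cb.allow()        # half-open: one probe allowed
cb.record(True)            # probe succeeded: breaker closes
```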

The circuit breaker uses a lock-free state machine with per-connector isolation. Configure globally:

[engine.circuit_breaker]
enabled = true
failure_threshold = 5          # Failures before tripping the breaker
recovery_timeout_secs = 30     # Cooldown before half-open probe
max_breakers = 10000           # Max breaker instances (LRU eviction)

Inspect and reset breakers via the admin API:

# List all circuit breaker states
curl -s http://localhost:8080/api/v1/admin/connectors/circuit-breakers

# Reset a specific breaker
curl -s -X POST http://localhost:8080/api/v1/admin/connectors/circuit-breakers/{key}

Retry & Backoff

All HTTP connectors support automatic retries with exponential backoff, capped at 60 seconds:

{
  "retry": {
    "max_retries": 5,
    "retry_delay_ms": 500
  }
}

Delay doubles on each retry: 500ms → 1s → 2s → 4s → … → capped at 60s.
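The schedule above follows directly from the two config values. A quick sketch of the arithmetic (the 60-second cap is the documented maximum):

```python
# Exponential backoff schedule: delay starts at retry_delay_ms, doubles
# on each subsequent attempt, and is capped at 60 seconds.

def backoff_schedule(max_retries, retry_delay_ms, cap_ms=60_000):
    return [min(retry_delay_ms * (2 ** attempt), cap_ms) for attempt in range(max_retries)]

delays = backoff_schedule(max_retries=5, retry_delay_ms=500)
# [500, 1000, 2000, 4000, 8000] (milliseconds)
```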

Retries are configured per-connector, so each external service can have its own retry policy. The mechanism automatically detects retryable errors (network failures, 5xx responses) and skips non-retryable ones (4xx client errors).

All connector types (HTTP, DB, Cache, MongoDB, Storage) support the same retry configuration.

Timeouts

Timeouts are enforced at multiple levels to prevent runaway requests:

Per-channel timeout: set in the channel’s config_json to limit workflow execution time:

{
  "timeout_ms": 5000
}

If the workflow exceeds this limit, the request returns 504 Gateway Timeout.

Per-connector query timeout: for database connectors:

{
  "query_timeout_ms": 30000,
  "connect_timeout_ms": 5000
}

Global HTTP timeout: for the shared HTTP client used by http_call:

[engine]
global_http_timeout_secs = 30

Engine lock timeouts: prevent health checks and reloads from blocking indefinitely:

[engine]
health_check_timeout_secs = 2
reload_timeout_secs = 10

Fault Tolerance

Graceful shutdown: Orion handles SIGTERM and SIGINT with a controlled shutdown sequence:

  1. HTTP server stops accepting new connections
  2. In-flight requests drain (configurable via shutdown_drain_secs, default 30s)
  3. Kafka consumer (if enabled) is signaled to stop
  4. Trace cleanup task is stopped
  5. DLQ retry consumer is stopped
  6. Async trace queue drains with timeout
  7. OpenTelemetry spans are flushed (if enabled)
  8. Process exits

Dead letter queue: failed async traces are stored in the trace_dlq database table with automatic retry:

[queue]
dlq_retry_enabled = true
dlq_max_retries = 5
dlq_poll_interval_secs = 30

For Kafka, failed messages can also be routed to a configurable DLQ topic:

[kafka.dlq]
enabled = true
topic = "orion-dlq"

Fault-tolerant pipelines: set continue_on_error: true on a workflow to keep the task pipeline running even if individual tasks fail. Errors are collected in the response rather than halting execution:

{
  "status": "ok",
  "data": { "req": { "action": "test-call" } },
  "errors": [
    { "code": "TASK_ERROR", "task_id": "call", "message": "HTTP request failed..." }
  ]
}

Panic recovery: the outermost middleware layer (CatchPanicLayer) catches panics in any handler, returning a 500 response instead of crashing the process.

Security

Orion enforces security at every layer: secrets are isolated in connectors, inputs are validated before processing, network requests are checked for SSRF, and admin endpoints are protected by authentication.

Secret Management

Sensitive fields are automatically masked in all API responses. Fields named token, password, key, secret, api_key, and connection_string are returned as "******". Secrets are stored but never exposed through the API.

# Create a connector with real credentials
curl -s -X POST http://localhost:8080/api/v1/admin/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "payments-api",
    "connector_type": "http",
    "config": {
      "type": "http",
      "url": "https://api.stripe.com/v1",
      "auth": { "type": "bearer", "token": "sk-live-secret-token" }
    }
  }'

# Read it back (secrets are masked)
curl -s http://localhost:8080/api/v1/admin/connectors/<id>
# auth.token → "******"

Workflows reference connectors by name ("connector": "payments-api"). They never see or embed actual credentials. This means AI-generated workflows can be safely created and shared without risk of credential exposure.
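The masking rule can be sketched as a recursive walk over the response document. This is an illustrative model of the behavior, not Orion's code; the field-name list mirrors the one stated above.

```python
# Sketch of response masking: any field whose name is in the sensitive
# set is replaced with "******" before the object leaves the API.

SENSITIVE = {"token", "password", "key", "secret", "api_key", "connection_string"}

def mask_secrets(value):
    if isinstance(value, dict):
        return {k: "******" if k in SENSITIVE else mask_secrets(v)
                for k, v in value.items()}
    if isinstance(value, list):
        return [mask_secrets(v) for v in value]
    return value

masked = mask_secrets({
    "url": "https://api.stripe.com/v1",
    "auth": {"type": "bearer", "token": "sk-live-secret-token"},
})
```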

Input Validation

Each channel can define JSONLogic validation rules evaluated against incoming requests before workflow execution:

{
  "validation_logic": {
    "and": [
      { "!!": [{ "var": "data.order_id" }] },
      { ">": [{ "var": "data.amount" }, 0] }
    ]
  }
}

If validation fails, the request is rejected with 400 Bad Request before any workflow logic runs.

Validation rules have access to:

  • data.*: request body fields
  • headers.*: HTTP headers
  • query.*: query string parameters
  • path.*: path parameters (for REST channels)
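To make the gating concrete, here is a tiny JSONLogic-style evaluator covering only the four operators the rule above uses ("var", "and", "!!", ">"). The real engine supports far more; this is just an illustration of how such a rule accepts or rejects a request.

```python
# Minimal, illustrative JSONLogic-style evaluator (not the real engine).

def evaluate(logic, ctx):
    if not isinstance(logic, dict):
        return logic                      # literals evaluate to themselves
    (op, args), = logic.items()
    if op == "var":                       # dotted-path lookup in the context
        value = ctx
        for part in args.split("."):
            value = value.get(part) if isinstance(value, dict) else None
        return value
    vals = [evaluate(arg, ctx) for arg in args]
    if op == "and":
        return all(vals)
    if op == "!!":                        # truthiness check
        return bool(vals[0])
    if op == ">":
        return vals[0] > vals[1]
    raise ValueError(f"unsupported operator: {op}")

rule = {"and": [
    {"!!": [{"var": "data.order_id"}]},
    {">": [{"var": "data.amount"}, 0]},
]}
accepted = evaluate(rule, {"data": {"order_id": "ORD-1", "amount": 42}})
rejected = evaluate(rule, {"data": {"amount": 0}})  # no order_id, amount not > 0
```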

Payload size limits are enforced globally to prevent oversized requests:

[ingest]
max_payload_size = 1048576   # 1 MB

Network Security

SSRF protection: HTTP connectors validate URLs to prevent Server-Side Request Forgery. By default, requests to private/internal IP addresses (RFC 1918, loopback, link-local) are blocked:

{
  "name": "external-api",
  "connector_type": "http",
  "config": {
    "type": "http",
    "url": "https://api.example.com",
    "allow_private_urls": false
  }
}

Set allow_private_urls: true only when calling internal services.

TLS/HTTPS: enable TLS termination in the server:

[server.tls]
enabled = true
cert_path = "cert.pem"
key_path = "key.pem"

Security headers: set on all responses:

| Header | Value |
|---|---|
| X-Content-Type-Options | nosniff |
| X-Frame-Options | DENY |
| Content-Security-Policy | default-src 'none'; frame-ancestors 'none' |
| Referrer-Policy | strict-origin-when-cross-origin |
| Permissions-Policy | camera=(), microphone=(), geolocation=() |
| Strict-Transport-Security | Set when TLS is enabled |

Access Control

Admin API authentication: protect admin endpoints with bearer token or API key:

[admin_auth]
enabled = true
api_key = "your-secret-key"
# header = "Authorization"      # Bearer format (default)
# header = "X-API-Key"          # Raw key format

When header is "Authorization", the key is expected as Bearer <key>. For any other header name, the raw key value is matched directly.

# Bearer token
curl -H "Authorization: Bearer your-secret-key" \
  http://localhost:8080/api/v1/admin/workflows

# API key via custom header
curl -H "X-API-Key: your-secret-key" \
  http://localhost:8080/api/v1/admin/workflows
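The matching rule reduces to a small predicate. This hypothetical sketch only illustrates the rule stated above ("Authorization" expects the Bearer format, any other header matches the raw key); it is not Orion's code.

```python
# Sketch of the admin-auth matching rule described above.

def is_authorized(header_name, header_value, api_key):
    if header_name == "Authorization":
        return header_value == f"Bearer {api_key}"
    return header_value == api_key        # raw key for custom headers

bearer_ok = is_authorized("Authorization", "Bearer your-secret-key", "your-secret-key")
raw_ok = is_authorized("X-API-Key", "your-secret-key", "your-secret-key")
missing_prefix = is_authorized("Authorization", "your-secret-key", "your-secret-key")
```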

Per-channel CORS: configure allowed origins per channel in config_json:

{
  "cors": {
    "allowed_origins": ["https://app.example.com", "https://admin.example.com"]
  }
}

Global CORS defaults are configured in the server config:

[cors]
allowed_origins = ["*"]    # Global default

Data Safety

Parameterized SQL queries: the db_read and db_write functions use parameterized queries to prevent SQL injection:

{
  "function": {
    "name": "db_read",
    "input": {
      "connector": "orders-db",
      "query": "SELECT * FROM orders WHERE customer_id = $1",
      "params": [{ "var": "data.customer_id" }],
      "output": "data.orders"
    }
  }
}

Values are always passed as parameters, never interpolated into SQL strings.
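The same principle can be demonstrated with Python's built-in sqlite3 module (an analogy, not Orion's db_read): the value travels as a bound parameter, so a classic injection payload matches nothing.

```python
import sqlite3

# Parameterized query demonstration: the value is bound via "?",
# never spliced into the SQL string.

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT, customer_id TEXT)")
conn.execute("INSERT INTO orders VALUES ('ORD-1', 'C-9')")

malicious = "C-9' OR '1'='1"  # would dump every row if string-interpolated
rows = conn.execute(
    "SELECT id FROM orders WHERE customer_id = ?", (malicious,)
).fetchall()  # the payload is treated as a literal and matches nothing

safe = conn.execute(
    "SELECT id FROM orders WHERE customer_id = ?", ("C-9",)
).fetchall()
```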

URL validation: connector URLs are validated at creation time. Combined with SSRF protection, this prevents workflows from making requests to unexpected destinations.

Injection protection: JSONLogic expressions are evaluated in a sandboxed environment. User-supplied data cannot escape the data context or execute arbitrary code.

Scalability

Orion handles high-throughput workloads with token-bucket rate limiting, semaphore-based backpressure, async processing queues, and stateless horizontal scaling, all configurable per channel.

Rate Limiting

Rate limiting operates at two levels: platform-wide (all requests) and per-channel (individual service endpoints).

Platform-level: enable in config:

[rate_limit]
enabled = true
default_rps = 100
default_burst = 50

[rate_limit.endpoints]
admin_rps = 50
data_rps = 200

Per-channel: configure in the channel’s config_json:

{
  "rate_limit": {
    "requests_per_second": 100,
    "burst": 50
  }
}

Rate limiting uses the token bucket algorithm: tokens replenish at the configured rate, and burst allows short spikes above the steady-state limit. When the bucket is empty, requests receive 429 Too Many Requests.
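A minimal token bucket matching that description (illustrative only, not Orion's implementation; the clock is injected so refill can be shown deterministically):

```python
# Token bucket: tokens refill at `requests_per_second`; `burst` sets the
# bucket capacity. try_acquire() returning False corresponds to a 429.

class TokenBucket:
    def __init__(self, requests_per_second, burst, clock):
        self.rate = requests_per_second
        self.capacity = burst
        self.tokens = float(burst)
        self.clock = clock
        self.last = clock()

    def try_acquire(self):
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

# Demonstration with a fake clock.
now = [0.0]
bucket = TokenBucket(requests_per_second=10, burst=2, clock=lambda: now[0])
first = bucket.try_acquire()
second = bucket.try_acquire()
third = bucket.try_acquire()   # bucket empty: this request gets 429
now[0] = 0.2                   # 0.2 s at 10 rps refills 2 tokens
refilled = bucket.try_acquire()
```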

Per-client keying: use JSONLogic to compute rate limit keys from request data, enabling per-user or per-tenant limits:

{
  "rate_limit": {
    "requests_per_second": 10,
    "burst": 5,
    "key_logic": { "var": "headers.x-api-key" }
  }
}

Rate limiter state is per-instance (in-memory). In multi-instance deployments, divide the configured RPS by the number of instances to approximate global limits, or use sticky sessions at the load balancer.

Backpressure

Semaphore-based concurrency limits prevent any single channel from overwhelming the system:

{
  "backpressure": {
    "max_concurrent": 200
  }
}

When all semaphore permits are taken, additional requests immediately receive 503 Service Unavailable. This is load shedding: the system rejects excess load rather than queuing it unboundedly, which protects latency for the requests that are admitted.

Each channel has its own independent backpressure semaphore, so a spike in one channel doesn’t affect others.
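The shedding behavior can be sketched with a non-blocking semaphore acquire. This is an illustrative model, not Orion's code; the point is that a request either gets a permit immediately or is answered with 503, never queued.

```python
from threading import Semaphore

# Semaphore-based load shedding sketch: acquire without blocking; if no
# permit is free, answer 503 instead of queuing the request.

class Backpressure:
    def __init__(self, max_concurrent):
        self.sem = Semaphore(max_concurrent)

    def handle(self, work):
        if not self.sem.acquire(blocking=False):
            return 503, "Service Unavailable"
        try:
            return 200, work()
        finally:
            self.sem.release()

bp = Backpressure(max_concurrent=1)
# The outer request holds the only permit, so a request arriving while
# it runs (simulated by the nested call) is shed with 503.
outer = bp.handle(lambda: bp.handle(lambda: "inner"))
plain = bp.handle(lambda: "ok")  # permit released: admitted normally
```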

Async Processing

For workloads that don’t need immediate responses, Orion supports async processing via a bounded trace queue:

# Submit for async processing (returns immediately with a trace ID)
curl -s -X POST http://localhost:8080/api/v1/data/orders/async \
  -H "Content-Type: application/json" \
  -d '{ "data": { "order_id": "ORD-123" } }'

# Poll for the result
curl -s http://localhost:8080/api/v1/data/traces/{trace-id}

The queue is backed by tokio::sync::mpsc channels with configurable concurrency:

[queue]
workers = 4                       # Concurrent trace workers
buffer_size = 1000                # Channel buffer for pending traces
processing_timeout_ms = 60000     # Per-trace processing timeout
max_result_size_bytes = 1048576   # Max size of trace result (1 MB)
max_queue_memory_bytes = 104857600  # Max memory for queued traces (100 MB)

Failed traces go to the dead letter queue with automatic retry:

[queue]
dlq_retry_enabled = true
dlq_max_retries = 5
dlq_poll_interval_secs = 30

Completed traces are cleaned up automatically based on retention policy:

[queue]
trace_retention_hours = 72
trace_cleanup_interval_secs = 3600

Horizontal Scaling

Orion is designed for single-instance simplicity with multi-instance capability. Each instance is stateless; all persistent data lives in the shared database.

What works across instances:

| Component | How It Works |
|---|---|
| Database | All instances share the same database (PostgreSQL or MySQL recommended) |
| Kafka consumers | Consumer groups handle partition assignment automatically |
| Traces | Stored in the shared database; queries return consistent results |
| Workflows & Channels | Definitions live in the database; all instances load the same set |
| Audit logs | Stored in the shared database regardless of which instance handles the request |

Per-Instance State

The following components use in-memory state that is local to each instance:

| Component | Impact | Workaround |
|---|---|---|
| Rate Limiting | 3 instances at 100 RPS = 300 RPS effective global limit | Sticky sessions; divide configured RPS by instance count |
| Request Deduplication | Same idempotency key on two instances → processed twice | Sticky sessions, or Redis-backed dedup store |
| Response Caching | Lower cache hit rates (each instance has a cold cache) | Sticky sessions, or Redis-backed cache connector |
| Circuit Breakers | One instance may trip while others keep sending | Acceptable; monitor /health on each instance |
| Engine State | POST /admin/engine/reload only reloads the receiving instance | Script reload to hit all instances (see below) |

Reload all instances:

for host in $INSTANCE_HOSTS; do
  curl -X POST "http://$host:8080/api/v1/admin/engine/reload" \
    -H "Authorization: Bearer $API_KEY"
done

Alternatively, use a rolling restart strategy with your orchestrator (e.g., Kubernetes rolling deployment).

Topology Control

Use channel include/exclude filters to run different Orion instances for different channel groups:

# Instance A: order processing
[channels]
include = ["orders.*", "payments.*"]

# Instance B: analytics and reporting
[channels]
include = ["analytics.*", "reports.*"]

This enables microservice-style deployment where each instance handles a subset of channels, all sharing the same database.

Database Backend Recommendations

| Backend | Single Instance | Multiple Instances | Notes |
|---|---|---|---|
| SQLite | Recommended | Not recommended | WAL mode supports concurrent reads but only one writer. File-based, cannot be shared across hosts. |
| PostgreSQL | Supported | Recommended | Full multi-connection support. Use connection pooling (PgBouncer) for many instances. |
| MySQL | Supported | Supported | Ensure READ-COMMITTED isolation for best concurrency. |

For multi-instance deployments, use PostgreSQL with connection pooling (PgBouncer). Script engine reloads to broadcast to all instances after workflow or channel changes.

Deployability

Orion ships as a single binary with embedded migrations, sensible defaults, and multiple installation methods. No runtime dependencies. You’re up and running immediately.

Packaging

Orion compiles into a single binary (orion-server) with everything built in:

  • All three database backends (SQLite, PostgreSQL, MySQL) with embedded migrations
  • Kafka producer and consumer
  • OpenTelemetry trace export
  • TLS termination
  • Swagger UI at /docs

No feature flags, no plugins, no shared libraries to manage. The binary auto-detects the database backend from the URL scheme at startup.

| Capability | Configuration | Default |
|---|---|---|
| Database backend | storage.url scheme | SQLite |
| Kafka | kafka.enabled | Disabled |
| OpenTelemetry | tracing.enabled | Disabled |
| TLS/HTTPS | server.tls.enabled | Disabled |
| Swagger UI | Always at /docs | Enabled |
| Metrics | metrics.enabled | Disabled |

With the default SQLite backend, there are zero external dependencies.
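For example, the backend follows from the scheme of storage.url alone; the connection values below are illustrative:

```toml
[storage]
url = "sqlite:/var/lib/orion/orion.db"        # SQLite backend (default)
# url = "postgres://user:pass@db:5432/orion"  # PostgreSQL backend
# url = "mysql://user:pass@db:3306/orion"     # MySQL backend
```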

Containerization

Orion uses a multi-stage Docker build for minimal image size:

docker build -t orion .

The Dockerfile uses rust:1.93-slim for building and debian:trixie-slim for the runtime image. Key characteristics:

  • Non-root execution: the container runs as a non-root user
  • Built-in health probes: /healthz (liveness) and /readyz (readiness) are available without additional configuration

Docker with SQLite persistence: SQLite stores data in a local file. Without a persistent volume, data is lost when the container is removed:

docker run -p 8080:8080 \
  -v orion-data:/app/data \
  -e ORION_STORAGE__URL=sqlite:/app/data/orion.db \
  orion

With Docker Compose:

services:
  orion:
    image: orion
    ports:
      - "8080:8080"
    environment:
      ORION_STORAGE__URL: sqlite:/app/data/orion.db
    volumes:
      - orion-data:/app/data

volumes:
  orion-data:

For production, PostgreSQL or MySQL is recommended. Point ORION_STORAGE__URL to your database server.

Configuration

All settings have sensible defaults. You can run Orion with no config file at all. orion-server just works.

TOML config file: pass with -c:

orion-server -c config.toml

Environment variable overrides: use double-underscore nesting to override any setting:

ORION_SERVER__PORT=9090
ORION_STORAGE__URL="postgres://user:pass@localhost/orion"
ORION_KAFKA__ENABLED=true
ORION_LOGGING__FORMAT=json
ORION_ADMIN_AUTH__ENABLED=true
ORION_ADMIN_AUTH__API_KEY="your-secret-key"

Environment variables take precedence over the config file, making it easy to customize per environment without changing files.
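For example, ORION_SERVER__PORT=9090 overrides the same setting as this config-file entry (key layout inferred from the double-underscore convention):

```toml
[server]
port = 9090
```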

Runtime configuration: channels carry their own runtime config (rate limits, timeouts, CORS, validation) via config_json, changeable through the admin API without restarts.

Distribution

Orion is available through multiple installation methods:

Homebrew (macOS and Linux):

brew install GoPlasmatic/tap/orion-server

Shell installer (Linux/macOS):

curl --proto '=https' --tlsv1.2 -LsSf https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.sh | sh

PowerShell installer (Windows):

powershell -ExecutionPolicy ByPass -c "irm https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.ps1 | iex"

Docker:

docker run -p 8080:8080 ghcr.io/goplasmatic/orion:latest

From source:

cargo install --git https://github.com/GoPlasmatic/Orion

Multi-platform binaries are published for Linux (x86_64, aarch64), macOS (x86_64, aarch64), and Windows (x86_64).

Extensibility

Orion integrates with external systems through connectors, exposes custom logic via async function handlers, and supports multiple channel protocols for different ingestion patterns.

Connectors

Connectors are named external service configurations. Secrets stay in connectors, out of your workflows.

Authentication

Three auth schemes are supported:

| Auth Type | Fields | Example |
|---|---|---|
| bearer | token | { "type": "bearer", "token": "sk-..." } |
| basic | username, password | { "type": "basic", "username": "user", "password": "pass" } |
| apikey | header, key | { "type": "apikey", "header": "X-API-Key", "key": "abc123" } |

Header Precedence

When http_call builds a request, headers are applied in this order (later layers override earlier ones):

| Priority | Source | Example |
|---|---|---|
| 1 (lowest) | Connector default headers | "headers": {"x-source": "orion"} in connector config |
| 2 | Connector auth | Bearer token, Basic auth, API key |
| 3 | Default content-type | application/json (only when a body is present) |
| 4 (highest) | Task-level headers | "headers": {"content-type": "text/xml"} in the task input |

Task-level headers always win. This means a workflow developer can override content-type, authorization, or any other header set by the connector.
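As a sketch, a task can force an XML content type on a call made through a connector that would otherwise default to JSON; the connector name and the exact http_call input fields here are illustrative:

```json
{
  "function": {
    "name": "http_call",
    "input": {
      "connector": "payments-api",
      "headers": { "content-type": "text/xml" }
    }
  }
}
```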

Secret Masking

Sensitive fields (token, password, key, secret, api_key, connection_string) are automatically masked as "******" in all API responses. Secrets are stored but never exposed through the API. Workflows reference connectors by name; they never see or embed actual credentials.
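For example, reading back an HTTP connector with a bearer token would return its config with the token replaced (response shape abridged for illustration):

```json
{
  "name": "payments-api",
  "connector_type": "http",
  "config": {
    "type": "http",
    "auth": { "type": "bearer", "token": "******" }
  }
}
```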

HTTP Connector

REST API calls, webhooks, and external service integration:

{
  "name": "payments-api",
  "connector_type": "http",
  "config": {
    "type": "http",
    "url": "https://api.stripe.com/v1",
    "auth": { "type": "bearer", "token": "sk-..." },
    "headers": { "x-source": "orion" },
    "retry": { "max_retries": 3, "retry_delay_ms": 1000 },
    "max_response_size": 10485760,
    "allow_private_urls": false
  }
}

| Field | Default | Description |
|---|---|---|
| url | required | Base URL for all requests |
| method | "" | Default HTTP method |
| headers | {} | Default headers applied to every request |
| auth | null | Authentication config (bearer, basic, or apikey) |
| retry | 3 retries, 1000ms | Retry with exponential backoff |
| max_response_size | 10 MB | Maximum response body size to prevent OOM |
| allow_private_urls | false | Allow requests to private/internal IPs (SSRF protection) |

Kafka Connector

Produce to Kafka topics:

{
  "name": "event-bus",
  "connector_type": "kafka",
  "config": {
    "type": "kafka",
    "brokers": ["kafka1:9092", "kafka2:9092"],
    "topic": "events",
    "group_id": "orion-producer"
  }
}

Use the publish_kafka task function with optional JSONLogic for dynamic keys and values:

{
  "function": {
    "name": "publish_kafka",
    "input": {
      "connector": "event-bus",
      "topic": "processed-orders",
      "key_logic": { "var": "data.order_id" }
    }
  }
}

| Field | Required | Description |
|---|---|---|
| connector | Yes | Kafka connector name |
| topic | Yes | Target topic |
| key_logic | No | JSONLogic expression for partition key |
| value_logic | No | JSONLogic expression for message value (default: message.data) |

Kafka consumer configuration: map topics to channels in your config file:

[kafka]
enabled = true
brokers = ["localhost:9092"]
group_id = "orion"

[[kafka.topics]]
topic = "incoming-orders"
channel = "orders"

Async channels with protocol: "kafka" can also register topics via the API (DB-driven). Config-file and DB-driven topics are merged; duplicates are deduplicated with config-file entries taking precedence. The consumer restarts automatically on engine reload when the topic set changes.

Metadata injection: Kafka metadata is automatically injected into every message:

| Field | Description |
|---|---|
| kafka_topic | Source topic name |
| kafka_key | Message key (if present) |
| kafka_partition | Partition number |
| kafka_offset | Offset within partition |

Access these in workflows via { "var": "metadata.kafka_topic" }.

Dead letter queue: failed messages are routed to a configurable DLQ topic:

[kafka.dlq]
enabled = true
topic = "orion-dlq"

Consumer settings:

| Config | Default | Description |
|---|---|---|
| kafka.processing_timeout_ms | 60000 | Per-message processing timeout |
| kafka.max_inflight | 100 | Max in-flight messages |
| kafka.lag_poll_interval_secs | 30 | Consumer lag polling interval |

Database Connector (SQL)

Parameterized SQL queries against PostgreSQL, MySQL, or SQLite:

{
  "name": "orders-db",
  "connector_type": "db",
  "config": {
    "type": "db",
    "connection_string": "postgres://user:pass@db-host:5432/orders",
    "driver": "postgres",
    "max_connections": 10,
    "connect_timeout_ms": 5000,
    "query_timeout_ms": 30000
  }
}

| Field | Default | Description |
|---|---|---|
| connection_string | required | Database URL (auto-masked in API responses) |
| driver | "postgres" | Driver type: postgres, mysql, or sqlite |
| max_connections | null | Connection pool max size |
| connect_timeout_ms | null | Connection establishment timeout |
| query_timeout_ms | null | Individual query timeout |
| retry | 3 retries, 1000ms | Retry with exponential backoff |

Use db_read for SELECT (returns rows as JSON array) and db_write for INSERT/UPDATE/DELETE (returns affected count):

{
  "function": {
    "name": "db_read",
    "input": {
      "connector": "orders-db",
      "query": "SELECT * FROM orders WHERE customer_id = $1",
      "params": [{ "var": "data.customer_id" }],
      "output": "data.orders"
    }
  }
}
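A db_write call follows the same shape, returning the affected-row count instead of rows; the query and params below are illustrative:

```json
{
  "function": {
    "name": "db_write",
    "input": {
      "connector": "orders-db",
      "query": "UPDATE orders SET status = $1 WHERE order_id = $2",
      "params": ["shipped", { "var": "data.order_id" }]
    }
  }
}
```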

Cache Connector

In-memory or Redis cache for lookups, session state, and temporary storage:

{
  "name": "session-cache",
  "connector_type": "cache",
  "config": {
    "type": "cache",
    "backend": "redis",
    "url": "redis://localhost:6379",
    "default_ttl_secs": 300,
    "max_connections": 10
  }
}

| Field | Default | Description |
|---|---|---|
| backend | required | "redis" or "memory" |
| url | required (redis) | Redis connection URL |
| default_ttl_secs | null | Default TTL for cache entries |
| max_connections | null | Connection pool max size |
| retry | 3 retries, 1000ms | Retry with exponential backoff |

Use cache_read and cache_write in workflows:

{
  "function": {
    "name": "cache_write",
    "input": {
      "connector": "session-cache",
      "key": "session:user123",
      "value": { "var": "data.session" },
      "ttl_secs": 3600
    }
  }
}
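A matching cache_read might look like this; the output field is an assumption, by analogy with db_read and mongo_read:

```json
{
  "function": {
    "name": "cache_read",
    "input": {
      "connector": "session-cache",
      "key": "session:user123",
      "output": "data.session"
    }
  }
}
```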

Storage Connector (S3/GCS)

S3, GCS, or local filesystem for file operations:

{
  "name": "uploads",
  "connector_type": "storage",
  "config": {
    "type": "storage",
    "provider": "s3",
    "bucket": "my-uploads",
    "region": "us-east-1",
    "base_path": "/data"
  }
}

| Field | Default | Description |
|---|---|---|
| provider | required | Storage provider: s3, gcs, local |
| bucket | null | Bucket or container name |
| region | null | Cloud region |
| base_path | null | Base path prefix for all operations |
| retry | 3 retries, 1000ms | Retry with exponential backoff |

MongoDB Connector (NoSQL)

MongoDB document queries with BSON-to-JSON conversion:

{
  "name": "analytics-db",
  "connector_type": "db",
  "config": {
    "type": "db",
    "connection_string": "mongodb://localhost:27017",
    "driver": "mongodb"
  }
}

Use mongo_read in workflows:

{
  "function": {
    "name": "mongo_read",
    "input": {
      "connector": "analytics-db",
      "database": "analytics",
      "collection": "events",
      "filter": { "user_id": { "var": "data.user_id" } },
      "output": "data.events"
    }
  }
}

Custom Functions

Orion provides 8 async function handlers that can be used in workflow tasks:

| Function | Description |
|---|---|
| http_call | Call external APIs via HTTP connectors |
| channel_call | Invoke another channel’s workflow in-process (no HTTP round-trip) |
| db_read | Execute SELECT queries against SQL connectors |
| db_write | Execute INSERT/UPDATE/DELETE against SQL connectors |
| cache_read | Read from memory or Redis cache connectors |
| cache_write | Write to memory or Redis cache connectors |
| mongo_read | Query MongoDB collections |
| publish_kafka | Produce messages to Kafka topics |

In addition to async functions, Orion includes a built-in function library for data transformation:

| Function | Description |
|---|---|
| parse_json | Parse raw payload into structured data |
| parse_xml | Parse XML payload into structured data |
| filter | Filter arrays using JSONLogic conditions |
| map | Transform data with field mappings |
| validation | Validate data against JSONLogic rules |
| log | Log data at a specified level |

JSONLogic expressions power all conditions and dynamic values. Use { "var": "data.field" } to reference data, { "cat": [...] } for string concatenation, arithmetic operators, and more. Dynamic paths (path_logic) and bodies (body_logic) let you compute URLs and request payloads from message data.
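As a sketch (connector name assumed), path_logic can assemble a per-order URL from message data:

```json
{
  "function": {
    "name": "http_call",
    "input": {
      "connector": "payments-api",
      "path_logic": { "cat": ["/orders/", { "var": "data.order_id" }] }
    }
  }
}
```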

Channel Protocols

Channels support three protocol modes:

REST (Sync)

REST channels define route patterns for RESTful API routing with method and path matching:

{
  "name": "order-detail",
  "channel_type": "sync",
  "protocol": "rest",
  "methods": ["GET", "POST"],
  "route_pattern": "/orders/{order_id}/items/{item_id}",
  "workflow_id": "order-detail-workflow"
}

Path parameters are extracted and injected into the message metadata. Routes are matched by priority (descending) then specificity (segment count).

Simple HTTP (Sync)

Simple HTTP channels are matched by channel name. Requests to /api/v1/data/{channel-name} are routed directly:

{
  "name": "orders",
  "channel_type": "sync",
  "protocol": "http",
  "workflow_id": "order-processing"
}

Kafka (Async)

Kafka channels consume from topics and process messages asynchronously:

{
  "name": "kafka-orders",
  "channel_type": "async",
  "protocol": "kafka",
  "topic": "incoming-orders",
  "consumer_group": "orion-orders",
  "workflow_id": "order-processing"
}

DB-driven Kafka channels are automatically registered as consumers at startup and on engine reload. Add Kafka ingestion via the API without restarting Orion.

Availability

Orion supports zero-downtime engine reloads, percentage-based canary rollouts, full version lifecycle management, and response caching, enabling continuous delivery without service interruptions.

Hot-Reload

The engine is held in memory as Arc<RwLock<Arc<Engine>>>. A reload swaps the inner Arc<Engine> while existing readers continue using the old one. Zero dropped requests.

Trigger a reload:

curl -s -X POST http://localhost:8080/api/v1/admin/engine/reload

A reload performs three operations atomically:

  1. Engine swap: rebuilds the engine from all active workflows and channels in the database
  2. Channel registry rebuild: reconstructs the route table, validation logic, rate limiters, backpressure semaphores, dedup stores, and response caches
  3. Kafka consumer restart: if the topic set changed, the Kafka consumer is stopped and restarted with the new topics

Reloads are triggered automatically on status changes (activate/archive) and deletes. Draft creates and updates do not trigger reload.

In multi-instance deployments, reload only affects the instance that receives the request. Script the reload to broadcast to all instances:

for host in $INSTANCE_HOSTS; do
  curl -X POST "http://$host:8080/api/v1/admin/engine/reload" \
    -H "Authorization: Bearer $API_KEY"
done

Canary Rollouts

Control traffic exposure for active workflows with rollout percentages:

# Activate a workflow at 10% rollout
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/status \
  -H "Content-Type: application/json" \
  -d '{"status": "active", "rollout_percentage": 10}'

# Increase rollout to 50%
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/rollout \
  -H "Content-Type: application/json" \
  -d '{"rollout_percentage": 50}'

# Full rollout
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/rollout \
  -H "Content-Type: application/json" \
  -d '{"rollout_percentage": 100}'

The rollout percentage determines the probability that incoming requests are matched to this workflow. This enables:

  • Gradual migration: slowly ramp traffic from 0% to 100%
  • A/B testing: run two workflow versions at different percentages
  • Instant rollback: set rollout to 0% or archive the workflow

Versioning

Both workflows and channels follow a draft → active → archived lifecycle with automatic version tracking:

# Create (starts as draft, version 1)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows \
  -H "Content-Type: application/json" \
  -d '{ "name": "Order Processor", ... }'

# Update (only drafts can be updated)
curl -s -X PUT http://localhost:8080/api/v1/admin/workflows/<id> \
  -H "Content-Type: application/json" -d '{ ... }'

# Activate (loads into engine)
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/status \
  -H "Content-Type: application/json" -d '{"status": "active"}'

# Create new version (new draft from active)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/<id>/versions

# Archive (removes from engine)
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/status \
  -H "Content-Type: application/json" -d '{"status": "archived"}'

All versions are stored with incrementing version numbers. List the version history:

curl -s http://localhost:8080/api/v1/admin/workflows/<id>/versions

Import and export: bulk operations for GitOps and migration:

# Export workflows (as JSON)
curl -s http://localhost:8080/api/v1/admin/workflows/export?status=active

# Import workflows (created as drafts)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/import \
  -H "Content-Type: application/json" -d @workflows.json

Performance

Response caching: cache responses for identical requests to reduce redundant workflow execution:

{
  "cache": {
    "enabled": true,
    "ttl_secs": 60,
    "cache_key_fields": ["data.user_id", "data.action"]
  }
}

Cache keys are computed from the specified fields. Cached responses are returned directly without executing the workflow. The cache backend is in-memory by default; Redis-backed caching is available via a cache connector.

Request deduplication: prevent duplicate processing using idempotency keys:

{
  "deduplication": {
    "header": "Idempotency-Key",
    "retention_secs": 300
  }
}

When a request with the same idempotency key arrives within the retention window, it returns 409 Conflict instead of re-processing.
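For example, replaying a request with the same key inside the retention window is rejected; the channel name and key below are illustrative:

```shell
# First request is processed normally
curl -s -X POST http://localhost:8080/api/v1/data/orders \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: ord-123-attempt-1" \
  -d '{ "data": { "order_id": "ORD-123" } }'

# Same key within 300s: returns 409 Conflict instead of re-processing
curl -s -X POST http://localhost:8080/api/v1/data/orders \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: ord-123-attempt-1" \
  -d '{ "data": { "order_id": "ORD-123" } }'
```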

Connection pool caching: external database and MongoDB connector pools are cached and reused across requests, with configurable pool sizes and idle timeouts:

[engine]
max_pool_cache_entries = 100
cache_cleanup_interval_secs = 60

Maintainability

Orion provides comprehensive admin APIs, CI/CD integration patterns, dry-run testing, and operational tools for managing services in production.

Admin APIs

Full CRUD operations for all entities through a RESTful admin API:

| Resource | Endpoints |
|---|---|
| Workflows | Create, read, update, delete, status management, versioning, rollout, dry-run test, import/export, validate |
| Channels | Create, read, update, delete, status management, versioning |
| Connectors | Create, read, update, delete, reload, circuit breaker inspection/reset |
| Engine | Status, hot-reload |
| Audit logs | List with filtering by action and resource type |
| Backup | Database export and restore |

Version management: both workflows and channels support the draft → active → archived lifecycle. Filter by status:

curl -s "http://localhost:8080/api/v1/admin/workflows?status=active"
curl -s "http://localhost:8080/api/v1/admin/channels?status=draft"

Engine control:

# Check engine status
curl -s http://localhost:8080/api/v1/admin/engine/status

# Hot-reload after changes
curl -s -X POST http://localhost:8080/api/v1/admin/engine/reload

OpenAPI / Swagger UI: interactive API documentation is always available at /docs, and the OpenAPI 3.0 spec at /api/v1/openapi.json.

CI/CD Integration

Orion workflows are JSON files that version, diff, and review like any other config.

Bulk import and export:

# Export active workflows
curl -s "http://localhost:8080/api/v1/admin/workflows/export?status=active" -o workflows.json

# Import workflows (created as drafts)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/import \
  -H "Content-Type: application/json" -d @workflows.json

Pre-deploy validation: validate workflow structure without creating:

curl -s -X POST http://localhost:8080/api/v1/admin/workflows/validate \
  -H "Content-Type: application/json" -d @workflow.json

GitOps pipeline: a typical CI/CD flow:

AI generates workflow → commit as JSON → CI validates & dry-runs → review → import → activate

GitHub Actions example:

name: Validate Workflows
on:
  pull_request:
    paths: ['workflows/**/*.json']

jobs:
  validate:
    runs-on: ubuntu-latest
    services:
      orion:
        image: ghcr.io/goplasmatic/orion:latest
        ports: ['8080:8080']
    steps:
      - uses: actions/checkout@v4
      - name: Import and test workflows
        run: |
          for file in workflows/**/*.json; do
            curl -sf -X POST http://localhost:8080/api/v1/admin/workflows \
              -H "Content-Type: application/json" -d @"$file"
          done

Tag-based organization: tag workflows for filtering:

{ "tags": ["fraud", "high-priority", "v2"] }
curl -s "http://localhost:8080/api/v1/admin/workflows?tag=fraud"

Testing

Dry-run execution: test a workflow against sample data without activating it:

curl -s -X POST http://localhost:8080/api/v1/admin/workflows/<id>/test \
  -H "Content-Type: application/json" \
  -d '{"data": {"amount": 50000, "currency": "USD"}}'

The response includes a full execution trace showing which tasks ran and which were skipped:

{
  "matched": true,
  "trace": {
    "steps": [
      { "task_id": "parse", "result": "executed" },
      { "task_id": "high_risk", "result": "executed" },
      { "task_id": "normal_risk", "result": "skipped" }
    ]
  },
  "output": {
    "txn": { "amount": 50000, "risk_level": "high", "requires_review": true }
  }
}

Workflow validation: check that a workflow definition is structurally valid:

curl -s -X POST http://localhost:8080/api/v1/admin/workflows/validate \
  -H "Content-Type: application/json" -d @workflow.json

Step-by-step traces: async traces record the full execution path and can be retrieved for debugging:

# Submit async request
curl -s -X POST http://localhost:8080/api/v1/data/orders/async \
  -H "Content-Type: application/json" -d '{ "data": { "order_id": "ORD-123" } }'

# Get trace with execution details
curl -s http://localhost:8080/api/v1/data/traces/{trace-id}

Operations

Audit logging: all admin actions are recorded for compliance and debugging:

curl -s http://localhost:8080/api/v1/admin/audit-logs
curl -s "http://localhost:8080/api/v1/admin/audit-logs?action=activate&resource_type=workflow"

Each entry captures: principal, action, resource type, resource ID, details, and timestamp.
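An entry might be shaped roughly like this (field values are illustrative):

```json
{
  "principal": "admin-api-key",
  "action": "activate",
  "resource_type": "workflow",
  "resource_id": "hello-world",
  "details": { "status": "active" },
  "timestamp": "2025-01-01T12:00:00Z"
}
```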

Database backup and restore:

# Export backup
curl -s -X POST http://localhost:8080/api/v1/admin/backup -o backup.json

# Restore from backup
curl -s -X POST http://localhost:8080/api/v1/admin/restore \
  -H "Content-Type: application/json" -d @backup.json

Config validation CLI: validate your configuration without starting the server:

orion-server validate-config
orion-server validate-config -c config.toml

Database migrations: run or preview pending migrations:

orion-server migrate              # Run migrations
orion-server migrate --dry-run    # Preview pending migrations

Admin API

All admin endpoints are under /api/v1/admin/. When admin authentication is enabled, requests must include a valid bearer token or API key.

Channels

| Method | Path | Description |
|---|---|---|
| POST | /api/v1/admin/channels | Create channel (as draft) |
| GET | /api/v1/admin/channels | List channels. Filter with ?status=, ?channel_type=, ?protocol= |
| GET | /api/v1/admin/channels/{id} | Get channel by ID |
| PUT | /api/v1/admin/channels/{id} | Update draft channel |
| DELETE | /api/v1/admin/channels/{id} | Delete channel (all versions) |
| PATCH | /api/v1/admin/channels/{id}/status | Change status (active/archived) |
| GET | /api/v1/admin/channels/{id}/versions | List channel version history |
| POST | /api/v1/admin/channels/{id}/versions | Create new draft version from active channel |

Workflows

| Method | Path | Description |
|---|---|---|
| POST | /api/v1/admin/workflows | Create workflow (as draft; optional id field for custom IDs) |
| GET | /api/v1/admin/workflows | List workflows. Filter with ?tag=, ?status= |
| GET | /api/v1/admin/workflows/{id} | Get workflow by ID |
| PUT | /api/v1/admin/workflows/{id} | Update draft workflow |
| DELETE | /api/v1/admin/workflows/{id} | Delete workflow (all versions) |
| PATCH | /api/v1/admin/workflows/{id}/status | Change status (active/archived) |
| GET | /api/v1/admin/workflows/{id}/versions | List workflow version history |
| POST | /api/v1/admin/workflows/{id}/versions | Create new draft version from active workflow |
| PATCH | /api/v1/admin/workflows/{id}/rollout | Update rollout percentage |
| POST | /api/v1/admin/workflows/{id}/test | Dry-run on sample payload |
| POST | /api/v1/admin/workflows/import | Bulk import workflows (as drafts) |
| GET | /api/v1/admin/workflows/export | Export workflows. Filter with ?tag=, ?status= |
| POST | /api/v1/admin/workflows/validate | Validate workflow definition |

Connectors

| Method | Path | Description |
|---|---|---|
| POST | /api/v1/admin/connectors | Create connector |
| GET | /api/v1/admin/connectors | List connectors (secrets masked) |
| GET | /api/v1/admin/connectors/{id} | Get connector by ID (secrets masked) |
| PUT | /api/v1/admin/connectors/{id} | Update connector |
| DELETE | /api/v1/admin/connectors/{id} | Delete connector |
| POST | /api/v1/admin/connectors/reload | Reload all connectors from DB |
| GET | /api/v1/admin/connectors/circuit-breakers | List circuit breaker states |
| POST | /api/v1/admin/connectors/circuit-breakers/{key} | Reset a circuit breaker |

Engine

| Method | Path | Description |
|---|---|---|
| GET | /api/v1/admin/engine/status | Engine status (version, uptime, workflows count, channels) |
| POST | /api/v1/admin/engine/reload | Hot-reload channels and workflows |

Audit Logs

| Method | Path | Description |
|---|---|---|
| GET | /api/v1/admin/audit-logs | List audit log entries. Filter with ?action=, ?resource_type= |

Backup & Restore

| Method | Path | Description |
|---|---|---|
| POST | /api/v1/admin/backup | Export database backup |
| POST | /api/v1/admin/restore | Restore from backup |

Lifecycle

Both channels and workflows follow a draft → active → archived lifecycle:

  1. Create: entities are created as draft (not loaded into the engine)
  2. Update: only draft versions can be updated via PUT
  3. Activate: PATCH /status with {"status": "active"} loads the entity into the engine
  4. New version: POST /versions creates a new draft version from the active entity
  5. Archive: PATCH /status with {"status": "archived"} removes from the engine

A channel links to a workflow via workflow_id. Activating a channel makes it available for data processing; activating a workflow makes its logic available to the engine.

Authentication

Admin API endpoints support bearer token or API key authentication when enabled:

# Bearer token (default header: Authorization)
curl -H "Authorization: Bearer your-secret-key" \
  http://localhost:8080/api/v1/admin/workflows

# API key via custom header
curl -H "X-API-Key: your-secret-key" \
  http://localhost:8080/api/v1/admin/workflows

Configure via [admin_auth] in config or ORION_ADMIN_AUTH__ENABLED=true environment variable.

Error Response Format

All error responses follow a consistent structure:

{
  "error": {
    "code": "NOT_FOUND",
    "message": "Workflow with id '...' not found"
  }
}

| Code | HTTP Status | Description |
|---|---|---|
| NOT_FOUND | 404 | Resource not found |
| BAD_REQUEST | 400 | Invalid input |
| UNAUTHORIZED | 401 | Missing or invalid credentials |
| FORBIDDEN | 403 | Access denied |
| CONFLICT | 409 | Duplicate or conflicting state |
| RATE_LIMITED | 429 | Too many requests |
| TIMEOUT | 504 | Workflow execution exceeded timeout |
| SERVICE_UNAVAILABLE | 503 | Backpressure or circuit breaker open |
| UNSUPPORTED_MEDIA_TYPE | 415 | Invalid content type |
| INTERNAL_ERROR | 500 | Internal server error |

Data API

The data API handles runtime request processing: routing messages to channels, executing workflows, and returning results.

Endpoints

| Method | Path | Description |
|---|---|---|
| POST | /api/v1/data/{channel} | Process message synchronously (simple channel name) |
| POST | /api/v1/data/{channel}/async | Submit for async processing (returns trace ID) |
| ANY | /api/v1/data/{path...} | REST route matching: method + path matched against channel route patterns |
| ANY | /api/v1/data/{path...}/async | Async submission via REST route matching |
| GET | /api/v1/data/traces | List traces. Filter with ?status=, ?channel=, ?mode= |
| GET | /api/v1/data/traces/{id} | Poll async trace result |

Route Resolution

When a request arrives at /api/v1/data/{path}, Orion resolves the target channel in this order:

  1. Async check: strip trailing /async suffix (switches to async mode)
  2. REST route table: match HTTP method + path against channel route_pattern values (e.g., GET /orders/{order_id})
  3. Channel name fallback: direct lookup by single path segment (e.g., /api/v1/data/orders → channel named orders)

REST routes are matched by priority (descending) then specificity (segment count). Path parameters are extracted and injected into the message metadata.

Synchronous Processing

Send a POST to the channel name or a matching REST route:

# By channel name
curl -s -X POST http://localhost:8080/api/v1/data/orders \
  -H "Content-Type: application/json" \
  -d '{ "data": { "order_id": "ORD-123", "total": 25000 } }'

# By REST route pattern
curl -s -X GET http://localhost:8080/api/v1/data/orders/ORD-123/items/ITEM-1

Response:

{
  "status": "ok",
  "data": {
    "order": { "order_id": "ORD-123", "total": 25000, "flagged": true }
  },
  "errors": []
}

Asynchronous Processing

Append /async to submit for background processing:

curl -s -X POST http://localhost:8080/api/v1/data/orders/async \
  -H "Content-Type: application/json" \
  -d '{ "data": { "order_id": "ORD-456" } }'

Response: returns immediately with a trace ID:

{
  "trace_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "pending"
}

Poll for the result:

curl -s http://localhost:8080/api/v1/data/traces/550e8400-e29b-41d4-a716-446655440000

Trace statuses: pending → completed or failed.

Trace Endpoints

List and filter traces:

# List all traces
curl -s http://localhost:8080/api/v1/data/traces

# Filter by channel and status
curl -s "http://localhost:8080/api/v1/data/traces?channel=orders&status=completed"

# Filter by mode
curl -s "http://localhost:8080/api/v1/data/traces?mode=async"

Get a specific trace:

curl -s http://localhost:8080/api/v1/data/traces/{trace-id}

Operational Endpoints

| Method | Path | Description |
|---|---|---|
| GET | /health | Health check (200 OK / 503 degraded). Checks DB, engine, uptime |
| GET | /healthz | Kubernetes liveness probe. Always returns 200 |
| GET | /readyz | Kubernetes readiness probe. 503 if DB, engine, or startup not ready |
| GET | /metrics | Prometheus metrics (when enabled) |
| GET | /docs | Swagger UI |
| GET | /api/v1/openapi.json | OpenAPI 3.0 specification |

CLI Setup

Get Orion running on your machine in under a minute.

Installation

Choose your preferred method:

Homebrew (macOS and Linux):

brew install GoPlasmatic/tap/orion-server

Shell installer (Linux/macOS):

curl --proto '=https' --tlsv1.2 -LsSf https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.sh | sh

PowerShell (Windows):

powershell -ExecutionPolicy ByPass -c "irm https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.ps1 | iex"

Docker:

docker run -p 8080:8080 ghcr.io/goplasmatic/orion:latest

From source (requires Rust 1.85+):

cargo install --git https://github.com/GoPlasmatic/Orion

First Run

Start Orion with default settings (SQLite, port 8080):

orion-server

Verify it’s running:

curl -s http://localhost:8080/health
{
  "status": "ok",
  "version": "0.1.0",
  "uptime_seconds": 5,
  "workflows_loaded": 0,
  "components": {
    "database": "ok",
    "engine": "ok"
  }
}

Swagger UI is available at http://localhost:8080/docs.

Configuration

Create a config file for custom settings:

orion-server -c config.toml

Or use environment variables for individual overrides:

ORION_SERVER__PORT=9090 \
ORION_LOGGING__FORMAT=json \
orion-server

Common configuration scenarios:

# Use PostgreSQL instead of SQLite
ORION_STORAGE__URL="postgres://user:pass@localhost/orion" orion-server

# Enable admin authentication
ORION_ADMIN_AUTH__ENABLED=true \
ORION_ADMIN_AUTH__API_KEY="your-secret-key" \
orion-server

# Enable metrics and tracing
ORION_METRICS__ENABLED=true \
ORION_TRACING__ENABLED=true \
ORION_TRACING__OTLP_ENDPOINT="http://localhost:4317" \
orion-server

Validate a config file without starting the server:

orion-server validate-config -c config.toml

Create Your First Service

1. Create a workflow:

curl -s -X POST http://localhost:8080/api/v1/admin/workflows \
  -H "Content-Type: application/json" \
  -d '{
    "id": "hello-world",
    "name": "Hello World",
    "condition": true,
    "tasks": [
      { "id": "parse", "name": "Parse", "function": {
          "name": "parse_json", "input": { "source": "payload", "target": "req" }
      }},
      { "id": "greet", "name": "Greet", "function": {
          "name": "map", "input": { "mappings": [
            { "path": "data.req.greeting", "logic": {
              "cat": ["Hello, ", { "var": "data.req.name" }, "!"]
            }}
          ]}
      }}
    ]
  }'

2. Activate the workflow:

curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/hello-world/status \
  -H "Content-Type: application/json" -d '{"status": "active"}'

3. Create and activate a channel:

curl -s -X POST http://localhost:8080/api/v1/admin/channels \
  -H "Content-Type: application/json" \
  -d '{ "channel_id": "hello", "name": "hello", "channel_type": "sync",
        "protocol": "rest", "route_pattern": "/hello",
        "methods": ["POST"], "workflow_id": "hello-world" }'

curl -s -X PATCH http://localhost:8080/api/v1/admin/channels/hello/status \
  -H "Content-Type: application/json" -d '{"status": "active"}'

4. Test it:

curl -s -X POST http://localhost:8080/api/v1/data/hello \
  -H "Content-Type: application/json" \
  -d '{ "data": { "name": "World" } }'
{
  "status": "ok",
  "data": { "req": { "name": "World", "greeting": "Hello, World!" } },
  "errors": []
}

Orion CLI

The Orion CLI provides a command-line interface and an MCP server for managing Orion, with no curl commands needed.

Homebrew (macOS and Linux):

brew install GoPlasmatic/tap/orion-cli

Shell installer (Linux/macOS):

curl --proto '=https' --tlsv1.2 -LsSf https://github.com/GoPlasmatic/Orion-cli/releases/latest/download/orion-cli-installer.sh | sh

PowerShell (Windows):

powershell -ExecutionPolicy ByPass -c "irm https://github.com/GoPlasmatic/Orion-cli/releases/latest/download/orion-cli-installer.ps1 | iex"

From source (requires Rust 1.85+):

cargo install --git https://github.com/GoPlasmatic/Orion-cli

Usage:

orion-cli config set-server http://localhost:8080
orion-cli health
orion-cli workflows list
orion-cli channels list
orion-cli send hello -d '{ "data": { "name": "World" } }'

See the CLI reference for the full command list, or set up the MCP Server for AI assistant integration.

MCP Server Setup

Orion includes an MCP (Model Context Protocol) server that lets AI assistants like Claude manage workflows, channels, and connectors through natural language. The MCP server wraps the Orion admin API, giving AI agents full control over your Orion instance.

Prerequisites

  • A running Orion instance (see CLI Setup)
  • An MCP-compatible client (Claude Code, Claude Desktop, or other MCP clients)

Configuration

Add the Orion MCP server to your MCP client configuration.

Claude Code: add to your .claude/settings.json or project-level .mcp.json:

{
  "mcpServers": {
    "orion": {
      "command": "orion-cli",
      "args": ["mcp", "serve"],
      "env": {
        "ORION_SERVER_URL": "http://localhost:8080"
      }
    }
  }
}

Claude Desktop: add to your Claude Desktop configuration file:

{
  "mcpServers": {
    "orion": {
      "command": "orion-cli",
      "args": ["mcp", "serve"],
      "env": {
        "ORION_SERVER_URL": "http://localhost:8080"
      }
    }
  }
}

If admin authentication is enabled on your Orion instance, include the API key:

{
  "env": {
    "ORION_SERVER_URL": "http://localhost:8080",
    "ORION_API_KEY": "your-secret-key"
  }
}

Available Tools

The MCP server exposes the following tools to AI assistants:

Tool | Description
list_workflows | List all workflows with optional status/tag filters
create_workflow | Create a new workflow (as draft)
get_workflow | Get workflow details by ID
update_workflow | Update a draft workflow
activate_workflow | Activate a draft workflow
archive_workflow | Archive a workflow
test_workflow | Dry-run a workflow with sample data
validate_workflow | Validate workflow structure
list_channels | List all channels
create_channel | Create a new channel
activate_channel | Activate a draft channel
list_connectors | List connectors (secrets masked)
create_connector | Create a new connector
engine_status | Get engine status
reload_engine | Hot-reload the engine
health_check | Check Orion health

Usage Example

Once configured, you can use natural language to manage Orion:

“Create a workflow that parses incoming orders, flags any over $10,000, and adds a risk level field. Then create a channel called ‘orders’ that uses it.”

The AI assistant will use the MCP tools to:

  1. Create the workflow via create_workflow
  2. Test it with sample data via test_workflow
  3. Activate it via activate_workflow
  4. Create the channel via create_channel
  5. Activate the channel via activate_channel

Troubleshooting

MCP server not connecting:

  • Verify Orion is running: curl http://localhost:8080/health
  • Check the ORION_SERVER_URL environment variable is set correctly
  • Ensure orion-cli is in your PATH

Authentication errors:

  • Verify ORION_API_KEY matches your Orion instance’s admin_auth.api_key
  • Check that admin auth is enabled on the Orion instance if you’re passing a key

Use Cases & Patterns

Real-world examples showing how AI generates Orion workflows from natural language. Every example follows the same pattern: describe what you need → AI generates the workflow → create a channel → send data → get results.

E-Commerce Order Classification

Classify orders into tiers and compute discounts.

AI prompt:

Create a workflow for the "orders" channel that:
1. Parses the payload into "order"
2. Assigns tiers based on amount:
   - VIP: amount >= 500, discount 15%
   - Premium: amount 100-500, discount 5%
   - Standard: amount < 100, no discount

Generated workflow:

{
  "name": "Order Classification",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse Payload", "function": {
        "name": "parse_json", "input": { "source": "payload", "target": "order" }
    }},
    { "id": "vip_tier", "name": "Set VIP Tier",
      "condition": { ">=": [{ "var": "data.order.amount" }, 500] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.order.tier", "logic": "vip" },
        { "path": "data.order.discount_pct", "logic": 15 }
      ]}}
    },
    { "id": "premium_tier", "name": "Set Premium Tier",
      "condition": { "and": [
        { ">=": [{ "var": "data.order.amount" }, 100] },
        { "<": [{ "var": "data.order.amount" }, 500] }
      ]},
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.order.tier", "logic": "premium" },
        { "path": "data.order.discount_pct", "logic": 5 }
      ]}}
    },
    { "id": "standard_tier", "name": "Set Standard Tier",
      "condition": { "<": [{ "var": "data.order.amount" }, 100] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.order.tier", "logic": "standard" },
        { "path": "data.order.discount_pct", "logic": 0 }
      ]}}
    }
  ]
}

Send data:

curl -s -X POST http://localhost:8080/api/v1/data/orders \
  -H "Content-Type: application/json" \
  -d '{ "data": { "amount": 750, "product": "Diamond Ring" } }'

Response:

{
  "status": "ok",
  "data": {
    "order": { "amount": 750, "product": "Diamond Ring", "tier": "vip", "discount_pct": 15 }
  },
  "errors": []
}

Key patterns: Task-level conditions, computed output fields.

IoT Sensor Alert Classification

Classify sensor readings into severity levels using range-based conditions.

AI prompt:

Create a workflow for the "sensors" channel that classifies temperature readings:
- Critical: temperature > 90 or below 0, set alert flag
- Warning: temperature 70-90, set alert flag
- Normal: temperature 0-70, no alert
Parse the payload into "reading" and set severity and alert fields.

Generated workflow:

{
  "name": "Sensor Alert Pipeline",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse Payload", "function": {
        "name": "parse_json", "input": { "source": "payload", "target": "reading" }
    }},
    { "id": "critical", "name": "Mark Critical",
      "condition": { "or": [
        { ">": [{ "var": "data.reading.temperature" }, 90] },
        { "<": [{ "var": "data.reading.temperature" }, 0] }
      ]},
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.reading.severity", "logic": "critical" },
        { "path": "data.reading.alert", "logic": true }
      ]}}
    },
    { "id": "warning", "name": "Mark Warning",
      "condition": { "and": [
        { ">": [{ "var": "data.reading.temperature" }, 70] },
        { "<=": [{ "var": "data.reading.temperature" }, 90] }
      ]},
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.reading.severity", "logic": "warning" },
        { "path": "data.reading.alert", "logic": true }
      ]}}
    },
    { "id": "normal", "name": "Mark Normal",
      "condition": { "and": [
        { ">=": [{ "var": "data.reading.temperature" }, 0] },
        { "<=": [{ "var": "data.reading.temperature" }, 70] }
      ]},
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.reading.severity", "logic": "normal" },
        { "path": "data.reading.alert", "logic": false }
      ]}}
    }
  ]
}

Send data:

curl -s -X POST http://localhost:8080/api/v1/data/sensors \
  -H "Content-Type: application/json" \
  -d '{ "data": { "temperature": 80, "sensor_id": "SENSOR-42" } }'

Response:

{
  "status": "ok",
  "data": {
    "reading": { "temperature": 80, "sensor_id": "SENSOR-42", "severity": "warning", "alert": true }
  },
  "errors": []
}

Input | Severity | Alert
temperature: 45 | normal | false
temperature: 80 | warning | true
temperature: 95 | critical | true
temperature: -5 | critical | true

Key patterns: Range-based classification with and/or conditions, boolean flags.

Webhook Payload Transformation

Normalize incoming webhook payloads from different providers into a consistent internal schema.

AI prompt:

Create a workflow for the "webhooks" channel that normalizes webhook payloads from any provider:
- Map "origin" to "source"
- Map "type" to "event_type"
- Map "body" to "payload"
- Add a "processed" flag set to true
Output should be under data.normalized.

Generated workflow:

{
  "name": "Webhook Transform Pipeline",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse Payload", "function": {
        "name": "parse_json", "input": { "source": "payload", "target": "event" }
    }},
    { "id": "normalize", "name": "Normalize Schema", "function": {
        "name": "map", "input": { "mappings": [
          { "path": "data.normalized.source", "logic": { "var": "data.event.origin" } },
          { "path": "data.normalized.event_type", "logic": { "var": "data.event.type" } },
          { "path": "data.normalized.payload", "logic": { "var": "data.event.body" } },
          { "path": "data.normalized.processed", "logic": true }
        ]}
    }}
  ]
}

Send data:

curl -s -X POST http://localhost:8080/api/v1/data/webhooks \
  -H "Content-Type: application/json" \
  -d '{ "data": { "origin": "github", "type": "push", "body": {"ref": "refs/heads/main"} } }'

Response:

{
  "status": "ok",
  "data": {
    "normalized": { "source": "github", "event_type": "push", "payload": {"ref": "refs/heads/main"}, "processed": true }
  },
  "errors": []
}

Missing optional fields produce null rather than errors, which makes the pipeline safe for partial payloads from different webhook providers.

Key patterns: Schema mapping with var, null-safe field access, static enrichment.

Notification Routing

Route notifications to different delivery channels based on severity.

AI prompt:

Create a workflow for the "notifications" channel that routes by severity:
- Log all notifications
- Send email for anything except "low" severity
- Send SMS only for "high" and "critical" severity
Parse the payload into "notification".

Generated workflow:

{
  "name": "Notification Router",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse Payload", "function": {
        "name": "parse_json", "input": { "source": "payload", "target": "notification" }
    }},
    { "id": "log_all", "name": "Log All Notifications", "function": {
        "name": "map", "input": { "mappings": [
          { "path": "data.notification.logged", "logic": true }
        ]}
    }},
    { "id": "email", "name": "Send Email",
      "condition": { "!=": [{ "var": "data.notification.severity" }, "low"] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.notification.email_sent", "logic": true }
      ]}}
    },
    { "id": "sms", "name": "Send SMS for High/Critical",
      "condition": { "in": [{ "var": "data.notification.severity" }, ["high", "critical"]] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.notification.sms_sent", "logic": true }
      ]}}
    }
  ]
}

Send data:

curl -s -X POST http://localhost:8080/api/v1/data/notifications \
  -H "Content-Type: application/json" \
  -d '{ "data": { "message": "Disk usage at 92%", "severity": "high" } }'

Response:

{
  "status": "ok",
  "data": {
    "notification": { "message": "Disk usage at 92%", "severity": "high", "logged": true, "email_sent": true, "sms_sent": true }
  },
  "errors": []
}

Severity | Logged | Email | SMS
low | yes | no | no
medium | yes | yes | no
high | yes | yes | yes
critical | yes | yes | yes

In production, replace the map tasks with http_call tasks pointing to your email and SMS connectors.
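
As a sketch of that swap (the connector name "email-service" is hypothetical; http_call references a connector by name along with a method and body_logic):

```json
{ "id": "email", "name": "Send Email",
  "condition": { "!=": [{ "var": "data.notification.severity" }, "low"] },
  "function": { "name": "http_call", "input": {
      "connector": "email-service",
      "method": "POST",
      "body_logic": { "var": "data.notification" }
  }}
}
```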

Key patterns: Task-level condition gating, in operator for set membership, progressive pipeline.

Compliance Risk Classification

Classify transactions by risk level and use dry-run testing to verify workflows before activating them.

AI prompt:

Create a workflow for the "compliance" channel that classifies transaction risk:
- High risk: amount > 10000, requires manual review
- Normal risk: amount <= 10000, no review needed
Parse the payload into "txn".

Generated workflow:

{
  "name": "Risk Classifier",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse Payload", "function": {
        "name": "parse_json", "input": { "source": "payload", "target": "txn" }
    }},
    { "id": "high_risk", "name": "Flag High Risk",
      "condition": { ">": [{ "var": "data.txn.amount" }, 10000] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.txn.risk_level", "logic": "high" },
        { "path": "data.txn.requires_review", "logic": true }
      ]}}
    },
    { "id": "normal_risk", "name": "Normal Risk",
      "condition": { "<=": [{ "var": "data.txn.amount" }, 10000] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.txn.risk_level", "logic": "normal" },
        { "path": "data.txn.requires_review", "logic": false }
      ]}}
    }
  ]
}

Dry-run before going live:

curl -s -X POST http://localhost:8080/api/v1/admin/workflows/<workflow-id>/test \
  -H "Content-Type: application/json" \
  -d '{"data": {"amount": 50000, "currency": "USD"}}'
{
  "matched": true,
  "trace": { "steps": [
    { "task_id": "parse", "result": "executed" },
    { "task_id": "high_risk", "result": "executed" },
    { "task_id": "normal_risk", "result": "skipped" }
  ]},
  "output": { "txn": { "amount": 50000, "currency": "USD", "risk_level": "high", "requires_review": true } }
}

The trace shows exactly which tasks ran and which were skipped. Verify the logic is correct before a single real transaction flows through.
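
In scripts, the trace can be checked mechanically. For instance, with jq (using the dry-run response above):

```shell
# Dry-run response from the /test call above, abbreviated to the trace.
RESULT='{"matched":true,"trace":{"steps":[
  {"task_id":"parse","result":"executed"},
  {"task_id":"high_risk","result":"executed"},
  {"task_id":"normal_risk","result":"skipped"}]}}'

# Print the tasks the engine skipped.
echo "$RESULT" | jq -r '.trace.steps[] | select(.result == "skipped") | .task_id'
```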

Key patterns: Dry-run verification, execution trace inspection, regulatory workflow.

AI Workflow & CI/CD

AI writes workflows, not services. Instead of generating microservices that need their own governance, LLMs generate Orion workflows: constrained JSON that the platform validates, versions, and monitors automatically.

Prompt Templates

Structure your LLM prompts to produce valid Orion workflows. Here’s a reusable system prompt:

You generate Orion workflows in JSON format. Workflows have:
- name, condition (JSONLogic or true), continue_on_error (optional boolean)
- tasks: array of { id, name, condition (optional, JSONLogic), function: { name, input } }
- Every workflow starts with a parse_json task: { "name": "parse_json", "input": { "source": "payload", "target": "<entity>" } }
- Use "map" function with "mappings" array for transforms. Each mapping has "path" (dot notation) and "logic" (value or JSONLogic).
- Use "http_call" with "connector" (by name) for external API calls. Do not embed URLs or credentials in workflows.
- Use "channel_call" with "channel" (by name) for in-process inter-channel invocation.
- Task conditions use { "var": "data.<entity>.<field>" } to reference parsed data.

Output ONLY the JSON workflow. No explanation.

Validation Pipeline

Every AI-generated workflow should go through this pipeline before reaching production:

  1. Generate: use your LLM with the prompt template above
  2. Validate: POST /api/v1/admin/workflows/validate to check structure
  3. Create as draft: POST /api/v1/admin/workflows (workflows are created as drafts by default, not loaded into the engine)
  4. Dry-run: POST /api/v1/admin/workflows/{id}/test with representative test data
  5. Check the trace: verify the right tasks ran, the right ones were skipped, and output matches expectations
  6. Activate: PATCH /api/v1/admin/workflows/{id}/status with "status": "active"
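
Steps 2-6 can be wrapped in a few shell helpers (a sketch; ORION_URL is a placeholder, and the endpoint paths are the ones listed above):

```shell
#!/usr/bin/env bash
# Helpers for the validate -> draft -> dry-run -> activate pipeline.
ORION_URL="${ORION_URL:-http://localhost:8080}"

validate_workflow() {           # $1 = workflow JSON file
  curl -s -X POST "$ORION_URL/api/v1/admin/workflows/validate" \
    -H "Content-Type: application/json" -d @"$1"
}

create_draft() {                # $1 = workflow JSON file
  curl -s -X POST "$ORION_URL/api/v1/admin/workflows" \
    -H "Content-Type: application/json" -d @"$1"
}

dry_run() {                     # $1 = workflow id, $2 = test data JSON file
  curl -s -X POST "$ORION_URL/api/v1/admin/workflows/$1/test" \
    -H "Content-Type: application/json" -d @"$2"
}

activate() {                    # $1 = workflow id
  curl -s -X PATCH "$ORION_URL/api/v1/admin/workflows/$1/status" \
    -H "Content-Type: application/json" -d '{"status": "active"}'
}
```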

CI/CD Pipeline

Integrate AI workflow generation into your deployment pipeline. Workflows are JSON files that version, diff, and review like any other config.

AI generates workflow → commit as JSON → CI runs dry-run → review → import

GitHub Actions example:

name: Validate AI Workflows
on:
  pull_request:
    paths: ['workflows/**/*.json']
  push:
    branches: [main]
    paths: ['workflows/**/*.json']

jobs:
  validate:
    runs-on: ubuntu-latest
    services:
      orion:
        image: ghcr.io/goplasmatic/orion:latest
        ports: ['8080:8080']
    steps:
      - uses: actions/checkout@v4

      - name: Wait for Orion
        run: |
          for i in $(seq 1 30); do
            curl -sf http://localhost:8080/health && break
            sleep 1
          done

      - name: Import workflows (as drafts)
        run: |
          for file in workflows/*.json; do
            curl -s -X POST http://localhost:8080/api/v1/admin/workflows \
              -H "Content-Type: application/json" \
              -d @"$file"
          done

      - name: Dry-run test cases
        run: |
          for test in workflows/tests/*.json; do
            WORKFLOW_ID=$(jq -r '.workflow_id' "$test")
            DATA=$(jq -c '.data' "$test")
            EXPECTED=$(jq -c '.expected_output' "$test")

            RESULT=$(curl -s -X POST \
              "http://localhost:8080/api/v1/admin/workflows/${WORKFLOW_ID}/test" \
              -H "Content-Type: application/json" \
              -d "$DATA")

            OUTPUT=$(echo "$RESULT" | jq -c '.output')
            if [ "$OUTPUT" != "$EXPECTED" ]; then
              echo "FAIL: $test"
              echo "Expected: $EXPECTED"
              echo "Got: $OUTPUT"
              exit 1
            fi
          done

      - name: Deploy to production
        if: github.event_name == 'push' && github.ref == 'refs/heads/main'
        run: |
          for file in workflows/*.json; do
            curl -s -X POST "${{ secrets.ORION_URL }}/api/v1/admin/workflows" \
              -H "Content-Type: application/json" \
              -d @"$file"
          done

Test case format: store test cases alongside workflows:

workflows/
  fraud-detection.json       # The workflow
  tests/
    fraud-high-risk.json     # Test case
    fraud-clear.json         # Test case

Each test case:

{
  "workflow_id": "fraud-detection",
  "data": { "data": { "amount": 15000, "country": "US" } },
  "expected_output": { "order": { "amount": 15000, "risk": "high", "requires_review": true } }
}

Safety Guardrails

AI-generated workflows get the same governance as hand-written ones:

  • Version history: every workflow change is recorded. Roll back if an AI-generated workflow misbehaves.
  • Draft status: workflows are created as draft by default and are not loaded into the engine until explicitly activated.
  • Dry-run before activate: test with representative data and inspect the full execution trace.
  • Audit trail: every workflow version is recorded in the workflows table with incrementing version numbers.
  • Connectors isolate secrets: AI generates workflows that reference connector names, never credentials.

Common Workflow Patterns

The parse-then-process pattern

Every workflow that reads input data must start with parse_json. Without it, task conditions that reference data.X see an empty context.

{
  "tasks": [
    { "id": "parse", "function": { "name": "parse_json", "input": { "source": "payload", "target": "order" } } },
    { "id": "process", "condition": { ">": [{ "var": "data.order.total" }, 100] }, "function": { "..." : "..." } }
  ]
}

Task-level vs workflow-level conditions

  • Workflow-level condition: determines whether the entire workflow matches. Set to true for workflows that always run.
  • Task-level condition: determines whether a specific task within a matched workflow executes. Use for branching logic within a pipeline.

{
  "condition": true,
  "tasks": [
    { "id": "always", "function": { "..." : "..." } },
    { "id": "conditional", "condition": { ">": [{ "var": "data.amount" }, 500] }, "function": { "..." : "..." } }
  ]
}

External API calls with connectors

Keep credentials in connectors, reference them by name in workflows:

{
  "tasks": [
    { "id": "parse", "function": { "name": "parse_json", "input": { "source": "payload", "target": "event" } } },
    { "id": "notify", "function": { "name": "http_call", "input": {
        "connector": "slack-webhook",
        "method": "POST",
        "body_logic": { "var": "data.event" }
    }}}
  ]
}

Inter-channel composition with channel_call

Invoke another channel’s workflow in-process for service composition:

{
  "tasks": [
    { "id": "parse", "function": { "name": "parse_json", "input": { "source": "payload", "target": "order" } } },
    { "id": "enrich", "function": { "name": "channel_call", "input": {
        "channel": "customer-lookup",
        "data_logic": { "var": "data.order.customer_id" },
        "response_path": "data.customer"
    }}},
    { "id": "process", "condition": { "==": [{ "var": "data.customer.tier" }, "vip"] }, "function": { "..." : "..." } }
  ]
}

Config Reference

All settings have sensible defaults, so you can run Orion with no config file at all: orion-server just works.

CLI Commands

orion-server                              # Start the server (default)
orion-server -c config.toml               # Start with a config file
orion-server validate-config              # Validate config without starting
orion-server validate-config -c config.toml  # Validate a specific config file
orion-server migrate                      # Run database migrations
orion-server migrate --dry-run            # Preview pending migrations

Database Backend

The database backend is selected at runtime from the storage.url scheme. No rebuild needed:

Backend | URL Format | Example
SQLite | sqlite: | sqlite:orion.db or sqlite::memory:
PostgreSQL | postgres:// | postgres://user:pass@host/db
MySQL | mysql:// | mysql://user:pass@host/db

# SQLite (default)
orion-server

# PostgreSQL
ORION_STORAGE__URL="postgres://user:pass@localhost/orion" orion-server

# MySQL
ORION_STORAGE__URL="mysql://user:pass@localhost/orion" orion-server

Migrations for all backends are embedded in the binary and the correct set is selected automatically at startup.

Complete Config File

[server]
host = "0.0.0.0"
port = 8080
# shutdown_drain_secs = 30     # HTTP connection drain timeout on shutdown

# [server.tls]
# enabled = false
# cert_path = "cert.pem"
# key_path = "key.pem"

[storage]
url = "sqlite:orion.db"       # Database URL (sqlite:, postgres://, mysql://)
# max_connections = 25          # Connection pool max connections
# min_connections = 5           # Connection pool min connections
# busy_timeout_ms = 5000        # SQLite busy timeout (ignored for other backends)
# acquire_timeout_secs = 5      # Connection pool acquire timeout
# idle_timeout_secs = 0         # Connection idle timeout (0 = no timeout)

[ingest]
max_payload_size = 1048576     # Maximum payload size in bytes (1 MB)

[engine]
# health_check_timeout_secs = 2   # Timeout for engine read lock in health checks
# reload_timeout_secs = 10        # Timeout for engine write lock during reload
# max_channel_call_depth = 10     # Max recursion depth for channel_call
# default_channel_call_timeout_ms = 30000  # Default timeout for channel_call
# global_http_timeout_secs = 30   # Global timeout for HTTP client
# max_pool_cache_entries = 100    # Max cached connection pools for external connectors
# cache_cleanup_interval_secs = 60  # Cleanup interval for idle connection pools

# [engine.circuit_breaker]
# enabled = false                 # Enable circuit breakers for connectors
# failure_threshold = 5           # Failures before tripping the breaker
# recovery_timeout_secs = 30      # Cooldown before half-open probe
# max_breakers = 10000            # Max circuit breaker instances (LRU eviction)

[queue]
workers = 4                    # Concurrent async trace workers
buffer_size = 1000             # Channel buffer for pending traces
# shutdown_timeout_secs = 30    # Timeout for in-flight jobs during shutdown
# trace_retention_hours = 72    # How long to keep completed traces (0 = disabled)
# trace_cleanup_interval_secs = 3600  # Cleanup check interval
# processing_timeout_ms = 60000  # Per-trace processing timeout
# max_result_size_bytes = 1048576  # Max size of trace result (1 MB)
# max_queue_memory_bytes = 104857600  # Max memory for queued traces (100 MB)
# dlq_retry_enabled = true       # Enable dead letter queue retry
# dlq_max_retries = 5            # Max retry attempts for DLQ entries
# dlq_poll_interval_secs = 30    # DLQ retry poll interval

[rate_limit]
# enabled = false               # Enable platform-level rate limiting
# default_rps = 100             # Default requests per second
# default_burst = 50            # Default burst allowance

# [rate_limit.endpoints]
# admin_rps = 50                # Rate limit for admin routes
# data_rps = 200                # Rate limit for data routes

[admin_auth]
# enabled = false               # Require authentication for admin endpoints
# api_key = "your-secret-key"   # The API key or bearer token
# header = "Authorization"      # "Authorization" = Bearer format, other = raw key

[cors]
# allowed_origins = ["*"]       # Global CORS allowed origins

[kafka]
enabled = false
brokers = ["localhost:9092"]
group_id = "orion"
# processing_timeout_ms = 60000  # Per-message processing timeout
# max_inflight = 100             # Max in-flight messages
# lag_poll_interval_secs = 30    # Consumer lag poll interval

[[kafka.topics]]               # Map Kafka topics to channels
topic = "incoming-orders"
channel = "orders"

[kafka.dlq]
enabled = false
topic = "orion-dlq"

[logging]
level = "info"                 # trace, debug, info, warn, error
format = "pretty"              # pretty or json

[metrics]
enabled = false

[tracing]
# enabled = false                   # Enable OpenTelemetry trace export
# otlp_endpoint = "http://localhost:4317"  # OTLP gRPC endpoint
# service_name = "orion"            # Service name in traces
# sample_rate = 1.0                 # 0.0 (none) to 1.0 (all)

[channels]                          # Control which channels this instance loads
# include = ["orders.*", "payments.*"]   # Glob patterns to include (empty = all)
# exclude = ["analytics.*"]              # Glob patterns to exclude

Environment Variable Overrides

Override any setting with environment variables using double-underscore nesting:

ORION_SERVER__PORT=9090
ORION_STORAGE__URL="postgres://user:pass@localhost/orion"
ORION_KAFKA__ENABLED=true
ORION_LOGGING__FORMAT=json
ORION_RATE_LIMIT__ENABLED=true
ORION_ADMIN_AUTH__ENABLED=true
ORION_ADMIN_AUTH__API_KEY="your-secret-key"
ORION_TRACING__ENABLED=true
ORION_TRACING__OTLP_ENDPOINT="http://jaeger:4317"
ORION_METRICS__ENABLED=true

Environment variables take precedence over config file values.
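
The naming rule can be illustrated in a few lines of shell (not Orion code, just the convention: strip the ORION_ prefix, treat each double underscore as a nesting level, and lowercase):

```shell
# Derive the config key an environment variable overrides.
env_to_key() {
  local name="${1#ORION_}"   # ORION_SERVER__PORT -> SERVER__PORT
  name="${name//__/.}"       # SERVER__PORT -> SERVER.PORT
  echo "$name" | tr '[:upper:]' '[:lower:]'   # -> server.port
}

env_to_key ORION_SERVER__PORT         # -> server.port
env_to_key ORION_ADMIN_AUTH__API_KEY  # -> admin_auth.api_key
```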

Built-in Capabilities

All capabilities are compiled into a single binary and controlled at runtime:

Capability | Configuration | Default
Database backend | storage.url scheme | SQLite
Kafka | kafka.enabled | Disabled
OpenTelemetry | tracing.enabled | Disabled
TLS/HTTPS | server.tls.enabled | Disabled
Swagger UI | Always at /docs | Enabled
SQL connectors | db_read/db_write functions | Always available
Redis cache | cache_read/cache_write with Redis backend | Always available
MongoDB connector | mongo_read function | Always available
Rate limiting | rate_limit.enabled | Disabled
Metrics | metrics.enabled | Disabled

Production Checklist

  • Mount a persistent volume for orion.db (SQLite) or configure storage.url for PostgreSQL/MySQL
  • Enable admin API authentication: ORION_ADMIN_AUTH__ENABLED=true
  • Set structured logging: ORION_LOGGING__FORMAT=json
  • Enable Prometheus metrics: ORION_METRICS__ENABLED=true
  • Configure rate limiting for traffic protection (platform-level and per-channel)
  • Use per-crate filtering: RUST_LOG=orion=info
  • Enable OpenTelemetry: ORION_TRACING__ENABLED=true for distributed tracing
  • Enable TLS via the server.tls section
  • Enable circuit breakers: engine.circuit_breaker.enabled = true
  • Set trace retention: queue.trace_retention_hours for trace data lifecycle management