Introduction
Orion
The declarative services runtime for the AI era
Orion is a declarative services runtime written in Rust. Instead of writing, deploying, and operating a microservice for every piece of business logic, you declare what the service should do, and Orion runs it. Architectural governance — observability, rate limiting, circuit breakers, versioning, input validation, and more — is built in.
AI generates workflows; Orion provides the governance. The platform guarantees that every service gets health checks, metrics, retries, and error handling, regardless of how the workflow was created.
Is Orion Right for You?
| If you need to… | Orion? | Why |
|---|---|---|
| Turn business rules into live REST/Kafka services | Yes | Define logic as JSON workflows, deploy with one API call |
| Let AI generate and manage business logic | Yes | Built-in validation, dry-run testing, and draft-before-activate safety |
| Replace a handful of single-purpose microservices | Yes | One instance handles many channels, governance included |
| Use a rule engine like Drools | Not quite | Orion uses JSONLogic. Lightweight and AI-friendly, but not a full RETE-based rule engine |
| Embed a workflow engine library in your app | No | Orion is a standalone runtime. For an embeddable engine, see dataflow-rs |
| Orchestrate long-running jobs (hours/days) | No | Use Temporal or Airflow. Orion is optimized for request-response and event processing |
| Run a full API gateway with plugin ecosystem | No | Use Kong or Envoy. Orion focuses on service logic, not proxy features |
Three Primitives
You build services in Orion with three things:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Channel │──────▶│ Workflow │──────▶│ Connector │
│ (endpoint) │ │ (logic) │ │ (external) │
└─────────────┘ └──────────────┘ └─────────────┘
| Primitive | What it is | Example |
|---|---|---|
| Channel | A service endpoint: sync (REST, HTTP) or async (Kafka) | POST /orders, GET /users/{id}, Kafka topic order.placed |
| Workflow | A pipeline of tasks that defines what the service does | Parse → validate → enrich → transform → respond |
| Connector | A named connection to an external system, with auth and retries | Stripe API, PostgreSQL, Redis, Kafka cluster |
Design-time: define channels, build workflows, configure connectors, test with dry-run, manage versions, all through the admin API.
Runtime: Orion routes traffic to channels, executes workflows, calls connectors, and handles observability automatically.
Your First Service in 2 Minutes
No code. No Dockerfile. No CI pipeline.
1. Start Orion
brew install GoPlasmatic/tap/orion-server # or: curl installer, cargo install
orion-server
2. Create a workflow (AI-generated or hand-written)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows \
-H "Content-Type: application/json" \
-d '{
"workflow_id": "high-value-order",
"name": "High-Value Order",
"condition": true,
"tasks": [
{ "id": "parse", "name": "Parse payload", "function": {
"name": "parse_json",
"input": { "source": "payload", "target": "order" }
}},
{ "id": "flag", "name": "Flag order",
"condition": { ">": [{ "var": "data.order.total" }, 10000] },
"function": {
"name": "map",
"input": { "mappings": [
{ "path": "data.order.flagged", "logic": true },
{ "path": "data.order.alert", "logic": {
"cat": ["High-value order: $", { "var": "data.order.total" }]
}}
]}
}}
]
}'
# Activate it
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/high-value-order/status \
-H "Content-Type: application/json" -d '{"status": "active"}'
3. Create a channel (the service endpoint)
curl -s -X POST http://localhost:8080/api/v1/admin/channels \
-H "Content-Type: application/json" \
-d '{ "channel_id": "orders", "name": "orders", "channel_type": "sync",
"protocol": "http", "route_pattern": "/orders",
"methods": ["POST"], "workflow_id": "high-value-order" }'
# Activate
curl -s -X PATCH http://localhost:8080/api/v1/admin/channels/orders/status \
-H "Content-Type: application/json" -d '{"status": "active"}'
4. Send a request — your service is live
curl -s -X POST http://localhost:8080/api/v1/data/orders \
-H "Content-Type: application/json" \
-d '{ "data": { "order_id": "ORD-9182", "total": 25000 } }'
{
"status": "ok",
"data": {
"order": {
"order_id": "ORD-9182",
"total": 25000,
"flagged": true,
"alert": "High-value order: $25000"
}
}
}
That’s it. Rate limiting, metrics, health checks, and request tracing are already active. Change the threshold? One API call. No rebuild, no redeploy, no restart.
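To make the workflow's JSONLogic concrete, here is a tiny hypothetical evaluator in Python covering only the three operators used above (`var`, `>`, `cat`). It is a sketch of the semantics, not Orion's engine, and `eval_logic` is an illustrative name:

```python
def eval_logic(logic, ctx):
    """Evaluate a tiny JSONLogic subset: literals, var, >, cat (sketch only)."""
    if not isinstance(logic, dict):
        return logic  # literals (true, numbers, strings) evaluate to themselves
    op, args = next(iter(logic.items()))
    if op == "var":  # dotted-path lookup into the data context
        val = ctx
        for part in args.split("."):
            val = val[part]
        return val
    if op == ">":
        return eval_logic(args[0], ctx) > eval_logic(args[1], ctx)
    if op == "cat":  # string concatenation
        return "".join(str(eval_logic(a, ctx)) for a in args)
    raise ValueError(f"unsupported op: {op}")

ctx = {"data": {"order": {"order_id": "ORD-9182", "total": 25000}}}
condition = {">": [{"var": "data.order.total"}, 10000]}
assert eval_logic(condition, ctx) is True  # order qualifies as high-value
alert = {"cat": ["High-value order: $", {"var": "data.order.total"}]}
assert eval_logic(alert, ctx) == "High-value order: $25000"
```

This is why the `flag` task only fires for totals above 10000: its `condition` is evaluated against the data context before the `map` mappings run.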
What’s Built In
Every channel gets production-grade features without writing a line of code:
| Feature | What it does | Configuration |
|---|---|---|
| Rate limiting | Throttle requests per client or globally | requests_per_second, burst, JSONLogic key |
| Timeouts | Cancel slow workflows, return 504 | timeout_ms per channel |
| Input validation | Reject bad requests at the boundary | JSONLogic with headers, query, path access |
| Backpressure | Shed load when overwhelmed, return 503 | max_concurrent (semaphore-based) |
| CORS | Control browser cross-origin access | allowed_origins per channel |
| Circuit breakers | Stop cascading failures to external services | Automatic per connector, admin API to inspect/reset |
| Versioning | Draft → active → archived lifecycle | Automatic version history, rollout percentages |
| Observability | Prometheus metrics, structured logs, distributed tracing | Always on, zero configuration |
| Health checks | Component-level status with degradation detection | GET /health, automatic |
| Deduplication | Prevent duplicate processing via idempotency keys | Idempotency-Key header, configurable window |
| Response caching | Cache responses for identical requests | TTL-based, configurable cache key fields |
Performance
7K+ workflow requests/sec on a single instance (Apple M2 Pro, release build, 50 concurrent connections):
| Scenario | Req/sec | Avg Latency | P99 Latency |
|---|---|---|---|
| Simple workflow (1 task) | 7,417 | 6.70 ms | 16.80 ms |
| Complex workflow (4 tasks) | 7,044 | 7.00 ms | 23.50 ms |
| 12 workflows on one channel | 6,894 | 7.20 ms | 17.30 ms |
Pre-compiled JSONLogic, zero-downtime hot-reload, lock-free reads, SQLite WAL mode, async-first on Tokio.
Architecture Overview
Three Primitives
Services in Orion are composed from three building blocks:
| Primitive | Role | Examples |
|---|---|---|
| Channel | Service endpoint: sync (REST, HTTP) or async (Kafka) | POST /orders, GET /users/{id}, Kafka topic order.placed |
| Workflow | Pipeline of tasks that defines what the service does | Parse → validate → enrich → transform → respond |
| Connector | Named connection to an external system with auth and retries | Stripe API, PostgreSQL, Redis, Kafka cluster |
Channels receive traffic. Workflows process it. Connectors reach out to external systems. Everything else (rate limiting, metrics, circuit breakers, versioning) is handled by the platform.
Deployment Topology
Before Orion
Every piece of business logic is its own service to build, deploy, and operate, each with its own infrastructure stack:
4 services x (code + Dockerfile + CI pipeline + health checks + metrics agent + log agent + sidecar proxy + scaling policy + secret config + canary rollout) = dozens of components to build, wire, and keep running.
After Orion
One Orion instance replaces all four:
No API gateway needed. Governance is built in. One binary to deploy.
The best of both worlds: each channel and workflow is independently versioned, testable, and deployable. The modularity of microservices with the operational simplicity of a monolith.
Deploy Anywhere
Single binary. SQLite by default, no database to provision, no runtime dependencies. Need more scale? Swap to PostgreSQL or MySQL by changing storage.url. No rebuild needed.
Same channel definitions work in any topology: run everything in one instance, split channels across instances with include/exclude filters, or deploy as sidecars.
Request Processing Flow
- Route Resolution: REST pattern matching finds the channel, or falls back to name lookup
- Channel Registry: enforces deduplication, rate limits, input validation, backpressure, and checks the response cache
- Engine: the workflow engine sits behind a double-Arc (Arc<RwLock<Arc<Engine>>>), allowing zero-downtime swaps
- Workflow Matcher: evaluates JSONLogic conditions and rollout percentages to pick the right workflow
- Task Pipeline: executes functions in order (parse, map, filter, http_call, db_read, etc.)
Sync and Async
Sync POST /api/v1/data/{channel} → immediate response
Async POST /api/v1/data/{channel}/async → returns trace_id, poll later
REST GET /api/v1/data/orders/{id} → matched by route pattern
Kafka topic: order.placed → consumed automatically
Sync channels respond immediately. Async channels return a trace ID; poll GET /api/v1/data/traces/{id} for results. Kafka channels consume from topics configured in the DB or config file.
Bridging is a pattern, not a feature. A sync workflow can publish_kafka and return 202. An async channel picks it up from there.
Service Composition
Most platforms require HTTP calls between services, adding latency, failure modes, and serialization overhead. Orion’s channel_call invokes another channel’s workflow in-process with zero network round-trip:
POST /orders (order-processing workflow)
├── parse_json → extract order data
├── channel_call → "inventory-check" channel (in-process)
├── channel_call → "customer-lookup" channel (in-process)
├── map → compute pricing with enriched data
└── publish_json → return combined result
Each composed channel has its own workflow, versioning, and governance, but calls between them are function calls, not network hops. Cycle detection prevents infinite recursion.
Built-in Task Functions
| Function | Description |
|---|---|
| parse_json | Parse payload into the data context |
| parse_xml | Parse XML payloads into structured JSON |
| filter | Allow or halt processing based on JSONLogic conditions |
| map | Transform and reshape JSON using JSONLogic expressions |
| validation | Enforce required fields and constraints |
| http_call | Invoke downstream APIs via connectors |
| channel_call | Invoke another channel’s workflow in-process |
| db_read / db_write | Execute SQL queries, return rows/affected count |
| cache_read / cache_write | Read/write to in-memory or Redis cache |
| mongo_read | Query MongoDB collections |
| publish_json / publish_xml | Serialize data to JSON or XML output |
| publish_kafka | Publish messages to Kafka topics |
| log | Emit structured log entries |
Architectural Characteristics
Orion provides production-grade capabilities across eight architectural dimensions. Each subcategory below links to its detailed documentation.
C Creational · S Structural · B Behavioral
Observability — S
| Area | Capabilities |
|---|---|
| Structured Logging | JSON & pretty-print formats · Configurable log levels · Per-request context · Per-crate filtering |
| Prometheus Metrics | Request counters & error rates · Latency histograms · Circuit breaker metrics · Rate limit rejections |
| Distributed Tracing | W3C Trace Context · OpenTelemetry OTLP export · Configurable sampling rate · Per-task span tracking |
| Health Monitoring | Component-level health checks · Automatic degradation · Request ID propagation · Kubernetes liveness & readiness probes |
Resilience — S
| Area | Capabilities |
|---|---|
| Circuit Breakers | Lock-free state machine · Per-connector isolation · Auto-recovery after cooldown · Admin API to inspect & reset |
| Retry & Backoff | Exponential backoff (capped 60 s) · Configurable max retries · Retryable error detection |
| Timeouts | Per-channel enforcement · Workflow execution limits · Per-connector query timeout |
| Fault Tolerance | Graceful shutdown (SIGTERM/SIGINT) · Connection draining · Dead letter queue with retry · Panic recovery middleware |
Security — B
| Area | Capabilities |
|---|---|
| Secret Management | Auto-masked API responses · Credential isolation via connectors |
| Input Validation | Per-channel JSONLogic rules · Payload size limits · Header & query param access |
| Network Security | SSRF protection (private IP blocking) · TLS/HTTPS support · Security headers (CSP, X-Frame-Options) |
| Access Control | Admin API authentication · Per-channel CORS enforcement · Origin allowlist |
| Data Safety | Parameterized SQL queries · Injection protection · URL validation |
Scalability — C
| Area | Capabilities |
|---|---|
| Rate Limiting | Token bucket algorithm · Per-client keying via JSONLogic · Platform & per-channel limits |
| Backpressure | Semaphore concurrency limits · 503 load shedding · Per-channel configuration |
| Async Processing | Multi-worker trace queue · Bounded buffer channels · DLQ retry processor |
| Horizontal Scaling | Stateless instances · Channel include/exclude filters · Multi-database backends |
Deployability — C
| Area | Capabilities |
|---|---|
| Packaging | Single binary · SQLite, PostgreSQL, MySQL · Minimal footprint |
| Containerization | Multi-stage Docker build · Non-root execution · Built-in health probes |
| Configuration | TOML + env var overrides · Sensible defaults · Runtime configuration |
| Distribution | Homebrew tap · Shell & PowerShell installers · Multi-platform binaries |
Extensibility — S
| Area | Capabilities |
|---|---|
| Connectors | HTTP & Webhooks · Kafka pub/sub · Database (SQL) · Cache (Memory & Redis) · Storage (S3/GCS) · MongoDB (NoSQL) |
| Custom Functions | Async function handlers · Built-in function library · JSONLogic expressions |
| Channel Protocols | REST with route matching (sync) · Simple HTTP (sync) · Kafka (async) |
Availability — C
| Area | Capabilities |
|---|---|
| Hot-Reload | Zero-downtime engine swap · Channel registry rebuild · Kafka consumer restart |
| Canary Rollouts | Percentage-based traffic split · Gradual migration · Instant rollback |
| Versioning | Draft / Active / Archived lifecycle · Multi-version history · Workflow import & export |
| Performance | Response caching · Request deduplication · Connection pool caching |
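The percentage-based traffic split used for canary rollouts can be pictured as a sticky hash bucket: the same request key always lands in the same bucket, so a client stays on one version for the duration of the rollout. This is a sketch under assumptions; Orion's actual rollout algorithm is not specified here, and `route_version` is a hypothetical helper:

```python
import zlib

def route_version(request_key: str, canary_percent: int) -> str:
    """Sticky percentage split (illustrative): hash the key into a 0-99
    bucket and compare it against the rollout percentage."""
    bucket = zlib.crc32(request_key.encode()) % 100
    return "canary" if bucket < canary_percent else "stable"

assert route_version("client-42", 0) == "stable"    # 0% rollout: nobody on canary
assert route_version("client-42", 100) == "canary"  # 100% rollout: everybody
```

Raising `canary_percent` gradually migrates traffic; dropping it to 0 is an instant rollback, since routing is derived from configuration rather than stored state.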
Maintainability — B
| Area | Capabilities |
|---|---|
| Admin APIs | Full CRUD for all entities · Version management · Engine control · OpenAPI / Swagger UI |
| CI/CD Integration | Bulk import & export · Pre-deploy validation · GitOps-friendly |
| Testing | Dry-run execution · Workflow validation · Step-by-step traces |
| Operations | Audit logging · Database backup & restore · Config validation CLI |
Observability
Orion provides structured logging, Prometheus metrics, distributed tracing, and health monitoring out of the box. No sidecars, no agents. Everything runs inside the single binary.
Structured Logging
Orion emits structured logs in JSON or pretty-printed format, configurable at runtime:
[logging]
level = "info" # trace, debug, info, warn, error
format = "pretty" # pretty or json
JSON format is recommended for production. It integrates directly with log aggregators like Loki, Datadog, or CloudWatch:
ORION_LOGGING__FORMAT=json
ORION_LOGGING__LEVEL=info
Per-crate filtering with RUST_LOG gives fine-grained control:
RUST_LOG=orion=debug,tower_http=warn,sqlx=warn
| Level | Usage |
|---|---|
| error | Failures that need attention |
| warn | Degraded behavior (circuit breakers, retries) |
| info | Request lifecycle, engine reloads, startup/shutdown |
| debug | Detailed processing, SQL queries, connector calls |
| trace | Fine-grained internal state |
Every request carries a UUID x-request-id header. Pass your own or let Orion generate one. The ID propagates through logs and responses for end-to-end correlation.
Prometheus Metrics
Enable metrics and scrape at GET /metrics (Prometheus text format):
[metrics]
enabled = true
| Metric | Type | Labels | Description |
|---|---|---|---|
| messages_total | Counter | channel, status | Total messages processed |
| message_duration_seconds | Histogram | channel | Processing latency |
| active_workflows | Gauge | — | Workflows loaded in engine |
| errors_total | Counter | type | Errors encountered |
| http_requests_total | Counter | method, path, status | HTTP requests served |
| http_request_duration_seconds | Histogram | method, path, status | HTTP request latency |
| db_query_duration_seconds | Histogram | operation | Database query latency |
| engine_reloads_total | Counter | status | Engine reload events |
| engine_reload_duration_seconds | Histogram | — | Engine reload latency |
| circuit_breaker_trips_total | Counter | connector, channel | Circuit breaker trip events |
| circuit_breaker_rejections_total | Counter | connector, channel | Requests rejected by open breakers |
| channel_executions_total | Counter | channel | Channel invocations |
| rate_limit_rejections_total | Counter | client | Rate-limited requests |
Distributed Tracing
Enable OpenTelemetry trace export with OTLP gRPC:
[tracing]
enabled = true
otlp_endpoint = "http://localhost:4317"
service_name = "orion"
sample_rate = 1.0 # 0.0 (none) to 1.0 (all)
- W3C Trace Context extraction and propagation: incoming traceparent headers are respected
- Per-request spans with channel, workflow, and task attributes
- OTLP gRPC export to Jaeger, Tempo, or any compatible collector
- Configurable sampling rate for production use
- Trace context injected into outbound http_call requests for full distributed traces
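As a sketch of the propagation step: per the W3C Trace Context format, a `traceparent` header is `version-traceid-parentid-flags`; the trace-id is kept on the outbound hop while the span-id is replaced with a fresh one. `propagate_traceparent` is a hypothetical helper, not Orion code:

```python
import secrets

def propagate_traceparent(header: str) -> dict:
    """Parse an incoming W3C traceparent header and build the outbound one:
    same trace-id, fresh span-id, flags preserved (illustrative sketch)."""
    version, trace_id, parent_id, flags = header.split("-")
    assert version == "00" and len(trace_id) == 32 and len(parent_id) == 16
    child_span = secrets.token_hex(8)  # new 16-hex-char span id for this hop
    return {
        "trace_id": trace_id,
        "outbound": f"00-{trace_id}-{child_span}-{flags}",
    }

incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
out = propagate_traceparent(incoming)
assert out["trace_id"] == "4bf92f3577b34da6a3ce929d0e0e4736"
assert out["outbound"].startswith("00-4bf92f3577b34da6a3ce929d0e0e4736-")
```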
Health Monitoring
Orion exposes three health endpoints for different operational needs.
Component health: GET /health returns component-level status with automatic degradation detection:
{
"status": "ok",
"version": "0.1.0",
"uptime_seconds": 3600,
"workflows_loaded": 42,
"components": {
"database": "ok",
"engine": "ok"
}
}
The health check tests the database with SELECT 1 and verifies engine availability with a configurable lock timeout. If either check fails, the endpoint returns 503 Service Unavailable with "status": "degraded".
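The aggregation rule can be sketched simply: any failing component flips the endpoint from ok to degraded. `health_response` is a hypothetical illustration of that behavior, not Orion's handler:

```python
def health_response(components: dict) -> tuple[int, dict]:
    """Aggregate component checks into a /health-style payload (sketch):
    any non-ok component degrades the whole endpoint to 503."""
    degraded = any(state != "ok" for state in components.values())
    body = {"status": "degraded" if degraded else "ok", "components": components}
    return (503 if degraded else 200), body

code, body = health_response({"database": "ok", "engine": "ok"})
assert code == 200 and body["status"] == "ok"
code, body = health_response({"database": "error", "engine": "ok"})
assert code == 503 and body["status"] == "degraded"
```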
Kubernetes probes:
| Endpoint | Purpose | Behavior |
|---|---|---|
| GET /healthz | Liveness probe | Always returns 200. If the process is running, it’s alive |
| GET /readyz | Readiness probe | Returns 200 only when DB is reachable, engine is loaded, and startup is complete; 503 otherwise |
livenessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
readinessProbe:
httpGet:
path: /readyz
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
Engine status: GET /api/v1/admin/engine/status returns a detailed breakdown:
{
"version": "0.1.0",
"uptime_seconds": 3600,
"workflows_count": 42,
"active_workflows": 38,
"channels": ["orders", "events", "alerts"]
}
Resilience
Orion protects your services from cascading failures, transient errors, and overload with circuit breakers, automatic retries, timeouts, and graceful degradation, all built into the runtime.
Circuit Breakers
Every connector gets automatic circuit breaker protection. When failures exceed a threshold, the breaker opens and short-circuits requests to prevent cascading failures.
| State | Behavior |
|---|---|
| Closed | Normal operation; requests flow through |
| Open | Requests rejected immediately (503) after failure threshold exceeded |
| Half-Open | After cooldown, one probe request allowed to test recovery |
The circuit breaker uses a lock-free state machine with per-connector isolation. Configure globally:
[engine.circuit_breaker]
enabled = true
failure_threshold = 5 # Failures before tripping the breaker
recovery_timeout_secs = 30 # Cooldown before half-open probe
max_breakers = 10000 # Max breaker instances (LRU eviction)
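The Closed → Open → Half-Open cycle from the table above can be sketched in Python. This is illustrative only (Orion's actual breaker is a lock-free Rust state machine), and the class name and time handling are assumptions:

```python
import time

class CircuitBreaker:
    """Sketch of the closed -> open -> half-open cycle described above."""
    def __init__(self, failure_threshold=5, recovery_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def allow_request(self, now=None) -> bool:
        now = time.monotonic() if now is None else now
        if self.state == "open":
            if now - self.opened_at >= self.recovery_timeout:
                self.state = "half_open"  # cooldown elapsed: allow one probe
                return True
            return False  # short-circuit: caller answers 503 immediately
        return True

    def record(self, success: bool, now=None):
        now = time.monotonic() if now is None else now
        if success:
            self.state, self.failures = "closed", 0
        else:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state, self.opened_at = "open", now

cb = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)
for _ in range(3):
    cb.record(False, now=0.0)          # three failures trip the breaker
assert cb.state == "open" and not cb.allow_request(now=10.0)
assert cb.allow_request(now=31.0) and cb.state == "half_open"
cb.record(True)                        # probe succeeded: back to closed
assert cb.state == "closed"
```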
Inspect and reset breakers via the admin API:
# List all circuit breaker states
curl -s http://localhost:8080/api/v1/admin/connectors/circuit-breakers
# Reset a specific breaker
curl -s -X POST http://localhost:8080/api/v1/admin/connectors/circuit-breakers/{key}
Retry-Backoff
All HTTP connectors support automatic retries with exponential backoff, capped at 60 seconds:
{
"retry": {
"max_retries": 5,
"retry_delay_ms": 500
}
}
Delay doubles on each retry: 500ms → 1s → 2s → 4s → … → capped at 60s.
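The schedule above amounts to `min(retry_delay_ms * 2^attempt, 60000)`. A minimal sketch, assuming the cap is applied per attempt (`backoff_delays_ms` is a hypothetical helper):

```python
def backoff_delays_ms(max_retries: int, retry_delay_ms: int, cap_ms: int = 60_000):
    """Exponential backoff sketch: delay doubles per attempt, capped at 60 s."""
    return [min(retry_delay_ms * 2 ** attempt, cap_ms) for attempt in range(max_retries)]

assert backoff_delays_ms(5, 500) == [500, 1000, 2000, 4000, 8000]
assert backoff_delays_ms(9, 500)[-1] == 60_000  # 500 ms * 2^8 = 128 s, capped
```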
Retries are configured per-connector, so each external service can have its own retry policy. The mechanism automatically detects retryable errors (network failures, 5xx responses) and skips non-retryable ones (4xx client errors).
All connector types (HTTP, DB, Cache, MongoDB, Storage) support the same retry configuration.
Timeouts
Timeouts are enforced at multiple levels to prevent runaway requests:
Per-channel timeout: set in the channel’s config_json to limit workflow execution time:
{
"timeout_ms": 5000
}
If the workflow exceeds this limit, the request returns 504 Gateway Timeout.
Per-connector query timeout: for database connectors:
{
"query_timeout_ms": 30000,
"connect_timeout_ms": 5000
}
Global HTTP timeout: for the shared HTTP client used by http_call:
[engine]
global_http_timeout_secs = 30
Engine lock timeouts: prevent health checks and reloads from blocking indefinitely:
[engine]
health_check_timeout_secs = 2
reload_timeout_secs = 10
Fault Tolerance
Graceful shutdown: Orion handles SIGTERM and SIGINT with a controlled shutdown sequence:
- HTTP server stops accepting new connections
- In-flight requests drain (configurable via shutdown_drain_secs, default 30s)
- Kafka consumer (if enabled) is signaled to stop
- Trace cleanup task is stopped
- DLQ retry consumer is stopped
- Async trace queue drains with timeout
- OpenTelemetry spans are flushed (if enabled)
- Process exits
Dead letter queue: failed async traces are stored in the trace_dlq database table with automatic retry:
[queue]
dlq_retry_enabled = true
dlq_max_retries = 5
dlq_poll_interval_secs = 30
For Kafka, failed messages can also be routed to a configurable DLQ topic:
[kafka.dlq]
enabled = true
topic = "orion-dlq"
Fault-tolerant pipelines: set continue_on_error: true on a workflow to keep the task pipeline running even if individual tasks fail. Errors are collected in the response rather than halting execution:
{
"status": "ok",
"data": { "req": { "action": "test-call" } },
"errors": [
{ "code": "TASK_ERROR", "task_id": "call", "message": "HTTP request failed..." }
]
}
Panic recovery: the outermost middleware layer (CatchPanicLayer) catches panics in any handler, returning a 500 response instead of crashing the process.
Security
Orion enforces security at every layer: secrets are isolated in connectors, inputs are validated before processing, network requests are checked for SSRF, and admin endpoints are protected by authentication.
Secret Management
Sensitive fields are automatically masked in all API responses. Fields named token, password, key, secret, api_key, and connection_string are returned as "******". Secrets are stored but never exposed through the API.
# Create a connector with real credentials
curl -s -X POST http://localhost:8080/api/v1/admin/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "payments-api",
"connector_type": "http",
"config": {
"type": "http",
"url": "https://api.stripe.com/v1",
"auth": { "type": "bearer", "token": "sk-live-secret-token" }
}
}'
# Read it back (secrets are masked)
curl -s http://localhost:8080/api/v1/admin/connectors/<id>
# auth.token → "******"
Workflows reference connectors by name ("connector": "payments-api"). They never see or embed actual credentials. This means AI-generated workflows can be safely created and shared without risk of credential exposure.
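The masking behavior can be sketched as a recursive walk that replaces values under the sensitive field names listed above. `mask_secrets` is a hypothetical illustration, not Orion's implementation:

```python
MASKED_FIELDS = {"token", "password", "key", "secret", "api_key", "connection_string"}

def mask_secrets(obj):
    """Recursively replace sensitive field values with "******" (sketch)."""
    if isinstance(obj, dict):
        return {k: "******" if k in MASKED_FIELDS else mask_secrets(v)
                for k, v in obj.items()}
    if isinstance(obj, list):
        return [mask_secrets(v) for v in obj]
    return obj

connector = {"name": "payments-api",
             "config": {"url": "https://api.stripe.com/v1",
                        "auth": {"type": "bearer", "token": "sk-live-secret-token"}}}
masked = mask_secrets(connector)
assert masked["config"]["auth"]["token"] == "******"
assert masked["name"] == "payments-api"  # non-sensitive fields pass through
```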
Input Validation
Each channel can define JSONLogic validation rules evaluated against incoming requests before workflow execution:
{
"validation_logic": {
"and": [
{ "!!": [{ "var": "data.order_id" }] },
{ ">": [{ "var": "data.amount" }, 0] }
]
}
}
If validation fails, the request is rejected with 400 Bad Request before any workflow logic runs.
Validation rules have access to:
- data.*: request body fields
- headers.*: HTTP headers
- query.*: query string parameters
- path.*: path parameters (for REST channels)
Payload size limits are enforced globally to prevent oversized requests:
[ingest]
max_payload_size = 1048576 # 1 MB
Network Security
SSRF protection: HTTP connectors validate URLs to prevent Server-Side Request Forgery. By default, requests to private/internal IP addresses (RFC 1918, loopback, link-local) are blocked:
{
"name": "external-api",
"connector_type": "http",
"config": {
"type": "http",
"url": "https://api.example.com",
"allow_private_urls": false
}
}
Set allow_private_urls: true only when calling internal services.
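A minimal sketch of such a guard, using Python's `ipaddress` module; Orion's exact block list and resolution behavior may differ, and `is_url_allowed` is an illustrative name:

```python
import ipaddress
import socket
from urllib.parse import urlparse

def is_url_allowed(url: str, allow_private: bool = False) -> bool:
    """SSRF guard sketch: reject URLs whose host resolves to a private,
    loopback, or link-local address unless explicitly allowed."""
    host = urlparse(url).hostname
    if host is None:
        return False
    try:
        addr = ipaddress.ip_address(socket.gethostbyname(host))
    except (socket.gaierror, ValueError):
        return False  # unresolvable hosts are rejected
    blocked = addr.is_private or addr.is_loopback or addr.is_link_local
    return allow_private or not blocked

assert not is_url_allowed("http://127.0.0.1:8080/internal")
assert is_url_allowed("http://127.0.0.1:8080/internal", allow_private=True)
```

Blocking link-local addresses matters in cloud environments, where 169.254.169.254 exposes instance metadata and credentials.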
TLS/HTTPS: enable TLS termination in the server:
[server.tls]
enabled = true
cert_path = "cert.pem"
key_path = "key.pem"
Security headers: set on all responses:
| Header | Value |
|---|---|
| X-Content-Type-Options | nosniff |
| X-Frame-Options | DENY |
| Content-Security-Policy | default-src 'none'; frame-ancestors 'none' |
| Referrer-Policy | strict-origin-when-cross-origin |
| Permissions-Policy | camera=(), microphone=(), geolocation=() |
| Strict-Transport-Security | Set when TLS is enabled |
Access Control
Admin API authentication: protect admin endpoints with bearer token or API key:
[admin_auth]
enabled = true
api_key = "your-secret-key"
# header = "Authorization" # Bearer format (default)
# header = "X-API-Key" # Raw key format
When header is "Authorization", the key is expected as Bearer <key>. For any other header name, the raw key value is matched directly.
# Bearer token
curl -H "Authorization: Bearer your-secret-key" \
http://localhost:8080/api/v1/admin/workflows
# API key via custom header
curl -H "X-API-Key: your-secret-key" \
http://localhost:8080/api/v1/admin/workflows
Per-channel CORS: configure allowed origins per channel in config_json:
{
"cors": {
"allowed_origins": ["https://app.example.com", "https://admin.example.com"]
}
}
Global CORS defaults are configured in the server config:
[cors]
allowed_origins = ["*"] # Global default
Data Safety
Parameterized SQL queries: the db_read and db_write functions use parameterized queries to prevent SQL injection:
{
"function": {
"name": "db_read",
"input": {
"connector": "orders-db",
"query": "SELECT * FROM orders WHERE customer_id = $1",
"params": [{ "var": "data.customer_id" }],
"output": "data.orders"
}
}
}
Values are always passed as parameters, never interpolated into SQL strings.
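The difference between binding and interpolation is easy to demonstrate with SQLite (a sketch using Python's `sqlite3` and its `?` placeholders in place of the `$1` syntax shown above):

```python
import sqlite3

# Sketch: the value is bound as a parameter, never spliced into the SQL string.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT, customer_id TEXT)")
conn.execute("INSERT INTO orders VALUES ('ORD-1', 'cust-42')")

malicious = "cust-42' OR '1'='1"  # would dump the table if string-interpolated
rows = conn.execute(
    "SELECT * FROM orders WHERE customer_id = ?", (malicious,)
).fetchall()
assert rows == []  # treated as a literal value; matches nothing

rows = conn.execute(
    "SELECT * FROM orders WHERE customer_id = ?", ("cust-42",)
).fetchall()
assert rows == [("ORD-1", "cust-42")]
```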
URL validation: connector URLs are validated at creation time. Combined with SSRF protection, this prevents workflows from making requests to unexpected destinations.
Injection protection: JSONLogic expressions are evaluated in a sandboxed environment. User-supplied data cannot escape the data context or execute arbitrary code.
Scalability
Orion handles high-throughput workloads with token-bucket rate limiting, semaphore-based backpressure, async processing queues, and stateless horizontal scaling, all configurable per channel.
Rate Limiting
Rate limiting operates at two levels: platform-wide (all requests) and per-channel (individual service endpoints).
Platform-level: enable in config:
[rate_limit]
enabled = true
default_rps = 100
default_burst = 50
[rate_limit.endpoints]
admin_rps = 50
data_rps = 200
Per-channel: configure in the channel’s config_json:
{
"rate_limit": {
"requests_per_second": 100,
"burst": 50
}
}
Rate limiting uses the token bucket algorithm: tokens replenish at the configured rate, and burst allows short spikes above the steady-state limit. When the bucket is empty, requests receive 429 Too Many Requests.
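The token bucket mechanics can be sketched in a few lines. This is an illustration of the algorithm described above, not Orion's code, and the class is hypothetical:

```python
class TokenBucket:
    """Token bucket sketch: tokens refill at `rate` per second up to `burst`;
    each admitted request spends one token."""
    def __init__(self, rate: float, burst: int):
        self.rate, self.burst = rate, burst
        self.tokens = float(burst)
        self.last = 0.0

    def allow(self, now: float) -> bool:
        # Refill based on elapsed time, capped at the burst size.
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # bucket empty: caller answers 429 Too Many Requests

bucket = TokenBucket(rate=100, burst=5)
results = [bucket.allow(now=0.0) for _ in range(6)]
assert results == [True] * 5 + [False]  # burst absorbed, then rejected
assert bucket.allow(now=0.02)           # ~2 tokens refilled after 20 ms
```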
Per-client keying: use JSONLogic to compute rate limit keys from request data, enabling per-user or per-tenant limits:
{
"rate_limit": {
"requests_per_second": 10,
"burst": 5,
"key_logic": { "var": "headers.x-api-key" }
}
}
Rate limiter state is per-instance (in-memory). In multi-instance deployments, divide the configured RPS by the number of instances to approximate global limits, or use sticky sessions at the load balancer.
Backpressure
Semaphore-based concurrency limits prevent any single channel from overwhelming the system:
{
"backpressure": {
"max_concurrent": 200
}
}
When all semaphore permits are taken, additional requests receive 503 Service Unavailable immediately. This is deliberate load shedding: excess load is rejected rather than queued unboundedly, which protects latency for the requests that are admitted.
Each channel has its own independent backpressure semaphore, so a spike in one channel doesn’t affect others.
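The admit-or-shed decision can be sketched with a non-blocking semaphore acquire. `Channel.try_admit` is a hypothetical illustration of the behavior, not Orion's handler:

```python
from threading import BoundedSemaphore

class Channel:
    """Per-channel backpressure sketch: a request that cannot take a permit
    is shed immediately with 503 instead of queuing."""
    def __init__(self, max_concurrent: int):
        self.permits = BoundedSemaphore(max_concurrent)

    def try_admit(self) -> int:
        if self.permits.acquire(blocking=False):
            return 200   # admitted; release() when the workflow finishes
        return 503       # load shed: Service Unavailable

orders = Channel(max_concurrent=2)
assert [orders.try_admit() for _ in range(3)] == [200, 200, 503]
orders.permits.release()          # one in-flight request completes
assert orders.try_admit() == 200  # a permit is available again
```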
Async Processing
For workloads that don’t need immediate responses, Orion supports async processing via a bounded trace queue:
# Submit for async processing (returns immediately with a trace ID)
curl -s -X POST http://localhost:8080/api/v1/data/orders/async \
-H "Content-Type: application/json" \
-d '{ "data": { "order_id": "ORD-123" } }'
# Poll for the result
curl -s http://localhost:8080/api/v1/data/traces/{trace-id}
The queue is backed by tokio::sync::mpsc channels with configurable concurrency:
[queue]
workers = 4 # Concurrent trace workers
buffer_size = 1000 # Channel buffer for pending traces
processing_timeout_ms = 60000 # Per-trace processing timeout
max_result_size_bytes = 1048576 # Max size of trace result (1 MB)
max_queue_memory_bytes = 104857600 # Max memory for queued traces (100 MB)
Failed traces go to the dead letter queue with automatic retry:
[queue]
dlq_retry_enabled = true
dlq_max_retries = 5
dlq_poll_interval_secs = 30
Completed traces are cleaned up automatically based on retention policy:
[queue]
trace_retention_hours = 72
trace_cleanup_interval_secs = 3600
Horizontal Scaling
Orion is designed for single-instance simplicity with multi-instance capability. Each instance is stateless; all persistent data lives in the shared database.
What works across instances:
| Component | How It Works |
|---|---|
| Database | All instances share the same database (PostgreSQL or MySQL recommended) |
| Kafka consumers | Consumer groups handle partition assignment automatically |
| Traces | Stored in the shared database; queries return consistent results |
| Workflows & Channels | Definitions live in the database; all instances load the same set |
| Audit logs | Stored in the shared database regardless of which instance handles the request |
Per-Instance State
The following components use in-memory state that is local to each instance:
| Component | Impact | Workaround |
|---|---|---|
| Rate Limiting | 3 instances at 100 RPS = 300 RPS effective global limit | Sticky sessions; divide configured RPS by instance count |
| Request Deduplication | Same idempotency key on two instances → processed twice | Sticky sessions, or Redis-backed dedup store |
| Response Caching | Lower cache hit rates (each instance has a cold cache) | Sticky sessions, or Redis-backed cache connector |
| Circuit Breakers | One instance may trip while others keep sending | Acceptable; monitor /health on each instance |
| Engine State | POST /admin/engine/reload only reloads the receiving instance | Script reload to hit all instances (see below) |
Reload all instances:
for host in $INSTANCE_HOSTS; do
curl -X POST "http://$host:8080/api/v1/admin/engine/reload" \
-H "Authorization: Bearer $API_KEY"
done
Alternatively, use a rolling restart strategy with your orchestrator (e.g., Kubernetes rolling deployment).
Topology Control
Use channel include/exclude filters to run different Orion instances for different channel groups:
# Instance A: order processing
[channels]
include = ["orders.*", "payments.*"]
# Instance B: analytics and reporting
[channels]
include = ["analytics.*", "reports.*"]
This enables microservice-style deployment where each instance handles a subset of channels, all sharing the same database.
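The filter semantics can be sketched with glob matching. This is an assumption based on the patterns shown above (exclude winning over include, and a missing include list meaning "serve everything"); `channel_served` is a hypothetical helper:

```python
from fnmatch import fnmatch

def channel_served(channel_id: str, include, exclude) -> bool:
    """Sketch of include/exclude filtering: exclude wins, then include
    (semantics assumed from the config patterns above)."""
    if exclude and any(fnmatch(channel_id, p) for p in exclude):
        return False
    if include:
        return any(fnmatch(channel_id, p) for p in include)
    return True  # no include list: instance serves every channel

instance_a = ["orders.*", "payments.*"]
assert channel_served("orders.create", instance_a, None)
assert not channel_served("analytics.daily", instance_a, None)
```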
Database Backend Recommendations
| Backend | Single Instance | Multiple Instances | Notes |
|---|---|---|---|
| SQLite | Recommended | Not recommended | WAL mode supports concurrent reads but only one writer. File-based, cannot be shared across hosts. |
| PostgreSQL | Supported | Recommended | Full multi-connection support. Use connection pooling (PgBouncer) for many instances. |
| MySQL | Supported | Supported | Ensure READ-COMMITTED isolation for best concurrency. |
For multi-instance deployments, use PostgreSQL with connection pooling (PgBouncer). Script engine reloads to broadcast to all instances after workflow or channel changes.
Deployability
Orion ships as a single binary with embedded migrations, sensible defaults, and multiple installation methods. No runtime dependencies. You’re up and running immediately.
Packaging
Orion compiles into a single binary (orion-server) with everything built in:
- All three database backends (SQLite, PostgreSQL, MySQL) with embedded migrations
- Kafka producer and consumer
- OpenTelemetry trace export
- TLS termination
- Swagger UI at /docs
No feature flags, no plugins, no shared libraries to manage. The binary auto-detects the database backend from the URL scheme at startup.

| Capability | Configuration | Default |
|---|---|---|
| Database backend | storage.url scheme | SQLite |
| Kafka | kafka.enabled | Disabled |
| OpenTelemetry | tracing.enabled | Disabled |
| TLS/HTTPS | server.tls.enabled | Disabled |
| Swagger UI | Always at /docs | Enabled |
| Metrics | metrics.enabled | Disabled |
With the default SQLite backend, there are zero external dependencies.
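The URL-scheme auto-detection described above can be pictured as a simple dispatch on the scheme prefix (a sketch of the documented behavior, not Orion's actual code):

```shell
# Sketch: pick the storage backend from the URL scheme, as the binary
# does at startup. The URL value is illustrative.
url="postgres://user:pass@localhost/orion"
case "$url" in
  sqlite:*)   backend="sqlite" ;;
  postgres:*) backend="postgres" ;;
  mysql:*)    backend="mysql" ;;
  *)          backend="unknown" ;;
esac
echo "$backend"
```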
Containerization
Orion uses a multi-stage Docker build for minimal image size:
docker build -t orion .
The Dockerfile uses rust:1.93-slim for building and debian:trixie-slim for the runtime image. Key characteristics:
- Non-root execution: the container runs as a non-root user
- Built-in health probes: /healthz (liveness) and /readyz (readiness) are available without additional configuration
Docker with SQLite persistence: SQLite stores data in a local file. Without a persistent volume, that file lives in the container's writable layer and is lost when the container is removed or recreated:
docker run -p 8080:8080 \
-v orion-data:/app/data \
-e ORION_STORAGE__URL=sqlite:/app/data/orion.db \
orion
With Docker Compose:
services:
orion:
image: orion
ports:
- "8080:8080"
environment:
ORION_STORAGE__URL: sqlite:/app/data/orion.db
volumes:
- orion-data:/app/data
volumes:
orion-data:
For production, PostgreSQL or MySQL is recommended. Point ORION_STORAGE__URL to your database server.
Configuration
All settings have sensible defaults. You can run orion-server with no config file at all and it just works.
TOML config file: pass with -c:
orion-server -c config.toml
Environment variable overrides: use double-underscore nesting to override any setting:
ORION_SERVER__PORT=9090
ORION_STORAGE__URL="postgres://user:pass@localhost/orion"
ORION_KAFKA__ENABLED=true
ORION_LOGGING__FORMAT=json
ORION_ADMIN_AUTH__ENABLED=true
ORION_ADMIN_AUTH__API_KEY="your-secret-key"
Environment variables take precedence over the config file, making it easy to customize per environment without changing files.
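The double-underscore convention maps one-to-one onto TOML nesting: strip the ORION_ prefix, then treat each double underscore as one level. This sketch (the variable name is just an example) shows how ORION_SERVER__PORT resolves to [server] port:

```shell
# Map an ORION_* env var name to its nested config key:
# strip the ORION_ prefix, turn "__" into ".", lowercase.
var="ORION_SERVER__PORT"
key="${var#ORION_}"                                               # SERVER__PORT
key=$(printf '%s' "$key" | sed 's/__/./g' | tr '[:upper:]' '[:lower:]')
echo "$key"   # this key overrides [server] port = ... in config.toml
```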
Runtime configuration: channels carry their own runtime config (rate limits, timeouts, CORS, validation) via config_json, changeable through the admin API without restarts.
Distribution
Orion is available through multiple installation methods:
Homebrew (macOS and Linux):
brew install GoPlasmatic/tap/orion-server
Shell installer (Linux/macOS):
curl --proto '=https' --tlsv1.2 -LsSf https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.sh | sh
PowerShell installer (Windows):
powershell -ExecutionPolicy ByPass -c "irm https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.ps1 | iex"
Docker:
docker run -p 8080:8080 ghcr.io/goplasmatic/orion:latest
From source:
cargo install --git https://github.com/GoPlasmatic/Orion
Multi-platform binaries are published for Linux (x86_64, aarch64), macOS (x86_64, aarch64), and Windows (x86_64).
Extensibility
Orion integrates with external systems through connectors, exposes custom logic via async function handlers, and supports multiple channel protocols for different ingestion patterns.
Connectors
Connectors are named external service configurations. Secrets stay in connectors, out of your workflows.
Authentication
Three auth schemes are supported:
| Auth Type | Fields | Example |
|---|---|---|
bearer | token | { "type": "bearer", "token": "sk-..." } |
basic | username, password | { "type": "basic", "username": "user", "password": "pass" } |
apikey | header, key | { "type": "apikey", "header": "X-API-Key", "key": "abc123" } |
Header Precedence
When http_call builds a request, headers are applied in this order (later layers override earlier ones):
| Priority | Source | Example |
|---|---|---|
| 1 (lowest) | Connector default headers | "headers": {"x-source": "orion"} in connector config |
| 2 | Connector auth | Bearer token, Basic auth, API key |
| 3 | Default content-type | application/json (only when a body is present) |
| 4 (highest) | Task-level headers | "headers": {"content-type": "text/xml"} in the task input |
Task-level headers always win. This means a workflow developer can override content-type, authorization, or any other header set by the connector.
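The layering amounts to map overwrites applied in priority order, lowest first — later writes to the same header name win. A sketch (bash associative array; illustrative only, not Orion's implementation, and the token value is a placeholder):

```shell
# Apply the four header layers in priority order; the last write
# to a given header name is the one that ships with the request.
declare -A headers
headers[x-source]="orion"                  # 1. connector default headers
headers[authorization]="Bearer sk-test"    # 2. connector auth (placeholder)
headers[content-type]="application/json"   # 3. default content-type (body present)
headers[content-type]="text/xml"           # 4. task-level headers (highest)
echo "content-type resolves to: ${headers[content-type]}"
```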
Secret Masking
Sensitive fields (token, password, key, secret, api_key, connection_string) are automatically masked as "******" in all API responses. Secrets are stored but never exposed through the API. Workflows reference connectors by name; they never see or embed actual credentials.
HTTP Connector
REST API calls, webhooks, and external service integration:
{
"name": "payments-api",
"connector_type": "http",
"config": {
"type": "http",
"url": "https://api.stripe.com/v1",
"auth": { "type": "bearer", "token": "sk-..." },
"headers": { "x-source": "orion" },
"retry": { "max_retries": 3, "retry_delay_ms": 1000 },
"max_response_size": 10485760,
"allow_private_urls": false
}
}
| Field | Default | Description |
|---|---|---|
url | required | Base URL for all requests |
method | "" | Default HTTP method |
headers | {} | Default headers applied to every request |
auth | null | Authentication config (bearer, basic, or apikey) |
retry | 3 retries, 1000ms | Retry with exponential backoff |
max_response_size | 10 MB | Maximum response body size to prevent OOM |
allow_private_urls | false | Allow requests to private/internal IPs (SSRF protection) |
Kafka Connector
Produce to Kafka topics:
{
"name": "event-bus",
"connector_type": "kafka",
"config": {
"type": "kafka",
"brokers": ["kafka1:9092", "kafka2:9092"],
"topic": "events",
"group_id": "orion-producer"
}
}
Use the publish_kafka task function with optional JSONLogic for dynamic keys and values:
{
"function": {
"name": "publish_kafka",
"input": {
"connector": "event-bus",
"topic": "processed-orders",
"key_logic": { "var": "data.order_id" }
}
}
}
| Field | Required | Description |
|---|---|---|
connector | Yes | Kafka connector name |
topic | Yes | Target topic |
key_logic | No | JSONLogic expression for partition key |
value_logic | No | JSONLogic expression for message value (default: message.data) |
Kafka consumer configuration: map topics to channels in your config file:
[kafka]
enabled = true
brokers = ["localhost:9092"]
group_id = "orion"
[[kafka.topics]]
topic = "incoming-orders"
channel = "orders"
Async channels with protocol: "kafka" can also register topics via the API (DB-driven). Config-file and DB-driven topics are merged; when the same topic appears in both, the config-file entry takes precedence. The consumer restarts automatically on engine reload when the topic set changes.
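The merge rule can be sketched as a first-wins merge over topic→channel pairs, with config-file entries listed first (the topic names and the colon-separated format here are illustrative):

```shell
# Sketch: merge config-file and DB-driven topic->channel mappings.
# Config-file entries come first, and the first mapping seen for a
# given topic wins, so config-file entries take precedence.
config_topics="incoming-orders:orders"
db_topics="incoming-orders:orders-v2
analytics-events:analytics"
merged=$(printf '%s\n%s\n' "$config_topics" "$db_topics" | awk -F: '!seen[$1]++')
echo "$merged"
```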
Metadata injection: Kafka metadata is automatically injected into every message:
| Field | Description |
|---|---|
kafka_topic | Source topic name |
kafka_key | Message key (if present) |
kafka_partition | Partition number |
kafka_offset | Offset within partition |
Access these in workflows via { "var": "metadata.kafka_topic" }.
Dead letter queue: failed messages are routed to a configurable DLQ topic:
[kafka.dlq]
enabled = true
topic = "orion-dlq"
Consumer settings:
| Config | Default | Description |
|---|---|---|
kafka.processing_timeout_ms | 60000 | Per-message processing timeout |
kafka.max_inflight | 100 | Max in-flight messages |
kafka.lag_poll_interval_secs | 30 | Consumer lag polling interval |
Database Connector (SQL)
Parameterized SQL queries against PostgreSQL, MySQL, or SQLite:
{
"name": "orders-db",
"connector_type": "db",
"config": {
"type": "db",
"connection_string": "postgres://user:pass@db-host:5432/orders",
"driver": "postgres",
"max_connections": 10,
"connect_timeout_ms": 5000,
"query_timeout_ms": 30000
}
}
| Field | Default | Description |
|---|---|---|
connection_string | required | Database URL (auto-masked in API responses) |
driver | "postgres" | Driver type: postgres, mysql, or sqlite |
max_connections | null | Connection pool max size |
connect_timeout_ms | null | Connection establishment timeout |
query_timeout_ms | null | Individual query timeout |
retry | 3 retries, 1000ms | Retry with exponential backoff |
Use db_read for SELECT (returns rows as JSON array) and db_write for INSERT/UPDATE/DELETE (returns affected count):
{
"function": {
"name": "db_read",
"input": {
"connector": "orders-db",
"query": "SELECT * FROM orders WHERE customer_id = $1",
"params": [{ "var": "data.customer_id" }],
"output": "data.orders"
}
}
}
Cache Connector
In-memory or Redis cache for lookups, session state, and temporary storage:
{
"name": "session-cache",
"connector_type": "cache",
"config": {
"type": "cache",
"backend": "redis",
"url": "redis://localhost:6379",
"default_ttl_secs": 300,
"max_connections": 10
}
}
| Field | Default | Description |
|---|---|---|
backend | required | "redis" or "memory" |
url | required (redis) | Redis connection URL |
default_ttl_secs | null | Default TTL for cache entries |
max_connections | null | Connection pool max size |
retry | 3 retries, 1000ms | Retry with exponential backoff |
Use cache_read and cache_write in workflows:
{
"function": {
"name": "cache_write",
"input": {
"connector": "session-cache",
"key": "session:user123",
"value": { "var": "data.session" },
"ttl_secs": 3600
}
}
}
Storage Connector (S3/GCS)
S3, GCS, or local filesystem for file operations:
{
"name": "uploads",
"connector_type": "storage",
"config": {
"type": "storage",
"provider": "s3",
"bucket": "my-uploads",
"region": "us-east-1",
"base_path": "/data"
}
}
| Field | Default | Description |
|---|---|---|
provider | required | Storage provider: s3, gcs, local |
bucket | null | Bucket or container name |
region | null | Cloud region |
base_path | null | Base path prefix for all operations |
retry | 3 retries, 1000ms | Retry with exponential backoff |
MongoDB Connector (NoSQL)
MongoDB document queries with BSON-to-JSON conversion:
{
"name": "analytics-db",
"connector_type": "db",
"config": {
"type": "db",
"connection_string": "mongodb://localhost:27017",
"driver": "mongodb"
}
}
Use mongo_read in workflows:
{
"function": {
"name": "mongo_read",
"input": {
"connector": "analytics-db",
"database": "analytics",
"collection": "events",
"filter": { "user_id": { "var": "data.user_id" } },
"output": "data.events"
}
}
}
Custom Functions
Orion provides 8 async function handlers that can be used in workflow tasks:
| Function | Description |
|---|---|
http_call | Call external APIs via HTTP connectors |
channel_call | Invoke another channel’s workflow in-process (no HTTP round-trip) |
db_read | Execute SELECT queries against SQL connectors |
db_write | Execute INSERT/UPDATE/DELETE against SQL connectors |
cache_read | Read from memory or Redis cache connectors |
cache_write | Write to memory or Redis cache connectors |
mongo_read | Query MongoDB collections |
publish_kafka | Produce messages to Kafka topics |
In addition to async functions, Orion includes a built-in function library for data transformation:
| Function | Description |
|---|---|
parse_json | Parse raw payload into structured data |
parse_xml | Parse XML payload into structured data |
filter | Filter arrays using JSONLogic conditions |
map | Transform data with field mappings |
validation | Validate data against JSONLogic rules |
log | Log data at a specified level |
JSONLogic expressions power all conditions and dynamic values: use { "var": "data.field" } to reference data, { "cat": [...] } for string concatenation, and the arithmetic and comparison operators for calculations. Dynamic paths (path_logic) and bodies (body_logic) let you compute URLs and request payloads from message data.
Channel Protocols
Channels support three protocol modes:
REST (Sync)
REST channels define route patterns for RESTful API routing with method and path matching:
{
"name": "order-detail",
"channel_type": "sync",
"protocol": "rest",
"methods": ["GET", "POST"],
"route_pattern": "/orders/{order_id}/items/{item_id}",
"workflow_id": "order-detail-workflow"
}
Path parameters are extracted and injected into the message metadata. Routes are matched by priority (descending) then specificity (segment count).
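Parameter extraction can be pictured as aligning the concrete path's segments with the pattern's placeholders (a positional sketch only, not Orion's matcher; the path value is illustrative):

```shell
# Sketch: for pattern /orders/{order_id}/items/{item_id}, the 2nd and
# 4th path segments are the parameter values. Split on "/" and pick
# the aligned positions ($1 is empty because the path starts with "/").
path="/orders/ORD-123/items/ITEM-1"
oldIFS=$IFS; IFS=/
set -- $path            # '' orders ORD-123 items ITEM-1
IFS=$oldIFS
order_id=$3
item_id=$5
echo "order_id=$order_id item_id=$item_id"
```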
Simple HTTP (Sync)
Simple HTTP channels are matched by channel name. Requests to /api/v1/data/{channel-name} are routed directly:
{
"name": "orders",
"channel_type": "sync",
"protocol": "http",
"workflow_id": "order-processing"
}
Kafka (Async)
Kafka channels consume from topics and process messages asynchronously:
{
"name": "kafka-orders",
"channel_type": "async",
"protocol": "kafka",
"topic": "incoming-orders",
"consumer_group": "orion-orders",
"workflow_id": "order-processing"
}
DB-driven Kafka channels are automatically registered as consumers at startup and on engine reload. Add Kafka ingestion via the API without restarting Orion.
Availability
Orion supports zero-downtime engine reloads, percentage-based canary rollouts, full version lifecycle management, and response caching, enabling continuous delivery without service interruptions.
Hot-Reload
The engine is held in memory as Arc<RwLock<Arc<Engine>>>. A reload swaps the inner Arc<Engine> while existing readers continue using the old one. Zero dropped requests.
Trigger a reload:
curl -s -X POST http://localhost:8080/api/v1/admin/engine/reload
A reload performs three operations atomically:
- Engine swap: rebuilds the engine from all active workflows and channels in the database
- Channel registry rebuild: reconstructs the route table, validation logic, rate limiters, backpressure semaphores, dedup stores, and response caches
- Kafka consumer restart: if the topic set changed, the Kafka consumer is stopped and restarted with the new topics
Reloads are triggered automatically on status changes (activate/archive) and deletes. Draft creates and updates do not trigger reload.
In multi-instance deployments, reload only affects the instance that receives the request. Script the reload to broadcast to all instances:
for host in $INSTANCE_HOSTS; do
curl -X POST "http://$host:8080/api/v1/admin/engine/reload" \
-H "Authorization: Bearer $API_KEY"
done
Canary Rollouts
Control traffic exposure for active workflows with rollout percentages:
# Activate a workflow at 10% rollout
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/status \
-H "Content-Type: application/json" \
-d '{"status": "active", "rollout_percentage": 10}'
# Increase rollout to 50%
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/rollout \
-H "Content-Type: application/json" \
-d '{"rollout_percentage": 50}'
# Full rollout
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/rollout \
-H "Content-Type: application/json" \
-d '{"rollout_percentage": 100}'
The rollout percentage determines the probability that incoming requests are matched to this workflow. This enables:
- Gradual migration: slowly ramp traffic from 0% to 100%
- A/B testing: run two workflow versions at different percentages
- Instant rollback: set rollout to 0% or archive the workflow
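Assuming the match is decided by comparing a uniform per-request draw against the configured percentage (the document states the percentage is a probability; the draw mechanism here is an assumption), the decision looks like:

```shell
# Sketch: route a request to the canary when a uniform draw in [0,100)
# falls below the rollout percentage. The draw value is hard-coded for
# illustration; in practice it would come from a per-request random value.
ROLLOUT=10
draw=7
if [ "$draw" -lt "$ROLLOUT" ]; then
  result="canary"
else
  result="stable"
fi
echo "$result"
```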
Versioning
Both workflows and channels follow a draft → active → archived lifecycle with automatic version tracking:
# Create (starts as draft, version 1)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows \
-H "Content-Type: application/json" \
-d '{ "name": "Order Processor", ... }'
# Update (only drafts can be updated)
curl -s -X PUT http://localhost:8080/api/v1/admin/workflows/<id> \
-H "Content-Type: application/json" -d '{ ... }'
# Activate (loads into engine)
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/status \
-H "Content-Type: application/json" -d '{"status": "active"}'
# Create new version (new draft from active)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/<id>/versions
# Archive (removes from engine)
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/<id>/status \
-H "Content-Type: application/json" -d '{"status": "archived"}'
All versions are stored with incrementing version numbers. List the version history:
curl -s http://localhost:8080/api/v1/admin/workflows/<id>/versions
Import and export: bulk operations for GitOps and migration:
# Export workflows (as JSON)
curl -s http://localhost:8080/api/v1/admin/workflows/export?status=active
# Import workflows (created as drafts)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/import \
-H "Content-Type: application/json" -d @workflows.json
Performance
Response caching: cache responses for identical requests to reduce redundant workflow execution:
{
"cache": {
"enabled": true,
"ttl_secs": 60,
"cache_key_fields": ["data.user_id", "data.action"]
}
}
Cache keys are computed from the specified fields. Cached responses are returned directly without executing the workflow. The cache backend is in-memory by default; Redis-backed caching is available via a cache connector.
Request deduplication: prevent duplicate processing using idempotency keys:
{
"deduplication": {
"header": "Idempotency-Key",
"retention_secs": 300
}
}
When a request with the same idempotency key arrives within the retention window, it returns 409 Conflict instead of re-processing.
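The behavior can be sketched with a local key store (illustrative only — Orion's dedup store is in-memory and per-instance, as noted under Scalability):

```shell
# Sketch: first request with a key is processed; a repeat of the same
# key within the retention window is rejected with 409 Conflict.
store=$(mktemp)
process() {
  if grep -qx "$1" "$store"; then
    echo "409 Conflict"
  else
    echo "$1" >> "$store"
    echo "200 OK"
  fi
}
first=$(process "key-abc")
second=$(process "key-abc")
echo "$first / $second"
rm -f "$store"
```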
Connection pool caching: external database and MongoDB connector pools are cached and reused across requests, with configurable pool sizes and idle timeouts:
[engine]
max_pool_cache_entries = 100
cache_cleanup_interval_secs = 60
Maintainability
Orion provides comprehensive admin APIs, CI/CD integration patterns, dry-run testing, and operational tools for managing services in production.
Admin APIs
Full CRUD operations for all entities through a RESTful admin API:
| Resource | Endpoints |
|---|---|
| Workflows | Create, read, update, delete, status management, versioning, rollout, dry-run test, import/export, validate |
| Channels | Create, read, update, delete, status management, versioning |
| Connectors | Create, read, update, delete, reload, circuit breaker inspection/reset |
| Engine | Status, hot-reload |
| Audit logs | List with filtering by action and resource type |
| Backup | Database export and restore |
Version management: both workflows and channels support the draft → active → archived lifecycle. Filter by status:
curl -s "http://localhost:8080/api/v1/admin/workflows?status=active"
curl -s "http://localhost:8080/api/v1/admin/channels?status=draft"
Engine control:
# Check engine status
curl -s http://localhost:8080/api/v1/admin/engine/status
# Hot-reload after changes
curl -s -X POST http://localhost:8080/api/v1/admin/engine/reload
OpenAPI / Swagger UI: interactive API documentation is always available at /docs, and the OpenAPI 3.0 spec at /api/v1/openapi.json.
CI/CD Integration
Orion workflows are JSON documents that can be versioned, diffed, and reviewed like any other configuration.
Bulk import and export:
# Export active workflows
curl -s "http://localhost:8080/api/v1/admin/workflows/export?status=active" -o workflows.json
# Import workflows (created as drafts)
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/import \
-H "Content-Type: application/json" -d @workflows.json
Pre-deploy validation: validate workflow structure without creating:
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/validate \
-H "Content-Type: application/json" -d @workflow.json
GitOps pipeline: a typical CI/CD flow:
AI generates workflow → commit as JSON → CI validates & dry-runs → review → import → activate
GitHub Actions example:
name: Validate Workflows
on:
pull_request:
paths: ['workflows/**/*.json']
jobs:
validate:
runs-on: ubuntu-latest
services:
orion:
image: ghcr.io/goplasmatic/orion:latest
ports: ['8080:8080']
steps:
- uses: actions/checkout@v4
- name: Import and test workflows
run: |
for file in workflows/**/*.json; do
curl -sf -X POST http://localhost:8080/api/v1/admin/workflows \
-H "Content-Type: application/json" -d @"$file"
done
Tag-based organization: tag workflows for filtering:
{ "tags": ["fraud", "high-priority", "v2"] }
curl -s "http://localhost:8080/api/v1/admin/workflows?tag=fraud"
Testing
Dry-run execution: test a workflow against sample data without activating it:
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/<id>/test \
-H "Content-Type: application/json" \
-d '{"data": {"amount": 50000, "currency": "USD"}}'
The response includes a full execution trace showing which tasks ran and which were skipped:
{
"matched": true,
"trace": {
"steps": [
{ "task_id": "parse", "result": "executed" },
{ "task_id": "high_risk", "result": "executed" },
{ "task_id": "normal_risk", "result": "skipped" }
]
},
"output": {
"txn": { "amount": 50000, "risk_level": "high", "requires_review": true }
}
}
Workflow validation: check that a workflow definition is structurally valid:
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/validate \
-H "Content-Type: application/json" -d @workflow.json
Step-by-step traces: async traces record the full execution path and can be retrieved for debugging:
# Submit async request
curl -s -X POST http://localhost:8080/api/v1/data/orders/async \
-H "Content-Type: application/json" -d '{ "data": { "order_id": "ORD-123" } }'
# Get trace with execution details
curl -s http://localhost:8080/api/v1/data/traces/{trace-id}
Operations
Audit logging: all admin actions are recorded for compliance and debugging:
curl -s http://localhost:8080/api/v1/admin/audit-logs
curl -s "http://localhost:8080/api/v1/admin/audit-logs?action=activate&resource_type=workflow"
Each entry captures: principal, action, resource type, resource ID, details, and timestamp.
Database backup and restore:
# Export backup
curl -s -X POST http://localhost:8080/api/v1/admin/backup -o backup.json
# Restore from backup
curl -s -X POST http://localhost:8080/api/v1/admin/restore \
-H "Content-Type: application/json" -d @backup.json
Config validation CLI: validate your configuration without starting the server:
orion-server validate-config
orion-server validate-config -c config.toml
Database migrations: run or preview pending migrations:
orion-server migrate # Run migrations
orion-server migrate --dry-run # Preview pending migrations
Admin API
All admin endpoints are under /api/v1/admin/. When admin authentication is enabled, requests must include a valid bearer token or API key.
Channels
| Method | Path | Description |
|---|---|---|
| POST | /api/v1/admin/channels | Create channel (as draft) |
| GET | /api/v1/admin/channels | List channels. Filter with ?status=, ?channel_type=, ?protocol= |
| GET | /api/v1/admin/channels/{id} | Get channel by ID |
| PUT | /api/v1/admin/channels/{id} | Update draft channel |
| DELETE | /api/v1/admin/channels/{id} | Delete channel (all versions) |
| PATCH | /api/v1/admin/channels/{id}/status | Change status (active/archived) |
| GET | /api/v1/admin/channels/{id}/versions | List channel version history |
| POST | /api/v1/admin/channels/{id}/versions | Create new draft version from active channel |
Workflows
| Method | Path | Description |
|---|---|---|
| POST | /api/v1/admin/workflows | Create workflow (as draft; optional id field for custom IDs) |
| GET | /api/v1/admin/workflows | List workflows. Filter with ?tag=, ?status= |
| GET | /api/v1/admin/workflows/{id} | Get workflow by ID |
| PUT | /api/v1/admin/workflows/{id} | Update draft workflow |
| DELETE | /api/v1/admin/workflows/{id} | Delete workflow (all versions) |
| PATCH | /api/v1/admin/workflows/{id}/status | Change status (active/archived) |
| GET | /api/v1/admin/workflows/{id}/versions | List workflow version history |
| POST | /api/v1/admin/workflows/{id}/versions | Create new draft version from active workflow |
| PATCH | /api/v1/admin/workflows/{id}/rollout | Update rollout percentage |
| POST | /api/v1/admin/workflows/{id}/test | Dry-run on sample payload |
| POST | /api/v1/admin/workflows/import | Bulk import workflows (as drafts) |
| GET | /api/v1/admin/workflows/export | Export workflows. Filter with ?tag=, ?status= |
| POST | /api/v1/admin/workflows/validate | Validate workflow definition |
Connectors
| Method | Path | Description |
|---|---|---|
| POST | /api/v1/admin/connectors | Create connector |
| GET | /api/v1/admin/connectors | List connectors (secrets masked) |
| GET | /api/v1/admin/connectors/{id} | Get connector by ID (secrets masked) |
| PUT | /api/v1/admin/connectors/{id} | Update connector |
| DELETE | /api/v1/admin/connectors/{id} | Delete connector |
| POST | /api/v1/admin/connectors/reload | Reload all connectors from DB |
| GET | /api/v1/admin/connectors/circuit-breakers | List circuit breaker states |
| POST | /api/v1/admin/connectors/circuit-breakers/{key} | Reset a circuit breaker |
Engine
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/admin/engine/status | Engine status (version, uptime, workflows count, channels) |
| POST | /api/v1/admin/engine/reload | Hot-reload channels and workflows |
Audit Logs
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/admin/audit-logs | List audit log entries. Filter with ?action=, ?resource_type= |
Backup & Restore
| Method | Path | Description |
|---|---|---|
| POST | /api/v1/admin/backup | Export database backup |
| POST | /api/v1/admin/restore | Restore from backup |
Lifecycle
Both channels and workflows follow a draft → active → archived lifecycle:
- Create: entities are created as draft (not loaded into the engine)
- Update: only draft versions can be updated via PUT
- Activate: PATCH /status with {"status": "active"} loads the entity into the engine
- New version: POST /versions creates a new draft version from the active entity
- Archive: PATCH /status with {"status": "archived"} removes the entity from the engine
A channel links to a workflow via workflow_id. Activating a channel makes it available for data processing; activating a workflow makes its logic available to the engine.
Authentication
Admin API endpoints support bearer token or API key authentication when enabled:
# Bearer token (default header: Authorization)
curl -H "Authorization: Bearer your-secret-key" \
http://localhost:8080/api/v1/admin/workflows
# API key via custom header
curl -H "X-API-Key: your-secret-key" \
http://localhost:8080/api/v1/admin/workflows
Configure via [admin_auth] in config or ORION_ADMIN_AUTH__ENABLED=true environment variable.
Error Response Format
All error responses follow a consistent structure:
{
"error": {
"code": "NOT_FOUND",
"message": "Workflow with id '...' not found"
}
}
| Code | HTTP Status | Description |
|---|---|---|
NOT_FOUND | 404 | Resource not found |
BAD_REQUEST | 400 | Invalid input |
UNAUTHORIZED | 401 | Missing or invalid credentials |
FORBIDDEN | 403 | Access denied |
CONFLICT | 409 | Duplicate or conflicting state |
RATE_LIMITED | 429 | Too many requests |
TIMEOUT | 504 | Workflow execution exceeded timeout |
SERVICE_UNAVAILABLE | 503 | Backpressure or circuit breaker open |
UNSUPPORTED_MEDIA_TYPE | 415 | Invalid content type |
INTERNAL_ERROR | 500 | Internal server error |
Data API
The data API handles runtime request processing: routing messages to channels, executing workflows, and returning results.
Endpoints
| Method | Path | Description |
|---|---|---|
POST | /api/v1/data/{channel} | Process message synchronously (simple channel name) |
POST | /api/v1/data/{channel}/async | Submit for async processing (returns trace ID) |
ANY | /api/v1/data/{path...} | REST route matching: method + path matched against channel route patterns |
ANY | /api/v1/data/{path...}/async | Async submission via REST route matching |
GET | /api/v1/data/traces | List traces. Filter with ?status=, ?channel=, ?mode= |
GET | /api/v1/data/traces/{id} | Poll async trace result |
Route Resolution
When a request arrives at /api/v1/data/{path}, Orion resolves the target channel in this order:
- Async check: strip the trailing /async suffix (switches to async mode)
- REST route table: match HTTP method + path against channel route_pattern values (e.g., GET /orders/{order_id})
- Channel name fallback: direct lookup by a single path segment (e.g., /api/v1/data/orders → channel named orders)
REST routes are matched by priority (descending) then specificity (segment count). Path parameters are extracted and injected into the message metadata.
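The specificity tie-break can be sketched by ranking candidate patterns by segment count (an illustration of the ordering rule, not Orion's router; patterns are from the examples above):

```shell
# Sketch: at equal priority, the pattern with more path segments
# (higher specificity) is matched first. Rank by segment count.
routes="/orders/{order_id}/items/{item_id}
/orders/{order_id}
/orders"
best=$(echo "$routes" | awk -F'/' '{print NF-1, $0}' | sort -rn | head -1 | cut -d' ' -f2-)
echo "$best"
```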
Synchronous Processing
Send a POST to the channel name or a matching REST route:
# By channel name
curl -s -X POST http://localhost:8080/api/v1/data/orders \
-H "Content-Type: application/json" \
-d '{ "data": { "order_id": "ORD-123", "total": 25000 } }'
# By REST route pattern
curl -s -X GET http://localhost:8080/api/v1/data/orders/ORD-123/items/ITEM-1
Response:
{
"status": "ok",
"data": {
"order": { "order_id": "ORD-123", "total": 25000, "flagged": true }
},
"errors": []
}
Asynchronous Processing
Append /async to submit for background processing:
curl -s -X POST http://localhost:8080/api/v1/data/orders/async \
-H "Content-Type: application/json" \
-d '{ "data": { "order_id": "ORD-456" } }'
Response: returns immediately with a trace ID:
{
"trace_id": "550e8400-e29b-41d4-a716-446655440000",
"status": "pending"
}
Poll for the result:
curl -s http://localhost:8080/api/v1/data/traces/550e8400-e29b-41d4-a716-446655440000
Trace statuses: pending → completed or failed.
Trace Endpoints
List and filter traces:
# List all traces
curl -s http://localhost:8080/api/v1/data/traces
# Filter by channel and status
curl -s "http://localhost:8080/api/v1/data/traces?channel=orders&status=completed"
# Filter by mode
curl -s "http://localhost:8080/api/v1/data/traces?mode=async"
Get a specific trace:
curl -s http://localhost:8080/api/v1/data/traces/{trace-id}
Operational Endpoints
| Method | Path | Description |
|---|---|---|
| GET | /health | Health check (200 OK / 503 degraded). Checks DB, engine, uptime |
| GET | /healthz | Kubernetes liveness probe. Always returns 200 |
| GET | /readyz | Kubernetes readiness probe. 503 if DB, engine, or startup not ready |
| GET | /metrics | Prometheus metrics (when enabled) |
| GET | /docs | Swagger UI |
| GET | /api/v1/openapi.json | OpenAPI 3.0 specification |
CLI Setup
Get Orion running on your machine in under a minute.
Installation
Choose your preferred method:
Homebrew (macOS and Linux):
brew install GoPlasmatic/tap/orion-server
Shell installer (Linux/macOS):
curl --proto '=https' --tlsv1.2 -LsSf https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.sh | sh
PowerShell (Windows):
powershell -ExecutionPolicy ByPass -c "irm https://github.com/GoPlasmatic/Orion/releases/latest/download/orion-server-installer.ps1 | iex"
Docker:
docker run -p 8080:8080 ghcr.io/goplasmatic/orion:latest
From source (requires Rust 1.85+):
cargo install --git https://github.com/GoPlasmatic/Orion
First Run
Start Orion with default settings (SQLite, port 8080):
orion-server
Verify it’s running:
curl -s http://localhost:8080/health
{
"status": "ok",
"version": "0.1.0",
"uptime_seconds": 5,
"workflows_loaded": 0,
"components": {
"database": "ok",
"engine": "ok"
}
}
Swagger UI is available at http://localhost:8080/docs.
Configuration
Create a config file for custom settings:
orion-server -c config.toml
Or use environment variables for individual overrides:
ORION_SERVER__PORT=9090 \
ORION_LOGGING__FORMAT=json \
orion-server
Common configuration scenarios:
# Use PostgreSQL instead of SQLite
ORION_STORAGE__URL="postgres://user:pass@localhost/orion" orion-server
# Enable admin authentication
ORION_ADMIN_AUTH__ENABLED=true \
ORION_ADMIN_AUTH__API_KEY="your-secret-key" \
orion-server
# Enable metrics and tracing
ORION_METRICS__ENABLED=true \
ORION_TRACING__ENABLED=true \
ORION_TRACING__OTLP_ENDPOINT="http://localhost:4317" \
orion-server
Validate a config file without starting the server:
orion-server validate-config -c config.toml
Create Your First Service
1. Create a workflow:
curl -s -X POST http://localhost:8080/api/v1/admin/workflows \
-H "Content-Type: application/json" \
-d '{
"id": "hello-world",
"name": "Hello World",
"condition": true,
"tasks": [
{ "id": "parse", "name": "Parse", "function": {
"name": "parse_json", "input": { "source": "payload", "target": "req" }
}},
{ "id": "greet", "name": "Greet", "function": {
"name": "map", "input": { "mappings": [
{ "path": "data.req.greeting", "logic": {
"cat": ["Hello, ", { "var": "data.req.name" }, "!"]
}}
]}
}}
]
}'
2. Activate the workflow:
curl -s -X PATCH http://localhost:8080/api/v1/admin/workflows/hello-world/status \
-H "Content-Type: application/json" -d '{"status": "active"}'
3. Create and activate a channel:
curl -s -X POST http://localhost:8080/api/v1/admin/channels \
-H "Content-Type: application/json" \
-d '{ "channel_id": "hello", "name": "hello", "channel_type": "sync",
"protocol": "rest", "route_pattern": "/hello",
"methods": ["POST"], "workflow_id": "hello-world" }'
curl -s -X PATCH http://localhost:8080/api/v1/admin/channels/hello/status \
-H "Content-Type: application/json" -d '{"status": "active"}'
4. Test it:
curl -s -X POST http://localhost:8080/api/v1/data/hello \
-H "Content-Type: application/json" \
-d '{ "data": { "name": "World" } }'
{
"status": "ok",
"data": { "req": { "name": "World", "greeting": "Hello, World!" } },
"errors": []
}
Orion CLI
The Orion CLI provides a command-line interface and MCP server for managing Orion. No curl commands needed.
Homebrew (macOS and Linux):
brew install GoPlasmatic/tap/orion-cli
Shell installer (Linux/macOS):
curl --proto '=https' --tlsv1.2 -LsSf https://github.com/GoPlasmatic/Orion-cli/releases/latest/download/orion-cli-installer.sh | sh
PowerShell (Windows):
powershell -ExecutionPolicy ByPass -c "irm https://github.com/GoPlasmatic/Orion-cli/releases/latest/download/orion-cli-installer.ps1 | iex"
From source (requires Rust 1.85+):
cargo install --git https://github.com/GoPlasmatic/Orion-cli
Usage:
orion-cli config set-server http://localhost:8080
orion-cli health
orion-cli workflows list
orion-cli channels list
orion-cli send hello -d '{ "data": { "name": "World" } }'
See the CLI reference for the full command list, or set up the MCP Server for AI assistant integration.
Next Steps
- Browse the API Reference for all available endpoints
- Explore Production Features for observability, security, and resilience
- See the Config Reference for all configuration options
MCP Server Setup
Orion includes an MCP (Model Context Protocol) server that lets AI assistants like Claude manage workflows, channels, and connectors through natural language. The MCP server wraps the Orion admin API, giving AI agents full control over your Orion instance.
Prerequisites
- A running Orion instance (see CLI Setup)
- An MCP-compatible client (Claude Code, Claude Desktop, or other MCP clients)
Configuration
Add the Orion MCP server to your MCP client configuration.
Claude Code: add to your .claude/settings.json or project-level .mcp.json:
{
"mcpServers": {
"orion": {
"command": "orion-cli",
"args": ["mcp", "serve"],
"env": {
"ORION_SERVER_URL": "http://localhost:8080"
}
}
}
}
Claude Desktop: add to your Claude Desktop configuration file:
{
"mcpServers": {
"orion": {
"command": "orion-cli",
"args": ["mcp", "serve"],
"env": {
"ORION_SERVER_URL": "http://localhost:8080"
}
}
}
}
If admin authentication is enabled on your Orion instance, include the API key:
{
"env": {
"ORION_SERVER_URL": "http://localhost:8080",
"ORION_API_KEY": "your-secret-key"
}
}
Available Tools
The MCP server exposes the following tools to AI assistants:
| Tool | Description |
|---|---|
| list_workflows | List all workflows with optional status/tag filters |
| create_workflow | Create a new workflow (as draft) |
| get_workflow | Get workflow details by ID |
| update_workflow | Update a draft workflow |
| activate_workflow | Activate a draft workflow |
| archive_workflow | Archive a workflow |
| test_workflow | Dry-run a workflow with sample data |
| validate_workflow | Validate workflow structure |
| list_channels | List all channels |
| create_channel | Create a new channel |
| activate_channel | Activate a draft channel |
| list_connectors | List connectors (secrets masked) |
| create_connector | Create a new connector |
| engine_status | Get engine status |
| reload_engine | Hot-reload the engine |
| health_check | Check Orion health |
Usage Example
Once configured, you can use natural language to manage Orion:
“Create a workflow that parses incoming orders, flags any over $10,000, and adds a risk level field. Then create a channel called ‘orders’ that uses it.”
The AI assistant will use the MCP tools to:
- Create the workflow via create_workflow
- Test it with sample data via test_workflow
- Activate it via activate_workflow
- Create the channel via create_channel
- Activate the channel via activate_channel
Troubleshooting
MCP server not connecting:
- Verify Orion is running: curl http://localhost:8080/health
- Check that the ORION_SERVER_URL environment variable is set correctly
- Ensure orion-cli is in your PATH
Authentication errors:
- Verify ORION_API_KEY matches your Orion instance’s admin_auth.api_key
- Check that admin auth is enabled on the Orion instance if you’re passing a key
Use Cases & Patterns
Real-world examples showing how AI generates Orion workflows from natural language. Every example follows the same pattern: describe what you need → AI generates the workflow → create a channel → send data → get results.
E-Commerce Order Classification
Classify orders into tiers and compute discounts.
AI prompt:
Create a workflow for the "orders" channel that:
1. Parses the payload into "order"
2. Assigns tiers based on amount:
- VIP: amount >= 500, discount 15%
- Premium: amount 100-500, discount 5%
- Standard: amount < 100, no discount
Generated workflow:
{
"name": "Order Classification",
"condition": true,
"tasks": [
{ "id": "parse", "name": "Parse Payload", "function": {
"name": "parse_json", "input": { "source": "payload", "target": "order" }
}},
{ "id": "vip_tier", "name": "Set VIP Tier",
"condition": { ">=": [{ "var": "data.order.amount" }, 500] },
"function": { "name": "map", "input": { "mappings": [
{ "path": "data.order.tier", "logic": "vip" },
{ "path": "data.order.discount_pct", "logic": 15 }
]}}
},
{ "id": "premium_tier", "name": "Set Premium Tier",
"condition": { "and": [
{ ">=": [{ "var": "data.order.amount" }, 100] },
{ "<": [{ "var": "data.order.amount" }, 500] }
]},
"function": { "name": "map", "input": { "mappings": [
{ "path": "data.order.tier", "logic": "premium" },
{ "path": "data.order.discount_pct", "logic": 5 }
]}}
},
{ "id": "standard_tier", "name": "Set Standard Tier",
"condition": { "<": [{ "var": "data.order.amount" }, 100] },
"function": { "name": "map", "input": { "mappings": [
{ "path": "data.order.tier", "logic": "standard" },
{ "path": "data.order.discount_pct", "logic": 0 }
]}}
}
]
}
Send data:
curl -s -X POST http://localhost:8080/api/v1/data/orders \
-H "Content-Type: application/json" \
-d '{ "data": { "amount": 750, "product": "Diamond Ring" } }'
Response:
{
"status": "ok",
"data": {
"order": { "amount": 750, "product": "Diamond Ring", "tier": "vip", "discount_pct": 15 }
},
"errors": []
}
Key patterns: Task-level conditions, computed output fields.
IoT Sensor Alert Classification
Classify sensor readings into severity levels using range-based conditions.
AI prompt:
Create a workflow for the "sensors" channel that classifies temperature readings:
- Critical: temperature > 90 or below 0, set alert flag
- Warning: temperature 70-90, set alert flag
- Normal: temperature 0-70, no alert
Parse the payload into "reading" and set severity and alert fields.
Generated workflow:
{
"name": "Sensor Alert Pipeline",
"condition": true,
"tasks": [
{ "id": "parse", "name": "Parse Payload", "function": {
"name": "parse_json", "input": { "source": "payload", "target": "reading" }
}},
{ "id": "critical", "name": "Mark Critical",
"condition": { "or": [
{ ">": [{ "var": "data.reading.temperature" }, 90] },
{ "<": [{ "var": "data.reading.temperature" }, 0] }
]},
"function": { "name": "map", "input": { "mappings": [
{ "path": "data.reading.severity", "logic": "critical" },
{ "path": "data.reading.alert", "logic": true }
]}}
},
{ "id": "warning", "name": "Mark Warning",
"condition": { "and": [
{ ">": [{ "var": "data.reading.temperature" }, 70] },
{ "<=": [{ "var": "data.reading.temperature" }, 90] }
]},
"function": { "name": "map", "input": { "mappings": [
{ "path": "data.reading.severity", "logic": "warning" },
{ "path": "data.reading.alert", "logic": true }
]}}
},
{ "id": "normal", "name": "Mark Normal",
"condition": { "and": [
{ ">=": [{ "var": "data.reading.temperature" }, 0] },
{ "<=": [{ "var": "data.reading.temperature" }, 70] }
]},
"function": { "name": "map", "input": { "mappings": [
{ "path": "data.reading.severity", "logic": "normal" },
{ "path": "data.reading.alert", "logic": false }
]}}
}
]
}
Send data:
curl -s -X POST http://localhost:8080/api/v1/data/sensors \
-H "Content-Type: application/json" \
-d '{ "data": { "temperature": 80, "sensor_id": "SENSOR-42" } }'
Response:
{
"status": "ok",
"data": {
"reading": { "temperature": 80, "sensor_id": "SENSOR-42", "severity": "warning", "alert": true }
},
"errors": []
}
| Input | Severity | Alert |
|---|---|---|
| temperature: 45 | normal | false |
| temperature: 80 | warning | true |
| temperature: 95 | critical | true |
| temperature: -5 | critical | true |
Key patterns: Range-based classification with and/or conditions, boolean flags.
Webhook Payload Transformation
Normalize incoming webhook payloads from different providers into a consistent internal schema.
AI prompt:
Create a workflow for the "webhooks" channel that normalizes webhook payloads from any provider:
- Map "origin" to "source"
- Map "type" to "event_type"
- Map "body" to "payload"
- Add a "processed" flag set to true
Output should be under data.normalized.
Generated workflow:
{
"name": "Webhook Transform Pipeline",
"condition": true,
"tasks": [
{ "id": "parse", "name": "Parse Payload", "function": {
"name": "parse_json", "input": { "source": "payload", "target": "event" }
}},
{ "id": "normalize", "name": "Normalize Schema", "function": {
"name": "map", "input": { "mappings": [
{ "path": "data.normalized.source", "logic": { "var": "data.event.origin" } },
{ "path": "data.normalized.event_type", "logic": { "var": "data.event.type" } },
{ "path": "data.normalized.payload", "logic": { "var": "data.event.body" } },
{ "path": "data.normalized.processed", "logic": true }
]}
}}
]
}
Send data:
curl -s -X POST http://localhost:8080/api/v1/data/webhooks \
-H "Content-Type: application/json" \
-d '{ "data": { "origin": "github", "type": "push", "body": {"ref": "refs/heads/main"} } }'
Response:
{
"status": "ok",
"data": {
"normalized": { "source": "github", "event_type": "push", "payload": {"ref": "refs/heads/main"}, "processed": true }
},
"errors": []
}
Missing optional fields produce null rather than errors, which makes the pipeline safe for partial payloads from different webhook providers.
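For instance, a payload that omits the body field still normalizes cleanly; only payload comes through as null. A sketch of the response for an input of { "data": { "origin": "stripe", "type": "invoice.paid" } } (the provider values are illustrative):

```json
{
  "status": "ok",
  "data": {
    "normalized": { "source": "stripe", "event_type": "invoice.paid", "payload": null, "processed": true }
  },
  "errors": []
}
```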
Key patterns: Schema mapping with var, null-safe field access, static enrichment.
Notification Routing
Route notifications to different delivery channels based on severity.
AI prompt:
Create a workflow for the "notifications" channel that routes by severity:
- Log all notifications
- Send email for anything except "low" severity
- Send SMS only for "high" and "critical" severity
Parse the payload into "notification".
Generated workflow:
{
"name": "Notification Router",
"condition": true,
"tasks": [
{ "id": "parse", "name": "Parse Payload", "function": {
"name": "parse_json", "input": { "source": "payload", "target": "notification" }
}},
{ "id": "log_all", "name": "Log All Notifications", "function": {
"name": "map", "input": { "mappings": [
{ "path": "data.notification.logged", "logic": true }
]}
}},
{ "id": "email", "name": "Send Email",
"condition": { "!=": [{ "var": "data.notification.severity" }, "low"] },
"function": { "name": "map", "input": { "mappings": [
{ "path": "data.notification.email_sent", "logic": true }
]}}
},
{ "id": "sms", "name": "Send SMS for High/Critical",
"condition": { "in": [{ "var": "data.notification.severity" }, ["high", "critical"]] },
"function": { "name": "map", "input": { "mappings": [
{ "path": "data.notification.sms_sent", "logic": true }
]}}
}
]
}
Send data:
curl -s -X POST http://localhost:8080/api/v1/data/notifications \
-H "Content-Type: application/json" \
-d '{ "data": { "message": "Disk usage at 92%", "severity": "high" } }'
Response:
{
"status": "ok",
"data": {
"notification": { "message": "Disk usage at 92%", "severity": "high", "logged": true, "email_sent": true, "sms_sent": true }
},
"errors": []
}
| Severity | Logged | Email | SMS |
|---|---|---|---|
| low | yes | no | no |
| medium | yes | yes | no |
| high | yes | yes | yes |
| critical | yes | yes | yes |
In production, replace the map tasks with http_call tasks pointing to your email and SMS connectors.
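For example, the email task might become an http_call that posts the notification to a connector. A sketch, reusing the http_call shape from the workflow patterns; the connector name email-service is illustrative and must be created first:

```json
{ "id": "email", "name": "Send Email",
  "condition": { "!=": [{ "var": "data.notification.severity" }, "low"] },
  "function": { "name": "http_call", "input": {
    "connector": "email-service",
    "method": "POST",
    "body_logic": { "var": "data.notification" }
  }}
}
```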
Key patterns: Task-level condition gating, in operator for set membership, progressive pipeline.
Compliance Risk Classification
Classify transactions by risk level and use dry-run testing to verify workflows before activating them.
AI prompt:
Create a workflow for the "compliance" channel that classifies transaction risk:
- High risk: amount > 10000, requires manual review
- Normal risk: amount <= 10000, no review needed
Parse the payload into "txn".
Generated workflow:
{
"name": "Risk Classifier",
"condition": true,
"tasks": [
{ "id": "parse", "name": "Parse Payload", "function": {
"name": "parse_json", "input": { "source": "payload", "target": "txn" }
}},
{ "id": "high_risk", "name": "Flag High Risk",
"condition": { ">": [{ "var": "data.txn.amount" }, 10000] },
"function": { "name": "map", "input": { "mappings": [
{ "path": "data.txn.risk_level", "logic": "high" },
{ "path": "data.txn.requires_review", "logic": true }
]}}
},
{ "id": "normal_risk", "name": "Normal Risk",
"condition": { "<=": [{ "var": "data.txn.amount" }, 10000] },
"function": { "name": "map", "input": { "mappings": [
{ "path": "data.txn.risk_level", "logic": "normal" },
{ "path": "data.txn.requires_review", "logic": false }
]}}
}
]
}
Dry-run before going live:
curl -s -X POST http://localhost:8080/api/v1/admin/workflows/<workflow-id>/test \
-H "Content-Type: application/json" \
-d '{"data": {"amount": 50000, "currency": "USD"}}'
{
"matched": true,
"trace": { "steps": [
{ "task_id": "parse", "result": "executed" },
{ "task_id": "high_risk", "result": "executed" },
{ "task_id": "normal_risk", "result": "skipped" }
]},
"output": { "txn": { "amount": 50000, "currency": "USD", "risk_level": "high", "requires_review": true } }
}
The trace shows exactly which tasks ran and which were skipped. Verify the logic is correct before a single real transaction flows through.
Key patterns: Dry-run verification, execution trace inspection, regulatory workflow.
AI Workflow & CI/CD
AI writes workflows, not services. Instead of generating microservices that need their own governance, LLMs generate Orion workflows: constrained JSON that the platform validates, versions, and monitors automatically.
Prompt Templates
Structure your LLM prompts to produce valid Orion workflows. Here’s a reusable system prompt:
You generate Orion workflows in JSON format. Workflows have:
- name, condition (JSONLogic or true), continue_on_error (optional boolean)
- tasks: array of { id, name, condition (optional, JSONLogic), function: { name, input } }
- Every workflow starts with a parse_json task: { "name": "parse_json", "input": { "source": "payload", "target": "<entity>" } }
- Use "map" function with "mappings" array for transforms. Each mapping has "path" (dot notation) and "logic" (value or JSONLogic).
- Use "http_call" with "connector" (by name) for external API calls. Do not embed URLs or credentials in workflows.
- Use "channel_call" with "channel" (by name) for in-process inter-channel invocation.
- Task conditions use { "var": "data.<entity>.<field>" } to reference parsed data.
Output ONLY the JSON workflow. No explanation.
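A workflow that satisfies this template looks like the following (a minimal sketch; the entity and field names are illustrative):

```json
{
  "name": "Refund Flagger",
  "condition": true,
  "tasks": [
    { "id": "parse", "name": "Parse", "function": {
      "name": "parse_json", "input": { "source": "payload", "target": "refund" }
    }},
    { "id": "flag", "name": "Flag Large Refunds",
      "condition": { ">": [{ "var": "data.refund.amount" }, 1000] },
      "function": { "name": "map", "input": { "mappings": [
        { "path": "data.refund.needs_approval", "logic": true }
      ]}}
    }
  ]
}
```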
Validation Pipeline
Every AI-generated workflow should go through this pipeline before reaching production:
- Generate: use your LLM with the prompt template above
- Validate: POST /api/v1/admin/workflows/validate to check structure
- Create as draft: POST /api/v1/admin/workflows (workflows are created as drafts by default, not loaded into the engine)
- Dry-run: POST /api/v1/admin/workflows/{id}/test with representative test data
- Check the trace: verify the right tasks ran, the right ones were skipped, and output matches expectations
- Activate: PATCH /api/v1/admin/workflows/{id}/status with "status": "active"
CI/CD Pipeline
Integrate AI workflow generation into your deployment pipeline. Workflows are JSON files that version, diff, and review like any other config.
AI generates workflow → commit as JSON → CI runs dry-run → review → import
GitHub Actions example:
name: Validate AI Workflows
on:
  pull_request:
    paths: ['workflows/**/*.json']
  push:
    branches: [main]
    paths: ['workflows/**/*.json']
jobs:
validate:
runs-on: ubuntu-latest
services:
orion:
image: ghcr.io/goplasmatic/orion:latest
ports: ['8080:8080']
steps:
- uses: actions/checkout@v4
- name: Import workflows (as drafts)
run: |
for file in workflows/**/*.json; do
curl -s -X POST http://localhost:8080/api/v1/admin/workflows \
-H "Content-Type: application/json" \
-d @"$file"
done
- name: Dry-run test cases
run: |
for test in workflows/tests/**/*.json; do
WORKFLOW_ID=$(jq -r '.workflow_id' "$test")
DATA=$(jq -c '.data' "$test")
EXPECTED=$(jq -c '.expected_output' "$test")
RESULT=$(curl -s -X POST \
"http://localhost:8080/api/v1/admin/workflows/${WORKFLOW_ID}/test" \
-H "Content-Type: application/json" \
-d "$DATA")
OUTPUT=$(echo "$RESULT" | jq -c '.output')
if [ "$OUTPUT" != "$EXPECTED" ]; then
echo "FAIL: $test"
echo "Expected: $EXPECTED"
echo "Got: $OUTPUT"
exit 1
fi
done
- name: Deploy to production
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
run: |
for file in workflows/**/*.json; do
curl -s -X POST "${{ secrets.ORION_URL }}/api/v1/admin/workflows" \
-H "Content-Type: application/json" \
-d @"$file"
done
Test case format: store test cases alongside workflows:
workflows/
fraud-detection.json # The workflow
tests/
fraud-high-risk.json # Test case
fraud-clear.json # Test case
Each test case:
{
"workflow_id": "fraud-detection",
"data": { "data": { "amount": 15000, "country": "US" } },
"expected_output": { "order": { "amount": 15000, "risk": "high", "requires_review": true } }
}
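Locally, the same field extraction the CI loop performs can be checked with jq before committing (a sketch; it requires jq, and the temp-file path is arbitrary):

```shell
# Write an illustrative test case, then pull out the fields the CI loop uses.
cat > /tmp/fraud-high-risk.json <<'EOF'
{
  "workflow_id": "fraud-detection",
  "data": { "data": { "amount": 15000, "country": "US" } },
  "expected_output": { "order": { "amount": 15000, "risk": "high", "requires_review": true } }
}
EOF

jq -r '.workflow_id' /tmp/fraud-high-risk.json   # the workflow to dry-run
jq -c '.data' /tmp/fraud-high-risk.json          # compact request body for the /test call
```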
Safety Guardrails
AI-generated workflows get the same governance as hand-written ones:
- Version history: every workflow change is recorded. Roll back if an AI-generated workflow misbehaves.
- Draft status: workflows are created as draft by default and are not loaded into the engine until explicitly activated.
- Dry-run before activate: test with representative data and inspect the full execution trace.
- Audit trail: every workflow version is recorded in the workflows table with incrementing version numbers.
- Connectors isolate secrets: AI generates workflows that reference connector names, never credentials.
Common Workflow Patterns
The parse-then-process pattern
Every workflow that reads input data must start with parse_json. Without it, task conditions referencing data.X see empty context.
{
"tasks": [
{ "id": "parse", "function": { "name": "parse_json", "input": { "source": "payload", "target": "order" } } },
{ "id": "process", "condition": { ">": [{ "var": "data.order.total" }, 100] }, "function": { "..." : "..." } }
]
}
Task-level vs workflow-level conditions
- Workflow-level condition: determines whether the entire workflow matches. Set to true for workflows that always run.
- Task-level condition: determines whether a specific task within a matched workflow executes. Use for branching logic within a pipeline.
{
"condition": true,
"tasks": [
{ "id": "always", "function": { "..." : "..." } },
{ "id": "conditional", "condition": { ">": [{ "var": "data.amount" }, 500] }, "function": { "..." : "..." } }
]
}
External API calls with connectors
Keep credentials in connectors, reference them by name in workflows:
{
"tasks": [
{ "id": "parse", "function": { "name": "parse_json", "input": { "source": "payload", "target": "event" } } },
{ "id": "notify", "function": { "name": "http_call", "input": {
"connector": "slack-webhook",
"method": "POST",
"body_logic": { "var": "data.event" }
}}}
]
}
Inter-channel composition with channel_call
Invoke another channel’s workflow in-process for service composition:
{
"tasks": [
{ "id": "parse", "function": { "name": "parse_json", "input": { "source": "payload", "target": "order" } } },
{ "id": "enrich", "function": { "name": "channel_call", "input": {
"channel": "customer-lookup",
"data_logic": { "var": "data.order.customer_id" },
"response_path": "data.customer"
}}},
{ "id": "process", "condition": { "==": [{ "var": "data.customer.tier" }, "vip"] }, "function": { "..." : "..." } }
]
}
Config Reference
All settings have sensible defaults; orion-server runs out of the box with no config file at all.
CLI Commands
orion-server # Start the server (default)
orion-server -c config.toml # Start with a config file
orion-server validate-config # Validate config without starting
orion-server validate-config -c config.toml # Validate a specific config file
orion-server migrate # Run database migrations
orion-server migrate --dry-run # Preview pending migrations
Database Backend
The database backend is selected at runtime from the storage.url scheme. No rebuild needed:
| Backend | URL Format | Example |
|---|---|---|
| SQLite | sqlite: | sqlite:orion.db or sqlite::memory: |
| PostgreSQL | postgres:// | postgres://user:pass@host/db |
| MySQL | mysql:// | mysql://user:pass@host/db |
# SQLite (default)
orion-server
# PostgreSQL
ORION_STORAGE__URL="postgres://user:pass@localhost/orion" orion-server
# MySQL
ORION_STORAGE__URL="mysql://user:pass@localhost/orion" orion-server
Migrations for all backends are embedded in the binary and the correct set is selected automatically at startup.
Complete Config File
[server]
host = "0.0.0.0"
port = 8080
# shutdown_drain_secs = 30 # HTTP connection drain timeout on shutdown
# [server.tls]
# enabled = false
# cert_path = "cert.pem"
# key_path = "key.pem"
[storage]
url = "sqlite:orion.db" # Database URL (sqlite:, postgres://, mysql://)
# max_connections = 25 # Connection pool max connections
# min_connections = 5 # Connection pool min connections
# busy_timeout_ms = 5000 # SQLite busy timeout (ignored for other backends)
# acquire_timeout_secs = 5 # Connection pool acquire timeout
# idle_timeout_secs = 0 # Connection idle timeout (0 = no timeout)
[ingest]
max_payload_size = 1048576 # Maximum payload size in bytes (1 MB)
[engine]
# health_check_timeout_secs = 2 # Timeout for engine read lock in health checks
# reload_timeout_secs = 10 # Timeout for engine write lock during reload
# max_channel_call_depth = 10 # Max recursion depth for channel_call
# default_channel_call_timeout_ms = 30000 # Default timeout for channel_call
# global_http_timeout_secs = 30 # Global timeout for HTTP client
# max_pool_cache_entries = 100 # Max cached connection pools for external connectors
# cache_cleanup_interval_secs = 60 # Cleanup interval for idle connection pools
# [engine.circuit_breaker]
# enabled = false # Enable circuit breakers for connectors
# failure_threshold = 5 # Failures before tripping the breaker
# recovery_timeout_secs = 30 # Cooldown before half-open probe
# max_breakers = 10000 # Max circuit breaker instances (LRU eviction)
[queue]
workers = 4 # Concurrent async trace workers
buffer_size = 1000 # Channel buffer for pending traces
# shutdown_timeout_secs = 30 # Timeout for in-flight jobs during shutdown
# trace_retention_hours = 72 # How long to keep completed traces (0 = disabled)
# trace_cleanup_interval_secs = 3600 # Cleanup check interval
# processing_timeout_ms = 60000 # Per-trace processing timeout
# max_result_size_bytes = 1048576 # Max size of trace result (1 MB)
# max_queue_memory_bytes = 104857600 # Max memory for queued traces (100 MB)
# dlq_retry_enabled = true # Enable dead letter queue retry
# dlq_max_retries = 5 # Max retry attempts for DLQ entries
# dlq_poll_interval_secs = 30 # DLQ retry poll interval
[rate_limit]
# enabled = false # Enable platform-level rate limiting
# default_rps = 100 # Default requests per second
# default_burst = 50 # Default burst allowance
# [rate_limit.endpoints]
# admin_rps = 50 # Rate limit for admin routes
# data_rps = 200 # Rate limit for data routes
[admin_auth]
# enabled = false # Require authentication for admin endpoints
# api_key = "your-secret-key" # The API key or bearer token
# header = "Authorization" # "Authorization" = Bearer format, other = raw key
[cors]
# allowed_origins = ["*"] # Global CORS allowed origins
[kafka]
enabled = false
brokers = ["localhost:9092"]
group_id = "orion"
# processing_timeout_ms = 60000 # Per-message processing timeout
# max_inflight = 100 # Max in-flight messages
# lag_poll_interval_secs = 30 # Consumer lag poll interval
[[kafka.topics]] # Map Kafka topics to channels
topic = "incoming-orders"
channel = "orders"
[kafka.dlq]
enabled = false
topic = "orion-dlq"
[logging]
level = "info" # trace, debug, info, warn, error
format = "pretty" # pretty or json
[metrics]
enabled = false
[tracing]
# enabled = false # Enable OpenTelemetry trace export
# otlp_endpoint = "http://localhost:4317" # OTLP gRPC endpoint
# service_name = "orion" # Service name in traces
# sample_rate = 1.0 # 0.0 (none) to 1.0 (all)
[channels] # Control which channels this instance loads
# include = ["orders.*", "payments.*"] # Glob patterns to include (empty = all)
# exclude = ["analytics.*"] # Glob patterns to exclude
Environment Variable Overrides
Override any setting with environment variables using double-underscore nesting:
ORION_SERVER__PORT=9090
ORION_STORAGE__URL="postgres://user:pass@localhost/orion"
ORION_KAFKA__ENABLED=true
ORION_LOGGING__FORMAT=json
ORION_RATE_LIMIT__ENABLED=true
ORION_ADMIN_AUTH__ENABLED=true
ORION_ADMIN_AUTH__API_KEY="your-secret-key"
ORION_TRACING__ENABLED=true
ORION_TRACING__OTLP_ENDPOINT="http://jaeger:4317"
ORION_METRICS__ENABLED=true
Environment variables take precedence over config file values.
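The mapping is mechanical: drop the ORION_ prefix, replace each double underscore with a dot, and lowercase the rest. A bash sketch of the convention (illustrative only — Orion’s config loader does this internally):

```shell
# Map an Orion env var name to its config key (bash 4+), e.g.
# ORION_STORAGE__URL -> storage.url
var="ORION_ADMIN_AUTH__API_KEY"
key="${var#ORION_}"      # drop the prefix    -> ADMIN_AUTH__API_KEY
key="${key//__/.}"       # "__" becomes "."   -> ADMIN_AUTH.API_KEY
echo "${key,,}"          # lowercase          -> admin_auth.api_key
```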
Built-in Capabilities
All capabilities are compiled into a single binary and controlled at runtime:
| Capability | Configuration | Default |
|---|---|---|
| Database backend | storage.url scheme | SQLite |
| Kafka | kafka.enabled | Disabled |
| OpenTelemetry | tracing.enabled | Disabled |
| TLS/HTTPS | server.tls.enabled | Disabled |
| Swagger UI | Always at /docs | Enabled |
| SQL connectors | db_read/db_write functions | Always available |
| Redis cache | cache_read/cache_write with Redis backend | Always available |
| MongoDB connector | mongo_read function | Always available |
| Rate limiting | rate_limit.enabled | Disabled |
| Metrics | metrics.enabled | Disabled |
Production Checklist
- Mount a persistent volume for orion.db (SQLite) or configure storage.url for PostgreSQL/MySQL
- Enable admin API authentication: ORION_ADMIN_AUTH__ENABLED=true
- Set structured logging: ORION_LOGGING__FORMAT=json
- Enable Prometheus metrics: ORION_METRICS__ENABLED=true
- Configure rate limiting for traffic protection (platform-level and per-channel)
- Use per-crate log filtering: RUST_LOG=orion=info
- Enable OpenTelemetry for distributed tracing: ORION_TRACING__ENABLED=true
- Enable TLS via the server.tls section
- Enable circuit breakers: engine.circuit_breaker.enabled = true
- Set trace retention for trace data lifecycle management: queue.trace_retention_hours