
Workflow

A Workflow is a collection of tasks that execute sequentially to process data.

Overview

Workflows provide:

  • Task Organization - Group related processing steps
  • Priority Ordering - Control execution order across workflows
  • Conditional Execution - Only run when conditions are met
  • Error Handling - Continue or stop on errors

Workflow Structure

{
    "id": "user_processor",
    "name": "User Processor",
    "priority": 1,
    "condition": { "==": [{"var": "metadata.type"}, "user"] },
    "continue_on_error": false,
    "tasks": [
        {
            "id": "validate_user",
            "name": "Validate User",
            "function": { ... }
        },
        {
            "id": "enrich_user",
            "name": "Enrich User Data",
            "function": { ... }
        }
    ]
}

Fields

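| Field             | Type      | Required | Description                                    |
|-------------------|-----------|----------|------------------------------------------------|
| id                | string    | Yes      | Unique workflow identifier                     |
| name              | string    | No       | Human-readable name                            |
| priority          | number    | No       | Execution order (default: 0)                   |
| condition         | JSONLogic | No       | When to execute the workflow                   |
| continue_on_error | boolean   | No       | Continue on task failure (default: false)      |
| tasks             | array     | Yes      | Tasks to execute                               |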
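To make the required fields and defaults concrete, here is a hypothetical Rust model of the table. `WorkflowSpec` and its field types are assumptions for illustration, not the dataflow-rs `Workflow` type; tasks are reduced to their ids and the condition is kept as raw JSON text.

```rust
// Hypothetical model of the workflow fields listed above.
// NOT the dataflow-rs `Workflow` type; names and types are assumptions.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct WorkflowSpec {
    id: String,                // required
    name: Option<String>,      // optional
    priority: i64,             // optional, defaults to 0
    condition: Option<String>, // optional JSONLogic, kept as raw JSON text here
    continue_on_error: bool,   // optional, defaults to false
    tasks: Vec<String>,        // required; task ids only, for brevity
}

impl WorkflowSpec {
    /// Builds a spec from the required fields; optional ones take their documented defaults.
    fn new(id: &str, tasks: Vec<String>) -> Self {
        WorkflowSpec {
            id: id.to_string(),
            name: None,
            priority: 0,
            condition: None,
            continue_on_error: false,
            tasks,
        }
    }
}

fn main() {
    let wf = WorkflowSpec::new("user_processor", vec!["validate_user".into(), "enrich_user".into()]);
    println!("{} priority={} continue_on_error={}", wf.id, wf.priority, wf.continue_on_error);
}
```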

Creating Workflows

From JSON String

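use dataflow_rs::Workflow;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `[...]` is a placeholder: replace it with real task definitions before running.
    let workflow = Workflow::from_json(r#"{
        "id": "my_workflow",
        "name": "My Workflow",
        "tasks": [...]
    }"#)?;
    Ok(())
}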

From File

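use dataflow_rs::Workflow;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load a workflow definition from a JSON file on disk.
    let workflow = Workflow::from_file("workflows/my_workflow.json")?;
    Ok(())
}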

Priority Ordering

Workflows execute in priority order (lowest first):

// Executes first (priority 1)
{
    "id": "validation",
    "priority": 1,
    "tasks": [...]
}

// Executes second (priority 2)
{
    "id": "transformation",
    "priority": 2,
    "tasks": [...]
}

// Executes last (priority 10)
{
    "id": "notification",
    "priority": 10,
    "tasks": [...]
}
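The ordering rule amounts to a sort on the priority value. In this sketch the `Wf` struct and `execution_order` function are hypothetical, and the stable tie-break for equal priorities is an assumption rather than documented engine behavior.

```rust
// Sketch of priority ordering: the lowest priority value runs first.
// `Wf` is hypothetical; dataflow-rs's own ordering code may differ,
// including how it breaks ties between equal priorities.
struct Wf {
    id: &'static str,
    priority: i64,
}

/// Returns workflow ids in execution order. `sort_by_key` is stable, so
/// workflows with equal priority keep their original relative order
/// (an assumption about tie-breaking, not documented behavior).
fn execution_order(mut workflows: Vec<Wf>) -> Vec<&'static str> {
    workflows.sort_by_key(|w| w.priority);
    workflows.into_iter().map(|w| w.id).collect()
}

fn main() {
    let order = execution_order(vec![
        Wf { id: "notification", priority: 10 },
        Wf { id: "validation", priority: 1 },
        Wf { id: "transformation", priority: 2 },
    ]);
    println!("{:?}", order); // ["validation", "transformation", "notification"]
}
```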

Conditional Execution

Use JSONLogic conditions to control when workflows run:

{
    "id": "premium_user_workflow",
    "condition": {
        "and": [
            {"==": [{"var": "metadata.type"}, "user"]},
            {"==": [{"var": "data.premium"}, true]}
        ]
    },
    "tasks": [...]
}
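Conceptually, a condition is a gate checked before any of the workflow's tasks run, and a workflow without one always runs. The sketch below is hypothetical: a Rust closure stands in for the JSONLogic expression and the message is flattened into path/value string pairs, whereas dataflow-rs evaluates real JSONLogic against a JSON message.

```rust
use std::collections::HashMap;

// Hypothetical message: flattened "path -> value" pairs standing in for the
// real message's metadata and data sections.
type Message = HashMap<&'static str, String>;

// Hypothetical workflow with an optional condition. A plain Rust closure stands
// in for the JSONLogic expression; dataflow-rs evaluates real JSONLogic instead.
struct GatedWorkflow {
    id: &'static str,
    condition: Option<Box<dyn Fn(&Message) -> bool>>,
}

impl GatedWorkflow {
    /// No condition means the workflow always runs; otherwise it runs only
    /// when the condition evaluates to true.
    fn should_run(&self, msg: &Message) -> bool {
        match &self.condition {
            None => true,
            Some(cond) => cond(msg),
        }
    }
}

fn premium_user_workflow() -> GatedWorkflow {
    GatedWorkflow {
        id: "premium_user_workflow",
        // Mirrors: and(metadata.type == "user", data.premium == true)
        condition: Some(Box::new(|m: &Message| {
            m.get("metadata.type").map(String::as_str) == Some("user")
                && m.get("data.premium").map(String::as_str) == Some("true")
        })),
    }
}

fn main() {
    let wf = premium_user_workflow();
    let mut msg: Message = HashMap::new();
    msg.insert("metadata.type", "user".to_string());
    msg.insert("data.premium", "true".to_string());
    println!("{} runs: {}", wf.id, wf.should_run(&msg)); // true

    msg.insert("metadata.type", "order".to_string());
    println!("{} runs: {}", wf.id, wf.should_run(&msg)); // false
}
```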

Common Condition Patterns

// Match metadata type
{"==": [{"var": "metadata.type"}, "order"]}

// Check that a field is present and truthy (missing, null, false, 0 and "" all fail)
{"!!": {"var": "data.email"}}

// Multiple conditions
{"and": [
    {">=": [{"var": "data.amount"}, 100]},
    {"==": [{"var": "data.currency"}, "USD"]}
]}

// Either condition
{"or": [
    {"==": [{"var": "metadata.source"}, "api"]},
    {"==": [{"var": "metadata.source"}, "webhook"]}
]}
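To show how these patterns evaluate against a message, here is a deliberately small, hypothetical evaluator covering only `var`, `==`, `>=`, `!!`, `and` and `or`. It is not a JSONLogic implementation: `==` here is strict where JSONLogic's is loose, and `and`/`or` return booleans instead of operand values.

```rust
use std::collections::HashMap;

// Tiny hypothetical evaluator for the condition patterns above. NOT JSONLogic:
// `==` is strict (JSONLogic's is loose) and `and`/`or` return plain booleans.
#[derive(Clone, Debug, PartialEq)]
enum Val {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
}

#[allow(dead_code)]
enum Expr {
    Var(&'static str),         // {"var": "data.amount"}
    Lit(Val),                  // a literal such as "USD" or 100
    Eq(Box<Expr>, Box<Expr>),  // {"==": [a, b]}
    Gte(Box<Expr>, Box<Expr>), // {">=": [a, b]}
    Truthy(Box<Expr>),         // {"!!": a}
    And(Vec<Expr>),            // {"and": [...]}
    Or(Vec<Expr>),             // {"or": [...]}
}

type Ctx = HashMap<&'static str, Val>;

/// JSONLogic-style truthiness for the scalar types modeled here:
/// null, false, 0 and "" are falsy.
fn truthy(v: &Val) -> bool {
    match v {
        Val::Null => false,
        Val::Bool(b) => *b,
        Val::Num(n) => *n != 0.0,
        Val::Str(s) => !s.is_empty(),
    }
}

fn eval(e: &Expr, ctx: &Ctx) -> Val {
    match e {
        // A missing path evaluates to null.
        Expr::Var(path) => ctx.get(path).cloned().unwrap_or(Val::Null),
        Expr::Lit(v) => v.clone(),
        Expr::Eq(a, b) => Val::Bool(eval(a, ctx) == eval(b, ctx)),
        Expr::Gte(a, b) => match (eval(a, ctx), eval(b, ctx)) {
            (Val::Num(x), Val::Num(y)) => Val::Bool(x >= y),
            _ => Val::Bool(false), // non-numeric comparison simplified to false
        },
        Expr::Truthy(a) => Val::Bool(truthy(&eval(a, ctx))),
        Expr::And(xs) => Val::Bool(xs.iter().all(|x| truthy(&eval(x, ctx)))),
        Expr::Or(xs) => Val::Bool(xs.iter().any(|x| truthy(&eval(x, ctx)))),
    }
}

fn main() {
    let mut ctx: Ctx = HashMap::new();
    ctx.insert("data.amount", Val::Num(150.0));
    ctx.insert("data.currency", Val::Str("USD".into()));

    // {"and": [{">=": [{"var": "data.amount"}, 100]}, {"==": [{"var": "data.currency"}, "USD"]}]}
    let cond = Expr::And(vec![
        Expr::Gte(Box::new(Expr::Var("data.amount")), Box::new(Expr::Lit(Val::Num(100.0)))),
        Expr::Eq(Box::new(Expr::Var("data.currency")), Box::new(Expr::Lit(Val::Str("USD".into())))),
    ]);
    println!("{:?}", eval(&cond, &ctx)); // Bool(true)
}
```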

Error Handling

Stop on Error (Default)

{
    "id": "strict_workflow",
    "continue_on_error": false,
    "tasks": [...]
}

If any task fails, the workflow stops and the error is recorded.

Continue on Error

{
    "id": "resilient_workflow",
    "continue_on_error": true,
    "tasks": [...]
}

Tasks continue executing even if previous tasks fail. Errors are collected in message.errors.
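The difference between the two modes can be sketched as a task loop that either breaks or keeps going after a failure. `Task`, `RunResult`, `run_tasks` and the sample tasks (including their error text) are hypothetical; the `errors` vector stands in for message.errors.

```rust
// Hypothetical model of a workflow's task loop. Each task is a function that
// may fail; `continue_on_error` decides whether later tasks still run.
// `errors` stands in for message.errors; the real engine's types differ.
type Task = fn() -> Result<&'static str, String>;

struct RunResult {
    completed: Vec<&'static str>, // ids of tasks that succeeded
    errors: Vec<String>,          // errors recorded along the way
}

fn run_tasks(tasks: &[Task], continue_on_error: bool) -> RunResult {
    let mut out = RunResult { completed: Vec::new(), errors: Vec::new() };
    for task in tasks {
        match task() {
            Ok(id) => out.completed.push(id),
            Err(e) => {
                out.errors.push(e);
                if !continue_on_error {
                    break; // default: stop at the first failure
                }
            }
        }
    }
    out
}

// Hypothetical sample tasks; `enrich` always fails.
fn validate() -> Result<&'static str, String> { Ok("validate") }
fn enrich() -> Result<&'static str, String> { Err("enrich: lookup failed".to_string()) }
fn notify() -> Result<&'static str, String> { Ok("notify") }

fn main() {
    let tasks: [Task; 3] = [validate, enrich, notify];
    let strict = run_tasks(&tasks, false);
    println!("strict: {:?} errors={:?}", strict.completed, strict.errors);
    // strict: ["validate"] errors=["enrich: lookup failed"]
    let resilient = run_tasks(&tasks, true);
    println!("resilient: {:?} errors={:?}", resilient.completed, resilient.errors);
    // resilient: ["validate", "notify"] errors=["enrich: lookup failed"]
}
```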

Task Dependencies

Tasks within a workflow execute sequentially, allowing later tasks to depend on earlier results:

{
    "id": "pipeline",
    "tasks": [
        {
            "id": "fetch_data",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "temp_data.fetched", "logic": {"var": "data.source"}}
                    ]
                }
            }
        },
        {
            "id": "process_data",
            "function": {
                "name": "map",
                "input": {
                    "mappings": [
                        {"path": "data.result", "logic": {"var": "temp_data.fetched"}}
                    ]
                }
            }
        }
    ]
}
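Why task order matters here can be sketched with a hypothetical, flattened message in which each "map" task copies one value from a source path to a target path. `MapTask` and `run_sequentially` are assumptions; the real map function evaluates JSONLogic over JSON, and this sketch simply skips a mapping whose source is missing.

```rust
use std::collections::HashMap;

// Hypothetical flattened message: "path -> value".
type Message = HashMap<String, String>;

// Hypothetical stand-in for a single "map" mapping:
// copy the value at `source` (the {"var": ...}) to `target` (the "path").
struct MapTask {
    target: &'static str,
    source: &'static str,
}

/// Runs tasks strictly in order against one shared message, so each task
/// sees everything earlier tasks wrote. Missing sources are skipped here.
fn run_sequentially(tasks: &[MapTask], msg: &mut Message) {
    for task in tasks {
        if let Some(value) = msg.get(task.source).cloned() {
            msg.insert(task.target.to_string(), value);
        }
    }
}

fn main() {
    let mut msg: Message = HashMap::new();
    msg.insert("data.source".to_string(), "crm".to_string());
    let tasks = [
        MapTask { target: "temp_data.fetched", source: "data.source" },  // fetch_data
        MapTask { target: "data.result", source: "temp_data.fetched" }, // process_data
    ];
    run_sequentially(&tasks, &mut msg);
    println!("{:?}", msg.get("data.result")); // Some("crm")
}
```

Reversing the two tasks leaves data.result unset in this sketch, because process_data would run before temp_data.fetched exists.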

Try It

In the user_processor example, change the message's metadata.type to something other than "user": the condition evaluates to false and the workflow is skipped.