The Execution Engine is responsible for running workflows, managing node execution, propagating data, and handling errors.
Overview
CrystalFlow’s execution engine provides:
Plan-Based Execution Build execution plan, then execute with full control
Conditional Flow Support for if/else and switch/case branching
Event System Comprehensive events for monitoring execution
Error Handling Graceful error handling with detailed context
Cancellation Support for cancelling long-running executions
The Executor
The Executor class orchestrates workflow execution:
import { Executor } from '@crystalflow/core' ;
const executor = new Executor ();
const result = await executor . execute ( workflow );
Execution Options
const result = await executor . execute ( workflow , {
timeout: 30000 , // Timeout in milliseconds
variables: { apiKey: 'xxx' }, // Global variables
abortSignal: controller . signal // Cancellation signal
});
Maximum execution time in milliseconds (default: no timeout)
Global variables accessible to all nodes during execution
Optional AbortSignal for cancelling execution
Execution Plan Architecture
CrystalFlow uses a plan-based execution architecture that enables advanced control flow patterns:
How It Works
Build Phase : The workflow graph is analyzed and converted into an execution plan
Execution Phase : The plan is executed step-by-step with full control
// Simplified internal flow
const plan = planBuilder . build ( workflow );
const result = await planExecutor . execute ( plan , workflow , context );
Benefits
Conditional Execution Native support for if/else, switch/case branching
Loop Support Future support for for/while loops (planned)
Debugging Step-by-step debugging with breakpoints (planned)
Inspectability Full visibility into execution plan structure
Step Types
The execution plan consists of different step types:
NodeExecutionStep : Execute a single node
ConditionalStep : Evaluate condition and execute matching branch
LoopStep : Execute steps repeatedly (planned)
ParallelStep : Execute multiple steps in parallel (planned)
The plan-based architecture is transparent to users - workflows execute smoothly
while supporting advanced control flow patterns like conditionals and future loop support.
For a deep dive into the architecture, see the execution-plan-architecture.md document.
Execution Flow
The execution follows these steps:
Validation
Workflow is validated (connections, required inputs)
Context Creation
ExecutionContext is created with unique execution ID
Build Execution Plan
Workflow graph is analyzed and converted to execution plan with steps and branches
Step-by-Step Execution
Each step executes in order:
Node Step : Execute node, transfer data, validate, store results
Conditional Step : Evaluate condition, select and execute matching branch
Check for cancellation between steps
Branch Evaluation
When a conditional step is encountered:
Execute the conditional node to update its state
Call evaluateCondition() to get active branch
Execute steps in the matching branch
Skip other branches
Result Collection
All node results are collected and returned
Event System
The Executor emits events throughout execution:
Before Execution
executor . on ( 'beforeExecute' , ( context ) => {
console . log ( 'Starting execution:' , context . executionId );
});
Node Events
executor . on ( 'onNodeStart' , ( nodeId , node ) => {
console . log ( `Starting node ${ nodeId } ( ${ node . type } )` );
});
executor . on ( 'onNodeComplete' , ( nodeId , result ) => {
console . log ( `Node ${ nodeId } completed:` , result . outputs );
});
executor . on ( 'onNodeError' , ( nodeId , error ) => {
console . error ( `Node ${ nodeId } failed:` , error . message );
});
After Execution
executor . on ( 'afterExecute' , ( result ) => {
console . log ( 'Execution complete:' , result . status );
console . log ( 'Duration:' , result . duration , 'ms' );
});
Error Events
executor . on ( 'onError' , ( error ) => {
console . error ( 'Workflow execution failed:' , error );
});
Step Events
Monitor execution plan step execution:
executor . on ( 'onStepStart' , ( stepId , stepType ) => {
console . log ( `Starting step ${ stepId } of type ${ stepType } ` );
});
executor . on ( 'onStepComplete' , ( stepId , stepType ) => {
console . log ( `Step ${ stepId } completed` );
});
executor . on ( 'onStepError' , ( stepId , stepType , error ) => {
console . error ( `Step ${ stepId } failed:` , error );
});
Step types include:
ExecutionStepType.Node - Regular node execution
ExecutionStepType.Conditional - Conditional branch selection
ExecutionStepType.Loop - Loop execution (planned)
ExecutionStepType.Parallel - Parallel execution (planned)
Branch Events
Track conditional branch execution:
executor . on ( 'onBranchEnter' , ( nodeId , branchId , condition ) => {
console . log ( `Entering ${ condition } branch of ${ nodeId } ` );
});
executor . on ( 'onBranchExit' , ( nodeId , branchId ) => {
console . log ( `Exiting branch ${ branchId } of ${ nodeId } ` );
});
Example with IfNode:
const executor = new Executor ();
executor . on ( 'onBranchEnter' , ( nodeId , branchId , condition ) => {
// condition will be 'thenOutput' or 'elseOutput'
console . log ( `IfNode ${ nodeId } taking ${ condition } branch` );
});
executor . on ( 'onBranchExit' , ( nodeId , branchId ) => {
console . log ( `Completed ${ branchId } branch of ${ nodeId } ` );
});
const result = await executor . execute ( workflowWithIfNode );
// Output:
// "IfNode if-node-1 taking thenOutput branch"
// "Completed branch-id branch of if-node-1"
Branch events are useful for:
Debugging conditional logic
Visualizing execution paths
Tracking which branches execute in complex workflows
Performance profiling of different branches
Async Event Listeners
All event listeners support async functions - the executor waits for them to complete:
executor . on ( 'onNodeComplete' , async ( nodeId , result ) => {
// Executor waits for async operations
await saveToDatabase ( result );
await sendWebhook ( nodeId );
console . log ( 'Node result processed' );
});
Execution Result
The executor returns an ExecutionResult object:
interface ExecutionResult {
id : string ; // Unique execution ID
workflowId : string ; // Workflow identifier
status : 'success' | 'failed' | 'cancelled' ; // Execution status
startTime : Date ; // When execution started
endTime ?: Date ; // When execution ended
duration ?: number ; // Duration in ms
nodeResults : Map < string , NodeExecutionResult >; // Results for each node
error ?: Error ; // Error if failed
cancellationReason ?: CancellationReason ; // Reason if cancelled
cancelledAt ?: Date ; // When cancelled
}
Using Results
const result = await executor . execute ( workflow );
if ( result . status === 'success' ) {
// Access node outputs
result . nodeResults . forEach (( nodeResult , nodeId ) => {
console . log ( ` ${ nodeId } :` , nodeResult . outputs );
});
console . log ( 'Total duration:' , result . duration , 'ms' );
} else {
console . error ( 'Execution failed:' , result . error );
}
Data Propagation
Data flows automatically between connected nodes:
// Node 1 outputs: { result: 42 }
// Connected to Node 2 input 'value'
// Node 2 automatically receives: this.value = 42
How It Works
Source node executes and sets output values
Executor identifies connections from source outputs
Target node inputs are populated with output values
Target node executes with received data
Error Handling
The execution engine provides robust error handling:
Node Execution Errors
import { NodeExecutionError } from '@crystalflow/core' ;
try {
const result = await executor . execute ( workflow );
} catch ( error ) {
if ( error instanceof NodeExecutionError ) {
console . error ( `Node ${ error . nodeId } ( ${ error . nodeType } ) failed` );
console . error ( 'Execution ID:' , error . executionId );
console . error ( 'Workflow ID:' , error . workflowId );
console . error ( 'Cause:' , error . cause );
}
}
Validation Errors
import { ValidationError } from '@crystalflow/core' ;
try {
const result = await executor . execute ( workflow );
} catch ( error ) {
if ( error instanceof ValidationError ) {
console . error ( 'Workflow validation failed:' , error . message );
}
}
Timeout Errors
import { TimeoutError } from '@crystalflow/core' ;
try {
const result = await executor . execute ( workflow , { timeout: 5000 });
} catch ( error ) {
if ( error instanceof TimeoutError ) {
console . error ( 'Execution timed out after' , error . timeout , 'ms' );
}
}
Cancellation
Workflows can be cancelled during execution:
User-Initiated Cancellation
// Start execution
const promise = executor . execute ( workflow );
// Cancel after 2 seconds
setTimeout (() => {
executor . cancel ( executionId , CancellationReason . UserCancelled );
}, 2000 );
try {
await promise ;
} catch ( error ) {
if ( error instanceof UserCancelledError ) {
console . log ( 'User cancelled execution' );
}
}
External Signal Cancellation
const controller = new AbortController ();
// Start execution
const promise = executor . execute ( workflow , {
abortSignal: controller . signal
});
// Cancel from external signal
controller . abort ();
Timeout-Based Cancellation
try {
const result = await executor . execute ( workflow , {
timeout: 30000 // 30 seconds
});
} catch ( error ) {
if ( error instanceof TimeoutError ) {
console . log ( 'Execution timed out' );
}
}
Mid-Node Cancellation
Nodes can check for cancellation during long operations:
class LongRunningNode extends Node {
async execute () {
for ( let i = 0 ; i < 1000 ; i ++ ) {
// Check if execution was cancelled
this . checkCancellation ();
await processItem ( i );
}
}
}
Execution Context
The ExecutionContext provides runtime information to nodes:
class CustomNode extends Node {
execute () {
// Access execution context
const executionId = this . context . executionId ;
const workflowId = this . context . workflowId ;
const variables = this . context . variables ;
// Use context data
console . log ( `Executing in ${ executionId } ` );
}
}
Context Properties
Unique identifier for this execution
Identifier of the workflow being executed
Global variables passed to execution
Track execution performance with timing data:
const result = await executor . execute ( workflow );
console . log ( 'Total execution time:' , result . duration , 'ms' );
// Individual node timings
result . nodeResults . forEach (( nodeResult , nodeId ) => {
console . log ( ` ${ nodeId } : ${ nodeResult . duration } ms` );
});
Best Practices
Wrap execution in try-catch and handle different error types appropriately.
Use timeouts to prevent workflows from hanging indefinitely.
Use events for logging, monitoring, and debugging execution flow.
Check Cancellation in Long Nodes
For long-running nodes, periodically check for cancellation.
Use Variables for Configuration
Pass runtime configuration through variables rather than hardcoding.
Example: Complete Execution
import { Executor , CancellationReason } from '@crystalflow/core' ;
async function runWorkflow ( workflow ) {
const executor = new Executor ();
// Set up event listeners
executor . on ( 'beforeExecute' , ( context ) => {
console . log ( `Starting execution ${ context . executionId } ` );
});
executor . on ( 'onNodeStart' , ( nodeId , node ) => {
console . log ( `Executing ${ node . type } ` );
});
executor . on ( 'onNodeComplete' , async ( nodeId , result ) => {
// Async event handler - executor waits
await logToDatabase ( nodeId , result );
});
executor . on ( 'afterExecute' , ( result ) => {
console . log ( `Completed in ${ result . duration } ms` );
});
// Execute with options
try {
const result = await executor . execute ( workflow , {
timeout: 60000 ,
variables: {
apiKey: process . env . API_KEY ,
environment: 'production'
}
});
if ( result . status === 'success' ) {
console . log ( 'Workflow succeeded!' );
return result ;
}
} catch ( error ) {
console . error ( 'Execution failed:' , error );
throw error ;
}
}
Next Steps