CrystalFlow’s execution engine provides a powerful event system for monitoring and controlling workflow execution.
Execution Basics
Execute a workflow with the Executor:
import { Executor } from '@crystalflow/core';
const executor = new Executor();
const result = await executor.execute(workflow);
console.log('Status:', result.status);
console.log('Duration:', result.duration, 'ms');
Execution Options
const result = await executor.execute(workflow, {
timeout: 30000, // Timeout in milliseconds
variables: { // Global variables
apiKey: 'your-key',
environment: 'production'
},
abortSignal: controller.signal // Cancellation signal
});
Event System
The Executor emits events throughout the workflow lifecycle:
Lifecycle Events
beforeExecute
Fired before workflow execution starts
onNodeStart
Fired when each node starts executing
onNodeComplete
Fired when each node completes successfully
onNodeError
Fired when a node encounters an error
afterExecute
Fired after workflow execution completes
Event Listeners
Before Execution
executor.on('beforeExecute', (context) => {
console.log('Starting execution:', context.executionId);
console.log('Workflow:', context.workflowId);
console.log('Variables:', context.variables);
});
Node Start
executor.on('onNodeStart', (nodeId, node) => {
console.log(`Executing node: ${nodeId}`);
console.log(`Type: ${node.type}`);
console.log(`Label: ${node.label}`);
});
Node Complete
executor.on('onNodeComplete', (nodeId, result) => {
console.log(`Node ${nodeId} completed`);
console.log('Outputs:', result.outputs);
console.log('Duration:', result.duration, 'ms');
});
Node Error
executor.on('onNodeError', (nodeId, error) => {
console.error(`Node ${nodeId} failed:`, error.message);
// Optionally handle specific errors
if (error instanceof ValidationError) {
console.error('Validation failed');
}
});
After Execution
executor.on('afterExecute', (result) => {
console.log('Execution complete');
console.log('Status:', result.status);
console.log('Total duration:', result.duration, 'ms');
if (result.status === 'success') {
console.log('All nodes executed successfully');
} else if (result.status === 'failed') {
console.error('Execution failed:', result.error);
}
});
General Error Handler
executor.on('onError', (error) => {
console.error('Workflow execution error:', error);
// Send to error tracking service
// sentryClient.captureException(error);
});
Async Event Handlers
All event listeners support async functions - the executor waits for them:
executor.on('onNodeComplete', async (nodeId, result) => {
// Executor waits for these async operations
await saveToDatabase(nodeId, result);
await sendWebhook({
nodeId,
outputs: result.outputs,
duration: result.duration
});
console.log('Node result persisted');
});
executor.on('beforeExecute', async (context) => {
// Load configuration from API
const config = await fetchConfiguration();
context.variables = { ...context.variables, ...config };
});
Execution Result
The execution returns a detailed result object:
interface ExecutionResult {
id: string; // Unique execution ID
workflowId: string; // Workflow identifier
status: ExecutionStatus; // 'success' | 'failed' | 'cancelled'
startTime: Date; // Start timestamp
endTime?: Date; // End timestamp
duration?: number; // Duration in ms
nodeResults: Map<string, NodeExecutionResult>; // Results per node
error?: Error; // Error if failed
cancellationReason?: string; // Reason if cancelled
cancelledAt?: Date; // Cancellation timestamp
}
Accessing Node Results
const result = await executor.execute(workflow);
if (result.status === 'success') {
// Iterate through all node results
result.nodeResults.forEach((nodeResult, nodeId) => {
console.log(`${nodeId}:`);
console.log(' State:', nodeResult.state);
console.log(' Outputs:', nodeResult.outputs);
console.log(' Duration:', nodeResult.duration, 'ms');
});
}
Complete Example
import { Executor, ExecutionError, NodeExecutionError } from '@crystalflow/core';
async function executeWorkflow(workflow) {
const executor = new Executor();
// Set up all event listeners
executor.on('beforeExecute', (context) => {
console.log(`\n=== Starting Workflow Execution ===`);
console.log(`Execution ID: ${context.executionId}`);
console.log(`Workflow ID: ${context.workflowId}\n`);
});
executor.on('onNodeStart', (nodeId, node) => {
console.log(`▶ Executing: ${node.type} (${nodeId})`);
});
executor.on('onNodeComplete', async (nodeId, result) => {
console.log(`✓ Completed: ${nodeId} in ${result.duration}ms`);
console.log(` Outputs:`, result.outputs);
// Persist results
await saveNodeResult(nodeId, result);
});
executor.on('onNodeError', (nodeId, error) => {
console.error(`✗ Failed: ${nodeId}`);
console.error(` Error: ${error.message}`);
});
executor.on('afterExecute', (result) => {
console.log(`\n=== Execution Complete ===`);
console.log(`Status: ${result.status}`);
console.log(`Total Duration: ${result.duration}ms`);
console.log(`Nodes Executed: ${result.nodeResults.size}`);
});
executor.on('onError', (error) => {
console.error('FATAL ERROR:', error.message);
});
// Execute with options
try {
const result = await executor.execute(workflow, {
timeout: 60000,
variables: {
apiKey: process.env.API_KEY,
environment: process.env.NODE_ENV
}
});
return result;
} catch (error) {
if (error instanceof NodeExecutionError) {
console.error(`Node ${error.nodeId} failed in execution ${error.executionId}`);
} else if (error instanceof ExecutionError) {
console.error(`Workflow execution failed:`, error.message);
}
throw error;
}
}
Cancellation Events
Cancellation events: The onCancellation event is available for tracking cancelled executions.
executor.on('onCancellation', (executionId, reason, cancelledAt) => {
console.log(`Execution ${executionId} cancelled`);
console.log(`Reason: ${reason}`);
console.log(`Cancelled at: ${cancelledAt}`);
});
// Cancel an execution
executor.cancel(executionId, CancellationReason.UserCancelled);
Progress Tracking
Track execution progress with event counts:
let totalNodes = 0;
let completedNodes = 0;
executor.on('beforeExecute', (context) => {
totalNodes = workflow.nodes.size;
completedNodes = 0;
});
executor.on('onNodeComplete', (nodeId, result) => {
completedNodes++;
const progress = (completedNodes / totalNodes) * 100;
console.log(`Progress: ${progress.toFixed(1)}%`);
});
UI Integration
Integrate events with React UI:
import React, { useState } from 'react';
import { Executor } from '@crystalflow/core';
function WorkflowExecutor({ workflow }) {
const [status, setStatus] = useState('idle');
const [progress, setProgress] = useState(0);
const [nodeStates, setNodeStates] = useState({});
const executeWorkflow = async () => {
const executor = new Executor();
executor.on('beforeExecute', () => {
setStatus('running');
setProgress(0);
});
executor.on('onNodeStart', (nodeId) => {
setNodeStates(prev => ({
...prev,
[nodeId]: 'running'
}));
});
executor.on('onNodeComplete', (nodeId, result) => {
setNodeStates(prev => ({
...prev,
[nodeId]: 'success'
}));
setProgress(prev => prev + (100 / workflow.nodes.size));
});
executor.on('onNodeError', (nodeId) => {
setNodeStates(prev => ({
...prev,
[nodeId]: 'error'
}));
});
executor.on('afterExecute', (result) => {
setStatus(result.status);
});
await executor.execute(workflow);
};
return (
<div>
<button onClick={executeWorkflow}>Execute</button>
<div>Status: {status}</div>
<div>Progress: {progress.toFixed(0)}%</div>
<div>
{Object.entries(nodeStates).map(([id, state]) => (
<div key={id}>{id}: {state}</div>
))}
</div>
</div>
);
}
Best Practices
Use onError and onNodeError listeners to handle failures gracefully.
Clean up resources in async event handlers (close connections, clear timers).
Keep event handlers lightweight - don’t block execution with heavy operations.
Use async handlers for database writes, API calls, and file operations.
Use timing data from results to identify slow nodes.
Next Steps