The Executor class orchestrates workflow execution with events and state management.
Class: Executor
class Executor extends AsyncEventEmitter {
// Methods
execute(workflow: Workflow, options?: ExecutionOptions): Promise<ExecutionResult>;
cancel(executionId: string, reason?: CancellationReason): void;
cancelAll(reason?: CancellationReason): void;
isExecutionActive(executionId: string): boolean;
// Event Methods (inherited from AsyncEventEmitter)
on(event: string, listener: Function): this;
once(event: string, listener: Function): this;
off(event: string, listener: Function): this;
emit(event: string, ...args: any[]): Promise<boolean>;
}
Methods
execute()
Execute a workflow with options.
execute(workflow: Workflow, options?: ExecutionOptions): Promise<ExecutionResult>;
Workflow instance to execute.
Optional execution configuration:
timeout?: number - Maximum execution time (ms)
variables?: Record<string, any> - Workflow variables
abortSignal?: AbortSignal - External cancellation signal
Returns: Promise resolving to ExecutionResult.
Example:
import { Executor } from '@crystalflow/core';
const executor = new Executor();
const result = await executor.execute(workflow, {
timeout: 30000,
variables: { apiKey: 'xxx' }
});
console.log(result.status); // 'success'
console.log(result.duration); // 1523 (ms)
cancel()
Cancel a specific execution.
cancel(executionId: string, reason?: CancellationReason): void;
Cancellation reason (default: CancellationReason.UserCancelled).
Example:
let executionId: string;
executor.on('beforeExecute', (context) => {
executionId = context.executionId;
});
// Cancel after 5 seconds
setTimeout(() => {
executor.cancel(executionId, CancellationReason.UserCancelled);
}, 5000);
cancelAll()
Cancel all active executions.
cancelAll(reason?: CancellationReason): void;
Cancellation reason (default: CancellationReason.UserCancelled).
Example:
executor.cancelAll(CancellationReason.UserCancelled);
isExecutionActive()
Check if an execution is currently running.
isExecutionActive(executionId: string): boolean;
Returns: true if execution is active, false otherwise.
Example:
if (executor.isExecutionActive(executionId)) {
console.log('Execution is running');
}
Events
The Executor emits events throughout the execution lifecycle. All event listeners can be synchronous or asynchronous.
beforeExecute
Emitted before workflow execution starts.
executor.on('beforeExecute', (context: ExecutionContext) => {
console.log(`Starting execution: ${context.executionId}`);
});
Execution context with executionId, workflowId, variables, etc.
afterExecute
Emitted after workflow execution completes.
executor.on('afterExecute', (result: ExecutionResult) => {
console.log(`Execution ${result.status}: ${result.duration}ms`);
});
Complete execution result.
onNodeStart
Emitted when a node starts executing.
executor.on('onNodeStart', (nodeId: string, node: Node) => {
console.log(`Node ${nodeId} starting...`);
});
onNodeComplete
Emitted when a node completes successfully.
executor.on('onNodeComplete', (nodeId: string, result: NodeExecutionResult) => {
console.log(`Node ${nodeId} completed:`, result.outputs);
});
Node execution result with outputs, timing, etc.
onNodeError
Emitted when a node fails.
executor.on('onNodeError', (nodeId: string, error: Error) => {
console.error(`Node ${nodeId} failed:`, error);
});
onError
Emitted when workflow execution fails.
executor.on('onError', (error: Error) => {
console.error('Workflow failed:', error);
});
Error that caused workflow failure.
onCancellation
Emitted when execution is cancelled.
executor.on('onCancellation', (
executionId: string,
reason: CancellationReason,
cancelledAt: Date
) => {
console.log(`Execution ${executionId} cancelled: ${reason}`);
});
Timestamp of cancellation.
Types
ExecutionOptions
interface ExecutionOptions {
timeout?: number;
variables?: Record<string, any>;
abortSignal?: AbortSignal;
}
ExecutionResult
interface ExecutionResult {
id: string;
workflowId: string;
status: 'success' | 'failed' | 'cancelled';
startTime: Date;
endTime?: Date;
duration?: number;
nodeResults: Map<string, NodeExecutionResult>;
error?: Error;
cancellationReason?: CancellationReason;
cancelledAt?: Date;
}
NodeExecutionResult
interface NodeExecutionResult {
nodeId: string;
state: NodeState;
outputs: Record<string, any>;
error?: Error;
startTime: Date;
endTime?: Date;
duration?: number;
}
ExecutionContext
interface ExecutionContext {
executionId: string;
workflowId: string;
startTime: Date;
variables: Record<string, any>;
nodeResults: Map<string, NodeExecutionResult>;
}
CancellationReason
enum CancellationReason {
Timeout = 'timeout',
UserCancelled = 'user-cancelled',
ExternalSignal = 'external-signal',
ResourceLimit = 'resource-limit'
}
Usage Examples
Basic Execution
import { Executor } from '@crystalflow/core';
const executor = new Executor();
const result = await executor.execute(workflow);
console.log(result.status); // 'success'
With Events
const executor = new Executor();
executor.on('beforeExecute', (context) => {
console.log(`Starting: ${context.executionId}`);
});
executor.on('onNodeStart', (nodeId, node) => {
console.log(`Executing ${node.type} (${nodeId})`);
});
executor.on('onNodeComplete', (nodeId, result) => {
console.log(`Completed ${nodeId}:`, result.outputs);
});
executor.on('afterExecute', (result) => {
console.log(`Finished in ${result.duration}ms`);
});
await executor.execute(workflow);
Async Event Handlers
executor.on('onNodeComplete', async (nodeId, result) => {
// Executor waits for async handlers
await saveToDatabase(result);
await sendWebhook(result);
console.log('Persisted node result');
});
await executor.execute(workflow);
With Timeout
try {
const result = await executor.execute(workflow, {
timeout: 10000 // 10 seconds
});
} catch (error) {
if (error instanceof TimeoutError) {
console.error('Execution timed out');
}
}
With Cancellation
let executionId: string;
executor.on('beforeExecute', (context) => {
executionId = context.executionId;
});
executor.on('onCancellation', (id, reason) => {
console.log(`Cancelled: ${reason}`);
});
// Start execution
const promise = executor.execute(workflow);
// Cancel after 5 seconds
setTimeout(() => {
executor.cancel(executionId);
}, 5000);
try {
await promise;
} catch (error) {
if (error instanceof CancellationError) {
console.log('Execution was cancelled');
}
}
With AbortSignal
const controller = new AbortController();
// Start execution
const promise = executor.execute(workflow, {
abortSignal: controller.signal
});
// Cancel from outside
setTimeout(() => controller.abort(), 5000);
try {
await promise;
} catch (error) {
console.error('Cancelled:', error);
}
Progress Tracking
const totalNodes = workflow.nodes.size;
let completedNodes = 0;
executor.on('onNodeStart', (nodeId) => {
console.log(`Progress: ${completedNodes}/${totalNodes}`);
});
executor.on('onNodeComplete', (nodeId) => {
completedNodes++;
const progress = (completedNodes / totalNodes) * 100;
console.log(`Progress: ${progress.toFixed(0)}%`);
});
await executor.execute(workflow);
Error Recovery
executor.on('onNodeError', async (nodeId, error) => {
console.error(`Node ${nodeId} failed:`, error);
// Log to external service
await logError({
nodeId,
error: error.message,
timestamp: new Date()
});
});
executor.on('onError', async (error) => {
// Workflow failed - send notification
await sendAlertEmail({
subject: 'Workflow Failed',
body: error.message
});
});
try {
await executor.execute(workflow);
} catch (error) {
// Handle gracefully
console.log('Execution failed, but errors were logged');
}