Skip to main content
CrystalFlow’s execution engine provides a powerful event system for monitoring and controlling workflow execution.

Execution Basics

Execute a workflow with the Executor:
import { Executor } from '@crystalflow/core';

// Create an executor, run the workflow, and report the outcome.
const executor = new Executor();
const { status, duration } = await executor.execute(workflow);

console.log('Status:', status);
console.log('Duration:', duration, 'ms');

Execution Options

// The cancellation signal comes from a standard AbortController;
// calling controller.abort() is how the signal passed below gets triggered.
const controller = new AbortController();

const result = await executor.execute(workflow, {
  timeout: 30000,              // Timeout in milliseconds
  variables: {                 // Global variables
    apiKey: 'your-key',
    environment: 'production'
  },
  abortSignal: controller.signal  // Cancellation signal
});

Event System

The Executor emits events throughout the workflow lifecycle:

Lifecycle Events

1. beforeExecute — Fired before workflow execution starts
2. onNodeStart — Fired when each node starts executing
3. onNodeComplete — Fired when each node completes successfully
4. onNodeError — Fired when a node encounters an error
5. afterExecute — Fired after workflow execution completes

Event Listeners

Before Execution

// Log the identifying details of a run just before it starts.
executor.on('beforeExecute', ({ executionId, workflowId, variables }) => {
  console.log('Starting execution:', executionId);
  console.log('Workflow:', workflowId);
  console.log('Variables:', variables);
});

Node Start

// Announce each node as it begins, along with its type and label.
executor.on('onNodeStart', (nodeId, node) => {
  console.log(`Executing node: ${nodeId}`);

  const { type, label } = node;
  console.log(`Type: ${type}`);
  console.log(`Label: ${label}`);
});

Node Complete

// Report the outputs and timing of every node that finishes successfully.
executor.on('onNodeComplete', (nodeId, result) => {
  console.log(`Node ${nodeId} completed`);

  const { outputs, duration } = result;
  console.log('Outputs:', outputs);
  console.log('Duration:', duration, 'ms');
});

Node Error

// Report a node failure, with extra detail for validation errors.
// NOTE(review): ValidationError is not imported in this snippet — it must be
// brought into scope before this check can run; confirm where it is exported.
executor.on('onNodeError', (nodeId, error) => {
  console.error(`Node ${nodeId} failed:`, error.message);

  const isValidationFailure = error instanceof ValidationError;
  if (isValidationFailure) {
    console.error('Validation failed');
  }
});

After Execution

// Summarize the finished run, then branch on its final status.
executor.on('afterExecute', (result) => {
  console.log('Execution complete');
  console.log('Status:', result.status);
  console.log('Total duration:', result.duration, 'ms');

  switch (result.status) {
    case 'success':
      console.log('All nodes executed successfully');
      break;
    case 'failed':
      console.error('Execution failed:', result.error);
      break;
    default:
      // Other statuses get no extra message.
      break;
  }
});

General Error Handler

// Catch-all listener for workflow-level errors.
executor.on('onError', (err) => {
  console.error('Workflow execution error:', err);
  // Hook point for an error tracking service, e.g.
  // sentryClient.captureException(err);
});

Async Event Handlers

All event listeners may be async functions; the executor waits for each handler to finish:
// Persist each node result, then notify a webhook. Both steps are awaited in
// order, so the executor waits for them.
executor.on('onNodeComplete', async (nodeId, result) => {
  await saveToDatabase(nodeId, result);

  const { outputs, duration } = result;
  const payload = { nodeId, outputs, duration };
  await sendWebhook(payload);

  console.log('Node result persisted');
});

// Merge configuration loaded from an API into the run's variables; fetched
// values win over existing ones with the same key.
executor.on('beforeExecute', async (context) => {
  const config = await fetchConfiguration();
  context.variables = Object.assign({}, context.variables, config);
});

Execution Result

The execution returns a detailed result object:
/**
 * Shape of the result object for a workflow execution.
 * Fields marked optional (`?`) may be absent; the inline comments describe
 * when each one applies.
 */
interface ExecutionResult {
  id: string;                    // Unique execution ID
  workflowId: string;            // Workflow identifier
  status: ExecutionStatus;       // 'success' | 'failed' | 'cancelled'
  startTime: Date;               // Start timestamp
  endTime?: Date;                // End timestamp
  duration?: number;             // Duration in ms
  nodeResults: Map<string, NodeExecutionResult>;  // Results per node
  error?: Error;                 // Error if failed
  cancellationReason?: string;   // Reason if cancelled
  cancelledAt?: Date;            // Cancellation timestamp
}

Accessing Node Results

const result = await executor.execute(workflow);

if (result.status === 'success') {
  // nodeResults is a Map keyed by node ID; walk its entries in order.
  for (const [nodeId, nodeResult] of result.nodeResults) {
    console.log(`${nodeId}:`);

    const { state, outputs, duration } = nodeResult;
    console.log('  State:', state);
    console.log('  Outputs:', outputs);
    console.log('  Duration:', duration, 'ms');
  }
}

Complete Example

import { Executor, ExecutionError, NodeExecutionError } from '@crystalflow/core';

/**
 * Runs `workflow` on a fresh Executor with console logging attached to every
 * lifecycle event, persisting each node result as it completes.
 *
 * @param {object} workflow - Workflow handed straight to `executor.execute`.
 * @returns {Promise<object>} Whatever `executor.execute` resolves with.
 * @throws Rethrows any error from `executor.execute`, after logging details
 *   for NodeExecutionError and ExecutionError.
 */
async function executeWorkflow(workflow) {
  const executor = new Executor();

  // [event name, handler] pairs, registered below in this order.
  const listeners = [
    ['beforeExecute', (context) => {
      console.log(`\n=== Starting Workflow Execution ===`);
      console.log(`Execution ID: ${context.executionId}`);
      console.log(`Workflow ID: ${context.workflowId}\n`);
    }],
    ['onNodeStart', (nodeId, node) => {
      console.log(`▶ Executing: ${node.type} (${nodeId})`);
    }],
    ['onNodeComplete', async (nodeId, result) => {
      console.log(`✓ Completed: ${nodeId} in ${result.duration}ms`);
      console.log(`  Outputs:`, result.outputs);

      // Persist results
      await saveNodeResult(nodeId, result);
    }],
    ['onNodeError', (nodeId, error) => {
      console.error(`✗ Failed: ${nodeId}`);
      console.error(`  Error: ${error.message}`);
    }],
    ['afterExecute', (result) => {
      console.log(`\n=== Execution Complete ===`);
      console.log(`Status: ${result.status}`);
      console.log(`Total Duration: ${result.duration}ms`);
      console.log(`Nodes Executed: ${result.nodeResults.size}`);
    }],
    ['onError', (error) => {
      console.error('FATAL ERROR:', error.message);
    }],
  ];

  for (const [eventName, handler] of listeners) {
    executor.on(eventName, handler);
  }

  try {
    // Execute with a 60 s timeout and environment-derived variables.
    return await executor.execute(workflow, {
      timeout: 60000,
      variables: {
        apiKey: process.env.API_KEY,
        environment: process.env.NODE_ENV
      }
    });
  } catch (failure) {
    // Log the most specific error type first, then always rethrow.
    if (failure instanceof NodeExecutionError) {
      console.error(`Node ${failure.nodeId} failed in execution ${failure.executionId}`);
    } else if (failure instanceof ExecutionError) {
      console.error(`Workflow execution failed:`, failure.message);
    }
    throw failure;
  }
}

Cancellation Events

Use the onCancellation event to track cancelled executions.
// Log every cancellation together with its reason and timestamp.
executor.on('onCancellation', (cancelledId, reason, cancelledAt) => {
  console.log(`Execution ${cancelledId} cancelled`);
  console.log(`Reason: ${reason}`);
  console.log(`Cancelled at: ${cancelledAt}`);
});

// Cancel an execution
// NOTE(review): executionId and CancellationReason are not defined in this
// snippet — they must come from the surrounding scope; confirm their source.
executor.cancel(executionId, CancellationReason.UserCancelled);

Progress Tracking

Track execution progress with event counts:
// Counters shared by the two listeners below.
let totalNodes = 0;
let completedNodes = 0;

// Capture the node count and reset the counter at the start of each run.
executor.on('beforeExecute', () => {
  totalNodes = workflow.nodes.size;
  completedNodes = 0;
});

// Print the percentage of nodes completed so far.
executor.on('onNodeComplete', () => {
  completedNodes++;
  // Guard the division: with totalNodes still 0 the original printed
  // "NaN%" or "Infinity%".
  const progress = totalNodes > 0 ? (completedNodes / totalNodes) * 100 : 0;
  console.log(`Progress: ${progress.toFixed(1)}%`);
});

UI Integration

Integrate events with a React UI:
import React, { useState } from 'react';
import { Executor } from '@crystalflow/core';

function WorkflowExecutor({ workflow }) {
  const [status, setStatus] = useState('idle');
  const [progress, setProgress] = useState(0);
  const [nodeStates, setNodeStates] = useState({});
  
  const executeWorkflow = async () => {
    const executor = new Executor();
    
    executor.on('beforeExecute', () => {
      setStatus('running');
      setProgress(0);
    });
    
    executor.on('onNodeStart', (nodeId) => {
      setNodeStates(prev => ({
        ...prev,
        [nodeId]: 'running'
      }));
    });
    
    executor.on('onNodeComplete', (nodeId, result) => {
      setNodeStates(prev => ({
        ...prev,
        [nodeId]: 'success'
      }));
      setProgress(prev => prev + (100 / workflow.nodes.size));
    });
    
    executor.on('onNodeError', (nodeId) => {
      setNodeStates(prev => ({
        ...prev,
        [nodeId]: 'error'
      }));
    });
    
    executor.on('afterExecute', (result) => {
      setStatus(result.status);
    });
    
    await executor.execute(workflow);
  };
  
  return (
    <div>
      <button onClick={executeWorkflow}>Execute</button>
      <div>Status: {status}</div>
      <div>Progress: {progress.toFixed(0)}%</div>
      <div>
        {Object.entries(nodeStates).map(([id, state]) => (
          <div key={id}>{id}: {state}</div>
        ))}
      </div>
    </div>
  );
}

Best Practices

Use onError and onNodeError listeners to handle failures gracefully.
Clean up resources in async event handlers (close connections, clear timers).
Keep event handlers lightweight — because the executor waits for each handler, heavy operations delay the workflow.
Use async handlers for database writes, API calls, and file operations.
Use timing data from results to identify slow nodes.

Next Steps