The Node class is the foundation of all workflow nodes in CrystalFlow.
Class: Node
abstract class Node {
  // Properties
  id: string;
  type: string;
  metadata: NodeMetadata;
  state: NodeState;

  // Methods
  abstract execute(): void | Promise<void>;
  validate(): ValidationResult;
  run(context?: ExecutionContext): Promise<void>;
  reset(): void;
  toJSON(): NodeJSON;
  static fromJSON(json: NodeJSON): Node;
  checkCancellation(): void;
}
Properties
id: Unique identifier for the node instance. Auto-generated on creation.
type: Node type identifier (e.g., 'math.add'). Set via the @defineNode decorator.
metadata: Node metadata, including label, category, description, and port definitions.
state: Current execution state, one of 'idle', 'executing', 'success', or 'error'.
Methods
execute()
Execute the node’s business logic. Must be implemented by all nodes.
abstract execute(): void | Promise<void>;
Example:
@defineNode({ type: 'math.add', label: 'Add' })
class AddNode extends Node {
  @Input({ type: 'number', label: 'A' })
  a: number = 0;

  @Input({ type: 'number', label: 'B' })
  b: number = 0;

  @Output({ type: 'number', label: 'Result' })
  result: number;

  execute() {
    this.result = this.a + this.b;
  }
}
Async execution:
async execute() {
  const response = await fetch(this.url);
  this.data = await response.json();
}
validate()
Validate node configuration and inputs.
validate(): ValidationResult;
Returns: ValidationResult with isValid boolean and optional errors array.
Example:
validate() {
  const result = super.validate();
  // Accept only URLs that begin with http:// or https://
  if (this.url && !/^https?:\/\//.test(this.url)) {
    result.isValid = false;
    result.errors = result.errors || [];
    result.errors.push('URL must start with http:// or https://');
  }
  return result;
}
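Because errors is optional, every validate() override repeats the same initialise-then-push steps. The sketch below shows one way to factor that out. addValidationError and validateUrl are hypothetical helpers written for illustration, not part of CrystalFlow, and the ValidationResult interface is redeclared here only so the snippet stands alone.

```typescript
// Same shape as the documented ValidationResult.
interface ValidationResult {
  isValid: boolean;
  errors?: string[];
}

// Hypothetical helper (not a CrystalFlow API): returns a failed copy of
// `result` with `message` appended after any errors already present.
function addValidationError(
  result: ValidationResult,
  message: string
): ValidationResult {
  return {
    isValid: false,
    errors: [...(result.errors ?? []), message],
  };
}

// Hypothetical URL check built on the helper. An empty URL is left for a
// separate "required" check, as in the example above.
function validateUrl(base: ValidationResult, url: string): ValidationResult {
  if (url && !/^https?:\/\//.test(url)) {
    return addValidationError(base, 'URL must start with http:// or https://');
  }
  return base;
}
```

Returning a new object rather than mutating the input keeps errors reported by a parent class intact, which matters when several checks are chained.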
run()
Execute the node with validation and state management. Called by the execution engine.
run(context?: ExecutionContext): Promise<void>;
context: Optional execution context providing variables and timing info.
Flow:
- Set state to 'executing'
- Call validate()
- Call execute()
- Set state to 'success' or 'error'
Example:
// Usually called by executor
await node.run(executionContext);
// Direct execution (for testing)
await node.run();
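The flow listed for run() can be sketched with a small stand-in class. SketchNode and DoubleNode are hypothetical and do not reproduce CrystalFlow's implementation. In particular, the choice to throw on a failed validation and to rethrow errors from execute() is an assumption; the source only states which states are set.

```typescript
// Stand-in types mirroring the documented ones.
type NodeState = 'idle' | 'executing' | 'success' | 'error';

interface ValidationResult {
  isValid: boolean;
  errors?: string[];
}

// Hypothetical minimal base class that follows the documented flow.
abstract class SketchNode {
  state: NodeState = 'idle';

  abstract execute(): void | Promise<void>;

  validate(): ValidationResult {
    return { isValid: true };
  }

  async run(): Promise<void> {
    this.state = 'executing';
    try {
      const check = this.validate();
      if (!check.isValid) {
        // Assumption: a failed validation surfaces as a thrown error.
        throw new Error((check.errors ?? ['Validation failed']).join('; '));
      }
      await this.execute();
      this.state = 'success';
    } catch (err) {
      this.state = 'error';
      throw err; // Assumption: the caller sees the original error.
    }
  }
}

// Hypothetical concrete node used to exercise the flow.
class DoubleNode extends SketchNode {
  input = 0;
  output = 0;

  validate(): ValidationResult {
    return Number.isFinite(this.input)
      ? { isValid: true }
      : { isValid: false, errors: ['input must be a finite number'] };
  }

  execute(): void {
    this.output = this.input * 2;
  }
}
```

With this sketch, a node whose input is 21 ends in 'success' with an output of 42, while a node whose input is NaN ends in 'error' without execute() being called.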
reset()
Reset node state to idle and clear outputs.
Example:
node.reset();
console.log(node.state); // 'idle'
toJSON()
Serialize node to JSON format.
Returns: JSON representation with id, type, position, and data (inputs, outputs, properties).
Example:
const json = node.toJSON();
console.log(json);
// {
//   id: '123',
//   type: 'math.add',
//   position: { x: 100, y: 100 },
//   data: {
//     inputs: { a: 5, b: 3 },
//     outputs: { result: 8 },
//     properties: {}
//   }
// }
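Because the method is named toJSON(), the standard JSON.stringify() function calls it automatically when it serializes an object. The sketch below illustrates that behaviour with a hypothetical stand-in class, SketchAddNode, which is not a CrystalFlow class; the field values simply mirror the example output above.

```typescript
// Stand-in for the documented NodeJSON shape.
interface NodeJSON {
  id: string;
  type: string;
  position: { x: number; y: number };
  data: {
    inputs: Record<string, unknown>;
    outputs: Record<string, unknown>;
    properties: Record<string, unknown>;
  };
}

// Hypothetical node-like class; only toJSON() matters for this sketch.
class SketchAddNode {
  id = '123';
  type = 'math.add';
  position = { x: 100, y: 100 };
  a = 5;
  b = 3;
  result = 8;

  toJSON(): NodeJSON {
    return {
      id: this.id,
      type: this.type,
      position: { ...this.position },
      data: {
        inputs: { a: this.a, b: this.b },
        outputs: { result: this.result },
        properties: {},
      },
    };
  }
}

// JSON.stringify() uses the object's toJSON() result instead of its raw
// fields, so the saved text follows the NodeJSON layout.
function saveSketchNode(node: SketchAddNode): string {
  return JSON.stringify(node);
}
```

A consequence is that a node can be passed straight to JSON.stringify(), alone or nested inside a larger workflow object, without calling toJSON() by hand.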
fromJSON()
Deserialize node from JSON format.
static fromJSON(json: NodeJSON): Node;
json: JSON representation of the node.
Returns: New node instance with restored state.
Example:
const json = {
  id: '123',
  type: 'math.add',
  position: { x: 100, y: 100 },
  data: { inputs: { a: 5, b: 3 }, outputs: {}, properties: {} },
};
const node = Node.fromJSON(json);
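Since fromJSON() is a static method on the abstract base class, it has to map the type string to a concrete class somehow. The source does not describe that mechanism. The sketch below shows one plausible approach, a lookup table keyed by type; sketchRegistry, SketchAdd, and sketchFromJSON are all hypothetical names, and the JSON shape is trimmed to the fields the sketch uses.

```typescript
// Trimmed-down JSON shape used only by this sketch.
interface SketchJSON {
  id: string;
  type: string;
  data: { inputs: Record<string, unknown> };
}

interface Restorable {
  id: string;
  inputs: Record<string, unknown>;
}

type RestorableCtor = new () => Restorable;

// Hypothetical registry from type identifier to constructor. In a real
// system a decorator such as @defineNode could populate a table like this.
const sketchRegistry = new Map<string, RestorableCtor>();

class SketchAdd implements Restorable {
  id = '';
  inputs: Record<string, unknown> = { a: 0, b: 0 };
}
sketchRegistry.set('math.add', SketchAdd);

// Look up the constructor, create an instance, then restore saved values
// on top of the class defaults.
function sketchFromJSON(json: SketchJSON): Restorable {
  const Ctor = sketchRegistry.get(json.type);
  if (!Ctor) {
    throw new Error(`Unknown node type: ${json.type}`);
  }
  const restored = new Ctor();
  restored.id = json.id;
  restored.inputs = { ...restored.inputs, ...json.data.inputs };
  return restored;
}
```

Failing loudly on an unknown type is a deliberate choice in this sketch: a workflow saved with a node type that is no longer registered is reported at load time rather than at execution time.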
checkCancellation()
Check if execution has been cancelled. Throws CancellationError if cancelled.
checkCancellation(): void;
Example:
async execute() {
  for (let i = 0; i < 1000; i++) {
    this.checkCancellation(); // Throws if cancelled
    await processItem(i);
  }
}
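Cancellation of this kind is cooperative: the loop stops only at the points where it checks. The sketch below illustrates the pattern with a plain flag. SketchWorker, its cancel() method, and SketchCancellationError are hypothetical stand-ins; how CrystalFlow actually signals cancellation to a node is not described in the source.

```typescript
// Hypothetical error type standing in for CancellationError.
class SketchCancellationError extends Error {
  constructor() {
    super('Execution was cancelled');
    this.name = 'CancellationError';
  }
}

// Hypothetical worker showing cooperative cancellation with a flag.
class SketchWorker {
  private cancelled = false;
  processed = 0;

  // Assumption: something outside the loop requests cancellation.
  cancel(): void {
    this.cancelled = true;
  }

  checkCancellation(): void {
    if (this.cancelled) {
      throw new SketchCancellationError();
    }
  }

  async execute(items: number[]): Promise<void> {
    for (const item of items) {
      this.checkCancellation(); // Stop before starting the next item
      await Promise.resolve(item); // Stand-in for real asynchronous work
      this.processed += 1;
    }
  }
}
```

An item that has already started runs to completion in this sketch; the check only prevents the next one from starting. Placing the check at the top of each iteration keeps partially processed items from being abandoned midway.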
Types
NodeMetadata
interface NodeMetadata {
  type: string;
  label: string;
  category: string;
  description?: string;
  inputs: InputMetadata[];
  outputs: OutputMetadata[];
  properties: PropertyMetadata[];
}
NodeState
type NodeState = 'idle' | 'executing' | 'success' | 'error';
ValidationResult
interface ValidationResult {
  isValid: boolean;
  errors?: string[];
}
NodeJSON
interface NodeJSON {
  id: string;
  type: string;
  position: { x: number; y: number };
  data: {
    inputs: Record<string, any>;
    outputs: Record<string, any>;
    properties: Record<string, any>;
  };
}
Interface: IConditionalNode
Nodes implement this interface to enable conditional branching in workflows.
interface IConditionalNode {
  /**
   * Evaluate the condition and return the name of the active branch.
   * Must return a valid output port name.
   * Called after execute() to determine which branch executes.
   */
  evaluateCondition(): string;
}
evaluateCondition()
Determines which branch should execute based on the node’s state.
evaluateCondition(): string;
Returns: The name of the output port representing the active branch.
Requirements:
- Must return a string matching an existing output port name
- Called after execute() completes
- Must be deterministic based on node state
- Should not modify node state or perform side effects
Example Implementation:
import { Node, defineNode, Input, Output, IConditionalNode } from '@crystalflow/core';

@defineNode({
  type: 'flow.if',
  label: 'If',
  category: 'Flow Control',
})
export class IfNode extends Node implements IConditionalNode {
  @Input({ type: 'boolean', label: 'Condition', required: true })
  condition!: boolean;

  // Value forwarded to whichever branch is active
  @Input({ type: 'any', label: 'Value' })
  value?: unknown;

  @Output({ type: 'any', label: 'Then' })
  thenOutput?: unknown;

  @Output({ type: 'any', label: 'Else' })
  elseOutput?: unknown;

  execute(): void {
    if (this.condition) {
      this.thenOutput = this.value;
    } else {
      this.elseOutput = this.value;
    }
  }

  evaluateCondition(): string {
    // Return output port name based on condition
    return this.condition ? 'thenOutput' : 'elseOutput';
  }
}
Custom Conditional Node:
@defineNode({
  type: 'logic.greater-than',
  label: 'Greater Than',
  category: 'Logic',
})
export class GreaterThanNode extends Node implements IConditionalNode {
  @Input({ type: 'number', label: 'A', required: true })
  a!: number;

  @Input({ type: 'number', label: 'B', required: true })
  b!: number;

  @Output({ type: 'any', label: 'Greater' })
  greater?: unknown;

  @Output({ type: 'any', label: 'Less or Equal' })
  lessOrEqual?: unknown;

  execute(): void {
    if (this.a > this.b) {
      this.greater = { a: this.a, b: this.b, result: true };
    } else {
      this.lessOrEqual = { a: this.a, b: this.b, result: false };
    }
  }

  evaluateCondition(): string {
    return this.a > this.b ? 'greater' : 'lessOrEqual';
  }
}
The execution engine uses evaluateCondition() to build and execute conditional
branches. Only nodes connected to the returned output port will execute.
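How an engine might act on the returned port name can be sketched as a filter over outgoing connections. Everything here is hypothetical: SketchConditional, SketchEdge, and selectActiveTargets are illustrative names, and CrystalFlow's real graph and edge structures are not shown in the source. The sketch also enforces the requirement that the returned name match an existing output port.

```typescript
// Hypothetical view of a conditional node: its port names plus the
// documented evaluateCondition() method.
interface SketchConditional {
  outputPorts: string[];
  evaluateCondition(): string;
}

// Hypothetical connection from an output port to a downstream node.
interface SketchEdge {
  fromPort: string;
  toNodeId: string;
}

// Return the ids of the downstream nodes attached to the active port.
function selectActiveTargets(
  node: SketchConditional,
  edges: SketchEdge[]
): string[] {
  const port = node.evaluateCondition();
  if (node.outputPorts.indexOf(port) === -1) {
    throw new Error(`evaluateCondition() returned unknown port "${port}"`);
  }
  return edges
    .filter((edge) => edge.fromPort === port)
    .map((edge) => edge.toNodeId);
}

// Hypothetical stand-in for a greater-than node with fixed inputs.
function sketchGreaterThan(a: number, b: number): SketchConditional {
  return {
    outputPorts: ['greater', 'lessOrEqual'],
    evaluateCondition: () => (a > b ? 'greater' : 'lessOrEqual'),
  };
}
```

Because evaluateCondition() is required to be deterministic and free of side effects, a function like this can be called more than once for the same node and is expected to return the same targets each time.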
Examples
Basic Node
@defineNode({
  type: 'text.uppercase',
  label: 'Uppercase',
  category: 'Text'
})
class UppercaseNode extends Node {
  @Input({ type: 'string', label: 'Text' })
  text: string = '';

  @Output({ type: 'string', label: 'Result' })
  result: string;

  execute() {
    this.result = this.text.toUpperCase();
  }
}
Async Node with Validation
@defineNode({
  type: 'http.get',
  label: 'HTTP GET',
  category: 'HTTP'
})
class HttpGetNode extends Node {
  @Property({ type: 'string', label: 'URL', required: true })
  url: string = '';

  @Output({ type: 'any', label: 'Data' })
  data: any;

  validate() {
    const result = super.validate();
    if (!this.url) {
      result.isValid = false;
      // Append so that errors reported by the base class are kept
      result.errors = [...(result.errors ?? []), 'URL is required'];
    }
    return result;
  }

  async execute() {
    const response = await fetch(this.url);
    this.data = await response.json();
  }
}
Long-Running Node with Cancellation
@defineNode({
  type: 'data.batch-process',
  label: 'Batch Process',
  category: 'Data'
})
class BatchProcessNode extends Node {
  @Input({ type: 'any[]', label: 'Items' })
  items: any[] = [];

  @Output({ type: 'any[]', label: 'Results' })
  results: any[];

  async execute() {
    const processed = [];
    for (let i = 0; i < this.items.length; i++) {
      // Check if cancelled between items
      this.checkCancellation();
      const result = await this.processItem(this.items[i]);
      processed.push(result);
    }
    this.results = processed;
  }

  private async processItem(item: any) {
    // Simulate long operation
    await new Promise(resolve => setTimeout(resolve, 1000));
    return item * 2;
  }
}