Orchestration
Build composable multi-step LLM workflows with chains and agents.
Overview
The orchestration module provides primitives for creating complex LLM workflows:
- Chains: Compose steps into pipelines with type safety
- Steps: Reusable building blocks for common operations
- Agents: ReAct-style loops that autonomously use tools
- Hooks: Observe and monitor workflow execution
Quick Start
import {
  Chain,
  llmStep,
  mapStep,
  userMessageStep,
  createContext
} from 'meloqui/orchestration';
import { ChatClient } from 'meloqui';
import type { Message } from 'meloqui';
const client = new ChatClient({
  provider: 'openai',
  model: 'gpt-4o'
});
// Create a chain that adds a user message, calls the LLM, then extracts data
const chain = Chain.create(
  'add-user-message',
  userMessageStep((ctx) => `Summarize this text: ${ctx.data.text}`).execute
)
  .pipe('summarize', llmStep(client).execute)
  .pipe('extract', mapStep('extract-data', (data: { messages: Message[] }) => {
    // After userMessageStep + llmStep, the messages array contains the conversation
    const assistantMessage = data.messages.find(m => m.role === 'assistant');
    return {
      summary: assistantMessage?.content || '',
      wordCount: (assistantMessage?.content || '').split(' ').length
    };
  }).execute);
// Execute
const result = await chain.run(createContext({
  text: 'Your input text here'
}));

if (result.ok) {
  console.log('Summary:', result.value.data.summary);
  console.log('Words:', result.value.data.wordCount);
}
Core Concepts
Chains
A chain is a sequence of steps that execute in order. Each step transforms the context and passes it to the next step.
const chain = Chain.create('step1', step1)
  .pipe('step2', step2)
  .pipe('step3', step3);
Chains are immutable: each method returns a new chain.
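The immutability guarantee can be illustrated with a toy chain (a hypothetical `MiniChain`, not the library's source): each `pipe` call copies the step list into a fresh object, so a shared base chain is never mutated.

```typescript
// Toy sketch of immutable chaining (illustrative only, not meloqui's implementation).
type Fn<T> = (data: T) => T;

class MiniChain<T> {
  private constructor(private readonly steps: ReadonlyArray<Fn<T>>) {}

  static create<T>(step: Fn<T>): MiniChain<T> {
    return new MiniChain([step]);
  }

  // pipe() returns a new chain; `this` is left untouched.
  pipe(step: Fn<T>): MiniChain<T> {
    return new MiniChain([...this.steps, step]);
  }

  run(data: T): T {
    return this.steps.reduce((acc, step) => step(acc), data);
  }
}

// The base chain can be extended in two directions without interference.
const base = MiniChain.create<number>((n) => n + 1);
const doubled = base.pipe((n) => n * 2);
const squared = base.pipe((n) => n * n);

console.log(doubled.run(3)); // 8
console.log(squared.run(3)); // 16
```

The real `Chain` threads a typed context rather than bare data, but presumably follows the same copy-on-extend idea.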
Context
Context flows through the chain immutably:
interface ChainContext<T> {
  readonly data: T;             // Your data
  readonly messages: Message[]; // LLM conversation
  readonly metadata: {          // Execution metadata
    readonly chainId: string;
    readonly stepIndex: number;
    readonly startTime: number;
  };
}
Create initial context:
const ctx = createContext({ myData: 'value' });
Steps
Steps are functions that transform context:
type Step<TIn, TOut> = (
  ctx: ChainContext<TIn>
) => Promise<ChainResult<ChainContext<TOut>>>;
Built-in steps:
import {
  userMessageStep,
  systemMessageStep,
  mapStep,
  llmStep,
  withRetry,
  validateStep
} from 'meloqui/orchestration';
Built-in Steps
userMessageStep
Add a user message to the context:
Chain.create('add-message', userMessageStep('Hello, AI!').execute)
Dynamic content:
userMessageStep((ctx) => `Summarize: ${ctx.data.text}`).execute
systemMessageStep
Add a system message:
systemMessageStep('You are a helpful assistant.').execute
mapStep
Transform data:
mapStep('extract-names', (data) => ({
  ...data,
  names: extractNames(data.text)
})).execute
llmStep
Call the LLM:
llmStep(client).execute
llmStep(client, { temperature: 0.3 }).execute
llmStep(client, { systemPrompt: 'You are an expert.' }).execute
withRetry
Add retry logic to any step:
withRetry(llmStep(client).execute, {
  maxAttempts: 3,
  initialBackoffMs: 1000,
  backoffMultiplier: 2
})
validateStep
Validate data against JSON Schema:
validateStep('validate-output', {
  type: 'object',
  properties: {
    name: { type: 'string' },
    age: { type: 'number' }
  },
  required: ['name', 'age']
}).execute
Conditional Execution
when()
Execute step only if condition is true:
Chain.create('init', initStep)
  .when(
    (ctx) => ctx.data.value > 100,
    'high-value-handler',
    handleHighValue
  )
  .when(
    (ctx) => ctx.data.value <= 100,
    'low-value-handler',
    handleLowValue
  )
branch()
If/else branching:
chain.branch(
  (ctx) => ctx.data.type === 'question',
  { name: 'answer', step: answerStep },
  { name: 'acknowledge', step: acknowledgeStep }
)
Observability
Add hooks to monitor execution:
const hooks = {
  onStepStart: (name, ctx) => {
    console.log(`Starting: ${name}`);
  },
  onStepEnd: (name, ctx, durationMs) => {
    console.log(`Completed: ${name} (${durationMs}ms)`);
  },
  onStepError: (name, error) => {
    console.error(`Failed: ${name}`, error);
  },
  onChainEnd: (result, totalMs) => {
    console.log(`Chain complete (${totalMs}ms)`);
  }
};

const result = await chain.withHooks(hooks).run(ctx);
Agents
Agents are ReAct-style loops that autonomously use tools to complete tasks.
Creating an Agent
import { runAgent } from 'meloqui/orchestration';
import { ChatClient, ToolRegistry } from 'meloqui';
// Set up tools
const tools = new ToolRegistry();
tools.registerTool('search', searchFn, {
  description: 'Search the web',
  parameters: { /* schema */ }
});
// Create client with tools
const client = new ChatClient({
  provider: 'openai',
  model: 'gpt-4o',
  tools
});
// Run agent
const result = await runAgent(
  {
    client,
    maxIterations: 10,
    systemPrompt: 'You are a research assistant.'
  },
  'What is the population of Tokyo?'
);

if (result.ok) {
  console.log('Answer:', result.value.response);
  console.log('Iterations:', result.value.iteration);
}
Agent Configuration
interface AgentConfig {
  client: ChatClient;                       // Client with tools
  maxIterations?: number;                   // Max iterations (default: 10)
  shouldStop?: (ctx, iteration) => boolean; // Custom stop
  systemPrompt?: string;                    // System instructions
}
Agent State
interface AgentState {
  iteration: number;      // Current iteration
  done: boolean;          // Completed?
  response?: string;      // Final response
  toolCalls?: ToolCall[]; // Tool calls made
}
How Agents Work
1. Send messages to LLM with tools
2. If LLM returns tool calls, execute them
3. Add tool results to messages
4. Repeat until:
   - Max iterations reached
   - Custom stop condition returns true
   - LLM provides final response (no tool calls)
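The loop above can be sketched in a few lines (illustrative; `runAgent`'s internals may differ, and the model and tool executor here are plain stubbed functions):

```typescript
// Sketch of a ReAct-style agent loop (not meloqui's actual runAgent source).
interface ToolCall { name: string; args: Record<string, unknown>; }
interface LlmTurn { content?: string; toolCalls?: ToolCall[]; }

async function miniAgentLoop(
  send: (messages: string[]) => Promise<LlmTurn>,
  executeTool: (call: ToolCall) => Promise<string>,
  input: string,
  maxIterations = 10
): Promise<{ response?: string; iteration: number }> {
  const messages: string[] = [input];
  for (let iteration = 1; iteration <= maxIterations; iteration++) {
    const turn = await send(messages);              // 1. Send messages to the LLM
    if (!turn.toolCalls || turn.toolCalls.length === 0) {
      return { response: turn.content, iteration }; // Final response: stop
    }
    for (const call of turn.toolCalls) {            // 2. Execute requested tools
      messages.push(await executeTool(call));       // 3. Feed results back in
    }
  }                                                 // 4. Repeat until a stop condition
  return { iteration: maxIterations };              // Budget exhausted, no final answer
}

// Demo with a stubbed model: one tool round, then a final answer.
(async () => {
  const result = await miniAgentLoop(
    async (msgs) =>
      msgs.some((m) => m.startsWith('tool:'))
        ? { content: 'About 14 million' }
        : { toolCalls: [{ name: 'search', args: { q: 'Tokyo population' } }] },
    async (call) => `tool:${call.name} result`,
    'What is the population of Tokyo?'
  );
  console.log(result); // { response: 'About 14 million', iteration: 2 }
})();
```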
Error Handling
Chains use Result types for explicit error handling:
const result = await chain.run(ctx);

if (result.ok) {
  // Success
  console.log(result.value.data);
} else {
  // Error
  console.error(result.error.message);
  console.error(result.error.stepName);
  console.error(result.error.stepIndex);
}
Errors include full context:
class ChainError extends ChatError {
  readonly stepName: string;
  readonly stepIndex: number;
  readonly context: ChainContext;
}
Advanced Patterns
Composing Chains
Chains can be converted to steps and composed:
const preprocessChain = Chain.create('clean', cleanStep)
  .pipe('validate', validateStep('validate', schema).execute);

const analysisChain = Chain.create('analyze', analyzeStep)
  .pipe('extract', extractStep);

// Compose into pipeline
const pipeline = Chain.create('preprocess', preprocessChain.asStep().execute)
  .pipe('analyze', analysisChain.asStep().execute)
  .pipe('format', formatStep);
Custom Steps
Create your own steps:
const customStep: Step<InputType, OutputType> = async (ctx) => {
  try {
    const result = await myOperation(ctx.data);
    return {
      ok: true as const,
      value: {
        ...ctx,
        data: result
      }
    };
  } catch (error) {
    return {
      ok: false as const,
      error: new ChainError(
        'Custom step failed',
        'custom-step',
        ctx.metadata.stepIndex,
        ctx,
        error as Error
      )
    };
  }
};
Workflows
The workflow module provides graph-based orchestration for complex multi-agent systems.
Overview
Workflows support two execution modes:
- Graph-driven: Static workflow graphs with explicit node connections
- LLM-driven: Dynamic routing where an LLM supervisor decides which specialist to call
Both modes share the same specialist definitions for consistency.
Defining Specialists
Specialists are focused agents with constrained capabilities:
import { defineSpecialist } from 'meloqui/orchestration/workflow';
const planner = defineSpecialist({
  name: 'planner',
  systemPrompt: `You create detailed implementation plans.
Current task: {{state.input}}`,
  tools: ['read_file', 'search_code'],
  maxIterations: 5
});

const coder = defineSpecialist({
  name: 'coder',
  systemPrompt: `You write clean, tested code.
Plan: {{state.planner}}`,
  tools: ['read_file', 'write_file', 'run_tests'],
  maxIterations: 10
});
System prompts support template interpolation with {{state.nodeId}} syntax.
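As a rough sketch of what that interpolation involves (the library's actual template engine is not shown here and may differ), a single regex pass can substitute `{{state.<nodeId>}}` placeholders:

```typescript
// Hypothetical interpolation helper, not meloqui's implementation.
function interpolate(template: string, state: Record<string, string>): string {
  // Replace each {{state.<key>}} with the matching state entry (empty if absent).
  return template.replace(/\{\{state\.(\w+)\}\}/g, (_, key: string) => state[key] ?? '');
}

const prompt = interpolate('Plan: {{state.planner}}', {
  planner: 'Step 1: scaffold the app'
});
console.log(prompt); // Plan: Step 1: scaffold the app
```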
Graph-Driven Workflows
Define static workflow graphs:
import { defineWorkflow, WorkflowRunner } from 'meloqui/orchestration/workflow';
import { ChatClient } from 'meloqui';
// Define the workflow
const workflow = defineWorkflow({
  specialists: { planner, coder },
  nodes: [
    { id: 'input', type: 'input' },
    { id: 'plan', type: 'specialist', specialist: 'planner' },
    { id: 'review', type: 'checkpoint', config: {
      message: 'Review the plan before coding'
    }},
    { id: 'code', type: 'specialist', specialist: 'coder' },
    { id: 'output', type: 'output' }
  ],
  edges: [
    { from: 'input', to: 'plan' },
    { from: 'plan', to: 'review' },
    { from: 'review', to: 'code' },
    { from: 'code', to: 'output' }
  ]
});

// Run the workflow
const client = new ChatClient({ provider: 'openai', model: 'gpt-4o' });
const runner = new WorkflowRunner({
  client,
  checkpointHandler: async (state, message) => {
    console.log('Checkpoint:', message);
    return { action: 'approve' };
  }
});

const result = await runner.run(workflow, {
  input: 'Build a todo app',
  onProgress: (event) => console.log(event.type, event.nodeId)
});
LLM-Driven Routing (Supervisor)
For dynamic workflows where an LLM decides routing:
import { createSupervisor, defineSpecialist } from 'meloqui/orchestration/workflow';
import { ChatClient } from 'meloqui';
const supervisor = createSupervisor({
  client: new ChatClient({ provider: 'openai', model: 'gpt-4o' }),
  specialists: {
    planner: defineSpecialist({ name: 'planner', systemPrompt: 'You create plans' }),
    coder: defineSpecialist({ name: 'coder', systemPrompt: 'You write code' }),
    reviewer: defineSpecialist({ name: 'reviewer', systemPrompt: 'You review code' })
  },
  maxIterations: 10
});

const result = await supervisor.run('Build a todo app', {
  onProgress: (event) => console.log(event.type, event.nodeId)
});

if (result.success) {
  console.log('Output:', result.output);
}
The supervisor uses a router pattern:
1. Router decides which specialist to call next
2. Selected specialist executes
3. Control returns to router
4. Router can return "done" to complete
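The router loop can be sketched as follows (illustrative; `createSupervisor`'s internals may differ, and the router and specialists here are stubbed synchronous functions):

```typescript
// Sketch of an LLM-supervisor router loop (not meloqui's actual source).
type SpecialistFn = (task: string, state: Record<string, string>) => string;

function miniSupervisor(
  route: (state: Record<string, string>) => string, // returns a specialist name or 'done'
  specialists: Record<string, SpecialistFn>,
  task: string,
  maxIterations = 10
): Record<string, string> {
  const state: Record<string, string> = {};
  for (let i = 0; i < maxIterations; i++) {
    const next = route(state);                    // 1. Router picks the next specialist
    if (next === 'done') break;                   // 4. Router signals completion
    state[next] = specialists[next](task, state); // 2. Selected specialist executes
    // 3. Control returns to the router on the next loop iteration
  }
  return state;
}

// Demo: plan first, then code, then stop.
const state = miniSupervisor(
  (s) => (!s.planner ? 'planner' : !s.coder ? 'coder' : 'done'),
  {
    planner: (task) => `plan for: ${task}`,
    coder: (task, s) => `code following "${s.planner}"`
  },
  'Build a todo app'
);
console.log(state.coder); // code following "plan for: Build a todo app"
```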
Node Types
| Type | Purpose |
|---|---|
| input | Entry point for workflow |
| output | Exit point for workflow |
| specialist | Calls a specialist agent |
| condition | Branch based on state |
| checkpoint | Human-in-the-loop approval |
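The node shapes used in the examples above suggest a discriminated union on `type`. The following is a hypothetical model (the library's actual node type may differ), which lets TypeScript narrow each case exhaustively:

```typescript
// Hypothetical discriminated union for the node types above
// (the library's actual WorkflowNode type may differ).
type WorkflowNode =
  | { id: string; type: 'input' }
  | { id: string; type: 'output' }
  | { id: string; type: 'specialist'; specialist: string }
  | { id: string; type: 'condition'; config: { expression: string } }
  | { id: string; type: 'checkpoint'; config: { message: string } };

// The `type` discriminant narrows the node in each branch.
function describe(node: WorkflowNode): string {
  switch (node.type) {
    case 'input': return 'entry point';
    case 'output': return 'exit point';
    case 'specialist': return `runs specialist "${node.specialist}"`;
    case 'condition': return `branches on ${node.config.expression}`;
    case 'checkpoint': return `pauses for approval: ${node.config.message}`;
  }
}

console.log(describe({ id: 'plan', type: 'specialist', specialist: 'planner' }));
// runs specialist "planner"
```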
Condition Nodes
Route based on state:
{
  id: 'check-quality',
  type: 'condition',
  config: {
    expression: 'state.coder?.includes("error") === false'
  }
}
With conditional edges:
{ from: 'check-quality', to: 'output', when: 'pass' },
{ from: 'check-quality', to: 'coder', when: 'fail' }
Fluent Builder API
Build workflows programmatically:
import { Workflow } from 'meloqui/orchestration/workflow';
const workflow = Workflow.create()
  .addSpecialist('planner', { systemPrompt: 'You plan' })
  .addSpecialist('coder', { systemPrompt: 'You code' })
  .addNode({ id: 'input', type: 'input' })
  .addNode({ id: 'plan', type: 'specialist', specialist: 'planner' })
  .addNode({ id: 'code', type: 'specialist', specialist: 'coder' })
  .addNode({ id: 'output', type: 'output' })
  .addEdge({ from: 'input', to: 'plan' })
  .addEdge({ from: 'plan', to: 'code' })
  .addEdge({ from: 'code', to: 'output' })
  .build();
Progress Events
Monitor workflow execution:
interface WorkflowProgressEvent {
  type: 'node_start' | 'node_end' | 'checkpoint' | 'error';
  nodeId: string;
  state: WorkflowState;
  timestamp: number;
  duration?: number;
  error?: string;
}
Dev Specialist Presets
Common specialists for development workflows:
import { devSpecialists } from 'meloqui/orchestration/workflow';
const supervisor = createSupervisor({
  client,
  specialists: devSpecialists
});
Available presets: planner, coder, reviewer, researcher.
Examples
See the examples directory for complete working examples:
- Multi-step summarization
- Classification and routing
- Agent with tools
- Error handling and recovery
- Workflow orchestration
API Reference
See the API documentation for complete type definitions.
