Production-Grade AI Agent Orchestration: Scaling Multi-Agent Systems with Event-Driven Architecture
By May 2026, enterprises running AI agent systems face a critical inflection point. Organizations that started with simple 3-agent workflows are now struggling to manage 100+ agents in production. The challenge isn't building agents anymore—it's orchestrating them at scale without drowning in complexity, costs, or cascading failures.
The numbers tell the story. Companies running large-scale agent deployments report that 67% of their engineering time is spent not on building new capabilities, but on managing agent coordination, debugging cross-agent failures, and optimizing resource usage. A typical mid-size enterprise now runs 150-400 active agents across customer service, content generation, data processing, and internal automation—creating an operational challenge that resembles managing a distributed microservices architecture, but with the added complexity of non-deterministic AI behavior.
This comprehensive guide explores production-grade patterns for orchestrating multi-agent systems at scale. From event-driven architectures and message queue patterns to fault tolerance strategies and cost optimization techniques, we'll cover the engineering practices that separate prototype agent systems from enterprise-grade deployments. Whether you're managing 10 agents or 1,000, these patterns will help you build systems that are scalable, resilient, observable, and cost-effective.
The Scale Problem: Why Agent Orchestration Breaks Down
Understanding the Complexity Explosion
The complexity of multi-agent systems doesn't grow linearly—it explodes. A system with n agents can have n(n-1)/2 potential communication paths, meaning a 10-agent system has 45 possible interaction patterns, while a 50-agent system has 1,225.
┌────────────────────────────────────────────────────────────────┐
│ Agent Communication Complexity │
├────────────────────────────────────────────────────────────────┤
│ │
│ Agents: 5 10 20 50 100 200 │
│ Paths: 10 45 190 1,225 4,950 19,900 │
│ │
│ └─ Agents vs. 5-agent baseline (linear): 1x, 2x, 4x, 10x, 20x, 40x │
│ └─ Paths vs. 5-agent baseline (quadratic): 1x, 4.5x, 19x, 122.5x, 495x, 1,990x │
│ │
│ This is why point-to-point communication fails at scale. │
└────────────────────────────────────────────────────────────────┘
Common Failure Patterns at Scale:
1. The Thundering Herd Problem
// Anti-pattern: Unbounded concurrent agent spawning
// When 1,000 tasks arrive simultaneously, agents get overwhelmed
async function processTasks(tasks) {
// ❌ BAD: Creates agents for every task immediately
const results = await Promise.all(
tasks.map(task => spawnAgentAndProcess(task))
);
return results;
}
// Result: 1,000 simultaneous API calls to OpenAI
// Cost spike: $500 in 30 seconds
// Rate limit exceeded, most requests fail
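A common mitigation is to bound concurrency so that only a fixed number of tasks are in flight at once, instead of firing all of them together. The sketch below is plain JavaScript with no dependencies; `mapWithConcurrency` is a hypothetical helper, and a limit of 10 is an arbitrary illustration rather than a recommendation for any provider's rate limits.

```javascript
// Run `worker` over `items` with at most `limit` calls in flight at once.
// Results keep the input order, like Promise.all.
async function mapWithConcurrency(items, limit, worker) {
  const results = new Array(items.length);
  let next = 0; // index of the next item to start

  // Each runner keeps claiming the next unstarted index until none remain.
  async function runner() {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  const runners = [];
  for (let i = 0; i < Math.min(limit, items.length); i++) {
    runners.push(runner());
  }
  await Promise.all(runners);
  return results;
}

// Usage sketch: at most 10 agent calls run at the same time instead of 1,000.
// `spawnAgentAndProcess` is the placeholder from the anti-pattern above.
// const results = await mapWithConcurrency(tasks, 10, task => spawnAgentAndProcess(task));
```

Because JavaScript runs one callback at a time, two runners can never claim the same `next` index, so no extra locking is needed. If any task throws, the returned promise rejects as with `Promise.all`, while the other runners keep working through the remaining items.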
2. Cascading Failures
// Anti-pattern: No circuit breakers or timeouts
// When one agent fails, dependent agents keep trying
async function researchAndWrite(topic) {
const research = await researchAgent.query(topic); // Fails after 30s
// ❌ BAD: No timeout, no fallback
const draft = await writerAgent.create(research); // Times out too
// ❌ BAD: Error propagates
const edited = await editorAgent.review(draft); // Also fails
return edited;
}
// Result: Three failed operations, wasted tokens, angry users
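Two guards address this: a timeout on every agent call, and a fallback that returns a degraded result instead of propagating the failure to every downstream agent. A minimal sketch, assuming agent calls return promises; `withTimeout` and `withFallback` are hypothetical helpers. Note that `Promise.race` only stops *waiting*: the underlying request keeps running unless it supports cancellation (for example through an `AbortController` signal).

```javascript
// Reject if `promise` does not settle within `ms` milliseconds.
function withTimeout(promise, ms, label = 'operation') {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms);
  });
  // Clear the timer whichever promise settles first, so it doesn't keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Run `primary` under a timeout; on error or timeout, return `fallback(error)` instead
// of throwing, and flag the result as degraded so callers can decide what to skip.
async function withFallback(primary, ms, fallback, label) {
  try {
    return { value: await withTimeout(primary(), ms, label), degraded: false };
  } catch (error) {
    return { value: await fallback(error), degraded: true, reason: error.message };
  }
}

// Hypothetical usage with the agents from the anti-pattern above:
// const research = await withFallback(() => researchAgent.query(topic), 10000,
//   () => ({ summary: null, note: 'research unavailable' }), 'research');
// if (research.degraded) { /* skip drafting, or draft from a template */ }
```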
3. State Explosion
// Anti-pattern: Storing full state for every agent interaction
// Each conversation includes all previous context
const conversation = {
messages: [
{ role: 'system', content: 'You are a helpful assistant...' }, // 500 tokens
{ role: 'user', content: 'Previous question...' }, // 1,200 tokens
{ role: 'assistant', content: 'Previous answer...' }, // 2,800 tokens
{ role: 'user', content: 'Another question...' }, // 1,500 tokens
{ role: 'assistant', content: 'Another answer...' }, // 2,200 tokens
// ... continues for 50+ turns
]
};
// Result (at ~$0.03 per 1K input tokens): the 10th turn sends ~45,000 tokens, ~$1.35
// The 50th turn sends ~225,000 tokens, ~$6.75, more than many models' context windows allow
// User stops using the feature
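One common mitigation is to send the system prompt plus only the most recent turns that fit a fixed token budget. The sketch below assumes a crude heuristic of about 4 characters per token (`estimateTokens` and `trimToBudget` are hypothetical helpers); a real deployment would count with the model's own tokenizer, and might summarize the dropped turns rather than discard them.

```javascript
// Rough token estimate: ~4 characters per token for English text (heuristic only).
function estimateTokens(text) {
  return Math.ceil(text.length / 4);
}

// Keep every system message, then add the newest other messages until the budget
// is used up. Assumes system messages belong at the start of the conversation.
function trimToBudget(messages, budget, countTokens = m => estimateTokens(m.content)) {
  const system = messages.filter(m => m.role === 'system');
  const rest = messages.filter(m => m.role !== 'system');

  let used = system.reduce((sum, m) => sum + countTokens(m), 0);
  const kept = [];
  // Walk backwards from the newest message, stopping at the first one that doesn't fit.
  for (let i = rest.length - 1; i >= 0; i--) {
    const cost = countTokens(rest[i]);
    if (used + cost > budget) break;
    kept.unshift(rest[i]);
    used += cost;
  }
  return { messages: [...system, ...kept], tokens: used, dropped: rest.length - kept.length };
}
```

With a fixed budget, each request stays roughly constant in size, so per-request cost stops growing with conversation length.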
4. Hidden Retry Storms
// Anti-pattern: Naive retry without backoff or coordination
// Multiple systems retry simultaneously
async function callWithRetry(fn, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
return await fn();
} catch (e) {
// ❌ BAD: No exponential backoff
// ❌ BAD: All retries happen at same time across agents
await sleep(1000); // Fixed 1s delay
}
}
throw new Error('Failed after retries');
}
// Result: When the API is degraded, load multiplies 3x and arrives in synchronized 1s waves
// 100 agents × 3 attempts each = 300 requests hitting a struggling API
The Scale Thresholds
Based on production data from 200+ organizations, here are the practical scale thresholds where architecture changes become necessary:
| Scale | Agents | Key Challenges | Required Architecture |
|---|---|---|---|
| Prototype | 1-5 | Coordination | Direct API calls |
| Small | 6-20 | Task routing | Simple queue + routing logic |
| Medium | 21-50 | State management | Message bus + persistence layer |
| Large | 51-200 | Fault tolerance | Event-driven + circuit breakers |
| Enterprise | 201+ | Cost optimization | Full event sourcing + cost controls |
The jump from "Medium" to "Large" is particularly significant: this is where synchronous patterns typically break down and event-driven architecture becomes effectively mandatory.
Event-Driven Architecture for Agent Systems
Core Principles
Event-driven architecture (EDA) decouples agents through asynchronous message passing. Instead of agents directly calling each other, they publish events to a message bus and subscribe to relevant event types.
┌─────────────────────────────────────────────────────────────────┐
│ Event-Driven Agent Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Agent A │ │ Agent B │ │ Agent C │ │ Agent D │ │
│ │ (Research)│ │(Writing) │ │(Editing) │ │(Review) │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ │ PUBLISH │ PUBLISH │ PUBLISH │ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Message Bus (Redis/RabbitMQ) │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │research.done│ │ draft.ready │ │ edit.complete│ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │ │ │ │
│ │ SUBSCRIBE │ SUBSCRIBE │ SUBSCRIBE │ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Listens│ │ Listens │ │ Listens │ │ Listens │ │
│ │ for tasks│ │for draft │ │for review │ │for final │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ Key Benefit: Agents don't know about each other directly │
│ Agent B doesn't know WHO created the draft, just THAT one exists│
└─────────────────────────────────────────────────────────────────┘
Benefits of EDA for Agent Systems:
- Decoupling: Agents can be developed, deployed, and scaled independently
- Resilience: If one agent fails, others continue processing
- Observability: Every event can be logged, enabling comprehensive monitoring
- Scalability: Scale an agent type horizontally by adding instances that consume the same topic (for example, members of one consumer group)
- Flexibility: Reorder workflows by changing event routing, not agent code
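The decoupling described above can be demonstrated without any infrastructure. The in-memory bus below is a hypothetical stand-in for Redis or RabbitMQ, suitable for local tests but not production (it has no persistence and no cross-process delivery). Agents share only event type names, never references to each other.

```javascript
// Minimal in-memory event bus: publishers and subscribers only share event types.
class InMemoryEventBus {
  constructor() {
    this.handlers = new Map(); // eventType -> array of handler functions
  }

  subscribe(eventType, handler) {
    if (!this.handlers.has(eventType)) this.handlers.set(eventType, []);
    this.handlers.get(eventType).push(handler);
    // Return an unsubscribe function.
    return () => {
      const list = this.handlers.get(eventType) || [];
      const i = list.indexOf(handler);
      if (i !== -1) list.splice(i, 1);
    };
  }

  async publish(eventType, payload) {
    const handlers = [...(this.handlers.get(eventType) || [])];
    // allSettled: one failing subscriber does not stop the others.
    const outcomes = await Promise.allSettled(handlers.map(h => h(payload)));
    return outcomes.filter(o => o.status === 'rejected').length; // failed handler count
  }
}

// Usage: the writer reacts to research results without knowing who produced them.
const bus = new InMemoryEventBus();
bus.subscribe('research.done', async research => {
  await bus.publish('draft.ready', { draft: `Draft based on: ${research.summary}` });
});
bus.subscribe('draft.ready', async ({ draft }) => console.log('Editor received:', draft));
bus.publish('research.done', { summary: 'market trends' });
```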
Event Types and Schema Design
A well-designed event schema is critical for maintainability. Here's a production-grade schema for agent systems:
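// Core event types for agent orchestration
// Placeholder types so the schema compiles; refine them for your domain
type ISO8601 = string; // e.g. "2026-05-01T12:00:00.000Z"
type TaskRequirements = Record<string, unknown>;
type SharedContext = Record<string, unknown>;
type TaskResult = Record<string, unknown>;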
interface AgentEvent {
// Metadata
eventId: string; // UUID v4
eventType: string; // Fully qualified: agent.task.assigned
timestamp: ISO8601; // UTC
version: string; // Schema version: "2.1.0"
// Context
correlationId: string; // Traces request across agents
causationId: string; // Points to triggering event
sessionId: string; // Groups related events
// Content
payload: unknown; // Event-specific data
// Routing
source: AgentRef; // Who emitted the event
target?: AgentRef; // Intended recipient (optional)
priority: 'low' | 'normal' | 'high' | 'critical';
// Lifecycle
ttl?: number; // Time-to-live in seconds
retryCount: number; // Times this event has been retried (0 on first attempt)
deadline?: ISO8601; // When event becomes stale
}
interface AgentRef {
agentId: string;
agentType: string; // 'research', 'writing', 'review'
version: string;
instanceId: string; // For multi-instance agents
}
// Specific event types
interface TaskAssignedEvent extends AgentEvent {
eventType: 'agent.task.assigned';
payload: {
taskId: string;
taskType: string;
requirements: TaskRequirements;
context: SharedContext;
maxDuration: number; // Seconds
costBudget?: number; // Max tokens/cost
};
}
interface TaskCompletedEvent extends AgentEvent {
eventType: 'agent.task.completed';
payload: {
taskId: string;
result: TaskResult;
metrics: {
duration: number; // Seconds
tokensUsed: number;
cost: number;
retries: number;
};
};
}
interface TaskFailedEvent extends AgentEvent {
eventType: 'agent.task.failed';
payload: {
taskId: string;
error: {
code: string;
message: string;
recoverable: boolean;
};
attempts: number;
nextRetry?: ISO8601;
};
}
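The `correlationId` and `causationId` fields stay consistent most easily when every event is created through one helper. The sketch below is plain JavaScript to match the n8n examples; `rootEvent` and `deriveEvent` are hypothetical helpers, and it uses Node's built-in `crypto.randomUUID()` (Node 14.17+) for UUID v4 ids. Treating a root event as its own cause is a convention assumed here, chosen because the schema makes `causationId` required.

```javascript
const { randomUUID } = require('crypto');

// Start a new request: the first event's id doubles as the correlation id.
function rootEvent(eventType, payload, source) {
  const eventId = randomUUID();
  return {
    eventId,
    eventType,
    timestamp: new Date().toISOString(),
    version: '2.1.0',
    correlationId: eventId, // every later event in this request carries this id
    causationId: eventId,   // convention: a root event is its own cause
    sessionId: randomUUID(),
    payload,
    source,
    priority: 'normal',
    retryCount: 0,
  };
}

// Create an event caused by `parent`: same correlation and session, causation = parent.
function deriveEvent(parent, eventType, payload, source) {
  return {
    ...rootEvent(eventType, payload, source),
    correlationId: parent.correlationId,
    causationId: parent.eventId,
    sessionId: parent.sessionId,
    priority: parent.priority,
  };
}
```

Following `causationId` links backwards reconstructs the exact chain of events, while filtering on `correlationId` returns everything a single request triggered.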
Event Naming Conventions:
# Consistent naming enables routing and filtering
# Format: domain.entity.action, with an optional extra qualifier segment (e.g. agent.instance.scale.up)
# Examples:
events:
# Task lifecycle
- agent.task.assigned
- agent.task.started
- agent.task.progress # Intermediate updates
- agent.task.completed
- agent.task.failed
- agent.task.cancelled
- agent.task.expired # TTL exceeded
# Agent lifecycle
- agent.instance.started
- agent.instance.heartbeat
- agent.instance.stopped
- agent.instance.crashed
- agent.instance.scale.up
- agent.instance.scale.down
# System events
- system.circuit.opened
- system.circuit.closed
- system.rate.limit.exceeded
- system.cost.budget.warning
- system.cost.budget.exceeded
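Hierarchical names make wildcard subscriptions possible. As an illustration, the sketch below borrows RabbitMQ/AMQP topic-exchange semantics, where `*` matches exactly one dot-separated segment and `#` matches zero or more. Redis Streams has no built-in wildcard routing, so logic like this would live in your router or consumer; `matchesTopic` and the `routes` table are hypothetical.

```javascript
// AMQP-style topic matching: '*' = exactly one segment, '#' = zero or more segments.
function matchesTopic(pattern, eventType) {
  const p = pattern.split('.');
  const t = eventType.split('.');

  function match(i, j) {
    if (i === p.length) return j === t.length;
    if (p[i] === '#') {
      // Let '#' absorb 0, 1, 2, ... segments and try to match the rest.
      for (let k = j; k <= t.length; k++) {
        if (match(i + 1, k)) return true;
      }
      return false;
    }
    if (j === t.length) return false;
    return (p[i] === '*' || p[i] === t[j]) && match(i + 1, j + 1);
  }
  return match(0, 0);
}

// Example routing table: which consumers see which events.
const routes = {
  'task-dashboard': 'agent.task.*',
  'cost-alerts': 'system.cost.#',
  'audit-log': '#',
};
const recipients = type =>
  Object.keys(routes).filter(name => matchesTopic(routes[name], type));
```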
Implementing with n8n and Redis
Redis Streams provides an excellent message bus for agent systems—fast, persistent, and with consumer group support for scaling.
n8n Workflow: Event Publisher
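// n8n Code Node: Publish Events to Redis Streams
// Note: require() of external modules in n8n needs NODE_FUNCTION_ALLOW_EXTERNAL=ioredis,uuid
const Redis = require('ioredis');
const { v4: uuidv4 } = require('uuid');
const redis = new Redis({
host: $env.REDIS_HOST,
port: Number($env.REDIS_PORT) || 6379, // Environment variables are strings
password: $env.REDIS_PASSWORD,
maxRetriesPerRequest: 3
});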
async function publishEvent(eventData) {
const event = {
eventId: uuidv4(),
eventType: eventData.type,
timestamp: new Date().toISOString(),
version: '2.1.0',
correlationId: eventData.correlationId || uuidv4(),
causationId: eventData.causationId,
sessionId: eventData.sessionId,
payload: JSON.stringify(eventData.payload),
source: JSON.stringify(eventData.source),
target: eventData.target ? JSON.stringify(eventData.target) : null,
priority: eventData.priority || 'normal',
retryCount: eventData.retryCount || 0,
deadline: eventData.deadline
};
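// Stream fields must be flat strings: drop optional fields that are null/undefined
// (e.g. target, deadline) instead of sending them to XADD
const fields = Object.entries(event)
.filter(([, value]) => value !== undefined && value !== null)
.flat();
// Publish to the domain stream (e.g. events:agent) and to a type-specific stream for
// targeted consumers. MULTI/EXEC applies both writes together; MAXLEN ~ trims
// approximately, which is cheaper than exact trimming. (In Redis Cluster the two keys
// may hash to different slots, so use separate calls or hash tags there.)
const streamKey = `events:${eventData.type.split('.')[0]}`;
await redis.multi()
.xadd(streamKey, 'MAXLEN', '~', 10000, '*', ...fields)
.xadd(`events:type:${eventData.type}`, 'MAXLEN', '~', 5000, '*', ...fields)
.exec();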
return event;
}
// Example: Publish task assigned event
const event = await publishEvent({
type: 'agent.task.assigned',
correlationId: $json.correlationId,
causationId: $json.triggerEventId,
sessionId: $json.sessionId,
payload: {
taskId: `task-${Date.now()}`,
taskType: 'content-research',
requirements: {
topic: $json.topic,
depth: 'comprehensive',
sources: ['industry-reports', 'competitor-analysis'],
outputFormat: 'structured-json'
},
context: {
industry: $json.industry,
targetAudience: $json.audience,
brandVoice: $json.brandVoice
},
maxDuration: 300, // 5 minutes
costBudget: 0.50 // $0.50 max
},
source: {
agentId: 'orchestrator-1',
agentType: 'workflow-orchestrator',
version: '2.0.0',
instanceId: 'orc-1a'
},
priority: 'high'
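});
// Close the connection so each Code node execution doesn't leak a Redis client
await redis.quit();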
return {
json: {
published: true,
eventId: event.eventId,
correlationId: event.correlationId
}
};
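Redis Streams entries are flat lists of string field/value pairs, which is why the publisher stringifies nested objects and the consumer has to rebuild and re-parse them. The two helpers below (hypothetical names `toStreamFields` and `fromStreamFields`) sketch that round trip in one place, under the assumption that every non-string value is JSON-encoded and that missing values are simply omitted.

```javascript
// Encode an event as the flat [field, value, field, value, ...] array XADD expects.
// Strings are stored as-is, everything else is JSON-encoded, null/undefined are skipped.
function toStreamFields(event) {
  const fields = [];
  for (const [key, value] of Object.entries(event)) {
    if (value === undefined || value === null) continue;
    fields.push(key, typeof value === 'string' ? value : JSON.stringify(value));
  }
  return fields;
}

// Decode the flat array returned by XREADGROUP back into an object.
// `jsonFields` lists the fields that hold JSON (objects or numbers) in the original event.
function fromStreamFields(fields, jsonFields = ['payload', 'source', 'target', 'retryCount']) {
  const event = {};
  for (let i = 0; i < fields.length; i += 2) {
    const key = fields[i];
    const raw = fields[i + 1];
    event[key] = jsonFields.includes(key) ? JSON.parse(raw) : raw;
  }
  return event;
}

// Usage sketch with ioredis:
// await redis.xadd(streamKey, 'MAXLEN', '~', 10000, '*', ...toStreamFields(event));
```

Keeping encoding and decoding side by side means a schema change (a new nested field, say) is updated in one place rather than in every producer and consumer.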
n8n Workflow: Event Consumer with Consumer Groups
// n8n Code Node: Consume Events with Redis Consumer Groups
const Redis = require('ioredis');
const redis = new Redis({
host: $env.REDIS_HOST,
port: $env.REDIS_PORT,
password: $env.REDIS_PASSWORD
});
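const CONSUMER_GROUP = 'writing-agents';
const CONSUMER_NAME = `writer-${$env.INSTANCE_ID || '1'}`;
const STREAM_KEY = 'events:type:agent.task.assigned';
const MAX_DELIVERIES = 3; // Deliveries before an event goes to the dead-letter stream
const CLAIM_IDLE_MS = 60000; // Reclaim entries left unacknowledged longer than this
// Turn Redis' flat [field, value, field, value, ...] array into an object
function toObject(fields) {
const obj = {};
for (let i = 0; i < fields.length; i += 2) obj[fields[i]] = fields[i + 1];
return obj;
}
async function consumeEvents() {
try {
// Ensure the consumer group exists. '$' means the group only sees events added
// after it is created; use '0' instead to also process the existing backlog.
try {
await redis.xgroup('CREATE', STREAM_KEY, CONSUMER_GROUP, '$', 'MKSTREAM');
} catch (e) {
// BUSYGROUP: the group already exists
if (!e.message.includes('BUSYGROUP')) throw e;
}
// XREADGROUP with '>' only returns NEW entries; failed (un-acked) entries stay in the
// group's Pending Entries List until claimed. Reclaim stale ones first with
// XAUTOCLAIM (Redis 6.2+; redis.call() requires ioredis v5+).
// Reply: [nextCursor, entries, ...]; Redis 6.2 may include null placeholders for deleted entries.
const [, claimed] = await redis.call(
'XAUTOCLAIM', STREAM_KEY, CONSUMER_GROUP, CONSUMER_NAME, CLAIM_IDLE_MS, '0-0', 'COUNT', 10
);
// Only block waiting for new events when there is nothing to retry
const fresh = claimed.length > 0 ? null : await redis.xreadgroup(
'GROUP', CONSUMER_GROUP, CONSUMER_NAME,
'BLOCK', 5000, // Wait up to 5 seconds
'COUNT', 10, // Max 10 messages per batch
'STREAMS', STREAM_KEY, '>'
);
const entries = [...claimed, ...(fresh ? fresh[0][1] : [])].filter(entry => entry && entry[1]);
if (entries.length === 0) {
return { json: { processed: 0, events: [] } };
}
const processed = [];
for (const [id, fields] of entries) {
const event = toObject(fields);
try {
// Parse payload
const payload = JSON.parse(event.payload);
// Check priority
if (event.priority === 'critical') {
// Process immediately, don't batch
await processCriticalTask(payload);
} else {
// Queue for batch processing
await queueTask(payload);
}
// Acknowledge success so the entry leaves the Pending Entries List
await redis.xack(STREAM_KEY, CONSUMER_GROUP, id);
processed.push({
eventId: event.eventId,
status: 'success',
taskId: payload.taskId
});
} catch (error) {
// Don't ack: the entry stays pending and a later run reclaims it.
// The event's own retryCount field is not updated on redelivery, so use the
// delivery counter Redis keeps: XPENDING returns [[id, consumer, idleMs, deliveries]].
const pending = await redis.xpending(STREAM_KEY, CONSUMER_GROUP, id, id, 1);
const deliveries = pending.length > 0 ? Number(pending[0][3]) : 1;
const exhausted = deliveries >= MAX_DELIVERIES;
processed.push({
eventId: event.eventId,
status: 'failed',
error: error.message,
deliveries,
willRetry: !exhausted
});
// Out of retries: copy to the dead-letter stream, then ack the original
if (exhausted) {
await redis.xadd(
'events:dead-letter',
'*',
'originalEvent', JSON.stringify(event),
'error', error.message,
'deliveries', deliveries,
'failedAt', new Date().toISOString()
);
await redis.xack(STREAM_KEY, CONSUMER_GROUP, id);
}
}
}
return {
json: {
processed: processed.length,
events: processed
}
};
} finally {
await redis.quit();
}
}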
async function processCriticalTask(payload) {
// Critical path processing
// Skip queue, execute immediately
const result = await executeAgentTask(payload); // executeAgentTask: placeholder for your agent call
return result;
}
async function queueTask(payload) {
// Add to internal queue for batch processing
await redis.lpush('queue:pending-tasks', JSON.stringify(payload));
}
return await consumeEvents();
Message Queue Patterns for Agent Coordination
Pattern 1: Priority Queues
Different tasks require different SLAs. Priority queues ensure critical tasks get processed first.
// n8n Workflow: Multi-Priority Queue System
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
const PRIORITIES = ['critical', 'high', 'normal', 'low'];
const QUEUE_TTLS = {
critical: 60, // 1 minute
high: 300, // 5 minutes
normal: 3600, // 1 hour
low: 86400 // 24 hours
};
class PriorityQueueManager {
constructor(redis) {
this.redis = redis;
this.queues = {
critical: 'agent:queue:critical',
high: 'agent:queue:high',
normal: 'agent:queue:normal',
low: 'agent:queue:low'
};
}
async enqueue(task, priority = 'normal') {
// Unknown priorities fall back to 'normal' so the queue and TTL stay consistent
if (!this.queues[priority]) priority = 'normal';
const queue = this.queues[priority];
const ttl = QUEUE_TTLS[priority];
const enrichedTask = {
...task,
priority,
enqueuedAt: Date.now(),
expiresAt: Date.now() + (ttl * 1000),
attempts: 0
};
// LPUSH here + RPOP in dequeue() gives FIFO order within a priority level
// (LPUSH + LPOP would give LIFO/stack order instead)
await this.redis.lpush(queue, JSON.stringify(enrichedTask));
// Track queue depth metrics
await this.redis.hincrby('metrics:queue-depth', priority, 1);
return {
taskId: enrichedTask.id,
priority,
queueDepth: await this.redis.llen(queue)
};
}
async dequeue() {
// Strict priority: drain higher-priority queues first. (BRPOP also accepts several
// keys and checks them in the order given, if you prefer blocking over polling.)
for (const priority of PRIORITIES) {
const queue = this.queues[priority];
let raw;
while ((raw = await this.redis.rpop(queue))) {
await this.redis.hincrby('metrics:queue-depth', priority, -1);
const task = JSON.parse(raw);
// Skip tasks that outlived their TTL instead of running stale work
if (Date.now() > task.expiresAt) {
await this.redis.hincrby('metrics:expired-tasks', priority, 1);
continue;
}
return {
task,
priority,
waitTime: Date.now() - task.enqueuedAt
};
}
}
return null; // No tasks available
}
async getQueueStats() {
const stats = {};
for (const [priority, queue] of Object.entries(this.queues)) {
const length = await this.redis.llen(queue);
const oldestTask = await this.redis.lindex(queue, -1);
const oldestAge = oldestTask
? Date.now() - JSON.parse(oldestTask).enqueuedAt
: 0;
stats[priority] = {
depth: length,
oldestTaskAgeMs: oldestAge,
maxWaitTime: QUEUE_TTLS[priority] // Seconds (oldestTaskAgeMs is in milliseconds)
};
}
return stats;
}
}
// Usage in n8n
const queueManager = new PriorityQueueManager(redis);
// Producer: Add task with priority
const enqueueResult = await queueManager.enqueue({
id: `task-${Date.now()}`,
type: 'content-generation',
payload: {
topic: $json.topic,
requirements: $json.requirements
}
}, $json.priority || 'normal');
// Consumer: Get next task
const nextTask = await queueManager.dequeue();
// Monitor queue health
const stats = await queueManager.getQueueStats();
return {
json: {
enqueued: enqueueResult,
nextTask,
queueStats: stats
}
};
Pattern 2: Delayed Execution
Some tasks need to be processed at a specific time (e.g., scheduled social media posts, follow-up emails).
// n8n Workflow: Delayed Task Scheduler
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
class DelayedScheduler {
constructor(redis) {
this.redis = redis;
this.delayedSet = 'agent:delayed-tasks';
this.readyQueue = 'agent:queue:ready';
}
async schedule(task, executeAt) {
const scheduledTask = {
...task,
scheduledAt: Date.now(),
executeAt: executeAt.getTime(),
id: `delayed-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`
};
// Add to sorted set with execute time as score
await this.redis.zadd(
this.delayedSet,
scheduledTask.executeAt,
JSON.stringify(scheduledTask)
);
return {
scheduled: true,
taskId: scheduledTask.id,
executeAt: scheduledTask.executeAt,
delayMs: scheduledTask.executeAt - Date.now()
};
}
async pollAndPromote() {
const now = Date.now();
// Get all tasks ready to execute (score <= now)
const readyTasks = await this.redis.zrangebyscore(
this.delayedSet,
'-inf',
now,
'WITHSCORES'
);
const promoted = [];
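for (let i = 0; i < readyTasks.length; i += 2) {
const task = readyTasks[i];
// ZREM returns 1 only for the caller that actually removed the member, so if several
// pollers run at once, no task is promoted twice. (A crash between ZREM and LPUSH
// can still drop a task; a Lua script would make the move atomic.)
const removed = await this.redis.zrem(this.delayedSet, task);
if (removed !== 1) continue;
// Add to ready queue
await this.redis.lpush(this.readyQueue, task);
promoted.push(JSON.parse(task));
}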
// Get next task time for scheduling
const nextTask = await this.redis.zrange(this.delayedSet, 0, 0, 'WITHSCORES');
const nextExecution = nextTask.length > 0
? parseInt(nextTask[1])
: null;
return {
promoted: promoted.length,
tasks: promoted.map(t => t.id),
nextExecution,
delayedRemaining: await this.redis.zcard(this.delayedSet)
};
}
async getSchedule(windowHours = 24) {
const now = Date.now();
const windowEnd = now + (windowHours * 60 * 60 * 1000);
const tasks = await this.redis.zrangebyscore(
this.delayedSet,
now,
windowEnd,
'WITHSCORES'
);
return tasks.reduce((acc, val, idx, arr) => {
if (idx % 2 === 0) {
const task = JSON.parse(val);
const score = parseInt(arr[idx + 1]);
acc.push({
...task,
executeAt: new Date(score).toISOString(),
timeUntil: score - now
});
}
return acc;
}, []);
}
}
// Usage
const scheduler = new DelayedScheduler(redis);
// Schedule a task for later
const scheduleResult = await scheduler.schedule({
type: 'social-post',
payload: {
platform: 'linkedin',
content: $json.postContent,
hashtags: $json.hashtags
}
}, new Date($json.scheduledTime));
// Poll for ready tasks (run this every minute via cron)
const promotionResult = await scheduler.pollAndPromote();
// View upcoming schedule
const upcoming = await scheduler.getSchedule(48); // Next 48 hours
return {
json: {
scheduled: scheduleResult,
promoted: promotionResult,
upcoming: upcoming.slice(0, 10) // Show first 10
}
};
Pattern 3: Saga Pattern for Distributed Transactions
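When multiple agents need to complete a workflow together, use a saga: each step has a compensating action, so if a later step fails, the completed steps are undone in reverse order. This gives eventual consistency rather than the all-or-nothing guarantee of a database transaction.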
// n8n Workflow: Saga Pattern Implementation
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
class SagaOrchestrator {
constructor(redis) {
this.redis = redis;
}
async startSaga(sagaId, steps) {
const saga = {
id: sagaId,
status: 'running',
startedAt: Date.now(),
steps: steps.map((step, idx) => ({
id: `step-${idx}`,
name: step.name,
action: step.action,
compensation: step.compensation,
status: 'pending',
input: null,
output: null
})),
currentStep: 0
};
await this.redis.setex(
`saga:${sagaId}`,
3600, // 1 hour TTL
JSON.stringify(saga)
);
return this.executeNextStep(sagaId);
}
async executeNextStep(sagaId) {
const saga = JSON.parse(await this.redis.get(`saga:${sagaId}`));
if (saga.currentStep >= saga.steps.length) {
// Saga complete
saga.status = 'completed';
saga.completedAt = Date.now();
await this.redis.setex(`saga:${sagaId}`, 86400, JSON.stringify(saga));
return { status: 'completed', saga };
}
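const step = saga.steps[saga.currentStep];
step.status = 'executing';
step.startedAt = Date.now();
// Pass the previous step's output along with this step's own payload
const previous = saga.steps[saga.currentStep - 1];
step.input = { ...step.action.payload, previousOutput: previous ? previous.output : null };
await this.redis.setex(`saga:${sagaId}`, 3600, JSON.stringify(saga));
try {
// Execute the step action with the assembled input
const result = await this.executeAction(step.action, saga, step.input);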
step.status = 'completed';
step.output = result;
step.completedAt = Date.now();
saga.currentStep++;
await this.redis.setex(`saga:${sagaId}`, 3600, JSON.stringify(saga));
// Continue to next step
return this.executeNextStep(sagaId);
} catch (error) {
step.status = 'failed';
step.error = error.message;
// Trigger compensation
await this.compensate(saga, saga.currentStep);
saga.status = 'compensated';
saga.failedAt = Date.now();
await this.redis.setex(`saga:${sagaId}`, 86400, JSON.stringify(saga));
return { status: 'compensated', saga, error: error.message };
}
}
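async executeAction(action, saga, input = action.payload) {
// Dispatch to the appropriate agent based on action type.
// callAgent() and triggerSubflow() are placeholders for your own integrations.
switch (action.type) {
case 'agent-task':
return await this.callAgent(action.agentType, input);
case 'webhook': {
const response = await fetch(action.url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ sagaId: saga.id, ...input })
});
// fetch() only rejects on network errors, so treat HTTP errors as step failures
if (!response.ok) {
throw new Error(`Webhook ${action.url} failed with status ${response.status}`);
}
// Store a JSON-serializable result (a Response object serializes to {})
return { status: response.status, body: await response.text() };
}
case 'n8n-subflow':
return await this.triggerSubflow(action.workflowId, input);
default:
throw new Error(`Unknown action type: ${action.type}`);
}
}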
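async compensate(saga, failedStepIndex) {
// Run compensations in reverse order. Only completed steps have an output,
// so the failed step itself is skipped.
for (let i = failedStepIndex; i >= 0; i--) {
const step = saga.steps[i];
if (step.compensation && step.output) {
try {
// Give the compensation the step's output so it knows what to undo
await this.executeAction(step.compensation, saga, { ...step.compensation.payload, stepOutput: step.output });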
step.compensationStatus = 'completed';
} catch (e) {
step.compensationStatus = 'failed';
// Log for manual intervention
await this.redis.lpush('saga:compensation-failures', JSON.stringify({
sagaId: saga.id,
stepId: step.id,
error: e.message,
timestamp: new Date().toISOString()
}));
}
}
}
}
async getSagaStatus(sagaId) {
const saga = await this.redis.get(`saga:${sagaId}`);
return saga ? JSON.parse(saga) : null;
}
}
// Usage: Content creation saga
const orchestrator = new SagaOrchestrator(redis);
const sagaSteps = [
{
name: 'research',
action: {
type: 'agent-task',
agentType: 'research-agent',
payload: { topic: $json.topic, depth: 'comprehensive' }
},
compensation: {
type: 'webhook',
url: 'https://api.company.com/cleanup-research',
payload: { action: 'cleanup' }
}
},
{
name: 'draft',
action: {
type: 'agent-task',
agentType: 'writer-agent',
payload: { /* uses previous step output */ }
},
compensation: {
type: 'webhook',
url: 'https://api.company.com/cleanup-draft',
payload: { action: 'cleanup' }
}
},
{
name: 'publish',
action: {
type: 'webhook',
url: 'https://cms.company.com/api/publish',
payload: { /* uses draft output */ }
},
compensation: {
type: 'webhook',
url: 'https://cms.company.com/api/unpublish',
payload: { action: 'unpublish' }
}
}
];
const sagaId = `saga-${Date.now()}`;
const result = await orchestrator.startSaga(sagaId, sagaSteps);
return {
json: result
};
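The core saga rule, undoing the steps that already succeeded in reverse order, can be checked without Redis or HTTP. The sketch below uses plain async functions in place of agent calls and webhooks; `runSaga` and the step shape `{ name, action, compensate }` are hypothetical, and each action receives the previous step's output, which is the wiring the `/* uses previous step output */` placeholders above leave open.

```javascript
// Run steps in order; on failure, compensate the completed steps in reverse order.
async function runSaga(steps) {
  const completed = []; // { step, output } for each step that succeeded
  let previousOutput = null;

  for (const step of steps) {
    try {
      previousOutput = await step.action(previousOutput);
      completed.push({ step, output: previousOutput });
    } catch (error) {
      const compensationErrors = [];
      // Undo in reverse: the most recent side effect is rolled back first.
      for (const { step: done, output } of [...completed].reverse()) {
        if (!done.compensate) continue;
        try {
          await done.compensate(output);
        } catch (e) {
          // Keep going; a failed compensation needs manual follow-up.
          compensationErrors.push({ step: done.name, error: e.message });
        }
      }
      return { status: 'compensated', failedStep: step.name, error: error.message, compensationErrors };
    }
  }
  return { status: 'completed', output: previousOutput };
}
```

Continuing past a failed compensation, rather than stopping, means one broken cleanup endpoint does not leave every earlier step un-reverted; the collected errors play the same role as the `saga:compensation-failures` list above.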
Fault Tolerance and Resilience Patterns
Circuit Breaker Implementation
Circuit breakers prevent cascading failures by temporarily rejecting requests when a service is struggling.
// n8n Workflow: Circuit Breaker for Agent Calls
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
class CircuitBreaker {
constructor(redis, config) {
this.redis = redis;
this.config = {
failureThreshold: config.failureThreshold || 5,
successThreshold: config.successThreshold || 3,
timeout: config.timeout || 60000, // 1 minute
...config
};
this.stateKey = `circuit:${config.name}`;
}
async getState() {
const state = await this.redis.get(this.stateKey);
if (!state) {
return {
status: 'CLOSED',
failures: 0,
successes: 0,
lastFailureTime: null,
openedAt: null
};
}
return JSON.parse(state);
}
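// Note: state updates are read-modify-write (GET, then SETEX), so concurrent workers
// can race; a Lua script or WATCH/MULTI would make each transition atomic.
async recordSuccess() {
const state = await this.getState();
if (state.status === 'HALF_OPEN') {
state.successes++;
if (state.successes >= this.config.successThreshold) {
state.status = 'CLOSED';
state.failures = 0;
state.successes = 0;
await this.redis.publish('circuit:events', JSON.stringify({
circuit: this.config.name,
event: 'CLOSED',
timestamp: new Date().toISOString()
}));
}
} else if (state.status === 'CLOSED') {
// Count consecutive failures: a success resets the streak. Otherwise occasional
// failures keep accumulating (the 1-hour TTL is refreshed on every write)
// and would eventually open the circuit.
state.failures = 0;
}
await this.redis.setex(this.stateKey, 3600, JSON.stringify(state));
return state;
}
async recordFailure() {
const state = await this.getState();
state.failures++;
state.lastFailureTime = Date.now();
// A failed trial request in HALF_OPEN re-opens the circuit immediately;
// in CLOSED, open once the consecutive-failure threshold is reached
const shouldOpen = state.status === 'HALF_OPEN' ||
(state.status === 'CLOSED' && state.failures >= this.config.failureThreshold);
if (shouldOpen) {
state.status = 'OPEN';
state.openedAt = Date.now();
state.successes = 0;
await this.redis.publish('circuit:events', JSON.stringify({
circuit: this.config.name,
event: 'OPENED',
timestamp: new Date().toISOString()
}));
}
await this.redis.setex(this.stateKey, 3600, JSON.stringify(state));
return state;
}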
async canExecute() {
const state = await this.getState();
if (state.status === 'CLOSED') {
return { allowed: true, state };
}
if (state.status === 'OPEN') {
const timeOpen = Date.now() - state.openedAt;
if (timeOpen > this.config.timeout) {
// Transition to HALF_OPEN
state.status = 'HALF_OPEN';
state.successes = 0;
await this.redis.setex(this.stateKey, 3600, JSON.stringify(state));
await this.redis.publish('circuit:events', JSON.stringify({
circuit: this.config.name,
event: 'HALF_OPEN',
timestamp: new Date().toISOString()
}));
return { allowed: true, state, trial: true };
}
return {
allowed: false,
state,
retryAfter: Math.ceil((this.config.timeout - timeOpen) / 1000)
};
}
if (state.status === 'HALF_OPEN') {
return { allowed: true, state, trial: true };
}
}
async execute(operation) {
const permission = await this.canExecute();
if (!permission.allowed) {
throw new Error(`Circuit breaker OPEN for ${this.config.name}. Retry after ${permission.retryAfter}s`);
}
try {
const result = await operation();
await this.recordSuccess();
return { result, state: permission.state, trial: permission.trial };
} catch (error) {
await this.recordFailure();
throw error;
}
}
}
// Usage for LLM API calls
const openAICircuit = new CircuitBreaker(redis, {
name: 'openai-api',
failureThreshold: 5,
successThreshold: 3,
timeout: 60000
});
try {
const result = await openAICircuit.execute(async () => {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Authorization': `Bearer ${$env.OPENAI_API_KEY}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: 'gpt-4o',
messages: $json.messages
})
});
if (!response.ok) {
throw new Error(`OpenAI API error: ${response.status}`);
}
return await response.json();
});
return {
json: {
success: true,
data: result.result,
circuitState: result.state.status,
trial: result.trial || false
}
};
} catch (error) {
return {
json: {
success: false,
error: error.message,
fallback: 'Using cached response or alternative model'
}
};
}
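A circuit breaker's state machine can also be written as a pure function, which makes it easy to unit-test without a Redis server. The sketch below is a hypothetical restatement (`transition`, `allows` and the event names `'request'`, `'success'`, `'failure'` are not from any library). It follows the usual circuit-breaker rules: consecutive failures open the circuit, a request after the cool-down moves it to HALF_OPEN, and a failed trial request re-opens it.

```javascript
// Pure circuit-breaker transitions: (state, event, now, cfg) -> new state.
const INITIAL = { status: 'CLOSED', failures: 0, successes: 0, openedAt: null };

function transition(state, event, now, cfg) {
  switch (state.status) {
    case 'CLOSED':
      if (event === 'success') return { ...state, failures: 0 }; // consecutive count
      if (event === 'failure') {
        const failures = state.failures + 1;
        return failures >= cfg.failureThreshold
          ? { ...state, status: 'OPEN', failures, openedAt: now }
          : { ...state, failures };
      }
      return state;
    case 'OPEN':
      // After the cool-down, the next request is let through as a trial.
      if (event === 'request' && now - state.openedAt >= cfg.timeout) {
        return { ...state, status: 'HALF_OPEN', successes: 0 };
      }
      return state;
    case 'HALF_OPEN':
      if (event === 'failure') return { ...state, status: 'OPEN', openedAt: now, successes: 0 };
      if (event === 'success') {
        const successes = state.successes + 1;
        return successes >= cfg.successThreshold ? { ...INITIAL } : { ...state, successes };
      }
      return state;
    default:
      return state;
  }
}

// After applying a 'request' event, the call may proceed unless the circuit is still OPEN.
function allows(state) {
  return state.status !== 'OPEN';
}
```

A storage layer (Redis, or an in-process variable) then only needs to load the state, apply `transition`, and save the result, which keeps the decision logic separate from I/O.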
Exponential Backoff with Jitter
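Exponential backoff spaces retries further apart after each failure, and jitter randomizes the delays so that many clients don't retry in synchronized waves. Together they reduce, rather than eliminate, retry storms.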
// n8n Workflow: Smart Retry with Exponential Backoff and Jitter
class RetryWithBackoff {
constructor(config = {}) {
this.maxRetries = config.maxRetries || 5;
this.baseDelay = config.baseDelay || 1000; // 1 second
this.maxDelay = config.maxDelay || 60000; // 60 seconds
this.jitterFactor = config.jitterFactor || 0.3; // 30% randomization
}
calculateDelay(attempt) {
// Exponential: base * 2^attempt
const exponential = this.baseDelay * Math.pow(2, attempt);
const capped = Math.min(exponential, this.maxDelay);
// Add jitter: ±jitterFactor
const jitter = capped * this.jitterFactor * (Math.random() * 2 - 1);
const finalDelay = capped + jitter;
return Math.max(0, Math.floor(finalDelay));
}
async execute(operation, context) {
const attempts = [];
// Attempt records don't store start times, so track the overall start here
const overallStart = Date.now();
// Note: maxRetries is the total number of attempts, including the first one
for (let attempt = 0; attempt < this.maxRetries; attempt++) {
const startTime = Date.now();
try {
const result = await operation();
attempts.push({
attempt: attempt + 1,
duration: Date.now() - startTime,
success: true
});
return {
success: true,
result,
attempts,
totalDuration: Date.now() - overallStart
};
} catch (error) {
const duration = Date.now() - startTime;
attempts.push({
attempt: attempt + 1,
duration,
success: false,
error: error.message
});
if (attempt === this.maxRetries - 1) {
// Final attempt failed
return {
success: false,
error: error.message,
attempts,
totalDuration: Date.now() - overallStart
};
}
// Calculate and apply backoff
const delay = this.calculateDelay(attempt);
attempts[attempt].nextDelay = delay;
await this.sleep(delay);
}
}
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Usage
const retryConfig = {
maxRetries: 5,
baseDelay: 2000, // Start with 2 seconds
maxDelay: 32000, // Cap at 32 seconds
jitterFactor: 0.2 // 20% jitter
};
const retrier = new RetryWithBackoff(retryConfig);
const result = await retrier.execute(async () => {
const response = await fetch('https://api.anthropic.com/v1/messages', {
method: 'POST',
headers: {
'x-api-key': $env.ANTHROPIC_API_KEY,
'anthropic-version': '2023-06-01', // Required version header for the Messages API
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: 'claude-3-7-sonnet-20250219',
max_tokens: 4096,
messages: $json.messages
})
});
if (response.status === 429) {
const retryAfter = response.headers.get('retry-after');
throw new Error(`Rate limited. Retry after: ${retryAfter}`);
}
if (!response.ok) {
throw new Error(`API error: ${response.status}`);
}
return await response.json();
});
return {
json: result
};
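The example above reads the `retry-after` header only to put it in an error message. Under HTTP semantics that header carries either a number of seconds or an HTTP-date, and honoring it is usually better than guessing. The sketch below normalizes both forms to milliseconds and never waits less than the computed backoff. `parseRetryAfter` and `nextDelay` are hypothetical helpers, and wiring them into `RetryWithBackoff` (for example by attaching the parsed value to the thrown error) is left as an assumption.

```javascript
// Convert a Retry-After header value to milliseconds.
// Accepts delay-seconds ("120") or an HTTP-date; returns null if missing or unparseable.
function parseRetryAfter(value, now = Date.now()) {
  if (value == null) return null;
  const trimmed = String(value).trim();
  if (/^\d+$/.test(trimmed)) return Number(trimmed) * 1000;
  const date = Date.parse(trimmed);
  if (Number.isNaN(date)) return null;
  return Math.max(0, date - now); // a date in the past means "retry now"
}

// Wait at least as long as the server asked for, and never less than the backoff delay.
function nextDelay(backoffMs, retryAfterHeader, now = Date.now()) {
  const serverMs = parseRetryAfter(retryAfterHeader, now);
  return serverMs === null ? backoffMs : Math.max(backoffMs, serverMs);
}
```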
Dead Letter Queue and Error Handling
Not all failures are recoverable. A dead letter queue captures failed events for analysis and manual intervention.
// n8n Workflow: Dead Letter Queue Pattern
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
class DeadLetterQueue {
constructor(redis) {
this.redis = redis;
this.dlqKey = 'queue:dead-letter';
this.maxSize = 10000; // Keep last 10k failed events
}
async add(event, error, context) {
const deadEvent = {
originalEvent: event,
error: {
message: error.message,
stack: error.stack,
code: error.code
},
context: {
...context,
failedAt: new Date().toISOString(),
retryCount: event.retryCount || 0
},
id: `dlq-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`
};
// Add to DLQ with timestamp score for sorting
await this.redis.zadd(
this.dlqKey,
Date.now(),
JSON.stringify(deadEvent)
);
// Trim to max size
const currentSize = await this.redis.zcard(this.dlqKey);
if (currentSize > this.maxSize) {
await this.redis.zremrangebyrank(this.dlqKey, 0, currentSize - this.maxSize - 1);
}
// Alert if DLQ is growing fast
if (currentSize > this.maxSize * 0.8) {
await this.redis.publish('alerts:dlq', JSON.stringify({
severity: 'warning',
message: `DLQ at ${Math.round((currentSize / this.maxSize) * 100)}% capacity`,
timestamp: new Date().toISOString()
}));
}
return deadEvent.id;
}
async list(limit = 50, offset = 0) {
const events = await this.redis.zrevrange(
this.dlqKey,
offset,
offset + limit - 1
);
return events.map(e => {
const parsed = JSON.parse(e);
return {
...parsed,
age: Date.now() - new Date(parsed.context.failedAt).getTime()
};
});
}
async retry(eventId) {
// Find event (O(n) scan, acceptable because the DLQ is capped)
const events = await this.redis.zrange(this.dlqKey, 0, -1);
const event = events.find(e => JSON.parse(e).id === eventId);
if (!event) {
throw new Error('Event not found in DLQ');
}
const parsed = JSON.parse(event);
// Re-queue the original event and remove it from the DLQ in one MULTI/EXEC
// transaction, so a crash between the two steps cannot leave it in both places
await this.redis.multi()
.lpush('queue:pending-tasks', JSON.stringify({
...parsed.originalEvent,
retryCount: (parsed.originalEvent.retryCount || 0) + 1,
retryFromDLQ: true,
originalFailure: parsed.error
}))
.zrem(this.dlqKey, event)
.exec();
return { success: true, message: 'Event re-queued' };
}
async getStats() {
const total = await this.redis.zcard(this.dlqKey);
// Get time distribution
const now = Date.now();
const oneHourAgo = now - 3600000;
const oneDayAgo = now - 86400000;
const lastHour = await this.redis.zcount(this.dlqKey, oneHourAgo, now);
const lastDay = await this.redis.zcount(this.dlqKey, oneDayAgo, now);
// Group by error type
const allEvents = await this.redis.zrange(this.dlqKey, 0, -1);
const errorTypes = allEvents.reduce((acc, e) => {
const error = JSON.parse(e).error.code || 'unknown';
acc[error] = (acc[error] || 0) + 1;
return acc;
}, {});
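// ZRANGE returns an array of raw JSON strings; parse the oldest entry
const [oldestRaw] = await this.redis.zrange(this.dlqKey, 0, 0);
return {
total,
lastHour,
lastDay,
errorTypes,
oldestEvent: oldestRaw ? JSON.parse(oldestRaw) : null
};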
}
}
// Usage in n8n
const dlq = new DeadLetterQueue(redis);
// When processing fails (processEvent stands in for your own event handler)
try {
await processEvent($json.event);
} catch (error) {
const dlqId = await dlq.add($json.event, error, {
workflowId: $env.WORKFLOW_ID,
nodeId: $env.NODE_ID
});
return {
json: {
processed: false,
movedToDLQ: true,
dlqId,
error: error.message
}
};
}
// Monitor DLQ stats
const stats = await dlq.getStats();
// List recent failures for review
const recentFailures = await dlq.list(20);
return {
json: {
dlqStats: stats,
recentFailures
}
};
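The usage above sends every failure straight to the DLQ. Workers more commonly retry transient errors first and dead-letter only failures that are permanent or have exhausted their retry budget. The sketch below shows one such policy. The status-code and error-code lists, the `maxRetries` default, and the `status` property on errors are assumptions to adapt to your own providers. Plain `Error` objects, like the rate-limit error thrown earlier, would need a `status` attached for this classification to work.

```javascript
// Hypothetical retry policy: retry transient failures, dead-letter the rest.
// Assumes `error.status` holds an HTTP status for API failures, `error.code`
// a Node.js network error code, and `event.retryCount` the attempts so far.
const RETRYABLE_STATUS = new Set([408, 429, 500, 502, 503, 504]);
const RETRYABLE_CODES = new Set(['ECONNRESET', 'ETIMEDOUT', 'EAI_AGAIN']);

function classifyFailure(event, error, maxRetries = 3) {
  const attempts = event.retryCount || 0;
  const transient =
    RETRYABLE_STATUS.has(error.status) || RETRYABLE_CODES.has(error.code);

  // Validation/auth errors (400, 401, 403, 422) and plain code bugs fail
  // the same way on every attempt, so they go straight to the DLQ
  if (!transient) {
    return { action: 'dead-letter', reason: 'non-retryable' };
  }
  if (attempts >= maxRetries) {
    return { action: 'dead-letter', reason: 'retries-exhausted' };
  }
  return { action: 'retry', attempt: attempts + 1 };
}
```

In the catch block, a worker would call `classifyFailure($json.event, error)` and invoke `dlq.add(...)` only when the action is `'dead-letter'`, re-queueing with backoff otherwise.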
Cost Optimization Strategies
Token Usage Optimization
Token costs are usually the largest expense in agent systems. Combined, the strategies below can reduce those costs by 50-80%, with actual savings depending on the task mix and cache hit rate.
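To see why model selection matters so much, here is the arithmetic using the per-million-token prices from the `modelCosts` table in the class that follows. Prices change, so treat them as illustrative. The workload of 1,000 tasks with 2,000 input and 500 output tokens each is an assumption.

```javascript
// Per-1M-token prices (USD), copied from TokenOptimizer.modelCosts
const PRICES = {
  'gpt-4o': { input: 2.5, output: 10.0 },
  'gpt-4o-mini': { input: 0.15, output: 0.6 },
};

// Cost of a workload: total tokens / 1M x price per 1M tokens
function workloadCost(model, tasks, inputTokens, outputTokens) {
  const p = PRICES[model];
  const totalIn = tasks * inputTokens;   // 2,000,000 in the example below
  const totalOut = tasks * outputTokens; // 500,000 in the example below
  return (totalIn / 1e6) * p.input + (totalOut / 1e6) * p.output;
}

const big = workloadCost('gpt-4o', 1000, 2000, 500);
const small = workloadCost('gpt-4o-mini', 1000, 2000, 500);
console.log(big.toFixed(2), small.toFixed(2)); // prints "10.00 0.60"
```

For tasks where the smaller model's quality is acceptable, routing to it costs about 6% of the larger model's price. That routing decision is the motivation behind `selectModel` below.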
// n8n Workflow: Token Optimization Strategies
class TokenOptimizer {
constructor() {
this.cache = new Map(); // In production, use Redis
this.modelCosts = {
'gpt-4o': { input: 2.50, output: 10.00, per: 1000000 }, // per 1M tokens
'gpt-4o-mini': { input: 0.15, output: 0.60, per: 1000000 },
'claude-3-7-sonnet': { input: 3.00, output: 15.00, per: 1000000 },
'claude-3-5-haiku': { input: 0.80, output: 4.00, per: 1000000 }
};
}
// Strategy 1: Intelligent Model Selection
selectModel(taskComplexity) {
const selection = {
simple: 'gpt-4o-mini', // Classification, extraction
medium: 'claude-3-5-haiku', // Analysis, summarization
complex: 'gpt-4o', // Reasoning, generation
critical: 'claude-3-7-sonnet' // High-stakes decisions
};
return selection[taskComplexity] || selection.medium;
}
// Strategy 2: Context Compression
// Keep system messages plus as many of the most recent messages as fit;
// older messages are replaced by a short summary instead of being dropped
compressContext(messages, maxTokens = 4000) {
if (this.estimateTokens(messages) <= maxTokens) {
return messages;
}
const systemMsgs = messages.filter(m => m.role === 'system');
const history = messages.filter(m => m.role !== 'system');
const kept = [];
let budget = maxTokens - this.estimateTokens(systemMsgs);
// Walk backwards from the newest message so the latest exchange always survives
let i = history.length - 1;
for (; i >= 0; i--) {
const cost = this.estimateTokens([history[i]]);
if (cost > budget && kept.length > 0) break;
kept.unshift(history[i]); // unshift preserves chronological order
budget -= cost;
}
const dropped = history.slice(0, i + 1);
if (dropped.length === 0) {
return [...systemMsgs, ...kept];
}
// Summarize the older messages that did not fit
const summary = { role: 'user', content: `Previous context: ${this.createSummary(dropped)}` };
return [...systemMsgs, summary, ...kept];
}
// Strategy 3: Response Caching
async getCachedResponse(promptHash) {
// In production: Redis or similar
const cached = this.cache.get(promptHash);
if (cached && Date.now() - cached.timestamp < 3600000) { // 1 hour TTL
return cached.response;
}
return null;
}
async cacheResponse(promptHash, response) {
this.cache.set(promptHash, {
response,
timestamp: Date.now()
});
}
// Strategy 4: Batch Processing
async batchProcess(items, batchSize = 10) {
const results = [];
const batches = [];
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize));
}
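for (const batch of batches) {
// Process batch in single call. processBatch is not defined in this class:
// implement it to send all items in one request and return one result per item
const batchResult = await this.processBatch(batch);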
results.push(...batchResult);
}
return results;
}
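// Strategy 5: Streaming for Long Outputs
// Targets the OpenAI Chat Completions API only; `model` must be an OpenAI model
async streamProcess(messages, onChunk, model = 'gpt-4o') {
let fullResponse = '';
let chunkCount = 0; // content deltas received (roughly one token each)
let usage = null;
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Authorization': `Bearer ${$env.OPENAI_API_KEY}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
model,
messages,
stream: true,
stream_options: { include_usage: true }, // final chunk reports exact token usage
max_tokens: 4000
})
});
if (!response.ok) {
throw new Error(`API error: ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
// SSE lines can be split across network chunks, so buffer until a full line arrives
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // keep the trailing partial line for the next read
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6).trim();
if (data === '[DONE]') continue;
let parsed;
try {
parsed = JSON.parse(data);
} catch (e) {
continue; // skip malformed events
}
if (parsed.usage) usage = parsed.usage;
const content = parsed.choices?.[0]?.delta?.content || '';
if (!content) continue;
fullResponse += content;
chunkCount++;
// Call progress handler
if (onChunk) {
await onChunk(content, chunkCount);
}
// Early termination heuristic: may cut off an answer that is still in progress
if (chunkCount >= 1000 && this.hasCompleteAnswer(fullResponse)) {
await reader.cancel(); // closes the stream instead of only releasing the lock
return { response: fullResponse, tokens: chunkCount, usage, truncated: true };
}
}
}
return { response: fullResponse, tokens: usage?.completion_tokens ?? chunkCount, usage };
}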
// Helper methods
estimateTokens(messages) {
// Rough estimation: ~4 characters per token
const text = messages.map(m => m.content).join('');
return Math.ceil(text.length / 4);
}
createSummary(messages) {
// In production: Use a smaller model for summarization
return `Conversation with ${messages.length} messages about ${messages[0].content.substring(0, 50)}...`;
}
hasCompleteAnswer(text) {
// Detect if response seems complete
return text.trim().endsWith('.') || text.trim().endsWith('}');
}
calculateCost(model, inputTokens, outputTokens) {
const costs = this.modelCosts[model];
if (!costs) return 0;
const inputCost = (inputTokens / costs.per) * costs.input;
const outputCost = (outputTokens / costs.per) * costs.output;
return inputCost + outputCost;
}
}
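// Usage
const optimizer = new TokenOptimizer();
// Example workflow
const task = $json.task;
// Complexity label ('simple' | 'medium' | 'complex' | 'critical'); supply your own classifier
const complexity = task.complexity || 'medium';
// Select appropriate model
const model = optimizer.selectModel(complexity);
// streamProcess only calls the OpenAI API, so Claude selections fall back to gpt-4o here;
// route them through an Anthropic client in a real deployment
const apiModel = model.startsWith('gpt-') ? model : 'gpt-4o';
// Check cache: key on the model and the full conversation, not just the last prompt
// (n8n Code node: allow the builtin with NODE_FUNCTION_ALLOW_BUILTIN=crypto)
const cacheKey = require('crypto')
.createHash('sha256')
.update(JSON.stringify([apiModel, task.messages]))
.digest('hex');
const cached = await optimizer.getCachedResponse(cacheKey);
if (cached) {
return {
json: {
result: cached,
cost: 0,
source: 'cache'
}
};
}
// Compress context
const compressedMessages = optimizer.compressContext(task.messages, 3000);
// Call API with optimized parameters
const response = await optimizer.streamProcess(compressedMessages, (chunk, count) => {
// Update progress if needed
console.log(`Received ${count} chunks`);
}, apiModel);
// Cache result
await optimizer.cacheResponse(cacheKey, response.response);
// Calculate cost, preferring usage reported by the API over the ~4 chars/token estimate
const inputTokens = response.usage?.prompt_tokens ?? optimizer.estimateTokens(compressedMessages);
const cost = optimizer.calculateCost(apiModel, inputTokens, response.tokens);
return {
json: {
result: response.response,
cost,
tokens: response.tokens,
truncated: response.truncated || false,
model: apiModel
}
};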
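`batchProcess` delegates to `processBatch`, which the class does not define. One way to implement it is to pack several items into a single prompt as a numbered JSON array and ask the model for a JSON array of the same length. That way the shared instructions are paid for once per batch instead of once per item, at the cost of one malformed reply failing the whole batch. The sketch below covers only prompt packing and reply validation. The function names and prompt wording are hypothetical, and the model call itself is left out.

```javascript
// Hypothetical helpers for a processBatch implementation: one prompt per batch.
function buildBatchPrompt(instruction, items) {
  const numbered = items.map((text, id) => ({ id, text }));
  return [
    instruction,
    'Return ONLY a JSON array with one object per input, in the same order,',
    'each shaped like {"id": <number>, "result": <string>}.',
    'Inputs:',
    JSON.stringify(numbered),
  ].join('\n');
}

// Validate the reply; throwing lets the caller fall back to per-item calls.
function parseBatchResult(replyText, expectedCount) {
  const start = replyText.indexOf('[');
  const end = replyText.lastIndexOf(']');
  if (start === -1 || end < start) throw new Error('No JSON array in reply');

  const parsed = JSON.parse(replyText.slice(start, end + 1));
  if (!Array.isArray(parsed) || parsed.length !== expectedCount) {
    throw new Error(`Expected ${expectedCount} results`);
  }

  // Models occasionally reorder entries, so sort by id and check none are missing
  const sorted = [...parsed].sort((a, b) => a.id - b.id);
  if (sorted.some((entry, i) => entry.id !== i)) {
    throw new Error('Result ids do not match input ids');
  }
  return sorted.map((entry) => entry.result);
}
```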
Budget-Based Rate Limiting
Prevent runaway costs with budget-based circuit breakers.
// n8n Workflow: Budget-Based Rate Limiting
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
class BudgetController {
constructor(redis) {
this.redis = redis;
}
async setBudget(scope, config) {
const key = `budget:${scope}`; // scope: e.g. 'daily', 'hourly', 'per-workflow'
// Initialise only when no window is active, so re-running the workflow does
// not reset currentSpend; the key's TTL defines the budget window
if (!(await this.redis.exists(key))) {
await this.redis.multi()
.hset(key, {
maxAmount: config.maxAmount,
currentSpend: 0,
currency: config.currency || 'USD',
warningThreshold: config.warningThreshold || 0.8,
action: config.action || 'block' // 'block', 'throttle', 'alert'
})
.expire(key, config.ttl || 86400)
.exec();
}
return this.redis.hgetall(key);
}
async checkBudget(scope, estimatedCost = 0) {
const key = `budget:${scope}`;
const data = await this.redis.hgetall(key); // {} when the key does not exist
if (!data.maxAmount) {
return { allowed: true, reason: 'no-budget-set' };
}
const maxAmount = parseFloat(data.maxAmount);
const currentSpend = parseFloat(data.currentSpend);
const warningThreshold = parseFloat(data.warningThreshold);
const projectedSpend = currentSpend + estimatedCost;
const utilization = projectedSpend / maxAmount;
if (utilization >= 1.0) {
return {
allowed: false,
reason: 'budget-exceeded',
currentSpend,
budget: maxAmount,
action: data.action
};
}
if (utilization >= warningThreshold) {
// Send warning
await this.redis.publish('alerts:budget', JSON.stringify({
scope,
currentSpend,
budget: maxAmount,
utilization,
timestamp: new Date().toISOString()
}));
}
return {
allowed: true,
utilization,
remaining: maxAmount - currentSpend
};
}
async recordSpend(scope, amount) {
const key = `budget:${scope}`;
if (!(await this.redis.exists(key))) return null;
// HINCRBYFLOAT is atomic across workers and leaves the key's TTL untouched
const currentSpend = parseFloat(await this.redis.hincrbyfloat(key, 'currentSpend', amount));
// Also record in time-series for analytics (7-day retention is an arbitrary choice)
const historyKey = `spend:${scope}:history`;
await this.redis.zadd(
historyKey,
Date.now(),
JSON.stringify({ amount, timestamp: new Date().toISOString() })
);
await this.redis.zremrangebyscore(historyKey, '-inf', Date.now() - 7 * 86400000);
return currentSpend;
}
async getSpendHistory(scope, hours = 24) {
const since = Date.now() - (hours * 60 * 60 * 1000);
const entries = await this.redis.zrangebyscore(
`spend:${scope}:history`,
since,
Date.now()
);
return entries.map(e => JSON.parse(e));
}
async getThrottledRate(scope) {
const check = await this.checkBudget(scope);
if (!check.allowed) return 0; // Fully blocked
if (check.utilization > 0.9) return 0.1; // 10% of normal rate
if (check.utilization > 0.8) return 0.25; // 25% of normal rate
if (check.utilization > 0.7) return 0.5; // 50% of normal rate
return 1.0; // Full rate
}
}
// Usage in n8n
const budget = new BudgetController(redis);
// Set up budgets
await budget.setBudget('hourly', {
maxAmount: 50, // $50/hour
warningThreshold: 0.75,
ttl: 3600
});
await budget.setBudget('daily', {
maxAmount: 500, // $500/day
warningThreshold: 0.8,
ttl: 86400
});
// Before making API call: check every budget that applies
const estimatedCost = 0.05; // $0.05 for this call
for (const scope of ['hourly', 'daily']) {
const check = await budget.checkBudget(scope, estimatedCost);
if (!check.allowed) {
return {
json: {
error: `${scope} budget exceeded`,
retryAfterSeconds: await redis.ttl(`budget:${scope}`), // time until the window resets
currentSpend: check.currentSpend
}
};
}
}
// Make the API call (callLLMAPI is a placeholder for your own client)
const response = await callLLMAPI($json.prompt);
// Rough estimate assuming a blended $10 per 1M tokens; per-model prices are more accurate
const actualCost = response.usage.total_tokens * 0.00001;
// Record the spend
await budget.recordSpend('hourly', actualCost);
const dailySpend = await budget.recordSpend('daily', actualCost);
// Spending analytics: individual charges over the last 24 hours
const dailyHistory = await budget.getSpendHistory('daily', 24);
return {
json: {
result: response,
cost: actualCost,
dailySpend,
callsLast24h: dailyHistory.length,
remaining: 500 - dailySpend
}
};
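The Redis version relies on key expiry to start a new budget window. The same fixed-window policy is easier to unit-test in memory with an injectable clock. This is a sketch for testing the policy, not a replacement for the shared Redis state, since a single process cannot see other workers' spend. The class and method names are hypothetical.

```javascript
// Hypothetical in-memory fixed-window budget; `now` is injectable for tests.
class FixedWindowBudget {
  constructor({ maxAmount, windowMs, warningThreshold = 0.8, now = () => Date.now() }) {
    this.maxAmount = maxAmount;
    this.windowMs = windowMs;
    this.warningThreshold = warningThreshold;
    this.now = now;
    this.windowStart = now();
    this.spent = 0;
  }

  // Start a new window once the current one has elapsed (the Redis TTL's job)
  rollWindow() {
    const t = this.now();
    if (t - this.windowStart >= this.windowMs) {
      this.windowStart = t;
      this.spent = 0;
    }
  }

  check(estimatedCost = 0) {
    this.rollWindow();
    const utilization = (this.spent + estimatedCost) / this.maxAmount;
    if (utilization >= 1) {
      return { allowed: false, reason: 'budget-exceeded', utilization };
    }
    return { allowed: true, warning: utilization >= this.warningThreshold, utilization };
  }

  record(amount) {
    this.rollWindow();
    this.spent += amount;
    return this.spent;
  }
}
```

Fixed windows are simple, but a burst that straddles a window boundary can spend up to roughly twice `maxAmount` in a short span. Summing the `spend:*:history` sorted set over a sliding interval avoids that, at the cost of an extra query per check.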
Monitoring and Observability
Distributed Tracing for Agent Workflows
Understanding what happens across dozens of agents requires distributed tracing.
// n8n Workflow: OpenTelemetry Tracing for Agent Systems
const { NodeSDK } = require('@opentelemetry/sdk-node');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-http');
const { Resource } = require('@opentelemetry/resources');
const { SemanticResourceAttributes } = require('@opentelemetry/semantic-conventions');
const { trace, context, SpanStatusCode } = require('@opentelemetry/api');
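// Initialize tracer (written against the 1.x SDK packages; in 2.x,
// `new Resource(...)` is replaced by `resourceFromAttributes(...)`)
const sdk = new NodeSDK({
traceExporter: new OTLPTraceExporter({
// The HTTP exporter posts to `url` as given, so include the /v1/traces path
url: `${$env.OTEL_COLLECTOR_URL}/v1/traces`
}),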
resource: new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: 'agent-orchestrator',
[SemanticResourceAttributes.SERVICE_VERSION]: '2.0.0',
[SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]: $env.ENVIRONMENT
})
});
sdk.start();
const tracer = trace.getTracer('agent-orchestrator');
class AgentTracer {
constructor(tracer) {
this.tracer = tracer;
}
async traceWorkflow(workflowId, workflowFn) {
return this.tracer.startActiveSpan(
`workflow:${workflowId}`,
{
attributes: {
'workflow.id': workflowId,
'workflow.type': 'agent-orchestration'
}
},
async (span) => {
try {
const result = await workflowFn(span);
span.setStatus({ code: SpanStatusCode.OK });
return result;
} catch (error) {
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.message
});
span.recordException(error);
throw error;
} finally {
span.end();
}
}
);
}
async traceAgentCall(agentType, agentFn, parentContext) {
const span = this.tracer.startSpan(
`agent:${agentType}`,
{
attributes: {
'agent.type': agentType,
'agent.model': 'gpt-4o', // or dynamically detected
}
},
parentContext
);
const startTime = Date.now();
try {
const result = await context.with(
trace.setSpan(parentContext, span),
agentFn
);
span.setAttributes({
'agent.duration_ms': Date.now() - startTime,
'agent.tokens_input': result.usage?.prompt_tokens,
'agent.tokens_output': result.usage?.completion_tokens,
'agent.cost_usd': this.calculateCost(result.usage)
});
span.setStatus({ code: SpanStatusCode.OK });
return result;
} catch (error) {
span.setAttributes({
'error.type': error.name,
'error.message': error.message
});
span.recordException(error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.message
});
throw error;
} finally {
span.end();
}
}
calculateCost(usage) {
if (!usage) return 0;
const inputCost = (usage.prompt_tokens / 1000000) * 2.50;
const outputCost = (usage.completion_tokens / 1000000) * 10.00;
return inputCost + outputCost;
}
}
// Usage in n8n
const agentTracer = new AgentTracer(tracer);
const result = await agentTracer.traceWorkflow($json.workflowId, async (workflowSpan) => {
const parentContext = trace.setSpan(context.active(), workflowSpan);
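// Trace individual agent calls (callResearchAgent, callWriterAgent and callEditorAgent are placeholders for your own agent clients)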
const research = await agentTracer.traceAgentCall(
'research',
async () => await callResearchAgent($json.topic),
parentContext
);
const draft = await agentTracer.traceAgentCall(
'writer',
async () => await callWriterAgent(research),
parentContext
);
const edit = await agentTracer.traceAgentCall(
'editor',
async () => await callEditorAgent(draft),
parentContext
);
return { research, draft, edit };
});
// Flush buffered spans before the Code node returns
await sdk.shutdown();
return {
json: result
};
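`startActiveSpan` links spans only within one process. When agents communicate through a queue, the trace context has to travel inside the event itself. The W3C Trace Context `traceparent` format (`version-traceid-parentid-flags`) is the standard carrier. In practice `propagation.inject` and `propagation.extract` from `@opentelemetry/api` produce and read it, but the dependency-free sketch below shows what ends up in the event envelope. The envelope shape and field name are assumptions. In a real setup the parent-id should be the producer's current span id (which `propagation.inject` fills in); a random id is used here only to keep the sketch self-contained.

```javascript
const crypto = require('crypto');

// W3C Trace Context: traceparent = "00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>"
function makeTraceparent(traceId = crypto.randomBytes(16).toString('hex'), sampled = true) {
  const spanId = crypto.randomBytes(8).toString('hex'); // random stand-in span id
  return `00-${traceId}-${spanId}-${sampled ? '01' : '00'}`;
}

// Returns null for anything malformed; accepts only version 00.
function parseTraceparent(value) {
  const match = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(value || '');
  if (!match) return null;
  const [, traceId, parentId, flags] = match;
  // All-zero ids are invalid per the spec
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return null;
  return { traceId, parentId, sampled: (parseInt(flags, 16) & 0x01) === 1 };
}

// Producer: stamp the outgoing event (illustrative envelope)
const event = { type: 'draft.requested', payload: { topic: 'pricing' }, traceparent: makeTraceparent() };

// Consumer: keep the trace id for the next hop, or start a fresh trace
const incoming = parseTraceparent(event.traceparent);
const outgoing = incoming ? makeTraceparent(incoming.traceId, incoming.sampled) : makeTraceparent();
```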
Real-Time Dashboard Metrics
// n8n Workflow: Metrics Collection for Dashboards
const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);
class MetricsCollector {
constructor(redis) {
this.redis = redis;
this.flushInterval = 60000; // 1 minute
}
async increment(counter, value = 1, labels = {}) {
const key = `metrics:counter:${counter}:${this.serializeLabels(labels)}`;
await this.redis.hincrby('metrics:realtime', key, value);
}
async gauge(metric, value, labels = {}) {
const key = `metrics:gauge:${metric}:${this.serializeLabels(labels)}`;
await this.redis.hset('metrics:realtime', key, value);
}
async histogram(metric, value, labels = {}) {
const key = `metrics:histogram:${metric}:${this.serializeLabels(labels)}`;
await this.redis.lpush(key, JSON.stringify({
value,
timestamp: Date.now()
}));
// Cap at the 3,600 most recent samples (a sample count, not a time window)
await this.redis.ltrim(key, 0, 3599);
}
async timing(metric, durationMs, labels = {}) {
await this.histogram(`${metric}_duration`, durationMs, labels);
}
async getDashboardData(timeRange = '1h') {
const rangeMs = {
'1h': 3600000,
'6h': 21600000,
'24h': 86400000
}[timeRange] || 3600000;
// Aggregate metrics. Fields in metrics:realtime are cumulative totals, not windowed,
// so throughput is only an accurate per-second rate if requests_total is reset every timeRange
const counters = await this.redis.hgetall('metrics:realtime');
const completed = this.sumMetric(counters, 'counter', 'tasks_completed');
const failed = this.sumMetric(counters, 'counter', 'tasks_failed');
return {
timestamp: new Date().toISOString(),
range: timeRange,
agents: {
active: this.sumMetric(counters, 'gauge', 'active_agents'),
totalTasks: completed,
failedTasks: failed,
successRate: this.calculateSuccessRate(completed, failed)
},
costs: {
total: this.sumMetric(counters, 'gauge', 'cost_total'),
hourly: await this.getHourlyCost(),
byModel: await this.getCostByModel()
},
performance: {
// Assumes a single unlabeled series for each latency gauge
avgLatency: this.sumMetric(counters, 'gauge', 'avg_latency'),
p95Latency: this.sumMetric(counters, 'gauge', 'p95_latency'),
throughput: this.sumMetric(counters, 'counter', 'requests_total') / (rangeMs / 1000)
},
queues: await this.getQueueDepths(),
circuits: await this.getCircuitStates()
};
}
// Sum a metric across all of its label sets (hash fields share the same prefix)
sumMetric(hash, type, name) {
const prefix = `metrics:${type}:${name}:`;
return Object.entries(hash)
.filter(([field]) => field.startsWith(prefix))
.reduce((sum, [, value]) => sum + parseFloat(value), 0);
}
calculateSuccessRate(completed, failed) {
if (completed + failed === 0) return 100;
return Number(((completed / (completed + failed)) * 100).toFixed(2));
}
serializeLabels(labels) {
return Object.entries(labels)
.map(([k, v]) => `${k}=${v}`)
.join(',');
}
async getHourlyCost() {
const hourAgo = Date.now() - 3600000;
// Reads the history written by BudgetController.recordSpend('daily', ...)
const entries = await this.redis.zrangebyscore('spend:daily:history', hourAgo, Date.now());
return entries.reduce((sum, e) => sum + JSON.parse(e).amount, 0);
}
async getCostByModel() {
// Aggregate by model from metrics
return {};
}
async getQueueDepths() {
const queues = ['agent:queue:critical', 'agent:queue:high', 'agent:queue:normal', 'agent:queue:low'];
const depths = {};
for (const queue of queues) {
depths[queue.split(':')[2]] = await this.redis.llen(queue);
}
return depths;
}
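async getCircuitStates() {
// SCAN instead of KEYS, which blocks Redis while it walks the whole keyspace
const states = {};
let cursor = '0';
do {
const [next, keys] = await this.redis.scan(cursor, 'MATCH', 'circuit:*', 'COUNT', 100);
cursor = next;
for (const key of keys) {
const state = await this.redis.get(key);
states[key.replace('circuit:', '')] = state ? JSON.parse(state).status : 'unknown';
}
} while (cursor !== '0');
return states;
}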
}
// Usage in n8n
const metrics = new MetricsCollector(redis);
// Record metrics during processing
await metrics.increment('tasks_completed', 1, { agent_type: 'writer' });
await metrics.timing('agent_response', 2450, { agent_type: 'writer', model: 'gpt-4o' });
await metrics.gauge('active_agents', 12);
// Get dashboard data
const dashboardData = await metrics.getDashboardData('1h');
return {
json: dashboardData
};
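`getDashboardData` reads `p95_latency` from a gauge, but nothing in the collector computes that value. One option is to derive it from the samples `histogram()` already stores in its Redis list. The sketch below applies the nearest-rank percentile method to those samples once they have been fetched (for example with `LRANGE`). The function names are hypothetical.

```javascript
// Nearest-rank percentile: the smallest sample such that at least p% of
// samples are less than or equal to it.
function percentile(values, p) {
  if (values.length === 0) return null;
  if (p <= 0) return Math.min(...values);
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length); // 1-based rank
  return sorted[Math.min(rank, sorted.length) - 1];
}

// Samples as stored by MetricsCollector.histogram(): JSON strings of { value, timestamp }
function latencySummary(rawSamples, sinceMs) {
  const values = rawSamples
    .map((s) => JSON.parse(s))
    .filter((s) => s.timestamp >= sinceMs) // keep only the requested time range
    .map((s) => s.value);
  const avg = values.length ? values.reduce((a, b) => a + b, 0) / values.length : null;
  return { count: values.length, avg, p95: percentile(values, 95) };
}
```

Because the list is capped at a sample count, a busy agent's samples may cover less than the requested range. The `count` field makes that visible on the dashboard.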
Production Deployment Architecture
Kubernetes-Ready Setup
# agent-orchestrator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-orchestrator
  namespace: ai-automation
  labels:
    app: agent-orchestrator
    version: v2.0.0
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: agent-orchestrator
  template:
    metadata:
      labels:
        app: agent-orchestrator
        version: v2.0.0
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9090"
    spec:
      containers:
        - name: orchestrator
          image: company/agent-orchestrator:2.0.0
          ports:
            - containerPort: 3000
              name: http
            - containerPort: 9090
              name: metrics
          env:
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: redis-credentials
                  key: url
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-credentials
                  key: openai
            - name: ANTHROPIC_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-credentials
                  key: anthropic
            - name: OTEL_COLLECTOR_URL
              value: "http://otel-collector.monitoring:4318"
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "2000m"
          livenessProbe:
            httpGet:
              path: /health/live
              port: 3000
            initialDelaySeconds: 30
            periodSeconds: 10
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 3000
            initialDelaySeconds: 5
            periodSeconds: 5
          volumeMounts:
            - name: config
              mountPath: /app/config
      volumes:
        - name: config
          configMap:
            name: agent-orchestrator-config
---
apiVersion: v1
kind: Service
metadata:
  name: agent-orchestrator
  namespace: ai-automation
spec:
  selector:
    app: agent-orchestrator
  ports:
    - port: 80
      targetPort: 3000
      name: http
    - port: 9090
      targetPort: 9090
      name: metrics
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-orchestrator-hpa
  namespace: ai-automation
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-orchestrator
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    # Pods metrics are served by the custom metrics API, so this requires an
    # adapter (e.g. prometheus-adapter) that exposes agent_queue_depth
    - type: Pods
      pods:
        metric:
          name: agent_queue_depth
        target:
          type: AverageValue
          averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 2
          periodSeconds: 120
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: agent-orchestrator-pdb
  namespace: ai-automation
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: agent-orchestrator
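For each metric, the HPA controller computes `desiredReplicas = ceil(currentReplicas × currentMetricValue / targetValue)`. It skips scaling when that ratio is within a tolerance (0.1 by default), takes the largest proposal across metrics, and then applies the `behavior` policies and the `minReplicas`/`maxReplicas` bounds. The sketch below reproduces that arithmetic with the defaults from the manifest above, so you can sanity-check the targets. It deliberately simplifies the real controller by omitting stabilization windows, missing-metric handling, and pod readiness adjustments, and the function is hypothetical.

```javascript
// Approximate HPA arithmetic for the agent-orchestrator manifest (simplified).
function desiredReplicas(current, metrics, opts = {}) {
  const { min = 3, max = 20, tolerance = 0.1, maxScaleUpPods = 4, maxScaleDownPods = 2 } = opts;

  // Per-metric proposal: ceil(current * ratio), unless the ratio is within tolerance
  const proposals = metrics.map(({ value, target }) => {
    const ratio = value / target;
    if (Math.abs(ratio - 1) <= tolerance) return current;
    return Math.ceil(current * ratio);
  });

  // The largest proposal wins, then the behavior policies limit the step size
  let desired = Math.max(...proposals);
  desired = Math.min(desired, current + maxScaleUpPods);
  desired = Math.max(desired, current - maxScaleDownPods);

  // Finally clamp to minReplicas/maxReplicas
  return Math.min(max, Math.max(min, desired));
}

// 3 pods, CPU at 60% (target 70%), average queue depth 250 (target 100)
const next = desiredReplicas(3, [{ value: 60, target: 70 }, { value: 250, target: 100 }]);
```

In this example the queue-depth metric alone asks for `ceil(3 × 2.5) = 8` pods, but the scale-up policy limits the step to 7. Later evaluation periods can keep scaling if the backlog persists.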
Monitoring Stack Integration
# prometheus-rules.yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: agent-orchestrator-alerts
  namespace: monitoring
spec:
  groups:
    - name: agent-orchestrator
      rules:
        - alert: HighAgentFailureRate
          expr: |
            (
              sum(rate(agent_tasks_failed_total[5m]))
              /
              sum(rate(agent_tasks_total[5m]))
            ) > 0.1
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "High agent failure rate detected"
            description: "Agent failure rate is {{ $value | humanizePercentage }} over last 5m"
        - alert: BudgetExceeded
          expr: agent_daily_spend_usd > 500
          for: 1m
          labels:
            severity: critical
          annotations:
            summary: "Daily budget exceeded"
            description: "Daily spend of ${{ $value }} exceeds $500 limit"
        - alert: CircuitBreakerOpen
          expr: circuit_breaker_status == 1
          for: 1m
          labels:
            severity: warning
          annotations:
            summary: "Circuit breaker is open"
            description: "Circuit {{ $labels.name }} has been open for >1m"
        - alert: DLQGrowing
          # deriv() returns a per-second slope, so 0.1 is about 6 events/min
          expr: |
            (
              deriv(agent_dlq_size[10m]) > 0.1
            )
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "Dead letter queue is growing"
            description: "DLQ size increasing at {{ $value }} events/s"
        - alert: HighLatency
          expr: |
            histogram_quantile(0.95,
              sum(rate(agent_duration_seconds_bucket[5m])) by (le)
            ) > 30
          for: 5m
          labels:
            severity: warning
          annotations:
            summary: "High agent latency"
            description: "P95 latency is {{ $value }}s"
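`HighAgentFailureRate` divides two `rate()` results over the same window, and `rate()` is reset-aware: when a counter drops (for example after a pod restart), Prometheus treats the drop as a reset, not a negative increase. The sketch below reproduces that core idea on raw samples so you can check which ratio a given set of counter values would produce. It omits Prometheus's extrapolation to the window edges, so real values will differ slightly, and the function names are hypothetical.

```javascript
// Increase of a monotonic counter across [timestampSec, value] samples,
// treating any drop in value as a counter reset (the core of rate()/increase()).
function counterIncrease(samples) {
  let increase = 0;
  for (let i = 1; i < samples.length; i++) {
    const prev = samples[i - 1][1];
    const curr = samples[i][1];
    // After a reset the counter restarts from 0, so the whole new value counts
    increase += curr >= prev ? curr - prev : curr;
  }
  return increase;
}

// Over a shared window, rate(a) / rate(b) equals increase(a) / increase(b)
function failureRatio(failedSamples, totalSamples) {
  const total = counterIncrease(totalSamples);
  return total === 0 ? 0 : counterIncrease(failedSamples) / total;
}

// Both counters reset between t=60 and t=120 (e.g. a pod restart)
const totalTasks = [[0, 100], [60, 160], [120, 20], [180, 80]];
const failedTasks = [[0, 10], [60, 22], [120, 3], [180, 9]];
const ratio = failureRatio(failedTasks, totalTasks); // 21 / 140 = 0.15, above the 0.1 threshold
```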
Conclusion: Building for Scale
Production-grade agent orchestration requires moving beyond simple API calls to sophisticated patterns that handle scale gracefully:
1. Event-Driven Architecture
- Decouple agents through message buses
- Use standardized event schemas
- Implement proper routing and filtering
2. Resilience Patterns
- Circuit breakers prevent cascading failures
- Exponential backoff with jitter avoids retry storms
- Dead letter queues capture unrecoverable failures
3. Cost Management
- Token optimization can cut API costs by 50-80% on well-suited workloads
- Budget-based rate limiting prevents runaway spending
- Intelligent model selection matches task to capability
4. Observability
- Distributed tracing tracks requests across agents
- Real-time metrics enable proactive monitoring
- Structured logging simplifies debugging
Key Takeaways:
- Start with event-driven architecture from day one—retrofitting is painful
- Implement circuit breakers before you need them
- Monitor cost per task as closely as latency
- Plan for 10x scale—designs that work for 10 agents often fail at 100
The difference between a prototype agent system and a production one isn't the sophistication of the AI—it's the robustness of the orchestration layer beneath it.
Ready to scale your agent systems? Contact Tropical Media at https://tropical-media.work for expert consultation on production-grade AI agent architecture.
Tags: AI Agent Orchestration, Event-Driven Architecture, Message Queues, Redis Streams, n8n Scalability, Circuit Breakers, Cost Optimization, Production AI, Multi-Agent Systems, Fault Tolerance, Distributed Tracing, Kubernetes, 2026 AI Trends