
Production-Grade AI Agent Orchestration: Scaling Multi-Agent Systems with Event-Driven Architecture

Master production-grade AI agent orchestration with event-driven architecture, message queues, and scalable patterns. Learn how to manage 100+ agents in n8n workflows, implement resilient error handling, optimize costs, and build fault-tolerant multi-agent systems with Redis, RabbitMQ, and Temporal.


By May 2026, enterprises running AI agent systems face a critical inflection point. Organizations that started with simple 3-agent workflows are now struggling to manage 100+ agents in production. The challenge isn't building agents anymore—it's orchestrating them at scale without drowning in complexity, costs, or cascading failures.

The numbers tell the story. Companies running large-scale agent deployments report that 67% of their engineering time is spent not on building new capabilities, but on managing agent coordination, debugging cross-agent failures, and optimizing resource usage. A typical mid-size enterprise now runs 150-400 active agents across customer service, content generation, data processing, and internal automation—creating an operational challenge that resembles managing a distributed microservices architecture, but with the added complexity of non-deterministic AI behavior.

This comprehensive guide explores production-grade patterns for orchestrating multi-agent systems at scale. From event-driven architectures and message queue patterns to fault tolerance strategies and cost optimization techniques, we'll cover the engineering practices that separate prototype agent systems from enterprise-grade deployments. Whether you're managing 10 agents or 1,000, these patterns will help you build systems that are scalable, resilient, observable, and cost-effective.

The Scale Problem: Why Agent Orchestration Breaks Down

Understanding the Complexity Explosion

The complexity of multi-agent systems doesn't grow linearly—it explodes. A system with n agents can have n(n-1)/2 potential communication paths, meaning a 10-agent system has 45 possible interaction patterns, while a 50-agent system has 1,225.

┌────────────────────────────────────────────────────────────────┐
│                    Agent Communication Complexity               │
├────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Agents:  5      10      20      50      100     200           │
│  Paths:   10     45      190     1,225   4,950   19,900        │
│                                                                 │
│  └─ Agent growth (vs. 5): 1x, 2x, 4x, 10x, 20x, 40x            │
│  └─ Path growth (vs. 5):  1x, 4.5x, 19x, 122.5x, 495x, 1,990x  │
│                                                                 │
│  This is why point-to-point communication fails at scale.      │
└────────────────────────────────────────────────────────────────┘
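The path counts in the table follow directly from the n(n-1)/2 formula. A minimal sketch for checking other fleet sizes is shown here; the function name `communicationPaths` is illustrative and not part of any library.

```javascript
// Number of distinct agent pairs, i.e. potential point-to-point paths.
// Hypothetical helper name, used only for illustration.
function communicationPaths(agentCount) {
  if (!Number.isInteger(agentCount) || agentCount < 0) {
    throw new RangeError('agentCount must be a non-negative integer');
  }
  return (agentCount * (agentCount - 1)) / 2;
}

for (const n of [5, 10, 20, 50, 100, 200]) {
  console.log(`${n} agents -> ${communicationPaths(n)} paths`);
}
```

With a shared message bus, each agent needs only its own connection to the bus, so the number of connections grows linearly with the number of agents instead of quadratically.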

Common Failure Patterns at Scale:

1. The Thundering Herd Problem

// Anti-pattern: Synchronous agent spawning
// When 1,000 tasks arrive simultaneously, agents get overwhelmed

async function processTasks(tasks) {
  // ❌ BAD: Creates agents for every task immediately
  const results = await Promise.all(
    tasks.map(task => spawnAgentAndProcess(task))
  );
  return results;
}

// Result: 1,000 simultaneous API calls to OpenAI
// Cost spike: $500 in 30 seconds
// Rate limit exceeded, most requests fail
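One common mitigation is to cap how many tasks are in flight at once. The sketch below is a minimal, dependency-free worker pool. `runWithConcurrency` and its `limit` parameter are illustrative names, and the `worker` callback stands in for something like `spawnAgentAndProcess` above.

```javascript
// Run `worker` over `items` with at most `limit` calls in flight at a time.
// Results are returned in input order, like Promise.all.
async function runWithConcurrency(items, limit, worker) {
  const results = new Array(items.length);
  let nextIndex = 0;

  // Each runner claims the next unprocessed index until the list is exhausted.
  async function runner() {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await worker(items[index], index);
    }
  }

  const runnerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results;
}

// Usage (assuming spawnAgentAndProcess from the anti-pattern above):
// const results = await runWithConcurrency(tasks, 10, spawnAgentAndProcess);
```

As with `Promise.all`, the first rejected task rejects the whole call. In practice the limit would be chosen from the provider's rate limit, and a queue (covered later) would absorb bursts that exceed it.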

2. Cascading Failures

// Anti-pattern: No circuit breakers or timeouts
// When one agent fails, dependent agents keep trying

async function researchAndWrite(topic) {
  const research = await researchAgent.query(topic); // Fails after 30s
  // ❌ BAD: No timeout, no fallback
  const draft = await writerAgent.create(research); // Times out too
  // ❌ BAD: Error propagates
  const edited = await editorAgent.review(draft); // Also fails
  return edited;
}

// Result: Three failed operations, wasted tokens, angry users
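A first line of defense is to give every agent call a deadline and a fallback, so that one slow agent cannot stall the whole chain. The following is a minimal sketch under that assumption; `withTimeout` and `withFallback` are hypothetical helper names, not part of n8n or any agent SDK.

```javascript
// Reject if `promise` does not settle within `ms` milliseconds.
function withTimeout(promise, ms, label = 'operation') {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms);
  });
  // Clear the timer either way so it cannot keep the process alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Run the primary step under a deadline; on any failure, use the fallback.
async function withFallback(primary, fallback, ms, label) {
  try {
    return await withTimeout(primary(), ms, label);
  } catch (error) {
    return fallback(error);
  }
}

// Usage (agent objects as in the anti-pattern above):
// const research = await withFallback(
//   () => researchAgent.query(topic),
//   () => ({ source: 'cache', notes: [] }),
//   10000,
//   'research'
// );
```

Note that the timeout only stops waiting; it does not cancel the underlying request. Cancelling an HTTP call would additionally need something like an `AbortController` passed to the client.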

3. State Explosion

// Anti-pattern: Storing full state for every agent interaction
// Each conversation includes all previous context

const conversation = {
  messages: [
    { role: 'system', content: 'You are a helpful assistant...' }, // 500 tokens
    { role: 'user', content: 'Previous question...' },              // 1,200 tokens
    { role: 'assistant', content: 'Previous answer...' },           // 2,800 tokens
    { role: 'user', content: 'Another question...' },               // 1,500 tokens
    { role: 'assistant', content: 'Another answer...' },          // 2,200 tokens
    // ... continues for 50+ turns
  ]
};

// Result: 10th turn sends 45,000 tokens, costs $1.35
// 50th turn sends 225,000 tokens, costs $6.75
// User stops using the feature
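A simple way to bound this growth is to keep the system prompt and only the most recent turns that fit a token budget. The sketch below assumes roughly 4 characters per token, which is only an approximation; `estimateTokens` and `trimHistory` are illustrative names, and a real system would use the model's own tokenizer.

```javascript
// Rough token estimate: about 4 characters per token for English text.
// This is an approximation, not the model's real tokenizer.
function estimateTokens(text) {
  return Math.ceil(text.length / 4);
}

// Keep the system prompt plus the newest messages that fit within maxTokens.
function trimHistory(messages, maxTokens) {
  const system = messages.filter((m) => m.role === 'system');
  const rest = messages.filter((m) => m.role !== 'system');

  let used = system.reduce((sum, m) => sum + estimateTokens(m.content), 0);
  const kept = [];

  // Walk backwards so the most recent turns are retained first.
  for (let i = rest.length - 1; i >= 0; i--) {
    const cost = estimateTokens(rest[i].content);
    if (used + cost > maxTokens) break;
    used += cost;
    kept.unshift(rest[i]);
  }

  return [...system, ...kept];
}
```

Dropping old turns loses information, so a common refinement is to replace the dropped turns with a short summary message rather than discarding them outright.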

4. Hidden Retry Storms

// Anti-pattern: Naive retry without backoff or coordination
// Multiple systems retry simultaneously

async function callWithRetry(fn, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn();
    } catch (e) {
      // ❌ BAD: No exponential backoff
      // ❌ BAD: All retries happen at same time across agents
      await sleep(1000); // Fixed 1s delay
    }
  }
  throw new Error('Failed after retries');
}

// Result: When API is degraded, retry load multiplies 3x
// 100 agents × 3 retries = 300 requests hitting struggling API

The Scale Thresholds

Based on production data from 200+ organizations, here are the practical scale thresholds where architecture changes become necessary:

Scale      | Agents | Key Challenges    | Required Architecture
-----------|--------|-------------------|------------------------------------
Prototype  | 1-5    | Coordination      | Direct API calls
Small      | 5-20   | Task routing      | Simple queue + routing logic
Medium     | 20-50  | State management  | Message bus + persistence layer
Large      | 50-200 | Fault tolerance   | Event-driven + circuit breakers
Enterprise | 200+   | Cost optimization | Full event sourcing + cost controls

The jump from "Medium" to "Large" is particularly significant—this is where synchronous patterns fail completely and event-driven architecture becomes mandatory.

Event-Driven Architecture for Agent Systems

Core Principles

Event-driven architecture (EDA) decouples agents through asynchronous message passing. Instead of agents directly calling each other, they publish events to a message bus and subscribe to relevant event types.

┌─────────────────────────────────────────────────────────────────┐
│              Event-Driven Agent Architecture                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                   │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐   │
│   │  Agent A │    │  Agent B │    │  Agent C │    │  Agent D │   │
│   │ (Research)│    │(Writing) │    │(Editing) │    │(Review)  │   │
│   └────┬─────┘    └────┬─────┘    └────┬─────┘    └────┬─────┘   │
│        │               │               │               │          │
│        │  PUBLISH      │  PUBLISH      │  PUBLISH      │          │
│        ▼               ▼               ▼               ▼          │
│   ┌──────────────────────────────────────────────────────────┐   │
│   │                    Message Bus (Redis/RabbitMQ)          │   │
│   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │   │
│   │  │research.done│  │ draft.ready │  │ edit.complete│         │   │
│   │  └─────────────┘  └─────────────┘  └─────────────┘         │   │
│   └──────────────────────────────────────────────────────────┘   │
│        │               │               │               │          │
│        │  SUBSCRIBE    │  SUBSCRIBE    │  SUBSCRIBE    │          │
│        ▼               ▼               ▼               ▼          │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐   │
│   │   Listens│    │  Listens │    │  Listens │    │  Listens │   │
│   │ for tasks│    │for draft │    │for review │    │for final │   │
│   └──────────┘    └──────────┘    └──────────┘    └──────────┘   │
│                                                                   │
│   Key Benefit: Agents don't know about each other directly        │
│   Agent B doesn't know WHO created the draft, just THAT one exists│
└─────────────────────────────────────────────────────────────────┘

Benefits of EDA for Agent Systems:

  1. Decoupling: Agents can be developed, deployed, and scaled independently
  2. Resilience: If one agent fails, others continue processing
  3. Observability: Every event is logged, enabling comprehensive monitoring
  4. Scalability: Add more agent instances by subscribing to event topics
  5. Flexibility: Reorder workflows by changing event routing, not agent code
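The decoupling idea can be seen in a few lines with an in-process stand-in for the bus. This sketch uses Node's built-in `EventEmitter` in place of Redis or RabbitMQ. The event names `research.done`, `draft.ready`, and `edit.complete` come from the diagram, while `research.requested` and the handler bodies are assumptions made for illustration.

```javascript
const { EventEmitter } = require('node:events');

// In-process stand-in for a message bus such as Redis or RabbitMQ.
const bus = new EventEmitter();
const log = [];

// Research agent: reacts to a request and publishes its findings.
bus.on('research.requested', ({ topic }) => {
  log.push('research');
  bus.emit('research.done', { topic, notes: `notes on ${topic}` });
});

// Writing agent: knows only the 'research.done' event, not who emitted it.
bus.on('research.done', ({ topic, notes }) => {
  log.push('writing');
  bus.emit('draft.ready', { topic, draft: `Draft based on ${notes}` });
});

// Editing agent: subscribes to drafts from any producer.
bus.on('draft.ready', ({ draft }) => {
  log.push('editing');
  bus.emit('edit.complete', { final: draft.toUpperCase() });
});

let finalResult = null;
bus.on('edit.complete', (event) => { finalResult = event; });

bus.emit('research.requested', { topic: 'event-driven agents' });
console.log(log, finalResult);
```

None of the handlers reference each other, so swapping the writing agent or adding a second subscriber to `draft.ready` requires no change to the other agents. Unlike a real bus, `EventEmitter` is synchronous, in-memory, and offers no persistence or delivery guarantees, which is why the production examples use Redis Streams.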

Event Types and Schema Design

A well-designed event schema is critical for maintainability. Here's a production-grade schema for agent systems:

// Core event types for agent orchestration

// ISO 8601 timestamp string in UTC, e.g. "2026-05-01T12:00:00.000Z"
type ISO8601 = string;

interface AgentEvent {
  // Metadata
  eventId: string;           // UUID v4
  eventType: string;         // Fully qualified: agent.task.assigned
  timestamp: ISO8601;        // UTC
  version: string;           // Schema version: "2.1.0"
  
  // Context
  correlationId: string;     // Traces request across agents
  causationId: string;       // Points to triggering event
  sessionId: string;         // Groups related events
  
  // Content
  payload: unknown;          // Event-specific data
  
  // Routing
  source: AgentRef;          // Who emitted the event
  target?: AgentRef;         // Intended recipient (optional)
  priority: 'low' | 'normal' | 'high' | 'critical';
  
  // Lifecycle
  ttl?: number;             // Time-to-live in seconds
  retryCount: number;        // How many times processed
  deadline?: ISO8601;        // When event becomes stale
}

interface AgentRef {
  agentId: string;
  agentType: string;         // 'research', 'writing', 'review'
  version: string;
  instanceId: string;        // For multi-instance agents
}

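// Specific event types
// (TaskRequirements, SharedContext, and TaskResult are application-specific
// types assumed to be defined elsewhere)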
interface TaskAssignedEvent extends AgentEvent {
  eventType: 'agent.task.assigned';
  payload: {
    taskId: string;
    taskType: string;
    requirements: TaskRequirements;
    context: SharedContext;
    maxDuration: number;       // Seconds
    costBudget?: number;      // Max tokens/cost
  };
}

interface TaskCompletedEvent extends AgentEvent {
  eventType: 'agent.task.completed';
  payload: {
    taskId: string;
    result: TaskResult;
    metrics: {
      duration: number;       // Seconds
      tokensUsed: number;
      cost: number;
      retries: number;
    };
  };
}

interface TaskFailedEvent extends AgentEvent {
  eventType: 'agent.task.failed';
  payload: {
    taskId: string;
    error: {
      code: string;
      message: string;
      recoverable: boolean;
    };
    attempts: number;
    nextRetry?: ISO8601;
  };
}
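TypeScript interfaces are erased at runtime, so events arriving from a queue still need a runtime check before an agent acts on them. The sketch below is one possible validator for the envelope fields above. The name `validateEvent` and the choice of which fields to treat as mandatory are assumptions, not part of the schema itself.

```javascript
const PRIORITIES = ['low', 'normal', 'high', 'critical'];

// Assumption: these envelope fields are treated as mandatory at runtime.
const REQUIRED_FIELDS = [
  'eventId', 'eventType', 'timestamp', 'version',
  'correlationId', 'sessionId', 'source'
];

// Return a list of problems; an empty list means the envelope looks valid.
function validateEvent(event) {
  const errors = [];

  for (const field of REQUIRED_FIELDS) {
    if (event[field] === undefined || event[field] === null || event[field] === '') {
      errors.push(`missing ${field}`);
    }
  }
  if (!PRIORITIES.includes(event.priority)) {
    errors.push(`invalid priority: ${event.priority}`);
  }
  if (typeof event.timestamp === 'string' && Number.isNaN(Date.parse(event.timestamp))) {
    errors.push('timestamp is not a parseable date');
  }
  if (!Number.isInteger(event.retryCount) || event.retryCount < 0) {
    errors.push('retryCount must be a non-negative integer');
  }
  return errors;
}
```

A consumer could call `validateEvent` right after deserializing a message and route anything with a non-empty error list to a dead letter queue instead of processing it.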

Event Naming Conventions:

# Consistent naming enables routing and filtering

# Format: domain.entity.action (some names add a qualifier segment, e.g. agent.instance.scale.up)
# Examples:

events:
  # Task lifecycle
  - agent.task.assigned
  - agent.task.started
  - agent.task.progress      # Intermediate updates
  - agent.task.completed
  - agent.task.failed
  - agent.task.cancelled
  - agent.task.expired       # TTL exceeded
  
  # Agent lifecycle
  - agent.instance.started
  - agent.instance.heartbeat
  - agent.instance.stopped
  - agent.instance.crashed
  - agent.instance.scale.up
  - agent.instance.scale.down
  
  # System events
  - system.circuit.opened
  - system.circuit.closed
  - system.rate.limit.exceeded
  - system.cost.budget.warning
  - system.cost.budget.exceeded
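Consistent dot-separated names make wildcard routing straightforward. The following sketch shows one way a subscriber could filter event names. It is loosely modeled on the wildcard style of RabbitMQ topic exchanges, but `matchesPattern` is a hypothetical helper and this simplified version only supports `#` as the final segment.

```javascript
// Match a dot-separated event name against a pattern.
// '*' matches exactly one segment.
// '#' matches zero or more remaining segments (supported only as the last segment here).
function matchesPattern(pattern, eventType) {
  const p = pattern.split('.');
  const e = eventType.split('.');

  for (let i = 0; i < p.length; i++) {
    if (p[i] === '#') return i === p.length - 1;
    if (i >= e.length) return false;
    if (p[i] !== '*' && p[i] !== e[i]) return false;
  }
  // Without a trailing '#', the segment counts must match exactly.
  return p.length === e.length;
}

console.log(matchesPattern('agent.task.*', 'agent.task.failed'));
console.log(matchesPattern('agent.#', 'agent.instance.scale.up'));
console.log(matchesPattern('agent.task.*', 'agent.instance.started'));
```

A monitoring consumer might subscribe to `system.#` to see every system event, while a retry handler would listen only for `agent.task.failed`.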

Implementing with n8n and Redis

Redis Streams provides an excellent message bus for agent systems—fast, persistent, and with consumer group support for scaling.

n8n Workflow: Event Publisher

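// n8n Code Node: Publish Events to Redis Streams
// External modules must be allowed on a self-hosted instance,
// e.g. NODE_FUNCTION_ALLOW_EXTERNAL=ioredis,uuid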

const Redis = require('ioredis');
const { v4: uuidv4 } = require('uuid');

const redis = new Redis({
  host: $env.REDIS_HOST,
  port: $env.REDIS_PORT,
  password: $env.REDIS_PASSWORD,
  maxRetriesPerRequest: 3
});

async function publishEvent(eventData) {
  const event = {
    eventId: uuidv4(),
    eventType: eventData.type,
    timestamp: new Date().toISOString(),
    version: '2.1.0',
    correlationId: eventData.correlationId || uuidv4(),
    causationId: eventData.causationId,
    sessionId: eventData.sessionId,
    payload: JSON.stringify(eventData.payload),
    source: JSON.stringify(eventData.source),
    target: eventData.target ? JSON.stringify(eventData.target) : null,
    priority: eventData.priority || 'normal',
    retryCount: eventData.retryCount || 0,
    deadline: eventData.deadline
  };

  // Redis stream fields are strings, so drop unset optional fields
  // (causationId, target, deadline) before flattening to field/value pairs
  const fields = Object.entries(event)
    .filter(([, value]) => value !== null && value !== undefined)
    .flat();

  // Publish to stream with maxlen to prevent unbounded growth
  const streamKey = `events:${eventData.type.split('.')[0]}`;
  await redis.xadd(
    streamKey,
    'MAXLEN', '~', 10000,  // Approximate max length
    '*',                   // Auto-generate ID
    ...fields
  );

  // Also publish to type-specific stream for targeted consumers
  await redis.xadd(
    `events:type:${eventData.type}`,
    'MAXLEN', '~', 5000,
    '*',
    ...fields
  );

  return event;
}

// Example: Publish task assigned event
const event = await publishEvent({
  type: 'agent.task.assigned',
  correlationId: $json.correlationId,
  causationId: $json.triggerEventId,
  sessionId: $json.sessionId,
  payload: {
    taskId: `task-${Date.now()}`,
    taskType: 'content-research',
    requirements: {
      topic: $json.topic,
      depth: 'comprehensive',
      sources: ['industry-reports', 'competitor-analysis'],
      outputFormat: 'structured-json'
    },
    context: {
      industry: $json.industry,
      targetAudience: $json.audience,
      brandVoice: $json.brandVoice
    },
    maxDuration: 300,  // 5 minutes
    costBudget: 0.50   // $0.50 max
  },
  source: {
    agentId: 'orchestrator-1',
    agentType: 'workflow-orchestrator',
    version: '2.0.0',
    instanceId: 'orc-1a'
  },
  priority: 'high'
});

// Close the connection so repeated executions do not leak Redis clients
await redis.quit();

return {
  json: {
    published: true,
    eventId: event.eventId,
    correlationId: event.correlationId
  }
};

n8n Workflow: Event Consumer with Consumer Groups

// n8n Code Node: Consume Events with Redis Consumer Groups

const Redis = require('ioredis');

const redis = new Redis({
  host: $env.REDIS_HOST,
  port: $env.REDIS_PORT,
  password: $env.REDIS_PASSWORD
});

const CONSUMER_GROUP = 'writing-agents';
const CONSUMER_NAME = `writer-${$env.INSTANCE_ID || '1'}`;
const STREAM_KEY = 'events:type:agent.task.assigned';

async function consumeEvents() {
  try {
    // Ensure consumer group exists
    try {
      await redis.xgroup('CREATE', STREAM_KEY, CONSUMER_GROUP, '$', 'MKSTREAM');
    } catch (e) {
      // Group already exists, ignore error
      if (!e.message.includes('already exists')) throw e;
    }

    // Read events with timeout
    const messages = await redis.xreadgroup(
      'GROUP', CONSUMER_GROUP, CONSUMER_NAME,
      'BLOCK', 5000,  // Wait up to 5 seconds
      'COUNT', 10,    // Max 10 messages per batch
      'STREAMS', STREAM_KEY, '>'
    );

    if (!messages) {
      return { json: { processed: 0, events: [] } };
    }

    const processed = [];

    for (const [, entries] of messages) {
      for (const [id, fields] of entries) {
        const event = Object.fromEntries(
          fields.map((v, i) => i % 2 === 0 ? [v, fields[i + 1]] : null).filter(Boolean)
        );

        try {
          // Parse payload
          const payload = JSON.parse(event.payload);
          
          // Check priority
          if (event.priority === 'critical') {
            // Process immediately, don't batch
            await processCriticalTask(payload);
          } else {
            // Queue for batch processing
            await queueTask(payload);
          }

          // Acknowledge success
          await redis.xack(STREAM_KEY, CONSUMER_GROUP, id);
          
          processed.push({
            eventId: event.eventId,
            status: 'success',
            taskId: payload.taskId
          });

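        } catch (error) {
          // Stream entries are immutable, and reading with '>' never redelivers
          // pending entries, so retry by re-publishing with a higher retryCount
          const retryCount = Number(event.retryCount || 0);
          const willRetry = retryCount < 3;

          processed.push({
            eventId: event.eventId,
            status: 'failed',
            error: error.message,
            willRetry
          });

          if (willRetry) {
            const retried = { ...event, retryCount: String(retryCount + 1) };
            await redis.xadd(STREAM_KEY, '*', ...Object.entries(retried).flat());
          } else {
            // Max retries exceeded: send to dead letter queue
            await redis.xadd(
              'events:dead-letter',
              '*',
              'originalEvent', JSON.stringify(event),
              'error', error.message,
              'failedAt', new Date().toISOString()
            );
          }

          // Ack the original entry in both cases so it leaves the pending list
          await redis.xack(STREAM_KEY, CONSUMER_GROUP, id);
        }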
      }
    }

    return {
      json: {
        processed: processed.length,
        events: processed
      }
    };

  } finally {
    await redis.quit();
  }
}

async function processCriticalTask(payload) {
  // Critical path processing
  // Skip queue, execute immediately
  // (executeAgentTask is assumed to be defined elsewhere in the workflow)
  const result = await executeAgentTask(payload);
  return result;
}

async function queueTask(payload) {
  // Add to internal queue for batch processing
  await redis.lpush('queue:pending-tasks', JSON.stringify(payload));
}

return await consumeEvents();

Message Queue Patterns for Agent Coordination

Pattern 1: Priority Queues

Different tasks require different SLAs. Priority queues ensure critical tasks get processed first.

// n8n Workflow: Multi-Priority Queue System

const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);

const PRIORITIES = ['critical', 'high', 'normal', 'low'];
const QUEUE_TTLS = {
  critical: 60,    // 1 minute
  high: 300,       // 5 minutes
  normal: 3600,    // 1 hour
  low: 86400       // 24 hours
};

class PriorityQueueManager {
  constructor(redis) {
    this.redis = redis;
    this.queues = {
      critical: 'agent:queue:critical',
      high: 'agent:queue:high',
      normal: 'agent:queue:normal',
      low: 'agent:queue:low'
    };
  }

  async enqueue(task, priority = 'normal') {
    // Fall back to 'normal' for unknown priorities so the queue, TTL,
    // and metrics all use the same level
    const level = this.queues[priority] ? priority : 'normal';
    const queue = this.queues[level];
    const ttl = QUEUE_TTLS[level];

    const enrichedTask = {
      ...task,
      priority: level,
      enqueuedAt: Date.now(),
      expiresAt: Date.now() + (ttl * 1000),
      attempts: 0
    };

    // LPUSH here plus RPOP in dequeue() gives FIFO order within a priority.
    // Pairing LPUSH with LPOP instead would give LIFO (stack) order.
    await this.redis.lpush(queue, JSON.stringify(enrichedTask));
    
    // Track queue depth metrics
    await this.redis.hincrby('metrics:queue-depth', level, 1);

    return {
      taskId: enrichedTask.id,
      priority: level,
      queueDepth: await this.redis.llen(queue)
    };
  }

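  async dequeue() {
    // Try each priority in order
    for (const priority of PRIORITIES) {
      const queue = this.queues[priority];
      let raw;

      // Keep popping until a live task is found or the queue is empty
      while ((raw = await this.redis.rpop(queue)) !== null) {
        await this.redis.hincrby('metrics:queue-depth', priority, -1);
        const task = JSON.parse(raw);

        // Skip tasks that outlived their TTL instead of running stale work
        if (task.expiresAt && task.expiresAt < Date.now()) continue;

        return {
          task,
          priority,
          waitTime: Date.now() - task.enqueuedAt
        };
      }
    }
    
    return null; // No tasks available
  }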

  async getQueueStats() {
    const stats = {};
    for (const [priority, queue] of Object.entries(this.queues)) {
      const length = await this.redis.llen(queue);
      const oldestTask = await this.redis.lindex(queue, -1);
      const oldestAge = oldestTask 
        ? Date.now() - JSON.parse(oldestTask).enqueuedAt 
        : 0;
      
      stats[priority] = {
        depth: length,
        oldestTaskAgeMs: oldestAge,
        maxWaitTimeMs: QUEUE_TTLS[priority] * 1000  // TTLs are defined in seconds
      };
    }
    return stats;
  }
}

// Usage in n8n
const queueManager = new PriorityQueueManager(redis);

// Producer: Add task with priority
const enqueueResult = await queueManager.enqueue({
  id: `task-${Date.now()}`,
  type: 'content-generation',
  payload: {
    topic: $json.topic,
    requirements: $json.requirements
  }
}, $json.priority || 'normal');

// Consumer: Get next task
const nextTask = await queueManager.dequeue();

// Monitor queue health
const stats = await queueManager.getQueueStats();

return {
  json: {
    enqueued: enqueueResult,
    nextTask,
    queueStats: stats
  }
};

Pattern 2: Delayed Execution

Some tasks need to be processed at a specific time (e.g., scheduled social media posts, follow-up emails).

// n8n Workflow: Delayed Task Scheduler

const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);

class DelayedScheduler {
  constructor(redis) {
    this.redis = redis;
    this.delayedSet = 'agent:delayed-tasks';
    this.readyQueue = 'agent:queue:ready';
  }

  async schedule(task, executeAt) {
    const scheduledTask = {
      ...task,
      scheduledAt: Date.now(),
      executeAt: executeAt.getTime(),
      id: `delayed-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`
    };

    // Add to sorted set with execute time as score
    await this.redis.zadd(
      this.delayedSet,
      scheduledTask.executeAt,
      JSON.stringify(scheduledTask)
    );

    return {
      scheduled: true,
      taskId: scheduledTask.id,
      executeAt: scheduledTask.executeAt,
      delayMs: scheduledTask.executeAt - Date.now()
    };
  }

  async pollAndPromote() {
    const now = Date.now();
    
    // Get all tasks ready to execute (score <= now)
    const readyTasks = await this.redis.zrangebyscore(
      this.delayedSet,
      '-inf',
      now
    );

    const promoted = [];

    for (const task of readyTasks) {
      // ZREM returns 1 only for the caller that actually removed the member,
      // so concurrent pollers cannot promote the same task twice
      const removed = await this.redis.zrem(this.delayedSet, task);
      if (removed !== 1) continue;
      
      // Add to ready queue
      await this.redis.lpush(this.readyQueue, task);
      
      promoted.push(JSON.parse(task));
    }

    // Get next task time for scheduling
    const nextTask = await this.redis.zrange(this.delayedSet, 0, 0, 'WITHSCORES');
    const nextExecution = nextTask.length > 0 
      ? parseInt(nextTask[1]) 
      : null;

    return {
      promoted: promoted.length,
      tasks: promoted.map(t => t.id),
      nextExecution,
      delayedRemaining: await this.redis.zcard(this.delayedSet)
    };
  }

  async getSchedule(windowHours = 24) {
    const now = Date.now();
    const windowEnd = now + (windowHours * 60 * 60 * 1000);
    
    const tasks = await this.redis.zrangebyscore(
      this.delayedSet,
      now,
      windowEnd,
      'WITHSCORES'
    );

    return tasks.reduce((acc, val, idx, arr) => {
      if (idx % 2 === 0) {
        const task = JSON.parse(val);
        const score = parseInt(arr[idx + 1]);
        acc.push({
          ...task,
          executeAt: new Date(score).toISOString(),
          timeUntil: score - now
        });
      }
      return acc;
    }, []);
  }
}

// Usage
const scheduler = new DelayedScheduler(redis);

// Schedule a task for later
const scheduleResult = await scheduler.schedule({
  type: 'social-post',
  payload: {
    platform: 'linkedin',
    content: $json.postContent,
    hashtags: $json.hashtags
  }
}, new Date($json.scheduledTime));

// Poll for ready tasks (run this every minute via cron)
const promotionResult = await scheduler.pollAndPromote();

// View upcoming schedule
const upcoming = await scheduler.getSchedule(48); // Next 48 hours

return {
  json: {
    scheduled: scheduleResult,
    promoted: promotionResult,
    upcoming: upcoming.slice(0, 10) // Show first 10
  }
};

Pattern 3: Saga Pattern for Distributed Transactions

When multiple agents need to complete a workflow together, use sagas to ensure consistency.

// n8n Workflow: Saga Pattern Implementation

const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);

class SagaOrchestrator {
  constructor(redis) {
    this.redis = redis;
  }

  async startSaga(sagaId, steps) {
    const saga = {
      id: sagaId,
      status: 'running',
      startedAt: Date.now(),
      steps: steps.map((step, idx) => ({
        id: `step-${idx}`,
        name: step.name,
        action: step.action,
        compensation: step.compensation,
        status: 'pending',
        input: null,
        output: null
      })),
      currentStep: 0
    };

    await this.redis.setex(
      `saga:${sagaId}`,
      3600, // 1 hour TTL
      JSON.stringify(saga)
    );

    return this.executeNextStep(sagaId);
  }

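  async executeNextStep(sagaId) {
    const stored = await this.redis.get(`saga:${sagaId}`);
    if (!stored) {
      // State expired (1 hour TTL while running) or was never created
      throw new Error(`Saga ${sagaId} not found`);
    }
    const saga = JSON.parse(stored);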
    
    if (saga.currentStep >= saga.steps.length) {
      // Saga complete
      saga.status = 'completed';
      saga.completedAt = Date.now();
      await this.redis.setex(`saga:${sagaId}`, 86400, JSON.stringify(saga));
      return { status: 'completed', saga };
    }

    const step = saga.steps[saga.currentStep];
    step.status = 'executing';
    step.startedAt = Date.now();
    await this.redis.setex(`saga:${sagaId}`, 3600, JSON.stringify(saga));

    try {
      // Execute the step action
      const result = await this.executeAction(step.action, saga);
      
      step.status = 'completed';
      step.output = result;
      step.completedAt = Date.now();
      saga.currentStep++;

      await this.redis.setex(`saga:${sagaId}`, 3600, JSON.stringify(saga));
      
      // Continue to next step
      return this.executeNextStep(sagaId);

    } catch (error) {
      step.status = 'failed';
      step.error = error.message;
      
      // Trigger compensation
      await this.compensate(saga, saga.currentStep);
      
      saga.status = 'compensated';
      saga.failedAt = Date.now();
      await this.redis.setex(`saga:${sagaId}`, 86400, JSON.stringify(saga));
      
      return { status: 'compensated', saga, error: error.message };
    }
  }

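  async executeAction(action, saga) {
    // Dispatch to appropriate agent based on action type.
    // callAgent and triggerSubflow are assumed to be implemented elsewhere.
    switch (action.type) {
      case 'agent-task':
        return await this.callAgent(action.agentType, action.payload);
      case 'webhook': {
        const response = await fetch(action.url, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ sagaId: saga.id, ...action.payload })
        });
        // fetch() rejects only on network errors, so fail the step on HTTP errors too
        if (!response.ok) {
          throw new Error(`Webhook ${action.url} returned ${response.status}`);
        }
        // Return a serializable summary; a raw Response would be stored as {}
        return { status: response.status, body: await response.text() };
      }
      case 'n8n-subflow':
        return await this.triggerSubflow(action.workflowId, action.payload);
      default:
        throw new Error(`Unknown action type: ${action.type}`);
    }
  }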

  async compensate(saga, failedStepIndex) {
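    // Run compensation in reverse order, only for steps that completed;
    // the failed step itself never completed, so it is skipped
    for (let i = failedStepIndex; i >= 0; i--) {
      const step = saga.steps[i];
      if (step.compensation && step.status === 'completed') {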
        try {
          await this.executeAction(step.compensation, saga);
          step.compensationStatus = 'completed';
        } catch (e) {
          step.compensationStatus = 'failed';
          // Log for manual intervention
          await this.redis.lpush('saga:compensation-failures', JSON.stringify({
            sagaId: saga.id,
            stepId: step.id,
            error: e.message,
            timestamp: new Date().toISOString()
          }));
        }
      }
    }
  }

  async getSagaStatus(sagaId) {
    const saga = await this.redis.get(`saga:${sagaId}`);
    return saga ? JSON.parse(saga) : null;
  }
}

// Usage: Content creation saga
const orchestrator = new SagaOrchestrator(redis);

const sagaSteps = [
  {
    name: 'research',
    action: {
      type: 'agent-task',
      agentType: 'research-agent',
      payload: { topic: $json.topic, depth: 'comprehensive' }
    },
    compensation: {
      type: 'webhook',
      url: 'https://api.company.com/cleanup-research',
      payload: { action: 'cleanup' }
    }
  },
  {
    name: 'draft',
    action: {
      type: 'agent-task',
      agentType: 'writer-agent',
      payload: { /* uses previous step output */ }
    },
    compensation: {
      type: 'webhook',
      url: 'https://api.company.com/cleanup-draft',
      payload: { action: 'cleanup' }
    }
  },
  {
    name: 'publish',
    action: {
      type: 'webhook',
      url: 'https://cms.company.com/api/publish',
      payload: { /* uses draft output */ }
    },
    compensation: {
      type: 'webhook',
      url: 'https://cms.company.com/api/unpublish',
      payload: { action: 'unpublish' }
    }
  }
];

const sagaId = `saga-${Date.now()}`;
const result = await orchestrator.startSaga(sagaId, sagaSteps);

return {
  json: result
};

Fault Tolerance and Resilience Patterns

Circuit Breaker Implementation

Circuit breakers prevent cascading failures by temporarily rejecting requests when a service is struggling.

// n8n Workflow: Circuit Breaker for Agent Calls

const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);

class CircuitBreaker {
  constructor(redis, config) {
    this.redis = redis;
    this.config = {
      failureThreshold: config.failureThreshold || 5,
      successThreshold: config.successThreshold || 3,
      timeout: config.timeout || 60000, // 1 minute
      ...config
    };
    this.stateKey = `circuit:${config.name}`;
  }

  async getState() {
    const state = await this.redis.get(this.stateKey);
    if (!state) {
      return {
        status: 'CLOSED',
        failures: 0,
        successes: 0,
        lastFailureTime: null,
        openedAt: null
      };
    }
    return JSON.parse(state);
  }

  async recordSuccess() {
    const state = await this.getState();
    
    if (state.status === 'HALF_OPEN') {
      state.successes++;
      if (state.successes >= this.config.successThreshold) {
        state.status = 'CLOSED';
        state.failures = 0;
        state.successes = 0;
        await this.redis.publish('circuit:events', JSON.stringify({
          circuit: this.config.name,
          event: 'CLOSED',
          timestamp: new Date().toISOString()
        }));
      }
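    } else if (state.status === 'CLOSED') {
      // Reset the streak so only consecutive failures can trip the breaker
      state.failures = 0;
    }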
    
    await this.redis.setex(this.stateKey, 3600, JSON.stringify(state));
    return state;
  }

  async recordFailure() {
    const state = await this.getState();
    state.failures++;
    state.lastFailureTime = Date.now();

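    // A failed trial call in HALF_OPEN reopens the circuit immediately;
    // in CLOSED the circuit opens once the failure threshold is reached
    const tripped =
      state.status === 'HALF_OPEN' ||
      (state.status === 'CLOSED' && state.failures >= this.config.failureThreshold);

    if (tripped) {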
      state.status = 'OPEN';
      state.openedAt = Date.now();
      await this.redis.publish('circuit:events', JSON.stringify({
        circuit: this.config.name,
        event: 'OPENED',
        timestamp: new Date().toISOString()
      }));
    }

    await this.redis.setex(this.stateKey, 3600, JSON.stringify(state));
    return state;
  }

  async canExecute() {
    const state = await this.getState();
    
    if (state.status === 'CLOSED') {
      return { allowed: true, state };
    }

    if (state.status === 'OPEN') {
      const timeOpen = Date.now() - state.openedAt;
      
      if (timeOpen > this.config.timeout) {
        // Transition to HALF_OPEN
        state.status = 'HALF_OPEN';
        state.successes = 0;
        await this.redis.setex(this.stateKey, 3600, JSON.stringify(state));
        await this.redis.publish('circuit:events', JSON.stringify({
          circuit: this.config.name,
          event: 'HALF_OPEN',
          timestamp: new Date().toISOString()
        }));
        return { allowed: true, state, trial: true };
      }
      
      return { 
        allowed: false, 
        state,
        retryAfter: Math.ceil((this.config.timeout - timeOpen) / 1000)
      };
    }

    if (state.status === 'HALF_OPEN') {
      return { allowed: true, state, trial: true };
    }
  }

  async execute(operation) {
    const permission = await this.canExecute();
    
    if (!permission.allowed) {
      throw new Error(`Circuit breaker OPEN for ${this.config.name}. Retry after ${permission.retryAfter}s`);
    }

    try {
      const result = await operation();
      await this.recordSuccess();
      return { result, state: permission.state, trial: permission.trial };
    } catch (error) {
      await this.recordFailure();
      throw error;
    }
  }
}

// Usage for LLM API calls
const openAICircuit = new CircuitBreaker(redis, {
  name: 'openai-api',
  failureThreshold: 5,
  successThreshold: 3,
  timeout: 60000
});

try {
  const result = await openAICircuit.execute(async () => {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${$env.OPENAI_API_KEY}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: 'gpt-4o',
        messages: $json.messages
      })
    });
    
    if (!response.ok) {
      throw new Error(`OpenAI API error: ${response.status}`);
    }
    
    return await response.json();
  });

  return {
    json: {
      success: true,
      data: result.result,
      circuitState: result.state.status,
      trial: result.trial || false
    }
  };

} catch (error) {
  return {
    json: {
      success: false,
      error: error.message,
      fallback: 'Using cached response or alternative model'
    }
  };
}
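
The transitions in the circuit breaker (CLOSED to OPEN after `failureThreshold` failures, OPEN to HALF_OPEN once `timeout` has elapsed, HALF_OPEN back to CLOSED after `successThreshold` successes) are easier to unit test without Redis in the way. The following is a minimal in-memory sketch, not the Redis-backed class: `nextState` and the injected `now` are hypothetical names, and it assumes that a failure during a HALF_OPEN trial reopens the circuit and that a success while CLOSED resets the failure count.

```javascript
// Pure transition function: (state, event, config, now) -> new state.
// `event` is 'success' or 'failure'; `now` is a millisecond timestamp.
function nextState(state, event, config, now) {
  const s = { ...state };

  // An OPEN circuit moves to HALF_OPEN once the cool-down has elapsed.
  if (s.status === 'OPEN' && now - s.openedAt > config.timeout) {
    s.status = 'HALF_OPEN';
    s.successes = 0;
  }

  if (event === 'failure') {
    s.failures += 1;
    const tripped = s.status === 'CLOSED' && s.failures >= config.failureThreshold;
    // Assumption: any failure during a HALF_OPEN trial reopens the circuit.
    if (tripped || s.status === 'HALF_OPEN') {
      s.status = 'OPEN';
      s.openedAt = now;
    }
  } else if (event === 'success') {
    if (s.status === 'HALF_OPEN') {
      s.successes += 1;
      if (s.successes >= config.successThreshold) {
        s.status = 'CLOSED';
        s.failures = 0;
      }
    } else if (s.status === 'CLOSED') {
      s.failures = 0; // Assumption: a success resets the failure count.
    }
  }
  return s;
}

const initialState = { status: 'CLOSED', failures: 0, successes: 0, openedAt: 0 };
```

Because the function is pure, a test can replay a sequence of events with explicit timestamps and assert on the resulting status, which is awkward to do against shared Redis state.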

Exponential Backoff with Jitter

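Exponential backoff with jitter prevents retry storms: each retry waits roughly twice as long as the last, and a random offset keeps clients that failed together from retrying in lockstep.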

// n8n Workflow: Smart Retry with Exponential Backoff and Jitter

class RetryWithBackoff {
  constructor(config = {}) {
    // Use ?? rather than || so an explicit 0 (e.g. jitterFactor: 0) is respected
    this.maxRetries = config.maxRetries ?? 5;
    this.baseDelay = config.baseDelay ?? 1000; // 1 second
    this.maxDelay = config.maxDelay ?? 60000; // 60 seconds
    this.jitterFactor = config.jitterFactor ?? 0.3; // 30% randomization
  }

  calculateDelay(attempt) {
    // Exponential: base * 2^attempt
    const exponential = this.baseDelay * Math.pow(2, attempt);
    const capped = Math.min(exponential, this.maxDelay);
    
    // Add jitter: ±jitterFactor
    const jitter = capped * this.jitterFactor * (Math.random() * 2 - 1);
    const finalDelay = capped + jitter;
    
    return Math.max(0, Math.floor(finalDelay));
  }

  async execute(operation, context) {
    const attempts = [];
    
    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      const startTime = Date.now();
      
      try {
        const result = await operation();
        
        attempts.push({
          attempt: attempt + 1,
          duration: Date.now() - startTime,
          success: true
        });
        
        return {
          success: true,
          result,
          attempts,
          totalDuration: attempts.reduce((sum, a) => sum + a.duration + (a.nextDelay || 0), 0)
        };
        
      } catch (error) {
        const duration = Date.now() - startTime;
        
        attempts.push({
          attempt: attempt + 1,
          duration,
          success: false,
          error: error.message
        });

        if (attempt === this.maxRetries - 1) {
          // Final attempt failed
          return {
            success: false,
            error: error.message,
            attempts,
            totalDuration: attempts.reduce((sum, a) => sum + a.duration + (a.nextDelay || 0), 0)
          };
        }

        // Calculate and apply backoff
        const delay = this.calculateDelay(attempt);
        attempts[attempt].nextDelay = delay;
        
        await this.sleep(delay);
      }
    }
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// Usage
const retryConfig = {
  maxRetries: 5,
  baseDelay: 2000,    // Start with 2 seconds
  maxDelay: 32000,    // Cap at 32 seconds
  jitterFactor: 0.2   // 20% jitter
};

const retrier = new RetryWithBackoff(retryConfig);

const result = await retrier.execute(async () => {
  const response = await fetch('https://api.anthropic.com/v1/messages', {
    method: 'POST',
    headers: {
      'x-api-key': $env.ANTHROPIC_API_KEY,
      'anthropic-version': '2023-06-01', // required by the Messages API
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      model: 'claude-3-7-sonnet-20250219',
      max_tokens: 4096,
      messages: $json.messages
    })
  });

  if (response.status === 429) {
    const retryAfter = response.headers.get('retry-after');
    throw new Error(`Rate limited. Retry after: ${retryAfter}`);
  }

  if (!response.ok) {
    throw new Error(`API error: ${response.status}`);
  }

  return await response.json();
});

return {
  json: result
};
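
To see what a given retry configuration implies before deploying it, the delay formula can be evaluated at the two extremes of the random term. This is a small sketch that mirrors the `calculateDelay` logic; `backoffDelay` and `delayBounds` are hypothetical helper names, and the random source is injectable so the result is deterministic.

```javascript
// Delay before retry number `attempt` (0-based), mirroring calculateDelay.
// `random` is injectable so the schedule can be checked deterministically.
function backoffDelay(attempt, { baseDelay, maxDelay, jitterFactor }, random = Math.random) {
  const capped = Math.min(baseDelay * 2 ** attempt, maxDelay);
  const jitter = capped * jitterFactor * (random() * 2 - 1);
  return Math.max(0, Math.floor(capped + jitter));
}

// Lower and upper bound of each wait for a given config.
function delayBounds(config, waits) {
  return Array.from({ length: waits }, (_, attempt) => ({
    attempt: attempt + 1,
    min: backoffDelay(attempt, config, () => 0), // random term at its minimum
    max: backoffDelay(attempt, config, () => 1)  // upper limit (Math.random() never returns 1)
  }));
}

// maxRetries: 5 means at most four waits between the five attempts.
const bounds = delayBounds({ baseDelay: 2000, maxDelay: 32000, jitterFactor: 0.2 }, 4);
console.log(bounds);
```

With the usage configuration shown earlier, the nominal waits are 2 s, 4 s, 8 s, and 16 s, each varied by up to 20% in either direction, so the 32 s cap is never reached with five attempts.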

Dead Letter Queue and Error Handling

Not all failures are recoverable. A dead letter queue captures failed events for analysis and manual intervention.

// n8n Workflow: Dead Letter Queue Pattern

const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);

class DeadLetterQueue {
  constructor(redis) {
    this.redis = redis;
    this.dlqKey = 'queue:dead-letter';
    this.maxSize = 10000; // Keep last 10k failed events
  }

  async add(event, error, context) {
    const deadEvent = {
      originalEvent: event,
      error: {
        message: error.message,
        stack: error.stack,
        code: error.code
      },
      context: {
        ...context,
        failedAt: new Date().toISOString(),
        retryCount: event.retryCount || 0
      },
      id: `dlq-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`
    };

    // Add to DLQ with timestamp score for sorting
    await this.redis.zadd(
      this.dlqKey,
      Date.now(),
      JSON.stringify(deadEvent)
    );

    // Trim to max size
    const currentSize = await this.redis.zcard(this.dlqKey);
    if (currentSize > this.maxSize) {
      await this.redis.zremrangebyrank(this.dlqKey, 0, currentSize - this.maxSize - 1);
    }

    // Alert if DLQ is growing fast
    if (currentSize > this.maxSize * 0.8) {
      await this.redis.publish('alerts:dlq', JSON.stringify({
        severity: 'warning',
        message: `DLQ at ${Math.round((currentSize / this.maxSize) * 100)}% capacity`,
        timestamp: new Date().toISOString()
      }));
    }

    return deadEvent.id;
  }

  async list(limit = 50, offset = 0) {
    const events = await this.redis.zrevrange(
      this.dlqKey,
      offset,
      offset + limit - 1
    );
    
    return events.map(e => {
      const parsed = JSON.parse(e);
      return {
        ...parsed,
        age: Date.now() - new Date(parsed.context.failedAt).getTime()
      };
    });
  }

  async retry(eventId) {
    // Find event
    const events = await this.redis.zrange(this.dlqKey, 0, -1);
    const event = events.find(e => JSON.parse(e).id === eventId);
    
    if (!event) {
      throw new Error('Event not found in DLQ');
    }

    const parsed = JSON.parse(event);
    
    // Re-queue original event
    await this.redis.lpush('queue:pending-tasks', JSON.stringify({
      ...parsed.originalEvent,
      retryCount: (parsed.originalEvent.retryCount || 0) + 1,
      retryFromDLQ: true,
      originalFailure: parsed.error
    }));

    // Remove from DLQ
    await this.redis.zrem(this.dlqKey, event);

    return { success: true, message: 'Event re-queued' };
  }

  async getStats() {
    const total = await this.redis.zcard(this.dlqKey);
    
    // Get time distribution
    const now = Date.now();
    const oneHourAgo = now - 3600000;
    const oneDayAgo = now - 86400000;
    
    const lastHour = await this.redis.zcount(this.dlqKey, oneHourAgo, now);
    const lastDay = await this.redis.zcount(this.dlqKey, oneDayAgo, now);
    
    // Group by error type
    const allEvents = await this.redis.zrange(this.dlqKey, 0, -1);
    const errorTypes = allEvents.reduce((acc, e) => {
      const error = JSON.parse(e).error.code || 'unknown';
      acc[error] = (acc[error] || 0) + 1;
      return acc;
    }, {});

    return {
      total,
      lastHour,
      lastDay,
      errorTypes,
      oldestEvent: total > 0 ? await this.redis.zrange(this.dlqKey, 0, 0) : null
    };
  }
}

// Usage in n8n
const dlq = new DeadLetterQueue(redis);

// When processing fails
try {
  await processEvent($json.event);
} catch (error) {
  const dlqId = await dlq.add($json.event, error, {
    workflowId: $env.WORKFLOW_ID,
    nodeId: $env.NODE_ID
  });
  
  return {
    json: {
      processed: false,
      movedToDLQ: true,
      dlqId,
      error: error.message
    }
  };
}

// Monitor DLQ stats
const stats = await dlq.getStats();

// List recent failures for review
const recentFailures = await dlq.list(20);

return {
  json: {
    dlqStats: stats,
    recentFailures
  }
};
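
Since not all failures are recoverable, it helps to decide explicitly which ones are retried and which go straight to the dead letter queue. The sketch below is one possible classification, not part of the `DeadLetterQueue` class: `classifyFailure` is a hypothetical helper, and it assumes the caller attaches the HTTP status to the error as `error.status` and that Node.js network errors carry a `code`.

```javascript
// Hypothetical helper: decide whether a failed event should be retried
// or moved straight to the dead letter queue.
const RETRYABLE_STATUS = new Set([408, 429, 500, 502, 503, 504]);
const RETRYABLE_CODES = new Set(['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'EAI_AGAIN']);

function classifyFailure(error, retryCount, maxRetries = 3) {
  // Transient: rate limits, server-side errors, and network-level failures.
  const transient =
    RETRYABLE_STATUS.has(error.status) || RETRYABLE_CODES.has(error.code);

  if (!transient) {
    return { action: 'dead-letter', reason: 'non-retryable' };
  }
  if (retryCount >= maxRetries) {
    return { action: 'dead-letter', reason: 'retries-exhausted' };
  }
  return { action: 'retry', reason: 'transient' };
}

// Example: a validation error (HTTP 400) is never retried.
const badRequest = Object.assign(new Error('Invalid payload'), { status: 400 });
console.log(classifyFailure(badRequest, 0));
```

Recording the `reason` alongside the dead-lettered event makes it easier to separate events that need a code or data fix from events that simply ran out of retries during an outage.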

Cost Optimization Strategies

Token Usage Optimization

Token costs are typically the largest expense in agent systems. Model selection, context compression, response caching, batching, and early-terminated streaming can together reduce them by 50-80%.
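
As a rough illustration of where savings of that size can come from, the arithmetic below prices one workload on two models. It is a sketch under stated assumptions: the prices are the per-1M-token figures used in this guide's `modelCosts` table, while the workload (10,000 calls, 1,500 input and 500 output tokens each) and the `workloadCost` helper are hypothetical.

```javascript
// Prices in USD per 1M tokens, as used in this guide's modelCosts table.
const PRICES = {
  'gpt-4o': { input: 2.5, output: 10.0 },
  'gpt-4o-mini': { input: 0.15, output: 0.6 }
};

// Cost of `requests` calls, each with the given input/output token counts.
function workloadCost(model, requests, inputTokens, outputTokens) {
  const p = PRICES[model];
  const inputCost = ((requests * inputTokens) / 1e6) * p.input;
  const outputCost = ((requests * outputTokens) / 1e6) * p.output;
  return inputCost + outputCost;
}

// Hypothetical workload: 10,000 classification calls, 1,500 tokens in, 500 out.
const large = workloadCost('gpt-4o', 10000, 1500, 500);
const small = workloadCost('gpt-4o-mini', 10000, 1500, 500);
console.log({ large, small, saving: 1 - small / large });
```

For this workload the larger model comes to about $87.50 and the smaller one to about $5.25. The saving only materializes for tasks the smaller model handles acceptably, which is why routing by task complexity matters.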

// n8n Workflow: Token Optimization Strategies

class TokenOptimizer {
  constructor() {
    this.cache = new Map(); // In production, use Redis
    this.modelCosts = {
      'gpt-4o': { input: 2.50, output: 10.00, per: 1000000 }, // per 1M tokens
      'gpt-4o-mini': { input: 0.15, output: 0.60, per: 1000000 },
      'claude-3-7-sonnet': { input: 3.00, output: 15.00, per: 1000000 },
      'claude-3-5-haiku': { input: 0.80, output: 4.00, per: 1000000 }
    };
  }

  // Strategy 1: Intelligent Model Selection
  selectModel(taskComplexity) {
    const selection = {
      simple: 'gpt-4o-mini',      // Classification, extraction
      medium: 'claude-3-5-haiku', // Analysis, summarization
      complex: 'gpt-4o',          // Reasoning, generation
      critical: 'claude-3-7-sonnet' // High-stakes decisions
    };
    
    return selection[taskComplexity] || selection.medium;
  }

  // Strategy 2: Context Compression
  compressContext(messages, maxTokens = 4000) {
    let totalTokens = this.estimateTokens(messages);
    
    if (totalTokens <= maxTokens) {
      return messages;
    }

    // Priority order: system messages first, then the most recent turns.
    // Messages stay in chronological order so the model sees a coherent conversation.
    const systemMsgs = messages.filter(m => m.role === 'system');
    const conversation = messages.filter(m => m.role !== 'system');

    // Walk backwards from the newest message, keeping turns while they fit
    const kept = [];
    let used = this.estimateTokens(systemMsgs);
    let cut = conversation.length;
    for (let i = conversation.length - 1; i >= 0; i--) {
      const cost = this.estimateTokens([conversation[i]]);
      // Always keep the newest message, even if it alone exceeds the budget
      if (kept.length > 0 && used + cost > maxTokens) break;
      kept.unshift(conversation[i]);
      used += cost;
      cut = i;
    }

    // Summarize the dropped messages instead of discarding them silently
    const dropped = conversation.slice(0, cut);
    if (dropped.length === 0) {
      return [...systemMsgs, ...kept];
    }

    const summary = this.createSummary(dropped);
    return [
      ...systemMsgs,
      { role: 'user', content: `Previous context: ${summary}` },
      ...kept
    ];
  }

  // Strategy 3: Response Caching
  async getCachedResponse(promptHash) {
    // In production: Redis or similar
    const cached = this.cache.get(promptHash);
    if (cached && Date.now() - cached.timestamp < 3600000) { // 1 hour TTL
      return cached.response;
    }
    return null;
  }

  async cacheResponse(promptHash, response) {
    this.cache.set(promptHash, {
      response,
      timestamp: Date.now()
    });
  }

  // Strategy 4: Batch Processing
  async batchProcess(items, batchSize = 10) {
    const results = [];
    const batches = [];
    
    for (let i = 0; i < items.length; i += batchSize) {
      batches.push(items.slice(i, i + batchSize));
    }

    for (const batch of batches) {
      // Process batch in single call
      const batchResult = await this.processBatch(batch);
      results.push(...batchResult);
    }

    return results;
  }

  // Strategy 5: Streaming for Long Outputs
  async streamProcess(messages, onChunk) {
    let fullResponse = '';
    let tokenCount = 0;
    
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${$env.OPENAI_API_KEY}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        model: 'gpt-4o',
        messages,
        stream: true,
        max_tokens: 4000
      })
    });

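    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      
      // Decode incrementally and carry any partial line over to the next read,
      // since a network chunk can end in the middle of an SSE line or a character
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop();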
      
      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const data = line.slice(6);
          if (data === '[DONE]') continue;
          
          try {
            const parsed = JSON.parse(data);
            const content = parsed.choices[0]?.delta?.content || '';
            fullResponse += content;
            tokenCount++;
            
            // Call progress handler
            if (onChunk) {
              await onChunk(content, tokenCount);
            }
            
            // Early termination if we have enough
            if (tokenCount >= 1000 && this.hasCompleteAnswer(fullResponse)) {
              await reader.cancel(); // cancel the stream so the connection is closed
              return { response: fullResponse, tokens: tokenCount, truncated: true };
            }
          } catch (e) {
            // Ignore parsing errors in stream
          }
        }
      }
    }

    return { response: fullResponse, tokens: tokenCount };
  }

  // Helper methods
  estimateTokens(messages) {
    // Rough estimation: ~4 characters per token
    const text = messages.map(m => m.content).join('');
    return Math.ceil(text.length / 4);
  }

  createSummary(messages) {
    // In production: Use a smaller model for summarization
    return `Conversation with ${messages.length} messages about ${messages[0].content.substring(0, 50)}...`;
  }

  hasCompleteAnswer(text) {
    // Detect if response seems complete
    return text.trim().endsWith('.') || text.trim().endsWith('}');
  }

  calculateCost(model, inputTokens, outputTokens) {
    const costs = this.modelCosts[model];
    if (!costs) return 0;
    
    const inputCost = (inputTokens / costs.per) * costs.input;
    const outputCost = (outputTokens / costs.per) * costs.output;
    
    return inputCost + outputCost;
  }
}

// Usage
const optimizer = new TokenOptimizer();

// Example workflow
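const task = $json.task;
// Assumption: an upstream node has tagged the task as
// 'simple' | 'medium' | 'complex' | 'critical'; selectModel() falls back to 'medium'
const complexity = task.complexity;

// Select appropriate model
const model = optimizer.selectModel(complexity);

// Check cache (key = SHA-256 of the prompt text)
const cacheKey = require('crypto').createHash('sha256').update(task.prompt).digest('hex');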
const cached = await optimizer.getCachedResponse(cacheKey);

if (cached) {
  return {
    json: {
      result: cached,
      cost: 0,
      source: 'cache'
    }
  };
}

// Compress context
const compressedMessages = optimizer.compressContext(task.messages, 3000);

// Call API with optimized parameters
const response = await optimizer.streamProcess(compressedMessages, (chunk, count) => {
  // Update progress if needed
  console.log(`Received ${count} tokens`);
});

// Cache result
await optimizer.cacheResponse(cacheKey, response.response);

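// Calculate cost (approximate: streamProcess() always calls gpt-4o, and
// response.tokens counts stream chunks rather than exact tokens)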
const cost = optimizer.calculateCost(model, optimizer.estimateTokens(compressedMessages), response.tokens);

return {
  json: {
    result: response.response,
    cost,
    tokens: response.tokens,
    truncated: response.truncated || false,
    model
  }
};
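
Response caching depends on a cache key that is stable for equivalent prompts and distinct across models. One way to derive such a key is sketched below. `hashPrompt` here is a hypothetical standalone helper, and collapsing whitespace before hashing is an assumption that suits free-text prompts but not prompts where whitespace is significant, such as code.

```javascript
const crypto = require('crypto');

// Hypothetical helper: derive a stable cache key from the model and messages.
function hashPrompt(model, messages) {
  // Normalize whitespace so trivially different prompts share a cache entry.
  const normalized = messages.map(m => ({
    role: m.role,
    content: String(m.content).trim().replace(/\s+/g, ' ')
  }));

  // Include the model so answers from different models never collide.
  return crypto
    .createHash('sha256')
    .update(JSON.stringify({ model, messages: normalized }))
    .digest('hex');
}

const key = hashPrompt('gpt-4o-mini', [
  { role: 'user', content: 'Classify this ticket:   refund request' }
]);
console.log(key.length); // hex-encoded SHA-256 digest
```

Hashing the full message list rather than only the last prompt avoids serving a cached answer that was produced under a different system prompt or conversation history.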

Budget-Based Rate Limiting

Prevent runaway costs with budget-based circuit breakers.

// n8n Workflow: Budget-Based Rate Limiting

const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);

class BudgetController {
  constructor(redis) {
    this.redis = redis;
    this.budgets = new Map();
  }

  async setBudget(scope, config) {
    const budget = {
      scope, // 'daily', 'hourly', 'per-workflow'
      maxAmount: config.maxAmount,
      currentSpend: 0,
      currency: config.currency || 'USD',
      warningThreshold: config.warningThreshold || 0.8,
      action: config.action || 'block' // 'block', 'throttle', 'alert'
    };

    const key = `budget:${scope}`;
    await this.redis.setex(key, config.ttl || 86400, JSON.stringify(budget));
    
    return budget;
  }

  async checkBudget(scope, estimatedCost = 0) {
    const key = `budget:${scope}`;
    const data = await this.redis.get(key);
    
    if (!data) {
      return { allowed: true, reason: 'no-budget-set' };
    }

    const budget = JSON.parse(data);
    const projectedSpend = budget.currentSpend + estimatedCost;
    const utilization = projectedSpend / budget.maxAmount;

    if (utilization >= 1.0) {
      return {
        allowed: false,
        reason: 'budget-exceeded',
        currentSpend: budget.currentSpend,
        budget: budget.maxAmount,
        action: budget.action
      };
    }

    if (utilization >= budget.warningThreshold) {
      // Send warning
      await this.redis.publish('alerts:budget', JSON.stringify({
        scope,
        currentSpend: budget.currentSpend,
        budget: budget.maxAmount,
        utilization,
        timestamp: new Date().toISOString()
      }));
    }

    return {
      allowed: true,
      utilization,
      remaining: budget.maxAmount - budget.currentSpend
    };
  }

  async recordSpend(scope, amount) {
    const key = `budget:${scope}`;
    const data = await this.redis.get(key);
    
    if (!data) return;

    const budget = JSON.parse(data);
    budget.currentSpend += amount;
    
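    // Preserve the TTL chosen in setBudget(); resetting it to 86400 here would
    // stretch an hourly budget window to a full day (KEEPTTL requires Redis 6.0+)
    await this.redis.set(key, JSON.stringify(budget), 'KEEPTTL');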
    
    // Also record in time-series for analytics
    await this.redis.zadd(
      `spend:${scope}:history`,
      Date.now(),
      JSON.stringify({ amount, timestamp: new Date().toISOString() })
    );

    return budget.currentSpend;
  }

  async getSpendHistory(scope, hours = 24) {
    const since = Date.now() - (hours * 60 * 60 * 1000);
    const entries = await this.redis.zrangebyscore(
      `spend:${scope}:history`,
      since,
      Date.now()
    );
    
    return entries.map(e => JSON.parse(e));
  }

  async getThrottledRate(scope) {
    const check = await this.checkBudget(scope);
    
    if (!check.allowed) return 0; // Fully blocked
    
    if (check.utilization > 0.9) return 0.1; // 10% of normal rate
    if (check.utilization > 0.8) return 0.25;  // 25% of normal rate
    if (check.utilization > 0.7) return 0.5;   // 50% of normal rate
    
    return 1.0; // Full rate
  }
}

// Usage in n8n
const budget = new BudgetController(redis);

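// Set up budgets once (e.g. from a scheduled workflow): setBudget() resets
// currentSpend to 0, so calling it on every execution would wipe the running total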
await budget.setBudget('hourly', {
  maxAmount: 50, // $50/hour
  warningThreshold: 0.75,
  ttl: 3600
});

await budget.setBudget('daily', {
  maxAmount: 500, // $500/day
  warningThreshold: 0.8,
  ttl: 86400
});

// Before making API call
const estimatedCost = 0.05; // $0.05 for this call
const hourlyCheck = await budget.checkBudget('hourly', estimatedCost);

if (!hourlyCheck.allowed) {
  return {
    json: {
      error: 'Hourly budget exceeded',
      retryAfter: 'Next hour',
      currentSpend: hourlyCheck.currentSpend
    }
  };
}

// Make the API call
const response = await callLLMAPI($json.prompt);
const actualCost = response.usage.total_tokens * 0.00001; // Rough estimate

// Record the spend
await budget.recordSpend('hourly', actualCost);
await budget.recordSpend('daily', actualCost);

// Get spending analytics
const dailyHistory = await budget.getSpendHistory('daily', 24);
const currentDailySpend = dailyHistory.reduce((sum, e) => sum + e.amount, 0);

return {
  json: {
    result: response,
    cost: actualCost,
    dailySpend: currentDailySpend,
    remaining: 500 - currentDailySpend
  }
};
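
With many agents recording spend concurrently, a read-modify-write on a JSON blob can lose updates when two workers read the same total. One alternative is to keep the running total in its own key and increment it atomically. The sketch below assumes a client that exposes `incrbyfloat`, `ttl`, and `expire` (ioredis does); the function name and the `budget:<scope>:spend` key are hypothetical.

```javascript
// Sketch: bump the running total with INCRBYFLOAT, which Redis applies
// atomically, instead of GET -> modify -> SET on a JSON document.
async function recordSpendAtomic(redis, scope, amount, ttlSeconds) {
  const key = `budget:${scope}:spend`; // hypothetical key name
  const total = parseFloat(await redis.incrbyfloat(key, amount));

  // TTL returns -1 for a key without expiry. Start the budget window on the
  // first spend only, so later calls do not extend it.
  if ((await redis.ttl(key)) < 0) {
    await redis.expire(key, ttlSeconds);
  }
  return total;
}
```

A budget check can then compare the returned total against the configured maximum without parsing or rewriting shared state.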

Monitoring and Observability

Distributed Tracing for Agent Workflows

Understanding what happens across dozens of agents requires distributed tracing.

// n8n Workflow: OpenTelemetry Tracing for Agent Systems

const { NodeSDK } = require('@opentelemetry/sdk-node');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-http');
const { Resource } = require('@opentelemetry/resources');
const { SemanticResourceAttributes } = require('@opentelemetry/semantic-conventions');
const { trace, context, SpanStatusCode } = require('@opentelemetry/api');

// Initialize tracer
const sdk = new NodeSDK({
  traceExporter: new OTLPTraceExporter({
    url: $env.OTEL_COLLECTOR_URL
  }),
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'agent-orchestrator',
    [SemanticResourceAttributes.SERVICE_VERSION]: '2.0.0',
    [SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]: $env.ENVIRONMENT
  })
});

sdk.start();

const tracer = trace.getTracer('agent-orchestrator');

class AgentTracer {
  constructor(tracer) {
    this.tracer = tracer;
  }

  async traceWorkflow(workflowId, workflowFn) {
    return this.tracer.startActiveSpan(
      `workflow:${workflowId}`,
      {
        attributes: {
          'workflow.id': workflowId,
          'workflow.type': 'agent-orchestration'
        }
      },
      async (span) => {
        try {
          const result = await workflowFn(span);
          span.setStatus({ code: SpanStatusCode.OK });
          return result;
        } catch (error) {
          span.setStatus({
            code: SpanStatusCode.ERROR,
            message: error.message
          });
          span.recordException(error);
          throw error;
        } finally {
          span.end();
        }
      }
    );
  }

  async traceAgentCall(agentType, agentFn, parentContext) {
    const span = this.tracer.startSpan(
      `agent:${agentType}`,
      {
        attributes: {
          'agent.type': agentType,
          'agent.model': 'gpt-4o', // or dynamically detected
        }
      },
      parentContext
    );

    const startTime = Date.now();
    
    try {
      const result = await context.with(
        trace.setSpan(parentContext, span),
        agentFn
      );
      
      span.setAttributes({
        'agent.duration_ms': Date.now() - startTime,
        'agent.tokens_input': result.usage?.prompt_tokens,
        'agent.tokens_output': result.usage?.completion_tokens,
        'agent.cost_usd': this.calculateCost(result.usage)
      });
      
      span.setStatus({ code: SpanStatusCode.OK });
      return result;
      
    } catch (error) {
      span.setAttributes({
        'error.type': error.name,
        'error.message': error.message
      });
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: error.message
      });
      throw error;
      
    } finally {
      span.end();
    }
  }

  calculateCost(usage) {
    if (!usage) return 0;
    const inputCost = (usage.prompt_tokens / 1000000) * 2.50;
    const outputCost = (usage.completion_tokens / 1000000) * 10.00;
    return inputCost + outputCost;
  }
}

// Usage in n8n
const agentTracer = new AgentTracer(tracer);

const result = await agentTracer.traceWorkflow($json.workflowId, async (workflowSpan) => {
  const parentContext = trace.setSpan(context.active(), workflowSpan);
  
  // Trace individual agent calls
  const research = await agentTracer.traceAgentCall(
    'research',
    async () => await callResearchAgent($json.topic),
    parentContext
  );
  
  const draft = await agentTracer.traceAgentCall(
    'writer',
    async () => await callWriterAgent(research),
    parentContext
  );
  
  const edit = await agentTracer.traceAgentCall(
    'editor',
    async () => await callEditorAgent(draft),
    parentContext
  );
  
  return { research, draft, edit };
});

return {
  json: result
};
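
When agents hand work to each other through a queue rather than a direct call, the trace context has to travel inside the event, or the trace breaks at the queue boundary. The sketch below formats and parses a W3C `traceparent` value by hand to show what is being carried. In practice the `propagation.inject` and `propagation.extract` functions of `@opentelemetry/api` handle this. The helper names and the `traceparent` event field are assumptions.

```javascript
const crypto = require('crypto');

// Build a W3C traceparent value: version-traceId-spanId-flags.
function formatTraceparent(traceId, spanId, sampled = true) {
  return `00-${traceId}-${spanId}-${sampled ? '01' : '00'}`;
}

// Parse and validate a traceparent value; returns null when malformed.
function parseTraceparent(header) {
  const match = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header || '');
  if (!match) return null;
  const [, version, traceId, spanId, flags] = match;
  // Version ff and all-zero IDs are invalid per the specification.
  if (version === 'ff' || /^0+$/.test(traceId) || /^0+$/.test(spanId)) return null;
  return { version, traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}

// Attach trace context to an event before publishing it to a queue.
// With a parent, the trace ID is reused; otherwise a new trace is started.
function withTraceContext(event, parent) {
  const traceId = parent ? parent.traceId : crypto.randomBytes(16).toString('hex');
  const spanId = crypto.randomBytes(8).toString('hex');
  return { ...event, traceparent: formatTraceparent(traceId, spanId) };
}

const parent = parseTraceparent('00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01');
const event = withTraceContext({ type: 'task.created' }, parent);
console.log(event.traceparent);
```

The consuming agent parses the field and starts its span as a child of that context, so a single trace ID follows the task across every hop.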

Real-Time Dashboard Metrics

// n8n Workflow: Metrics Collection for Dashboards

const Redis = require('ioredis');
const redis = new Redis($env.REDIS_URL);

class MetricsCollector {
  constructor(redis) {
    this.redis = redis;
    this.flushInterval = 60000; // 1 minute
  }

  async increment(counter, value = 1, labels = {}) {
    const key = `metrics:counter:${counter}:${this.serializeLabels(labels)}`;
    await this.redis.hincrby('metrics:realtime', key, value);
  }

  async gauge(metric, value, labels = {}) {
    const key = `metrics:gauge:${metric}:${this.serializeLabels(labels)}`;
    await this.redis.hset('metrics:realtime', key, value);
  }

  async histogram(metric, value, labels = {}) {
    const key = `metrics:histogram:${metric}:${this.serializeLabels(labels)}`;
    await this.redis.lpush(key, JSON.stringify({
      value,
      timestamp: Date.now()
    }));
    // Keep only the 3,600 most recent samples (a count cap, not a time window)
    await this.redis.ltrim(key, 0, 3599);
  }

  async timing(metric, durationMs, labels = {}) {
    await this.histogram(`${metric}_duration`, durationMs, labels);
  }

  async getDashboardData(timeRange = '1h') {
    const rangeMs = {
      '1h': 3600000,
      '6h': 21600000,
      '24h': 86400000
    }[timeRange] || 3600000;

    const since = Date.now() - rangeMs;

    // Aggregate metrics
    const counters = await this.redis.hgetall('metrics:realtime');
    
    return {
      timestamp: new Date().toISOString(),
      range: timeRange,
      agents: {
        active: parseInt(counters['metrics:gauge:active_agents:{}'] || 0), // written via gauge('active_agents', n)
        totalTasks: parseInt(counters['metrics:counter:tasks_completed:{}'] || 0),
        failedTasks: parseInt(counters['metrics:counter:tasks_failed:{}'] || 0),
        successRate: this.calculateSuccessRate(counters)
      },
      costs: {
        total: parseFloat(counters['metrics:gauge:cost_total:{}'] || 0),
        hourly: await this.getHourlyCost(),
        byModel: await this.getCostByModel()
      },
      performance: {
        avgLatency: parseFloat(counters['metrics:gauge:avg_latency:{}'] || 0),
        p95Latency: parseFloat(counters['metrics:gauge:p95_latency:{}'] || 0),
        throughput: parseInt(counters['metrics:counter:requests_total:{}'] || 0) / (rangeMs / 1000)
      },
      queues: await this.getQueueDepths(),
      circuits: await this.getCircuitStates()
    };
  }

  calculateSuccessRate(counters) {
    const completed = parseInt(counters['metrics:counter:tasks_completed:{}'] || 0);
    const failed = parseInt(counters['metrics:counter:tasks_failed:{}'] || 0);
    if (completed + failed === 0) return 100;
    return ((completed / (completed + failed)) * 100).toFixed(2);
  }

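  serializeLabels(labels) {
    // Wrap in braces so an empty label set serializes to '{}', the form
    // getDashboardData() reads; sort keys so label order does not change the key
    const body = Object.entries(labels)
      .sort(([a], [b]) => a.localeCompare(b))
      .map(([k, v]) => `${k}=${v}`)
      .join(',');
    return `{${body}}`;
  }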

  async getHourlyCost() {
    const hourAgo = Date.now() - 3600000;
    // BudgetController.recordSpend() writes to spend:<scope>:history
    const entries = await this.redis.zrangebyscore('spend:hourly:history', hourAgo, Date.now());
    return entries.reduce((sum, e) => sum + JSON.parse(e).amount, 0);
  }

  async getCostByModel() {
    // Aggregate by model from metrics
    return {};
  }

  async getQueueDepths() {
    const queues = ['agent:queue:critical', 'agent:queue:high', 'agent:queue:normal', 'agent:queue:low'];
    const depths = {};
    for (const queue of queues) {
      depths[queue.split(':')[2]] = await this.redis.llen(queue);
    }
    return depths;
  }

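  async getCircuitStates() {
    // Iterate with SCAN rather than KEYS: KEYS walks the whole keyspace in one
    // blocking call, which stalls Redis on large datasets
    const states = {};
    let cursor = '0';
    do {
      const [next, keys] = await this.redis.scan(cursor, 'MATCH', 'circuit:*', 'COUNT', 100);
      cursor = next;
      for (const key of keys) {
        const state = await this.redis.get(key);
        states[key.replace('circuit:', '')] = state ? JSON.parse(state).status : 'unknown';
      }
    } while (cursor !== '0');
    return states;
  }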
}

// Usage in n8n
const metrics = new MetricsCollector(redis);

// Record metrics during processing
await metrics.increment('tasks_completed', 1, { agent_type: 'writer' });
await metrics.timing('agent_response', 2450, { agent_type: 'writer', model: 'gpt-4o' });
await metrics.gauge('active_agents', 12);

// Get dashboard data
const dashboardData = await metrics.getDashboardData('1h');

return {
  json: dashboardData
};
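
The dashboard reads `avg_latency` and `p95_latency` gauges, so something has to compute them from the raw histogram samples. A minimal sketch using the nearest-rank method follows. `percentile` and `summarizeLatency` are hypothetical helpers, and the input is assumed to be the JSON strings that `histogram()` pushes (`{ value, timestamp }`).

```javascript
// Nearest-rank percentile over raw samples, for p in (0, 100].
function percentile(values, p) {
  if (values.length === 0) return null;
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[Math.max(rank, 1) - 1];
}

// Summarize histogram entries (JSON strings of { value, timestamp })
// recorded at or after `sinceMs`.
function summarizeLatency(rawEntries, sinceMs) {
  const values = rawEntries
    .map(e => JSON.parse(e))
    .filter(e => e.timestamp >= sinceMs)
    .map(e => e.value);

  const avg = values.length
    ? values.reduce((a, b) => a + b, 0) / values.length
    : null;

  return { count: values.length, avg, p95: percentile(values, 95) };
}

// Example with 20 samples of 100..2000 ms, all inside the window.
const samples = Array.from({ length: 20 }, (_, i) =>
  JSON.stringify({ value: (i + 1) * 100, timestamp: 1000 + i })
);
console.log(summarizeLatency(samples, 0));
```

A scheduled workflow could run this over each histogram list and write the results back with `gauge()`, which keeps the dashboard query itself cheap.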

Production Deployment Architecture

Kubernetes-Ready Setup

# agent-orchestrator-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-orchestrator
  namespace: ai-automation
  labels:
    app: agent-orchestrator
    version: v2.0.0
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: agent-orchestrator
  template:
    metadata:
      labels:
        app: agent-orchestrator
        version: v2.0.0
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9090"
    spec:
      containers:
      - name: orchestrator
        image: company/agent-orchestrator:2.0.0
        ports:
        - containerPort: 3000
          name: http
        - containerPort: 9090
          name: metrics
        env:
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: redis-credentials
              key: url
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: llm-credentials
              key: openai
        - name: ANTHROPIC_API_KEY
          valueFrom:
            secretKeyRef:
              name: llm-credentials
              key: anthropic
        - name: OTEL_COLLECTOR_URL
          value: "http://otel-collector.monitoring:4318"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health/live
            port: 3000
          initialDelaySeconds: 30
          periodSeconds: 10
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /health/ready
            port: 3000
          initialDelaySeconds: 5
          periodSeconds: 5
        volumeMounts:
        - name: config
          mountPath: /app/config
      volumes:
      - name: config
        configMap:
          name: agent-orchestrator-config
---
apiVersion: v1
kind: Service
metadata:
  name: agent-orchestrator
  namespace: ai-automation
spec:
  selector:
    app: agent-orchestrator
  ports:
  - port: 80
    targetPort: 3000
    name: http
  - port: 9090
    targetPort: 9090
    name: metrics
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-orchestrator-hpa
  namespace: ai-automation
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-orchestrator
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: agent_queue_depth
      target:
        type: AverageValue
        averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Pods
        value: 4
        periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Pods
        value: 2
        periodSeconds: 120
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: agent-orchestrator-pdb
  namespace: ai-automation
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: agent-orchestrator

Monitoring Stack Integration

# prometheus-rules.yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: agent-orchestrator-alerts
  namespace: monitoring
spec:
  groups:
  - name: agent-orchestrator
    rules:
    - alert: HighAgentFailureRate
      expr: |
        (
          sum(rate(agent_tasks_failed_total[5m]))
          /
          sum(rate(agent_tasks_total[5m]))
        ) > 0.1
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "High agent failure rate detected"
        description: "Agent failure rate is {{ $value | humanizePercentage }} over last 5m"

    - alert: BudgetExceeded
      expr: agent_daily_spend_usd > 500
      for: 1m
      labels:
        severity: critical
      annotations:
        summary: "Daily budget exceeded"
        description: "Daily spend of ${{ $value }} exceeds $500 limit"

    - alert: CircuitBreakerOpen
      expr: circuit_breaker_status == 1
      for: 1m
      labels:
        severity: warning
      annotations:
        summary: "Circuit breaker is open"
        description: "Circuit {{ $labels.name }} has been open for >1m"

    - alert: DLQGrowing
      # deriv() returns a per-second slope, so 0.1 corresponds to roughly 6 events/min
      expr: |
        (
          deriv(agent_dlq_size[10m]) > 0.1
        )
      for: 5m
      labels:
        severity: critical
      annotations:
        summary: "Dead letter queue is growing"
        description: "DLQ size increasing at {{ $value }} events/sec"

    - alert: HighLatency
      expr: |
        histogram_quantile(0.95,
          sum(rate(agent_duration_seconds_bucket[5m])) by (le)
        ) > 30
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "High agent latency"
        description: "P95 latency is {{ $value }}s"
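
The HighLatency rule relies on `histogram_quantile`, which estimates a percentile from cumulative bucket counts rather than from raw samples. The sketch below reproduces that estimate in plain Python, assuming linear interpolation inside the bucket that contains the target rank, as Prometheus documents for classic histograms. The bucket boundaries and counts are made-up illustration data, not values from a real deployment.

```python
import math
from typing import List, Tuple

# Hypothetical cumulative buckets: (upper bound in seconds, count of tasks <= bound).
BUCKETS: List[Tuple[float, float]] = [
    (5.0, 60.0),
    (10.0, 80.0),
    (30.0, 90.0),
    (60.0, 98.0),
    (math.inf, 100.0),
]


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile (0 < q <= 1) from cumulative histogram buckets."""
    if not 0.0 < q <= 1.0:
        raise ValueError("q must be in (0, 1]")
    buckets = sorted(buckets)
    total = buckets[-1][1]
    if total == 0:
        return math.nan

    rank = q * total
    # Find the first bucket whose cumulative count reaches the rank.
    for i, (upper, cum) in enumerate(buckets):
        if cum >= rank:
            break

    # Rank falls in the +Inf bucket: report the highest finite bound.
    if math.isinf(upper):
        return buckets[-2][0]

    # The lowest bucket is assumed to start at 0.
    lower, prev_cum = (0.0, 0.0) if i == 0 else buckets[i - 1]
    return lower + (upper - lower) * (rank - prev_cum) / (cum - prev_cum)


p95 = histogram_quantile(0.95, BUCKETS)
print(p95, p95 > 30)  # 48.75 True, so the alert condition would hold
```

Because the result is interpolated, its accuracy depends on bucket layout. If the alert threshold is 30 seconds, having a bucket boundary at or near 30 keeps the estimate meaningful around the value that matters.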

Conclusion: Building for Scale

Production-grade agent orchestration requires moving beyond simple API calls to sophisticated patterns that handle scale gracefully:

1. Event-Driven Architecture

  • Decouple agents through message buses
  • Use standardized event schemas
  • Implement proper routing and filtering

2. Resilience Patterns

  • Circuit breakers prevent cascading failures
  • Exponential backoff with jitter avoids retry storms
  • Dead letter queues capture unrecoverable failures

3. Cost Management

  • Token optimization can cut API costs by 50-80%
  • Budget-based rate limiting prevents runaway spending
  • Intelligent model selection matches task to capability

4. Observability

  • Distributed tracing tracks requests across agents
  • Real-time metrics enable proactive monitoring
  • Structured logging simplifies debugging

Key Takeaways:

  • Start with event-driven architecture from day one—retrofitting is painful
  • Implement circuit breakers before you need them
  • Monitor cost per task as closely as latency
  • Plan for 10x scale—designs that work for 10 agents often fail at 100

The difference between a prototype agent system and a production one isn't the sophistication of the AI—it's the robustness of the orchestration layer beneath it.


