AI Agent Orchestration at Scale: Event-Driven Architecture for Enterprise n8n Deployments
A comprehensive guide to building high-throughput, resilient AI agent workflows using event-driven patterns, message queues, and enterprise-grade n8n deployments.
1. Introduction: Why Event-Driven Architecture for AI Agents
The landscape of AI agent deployment has fundamentally shifted. What began as simple chatbot implementations has evolved into complex, multi-agent orchestration systems processing millions of events daily. Traditional request-response architectures, while sufficient for basic integrations, buckle under the weight of modern AI workloads that demand high throughput, guaranteed delivery, and complex state management across distributed systems.
The Challenge of Scale
Modern AI agent deployments face unprecedented demands:
- Event Volume: Enterprises now process 10,000+ events per second in real-time AI pipelines
- Agent Complexity: Single workflows often orchestrate dozens of specialized AI agents
- State Management: Long-running agent conversations require sophisticated persistence
- Reliability Requirements: Financial and healthcare applications demand 99.99% uptime
- Latency Sensitivity: Customer-facing agents have tight response-time budgets, so queueing and routing overhead must stay in the low milliseconds
Traditional synchronous architectures create bottlenecks that cascade through systems. When an AI agent needs to query a vector database, call an LLM API, update CRM records, and notify downstream services—all while maintaining conversation context—the limitations become painfully apparent.
The Event-Driven Solution
Event-driven architecture (EDA) decouples these concerns, enabling:
- Asynchronous Processing: Agents communicate via events, eliminating blocking operations
- Horizontal Scalability: Add consumers to handle increased load without architectural changes
- Resilience: Failed operations can be retried automatically, and messages that keep failing are routed to dead letter queues for inspection
- Observability: Every state change is recorded as an event, creating complete audit trails
- Flexibility: New agents subscribe to relevant events without modifying existing systems
┌─────────────────────────────────────────────────────────────────┐
│ EVENT-DRIVEN AI PLATFORM │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ Events ┌──────────────┐ │
│ │ Producer │ ──────────────> │ Message │ │
│ │ Agents │ │ Queue │ │
│ └──────────────┘ │ (Kafka/ │ │
│ │ RabbitMQ) │ │
│ ┌──────────────┐ Events └──────┬───────┘ │
│ │ Consumer │ <──────────────────────┘ │
│ │ Agents │ │
│ └──────────────┘ │
│ │
│ ┌──────────────┐ Events ┌──────────────┐ │
│ │ AI Agent │ <────────────── │ n8n │ │
│ │ Orchestrator │ ──────────────> │ Workflows │ │
│ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
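The resilience point above (retry, then dead-letter) can be sketched in a few lines. This is a minimal in-memory illustration, not broker code: `handle`, `consume`, `MAX_ATTEMPTS`, and the "poison" message are hypothetical names chosen for the example, and a real consumer would wait between attempts.

```typescript
interface Message {
  id: string;
  body: string;
}

const MAX_ATTEMPTS = 3;
const deadLetters: Message[] = [];

// Hypothetical handler: fails for any message whose body is "poison".
function handle(msg: Message): void {
  if (msg.body === "poison") {
    throw new Error(`cannot process ${msg.id}`);
  }
}

// Tries the handler up to MAX_ATTEMPTS times and returns the attempts used.
// A message that never succeeds is parked in the dead letter list.
function consume(msg: Message): number {
  for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
    try {
      handle(msg);
      return attempt;
    } catch {
      // A real consumer would back off here before the next attempt.
    }
  }
  deadLetters.push(msg);
  return MAX_ATTEMPTS;
}

const okAttempts = consume({ id: "m1", body: "hello" });
const badAttempts = consume({ id: "m2", body: "poison" });
```

The dead letter list only stores the failed message; deciding whether to replay or discard it is a separate, usually manual, step.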
Why n8n for Event-Driven AI?
n8n has emerged as a powerful platform for event-driven AI orchestration:
- Native Event Support: Built-in triggers for Kafka, RabbitMQ, Redis, and webhooks
- Visual Workflow Design: Complex event flows become manageable through the UI
- Self-Hosted Option: Deploy on-premises for data sovereignty requirements
- Extensive Integration: 400+ nodes including OpenAI, Anthropic, and vector databases
- Code When Needed: JavaScript/Python code nodes for custom event processing
This guide explores how to architect enterprise-grade AI agent systems using n8n and event-driven patterns, from foundational concepts to production deployment strategies.
2. Core Concepts: Events, Event Streams, and Event Sourcing
Understanding event-driven architecture requires mastering three fundamental concepts that form the bedrock of scalable AI agent systems.
Events: The Atomic Unit of Communication
An event represents something that has happened—a fact that cannot be changed. In AI agent systems, events capture:
- User Interactions: Messages sent, commands issued, preferences updated
- Agent Actions: Decisions made, tools invoked, responses generated
- System Changes: Configuration updates, model deployments, scaling events
- External Triggers: Webhooks, IoT sensor readings, third-party notifications
// Example AI Agent Event Structure
interface AIAgentEvent {
eventId: string; // UUID v4
eventType: string; // "user.message.received"
timestamp: string; // ISO 8601 UTC
correlationId: string; // Links related events
causationId?: string; // Previous event that triggered this
payload: {
agentId: string;
sessionId: string;
userId: string;
intent?: string;
entities?: Record<string, any>;
metadata?: Record<string, any>;
};
version: number; // Event schema version
}
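To show how correlationId and causationId relate, here is a minimal sketch. It uses a trimmed version of the structure above; `createEvent`, `deriveEvent`, and the counter-based `nextId` are hypothetical helpers for illustration, and a production system would generate real UUIDs instead.

```typescript
// Trimmed version of the event structure above.
interface AgentEvent {
  eventId: string;
  eventType: string;
  timestamp: string; // ISO 8601 UTC
  correlationId: string; // shared by every event in one flow
  causationId?: string; // eventId of the event that directly caused this one
  payload: Record<string, unknown>;
  version: number;
}

// Hypothetical ID source; deterministic so the example is easy to follow.
let counter = 0;
function nextId(): string {
  counter += 1;
  return `evt-${counter}`;
}

// Starts a new chain: the first event correlates to itself.
function createEvent(eventType: string, payload: Record<string, unknown>): AgentEvent {
  const eventId = nextId();
  return {
    eventId,
    eventType,
    timestamp: new Date().toISOString(),
    correlationId: eventId,
    payload,
    version: 1,
  };
}

// Derives a follow-up event: same correlationId, causationId points at the parent.
function deriveEvent(
  parent: AgentEvent,
  eventType: string,
  payload: Record<string, unknown>,
): AgentEvent {
  return {
    eventId: nextId(),
    eventType,
    timestamp: new Date().toISOString(),
    correlationId: parent.correlationId,
    causationId: parent.eventId,
    payload,
    version: 1,
  };
}

const received = createEvent("user.message.received", { sessionId: "sess_abc123" });
const invoked = deriveEvent(received, "agent.tool.invoked", { toolName: "vector_search" });
const responded = deriveEvent(invoked, "agent.response.generated", { sessionId: "sess_abc123" });
```

Following causationId links backwards from `responded` reconstructs the exact chain of decisions, while filtering on correlationId returns the whole flow at once.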
Event Streams: The Continuous Flow
Event streams represent the continuous flow of events through your system. Unlike batch processing, streams enable:
- Real-Time Processing: Events processed as they occur
- Temporal Queries: Analyze sequences of events over time
- Replay Capabilities: Reconstruct system state from event history
- Parallel Consumption: Multiple consumers process the same stream independently
┌─────────────────────────────────────────────────────────────┐
│ EVENT STREAM │
├─────────────────────────────────────────────────────────────┤
│ │
│ Time ─────────────────────────────────────────────────> │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Event 1 │ │ Event 2 │ │ Event 3 │ │ Event 4 │ ... │
│ │ User │ │ Agent │ │ Tool │ │ User │ │
│ │ Message │ │ Started │ │ Called │ │ Response│ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ Partition 1: agent.session.abc123 │
│ Partition 2: agent.session.def456 │
│ Partition 3: agent.session.ghi789 │
│ │
└─────────────────────────────────────────────────────────────┘
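The partition layout in the diagram relies on every event for a session mapping to the same partition, which is what preserves per-session ordering. A minimal sketch of that idea follows. It uses an FNV-1a hash for brevity, which is an assumption of this example; Kafka's default partitioner uses a different hash (murmur2), so the actual partition numbers will differ.

```typescript
// FNV-1a 32-bit hash, chosen for brevity (not Kafka's own partitioner).
function fnv1a(key: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep the value as unsigned 32-bit
  }
  return hash;
}

// Maps a partition key (here, the session ID) to a partition index.
function partitionFor(key: string, partitionCount: number): number {
  return fnv1a(key) % partitionCount;
}

const PARTITIONS = 12;
const sessions = ["agent.session.abc123", "agent.session.def456", "agent.session.ghi789"];
const assignment = sessions.map((session) => ({
  session,
  partition: partitionFor(session, PARTITIONS),
}));
```

Because the mapping depends on the partition count, changing the number of partitions later moves keys to different partitions, which is worth planning for before a topic goes live.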
Event Sourcing: State as a Derived Concept
Event sourcing revolutionizes how we think about state. Instead of storing current state directly, we store the events that led to that state. Current state becomes a projection—something we can recalculate at any time.
Traditional Approach:
UPDATE agent_sessions
SET status = 'completed', last_response = 'Thank you!'
WHERE session_id = 'abc123';
Event Sourcing Approach:
INSERT INTO events (type, payload) VALUES
('agent.session.started', '{"session_id": "abc123"}'),
('user.message.received', '{"session_id": "abc123", "text": "Hello"}'),
('agent.response.generated', '{"session_id": "abc123", "response": "Hi! How can I help?"}'),
('user.message.received', '{"session_id": "abc123", "text": "Thanks, bye"}'),
('agent.session.completed', '{"session_id": "abc123", "summary": "..."}');
Benefits for AI Agent Systems:
- Complete Audit Trail: Every decision, every context change, every LLM call is preserved
- Temporal Debugging: Reconstruct exactly what the agent knew at any point
- Model Training Data: Rich historical data for fine-tuning and evaluation
- Compliance: Regulatory requirements for explainable AI become achievable
- Experimentation: Replay events against new agent versions for A/B testing
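The idea that current state is a projection of the event log can be sketched as a pure reducer. The event names follow the SQL example above; the `SessionState` shape and the `apply` and `project` helpers are assumptions made for this illustration.

```typescript
type SessionEvent =
  | { type: "agent.session.started"; sessionId: string }
  | { type: "user.message.received"; sessionId: string; text: string }
  | { type: "agent.response.generated"; sessionId: string; response: string }
  | { type: "agent.session.completed"; sessionId: string };

interface SessionState {
  status: "new" | "active" | "completed";
  transcript: { role: "user" | "assistant"; text: string }[];
}

const initialState: SessionState = { status: "new", transcript: [] };

// Pure function: the same events always produce the same state.
function apply(state: SessionState, event: SessionEvent): SessionState {
  switch (event.type) {
    case "agent.session.started":
      return { ...state, status: "active" };
    case "user.message.received":
      return { ...state, transcript: [...state.transcript, { role: "user", text: event.text }] };
    case "agent.response.generated":
      return {
        ...state,
        transcript: [...state.transcript, { role: "assistant", text: event.response }],
      };
    case "agent.session.completed":
      return { ...state, status: "completed" };
  }
}

// Folds a list of events into the state they imply.
function project(events: SessionEvent[]): SessionState {
  return events.reduce<SessionState>(apply, initialState);
}

const history: SessionEvent[] = [
  { type: "agent.session.started", sessionId: "abc123" },
  { type: "user.message.received", sessionId: "abc123", text: "Hello" },
  { type: "agent.response.generated", sessionId: "abc123", response: "Hi! How can I help?" },
  { type: "user.message.received", sessionId: "abc123", text: "Thanks, bye" },
  { type: "agent.session.completed", sessionId: "abc123" },
];

const current = project(history);
// Temporal debugging: the state as it was after the first two events.
const afterTwoEvents = project(history.slice(0, 2));
```

Projecting a prefix of the history, as in `afterTwoEvents`, is what makes temporal debugging and replay against new agent versions possible.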
Event Schema Evolution
AI systems evolve rapidly. Event schemas must accommodate change without breaking existing consumers:
{
"eventId": "550e8400-e29b-41d4-a716-446655440000",
"eventType": "agent.tool.invoked",
"timestamp": "2026-05-25T10:15:30Z",
"version": 2,
"schema": "https://api.example.com/schemas/agent-tool-invoked/v2",
"payload": {
"agentId": "customer-support-v2.3",
"sessionId": "sess_abc123",
"toolName": "vector_search",
"toolVersion": "1.5.0",
"input": {
"query": "return policy",
"topK": 5
},
"output": {
"results": [...],
"latency_ms": 245
},
"metadata": {
"model": "gpt-4o",
"temperature": 0.7,
"_legacy_field": "deprecated but preserved"
}
}
}
Schema Evolution Strategies:
- Additive Changes Only: Add new fields, never remove existing ones
- Default Values: Ensure backward compatibility for optional fields
- Version Negotiation: Consumers declare supported schema versions
- Event Transformation: Middleware transforms between versions when necessary
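The "default values" and "event transformation" strategies can be combined in an upcaster, a function that lifts older stored events to the newest shape when they are read. The sketch below is illustrative: the v1 shape, the `upcast` helper, and the `"unknown"` default for `toolVersion` are assumptions, not a schema defined by this guide.

```typescript
// Hypothetical older schema: no toolVersion field.
interface ToolInvokedV1 {
  version: 1;
  payload: { agentId: string; toolName: string; input: Record<string, unknown> };
}

// Newer schema: toolVersion was added (an additive change).
interface ToolInvokedV2 {
  version: 2;
  payload: {
    agentId: string;
    toolName: string;
    toolVersion: string;
    input: Record<string, unknown>;
  };
}

type StoredToolInvoked = ToolInvokedV1 | ToolInvokedV2;

// Upcast on read so consumers only ever handle the latest shape.
function upcast(event: StoredToolInvoked): ToolInvokedV2 {
  if (event.version === 2) {
    return event;
  }
  return {
    version: 2,
    payload: { ...event.payload, toolVersion: "unknown" }, // default for the new field
  };
}

const legacy: ToolInvokedV1 = {
  version: 1,
  payload: {
    agentId: "customer-support-v2.3",
    toolName: "vector_search",
    input: { query: "return policy", topK: 5 },
  },
};
const upgraded = upcast(legacy);
```

Stored events are never rewritten; the transformation happens at read time, so the original v1 records remain available for audit.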
3. Message Queue Technologies Comparison: RabbitMQ vs Kafka vs Redis
Selecting the right message queue is critical for AI agent orchestration. Each technology offers distinct trade-offs in throughput, latency, durability, and operational complexity.
Apache Kafka: The Streaming Platform
Kafka has become the de facto standard for high-throughput event streaming.
Strengths:
- Throughput: Millions of events per second per cluster
- Retention: Configurable retention (days to years) with tiered storage
- Replay: Consumers can rewind and replay events
- Partitioning: Natural parallelization across consumer groups
- Ecosystem: Kafka Streams, ksqlDB, Connect for stream processing
Considerations:
- Latency: Typical latency 10-100ms (not sub-millisecond)
- Operational Complexity: Requires ZooKeeper/KRaft, careful broker tuning
- Resource Intensive: Designed for horizontal scaling with commodity hardware
# docker-compose.yml for Kafka Development
# Image tags are pinned to a ZooKeeper-based release (an assumption of this
# example); current "latest" Confluent images run in KRaft mode without ZooKeeper.
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
Kafka Topic Configuration for AI Agents:
# Create topic for high-throughput agent events
kafka-topics.sh --create \
--topic ai-agent-events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config compression.type=lz4 \
--bootstrap-server kafka:9092
# Create compacted topic for agent state (event sourcing)
kafka-topics.sh --create \
--topic ai-agent-state \
--partitions 6 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--bootstrap-server kafka:9092
RabbitMQ: The Reliable Message Broker
RabbitMQ excels at complex routing and guaranteed message delivery.
Strengths:
- Latency: Sub-millisecond latency possible
- Routing: Sophisticated exchange types (direct, topic, fanout, headers)
- Delivery Guarantees: Publisher confirms, consumer acknowledgments
- Dead Letter Handling: Built-in dead letter exchanges
- Operational Simplicity: Single node operation, easy clustering
Considerations:
- Throughput: 20,000-50,000 messages/second per node
- Memory Management: Messages held in memory by default (can page to disk)
- Clustering Limitations: Not all features work seamlessly in clusters
# docker-compose.yml for RabbitMQ
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secure_password
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
volumes:
  rabbitmq_data:
RabbitMQ Configuration for AI Workflows:
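# rabbitmq.conf
# Quorum queues for durability: the queue type is chosen per queue
# (x-queue-type = quorum) or as a virtual host default, not with a
# "queue_type" key. Newer releases also accept a node-wide default;
# check the documentation for your version before enabling it:
# default_queue_type = quorum
# Spread classic queue leaders across cluster nodes
queue_master_locator = min-masters
# Delivery acknowledgement timeout in milliseconds. This is not prefetch;
# prefetch is set by each consumer through basic.qos.
consumer_timeout = 300000
# Message TTL for transient agent events is not a rabbitmq.conf setting.
# Apply it with a policy or the x-message-ttl queue argument, for example:
# rabbitmqctl set_policy agent-ttl "^agent-" '{"message-ttl":3600000}' --apply-to queues
# (long-running sessions might need longer)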
Redis Streams: The In-Memory Option
Redis Streams offers a lightweight alternative for scenarios prioritizing speed.
Strengths:
- Speed: Sub-millisecond latency, millions of ops/second
- Simplicity: Single Redis instance or cluster
- Data Structures: Rich ecosystem (caching, sessions, pub/sub)
- Consumer Groups: Built-in consumer group support
Considerations:
- Durability: Memory-first (persistence available but adds latency)
- Retention: Manual stream trimming required
- Not Pure Message Queue: Different semantics than traditional MQs
# Redis CLI commands for AI Agent Streams
# Add event to stream
XADD ai-agent-events * \
eventType user.message.received \
sessionId sess_abc123 \
userId user_456 \
message "Hello, I need help"
# Create consumer group
XGROUP CREATE ai-agent-events agent-workers $ MKSTREAM
# Read events as consumer
XREADGROUP GROUP agent-workers worker-1 \
COUNT 10 \
BLOCK 5000 \
STREAMS ai-agent-events >
# Acknowledge processing
XACK ai-agent-events agent-workers 1526569495631-0
# Trim old events (keep last 10000)
XTRIM ai-agent-events MAXLEN ~ 10000
Technology Comparison Matrix
| Feature | Apache Kafka | RabbitMQ | Redis Streams |
|---|---|---|---|
| Max Throughput | Millions/sec | 50K/sec/node | Millions/sec |
| Latency | 10-100ms | <1ms | <1ms |
| Durability | Disk-based | Configurable | Memory-first |
| Message Replay | Yes (within retention) | No for queues (streams support replay) | Limited (until trimmed) |
| Ordering Guarantees | Per partition | Per queue | Per stream |
| Complex Routing | Limited | Excellent | Limited |
| Dead Letter Queue | Manual | Built-in | Manual |
| Operational Complexity | High | Medium | Low |
| Best For | Event sourcing, analytics | Task queues, RPC | Caching, sessions |
Hybrid Architectures
Many enterprise deployments use multiple technologies:
┌─────────────────────────────────────────────────────────────┐
│ HYBRID MQ ARCHITECTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ Kafka │ ◄──── Event Store (Event Sourcing) │
│ │ │ Analytics, Audit Trail │
│ └────┬─────┘ │
│ │ │
│ │ replicate (selected events) │
│ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ RabbitMQ │◄───►│ n8n │◄───►│ LLM │ │
│ │ │ │ Workflows │ │ APIs │ │
│ └────┬─────┘ └──────────┘ └──────────┘ │
│ │ │
│ │ state/session cache │
│ ▼ │
│ ┌──────────┐ │
│ │ Redis │ ◄──── Agent Context, Sessions │
│ │ │ Rate Limiting │
│ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
4. n8n Event-Driven Architecture Patterns
n8n provides native support for event-driven patterns through triggers, webhooks, and message queue nodes. Understanding these patterns enables sophisticated AI agent orchestration.
Pattern 1: Event-Driven Workflow Triggers
The foundation of event-driven n8n: workflows that activate on external events.
Kafka Trigger Configuration:
{
"nodes": [
{
"parameters": {
"topic": "ai-agent-events",
"groupId": "n8n-agent-workers",
"options": {
"fromBeginning": false
}
},
"name": "Kafka Trigger",
"type": "n8n-nodes-base.kafkaTrigger",
"typeVersion": 1,
"position": [250, 300]
}
]
}
RabbitMQ Trigger Configuration:
{
"nodes": [
{
"parameters": {
"queue": "agent-task-queue",
"options": {
"durable": true,
"acknowledge": "executionFinishesSuccessfully"
}
},
"name": "RabbitMQ Trigger",
"type": "n8n-nodes-base.rabbitmqTrigger",
"typeVersion": 1,
"position": [250, 300]
}
]
}
Redis Trigger Configuration:
{
"nodes": [
{
"parameters": {
"channels": "agent-events",
"options": {
"jsonParseBody": true
}
},
"name": "Redis Trigger",
"type": "n8n-nodes-base.redisTrigger",
"typeVersion": 1,
"position": [250, 300]
}
}
Pattern 2: Event Router (Content-Based Router)
Route events to specialized agent workflows based on event type or content.
┌─────────────────────────────────────────────────────────────┐
│ EVENT ROUTER PATTERN │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────┐ │
│ │ Kafka Trigger │ │
│ │ (all events) │ │
│ └───────┬───────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ Switch │ │
│ │ (event type) │ │
│ └───────┬───────┘ │
│ │ │
│ ┌─────┼─────┬────────┐ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
│ │Cust│ │Tech│ │Sales│ │Fraud│ │
│ │Serv│ │Supp│ │Agent│ │Det │ │
│ │ WF │ │ WF │ │ WF │ │ WF │ │
│ └────┘ └────┘ └────┘ └────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
n8n Switch Node Implementation:
// Switch node - Routing Rules
[
{
"value": "customer.support.request",
"output": 0
},
{
"value": "technical.support.escalation",
"output": 1
},
{
"value": "sales.inquiry",
"output": 2
},
{
"value": "security.fraud.alert",
"output": 3
}
]
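The routing rules above amount to a lookup from event type to output index. A minimal sketch of the same logic in code follows; the fallback output (index 4) for unmatched event types is an assumption added here so that unknown events are not silently dropped.

```typescript
// Event type to output index, mirroring the routing rules above.
const ROUTES = new Map<string, number>([
  ["customer.support.request", 0],
  ["technical.support.escalation", 1],
  ["sales.inquiry", 2],
  ["security.fraud.alert", 3],
]);

// Hypothetical extra output for events that match no rule.
const FALLBACK_OUTPUT = 4;

function routeEvent(eventType: string): number {
  return ROUTES.get(eventType) ?? FALLBACK_OUTPUT;
}

const salesOutput = routeEvent("sales.inquiry");
const unknownOutput = routeEvent("billing.refund.requested");
```

Sending unmatched events to a dedicated output makes routing gaps visible, for example by logging them or forwarding them to a dead letter topic.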
Pattern 3: Scatter-Gather for Parallel Agent Processing
Distribute work to multiple AI agents and aggregate results.
┌─────────────────────────────────────────────────────────────┐
│ SCATTER-GATHER PATTERN │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ │
│ │ Event │ │
│ │ Received │ │
│ └────┬─────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Sentiment│ │ Entity │ │ Intent │ │
│ │ Agent │ │ Agent │ │ Agent │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └────────────┼────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ Aggregate│ │
│ │ Results │ │
│ └────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ Response │ │
│ │ Agent │ │
│ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
n8n Parallel Execution:
{
  "nodes": [
    {
      "parameters": {},
      "name": "Event Received",
      "type": "n8n-nodes-base.kafkaTrigger",
      "typeVersion": 1
    },
    {
      "parameters": {
        "functionCode": "// Sentiment Analysis Agent (keyword check stands in for a real model call)\nconst message = String($input.first().json.message || '');\nconst sentiment = /thank|great|love/i.test(message) ? 'positive' : 'neutral';\nreturn [{ json: { sentiment, confidence: 0.94 } }];"
      },
      "name": "Sentiment Agent",
      "type": "n8n-nodes-base.function",
      "typeVersion": 1
    },
    {
      "parameters": {
        "functionCode": "// Entity Extraction Agent (capitalized-word match stands in for a real NER call)\nconst message = String($input.first().json.message || '');\nconst entities = message.match(/[A-Z][a-z]+/g) || [];\nreturn [{ json: { entities, confidence: 0.89 } }];"
      },
      "name": "Entity Agent",
      "type": "n8n-nodes-base.function",
      "typeVersion": 1
    },
    {
      "parameters": {
        "mode": "append"
      },
      "name": "Gather Results",
      "type": "n8n-nodes-base.merge",
      "typeVersion": 1
    },
    {
      "parameters": {
        "functionCode": "// Aggregate all agent results\nconst items = $input.all();\nconst sentiment = items.find(i => i.json.sentiment);\nconst entities = items.find(i => i.json.entities);\nreturn [{\n  json: {\n    sentiment: sentiment?.json.sentiment,\n    entities: entities?.json.entities,\n    readyForResponse: true\n  }\n}];"
      },
      "name": "Aggregate Results",
      "type": "n8n-nodes-base.function",
      "typeVersion": 1
    }
  ],
  "connections": {
    "Event Received": {
      "main": [
        [
          { "node": "Sentiment Agent", "type": "main", "index": 0 },
          { "node": "Entity Agent", "type": "main", "index": 0 }
        ]
      ]
    },
    "Sentiment Agent": {
      "main": [
        [
          { "node": "Gather Results", "type": "main", "index": 0 }
        ]
      ]
    },
    "Entity Agent": {
      "main": [
        [
          { "node": "Gather Results", "type": "main", "index": 1 }
        ]
      ]
    },
    "Gather Results": {
      "main": [
        [
          { "node": "Aggregate Results", "type": "main", "index": 0 }
        ]
      ]
    }
  }
}
Pattern 4: Circuit Breaker for Resilient Agent Calls
Protect downstream services (LLM APIs, vector stores) from cascade failures.
// Circuit Breaker Code Node in n8n
// Note: workflow static data is only persisted for production (active) executions.
const CircuitBreakerState = {
  CLOSED: 'CLOSED',       // Normal operation
  OPEN: 'OPEN',           // Failing fast
  HALF_OPEN: 'HALF_OPEN'  // Testing recovery
};
const FAILURE_THRESHOLD = 5;
const RESET_TIMEOUT_MS = 60000;

const staticData = $getWorkflowStaticData('global');
let state = staticData.circuitState || CircuitBreakerState.CLOSED;
let failureCount = staticData.failureCount || 0;

// Check if circuit is open
if (state === CircuitBreakerState.OPEN) {
  const elapsed = Date.now() - staticData.lastFailureTime;
  if (elapsed > RESET_TIMEOUT_MS) {
    state = CircuitBreakerState.HALF_OPEN;
    staticData.circuitState = state;
  } else {
    return [{
      json: {
        error: 'Circuit breaker is OPEN',
        retryAfter: RESET_TIMEOUT_MS - elapsed
      }
    }];
  }
}

try {
  // Attempt LLM API call (callLLMAPI is a placeholder for your own request logic)
  const result = await callLLMAPI($input.first().json);
  // Any success closes the circuit and clears the consecutive-failure count
  staticData.circuitState = CircuitBreakerState.CLOSED;
  staticData.failureCount = 0;
  return [{ json: result }];
} catch (error) {
  failureCount++;
  staticData.failureCount = failureCount;
  staticData.lastFailureTime = Date.now();
  // A failed trial call in HALF_OPEN reopens the circuit immediately
  if (state === CircuitBreakerState.HALF_OPEN || failureCount >= FAILURE_THRESHOLD) {
    state = CircuitBreakerState.OPEN;
    staticData.circuitState = state;
  }
  // Flag the item so a downstream branch can route it to a retry or dead letter queue
  return [{
    json: {
      error: error.message,
      shouldRetry: state !== CircuitBreakerState.OPEN,
      circuitState: state
    }
  }];
}
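The state transitions are easier to verify when separated from n8n and from the real clock. The following is a minimal, framework-free sketch of the same state machine; the `CircuitBreaker` class, its method names, and the injectable `now` function are assumptions made for illustration, not an n8n API.

```typescript
type BreakerState = "CLOSED" | "OPEN" | "HALF_OPEN";

class CircuitBreaker {
  private state: BreakerState = "CLOSED";
  private failures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;
  private readonly now: () => number;

  constructor(failureThreshold: number, resetTimeoutMs: number, now: () => number = () => Date.now()) {
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.now = now;
  }

  // Returns true if a call may be attempted right now.
  allowRequest(): boolean {
    if (this.state === "OPEN" && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = "HALF_OPEN"; // allow trial calls after the reset timeout
    }
    return this.state !== "OPEN";
  }

  // Any success closes the circuit and clears the failure count.
  recordSuccess(): void {
    this.failures = 0;
    this.state = "CLOSED";
  }

  // A failure during HALF_OPEN, or too many in a row, opens the circuit.
  recordFailure(): void {
    this.failures += 1;
    if (this.state === "HALF_OPEN" || this.failures >= this.failureThreshold) {
      this.state = "OPEN";
      this.openedAt = this.now();
    }
  }

  current(): BreakerState {
    return this.state;
  }
}

// Usage with a fake clock so the timeline is explicit.
let clock = 0;
const breaker = new CircuitBreaker(5, 60000, () => clock);
for (let i = 0; i < 5; i++) {
  breaker.recordFailure();
}
const blockedWhileOpen = !breaker.allowRequest();
clock += 60000;
const trialAllowed = breaker.allowRequest();
breaker.recordSuccess();
```

Injecting the clock keeps the example deterministic; the n8n version above gets the same effect by storing `lastFailureTime` in workflow static data.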
Pattern 5: Outbox Pattern for Transactional Event Publishing
Ensure events are published if and only if database transactions succeed.
┌─────────────────────────────────────────────────────────────┐
│ OUTBOX PATTERN │
├─────────────────────────────────────────────────────────────┤
│ │
│ Application Code: │
│ ┌────────────────────────────────────────────────────┐ │
│ │ BEGIN TRANSACTION │ │
│ │ INSERT INTO agent_sessions (...) │ │
│ │ INSERT INTO outbox_events (...) ◄── Same TX │ │
│ │ COMMIT │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
│ │ Transaction succeeds │
│ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ n8n Outbox Processor (polls every 5 seconds) │ │
│ │ 1. SELECT * FROM outbox_events WHERE NOT processed │ │
│ │ 2. Publish to Kafka/RabbitMQ │ │
│ │ 3. DELETE/MARK as processed │ │
│ └────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Event Consumers (other n8n workflows, agents) │ │
│ └────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
n8n Outbox Polling Workflow:
{
"nodes": [
{
"parameters": {
"rule": {
"interval": [
{
"field": "seconds",
"secondsInterval": 5
}
]
}
},
"name": "Poll Outbox",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT * FROM outbox_events WHERE processed = false ORDER BY created_at ASC LIMIT 100"
},
"name": "Fetch Outbox Events",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1
},
{
"parameters": {
"topic": "={{ $json.event_topic }}",
"messages": [
{
"key": "={{ $json.event_key }}",
"message": "={{ $json.event_payload }}"
}
]
},
"name": "Publish to Kafka",
"type": "n8n-nodes-base.kafka",
"typeVersion": 1
},
{
"parameters": {
"operation": "executeQuery",
"query": "=UPDATE outbox_events SET processed = true, processed_at = NOW() WHERE id = {{ $('Fetch Outbox Events').item.json.id }}"
},
"name": "Mark Processed",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1
}
]
}
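The ordering of the relay steps matters: a row is marked processed only after the publish succeeds, so a broker failure leaves it in place for the next poll. The sketch below simulates one polling pass in memory; the `outbox` array, `publish`, `relayOnce`, and the `failOnId` switch are hypothetical stand-ins for the table, the broker, and a transient outage.

```typescript
interface OutboxRow {
  id: number;
  topic: string;
  payload: string;
  processed: boolean;
}

// In-memory stand-ins for the outbox table and the broker.
const outbox: OutboxRow[] = [
  { id: 1, topic: "agent-events", payload: '{"type":"agent.session.started"}', processed: false },
  { id: 2, topic: "agent-events", payload: '{"type":"user.message.received"}', processed: false },
];
const published: string[] = [];

// Simulated broker call; throws for the row selected by failOnId.
function publish(row: OutboxRow, failOnId?: number): void {
  if (row.id === failOnId) {
    throw new Error(`broker unavailable for row ${row.id}`);
  }
  published.push(`${row.topic}:${row.id}`);
}

// One polling pass: publish first, mark processed only after success.
function relayOnce(failOnId?: number): number {
  let sent = 0;
  for (const row of outbox.filter((r) => !r.processed)) {
    try {
      publish(row, failOnId);
      row.processed = true;
      sent += 1;
    } catch {
      break; // stop to preserve ordering; the row stays pending for the next poll
    }
  }
  return sent;
}

const firstPass = relayOnce(2); // row 2 fails and stays unprocessed
const secondPass = relayOnce(); // row 2 is retried and succeeds
```

This gives at-least-once delivery: if the relay stops between publishing and marking, the event is published again on the next pass, so consumers should deduplicate on an event ID.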
5. Event Sourcing for Agent State Management
Event sourcing transforms how AI agents maintain and recover state, enabling sophisticated patterns for conversation management, debugging, and compliance.
The Agent State Challenge
Traditional state management for AI agents faces several challenges:
- Context Loss: Database failures can corrupt conversation context
- Debugging Difficulty: Reconstructing why an agent responded a certain way is nearly impossible
- Audit Requirements: Regulatory compliance demands complete interaction histories
- Version Migration: Upgrading agent logic breaks existing conversations
- Multi-Modal State: Text, images, audio, and tool results must be unified
Event Sourcing Architecture for Agents
┌─────────────────────────────────────────────────────────────┐
│ EVENT SOURCED AGENT ARCHITECTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Event │ │
│ │ Store │ ┌──────────────┐ │
│ │ (Kafka/ │◄─────────│ Command │ │
│ │ Database) │ │ Handler │ │
│ └──────┬───────┘ └──────────────┘ │
│ │ │
│ │ Append Only │
│ ▼ │
│ ┌──────────────┐ │
│ │ Event │ ┌──────────────┐ │
│ │ Stream │─────────►│ Projections │ │
│ │ │ │ (Read Model) │ │
│ └──────────────┘ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Agent │ │
│ │ State │ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Agent Event Types
Define comprehensive event types for complete state reconstruction:
// Core Agent Events
interface AgentSessionStarted {
type: 'agent.session.started';
payload: {
sessionId: string;
agentId: string;
userId: string;
metadata: {
source: 'web' | 'mobile' | 'api';
ipAddress: string;
userAgent: string;
};
initialContext?: Record<string, any>;
};
}
interface UserMessageReceived {
type: 'user.message.received';
payload: {
sessionId: string;
messageId: string;
content: string;
contentType: 'text' | 'image' | 'voice' | 'file';
attachments?: Attachment[];
metadata: {
timestamp: string;
timezone: string;
locale: string;
};
};
}
interface IntentClassified {
type: 'intent.classified';
payload: {
sessionId: string;
messageId: string;
intent: string;
confidence: number;
entities: Entity[];
alternatives: IntentAlternative[];
};
}
interface ToolInvoked {
type: 'tool.invoked';
payload: {
sessionId: string;
toolName: string;
toolVersion: string;
input: Record<string, any>;
invocationId: string;
};
}
interface ToolCompleted {
type: 'tool.completed';
payload: {
sessionId: string;
invocationId: string;
output: any;
durationMs: number;
success: boolean;
error?: string;
};
}
interface LLMRequested {
type: 'llm.requested';
payload: {
sessionId: string;
requestId: string;
model: string;
messages: ChatMessage[];
parameters: LLMParameters;
systemPrompt?: string;
};
}
interface LLMResponded {
type: 'llm.responded';
payload: {
sessionId: string;
requestId: string;
response: string;
tokens: {
prompt: number;
completion: number;
total: number;
};
latencyMs: number;
finishReason: string;
};
}
interface AgentResponseGenerated {
type: 'agent.response.generated';
payload: {
sessionId: string;
responseId: string;
content: string;
contentType: 'text' | 'structured';
actions?: AgentAction[];
confidence: number;
};
}
interface SessionContextUpdated {
type: 'session.context.updated';
payload: {
sessionId: string;
updateType: 'memory' | 'preference' | 'state';
key: string;
oldValue?: any;
newValue: any;
};
}
interface AgentSessionEnded {
type: 'agent.session.ended';
payload: {
sessionId: string;
reason: 'user_close' | 'timeout' | 'completed' | 'error';
summary?: string;
durationSeconds: number;
messageCount: number;
};
}
n8n Event Sourcing Implementation
{
"name": "Event Sourced Agent Workflow",
"nodes": [
{
"parameters": {
"topic": "agent-commands",
"groupId": "agent-command-handlers",
"options": {
"fromOffset": "latest"
}
},
"name": "Command Receiver",
"type": "n8n-nodes-base.kafkaTrigger",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Load current state from event store\nconst sessionId = $input.first().json.sessionId;\n\n// Query events from database.\n// NOTE: $getAll is a placeholder, not an n8n built-in. In a real workflow, load the\n// rows with a Postgres node placed before this one and pass sessionId as a query\n// parameter instead of interpolating it into the SQL string.\nconst events = await $getAll('postgres', {\n  operation: 'executeQuery',\n  query: `\n    SELECT * FROM agent_events\n    WHERE session_id = '${sessionId}'\n    ORDER BY event_sequence ASC\n  `\n});\n\n// Rebuild state through event application\nlet state = {\n  sessionId,\n  messages: [],\n  context: {},\n  metadata: {},\n  lastEventSequence: 0\n};\n\nfor (const event of events) {\n  state = applyEvent(state, event);\n  state.lastEventSequence = event.event_sequence;\n}\n\nreturn [{\n  json: {\n    command: $input.first().json,\n    currentState: state,\n    nextSequence: state.lastEventSequence + 1\n  }\n}];\n\nfunction applyEvent(state, event) {\n  const newState = JSON.parse(JSON.stringify(state));\n\n  switch (event.event_type) {\n    case 'agent.session.started':\n      newState.metadata = event.payload.metadata;\n      break;\n    case 'user.message.received':\n      newState.messages.push({\n        role: 'user',\n        content: event.payload.content,\n        timestamp: event.payload.metadata.timestamp\n      });\n      break;\n    case 'agent.response.generated':\n      newState.messages.push({\n        role: 'assistant',\n        content: event.payload.content,\n        // Use the stored event time so replays are deterministic\n        timestamp: event.created_at\n      });\n      break;\n    case 'session.context.updated':\n      newState.context[event.payload.key] = event.payload.newValue;\n      break;\n  }\n\n  return newState;\n}"
},
"name": "Rehydrate State",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Process command and generate events\nconst { command, currentState, nextSequence } = $input.first().json;\nconst events = [];\n\nswitch (command.type) {\n case 'PROCESS_USER_MESSAGE':\n // Event 1: User message received\n events.push({\n event_type: 'user.message.received',\n event_sequence: nextSequence,\n payload: {\n sessionId: command.sessionId,\n messageId: generateId(),\n content: command.message,\n contentType: 'text',\n metadata: {\n timestamp: new Date().toISOString(),\n timezone: command.timezone\n }\n }\n });\n \n // Event 2: Intent classified (simulated - would call NLP service)\n events.push({\n event_type: 'intent.classified',\n event_sequence: nextSequence + 1,\n payload: {\n sessionId: command.sessionId,\n intent: classifyIntent(command.message),\n confidence: 0.92,\n entities: extractEntities(command.message)\n }\n });\n \n // Event 3: Response generated\n events.push({\n event_type: 'agent.response.generated',\n event_sequence: nextSequence + 2,\n payload: {\n sessionId: command.sessionId,\n responseId: generateId(),\n content: generateResponse(command.message, currentState),\n confidence: 0.88\n }\n });\n break;\n}\n\nreturn events.map(e => ({ json: e }));\n\nfunction generateId() {\n return Math.random().toString(36).substring(2);\n}\n\nfunction classifyIntent(message) {\n // Would integrate with ML service\n return 'general_inquiry';\n}\n\nfunction extractEntities(message) {\n // Would integrate with NER service\n return [];\n}\n\nfunction generateResponse(message, state) {\n // Would call LLM with state context\n return 'Thank you for your message. How can I help?';\n}"
},
"name": "Generate Events",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"operation": "insert",
"table": "agent_events",
"columns": {
"string": [
{
"column": "event_type",
"value": "={{ $json.event_type }}"
},
{
"column": "session_id",
"value": "={{ $json.payload.sessionId }}"
},
{
"column": "event_sequence",
"value": "={{ $json.event_sequence }}"
},
{
"column": "payload",
"value": "={{ JSON.stringify($json.payload) }}"
},
{
"column": "created_at",
"value": "={{ $now }}"
}
]
}
},
"name": "Persist Events",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1
},
{
"parameters": {
"topic": "agent-events",
"messages": [
{
"key": "={{ $json.session_id }}",
"message": "={{ JSON.stringify($json) }}"
}
]
},
"name": "Publish Events",
"type": "n8n-nodes-base.kafka",
"typeVersion": 1
}
]
}
Event Store Schema
-- Event store table for agent sessions
CREATE TABLE agent_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(100) NOT NULL,
event_version INT DEFAULT 1,
session_id VARCHAR(255) NOT NULL,
event_sequence BIGINT NOT NULL,
payload JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
correlation_id UUID,
causation_id UUID,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
processed BOOLEAN DEFAULT FALSE,
-- Ensure event ordering within a session
CONSTRAINT unique_session_sequence UNIQUE (session_id, event_sequence)
);
-- Indexes for performance
CREATE INDEX idx_agent_events_session ON agent_events(session_id, event_sequence);
CREATE INDEX idx_agent_events_type ON agent_events(event_type);
CREATE INDEX idx_agent_events_correlation ON agent_events(correlation_id);
CREATE INDEX idx_agent_events_created ON agent_events(created_at);
-- GIN index for JSONB queries
CREATE INDEX idx_agent_events_payload ON agent_events USING GIN(payload);
-- Table for projections (read models)
CREATE TABLE agent_session_projections (
session_id VARCHAR(255) PRIMARY KEY,
state JSONB NOT NULL,
last_event_sequence BIGINT NOT NULL,
version INT DEFAULT 1,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Materialized view for analytics
CREATE MATERIALIZED VIEW agent_conversation_summary AS
SELECT
session_id,
COUNT(*) FILTER (WHERE event_type = 'user.message.received') as user_messages,
COUNT(*) FILTER (WHERE event_type = 'agent.response.generated') as agent_responses,
COUNT(*) FILTER (WHERE event_type = 'tool.invoked') as tools_used,
MIN(created_at) as started_at,
MAX(created_at) as last_activity,
EXTRACT(EPOCH FROM (MAX(created_at) - MIN(created_at))) as duration_seconds
FROM agent_events
GROUP BY session_id;
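-- The materialized view does not update itself; refresh it on a schedule:
-- REFRESH MATERIALIZED VIEW agent_conversation_summary;
-- Index for finding recently updated projections
CREATE INDEX idx_agent_session_projections_updated ON agent_session_projections(updated_at);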
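The UNIQUE (session_id, event_sequence) constraint also works as an optimistic concurrency check: if two command handlers compute the same next sequence, only one insert succeeds. The sketch below mimics that behaviour in memory; `InMemoryEventStore` and its methods are hypothetical and only illustrate the constraint's effect.

```typescript
interface StoredEvent {
  sessionId: string;
  sequence: number;
  type: string;
}

class InMemoryEventStore {
  private readonly events: StoredEvent[] = [];
  private readonly keys = new Set<string>();

  // Mirrors UNIQUE (session_id, event_sequence): a duplicate sequence is rejected.
  append(event: StoredEvent): void {
    const key = `${event.sessionId}#${event.sequence}`;
    if (this.keys.has(key)) {
      throw new Error(`conflict: ${key} already exists`);
    }
    this.keys.add(key);
    this.events.push(event);
  }

  // Next free sequence number for a session, starting at 1.
  nextSequence(sessionId: string): number {
    const sequences = this.events
      .filter((e) => e.sessionId === sessionId)
      .map((e) => e.sequence);
    return sequences.length === 0 ? 1 : Math.max(...sequences) + 1;
  }
}

const store = new InMemoryEventStore();

// Two handlers read the same next sequence before either one writes.
const seqA = store.nextSequence("sess_abc123");
const seqB = store.nextSequence("sess_abc123");

store.append({ sessionId: "sess_abc123", sequence: seqA, type: "user.message.received" });

let conflictDetected = false;
try {
  store.append({ sessionId: "sess_abc123", sequence: seqB, type: "user.message.received" });
} catch {
  conflictDetected = true; // the losing handler reloads state and retries
}
```

In PostgreSQL the losing insert fails with a unique-violation error, which the handler can treat as a signal to rehydrate state and retry the command.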
Snapshotting for Performance
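Replaying thousands of events for each command becomes inefficient. Snapshots persist the projected state every N events, so rehydration only has to replay the events recorded after the latest snapshot: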
// Snapshot management in n8n
const SNAPSHOT_FREQUENCY = 100; // Every 100 events
async function maybeCreateSnapshot(sessionId, currentState, eventSequence) {
if (eventSequence % SNAPSHOT_FREQUENCY === 0) {
await $getAll('postgres', {
operation: 'executeQuery',
query: `
INSERT INTO agent_session_projections (session_id, state, last_event_sequence, updated_at)
VALUES ('${sessionId}', '${JSON.stringify(currentState)}', ${eventSequence}, NOW())
ON CONFLICT (session_id) DO UPDATE SET
state = EXCLUDED.state,
last_event_sequence = EXCLUDED.last_event_sequence,
updated_at = NOW(),
version = agent_session_projections.version + 1
`
});
}
}
// Optimized state rehydration
async function getStateWithSnapshot(sessionId) {
// Get latest snapshot
const snapshot = await $getAll('postgres', {
operation: 'executeQuery',
query: `SELECT * FROM agent_session_projections WHERE session_id = '${sessionId}'`
});
if (snapshot.length === 0) {
// No snapshot - replay all events
return replayEvents(sessionId, 0);
}
// Replay only events after snapshot
const { state, last_event_sequence } = snapshot[0];
const newEvents = await $getAll('postgres', {
operation: 'executeQuery',
query: `
SELECT * FROM agent_events
WHERE session_id = '${sessionId}'
AND event_sequence > ${last_event_sequence}
ORDER BY event_sequence ASC
`
});
return applyEvents(state, newEvents);
}
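The snippet above calls `replayEvents` and `applyEvents` without defining them. The following is a minimal in-memory sketch of how such helpers might behave. The state shape (`messageCount`, `lastSequence`) and the `rehydrate` function are assumptions made for illustration, not part of the schema above.

```javascript
// Hypothetical reducer: folds a single event into the session state.
// The state shape is an assumption chosen for this example.
function applyEvent(state, event) {
  const next = { ...state, lastSequence: event.event_sequence };
  if (event.event_type === 'user.message.received') {
    next.messageCount = (state.messageCount || 0) + 1;
  }
  return next;
}

// Applies events in sequence order on top of a starting state.
function applyEvents(state, events) {
  return [...events]
    .sort((a, b) => a.event_sequence - b.event_sequence)
    .reduce(applyEvent, state);
}

// Rehydrates from a snapshot (if any) plus the events recorded after it.
function rehydrate(snapshot, allEvents) {
  const base = snapshot ? snapshot.state : { messageCount: 0, lastSequence: 0 };
  const from = snapshot ? snapshot.last_event_sequence : 0;
  return applyEvents(base, allEvents.filter(e => e.event_sequence > from));
}

// Example: a full replay and a snapshot-based replay reach the same state.
const events = [1, 2, 3, 4].map(n => ({
  event_sequence: n,
  event_type: n % 2 === 1 ? 'user.message.received' : 'agent.response.generated',
}));
const fullReplay = rehydrate(null, events);
const snapshot = {
  state: applyEvents({ messageCount: 0, lastSequence: 0 }, events.slice(0, 2)),
  last_event_sequence: 2,
};
const fromSnapshot = rehydrate(snapshot, events);
```

Because the reducer is a pure function, the snapshot is only an optimization: dropping it and replaying from sequence 0 yields the same state.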
6. Saga Patterns for Distributed Agent Workflows
Saga patterns manage long-running, distributed transactions across multiple AI agents and services—critical for workflows where atomicity cannot be guaranteed.
The Distributed Agent Challenge
Consider an e-commerce order processing workflow:
- Validate order with inventory agent
- Process payment with payment agent
- Reserve shipping with logistics agent
- Send confirmation with notification agent
- Update analytics with reporting agent
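If step 3 (reserving shipping) fails, the steps that already completed must be compensated: the payment from step 2 is refunded and the inventory reserved in step 1 is released. Coordinating this rollback is what the saga pattern is for.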
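The compensation rule can be sketched in a few lines, independent of n8n or Kafka. In this hedged example the `runSaga` function, the step names, and the `action`/`compensate` callbacks are all hypothetical; a real deployment would publish commands and persist state between steps.

```javascript
// Runs steps in order; on failure, compensates completed steps in reverse.
// Step names and the action/compensate callbacks are hypothetical.
async function runSaga(steps) {
  const completed = [];
  const log = [];
  for (const step of steps) {
    try {
      await step.action();
      completed.push(step);
      log.push(`done:${step.name}`);
    } catch (err) {
      log.push(`failed:${step.name}`);
      // Undo the most recent successful step first
      for (const done of completed.reverse()) {
        await done.compensate();
        log.push(`compensated:${done.name}`);
      }
      return { status: 'COMPENSATED', log };
    }
  }
  return { status: 'COMPLETED', log };
}

// Example: shipping (step 3) fails, so payment and inventory are undone.
const noop = async () => {};
const steps = [
  { name: 'inventory', action: noop, compensate: noop },
  { name: 'payment', action: noop, compensate: noop },
  { name: 'shipping', action: async () => { throw new Error('no carrier'); }, compensate: noop },
  { name: 'notification', action: noop, compensate: noop },
];
const sagaResult = runSaga(steps);
```

Steps after the failing one never run, so only the completed steps need compensating actions.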
Saga Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│ SAGA ORCHESTRATION │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ Saga │ │
│ │ Orchestrator │ │
│ │ (n8n workflow) │ │
│ └────────┬────────┘ │
│ │ │
│ ┌───────┼───────┬──────────┬──────────┐ │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │Inv. │ │Pay. │ │Ship │ │Notif│ │Analytics│ │
│ │Agent│ │Agent│ │Agent│ │Agent│ │ Agent │ │
│ └──┬──┘ └──┬──┘ └──┬──┘ └──┬──┘ └───┬────┘ │
│ │ │ │ │ │ │
│ └───────┴───────┴──────────┴──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Event Store │ │
│ │ (Saga State) │ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Choreography vs. Orchestration
Choreography: Agents react to events independently.
┌─────────────────────────────────────────────────────────────┐
│ SAGA CHOREOGRAPHY │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌────────┐ order.created ┌────────┐ │
│ │ Order │ ──────────────────> │Inv. │ │
│ │ Agent │ │ Agent │ │
│ └────────┘ └────┬───┘ │
│ │ │
│ inv.reserved │ │
│ ┌────────┐ <────────────────────┘ │
│ │ Payment│ │
│ │ Agent │ ──────────────────> ┌────────┐ │
│ └────────┘ payment.processed│ Ship │ │
│ ▲ │ Agent │ │
│ │ ship.reserved └────┬───┘ │
│ └──────────────────────────────┘ │
│ │
│ No central coordinator - agents communicate via events │
└─────────────────────────────────────────────────────────────┘
Orchestration: Central coordinator manages the flow.
┌─────────────────────────────────────────────────────────────┐
│ SAGA ORCHESTRATION │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Orchestrator │ │
│ └──────┬───────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Inv. │ │Payment │ │Ship │ │
│ │ Agent │ │ Agent │ │ Agent │ │
│ └────────┘ └────────┘ └────────┘ │
│ │
│ Central coordinator directs each step │
└─────────────────────────────────────────────────────────────┘
n8n Saga Orchestrator Implementation
{
"name": "Order Processing Saga",
"nodes": [
{
"parameters": {
"topic": "order.saga.started",
"groupId": "saga-orchestrators",
"options": {
"fromOffset": "latest"
}
},
"name": "Saga Started",
"type": "n8n-nodes-base.kafkaTrigger",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Initialize saga state\nconst sagaId = $input.first().json.sagaId;\nconst orderId = $input.first().json.orderId;\n\nconst sagaState = {\n sagaId,\n orderId,\n status: 'STARTED',\n steps: [\n { step: 'INVENTORY_RESERVE', status: 'PENDING', compensations: [] },\n { step: 'PAYMENT_PROCESS', status: 'PENDING', compensations: [] },\n { step: 'SHIPPING_RESERVE', status: 'PENDING', compensations: [] },\n { step: 'NOTIFICATION_SEND', status: 'PENDING', compensations: [] }\n ],\n currentStep: 0,\n startedAt: new Date().toISOString()\n};\n\nawait saveSagaState(sagaState);\n\nreturn [{ json: sagaState }];\n\nasync function saveSagaState(state) {\n // saga_type, status and started_at are NOT NULL in saga_instances;\n // the 'order_processing' type label is illustrative\n await $getAll('postgres', {\n operation: 'executeQuery',\n query: `\n INSERT INTO saga_instances (saga_id, saga_type, status, state, started_at, updated_at)\n VALUES ('${state.sagaId}', 'order_processing', '${state.status}', '${JSON.stringify(state)}', '${state.startedAt}', NOW())\n ON CONFLICT (saga_id) DO UPDATE SET\n status = EXCLUDED.status,\n state = EXCLUDED.state,\n updated_at = NOW()\n `\n });\n}"
},
"name": "Initialize Saga",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Execute current step\nconst state = $input.first().json;\nconst currentStep = state.steps[state.currentStep];\n\nif (!currentStep) {\n // Saga complete\n return [{ json: { ...state, status: 'COMPLETED' } }];\n}\n\n// Execute step based on type\nlet result;\nswitch (currentStep.step) {\n case 'INVENTORY_RESERVE':\n result = await executeInventoryReserve(state.orderId);\n break;\n case 'PAYMENT_PROCESS':\n result = await executePaymentProcess(state.orderId);\n break;\n case 'SHIPPING_RESERVE':\n result = await executeShippingReserve(state.orderId);\n break;\n case 'NOTIFICATION_SEND':\n result = await executeNotification(state.orderId);\n break;\n}\n\nif (result.success) {\n state.steps[state.currentStep].status = 'COMPLETED';\n state.steps[state.currentStep].result = result.data;\n state.currentStep++;\n} else {\n state.steps[state.currentStep].status = 'FAILED';\n state.steps[state.currentStep].error = result.error;\n await compensate(state);\n state.status = 'COMPENSATED';\n}\n\nawait saveSagaState(state);\nreturn [{ json: state }];\n\nasync function executeInventoryReserve(orderId) {\n // Publish command to inventory service\n await $getAll('kafka', {\n operation: 'sendMessage',\n topic: 'inventory.commands',\n message: {\n type: 'RESERVE_STOCK',\n orderId,\n sagaId: state.sagaId\n }\n });\n \n // Wait for response (simplified - would use correlation)\n return { success: true, data: { reservationId: 'RES-123' } };\n}\n\nasync function compensate(state) {\n // Execute compensations in reverse order\n for (let i = state.currentStep - 1; i >= 0; i--) {\n const step = state.steps[i];\n if (step.status === 'COMPLETED') {\n await executeCompensation(step);\n step.status = 'COMPENSATED';\n }\n }\n}\n\nasync function executeCompensation(step) {\n switch (step.step) {\n case 'INVENTORY_RESERVE':\n await $getAll('kafka', {\n operation: 'sendMessage',\n topic: 'inventory.commands',\n message: {\n type: 'RELEASE_STOCK',\n reservationId: step.result.reservationId\n }\n });\n break;\n case 'PAYMENT_PROCESS':\n await $getAll('kafka', {\n operation: 'sendMessage',\n topic: 'payment.commands',\n message: {\n type: 'REFUND_PAYMENT',\n paymentId: step.result.paymentId\n }\n });\n break;\n }\n}"
},
"name": "Execute Step",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"conditions": {
"string": [
{
"value1": "={{ $json.status }}",
"value2": "COMPLETED"
}
]
}
},
"name": "Saga Complete?",
"type": "n8n-nodes-base.if",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Continue saga\nconst state = $input.first().json;\n\n// Re-trigger for next step\nawait $getAll('kafka', {\n operation: 'sendMessage',\n topic: 'saga.orchestrator.continue',\n message: state\n});\n\nreturn [];"
},
"name": "Continue Saga",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Saga completed\nconst state = $input.first().json;\n\n// Publish completion event\nawait $getAll('kafka', {\n operation: 'sendMessage',\n topic: 'order.saga.completed',\n message: {\n sagaId: state.sagaId,\n orderId: state.orderId,\n status: 'SUCCESS',\n completedAt: new Date().toISOString()\n }\n});\n\n// Send notification\nawait $getAll('httpRequest', {\n method: 'POST',\n url: 'https://api.notifications.com/send',\n body: {\n orderId: state.orderId,\n message: 'Your order has been confirmed!'\n }\n});\n\nreturn [{ json: { success: true } }];"
},
"name": "Saga Completed",
"type": "n8n-nodes-base.function",
"typeVersion": 1
}
]
}
Saga State Persistence
-- Saga persistence schema
CREATE TABLE saga_instances (
saga_id UUID PRIMARY KEY,
saga_type VARCHAR(100) NOT NULL,
status VARCHAR(50) NOT NULL, -- STARTED, COMPLETED, FAILED, COMPENSATED
state JSONB NOT NULL,
started_at TIMESTAMP WITH TIME ZONE NOT NULL,
completed_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE TABLE saga_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
saga_id UUID NOT NULL REFERENCES saga_instances(saga_id),
event_type VARCHAR(100) NOT NULL,
step_name VARCHAR(100),
payload JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_saga_events_saga ON saga_events(saga_id);
CREATE INDEX idx_saga_status ON saga_instances(status);
7. CQRS Implementation for Agent Systems
Command Query Responsibility Segregation (CQRS) separates read and write operations, optimizing each path independently for AI agent workloads.
Why CQRS for AI Agents?
AI agent systems have fundamentally different access patterns:
Commands (Writes):
- Infrequent but critical
- Require transaction integrity
- Update complex aggregate structures
- Trigger side effects (LLM calls, notifications)
Queries (Reads):
- High frequency (10-100x commands)
- Must be fast (< 10ms for real-time agents)
- Complex filtering ("conversations from premium users")
- Aggregation ("average response time by agent type")
CQRS Architecture
┌─────────────────────────────────────────────────────────────┐
│ CQRS ARCHITECTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ COMMAND SIDE │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Command │───►│ Event │ │ │
│ │ │ Handler │ │ Store │ │ │
│ │ └──────────────┘ └──────┬───────┘ │ │
│ └─────────────────────────────┼──────────┘ │
│ │ │
│ │ Events │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ PROJECTION SIDE │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Event │───►│ Read │ │ │
│ │ │ Projector │ │ Models │ │ │
│ │ └──────────────┘ └──────┬───────┘ │ │
│ └─────────────────────────────┼──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ QUERY SIDE │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Query │◄───│ Read │ │ │
│ │ │ Handler │ │ Database │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Command Side: Event Sourcing
The write model uses event sourcing for durability and auditability.
{
"name": "Agent Command Handler",
"nodes": [
{
"parameters": {
"topic": "agent.commands",
"groupId": "agent-command-handlers"
},
"name": "Command Receiver",
"type": "n8n-nodes-base.kafkaTrigger",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Validate and process command\nconst command = $input.first().json;\n\n// Validation\nconst validation = validateCommand(command);\nif (!validation.valid) {\n return [{\n json: {\n success: false,\n errors: validation.errors\n }\n }];\n}\n\n// Load aggregate\nconst aggregate = await loadAggregate(command.sessionId);\n\n// Execute command\nconst events = executeCommand(aggregate, command);\n\n// Save events\nfor (const event of events) {\n await persistEvent(event);\n}\n\nreturn [{ json: { success: true, events } }];\n\nfunction validateCommand(cmd) {\n const errors = [];\n if (!cmd.sessionId) errors.push('sessionId required');\n if (!cmd.type) errors.push('command type required');\n return { valid: errors.length === 0, errors };\n}\n\nasync function loadAggregate(sessionId) {\n // Rehydrate from events, ordered by the event_sequence column\n const events = await $getAll('postgres', {\n operation: 'executeQuery',\n query: `SELECT * FROM agent_events WHERE session_id = '${sessionId}' ORDER BY event_sequence`\n });\n return events.reduce(applyEvent, createEmptyAggregate());\n}\n\nfunction executeCommand(aggregate, command) {\n // Business logic here\n return [{\n type: 'AgentCommandExecuted',\n payload: {\n sessionId: command.sessionId,\n commandType: command.type,\n result: processBusinessLogic(aggregate, command)\n }\n }];\n}"
},
"name": "Handle Command",
"type": "n8n-nodes-base.function",
"typeVersion": 1
}
]
}
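The handler above relies on `createEmptyAggregate`, `applyEvent`, and a way of numbering new events, none of which are shown. The following sketch illustrates one possible shape under stated assumptions: the aggregate fields (`version`, `commandsHandled`) are hypothetical, and new events are numbered from the aggregate's version so that the `unique_session_sequence` constraint on `agent_events` can reject a concurrent writer.

```javascript
// Hypothetical aggregate: tracks the last applied sequence number.
function createEmptyAggregate() {
  return { sessionId: null, version: 0, commandsHandled: 0 };
}

// Folds one stored event into the aggregate.
function applyEvent(aggregate, event) {
  return {
    sessionId: event.payload.sessionId,
    version: event.event_sequence,
    commandsHandled: aggregate.commandsHandled + 1,
  };
}

// Turns a command into new events, numbered after the aggregate's version.
// Two handlers working from the same version would produce the same
// (session_id, event_sequence) pair, and the second insert would fail.
function executeCommand(aggregate, command) {
  return [{
    event_type: 'AgentCommandExecuted',
    event_sequence: aggregate.version + 1,
    payload: { sessionId: command.sessionId, commandType: command.type },
  }];
}

// Example: one stored event, then a new command.
const history = [
  { event_type: 'AgentCommandExecuted', event_sequence: 1, payload: { sessionId: 's-1', commandType: 'start' } },
];
const aggregate = history.reduce(applyEvent, createEmptyAggregate());
const newEvents = executeCommand(aggregate, { sessionId: 's-1', type: 'send_message' });
```

A handler that hits the unique-constraint violation can reload the aggregate and retry the command against the newer version.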
Projection Side: Denormalized Read Models
{
"name": "Agent Event Projector",
"nodes": [
{
"parameters": {
"topic": "agent-events",
"groupId": "agent-projectors"
},
"name": "Agent Events",
"type": "n8n-nodes-base.kafkaTrigger",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Project events to read models\nconst event = $input.first().json;\n\nswitch (event.eventType) {\n case 'agent.session.started':\n await $getAll('postgres', {\n operation: 'executeQuery',\n query: `\n INSERT INTO agent_session_summary (\n session_id, user_id, agent_id, status,\n started_at, message_count, created_at\n ) VALUES (\n '${event.payload.sessionId}',\n '${event.payload.userId}',\n '${event.payload.agentId}',\n 'active',\n '${event.timestamp}',\n 0,\n NOW()\n )\n `\n });\n break;\n \n case 'user.message.received':\n await $getAll('postgres', {\n operation: 'executeQuery',\n query: `\n UPDATE agent_session_summary\n SET message_count = message_count + 1,\n last_activity = NOW(),\n last_user_message = '${event.payload.content.substring(0, 500)}'\n WHERE session_id = '${event.payload.sessionId}'\n `\n });\n \n // Also update message timeline\n await $getAll('postgres', {\n operation: 'executeQuery',\n query: `\n INSERT INTO message_timeline (\n session_id, message_type, content,\n timestamp, sentiment_score\n ) VALUES (\n '${event.payload.sessionId}',\n 'user',\n '${event.payload.content.substring(0, 1000)}',\n '${event.timestamp}',\n ${event.payload.metadata?.sentiment || 'NULL'}\n )\n `\n });\n break;\n \n case 'agent.session.ended':\n await $getAll('postgres', {\n operation: 'executeQuery',\n query: `\n UPDATE agent_session_summary\n SET status = 'ended',\n ended_at = NOW(),\n duration_seconds = ${event.payload.durationSeconds},\n end_reason = '${event.payload.reason}'\n WHERE session_id = '${event.payload.sessionId}'\n `\n });\n break;\n}\n\nreturn [{ json: { projected: true } }];"
},
"name": "Project to Read Model",
"type": "n8n-nodes-base.function",
"typeVersion": 1
}
]
}
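Stripped of the SQL, the projector is a function that folds each event into a denormalized row. The sketch below shows that logic against an in-memory `Map` standing in for `agent_session_summary`; the simplified row fields are assumptions for illustration.

```javascript
// In-memory stand-in for the agent_session_summary read model.
// Field names are simplified assumptions, not the full table schema.
function projectEvent(summaries, event) {
  const id = event.payload.sessionId;
  const row = summaries.get(id);
  switch (event.eventType) {
    case 'agent.session.started':
      summaries.set(id, { status: 'active', messageCount: 0, lastUserMessage: null });
      break;
    case 'user.message.received':
      if (row) {
        row.messageCount += 1;
        row.lastUserMessage = event.payload.content.substring(0, 500);
      }
      break;
    case 'agent.session.ended':
      if (row) row.status = 'ended';
      break;
  }
  return summaries;
}

// Example: three events for one session.
const stream = [
  { eventType: 'agent.session.started', payload: { sessionId: 's-1' } },
  { eventType: 'user.message.received', payload: { sessionId: 's-1', content: "It's broken" } },
  { eventType: 'agent.session.ended', payload: { sessionId: 's-1' } },
];
const readModel = stream.reduce(projectEvent, new Map());
```

Note that the example message contains an apostrophe. Interpolating such content directly into a quoted SQL literal, as the workflow above does, would break the statement, so a production projector should pass values as query parameters.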
Read Model Schema
-- Optimized read models for queries
CREATE TABLE agent_session_summary (
session_id VARCHAR(255) PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
agent_id VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL,
started_at TIMESTAMP WITH TIME ZONE NOT NULL,
ended_at TIMESTAMP WITH TIME ZONE,
duration_seconds INT,
message_count INT DEFAULT 0,
last_activity TIMESTAMP WITH TIME ZONE,
last_user_message TEXT,
sentiment_avg DECIMAL(3,2),
category VARCHAR(100),
priority INT DEFAULT 0,
tags TEXT[],
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Indexes for common queries
CREATE INDEX idx_session_summary_user ON agent_session_summary(user_id);
CREATE INDEX idx_session_summary_agent ON agent_session_summary(agent_id);
CREATE INDEX idx_session_summary_status ON agent_session_summary(status);
CREATE INDEX idx_session_summary_activity ON agent_session_summary(last_activity);
CREATE INDEX idx_session_summary_category ON agent_session_summary(category);
-- Message timeline for conversation history
CREATE TABLE message_timeline (
id SERIAL PRIMARY KEY,
session_id VARCHAR(255) REFERENCES agent_session_summary(session_id),
message_type VARCHAR(50) NOT NULL, -- user, agent, system
content TEXT,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
latency_ms INT,
tokens_used INT,
model_version VARCHAR(100),
sentiment_score DECIMAL(3,2),
intent VARCHAR(100),
confidence DECIMAL(3,2)
);
CREATE INDEX idx_timeline_session ON message_timeline(session_id, timestamp);
CREATE INDEX idx_timeline_sentiment ON message_timeline(sentiment_score)
WHERE sentiment_score IS NOT NULL;
-- Aggregated analytics
CREATE TABLE agent_performance_daily (
date DATE NOT NULL,
agent_id VARCHAR(255) NOT NULL,
total_sessions INT DEFAULT 0,
total_messages INT DEFAULT 0,
avg_session_duration INT, -- seconds
avg_response_latency_ms INT,
avg_sentiment DECIMAL(3,2),
unique_users INT DEFAULT 0,
-- One row per agent per day
PRIMARY KEY (date, agent_id)
);
-- Materialized view for real-time dashboards
CREATE MATERIALIZED VIEW active_sessions_view AS
SELECT
session_id,
user_id,
agent_id,
started_at,
last_activity,
EXTRACT(EPOCH FROM (NOW() - last_activity)) as idle_seconds,
message_count
FROM agent_session_summary
WHERE status = 'active'
AND last_activity > NOW() - INTERVAL '5 minutes';
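-- NOW() is evaluated when the view is refreshed, so idle_seconds and the
-- 5-minute filter reflect the last refresh, not the time of the query.
-- Refresh on a schedule (for example every 30 seconds, via a cron job):
-- REFRESH MATERIALIZED VIEW active_sessions_view;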
Query API in n8n
{
"name": "Agent Query API",
"nodes": [
{
"parameters": {
"path": "agent-sessions",
"responseMode": "responseNode"
},
"name": "HTTP Request",
"type": "n8n-nodes-base.webhook",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Parse query parameters\nconst query = $input.first().json.query;\nconst params = new URLSearchParams(query);\n\nconst filters = {\n userId: params.get('userId'),\n agentId: params.get('agentId'),\n status: params.get('status'),\n from: params.get('from'),\n to: params.get('to'),\n limit: parseInt(params.get('limit')) || 50,\n offset: parseInt(params.get('offset')) || 0\n};\n\nreturn [{ json: filters }];"
},
"name": "Parse Query",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"operation": "executeQuery",
"query": "={{ buildQuery($input.first().json) }}"
},
"name": "Query Database",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1
},
{
"parameters": {
"respondWith": "json",
"responseBody": "={{ JSON.stringify({ sessions: $input.all(), total: $input.first().json.total }) }}"
},
"name": "Return Response",
"type": "n8n-nodes-base.respondToWebhook",
"typeVersion": 1
}
]
}
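The "Query Database" node calls `buildQuery`, which the workflow does not define. One way to write it, sketched here under assumptions, is to return a parameterized statement rather than concatenating filter values into SQL. The `{ text, values }` shape follows the node-postgres convention; how parameters are passed to the n8n Postgres node depends on the node version, and the 500-row cap is an arbitrary illustrative limit.

```javascript
// Hypothetical buildQuery: turns the parsed filters into a parameterized
// statement ({ text, values }) instead of interpolating values into SQL.
function buildQuery(filters) {
  const columns = { userId: 'user_id', agentId: 'agent_id', status: 'status' };
  const where = [];
  const values = [];
  for (const [key, column] of Object.entries(columns)) {
    if (filters[key]) {
      values.push(filters[key]);
      where.push(`${column} = $${values.length}`);
    }
  }
  if (filters.from) {
    values.push(filters.from);
    where.push(`started_at >= $${values.length}`);
  }
  if (filters.to) {
    values.push(filters.to);
    where.push(`started_at <= $${values.length}`);
  }
  // Clamp paging values; the upper bound of 500 is an assumption.
  const limit = Math.min(Math.max(Number(filters.limit) || 50, 1), 500);
  const offset = Math.max(Number(filters.offset) || 0, 0);
  values.push(limit, offset);
  const text =
    'SELECT * FROM agent_session_summary' +
    (where.length ? ` WHERE ${where.join(' AND ')}` : '') +
    ` ORDER BY started_at DESC LIMIT $${values.length - 1} OFFSET $${values.length}`;
  return { text, values };
}

// Example: a user ID containing an apostrophe stays out of the SQL text.
const built = buildQuery({ userId: "o'brien", status: 'active', limit: 10, offset: 0 });
```

Only column names from the fixed `columns` map ever reach the SQL text, so request input cannot alter the structure of the statement.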
8. Real-Time Stream Processing with n8n
Real-time processing enables AI agents to react to events within milliseconds—critical for fraud detection, live chat, and IoT-driven automation.
Stream Processing Architecture
┌─────────────────────────────────────────────────────────────┐
│ REAL-TIME STREAM PROCESSING │
├─────────────────────────────────────────────────────────────┤
│ │
│ Sources: │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Kafka │ │RabbitMQ│ │Webhooks│ │ IoT │ │
│ │Events │ │Queue │ │ │ │Sensors │ │
│ └───┬────┘ └────┬───┘ └───┬────┘ └───┬────┘ │
│ │ │ │ │ │
│ └───────────┴─────────┴──────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────┐ │
│ │ n8n Real-Time Workflows │ │
│ │ ┌─────────┐ ┌─────────┐ ┌────────┐│ │
│ │ │ Filter │─►│ Enrich │─►│ Process││ │
│ │ │ │ │ │ │ ││ │
│ │ └─────────┘ └─────────┘ └────────┘│ │
│ └────────────────┬──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────┐ │
│ │ Sinks / Actions │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │ Alert │ │ Update │ │ Notify │ │ │
│ │ │ Trigger│ │Database│ │ User │ │ │
│ │ └────────┘ └────────┘ └────────┘ │ │
│ └─────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Windowing Patterns
Process events in temporal windows for aggregation and pattern detection:
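// Tumbling window implementation in n8n
const WINDOW_SIZE_MS = 60000; // 1-minute windows
// Keep windows in workflow static data: a Map created here would be rebuilt
// on every execution and lose its state. Static data is only persisted for
// active (production) workflow executions and must be JSON-serializable.
const staticData = $getWorkflowStaticData('global');
staticData.windows = staticData.windows || {};
const windows = staticData.windows;
// Process event
const event = $input.first().json;
const windowKey = getWindowKey(event.timestamp, WINDOW_SIZE_MS);
if (!windows[windowKey]) {
  windows[windowKey] = {
    start: windowKey * WINDOW_SIZE_MS,
    end: (windowKey + 1) * WINDOW_SIZE_MS,
    events: [],
    aggregates: {}
  };
}
const window = windows[windowKey];
window.events.push(event);
// Update aggregates
updateAggregates(window, event);
// Emit every window whose end time has passed, including earlier ones
const results = [];
for (const [key, candidate] of Object.entries(windows)) {
  if (Date.now() >= candidate.end) {
    results.push({ json: finalizeWindow(candidate) });
    delete windows[key];
  }
}
return results;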
function getWindowKey(timestamp, windowSize) {
return Math.floor(new Date(timestamp).getTime() / windowSize);
}
function updateAggregates(window, event) {
// Count by event type
window.aggregates[event.eventType] =
(window.aggregates[event.eventType] || 0) + 1;
// Sum sentiment scores (a score of 0 is valid, so test the type rather than truthiness)
if (typeof event.payload?.sentiment === 'number') {
window.aggregates.totalSentiment =
(window.aggregates.totalSentiment || 0) + event.payload.sentiment;
window.aggregates.sentimentCount =
(window.aggregates.sentimentCount || 0) + 1;
}
}
function finalizeWindow(window) {
return {
windowStart: new Date(window.start).toISOString(),
windowEnd: new Date(window.end).toISOString(),
eventCount: window.events.length,
aggregates: {
...window.aggregates,
// Avoid NaN for windows without any sentiment scores
avgSentiment: window.aggregates.sentimentCount
? window.aggregates.totalSentiment / window.aggregates.sentimentCount
: null
}
};
}
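To see how events map onto tumbling windows without any workflow state, the bucketing step can be isolated as a pure function. This is a small sketch; the `tumblingWindows` name and the sample timestamps are made up for illustration.

```javascript
// Groups events into fixed, non-overlapping windows keyed by start time.
function tumblingWindows(events, windowSizeMs) {
  const windows = new Map();
  for (const event of events) {
    const ts = new Date(event.timestamp).getTime();
    // Round the timestamp down to the start of its window
    const start = Math.floor(ts / windowSizeMs) * windowSizeMs;
    if (!windows.has(start)) {
      windows.set(start, { start, end: start + windowSizeMs, count: 0 });
    }
    windows.get(start).count += 1;
  }
  return [...windows.values()].sort((a, b) => a.start - b.start);
}

// Example: two events fall in the first minute, one in the second.
const sample = [
  { timestamp: '2024-01-01T00:00:05Z' },
  { timestamp: '2024-01-01T00:00:55Z' },
  { timestamp: '2024-01-01T00:01:10Z' },
];
const buckets = tumblingWindows(sample, 60000);
```

Each event belongs to exactly one window, which is what distinguishes tumbling windows from the sliding windows described later in this section.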
Stateful Stream Processing
Use Redis for maintaining state across events:
{
"nodes": [
{
"parameters": {
"topic": "user-behavior-events",
"groupId": "behavior-analyzers"
},
"name": "Behavior Events",
"type": "n8n-nodes-base.kafkaTrigger",
"typeVersion": 1
},
{
"parameters": {
"operation": "get",
"key": "={{ 'user:' + $json.userId + ':session' }}"
},
"name": "Get Session State",
"type": "n8n-nodes-base.redis",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Update session state\nconst event = $input.first().json;\nconst existingState = $input.all()[1]?.json || {};\n\nconst sessionState = {\n userId: event.userId,\n events: [...(existingState.events || []), event],\n eventCount: (existingState.eventCount || 0) + 1,\n lastActivity: new Date().toISOString(),\n \n // Calculate behavioral metrics\n pageViews: event.eventType === 'page_view' ? (existingState.pageViews || 0) + 1 : (existingState.pageViews || 0),\n \n timeOnSite: calculateTimeOnSite(existingState, event),\n \n // Anomaly detection\n velocity: calculateVelocity(existingState, event),\n \n // Intent scoring\n purchaseIntent: scorePurchaseIntent(existingState, event)\n};\n\n// Check for anomalies\nconst anomalies = detectAnomalies(sessionState);\n\nreturn [{\n json: {\n sessionState,\n anomalies,\n shouldAlert: anomalies.length > 0 || sessionState.purchaseIntent > 0.8\n }\n}];\n\nfunction calculateTimeOnSite(state, event) {\n if (!state.lastActivity) return 0;\n const last = new Date(state.lastActivity);\n const now = new Date(event.timestamp);\n return (state.timeOnSite || 0) + (now - last) / 1000;\n}\n\nfunction calculateVelocity(state, event) {\n // Count events from the last 60 seconds, including the current one.\n // sessionState cannot be read here: it is still being initialized.\n const recentEvents = [...(state.events || []), event].filter(\n e => new Date() - new Date(e.timestamp) < 60000\n );\n return recentEvents.length;\n}\n\nfunction scorePurchaseIntent(state, event) {\n let score = state.purchaseIntent || 0;\n \n if (event.eventType === 'view_product') score += 0.1;\n if (event.eventType === 'add_to_cart') score += 0.3;\n if (event.eventType === 'view_checkout') score += 0.2;\n \n return Math.min(score, 1.0);\n}\n\nfunction detectAnomalies(state) {\n const anomalies = [];\n \n // High velocity anomaly\n if (state.velocity > 100) {\n anomalies.push({\n type: 'high_velocity',\n severity: 'warning',\n description: 'Unusually high activity rate'\n });\n }\n \n // Geographic anomaly (would need GeoIP data)\n // Time-based anomaly (activity at unusual hours)\n \n return anomalies;\n}"
},
"name": "Analyze Behavior",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"operation": "set",
"key": "={{ 'user:' + $json.sessionState.userId + ':session' }}",
"value": "={{ JSON.stringify($json.sessionState) }}",
"options": {
"expire": true,
"ttl": 3600
}
},
"name": "Save State to Redis",
"type": "n8n-nodes-base.redis",
"typeVersion": 1
}
]
}
Sliding Window for Continuous Analytics
// Sliding window for real-time metrics
const SLIDE_INTERVAL = 5000; // Slide every 5 seconds
const WINDOW_SIZE = 60000; // 60 second window
const events = getEventsFromBuffer(SLIDE_INTERVAL);
const windowStart = Date.now() - WINDOW_SIZE;
const windowEnd = Date.now();
// Filter events within window
const windowEvents = events.filter(e => {
const ts = new Date(e.timestamp).getTime();
return ts >= windowStart && ts <= windowEnd;
});
// Calculate real-time metrics
const metrics = {
timestamp: new Date().toISOString(),
window: { start: windowStart, end: windowEnd },
eventRate: windowEvents.length / (WINDOW_SIZE / 1000), // events/second
uniqueUsers: new Set(windowEvents.map(e => e.userId)).size,
eventTypes: countBy(windowEvents, 'eventType'),
avgSentiment: average(windowEvents.filter(e => e.sentiment).map(e => e.sentiment)),
p95Latency: percentile(windowEvents.map(e => e.latencyMs), 95)
};
// Store for dashboards
await $getAll('redis', {
operation: 'set',
key: 'metrics:realtime',
value: JSON.stringify(metrics)
});
return [{ json: metrics }];
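The metrics calculation above depends on `countBy`, `average`, and `percentile`, which are not built into JavaScript or n8n. A minimal sketch of these helpers follows; the nearest-rank percentile method and the choice to return `null` for empty inputs are assumptions, and other definitions are equally valid.

```javascript
// Counts items by the value of one property.
function countBy(items, key) {
  return items.reduce((counts, item) => {
    counts[item[key]] = (counts[item[key]] || 0) + 1;
    return counts;
  }, {});
}

// Arithmetic mean; returns null for an empty list instead of NaN.
function average(values) {
  return values.length
    ? values.reduce((sum, v) => sum + v, 0) / values.length
    : null;
}

// Nearest-rank percentile; ignores entries that are not numbers.
function percentile(values, p) {
  const sorted = values
    .filter(v => typeof v === 'number' && !Number.isNaN(v))
    .sort((a, b) => a - b);
  if (sorted.length === 0) return null;
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(rank, 1) - 1];
}

// Example inputs, including an event without a latency value.
const typeCounts = countBy(
  [{ eventType: 'a' }, { eventType: 'b' }, { eventType: 'a' }],
  'eventType'
);
const meanSentiment = average([0.5, 1]);
const p95 = percentile([120, 80, 200, 95, undefined], 95);
```

Filtering out non-numeric values in `percentile` matters here because events without a `latencyMs` field would otherwise put `undefined` entries into the sort.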
9. Error Handling and Dead Letter Queues
Robust error handling is essential for production AI agent systems. Failed events must be captured, analyzed, and retried without losing data.
Error Handling Patterns
┌─────────────────────────────────────────────────────────────┐
│ ERROR HANDLING FLOW │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ Event │ │
│ │ Received │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Try │◄──────────────────────────────┐ │
│ │ Process │ │ │
│ └──────┬──────┘ │ │
│ │ │ │
│ ┌─────┴─────┐ │ │
│ │ │ │ │
│ Success Error Retry │
│ │ │ │ │
│ ▼ ▼ │ │
│ ┌──────┐ ┌─────────┐ ┌──────────┐ │ │
│ │Ack │ │ Retry? │──No──►│ DLQ │ │ │
│ │Msg │ │ (count) │ │ (Analyze)│ │ │
│ └──────┘ └────┬────┘ └──────────┘ │ │
│ Yes │ │ │
│ └───────────────────────────────┘ │
│ (delay + back-off) │
│ │
└─────────────────────────────────────────────────────────────┘
Retry Logic with Exponential Backoff
// Retry configuration
const MAX_RETRIES = 5;
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 60000;
async function processWithRetry(event) {
const retryCount = event.metadata?.retryCount || 0;
try {
// Attempt processing
const result = await processEvent(event);
return { success: true, result };
} catch (error) {
if (retryCount < MAX_RETRIES) {
// Calculate delay with exponential backoff + jitter
const delay = Math.min(
BASE_DELAY_MS * Math.pow(2, retryCount),
MAX_DELAY_MS
);
const jitter = Math.random() * 1000;
const totalDelay = delay + jitter;
// Update event with retry info
const retryEvent = {
...event,
metadata: {
...event.metadata,
retryCount: retryCount + 1,
lastError: error.message,
nextRetryAt: new Date(Date.now() + totalDelay).toISOString()
}
};
// Publish to retry queue with delay
await publishToRetryQueue(retryEvent, totalDelay);
return {
success: false,
willRetry: true,
retryCount: retryCount + 1,
delayMs: totalDelay
};
} else {
// Max retries exceeded - send to DLQ
await publishToDLQ(event, error);
return {
success: false,
willRetry: false,
reason: 'MAX_RETRIES_EXCEEDED',
error: error.message
};
}
}
}
async function publishToRetryQueue(event, delayMs) {
// Use delayed message queue (RabbitMQ or Redis)
await $getAll('redis', {
operation: 'zadd',
key: 'retry:queue',
score: Date.now() + delayMs,
member: JSON.stringify(event)
});
}
async function publishToDLQ(event, error) {
// Field names match the dead_letter_queue columns
const dlqEvent = {
original_event: event,
error_info: {
message: error.message,
stack: error.stack,
code: error.code,
failedAt: new Date().toISOString()
},
retry_count: event.metadata?.retryCount || 0
};
// Persist to DLQ
await $getAll('postgres', {
operation: 'insert',
table: 'dead_letter_queue',
data: dlqEvent
});
// Also emit alert
await $getAll('httpRequest', {
method: 'POST',
url: 'https://alerts.example.com/dlq',
body: dlqEvent
});
}
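As a worked example of the schedule these constants produce, the delay formula can be evaluated on its own. The sketch below reuses `BASE_DELAY_MS` and `MAX_DELAY_MS` from the configuration above and leaves out the random jitter so the values are deterministic; `backoffDelay` is a hypothetical helper name.

```javascript
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 60000;

// Delay before the next attempt for a given retry count, excluding jitter.
function backoffDelay(retryCount) {
  return Math.min(BASE_DELAY_MS * Math.pow(2, retryCount), MAX_DELAY_MS);
}

// Delays for the five retries allowed by MAX_RETRIES = 5 (retry counts 0-4).
const schedule = [0, 1, 2, 3, 4].map(n => backoffDelay(n));
const totalWaitMs = schedule.reduce((sum, d) => sum + d, 0);
```

With these settings the delays are 1, 2, 4, 8, and 16 seconds, about 31 seconds in total before jitter. The 60-second cap would first apply at a retry count of 6, so it only takes effect if `MAX_RETRIES` is raised.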
Dead Letter Queue Implementation
-- Dead letter queue schema
CREATE TABLE dead_letter_queue (
id SERIAL PRIMARY KEY,
dlq_id UUID DEFAULT gen_random_uuid(),
original_event JSONB NOT NULL,
error_info JSONB NOT NULL,
retry_count INT DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
status VARCHAR(50) DEFAULT 'NEW', -- NEW, REVIEWED, RETRIED, ARCHIVED
reviewed_by VARCHAR(255),
reviewed_at TIMESTAMP WITH TIME ZONE,
resolution_notes TEXT
);
-- DLQ metrics view
CREATE VIEW dlq_metrics AS
SELECT
DATE(created_at) as date,
status,
COUNT(*) as count,
error_info->>'code' as error_code
FROM dead_letter_queue
GROUP BY DATE(created_at), status, error_info->>'code';
-- Index for quick lookup
CREATE INDEX idx_dlq_status ON dead_letter_queue(status);
CREATE INDEX idx_dlq_created ON dead_letter_queue(created_at);
n8n DLQ Processor Workflow
{
"name": "DLQ Processor",
"nodes": [
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT * FROM dead_letter_queue WHERE status = 'NEW' ORDER BY created_at ASC LIMIT 100"
},
"name": "Fetch DLQ Items",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Analyze DLQ items\nconst items = $input.all();\n\nconst analysis = items.map(item => {\n const error = item.json.error_info;\n const event = item.json.original_event;\n \n // Classify errors\n let category = 'UNKNOWN';\n let actionable = false;\n let suggestedAction = null;\n \n if (error.message?.includes('timeout')) {\n category = 'TIMEOUT';\n actionable = true;\n suggestedAction = 'RETRY_WITH_BACKOFF';\n } else if (error.message?.includes('rate limit')) {\n category = 'RATE_LIMIT';\n actionable = true;\n suggestedAction = 'RETRY_AFTER_DELAY';\n } else if (error.message?.includes('validation')) {\n category = 'VALIDATION';\n actionable = false;\n suggestedAction = 'MANUAL_REVIEW';\n } else if (error.code === 'ECONNREFUSED') {\n category = 'CONNECTIVITY';\n actionable = true;\n suggestedAction = 'RETRY';\n }\n \n return {\n dlqId: item.json.dlq_id,\n category,\n actionable,\n suggestedAction,\n errorMessage: error.message,\n // Events in this guide carry their type as eventType\n eventType: event.eventType\n };\n});\n\nreturn analysis.map(a => ({ json: a }));"
},
"name": "Analyze Errors",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"conditions": {
"boolean": [
{
"value1": "={{ $json.actionable }}",
"value2": true
}
]
}
},
"name": "Is Actionable?",
"type": "n8n-nodes-base.if",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Auto-retry actionable items\nconst item = $input.first().json;\n\n// Fetch full event\nconst dlqItem = await $getAll('postgres', {\n operation: 'executeQuery',\n query: `SELECT * FROM dead_letter_queue WHERE dlq_id = '${item.dlqId}'`\n});\n\nconst event = dlqItem[0].original_event;\n\n// Reset retry count and republish\nevent.metadata = event.metadata || {};\nevent.metadata.retryCount = 0;\nevent.metadata.retryReason = 'dlq_auto_retry';\n\nawait $getAll('kafka', {\n operation: 'sendMessage',\n topic: event.metadata.originalTopic || 'agent-events',\n message: event\n});\n\n// Update DLQ status\nawait $getAll('postgres', {\n operation: 'executeQuery',\n query: `UPDATE dead_letter_queue SET status = 'RETRIED', resolution_notes = 'Auto-retry processed' WHERE dlq_id = '${item.dlqId}'`\n});\n\nreturn [{ json: { retried: true, dlqId: item.dlqId } }];"
},
"name": "Auto-Retry",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"resource": "message",
"operation": "post",
"channel": "#dlq-alerts",
"text": "={{ '🚨 DLQ Alert: ' + $input.all().length + ' items need manual review' }}"
},
"name": "Alert Team",
"type": "n8n-nodes-base.slack",
"typeVersion": 1
}
]
}
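The classifier above tags timeouts with `RETRY_WITH_BACKOFF`, but the workflow does not show how the delay is chosen. One common choice is capped exponential backoff with full jitter, sketched below. The function name `backoffDelayMs`, the base delay, the cap, and the injectable random source are illustrative assumptions rather than values taken from the workflow.

```javascript
// Capped exponential backoff with "full jitter".
// baseMs, capMs and the injectable random source are illustrative assumptions.
function backoffDelayMs(retryCount, { baseMs = 500, capMs = 60000, random = Math.random } = {}) {
  // Exponential ceiling: base * 2^retryCount, never above the cap
  const ceiling = Math.min(capMs, baseMs * 2 ** retryCount);
  // Full jitter: pick uniformly in [0, ceiling) so retries do not synchronize
  return Math.floor(random() * ceiling);
}

// Upper bounds for the first four retries (random source pinned just below 1)
const bounds = [0, 1, 2, 3].map((n) => backoffDelayMs(n, { random: () => 0.999999 }));
console.log(bounds);
```

Injecting the random source keeps the function deterministic in tests while production code falls back to `Math.random`.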
10. Monitoring and Observability for Event-Driven Systems
Observability in event-driven AI systems requires tracing events across distributed workflows, monitoring queue health, and understanding agent performance.
The Three Pillars of Observability
┌─────────────────────────────────────────────────────────────┐
│ OBSERVABILITY THREE PILLARS │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Metrics │ │ Logs │ │ Traces │ │
│ │ │ │ │ │ │ │
│ │ Queue depth │ │ Event flow │ │ End-to-end │ │
│ │ Event rates │ │ Processing │ │ request │ │
│ │ Latency P99 │ │ errors │ │ timing │ │
│ │ Agent perf │ │ State │ │ Cross- │ │
│ │ Error rates │ │ changes │ │ service │ │
│ │ │ │ │ │ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Dashboards: Grafana │ Loki/Kibana │ Jaeger/Tempo │
│ │
└─────────────────────────────────────────────────────────────┘
Distributed Tracing for AI Workflows
// OpenTelemetry tracing for n8n workflows
const { trace, context, SpanStatusCode, TraceFlags } = require('@opentelemetry/api');
const tracer = trace.getTracer('n8n-agent-workflows');
// Rebuild a parent Context from the trace fields carried in event metadata.
// Falls back to the active context when the event carries no trace information.
function extractContext(traceContext) {
  if (!traceContext) return context.active();
  return trace.setSpanContext(context.active(), {
    traceId: traceContext.traceId,
    spanId: traceContext.spanId,
    traceFlags: traceContext.traceFlags ?? TraceFlags.SAMPLED,
    isRemote: true
  });
}
async function processWithTracing(event) {
  // Extract the upstream trace context, or start a new trace
  const parentContext = extractContext(event.metadata?.traceContext);
  const span = tracer.startSpan('process_agent_event', {
    attributes: {
      'event.type': event.eventType,
      'event.id': event.eventId,
      'session.id': event.payload.sessionId,
      'agent.id': event.payload.agentId
    }
  }, parentContext);
  try {
    // Make the span active so child spans and enrichEventWithTracing() can see it
    const ctx = trace.setSpan(parentContext, span);
    await context.with(ctx, async () => {
      // Process event (processEvent is the workflow's own handler, defined elsewhere)
      await processEvent(event);
    });
    span.setStatus({ code: SpanStatusCode.OK });
  } catch (error) {
    span.recordException(error);
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: error.message
    });
    throw error;
  } finally {
    span.end();
  }
}
// Propagate trace context in events
function enrichEventWithTracing(event) {
  const span = trace.getActiveSpan();
  if (span) {
    const spanContext = span.spanContext();
    event.metadata = event.metadata || {};
    event.metadata.traceContext = {
      traceId: spanContext.traceId,
      spanId: spanContext.spanId,
      traceFlags: spanContext.traceFlags
    };
  }
  return event;
}
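The three fields stored in `event.metadata.traceContext` are the same ones the W3C Trace Context `traceparent` header carries, in the form `version-traceId-parentId-flags`. When events cross into systems that expect that header, a small serializer and parser is enough. The sketch below assumes version `00` only; the function names `toTraceparent` and `fromTraceparent` are illustrative.

```javascript
// Serialize and parse the trace fields as a W3C `traceparent` string.
// Function names are illustrative; only the string layout comes from the W3C format.
function toTraceparent({ traceId, spanId, traceFlags = 1 }) {
  const flags = traceFlags.toString(16).padStart(2, '0');
  return `00-${traceId}-${spanId}-${flags}`;
}

function fromTraceparent(header) {
  const match = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header);
  if (!match) return null; // malformed or unsupported version: caller starts a new trace
  const [, traceId, spanId, flags] = match;
  // All-zero identifiers are invalid
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return null;
  return { traceId, spanId, traceFlags: parseInt(flags, 16) };
}

const header = toTraceparent({
  traceId: '4bf92f3577b34da6a3ce929d0e0e4736',
  spanId: '00f067aa0ba902b7',
  traceFlags: 1
});
console.log(header);
console.log(fromTraceparent(header));
```

Returning `null` for anything unparseable lets the consumer fall back to starting a fresh trace instead of failing the event.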
Key Metrics for Event-Driven AI
# Prometheus metrics for AI agent monitoring
# These would be emitted by n8n workflows
# Event processing metrics
agent_events_received_total:
  type: counter
  labels: [topic, event_type]
agent_events_processed_total:
  type: counter
  labels: [topic, event_type, status]
agent_event_processing_duration_seconds:
  type: histogram
  labels: [event_type]
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5]
# Queue metrics
agent_queue_depth:
  type: gauge
  labels: [queue_name]
agent_queue_consumer_lag:
  type: gauge
  labels: [topic, partition, consumer_group]
# LLM metrics
agent_llm_requests_total:
  type: counter
  labels: [model, status]
agent_llm_latency_seconds:
  type: histogram
  labels: [model]
  buckets: [0.1, 0.5, 1, 2, 5, 10, 30]
agent_llm_tokens_used_total:
  type: counter
  labels: [model, token_type]
# Session metrics
agent_sessions_active:
  type: gauge
  labels: [agent_type]
agent_session_duration_seconds:
  type: histogram
  labels: [agent_type, outcome]
  buckets: [30, 60, 300, 600, 1800, 3600]
# Counter used by the AgentSessionTimeouts alert rule (label set is an assumption)
agent_session_timeout_total:
  type: counter
  labels: [agent_type]
# Error metrics
agent_errors_total:
  type: counter
  labels: [error_type, event_type, agent_id]
agent_dlq_items_total:
  type: counter
  labels: [reason]
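The `histogram` entries above rely on Prometheus's cumulative bucket model: each bucket counts every observation less than or equal to its upper bound, and an implicit `+Inf` bucket counts all of them. The sketch below models that behavior in memory, using the `agent_llm_latency_seconds` bucket list. The class name `CumulativeHistogram` is illustrative, and a real deployment would use a Prometheus client library rather than this code.

```javascript
// Minimal in-memory model of a Prometheus-style cumulative histogram.
// Illustrative only: a real deployment would use a client library instead.
class CumulativeHistogram {
  constructor(buckets) {
    this.bounds = [...buckets].sort((a, b) => a - b);
    this.counts = new Array(this.bounds.length + 1).fill(0); // last slot is +Inf
    this.sum = 0;
    this.count = 0;
  }

  observe(value) {
    // Cumulative semantics: increment every bucket whose upper bound >= value
    this.bounds.forEach((le, i) => {
      if (value <= le) this.counts[i] += 1;
    });
    this.counts[this.bounds.length] += 1; // the +Inf bucket counts everything
    this.sum += value;
    this.count += 1;
  }

  snapshot() {
    const labels = [...this.bounds.map(String), '+Inf'];
    return labels.map((le, i) => ({ le, count: this.counts[i] }));
  }
}

const llmLatency = new CumulativeHistogram([0.1, 0.5, 1, 2, 5, 10, 30]);
[0.3, 0.8, 1.5, 12].forEach((seconds) => llmLatency.observe(seconds));
console.log(llmLatency.snapshot());
```

Because buckets are cumulative, widening or narrowing the bucket list changes the resolution of any quantile computed from it, which is why the LLM latency buckets extend to 30 seconds while event processing stops at 5.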
n8n Monitoring Workflow
{
"name": "Agent Metrics Collector",
"nodes": [
{
"parameters": {
"rule": {
"interval": [
{
"field": "seconds",
"secondsInterval": 15
}
]
}
},
"name": "Collect Every 15s",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Collect metrics from multiple sources\nconst metrics = {\n timestamp: new Date().toISOString(),\n sources: {}\n};\n\n// 1. Kafka consumer lag\nconst kafkaLag = await $getAll('httpRequest', {\n method: 'GET',\n url: 'http://kafka:9090/admin/consumers/agent-workers/lag'\n});\nmetrics.sources.kafka_lag = kafkaLag;\n\n// 2. Redis queue depth\nconst redisQueue = await $getAll('redis', {\n operation: 'llen',\n key: 'agent-task-queue'\n});\nmetrics.sources.redis_queue_depth = redisQueue;\n\n// 3. Database metrics\nconst dbMetrics = await $getAll('postgres', {\n operation: 'executeQuery',\n query: `\n SELECT \n COUNT(*) FILTER (WHERE status = 'active') as active_sessions,\n COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '1 minute') as sessions_last_minute,\n AVG(EXTRACT(EPOCH FROM (ended_at - started_at))) FILTER (WHERE ended_at IS NOT NULL) as avg_duration\n FROM agent_session_summary\n `\n});\nmetrics.sources.database = dbMetrics[0];\n\n// 4. n8n execution metrics\nconst n8nMetrics = await $getAll('httpRequest', {\n method: 'GET',\n url: 'http://n8n:5678/rest/metrics'\n});\nmetrics.sources.n8n = n8nMetrics;\n\nreturn [{ json: metrics }];"
},
"name": "Collect Metrics",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"operation": "insert",
"table": "metrics_timeseries",
"columns": {
"string": [
{
"column": "timestamp",
"value": "={{ $json.timestamp }}"
},
{
"column": "metrics",
"value": "={{ JSON.stringify($json.sources) }}"
}
]
}
},
"name": "Store Metrics",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1
},
{
"parameters": {
"conditions": {
"number": [
{
"value1": "={{ $json.sources.redis_queue_depth }}",
"operation": "larger",
"value2": 1000
}
]
}
},
"name": "Queue Too Deep?",
"type": "n8n-nodes-base.if",
"typeVersion": 1
},
{
"parameters": {
"resource": "message",
"operation": "post",
"channel": "#ops-alerts",
"text": "={{ '⚠️ High queue depth: ' + $json.sources.redis_queue_depth + ' items' }}"
},
"name": "Alert Ops",
"type": "n8n-nodes-base.slack",
"typeVersion": 1
}
]
}
Alerting Rules
# Prometheus alerting rules for AI agent systems
groups:
  - name: ai_agents
    rules:
      # High queue depth
      - alert: AgentQueueDepthHigh
        expr: agent_queue_depth > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Agent queue depth is high"
          description: "Queue {{ $labels.queue_name }} has {{ $value }} items"
      # Consumer lag
      - alert: AgentConsumerLagHigh
        expr: agent_queue_consumer_lag > 10000
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Consumer lag is high"
          description: "Consumer group {{ $labels.consumer_group }} is {{ $value }} messages behind"
      # Event processing errors (errors per second, per label set)
      - alert: AgentEventProcessingErrors
        expr: rate(agent_errors_total[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate in event processing"
      # LLM latency (p95 per model, computed from the histogram's _bucket series)
      - alert: AgentLLMLatencyHigh
        expr: histogram_quantile(0.95, sum by (le, model) (rate(agent_llm_latency_seconds_bucket[5m]))) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM latency is high"
      # DLQ items accumulating
      - alert: AgentDLQItemsAccumulating
        expr: increase(agent_dlq_items_total[1h]) > 100
        for: 15m
        labels:
          severity: critical
        annotations:
          summary: "Many items entering DLQ"
      # Session timeouts (timeouts per second)
      - alert: AgentSessionTimeouts
        expr: rate(agent_session_timeout_total[5m]) > 5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High rate of session timeouts"
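The LLM latency rule depends on `histogram_quantile`, which never sees individual observations: it finds the bucket that contains the requested rank and interpolates linearly inside it. The sketch below is a simplified model of that estimate. It ignores `rate()` windows and other edge cases, and the function name and sample bucket counts are made up for illustration.

```javascript
// Simplified model of quantile estimation from cumulative buckets:
// find the bucket containing the target rank, then interpolate linearly.
// buckets: [{ le: upperBound, count: cumulativeCount }], sorted, last le = Infinity.
function estimateQuantile(q, buckets) {
  const total = buckets[buckets.length - 1].count;
  if (total === 0) return NaN;
  const rank = q * total;
  const idx = buckets.findIndex((b) => b.count >= rank);
  // If the rank falls in the +Inf bucket, report the highest finite bound
  if (buckets[idx].le === Infinity) return buckets[buckets.length - 2].le;
  const lowerBound = idx === 0 ? 0 : buckets[idx - 1].le;
  const lowerCount = idx === 0 ? 0 : buckets[idx - 1].count;
  const inBucket = buckets[idx].count - lowerCount;
  if (inBucket === 0) return lowerBound;
  return lowerBound + (buckets[idx].le - lowerBound) * ((rank - lowerCount) / inBucket);
}

// Hypothetical cumulative counts for 100 LLM calls
const sample = [
  { le: 1, count: 50 },
  { le: 2, count: 80 },
  { le: 5, count: 90 },
  { le: 10, count: 98 },
  { le: 30, count: 100 },
  { le: Infinity, count: 100 }
];
console.log(estimateQuantile(0.95, sample));
```

With these sample counts the 95th rank lands in the 5 to 10 second bucket, so the estimate can only be as precise as that bucket is narrow. Placing a bucket boundary near the alert threshold keeps the rule meaningful.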
11. Scalability Patterns and Horizontal Scaling
Scaling AI agent systems requires understanding both data partitioning and compute scaling strategies.
Partitioning Strategies
┌─────────────────────────────────────────────────────────────┐
│ EVENT STREAM PARTITIONING │
├─────────────────────────────────────────────────────────────┤
│ │
│ Partition by Session ID (Session Affinity): │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Part 0 │ │ Part 1 │ │ Part 2 │ │ Part 3 │ │
│ │ sessA* │ │ sessB* │ │ sessC* │ │ sessD* │ │
│ │ sessA1 │ │ sessB1 │ │ sessC1 │ │ sessD1 │ │
│ │ sessA2 │ │ sessB2 │ │ sessC2 │ │ sessD2 │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ Benefits: │
│ ✓ Event ordering per session │
│ ✓ Local state possible │
│ ✓ Natural sharding │
│ │
│ Partition by Event Type (Load Balancing): │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Part 0 │ │ Part 1 │ │ Part 2 │ │ Part 3 │ │
│ │ user.* │ │ agent.*│ │ tool.* │ │system.*│ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ Benefits: │
│ ✓ Specialized consumers │
│ ✓ Independent scaling per event type │
│ ✓ Separate retention policies │
│ │
└─────────────────────────────────────────────────────────────┘
Kafka Partitioning Configuration
# Create partitioned topic for agent events
kafka-topics.sh --create \
--topic ai-agent-events \
--partitions 24 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config retention.ms=604800000 \
--config compression.type=lz4 \
--bootstrap-server kafka:9092
# Producer: Ensure session affinity
# Key = session_id ensures all events for a session go to the same partition
# Consumer: Scale consumers within the group
# 24 partitions = at most 24 active consumers in the group (1:1 mapping)
# Fewer consumers than partitions = some consumers process multiple partitions
# More consumers than partitions = the extra consumers sit idle
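Both rules in the comments above can be seen in a few lines of code: a key always hashes to the same partition, and partitions are divided among the consumers in a group. The sketch below uses FNV-1a as a stand-in hash (Kafka's default partitioner uses murmur2) and a plain round-robin assignment. The function names are illustrative, and real assignment is handled by the Kafka client's configured assignor.

```javascript
// Stand-in hash (FNV-1a). Kafka's default partitioner uses murmur2 instead;
// the point here is only that the mapping is deterministic per key.
function fnv1a(str) {
  let hash = 0x811c9dc5;
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}

// Same session ID always yields the same partition number
function partitionFor(sessionId, numPartitions) {
  return fnv1a(sessionId) % numPartitions;
}

// Round-robin assignment of partitions to the consumers in one group
function assignPartitions(numPartitions, consumers) {
  const assignment = Object.fromEntries(consumers.map((c) => [c, []]));
  for (let p = 0; p < numPartitions; p++) {
    assignment[consumers[p % consumers.length]].push(p);
  }
  return assignment;
}

console.log(partitionFor('session-42', 24));
console.log(assignPartitions(4, ['c1', 'c2']));       // each consumer owns two partitions
console.log(assignPartitions(2, ['c1', 'c2', 'c3'])); // c3 receives nothing and idles
```

The second assignment shows why adding consumers beyond the partition count does not add throughput: the partition count is the ceiling on parallelism within a group.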
n8n Horizontal Scaling
# docker-compose.yml for scaled n8n deployment (queue mode)
version: '3.8'
# Settings shared by every n8n process: all instances must point at the same
# database and Redis queue and use the same encryption key
x-n8n-env: &n8n-env
  DB_TYPE: postgresdb
  DB_POSTGRESDB_HOST: n8n-db
  DB_POSTGRESDB_DATABASE: n8n
  DB_POSTGRESDB_USER: n8n
  DB_POSTGRESDB_PASSWORD: password  # placeholder; inject a secret in production
  QUEUE_BULL_REDIS_HOST: n8n-redis
  EXECUTIONS_MODE: queue
  N8N_ENCRYPTION_KEY: ${N8N_ENCRYPTION_KEY}
  WEBHOOK_URL: https://n8n.example.com/
services:
  # Shared PostgreSQL for workflow storage
  n8n-db:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: n8n
      POSTGRES_PASSWORD: password
      POSTGRES_DB: n8n
    volumes:
      - postgres_data:/var/lib/postgresql/data
  # Redis backs the job queue that distributes executions to workers
  n8n-redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
  # Worker instances pull executions from the queue
  n8n-worker:
    image: n8nio/n8n:latest
    command: worker
    environment:
      <<: *n8n-env
    volumes:
      - ./workflows:/home/node/.n8n/workflows
    depends_on:
      - n8n-db
      - n8n-redis
    deploy:
      replicas: 3
  # Webhook processor (can scale independently)
  n8n-webhook:
    image: n8nio/n8n:latest
    command: webhook
    environment:
      <<: *n8n-env
    depends_on:
      - n8n-db
      - n8n-redis
    deploy:
      replicas: 2
  # Main instance (UI and orchestration); production webhooks are
  # disabled here so the webhook processors handle them
  n8n-main:
    image: n8nio/n8n:latest
    environment:
      <<: *n8n-env
      N8N_DISABLE_PRODUCTION_MAIN_PROCESS: "true"
    ports:
      - "5678:5678"
    depends_on:
      - n8n-db
      - n8n-redis
volumes:
  postgres_data:
  redis_data:
Auto-Scaling Configuration
# Kubernetes HPA for n8n workers
# The External metric below is only available if a metrics adapter
# (for example KEDA or prometheus-adapter) exposes kafka_consumer_lag
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: n8n-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: n8n-worker
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: External
      external:
        metric:
          name: kafka_consumer_lag
          selector:
            matchLabels:
              topic: ai-agent-events
        target:
          type: AverageValue
          averageValue: "100"
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
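To reason about how this HPA reacts to a lag spike, it helps to apply the documented scaling formula by hand: `desired = ceil(currentReplicas * currentValue / targetValue)`, computed per metric, with the largest result winning before clamping to the min and max. The sketch below is a simplified model of that calculation; it leaves out the tolerance band and the `behavior` rate limits, and the function names and sample numbers are illustrative.

```javascript
// Replica count proposed by one metric, following the documented HPA formula:
// desired = ceil(currentReplicas * currentValue / targetValue)
function desiredForMetric(currentReplicas, currentValue, targetValue) {
  return Math.ceil(currentReplicas * (currentValue / targetValue));
}

// With several metrics the HPA takes the largest proposal, then clamps it.
function desiredReplicas({ currentReplicas, minReplicas, maxReplicas, metrics }) {
  const proposals = metrics.map((m) => desiredForMetric(currentReplicas, m.current, m.target));
  return Math.min(maxReplicas, Math.max(minReplicas, ...proposals));
}

// 5 workers, total lag of 1500 messages (300 per pod on average), CPU at 90%
const spike = desiredReplicas({
  currentReplicas: 5,
  minReplicas: 3,
  maxReplicas: 20,
  metrics: [
    { name: 'kafka_consumer_lag (average per pod)', current: 1500 / 5, target: 100 },
    { name: 'cpu utilization', current: 90, target: 70 }
  ]
});
console.log(spike);
```

In this example the lag metric asks for more replicas than CPU does, so lag drives the decision. The `scaleUp` policy in the manifest then limits how quickly the Deployment may grow toward that number.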
Back-Pressure Handling
// Implement back-pressure to prevent overwhelming downstream services
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
class BackPressureController {
  constructor(maxInFlight, targetLatency) {
    this.maxInFlight = maxInFlight;
    this.targetLatency = targetLatency; // milliseconds
    this.currentInFlight = 0;
    this.latencyHistory = [];
  }
  canAccept() {
    // Simple limit-based back-pressure
    if (this.currentInFlight >= this.maxInFlight) {
      return false;
    }
    // Latency-based back-pressure. Applied only while work is in flight:
    // with nothing in flight the average can never improve, so rejecting
    // here would block the controller forever. One probe request is let through.
    const avgLatency = this.getAverageLatency();
    if (this.currentInFlight > 0 && avgLatency > this.targetLatency * 1.5) {
      return false;
    }
    return true;
  }
  async processWithBackPressure(event) {
    while (!this.canAccept()) {
      await sleep(100); // Wait before retrying
    }
    this.currentInFlight++;
    const startTime = Date.now();
    try {
      // processEvent is the workflow's own handler, defined elsewhere
      const result = await processEvent(event);
      return result;
    } finally {
      this.currentInFlight--;
      this.latencyHistory.push(Date.now() - startTime);
      // Keep only last 100 latency measurements
      if (this.latencyHistory.length > 100) {
        this.latencyHistory.shift();
      }
    }
  }
  getAverageLatency() {
    if (this.latencyHistory.length === 0) return 0;
    return this.latencyHistory.reduce((a, b) => a + b, 0) / this.latencyHistory.length;
  }
}
// Usage in n8n
const controller = new BackPressureController(100, 1000); // 100 max in flight, 1s target latency
exports.default = async function(event) {
  return await controller.processWithBackPressure(event);
};
12. Production Deployment Strategies
Deploying event-driven AI agent systems requires careful attention to data persistence, security, and operational procedures.
Deployment Architecture
┌─────────────────────────────────────────────────────────────┐
│ PRODUCTION DEPLOYMENT │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ LOAD BALANCER │ │
│ │ (SSL termination, rate limiting) │ │
│ └──────────────────────┬─────────────────────────────┘ │
│ │ │
│ ┌────────────────┼────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ n8n │ │ n8n │ │ n8n │ │
│ │ Webhook │ │ Webhook │ │ Main │ │
│ │ Instance │ │ Instance │ │ Instance │ │
│ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘ │
│ │ │ │ │
│ └───────────────┼───────────────┘ │
│ │ │
│ ┌──────────┴──────────┐ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Redis │ │ PostgreSQL│ │
│ │ Queue │ │ (State) │ │
│ └────┬─────┘ └──────────┘ │
│ │ │
│ ┌───────┴───────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────┐ ┌────────┐ │
│ │Worker│ │ Worker │ │
│ │ Pod 1│ │ Pod 2 │ ... (auto-scaled) │
│ └──┬───┘ └───┬────┘ │
│ │ │ │
│ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Apache Kafka │ │
│ │ Cluster │ │
│ │ (3 brokers) │ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Database Migration Strategy
-- Database versioning for event store
-- migration_001_initial.sql
-- Initial schema
CREATE TABLE agent_events (
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    event_type VARCHAR(100) NOT NULL,
    event_version INT DEFAULT 1,
    session_id VARCHAR(255) NOT NULL,
    event_sequence BIGINT NOT NULL,
    payload JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    CONSTRAINT unique_session_sequence UNIQUE (session_id, event_sequence)
);
CREATE INDEX idx_agent_events_session ON agent_events(session_id, event_sequence);
-- migration_002_add_correlation.sql
-- Add correlation tracking
ALTER TABLE agent_events ADD COLUMN correlation_id UUID;
ALTER TABLE agent_events ADD COLUMN causation_id UUID;
CREATE INDEX idx_agent_events_correlation ON agent_events(correlation_id);
-- migration_003_add_partitioning.sql
-- Convert to partitioned table by time.
-- PostgreSQL requires every PRIMARY KEY and UNIQUE constraint on a partitioned
-- table to include the partition key, so the constraints are redefined with
-- created_at instead of being copied with INCLUDING ALL. Uniqueness of
-- (session_id, event_sequence) alone must then be enforced by the writer.
CREATE TABLE agent_events_partitioned (
    LIKE agent_events INCLUDING DEFAULTS,
    PRIMARY KEY (event_id, created_at),
    CONSTRAINT unique_session_sequence_partitioned
        UNIQUE (session_id, event_sequence, created_at)
) PARTITION BY RANGE (created_at);
-- Create monthly partitions
CREATE TABLE agent_events_y2026m01 PARTITION OF agent_events_partitioned
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE agent_events_y2026m02 PARTITION OF agent_events_partitioned
    FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
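Monthly partitions have to exist before the first event of each month arrives, so the `CREATE TABLE ... PARTITION OF` statements are usually generated rather than written by hand. The sketch below produces them for a run of consecutive months, following the `agent_events_yYYYYmMM` naming used in the migration. The function name and the use of `IF NOT EXISTS` are assumptions for illustration; a scheduled workflow could run its output against the database.

```javascript
// Generate CREATE TABLE ... PARTITION OF statements for consecutive months.
// Partition naming follows the migration above; the function itself is illustrative.
function monthlyPartitionDdl(parentTable, startYear, startMonth, months) {
  const pad = (n) => String(n).padStart(2, '0');
  const day = (d) => d.toISOString().slice(0, 10);
  const statements = [];
  for (let i = 0; i < months; i++) {
    // Date.UTC normalizes month overflow, so December rolls into the next year
    const from = new Date(Date.UTC(startYear, startMonth - 1 + i, 1));
    const to = new Date(Date.UTC(startYear, startMonth + i, 1));
    const name = `agent_events_y${from.getUTCFullYear()}m${pad(from.getUTCMonth() + 1)}`;
    statements.push(
      `CREATE TABLE IF NOT EXISTS ${name} PARTITION OF ${parentTable}\n` +
      `    FOR VALUES FROM ('${day(from)}') TO ('${day(to)}');`
    );
  }
  return statements;
}

const ddl = monthlyPartitionDdl('agent_events_partitioned', 2026, 12, 2);
console.log(ddl.join('\n'));
```

Range bounds are inclusive at `FROM` and exclusive at `TO`, so using the first day of the following month as the upper bound leaves no gap between partitions.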
Blue-Green Deployment for n8n
# Kubernetes blue-green deployment
apiVersion: v1
kind: Service
metadata:
  name: n8n-active
spec:
  selector:
    app: n8n
    version: blue  # Switch to green for deployment
  ports:
    - port: 5678
      targetPort: 5678
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: n8n-blue
  labels:
    app: n8n
    version: blue
spec:
  replicas: 3
  selector:
    matchLabels:
      app: n8n
      version: blue
  template:
    metadata:
      labels:
        app: n8n
        version: blue
    spec:
      containers:
        - name: n8n
          image: n8nio/n8n:1.50.0
          env:
            - name: EXECUTIONS_MODE
              value: queue
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: n8n-green
  labels:
    app: n8n
    version: green
spec:
  replicas: 3
  selector:
    matchLabels:
      app: n8n
      version: green
  template:
    metadata:
      labels:
        app: n8n
        version: green
    spec:
      containers:
        - name: n8n
          image: n8nio/n8n:1.51.0  # New version
          env:
            - name: EXECUTIONS_MODE
              value: queue
Security Best Practices
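// Security middleware for n8n workflows
// verifyWebhookSignature, checkRateLimit and validateAPITokens are
// deployment-specific helpers assumed to be defined elsewhere.
function securityMiddleware(event) {
  // 1. Authentication check. Runs on the raw event, before any mutation,
  //    because the signature covers the original payload.
  if (!verifyWebhookSignature(event)) {
    throw new Error('Invalid signature');
  }
  // 2. Rate limiting check
  if (!checkRateLimit(event.sourceIp)) {
    throw new Error('Rate limit exceeded');
  }
  // 3. Input validation
  const sanitized = sanitizeInput(event);
  // 4. Token validation for LLM calls
  if (sanitized.requiresLLM) {
    validateAPITokens();
  }
  // 5. PII masking in logs: only the log line is masked, downstream
  //    nodes still receive the sanitized event with its real values
  console.log('Accepted event', JSON.stringify(maskPII(sanitized)));
  return sanitized;
}
function sanitizeInput(event) {
  // Remove potentially dangerous content
  const sanitized = JSON.parse(JSON.stringify(event));
  if (sanitized.payload?.content) {
    // Strip script blocks (including multi-line ones) and remaining tags.
    // Regex stripping is a coarse filter, not a full HTML sanitizer.
    sanitized.payload.content = sanitized.payload.content
      .replace(/<script[^>]*>[\s\S]*?<\/script>/gi, '')
      .replace(/<[^>]+>/g, '');
  }
  return sanitized;
}
function maskPII(event) {
  const masked = JSON.parse(JSON.stringify(event));
  // Mask email addresses (keep the first two characters and the domain)
  if (masked.payload?.email) {
    masked.payload.email = masked.payload.email.replace(
      /(.{2}).*@(.*)/,
      '$1***@$2'
    );
  }
  // Mask phone numbers: every digit except the last four,
  // regardless of separators such as spaces or dashes
  if (masked.payload?.phone) {
    masked.payload.phone = masked.payload.phone.replace(
      /\d(?=(?:\D*\d){4})/g,
      '*'
    );
  }
  return masked;
}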
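The middleware calls `verifyWebhookSignature` without showing what such a check involves. One common scheme is an HMAC-SHA256 over the raw request body, compared in constant time. The sketch below uses Node's built-in `crypto` module; the function name `verifyHmacSignature`, the hex encoding, and the shared-secret arrangement are assumptions that would need to match whatever the sending system actually does.

```javascript
const crypto = require('node:crypto');

// Verify an HMAC-SHA256 signature over the raw request body.
// The hex encoding and shared-secret scheme are assumptions; match them to
// whatever the sending system actually uses.
function verifyHmacSignature(rawBody, signatureHex, secret) {
  const expected = crypto.createHmac('sha256', secret).update(rawBody).digest();
  const received = Buffer.from(signatureHex, 'hex');
  // timingSafeEqual throws on length mismatch, so check the length first
  if (received.length !== expected.length) return false;
  return crypto.timingSafeEqual(received, expected);
}

const secret = 'example-shared-secret';
const body = JSON.stringify({ eventType: 'order.created', orderId: 'o-1' });
const signature = crypto.createHmac('sha256', secret).update(body).digest('hex');

console.log(verifyHmacSignature(body, signature, secret));       // untouched body
console.log(verifyHmacSignature(body + ' ', signature, secret)); // tampered body
```

The signature has to be computed over the exact bytes received, which is why the check belongs before any sanitization or re-serialization of the payload.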
13. Case Study: E-commerce Order Processing Pipeline
A real-world implementation of event-driven AI agents for order processing.
Business Requirements
- Process 50,000+ orders/day
- AI-powered fraud detection
- Real-time inventory updates
- Multi-step approval workflows
- 99.9% uptime SLA
Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│ E-COMMERCE ORDER PROCESSING PIPELINE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Order Events → Kafka → n8n Workflows → Services │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Event Flow: │ │
│ │ │ │
│ │ order.created │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────┐ inventory.check │ │
│ │ │ Validation │───────────────┐ │ │
│ │ │ Agent │ │ │ │
│ │ └─────────────┘ ▼ │ │
│ │ ┌─────────┐ │ │
│ │ ┌────│Inventory│ │ │
│ │ │ │ Service │ │ │
│ │ │ └────┬────┘ │ │
│ │ │ │ │ │
│ │ │ inventory.checked │ │
│ │ │ │ │ │
│ │ │ ▼ │ │
│ │ │ ┌─────────┐ │ │
│ │ └────►│ Fraud │ │ │
│ │ │ Detection │ │
│ │ │ Agent │ │ │
│ │ └────┬────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────┐ │ │
│ │ │ Decision Point │ │ │
│ │ │ (risk_score > 0.7)│ │ │
│ │ └─────────┬──────────┘ │ │
│ │ │ │ │
│ │ ┌─────────────────┴────────────────┐│ │
│ │ │ ││ │
│ │ ▼ ▼│ │
│ │ ┌──────────┐ ┌──────────┐ │
│ │ │ Auto │ │ Manual │ │
│ │ │ Approve │ │ Review │ │
│ │ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ │ └─────────────────┬───────────────┘ │
│ │ │ │
│ │ ▼ │
│ │ order.processed │
│ │ │ │
│ │ ▼ │
│ │ ┌──────────────┐ │
│ │ │ Fulfillment │ │
│ │ │ Agent │ │
│ │ └──────────────┘ │
│ │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Implementation Details
{
"name": "Order Processing Saga",
"nodes": [
{
"parameters": {
"topic": "orders.created",
"groupId": "order-processors"
},
"name": "New Orders",
"type": "n8n-nodes-base.kafkaTrigger",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Validation Agent\nconst order = $input.first().json;\n\nconst validation = {\n valid: true,\n errors: [],\n normalized: {}\n};\n\n// Validate customer\nif (!order.customerId) {\n validation.valid = false;\n validation.errors.push('Missing customer ID');\n}\n\n// Validate items\nif (!order.items || order.items.length === 0) {\n validation.valid = false;\n validation.errors.push('Empty order');\n}\n\n// Calculate totals (an order without items totals 0 instead of throwing)\nvalidation.normalized.totalAmount = (order.items || []).reduce(\n (sum, item) => sum + (item.price * item.quantity), 0\n);\n\n// Check for high-value order\nvalidation.isHighValue = validation.normalized.totalAmount > 1000;\n\nreturn [{\n json: {\n order,\n validation\n }\n}];"
},
"name": "Validation Agent",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"conditions": {
"boolean": [
{
"value1": "={{ $json.validation.valid }}",
"value2": false
}
]
}
},
"name": "Valid Order?",
"type": "n8n-nodes-base.if",
"typeVersion": 1
},
{
"parameters": {
"operation": "insert",
"table": "failed_orders",
"columns": {
"string": [
{
"column": "order_id",
"value": "={{ $json.order.id }}"
},
{
"column": "reason",
"value": "={{ $json.validation.errors.join(', ') }}"
}
]
}
},
"name": "Log Failed Order",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1
},
{
"parameters": {
"method": "POST",
"url": "http://inventory-service:8080/check",
"body": "={{ { items: $json.order.items } }}"
},
"name": "Check Inventory",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 1
},
{
"parameters": {
"conditions": {
"boolean": [
{
"value1": "={{ $json.available }}",
"value2": true
}
]
}
},
"name": "In Stock?",
"type": "n8n-nodes-base.if",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// AI Fraud Detection\nconst order = $input.first().json.order;\n\n// Prepare features for fraud detection\nconst features = {\n orderAmount: order.total,\n customerAge: calculateCustomerAge(order.customerId),\n shippingBillingMatch: order.shippingAddress === order.billingAddress,\n timeOfDay: new Date(order.createdAt).getHours(),\n itemCategories: order.items.map(i => i.category),\n deviceFingerprint: order.metadata?.deviceFingerprint,\n velocity: await getOrderVelocity(order.customerId)\n};\n\n// Call ML fraud detection service\nconst fraudResult = await $getAll('httpRequest', {\n method: 'POST',\n url: 'http://ml-service:8080/predict/fraud',\n body: features\n});\n\nconst riskScore = fraudResult.riskScore;\nconst recommendation = riskScore > 0.8 ? 'BLOCK' : \n riskScore > 0.5 ? 'REVIEW' : 'APPROVE';\n\n// Log for training data\nawait $getAll('kafka', {\n operation: 'sendMessage',\n topic: 'fraud.predictions',\n message: {\n orderId: order.id,\n riskScore,\n recommendation,\n features,\n timestamp: new Date().toISOString()\n }\n});\n\nreturn [{\n json: {\n order,\n fraud: {\n riskScore,\n recommendation\n }\n }\n}];\n\nasync function getOrderVelocity(customerId) {\n const recent = await $getAll('postgres', {\n operation: 'executeQuery',\n query: `\n SELECT COUNT(*) FROM orders \n WHERE customer_id = '${customerId}' \n AND created_at > NOW() - INTERVAL '1 hour'\n `\n });\n return recent[0].count;\n}"
},
"name": "Fraud Detection Agent",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"rules": {
"rules": [
{
"value": "={{ $json.fraud.recommendation }}",
"output": 0
}
]
}
},
"name": "Route by Risk",
"type": "n8n-nodes-base.switch",
"typeVersion": 1
},
{
"parameters": {
"operation": "insert",
"table": "manual_review_queue",
"columns": {
"string": [
{
"column": "order_id",
"value": "={{ $json.order.id }}"
},
{
"column": "risk_score",
"value": "={{ $json.fraud.riskScore }}"
}
]
}
},
"name": "Queue for Review",
"type": "n8n-nodes-base.postgres",
"typeVersion": 1
},
{
"parameters": {
"method": "POST",
"url": "http://payment-service:8080/process",
"body": "={{ { orderId: $json.order.id, amount: $json.order.total } }}"
},
"name": "Process Payment",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Fulfillment Agent\nconst order = $input.first().json.order;\n\n// Determine fulfillment strategy\nlet strategy = 'STANDARD';\nif (order.isExpress) strategy = 'EXPRESS';\nif (order.isInternational) strategy = 'INTERNATIONAL';\n\n// Create fulfillment tasks\nconst tasks = order.items.map(item => ({\n taskId: generateId(),\n orderId: order.id,\n sku: item.sku,\n quantity: item.quantity,\n strategy,\n priority: order.isHighValue ? 'HIGH' : 'NORMAL'\n}));\n\n// Publish to fulfillment service\nfor (const task of tasks) {\n await $getAll('kafka', {\n operation: 'sendMessage',\n topic: 'fulfillment.tasks',\n message: task\n });\n}\n\nreturn [{\n json: {\n orderId: order.id,\n tasksCreated: tasks.length,\n strategy\n }\n}];\n\nfunction generateId() {\n return Math.random().toString(36).substring(2);\n}"
},
"name": "Fulfillment Agent",
"type": "n8n-nodes-base.function",
"typeVersion": 1
}
]
}
Performance Results
| Metric | Before | After | Improvement |
|---|---|---|---|
| Orders/day | 10,000 | 50,000+ | 400% |
| Avg Processing Time | 45s | 3s | 93% |
| Fraud Detection | Manual | AI-powered | Automated |
| False Positive Rate | 5% | 0.8% | 84% |
| System Uptime | 99.5% | 99.97% | +0.47 pp |
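The Improvement column mixes two kinds of figures: relative change for throughput, processing time, and false positives, and an absolute difference for uptime. The short sketch below recomputes them from the Before and After columns and also converts each uptime figure into implied downtime per 30-day month, which is often easier to compare. The helper names and the 30-day month are illustrative choices.

```javascript
// Relative change between a baseline and a new value, in percent
function percentIncrease(before, after) {
  return ((after - before) / before) * 100;
}
function percentReduction(before, after) {
  return ((before - after) / before) * 100;
}
// Downtime implied by an uptime percentage over a 30-day month, in minutes
function downtimeMinutesPerMonth(uptimePct) {
  return (1 - uptimePct / 100) * 30 * 24 * 60;
}

console.log('Orders/day increase (%):', percentIncrease(10000, 50000));
console.log('Processing time reduction (%):', percentReduction(45, 3).toFixed(1));
console.log('False positive reduction (%):', percentReduction(5, 0.8).toFixed(1));
console.log('Downtime before (min/month):', Math.round(downtimeMinutesPerMonth(99.5)));
console.log('Downtime after (min/month):', Math.round(downtimeMinutesPerMonth(99.97)));
```

Expressed as downtime, the uptime change is a drop from a few hours per month to well under a quarter of an hour, which the 0.47 point difference alone does not convey.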
14. Case Study: Real-Time Fraud Detection System
A comprehensive AI-powered fraud detection system using event-driven architecture.
System Requirements
- Process 100,000+ transactions/second
- Sub-100ms detection latency
- 99.9% fraud detection accuracy
- Real-time blocking capability
- Model updates without downtime
Architecture
┌─────────────────────────────────────────────────────────────┐
│ REAL-TIME FRAUD DETECTION SYSTEM │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Transaction Stream (Kafka - 100 partitions) │ │
│ └────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Feature │ │ Rule Engine │ │ ML Inference │ │
│ │ Engineering │ │ (Fast Path) │ │ (Deep Check) │ │
│ └───────┬────────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └─────────────────┼────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Risk Scoring │ │
│ │ & Decision │ │
│ └────────┬─────────┘ │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌──────────┐ ┌──────────┐ │
│ │ ALLOW │ │ CHALLENGE│ │ BLOCK │ │
│ │ │ │ (2FA) │ │ │ │
│ └────────┘ └──────────┘ └──────────┘ │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Feedback Loop: Confirmed fraud → Model Retrain │ │
│ └────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Feature Engineering Pipeline
{
"name": "Fraud Feature Pipeline",
"nodes": [
{
"parameters": {
"topic": "transactions.raw",
"groupId": "fraud-feature-engineers"
},
"name": "Raw Transactions",
"type": "n8n-nodes-base.kafkaTrigger",
"typeVersion": 1
},
{
"parameters": {
"functionCode": "// Real-time feature engineering\n// countUniqueMerchants, checkDeviceConsistency, getMerchantRisk, getCountryRisk\n// and getBinRisk are assumed to be defined elsewhere\nconst txn = $input.first().json;\n\n// Fetch customer history from Redis\nconst customerKey = `customer:${txn.customerId}:history`;\nconst history = await $getAll('redis', {\n operation: 'get',\n key: customerKey\n});\nconst customerHistory = JSON.parse(history || '{}');\n\n// Calculate velocity features\nconst now = new Date(txn.timestamp);\nconst features = {\n transaction: {\n amount: txn.amount,\n currency: txn.currency,\n merchantCategory: txn.mcc,\n isOnline: txn.channel === 'online',\n isInternational: txn.country !== txn.cardCountry,\n timeOfDay: now.getHours(),\n dayOfWeek: now.getDay(),\n isWeekend: [0, 6].includes(now.getDay())\n },\n velocity: {\n txnsLastHour: countRecentTxns(customerHistory, 1),\n txnsLastDay: countRecentTxns(customerHistory, 24),\n amountLastHour: sumRecentAmounts(customerHistory, 1),\n uniqueMerchantsLastDay: countUniqueMerchants(customerHistory, 24)\n },\n behavior: {\n avgTxnAmount: customerHistory.avgAmount || txn.amount,\n typicalLocations: customerHistory.typicalCountries || [txn.cardCountry],\n isLocationAnomaly: !customerHistory.typicalCountries?.includes(txn.country),\n deviceConsistency: checkDeviceConsistency(customerHistory, txn.deviceId)\n },\n risk: {\n merchantRiskScore: await getMerchantRisk(txn.merchantId),\n countryRiskScore: await getCountryRisk(txn.country),\n binRiskScore: await getBinRisk(txn.cardBin)\n }\n};\n\n// Update customer history\nconst updatedHistory = updateCustomerHistory(customerHistory, txn);\nawait $getAll('redis', {\n operation: 'set',\n key: customerKey,\n value: JSON.stringify(updatedHistory),\n expire: true,\n ttl: 86400 * 7 // 7 days\n});\n\nreturn [{\n json: {\n transactionId: txn.id,\n customerId: txn.customerId,\n features,\n timestamp: txn.timestamp\n }\n}];\n\nfunction countRecentTxns(history, hours) {\n const cutoff = Date.now() - (hours * 3600000);\n return (history.recentTransactions || [])\n .filter(t => new Date(t.timestamp).getTime() > cutoff)\n .length;\n}\n\nfunction sumRecentAmounts(history, hours) {\n const cutoff = Date.now() - (hours * 3600000);\n return (history.recentTransactions || [])\n .filter(t => new Date(t.timestamp).getTime() > cutoff)\n .reduce((sum, t) => sum + t.amount, 0);\n}\n\nfunction updateCustomerHistory(history, txn) {\n const updated = { ...history };\n updated.recentTransactions = [\n ...(history.recentTransactions || []).slice(-99),\n { amount: txn.amount, timestamp: txn.timestamp, merchant: txn.merchantId }\n ];\n updated.avgAmount = updated.recentTransactions\n .reduce((sum, t) => sum + t.amount, 0) / updated.recentTransactions.length;\n updated.typicalCountries = [...new Set([\n ...(history.typicalCountries || []),\n txn.country\n ])].slice(-5);\n return updated;\n}"
},
"name": "Feature Engineer",
"type": "n8n-nodes-base.function",
"typeVersion": 1
},
{
"parameters": {
"topic": "transactions.features",
"messages": [
{
"key": "={{ $json.transactionId }}",
"message": "={{ JSON.stringify($json) }}"
}
]
},
"name": "Emit Features",
"type": "n8n-nodes-base.kafka",
"typeVersion": 1
}
]
}
ML Model Serving
# Python code node for ML inference in n8n
# Assumes the model file is a pickled classifier exposing predict_proba and
# that its Python dependencies are available to the node's runtime
import json
import pickle
from datetime import datetime
from typing import Any, Dict
# Load model (in production, load once and cache)
with open('/models/fraud_detection_v2.pkl', 'rb') as model_file:
    model = pickle.load(model_file)
def flatten_features(features: Dict[str, Any]) -> list:
    """Convert nested feature dict to flat vector"""
    flat = []
    # Transaction features
    flat.append(features['transaction']['amount'])
    flat.append(1 if features['transaction']['isOnline'] else 0)
    flat.append(1 if features['transaction']['isInternational'] else 0)
    flat.append(features['transaction']['timeOfDay'] / 24)  # Normalize
    # Velocity features
    flat.append(features['velocity']['txnsLastHour'])
    flat.append(features['velocity']['txnsLastDay'] / 24)
    flat.append(features['velocity']['amountLastHour'] / 1000)
    # Behavior features
    flat.append(1 if features['behavior']['isLocationAnomaly'] else 0)
    flat.append(features['behavior']['deviceConsistency'])
    # Risk features
    flat.append(features['risk']['merchantRiskScore'])
    flat.append(features['risk']['countryRiskScore'])
    return flat
def determine_action(risk_score: float, confidence: float) -> str:
    if risk_score > 0.9 and confidence > 0.8:
        return "BLOCK"
    elif risk_score > 0.7:
        return "CHALLENGE"
    elif risk_score > 0.3:
        return "MONITOR"
    else:
        return "ALLOW"
def main():
    # The Python Code node exposes input items as _input ($input is JavaScript-only)
    raw = _input.first().json
    # Accept either a JSON string or an already-parsed object
    input_data = json.loads(raw) if isinstance(raw, str) else raw
    features = input_data['features']
    # Flatten features for model
    feature_vector = flatten_features(features)
    # Run inference
    prediction = model.predict_proba([feature_vector])[0]
    risk_score = prediction[1]  # Probability of fraud
    # Model confidence
    confidence = max(prediction)
    # Determine action
    action = determine_action(risk_score, confidence)
    return [{
        "json": {
            "transactionId": input_data['transactionId'],
            "riskScore": float(risk_score),
            "confidence": float(confidence),
            "action": action,
            "modelVersion": "2.3.1",
            "features": feature_vector,
            "timestamp": datetime.now().isoformat()
        }
    }]
# Return the result items to n8n
return main()
Performance Results
| Metric | Target | Achieved |
|---|---|---|
| Transactions/second | 100,000 | 125,000 |
| Detection latency | <100ms | 45ms avg, 120ms p99 |
| Fraud detection rate | >99% | 99.3% |
| False positive rate | <1% | 0.7% |
| System uptime | 99.99% | 99.997% |
15. Performance Benchmarks and Best Practices
Benchmarking and optimizing event-driven AI agent systems requires systematic measurement and tuning.
Benchmarking Framework
// Performance test harness for n8n workflows
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
// Nearest-rank percentile over an unsorted array
function percentile(values, p) {
  if (values.length === 0) return NaN;
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.max(1, Math.ceil((p / 100) * sorted.length));
  return sorted[rank - 1];
}
class WorkflowBenchmark {
  constructor(config) {
    this.targetThroughput = config.targetThroughput; // events/sec
    this.durationSeconds = config.durationSeconds;
    this.warmupSeconds = config.warmupSeconds || 30;
    this.metrics = [];
  }
  // warmup(), generateEvent() and sendEvent() are workload-specific and
  // must be provided by a subclass before run() is called
  async run() {
    console.log(`Starting benchmark: ${this.targetThroughput} events/sec`);
    // Warmup phase
    await this.warmup();
    // Measurement phase
    const startTime = Date.now();
    const promises = [];
    // Timers cannot fire much faster than once per millisecond, so high
    // target rates are reached by sending a batch of events on each tick
    const tickMs = Math.max(10, 1000 / this.targetThroughput);
    const eventsPerTick = Math.max(1, Math.round((this.targetThroughput * tickMs) / 1000));
    while (Date.now() - startTime < this.durationSeconds * 1000) {
      for (let i = 0; i < eventsPerTick; i++) {
        const event = this.generateEvent();
        promises.push(this.measureEvent(event));
      }
      await sleep(tickMs);
    }
    await Promise.all(promises);
    return this.generateReport();
  }
  async measureEvent(event) {
    const start = process.hrtime.bigint();
    try {
      await this.sendEvent(event);
      const end = process.hrtime.bigint();
      this.metrics.push({
        success: true,
        latencyMs: Number(end - start) / 1000000,
        timestamp: Date.now()
      });
    } catch (error) {
      this.metrics.push({
        success: false,
        error: error.message,
        timestamp: Date.now()
      });
    }
  }
  generateReport() {
    const latencies = this.metrics.filter(m => m.success).map(m => m.latencyMs);
    const errors = this.metrics.filter(m => !m.success);
    const total = this.metrics.length;
    return {
      // Attempted events per second, including failures
      throughput: total / this.durationSeconds,
      avgLatencyMs: latencies.length
        ? latencies.reduce((a, b) => a + b, 0) / latencies.length
        : NaN,
      p50LatencyMs: percentile(latencies, 50),
      p95LatencyMs: percentile(latencies, 95),
      p99LatencyMs: percentile(latencies, 99),
      errorRate: total ? errors.length / total : 0,
      totalEvents: total
    };
  }
}
// Usage
const benchmark = new WorkflowBenchmark({
  targetThroughput: 10000,
  durationSeconds: 300,
  warmupSeconds: 60
});
benchmark.run().then((results) => console.table(results));
Performance Benchmarks
| Configuration | Throughput | Latency (p99) | CPU Usage | Memory |
|---|---|---|---|---|
| Single n8n + Redis | 500/sec | 150ms | 40% | 2GB |
| 3 n8n workers + Redis | 2,000/sec | 120ms | 35% each | 1.5GB each |
| 5 n8n + Kafka | 10,000/sec | 85ms | 60% each | 2GB each |
| 10 n8n + Kafka + Redis | 25,000/sec | 65ms | 70% each | 2.5GB each |
| 20 n8n + Kafka cluster | 50,000/sec | 45ms | 65% each | 3GB each |
| 50 n8n + Optimized Kafka | 100,000+/sec | 35ms | 75% each | 4GB each |
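Dividing each row's throughput by its instance count gives a rough per-instance figure that is easier to plan with. The sketch below only does that arithmetic on the numbers in the table. The `instances_needed` helper and its 30% headroom default are illustrative assumptions, not part of the benchmark.

```python
import math

# (configuration, n8n instances, total events/sec) copied from the table above
BENCHMARKS = [
    ("Single n8n + Redis", 1, 500),
    ("3 n8n workers + Redis", 3, 2_000),
    ("5 n8n + Kafka", 5, 10_000),
    ("10 n8n + Kafka + Redis", 10, 25_000),
    ("20 n8n + Kafka cluster", 20, 50_000),
    ("50 n8n + Optimized Kafka", 50, 100_000),
]


def per_instance_throughput(instances: int, total_eps: float) -> float:
    """Average events/sec handled by one n8n instance."""
    return total_eps / instances


def instances_needed(target_eps: float, per_instance_eps: float, headroom: float = 0.3) -> int:
    """Instances required for target_eps while keeping `headroom` spare capacity.

    The headroom default is an assumption for illustration only.
    """
    usable = per_instance_eps * (1.0 - headroom)
    return math.ceil(target_eps / usable)


for name, instances, total in BENCHMARKS:
    rate = per_instance_throughput(instances, total)
    print(f"{name}: {rate:.0f} events/sec per instance")
```

On these figures, per-instance throughput is 500 events/sec for the single-instance Redis row and between 2,000 and 2,500 for the Kafka-backed rows. Treat the result of `instances_needed` as a starting point for your own load test, not a guarantee.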
Optimization Best Practices
1. Event Size Optimization
// Before: Bloated events
const largeEvent = {
  eventId: uuid(),
  timestamp: new Date().toISOString(),
  payload: {
    user: { /* 50KB of user data */ },
    session: { /* 30KB of session data */ },
    message: "Hello"
  }
};

// After: Reference heavy data
const compactEvent = {
  eventId: uuid(),
  timestamp: new Date().toISOString(),
  payload: {
    userId: "user_123",    // Reference only
    sessionId: "sess_456", // Reference only
    message: "Hello"
  },
  // Include reference to full data in event store if needed
  references: {
    userSnapshot: "snapshot:user_123:1704067200"
  }
};
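The compact event above works only if a consumer can resolve the reference back to the full data. A minimal sketch of that round trip follows. It assumes an in-memory dictionary standing in for the event store, and the helper names (`store_snapshot`, `build_compact_event`, `resolve_snapshot`) are hypothetical.

```python
import json
import time
import uuid
from datetime import datetime, timezone

# In-memory stand-in for an event store or object store (assumption for this sketch)
SNAPSHOT_STORE: dict = {}


def store_snapshot(entity_id: str, data: dict) -> str:
    """Save heavy data once and return a short reference key."""
    key = f"snapshot:{entity_id}:{int(time.time())}"
    SNAPSHOT_STORE[key] = data
    return key


def build_compact_event(user: dict, session: dict, message: str) -> dict:
    """Build an event that carries IDs and a snapshot reference, not the full objects."""
    return {
        "eventId": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "payload": {
            "userId": user["id"],
            "sessionId": session["id"],
            "message": message,
        },
        "references": {"userSnapshot": store_snapshot(user["id"], user)},
    }


def resolve_snapshot(event: dict) -> dict:
    """Look up the full user data when a consumer actually needs it."""
    return SNAPSHOT_STORE[event["references"]["userSnapshot"]]


user = {"id": "user_123", "profile": "x" * 50_000}
session = {"id": "sess_456", "history": "y" * 30_000}

event = build_compact_event(user, session, "Hello")
full_size = len(json.dumps({"user": user, "session": session, "message": "Hello"}))
compact_size = len(json.dumps(event))
print(f"full: {full_size} bytes, compact: {compact_size} bytes")
```

The trade-off is an extra lookup for consumers that need the full record, so this suits events where most consumers only need the IDs.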
2. Connection Pooling
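# n8n configuration for connection pooling
# DB_POSTGRESDB_POOL_SIZE is a documented n8n setting. Confirm the other two keys
# against the environment variable reference for your n8n version before relying on them.
DB_POSTGRESDB_POOL_SIZE: 20
DB_POSTGRESDB_POOL_MIN: 5
# Redis connection pool
QUEUE_BULL_REDIS_POOL_SIZE: 10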
3. Batch Processing
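// Batch process multiple events
const BATCH_SIZE = 100;
const FLUSH_INTERVAL = 1000; // ms

class BatchedProcessor {
  constructor() {
    this.buffer = [];
    this.timer = null;
  }

  async add(event) {
    this.buffer.push(event);
    if (this.buffer.length >= BATCH_SIZE) {
      await this.flush();
    } else if (!this.timer) {
      // Flush a partial batch once the interval elapses. Log failures instead of
      // leaving an unhandled promise rejection.
      this.timer = setTimeout(() => this.flush().catch(console.error), FLUSH_INTERVAL);
    }
  }

  async flush() {
    // Always reset the timer first so a later add() can schedule a new one
    clearTimeout(this.timer);
    this.timer = null;
    if (this.buffer.length === 0) return;
    const batch = this.buffer.splice(0, BATCH_SIZE);
    try {
      // Process batch. $getAll is the data-access helper these examples assume;
      // substitute your own database client.
      await $getAll('postgres', {
        operation: 'insert',
        table: 'agent_events',
        data: batch
      });
    } catch (error) {
      // Put the batch back at the front so a later flush can retry it
      this.buffer.unshift(...batch);
      throw error;
    }
  }
}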
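The same size-or-time flush rule can be exercised without a database. The sketch below is a Python `asyncio` version in which the sink is any async callable you pass in. The class shape mirrors the JavaScript above, but the constructor parameters and the `demo` function are assumptions made for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


class BatchedProcessor:
    """Buffer events and flush when the batch is full or the interval elapses."""

    def __init__(
        self,
        sink: Callable[[List[Any]], Awaitable[None]],
        batch_size: int = 100,
        flush_interval: float = 1.0,
    ) -> None:
        self._sink = sink
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._buffer: List[Any] = []
        self._timer: Optional[asyncio.Task] = None

    async def add(self, event: Any) -> None:
        self._buffer.append(event)
        if len(self._buffer) >= self._batch_size:
            await self.flush()
        elif self._timer is None:
            # Schedule a time-based flush for a partially filled batch
            self._timer = asyncio.create_task(self._flush_later())

    async def _flush_later(self) -> None:
        await asyncio.sleep(self._flush_interval)
        self._timer = None  # clear first so flush() does not cancel this task
        await self.flush()

    async def flush(self) -> None:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        while self._buffer:
            batch = self._buffer[: self._batch_size]
            self._buffer = self._buffer[self._batch_size :]
            await self._sink(batch)


async def demo() -> List[int]:
    batches: List[List[Any]] = []

    async def sink(batch: List[Any]) -> None:
        batches.append(list(batch))  # stand-in for a bulk INSERT

    processor = BatchedProcessor(sink, batch_size=100, flush_interval=1.0)
    for i in range(250):
        await processor.add({"id": i})
    await processor.flush()  # drain the remainder on shutdown
    return [len(b) for b in batches]


sizes = asyncio.run(demo())
print(sizes)
```

Calling `flush()` explicitly on shutdown matters: without it, events still waiting for the timer would be lost when the process exits.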
4. Caching Strategy
// Multi-layer caching
const cache = {
  // L1: In-memory (sub-millisecond). Unbounded here; cap its size or add a TTL in production.
  memory: new Map(),
  // L2: Redis (1-5ms)
  async getFromRedis(key) {
    return await $getAll('redis', {
      operation: 'get',
      key
    });
  },
  // L3: Database (5-20ms)
  async getFromDB(key) {
    return await $getAll('postgres', {
      operation: 'select',
      table: 'cache',
      where: { key }
    });
  }
};

async function getWithCache(key, fetchFn) {
  // Check L1
  if (cache.memory.has(key)) {
    return cache.memory.get(key);
  }
  // Check L2. Values are written as JSON strings below, so parse them on the way
  // out (this assumes getFromRedis returns the stored string).
  const redisValue = await cache.getFromRedis(key);
  if (redisValue) {
    const parsed = JSON.parse(redisValue);
    cache.memory.set(key, parsed);
    return parsed;
  }
  // Fetch (for example via cache.getFromDB) and populate the faster layers
  const value = await fetchFn(key);
  cache.memory.set(key, value);
  await $getAll('redis', {
    operation: 'set',
    key,
    value: JSON.stringify(value),
    expire: true,
    ttl: 300
  });
  return value;
}
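The read-through order (L1, then L2, then the source) is easier to verify with the layers faked. The following sketch assumes a plain dictionary in place of Redis and an injectable clock so that L1 expiry can be tested. `TwoTierCache` and `load_profile` are hypothetical names, and L2 expiry is left out for brevity.

```python
import json
import time
from typing import Any, Callable, Dict, Tuple


class TwoTierCache:
    """L1: per-process dict with a TTL. L2: shared store holding JSON strings."""

    def __init__(
        self,
        l2_store: Dict[str, str],
        l1_ttl: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._l1: Dict[str, Tuple[float, Any]] = {}
        self._l2 = l2_store
        self._l1_ttl = l1_ttl
        self._clock = clock

    def get(self, key: str, fetch_fn: Callable[[str], Any]) -> Any:
        # L1: return the value if present and not expired
        entry = self._l1.get(key)
        if entry is not None and entry[0] > self._clock():
            return entry[1]
        # L2: values are serialized, so decode before use
        raw = self._l2.get(key)
        if raw is not None:
            value = json.loads(raw)
        else:
            # Miss in both layers: load from the source and fill L2
            value = fetch_fn(key)
            self._l2[key] = json.dumps(value)
        self._l1[key] = (self._clock() + self._l1_ttl, value)
        return value


calls = []


def load_profile(key: str) -> dict:
    calls.append(key)  # record how often the slow source is hit
    return {"key": key, "tier": "gold"}


now = [0.0]   # fake clock, advanced manually
shared: Dict[str, str] = {}
cache = TwoTierCache(shared, l1_ttl=30.0, clock=lambda: now[0])

first = cache.get("user_123", load_profile)   # miss in both layers
second = cache.get("user_123", load_profile)  # served from L1
now[0] = 31.0                                  # L1 entry has expired
third = cache.get("user_123", load_profile)   # served from L2
print(len(calls))
```

Giving L1 a short TTL bounds how stale a worker's local copy can get when another worker updates the shared layer.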
16. Future Trends: Event-Driven AI and Serverless
The event-driven AI landscape continues evolving rapidly. Understanding emerging trends helps future-proof your architecture.
Serverless Event Processing
┌─────────────────────────────────────────────────────────────┐
│ SERVERLESS EVENT ARCHITECTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Event Sources: │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ API │ │ Kafka │ │ S3 │ │ DynamoDB│ │
│ │ Gateway│ │ Events │ │ Events │ │ Streams │ │
│ └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ │
│ │ │ │ │ │
│ └──────────┴──────────┴──────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────┐ │
│ │ Serverless Function Platform │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Lambda │ │ Cloud │ │ Azure │ │ │
│ │ │ Function│ │ Function│ │ Function│ │ │
│ │ │ (AWS) │ │ (GCP) │ │ (Azure) │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────┐ │
│ │ Event Sinks │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │ Kafka │ │ SQS │ │Webhook │ │ │
│ │ │ Topic │ │ Queue │ │ │ │ │
│ │ └────────┘ └────────┘ └────────┘ │ │
│ └────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Edge AI and Event Streaming
// Edge deployment pattern
// Lightweight n8n at the edge, core processing in cloud
// canProcessLocally(), localModelInference(), and forwardToCloud() are placeholders
// for your own routing, inference, and transport code.

// Edge node configuration (Raspberry Pi / Edge server)
const edgeConfig = {
  mode: 'edge',
  upstream: 'https://core-n8n.example.com',
  localProcessing: [
    'intent-classification',
    'entity-extraction',
    'basic-response'
  ],
  forwardToCloud: [
    'complex-reasoning',
    'model-training',
    'analytics'
  ]
};

// Edge workflow
async function edgeProcess(event) {
  // Try local processing first
  if (canProcessLocally(event, edgeConfig)) {
    const result = await localModelInference(event);
    return { processed: 'local', result };
  }
  // Forward to cloud with trace context
  return await forwardToCloud(event, edgeConfig.upstream);
}
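The routing decision itself can be sketched in a few lines. The version below assumes each event carries a `task` field naming the kind of work requested, which the original does not specify. It also falls back to the cloud when local inference raises an exception, which is one possible policy and not the only one.

```python
from typing import Any, Callable, Dict

# Task names taken from the edge configuration above
EDGE_CONFIG = {
    "local_processing": {"intent-classification", "entity-extraction", "basic-response"},
    "forward_to_cloud": {"complex-reasoning", "model-training", "analytics"},
}


def can_process_locally(event: Dict[str, Any], config: Dict[str, set] = EDGE_CONFIG) -> bool:
    """True when the event's task (assumed field) is on the local allow-list."""
    return event.get("task") in config["local_processing"]


def edge_process(
    event: Dict[str, Any],
    local_infer: Callable[[Dict[str, Any]], Any],
    forward: Callable[[Dict[str, Any]], Any],
) -> Dict[str, Any]:
    """Handle the event locally when allowed, otherwise send it upstream."""
    if can_process_locally(event):
        try:
            return {"processed": "local", "result": local_infer(event)}
        except Exception:
            # Local model failed: fall through and let the cloud handle it
            pass
    return {"processed": "cloud", "result": forward(event)}


def broken_local_model(event: Dict[str, Any]) -> Any:
    raise RuntimeError("edge model unavailable")


local_result = edge_process(
    {"task": "intent-classification"}, lambda e: "greeting", lambda e: "cloud-answer"
)
cloud_result = edge_process(
    {"task": "complex-reasoning"}, lambda e: "greeting", lambda e: "cloud-answer"
)
fallback_result = edge_process(
    {"task": "basic-response"}, broken_local_model, lambda e: "cloud-answer"
)
print(local_result, cloud_result, fallback_result)
```

Unknown task names are forwarded to the cloud by default, which keeps the edge node conservative about what it attempts.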
Event-Driven Model Serving
# Kubernetes event-driven model serving
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: fraud-detection-model
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/minScale: "1"
        autoscaling.knative.dev/maxScale: "100"
        # Target of 10 concurrent requests per pod
        autoscaling.knative.dev/target: "10"
    spec:
      containers:
        - image: gcr.io/project/fraud-model:v2.3
          ports:
            - containerPort: 8080
          resources:
            limits:
              nvidia.com/gpu: 1
              memory: "8Gi"
            requests:
              memory: "4Gi"
---
# Event source triggering model inference
# Some Knative Kafka releases serve KafkaSource as sources.knative.dev/v1beta1;
# match the version of the CRD installed in your cluster.
apiVersion: sources.knative.dev/v1
kind: KafkaSource
metadata:
  name: fraud-detection-source
spec:
  consumerGroup: fraud-detection
  bootstrapServers:
    - kafka-cluster:9092
  topics:
    - transactions.features
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: fraud-detection-model
Emerging Patterns
1. Event-Driven LLM Chains
// Chain of thought as event stream
const chainEvents = [
  { type: 'chain.step', step: 1, thought: 'Understanding the query...' },
  { type: 'chain.tool_call', step: 2, tool: 'search', input: '...' },
  { type: 'chain.observation', step: 3, result: '...' },
  { type: 'chain.step', step: 4, thought: 'Analyzing results...' },
  { type: 'chain.complete', step: 5, response: 'Final answer' }
];
// Each step is an event that can be:
// - Monitored in real-time
// - Retried independently
// - Routed to specialized processors
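Routing each step to a specialized processor comes down to dispatching on the event's `type`. A minimal sketch of such a router follows, using the same event shapes as above. `ChainEventRouter` and the handler functions are hypothetical, and a production version would sit behind a message broker, not an in-process loop.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


class ChainEventRouter:
    """Dispatch chain events to handlers registered per event type."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[Event], None]]] = defaultdict(list)

    def on(self, event_type: str, handler: Callable[[Event], None]) -> None:
        self._handlers[event_type].append(handler)

    def dispatch(self, event: Event) -> int:
        """Run every handler for this event type and return how many ran."""
        handlers = self._handlers.get(event["type"], [])
        for handler in handlers:
            handler(event)
        return len(handlers)


chain_events = [
    {"type": "chain.step", "step": 1, "thought": "Understanding the query..."},
    {"type": "chain.tool_call", "step": 2, "tool": "search", "input": "..."},
    {"type": "chain.observation", "step": 3, "result": "..."},
    {"type": "chain.step", "step": 4, "thought": "Analyzing results..."},
    {"type": "chain.complete", "step": 5, "response": "Final answer"},
]

thoughts: List[str] = []
tool_calls: List[str] = []
final_answers: List[str] = []
unhandled: List[str] = []

router = ChainEventRouter()
router.on("chain.step", lambda e: thoughts.append(e["thought"]))        # monitoring
router.on("chain.tool_call", lambda e: tool_calls.append(e["tool"]))    # tool executor
router.on("chain.complete", lambda e: final_answers.append(e["response"]))

for event in chain_events:
    if router.dispatch(event) == 0:
        unhandled.append(event["type"])  # candidates for a dead letter queue

print(thoughts, tool_calls, final_answers, unhandled)
```

Tracking events that no handler claimed makes gaps in the routing table visible early.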
2. Multi-Modal Event Processing
{
  "eventType": "multimodal.message.received",
  "payload": {
    "sessionId": "sess_abc",
    "content": [
      { "type": "text", "content": "What's in this image?" },
      { "type": "image", "url": "s3://bucket/image.jpg", "format": "jpeg" }
    ],
    "processing": {
      "vision_model": "gpt-4o-vision",
      "text_model": "gpt-4o",
      "routing": "parallel"
    }
  }
}
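The `"routing": "parallel"` hint implies that the text and image parts are processed concurrently and then recombined. A sketch of that fan-out with `asyncio.gather` is shown below. The two processor coroutines are stubs standing in for real model calls, and the sequential fallback for other routing values is an assumption.

```python
import asyncio
from typing import Any, Dict, List

Part = Dict[str, Any]


async def process_text(part: Part) -> Dict[str, Any]:
    await asyncio.sleep(0)  # stand-in for a text model call
    return {"type": "text", "chars": len(part["content"])}


async def process_image(part: Part) -> Dict[str, Any]:
    await asyncio.sleep(0)  # stand-in for a vision model call
    return {"type": "image", "source": part["url"]}


PROCESSORS = {"text": process_text, "image": process_image}


async def route_content(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Process each content part, concurrently when routing is 'parallel'."""
    payload = event["payload"]
    parts = payload["content"]
    if payload["processing"].get("routing") == "parallel":
        # gather() returns results in the same order as the input parts
        return await asyncio.gather(*(PROCESSORS[p["type"]](p) for p in parts))
    return [await PROCESSORS[p["type"]](p) for p in parts]


event = {
    "eventType": "multimodal.message.received",
    "payload": {
        "sessionId": "sess_abc",
        "content": [
            {"type": "text", "content": "What's in this image?"},
            {"type": "image", "url": "s3://bucket/image.jpg", "format": "jpeg"},
        ],
        "processing": {"routing": "parallel"},
    },
}

results = asyncio.run(route_content(event))
print(results)
```

Because `gather` preserves input order, the downstream step can pair each result with its original content part by index.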
3. Federated Event Learning
// Train models on event streams without centralizing data
const federatedUpdate = {
  clientId: 'client_a',
  globalModelVersion: 'v2.3',
  localUpdate: {
    gradientUpdate: 'encrypted_gradient_vector',
    sampleCount: 1000,
    loss: 0.23
  },
  timestamp: '2026-05-25T10:00:00Z'
};
// Aggregate at central coordinator
// Update global model
// Distribute new version
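The aggregation step at the coordinator can be illustrated with sample-weighted averaging, in the style of federated averaging (FedAvg). The sketch below assumes the gradient vectors have already been decrypted into plain lists of floats. It leaves out secure aggregation, and the learning rate is an arbitrary illustrative value.

```python
from typing import Any, Dict, List, Sequence


def federated_average(updates: Sequence[Dict[str, Any]]) -> List[float]:
    """Average client gradients, weighting each client by its sample count."""
    total_samples = sum(u["sampleCount"] for u in updates)
    if total_samples == 0:
        raise ValueError("no samples reported by any client")
    dim = len(updates[0]["gradient"])
    averaged = [0.0] * dim
    for update in updates:
        weight = update["sampleCount"] / total_samples
        for i, g in enumerate(update["gradient"]):
            averaged[i] += weight * g
    return averaged


def apply_update(weights: List[float], gradient: List[float], lr: float = 0.1) -> List[float]:
    """One gradient step on the global model (lr is an illustrative value)."""
    return [w - lr * g for w, g in zip(weights, gradient)]


client_updates = [
    {"clientId": "client_a", "sampleCount": 1000, "gradient": [1.0, 0.0]},
    {"clientId": "client_b", "sampleCount": 3000, "gradient": [0.0, 2.0]},
]

avg_gradient = federated_average(client_updates)
new_weights = apply_update([1.0, 1.0], avg_gradient)
print(avg_gradient, new_weights)
```

Weighting by `sampleCount` keeps a client with little data from pulling the global model as hard as one with a large local dataset.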
Conclusion
Event-driven architecture has become essential for enterprise AI agent deployments. By decoupling components through events, organizations can achieve the scalability, resilience, and flexibility required for production AI workloads.
Key Takeaways
- Start with Events: Design around events, not APIs. Events capture intent and state changes naturally.
- Choose the Right Message Queue: Kafka for high-throughput streaming, RabbitMQ for complex routing, Redis for low-latency caching.
- Implement Event Sourcing: Complete audit trails and temporal debugging become possible when every state change is an event.
- Design for Failure: Sagas manage distributed transactions. Dead letter queues capture failed events. Circuit breakers prevent cascade failures.
- Separate Reads from Writes: CQRS optimizes query performance while maintaining transaction integrity for commands.
- Monitor Everything: Distributed tracing, metrics, and alerting are essential for operational visibility.
- Scale Horizontally: Partition by session ID for ordering. Auto-scale based on consumer lag.
- Plan for Evolution: Schema evolution, blue-green deployments, and feature flags enable safe changes.
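The "Scale Horizontally" takeaway combines two small calculations: a stable mapping from session ID to partition, and a consumer count derived from lag. The sketch below shows both. It uses SHA-256 purely for illustration (Kafka's own default partitioner uses a different hash), and the lag target per consumer is a hypothetical tuning value.

```python
import hashlib
import math


def partition_for(session_id: str, num_partitions: int) -> int:
    """Map a session ID to a partition so all of its events stay in order."""
    digest = hashlib.sha256(session_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


def desired_consumers(
    total_lag: int,
    target_lag_per_consumer: int,
    num_partitions: int,
    min_consumers: int = 1,
) -> int:
    """Scale consumers with lag, capped at the partition count.

    Consumers beyond the number of partitions in a group would sit idle.
    """
    wanted = math.ceil(total_lag / target_lag_per_consumer)
    return max(min_consumers, min(wanted, num_partitions))


p1 = partition_for("sess_abc", 12)
p2 = partition_for("sess_abc", 12)
print(p1 == p2)                               # same session, same partition
print(desired_consumers(50_000, 10_000, 12))
```

Because the partition count caps useful consumers, it is worth provisioning more partitions than the consumer count you expect at peak.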
The Future is Event-Driven
As AI agents become more sophisticated—handling multi-modal inputs, engaging in multi-step reasoning, and collaborating in agent swarms—the need for robust event-driven infrastructure only grows. The patterns and practices outlined in this guide provide a foundation for building the next generation of AI-powered applications.
The question is no longer whether to adopt event-driven architecture for AI agents, but how quickly you can implement it to gain competitive advantage.
Community and Support
This article was written by the Tropical Media engineering team. For questions, feedback, or consulting inquiries, contact the team directly.
Last updated: May 25, 2026