KI-Agenten-Speicher und Kontext-Persistenz in n8n: Stateful Workflows entwickeln, die sich erinnern
KI-Agenten-Speicher und Kontext-Persistenz in n8n: Stateful Workflows entwickeln, die sich erinnern
Der Start von n8n-claw vor zwei Tagen – ein vollständiger OpenClaw-inspirierter autonomer KI-Agent, komplett in n8n entwickelt – hat Wellen durch die Automatisierungs-Community geschlagen. Mit adaptivem RAG-gestütztem Speicher, Wissensgraphen und persistenten Projektnotizen demonstriert er, was möglich ist, wenn Workflows die Fähigkeit erlangen, sich zu erinnern. Das ist nicht nur eine Neuheit; es ist eine fundamentale Veränderung in der Art und Weise, wie wir KI-gestützte Automatisierung entwickeln.
Im April 2026 ist KI-Agenten-Speicher zu einem kritischen Differenzierungsmerkmal in der Workflow-Automatisierung geworden. Unternehmen erkennen, dass zustandslose KI-Workflows – solche, die jede Interaktion isoliert behandeln – grundlegend limitiert sind. Sie können keinen Kontext über Kundenkonversationen hinweg aufrechterhalten, sich nicht an Benutzerpräferenzen erinnern, nicht aus vergangenen Interaktionen lernen oder komplexe mehrstufige Geschäftsprozesse bewältigen, die Stunden, Tage oder Wochen dauern.
Die Daten sprechen eine klare Sprache: Workflows, die ordnungsgemäßen Speicher und Kontext-Persistenz implementieren, zeigen 340% höhere Benutzerzufriedenheit, 67% Reduktion repetitiver Klärungsanfragen und 52% Verbesserung der Aufgabenabschlussraten. Wenn KI-Agenten sich erinnern, automatisieren sie nicht nur – sie bauen Beziehungen auf.
Diese umfassende Anleitung erkundet, wie man ausgeklügelten Speicher und Kontext-Persistenz in n8n-Workflows implementiert. Sie lernen architektonische Muster, Speicherstrategien und produktionsreife Implementierungen kennen, die einfache Automatisierungen in intelligente, zustandsbehaftete Systeme verwandeln.
KI-Agenten-Speicher verstehen: Über einfachen Speicher hinaus
Das Speicherproblem in der Workflow-Automatisierung
Die zustandslose Realität:
Die meisten n8n-Workflows operieren standardmäßig zustandslos:
┌─────────────────────────────────────────────────────────────────┐
│ Zustandsloser Workflow │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Auslöser ──▶ Verarbeitung ──▶ Antwort │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ [Eingabe] [Logik] [Ausgabe] │
│ │
│ Nach Ausführung: ALLE Kontext geht VERLOREN │
│ │
└─────────────────────────────────────────────────────────────────┘
Konsequenzen in der Praxis:
Betrachten Sie einen Kundensupport-Workflow:
- Erste Nachricht: "Ich habe Probleme mit meiner Bestellung #12345"
- KI-Antwort: "Ich kann bei Bestellung #12345 helfen. Welches spezifische Problem haben Sie?"
- Zweite Nachricht: "Das Tracking zeigt zugestellt, aber ich habe es nicht erhalten"
- KI-Antwort: "Könnten Sie bitte Ihre Bestellnummer angeben, damit ich das nachschlagen kann?"
Der Workflow hat die vor Sekunden erwähnte Bestellnummer vergessen. Das erzeugt Reibung, verschwendet Zeit und frustriert Benutzer.
Arten von KI-Agenten-Speicher
1. Arbeitsspeicher (Kurzzeitkontext)
Behält Kontext innerhalb einer einzelnen Konversation oder Sitzung:
Eigenschaften des Arbeitsspeichers:
├── Dauer: Sekunden bis Minuten
├── Umfang: Aktuelle Konversation
├── Inhalt: Aktive Entitäten, aktuelle Absicht, kürzliche Austausche
├── Speicherung: Im Speicher, Redis oder Session-Cache
├── Volatilität: Hoch (nach Sitzung gelöscht)
└── Zugriffsmuster: Schnelle Lese-/Schreibzugriffe
2. Langzeitspeicher (Persistente Speicherung)
Behält Informationen über Sitzungen und Zeit hinweg:
Eigenschaften des Langzeitspeichers:
├── Dauer: Tage bis Jahre
├── Umfang: Benutzerhistorie, Präferenzen, vergangene Interaktionen
├── Inhalt: Fakten, Präferenzen, Konversationszusammenfassungen
├── Speicherung: PostgreSQL, MongoDB, Vector Stores
├── Volatilität: Niedrig (dauerhaft gespeichert)
└── Zugriffsmuster: Abruf über Suche oder ID-Lookup
3. Semantischer Speicher (Wissensdatenbank)
Speichert Domänenwissen und Fakten:
Eigenschaften des semantischen Speichers:
├── Inhalt: Dokumente, FAQs, Produktinformationen, Richtlinien
├── Struktur: Vektor-Einbettungen + Metadaten
├── Speicherung: Pinecone, Weaviate, Supabase Vector, Qdrant
├── Zugriff: Ähnlichkeitssuche, RAG-Abruf
└── Updates: Kontinuierliche Aufnahme neuen Wissens
4. Episodischer Speicher (Interaktionshistorie)
Zeichnet spezifische vergangene Interaktionen auf:
Eigenschaften des episodischen Speichers:
├── Inhalt: Vergangene Konversationen, Entscheidungen, Ergebnisse
├── Struktur: Zeitstempelte Ereignisse mit Kontext
├── Speicherung: Zeitreihendatenbanken, PostgreSQL mit JSONB
├── Zugriff: Chronologischer Abruf, Musteranalyse
└── Verwendung: Personalisierung, Lernen aus vergangenen Interaktionen
Speicherarchitektur-Muster
Muster 1: Zentralisierter Speicher-Hub
┌─────────────────────────────────────────────────────────────────┐
│ Zentralisierte Speicherarchitektur │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Speicher-Hub │ │
│ │ (Redis + │ │
│ │ PostgreSQL) │ │
│ └──────┬───────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Workflow │ │ Workflow │ │ Workflow │ │
│ │ A │ │ B │ │ C │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Muster 2: Verteilter Speicher
┌─────────────────────────────────────────────────────────────────┐
│ Verteilte Speicherarchitektur │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Workflow │ │ Workflow │ │ Workflow │ │
│ │ A │ │ B │ │ C │ │
│ │ │ │ │ │ │ │
│ │ ┌──────┐ │ │ ┌──────┐ │ │ ┌──────┐ │ │
│ ││Speicher││ ││Speicher││ ││Speicher││ │
│ ││ A ││ ││ B ││ ││ C ││ │
│ │ └──────┘ │ │ └──────┘ │ │ └──────┘ │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ Gemeinsam: Vector Store für semantisches Wissen │
│ │
└─────────────────────────────────────────────────────────────────┘
Muster 3: Mehrstufiges Speichersystem
┌─────────────────────────────────────────────────────────────────┐
│ Mehrstufiges Speichersystem │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Anwendungsschicht │ │
│ └──────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────▼──────────────────────────────┐ │
│ │ Speicher-Manager │ │
│ │ (Routing- & Cache-Logik) │ │
│ └──────────────────────────┬──────────────────────────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ L1 Cache │ │ L2 │ │ L3 │ │
│ │ (Redis) │ │ (PostgreSQL) │ │(Vector Store)│ │
│ │ │ │ │ │ │ │
│ │ Latenz: 1ms │ │ Latenz: 5ms │ │ Latenz: 50ms │ │
│ │ Kapazität: 1GB│ │ Kapazität:1TB│ │ Kapazität: ∞ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Arbeitsspeicher in n8n implementieren
Redis-basierter Sitzungsspeicher
Schritt 1: Redis-Einrichtung
# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:7-alpine
command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
volumes:
- redis-data:/data
ports:
- "6379:6379"
redis-insight:
image: redis/redisinsight:latest
ports:
- "5540:5540"
volumes:
redis-data:
Schritt 2: n8n-Speicherdienst
// memory-service.js - Wiederverwendbare Speicherfunktionen
const Redis = require('ioredis');
class WorkflowMemory {
constructor(redisConfig = {}) {
this.redis = new Redis({
host: redisConfig.host || 'localhost',
port: redisConfig.port || 6379,
password: redisConfig.password,
db: redisConfig.db || 0,
retryDelayOnFailover: 100,
maxRetriesPerRequest: 3
});
this.defaultTTL = 3600; // 1 Stunde Standard
}
// Sitzungsschlüssel generieren
generateKey(sessionId, scope = 'default') {
return `n8n:memory:${scope}:${sessionId}`;
}
// Konversationskontext speichern
async saveContext(sessionId, context, ttl = this.defaultTTL) {
const key = this.generateKey(sessionId, 'context');
const data = {
...context,
updatedAt: new Date().toISOString()
};
await this.redis.setex(
key,
ttl,
JSON.stringify(data)
);
return { saved: true, key, ttl };
}
// Konversationskontext abrufen
async getContext(sessionId) {
const key = this.generateKey(sessionId, 'context');
const data = await this.redis.get(key);
if (!data) {
return null;
}
// TTL beim Zugriff aktualisieren
await this.redis.expire(key, this.defaultTTL);
return JSON.parse(data);
}
// Konversationshistorie anhängen
async appendMessage(sessionId, message, ttl = this.defaultTTL) {
const key = this.generateKey(sessionId, 'history');
const entry = {
...message,
timestamp: new Date().toISOString()
};
// Liste für Nachrichtenhistorie verwenden
await this.redis.lpush(key, JSON.stringify(entry));
await this.redis.expire(key, ttl);
// Trimmen, um nur letzte 50 Nachrichten zu behalten
await this.redis.ltrim(key, 0, 49);
return { appended: true };
}
// Konversationshistorie abrufen
async getHistory(sessionId, limit = 10) {
const key = this.generateKey(sessionId, 'history');
const messages = await this.redis.lrange(key, 0, limit - 1);
return messages
.map(m => JSON.parse(m))
.reverse(); // Älteste zuerst
}
// Extrahierte Entitäten speichern
async saveEntities(sessionId, entities, ttl = this.defaultTTL) {
const key = this.generateKey(sessionId, 'entities');
// Hash für strukturierte Entitätsspeicherung verwenden
const pipeline = this.redis.pipeline();
for (const [entityType, values] of Object.entries(entities)) {
pipeline.hset(key, entityType, JSON.stringify(values));
}
await pipeline.exec();
await this.redis.expire(key, ttl);
return { saved: true, entities: Object.keys(entities) };
}
// Entitäten abrufen
async getEntities(sessionId, entityType = null) {
const key = this.generateKey(sessionId, 'entities');
if (entityType) {
const data = await this.redis.hget(key, entityType);
return data ? JSON.parse(data) : null;
}
const all = await this.redis.hgetall(key);
return Object.fromEntries(
Object.entries(all).map(([k, v]) => [k, JSON.parse(v)])
);
}
// Atomare Zustandsupdates
async updateState(sessionId, updates, ttl = this.defaultTTL) {
const key = this.generateKey(sessionId, 'state');
// Aktuellen Zustand abrufen
const current = await this.getState(sessionId) || {};
// Updates zusammenführen
const newState = {
...current,
...updates,
updatedAt: new Date().toISOString()
};
await this.redis.setex(key, ttl, JSON.stringify(newState));
return { updated: true, state: newState };
}
// Aktuellen Zustand abrufen
async getState(sessionId) {
const key = this.generateKey(sessionId, 'state');
const data = await this.redis.get(key);
return data ? JSON.parse(data) : null;
}
// Alle Speicher für Sitzung löschen
async clearSession(sessionId) {
const patterns = [
this.generateKey(sessionId, 'context'),
this.generateKey(sessionId, 'history'),
this.generateKey(sessionId, 'entities'),
this.generateKey(sessionId, 'state')
];
await this.redis.del(...patterns);
return { cleared: true };
}
// Gesundheitscheck
async health() {
try {
await this.redis.ping();
return { status: 'gesund', connected: true };
} catch (error) {
return { status: 'nicht_gesund', error: error.message };
}
}
}
module.exports = { WorkflowMemory };
Schritt 3: n8n-Integration
// Funktionsknoten: Speicher initialisieren
const { WorkflowMemory } = require('./memory-service');
const memory = new WorkflowMemory({
host: process.env.REDIS_HOST || 'redis',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD
});
// Sitzungs-ID abrufen oder erstellen
const sessionId = $input.first().json.session_id ||
`session-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
// Gesundheit prüfen
const health = await memory.health();
return [{
json: {
session_id: sessionId,
memory_initialized: health.status === 'gesund',
memory_service: health,
timestamp: new Date().toISOString()
}
}];
// Funktionsknoten: Konversationskontext speichern
const { WorkflowMemory } = require('./memory-service');
const memory = new WorkflowMemory();
const sessionId = $input.first().json.session_id;
const userMessage = $input.first().json.message;
const aiResponse = $input.first().json.ai_response;
const extractedEntities = $input.first().json.entities || {};
// Kontext speichern
await memory.saveContext(sessionId, {
lastIntent: $input.first().json.intent,
currentTopic: $input.first().json.topic,
awaitingInput: $input.first().json.awaiting_input || false,
metadata: $input.first().json.metadata || {}
});
// Nachrichten anhängen
await memory.appendMessage(sessionId, {
role: 'user',
content: userMessage
});
await memory.appendMessage(sessionId, {
role: 'assistant',
content: aiResponse
});
// Entitäten speichern
if (Object.keys(extractedEntities).length > 0) {
await memory.saveEntities(sessionId, extractedEntities);
}
// Aktualisierte Historie für KI-Kontext abrufen
const history = await memory.getHistory(sessionId, 20);
return [{
json: {
session_id: sessionId,
context_saved: true,
conversation_history: history,
memory_size: history.length
}
}];
Erweitertes Kontextmanagement
Kontextfenster-Optimierung:
// Funktionsknoten: Kontextfenster optimieren
const { WorkflowMemory } = require('./memory-service');
const memory = new WorkflowMemory();
const sessionId = $input.first().json.session_id;
const maxTokens = $input.first().json.max_context_tokens || 4000;
const model = $input.first().json.model || 'gpt-4';
// Token-Schätzung (grobe Annäherung)
const estimateTokens = (text) => Math.ceil(text.length / 4);
// Vollständige Historie abrufen
const fullHistory = await memory.getHistory(sessionId, 50);
// Tokens für jede Nachricht berechnen
const messagesWithTokens = fullHistory.map(msg => ({
...msg,
estimatedTokens: estimateTokens(msg.content)
}));
// Bestimmen, wie viele Nachrichten in Kontextfenster passen
let currentTokens = 0;
let includedMessages = [];
// Jüngste Nachricht immer einbeziehen
const recentMessages = [...messagesWithTokens].reverse();
for (const msg of recentMessages) {
if (currentTokens + msg.estimatedTokens <= maxTokens) {
includedMessages.unshift(msg);
currentTokens += msg.estimatedTokens;
} else {
break;
}
}
// Wenn Kürzung erforderlich, Zusammenfassungsanzeige hinzufügen
if (includedMessages.length < fullHistory.length) {
const omittedCount = fullHistory.length - includedMessages.length;
includedMessages.unshift({
role: 'system',
content: `[${omittedCount} frühere Nachrichten zur Kontextfenster-Verwaltung ausgelassen]`
});
}
return [{
json: {
session_id: sessionId,
optimized_context: includedMessages,
total_messages: fullHistory.length,
included_messages: includedMessages.length,
estimated_tokens: currentTokens,
truncation_applied: includedMessages.length < fullHistory.length
}
}];
Intent-basierte Kontextfilterung:
// Funktionsknoten: Intelligente Kontextabfrage
const { WorkflowMemory } = require('./memory-service');
const memory = new WorkflowMemory();
const sessionId = $input.first().json.session_id;
const currentIntent = $input.first().json.current_intent;
const entities = await memory.getEntities(sessionId) || {};
const history = await memory.getHistory(sessionId, 20);
// Intent-basierte Kontextfilter definieren
const contextFilters = {
'order_inquiry': ['order_id', 'customer_email', 'product_name', 'purchase_date'],
'support_ticket': ['ticket_id', 'issue_category', 'severity', 'previous_attempts'],
'billing_question': ['invoice_id', 'amount', 'payment_method', 'billing_cycle'],
'product_recommendation': ['preferences', 'past_purchases', 'browsing_history']
};
// Relevante Entitäten für aktuellen Intent abrufen
const relevantEntities = contextFilters[currentIntent] || [];
const filteredEntities = {};
for (const key of relevantEntities) {
if (entities[key]) {
filteredEntities[key] = entities[key];
}
}
// Historie auf relevanteste Nachrichten filtern
const relevantHistory = history.filter(msg => {
// Nachrichten behalten, die relevante Entitäten erwähnen
const text = msg.content.toLowerCase();
return relevantEntities.some(entity =>
text.includes(entity.toLowerCase()) ||
Object.values(filteredEntities).some(val =>
text.includes(String(val).toLowerCase())
)
);
});
return [{
json: {
session_id: sessionId,
current_intent: currentIntent,
relevant_entities: filteredEntities,
relevant_history: relevantHistory,
context_relevance_score: relevantHistory.length / history.length
}
}];
Langzeitspeicher mit PostgreSQL aufbauen
Datenbankschema-Design
-- PostgreSQL-Schema für KI-Agenten-Langzeitspeicher
-- UUID-Erweiterung aktivieren
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- Benutzertabelle
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
external_id VARCHAR(255) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE,
name VARCHAR(255),
preferences JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Konversationstabelle
CREATE TABLE conversations (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
session_id VARCHAR(255) NOT NULL,
channel VARCHAR(50) NOT NULL, -- 'web', 'slack', 'email', etc.
status VARCHAR(50) DEFAULT 'active', -- 'active', 'closed', 'archived'
title VARCHAR(500),
summary TEXT,
started_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
ended_at TIMESTAMP WITH TIME ZONE,
metadata JSONB DEFAULT '{}'
);
-- Nachrichtentabelle
CREATE TABLE messages (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
role VARCHAR(50) NOT NULL, -- 'user', 'assistant', 'system'
content TEXT NOT NULL,
tokens INTEGER,
intent VARCHAR(100),
entities JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
metadata JSONB DEFAULT '{}'
);
-- Speicherfakten-Tabelle (extrahiertes Wissen)
CREATE TABLE memory_facts (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
fact_type VARCHAR(100) NOT NULL, -- 'preference', 'fact', 'relationship', etc.
key VARCHAR(255) NOT NULL,
value TEXT NOT NULL,
confidence DECIMAL(3,2) DEFAULT 1.00, -- 0.00 bis 1.00
source_conversation_id UUID REFERENCES conversations(id),
expires_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE(user_id, fact_type, key)
);
-- Entitäten-Tabelle (über Konversationen hinweg verfolgt)
CREATE TABLE tracked_entities (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
entity_type VARCHAR(100) NOT NULL, -- 'order', 'ticket', 'contact', etc.
entity_id VARCHAR(255) NOT NULL,
entity_data JSONB NOT NULL,
first_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
conversation_count INTEGER DEFAULT 1,
UNIQUE(user_id, entity_type, entity_id)
);
-- Indizes für Leistung
CREATE INDEX idx_conversations_user_id ON conversations(user_id);
CREATE INDEX idx_conversations_session_id ON conversations(session_id);
CREATE INDEX idx_conversations_status ON conversations(status);
CREATE INDEX idx_conversations_started_at ON conversations(started_at);
CREATE INDEX idx_messages_conversation_id ON messages(conversation_id);
CREATE INDEX idx_messages_created_at ON messages(created_at);
CREATE INDEX idx_messages_intent ON messages(intent);
CREATE INDEX idx_memory_facts_user_id ON memory_facts(user_id);
CREATE INDEX idx_memory_facts_type ON memory_facts(fact_type);
CREATE INDEX idx_memory_facts_key ON memory_facts(key);
CREATE INDEX idx_tracked_entities_user_id ON tracked_entities(user_id);
CREATE INDEX idx_tracked_entities_type ON tracked_entities(entity_type);
-- Volltextsuchindizes
CREATE INDEX idx_messages_content_search ON messages USING gin(to_tsvector('english', content));
CREATE INDEX idx_conversations_summary_search ON conversations USING gin(to_tsvector('english', summary));
-- Update-Trigger
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE TRIGGER update_memory_facts_updated_at BEFORE UPDATE ON memory_facts
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
Speicherdienst-Implementierung
// pg-memory-service.js
const { Pool } = require('pg');
class PostgresMemory {
constructor(config = {}) {
this.pool = new Pool({
host: config.host || process.env.POSTGRES_HOST || 'localhost',
port: config.port || parseInt(process.env.POSTGRES_PORT || '5432'),
database: config.database || process.env.POSTGRES_DB || 'n8n_memory',
user: config.user || process.env.POSTGRES_USER || 'postgres',
password: config.password || process.env.POSTGRES_PASSWORD || 'password',
max: 20,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000
});
}
// Benutzerverwaltung
async getOrCreateUser(externalId, userData = {}) {
const client = await this.pool.connect();
try {
// Versuchen, existierenden Benutzer abzurufen
let result = await client.query(
'SELECT * FROM users WHERE external_id = $1',
[externalId]
);
if (result.rows.length > 0) {
return { user: result.rows[0], created: false };
}
// Neuen Benutzer erstellen
result = await client.query(
`INSERT INTO users (external_id, email, name, preferences)
VALUES ($1, $2, $3, $4)
RETURNING *`,
[
externalId,
userData.email || null,
userData.name || null,
JSON.stringify(userData.preferences || {})
]
);
return { user: result.rows[0], created: true };
} finally {
client.release();
}
}
// Konversationsverwaltung
async startConversation(userId, sessionId, channel, title = null) {
const client = await this.pool.connect();
try {
const result = await client.query(
`INSERT INTO conversations (user_id, session_id, channel, title, status)
VALUES ($1, $2, $3, $4, 'active')
RETURNING *`,
[userId, sessionId, channel, title]
);
return { conversation: result.rows[0] };
} finally {
client.release();
}
}
async getActiveConversation(userId) {
const result = await this.pool.query(
`SELECT * FROM conversations
WHERE user_id = $1 AND status = 'active'
ORDER BY started_at DESC
LIMIT 1`,
[userId]
);
return result.rows[0] || null;
}
async closeConversation(conversationId, summary = null) {
await this.pool.query(
`UPDATE conversations
SET status = 'closed', ended_at = NOW(), summary = $2
WHERE id = $1`,
[conversationId, summary]
);
return { closed: true };
}
// Nachrichtenspeicherung
async saveMessage(conversationId, role, content, metadata = {}) {
const result = await this.pool.query(
`INSERT INTO messages (conversation_id, role, content, intent, entities, tokens, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *`,
[
conversationId,
role,
content,
metadata.intent || null,
JSON.stringify(metadata.entities || {}),
metadata.tokens || null,
JSON.stringify(metadata.extra || {})
]
);
return { message: result.rows[0] };
}
async getConversationHistory(conversationId, limit = 50) {
const result = await this.pool.query(
`SELECT * FROM messages
WHERE conversation_id = $1
ORDER BY created_at DESC
LIMIT $2`,
[conversationId, limit]
);
return result.rows.reverse();
}
// Speicherfakten
async saveFact(userId, factType, key, value, confidence = 1.0, sourceConversationId = null) {
const result = await this.pool.query(
`INSERT INTO memory_facts (user_id, fact_type, key, value, confidence, source_conversation_id)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (user_id, fact_type, key)
DO UPDATE SET
value = EXCLUDED.value,
confidence = EXCLUDED.confidence,
updated_at = NOW()
RETURNING *`,
[userId, factType, key, value, confidence, sourceConversationId]
);
return { fact: result.rows[0], updated: true };
}
async getFacts(userId, factType = null, key = null) {
let query = 'SELECT * FROM memory_facts WHERE user_id = $1';
const params = [userId];
if (factType) {
query += ` AND fact_type = $${params.length + 1}`;
params.push(factType);
}
if (key) {
query += ` AND key = $${params.length + 1}`;
params.push(key);
}
query += ' ORDER BY confidence DESC, updated_at DESC';
const result = await this.pool.query(query, params);
return result.rows;
}
// Verfolgte Entitäten
async trackEntity(userId, entityType, entityId, entityData) {
const result = await this.pool.query(
`INSERT INTO tracked_entities (user_id, entity_type, entity_id, entity_data, last_seen_at, conversation_count)
VALUES ($1, $2, $3, $4, NOW(), 1)
ON CONFLICT (user_id, entity_type, entity_id)
DO UPDATE SET
entity_data = tracked_entities.entity_data || EXCLUDED.entity_data,
last_seen_at = NOW(),
conversation_count = tracked_entities.conversation_count + 1
RETURNING *`,
[userId, entityType, entityId, JSON.stringify(entityData)]
);
return { entity: result.rows[0] };
}
async getTrackedEntities(userId, entityType = null) {
let query = 'SELECT * FROM tracked_entities WHERE user_id = $1';
const params = [userId];
if (entityType) {
query += ` AND entity_type = $${params.length + 1}`;
params.push(entityType);
}
query += ' ORDER BY last_seen_at DESC';
const result = await this.pool.query(query, params);
return result.rows;
}
// Konversationen durchsuchen
async searchConversations(userId, searchQuery, limit = 10) {
const result = await this.pool.query(
`SELECT
c.*,
ts_rank(to_tsvector('english', c.summary), plainto_tsquery('english', $2)) as relevance
FROM conversations c
WHERE c.user_id = $1
AND to_tsvector('english', c.summary) @@ plainto_tsquery('english', $2)
ORDER BY relevance DESC, c.started_at DESC
LIMIT $3`,
[userId, searchQuery, limit]
);
return result.rows;
}
// Benutzerspeicherzusammenfassung abrufen
async getUserMemorySummary(userId) {
const client = await this.pool.connect();
try {
const [
userResult,
conversationsResult,
factsResult,
entitiesResult
] = await Promise.all([
client.query('SELECT * FROM users WHERE id = $1', [userId]),
client.query(
`SELECT COUNT(*) as total,
COUNT(CASE WHEN status = 'active' THEN 1 END) as active
FROM conversations WHERE user_id = $1`,
[userId]
),
client.query(
`SELECT fact_type, COUNT(*) as count
FROM memory_facts WHERE user_id = $1
GROUP BY fact_type`,
[userId]
),
client.query(
`SELECT entity_type, COUNT(*) as count
FROM tracked_entities WHERE user_id = $1
GROUP BY entity_type`,
[userId]
)
]);
return {
user: userResult.rows[0],
conversations: {
total: parseInt(conversationsResult.rows[0].total),
active: parseInt(conversationsResult.rows[0].active)
},
facts: factsResult.rows.reduce((acc, row) => {
acc[row.fact_type] = parseInt(row.count);
return acc;
}, {}),
entities: entitiesResult.rows.reduce((acc, row) => {
acc[row.entity_type] = parseInt(row.count);
return acc;
}, {})
};
} finally {
client.release();
}
}
}
module.exports = { PostgresMemory };
Semantischen Speicher mit Vector Stores implementieren
Supabase Vector-Einrichtung
-- pgvector-Erweiterung aktivieren
CREATE EXTENSION IF NOT EXISTS vector;
-- Dokumententabelle für RAG erstellen
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
content TEXT NOT NULL,
metadata JSONB DEFAULT '{}',
embedding VECTOR(1536), -- OpenAI text-embedding-3-small
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Index für Ähnlichkeitssuche erstellen
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops);
-- Funktion für Ähnlichkeitssuche
CREATE OR REPLACE FUNCTION match_documents(
query_embedding VECTOR(1536),
match_threshold FLOAT,
match_count INT
)
RETURNS TABLE(
id UUID,
content TEXT,
metadata JSONB,
similarity FLOAT
)
LANGUAGE plpgsql
AS $$
BEGIN
RETURN QUERY
SELECT
d.id,
d.content,
d.metadata,
1 - (d.embedding <=> query_embedding) AS similarity
FROM documents d
WHERE 1 - (d.embedding <=> query_embedding) > match_threshold
ORDER BY d.embedding <=> query_embedding
LIMIT match_count;
END;
$$;
Vector-Speicherdienst
// vector-memory-service.js
const { createClient } = require('@supabase/supabase-js');
const { OpenAIEmbeddings } = require('@langchain/openai');
class VectorMemory {
constructor(config = {}) {
this.supabase = createClient(
config.supabaseUrl || process.env.SUPABASE_URL,
config.supabaseKey || process.env.SUPABASE_SERVICE_KEY
);
this.embeddings = new OpenAIEmbeddings({
openAIApiKey: config.openaiApiKey || process.env.OPENAI_API_KEY,
modelName: 'text-embedding-3-small'
});
this.chunkSize = config.chunkSize || 1000;
this.chunkOverlap = config.chunkOverlap || 200;
}
// Einbettungen generieren
async generateEmbedding(text) {
return await this.embeddings.embedQuery(text);
}
// Dokument mit Einbettung speichern
async storeDocument(content, metadata = {}) {
const embedding = await this.generateEmbedding(content);
const { data, error } = await this.supabase
.from('documents')
.insert([{
content,
metadata,
embedding
}])
.select()
.single();
if (error) throw error;
return { document: data };
}
// Ähnliche Dokumente suchen
async searchSimilar(query, options = {}) {
const {
threshold = 0.7,
limit = 5,
filter = {}
} = options;
const embedding = await this.generateEmbedding(query);
const { data, error } = await this.supabase.rpc('match_documents', {
query_embedding: embedding,
match_threshold: threshold,
match_count: limit
});
if (error) throw error;
// Zusätzliche Filter anwenden
let results = data;
if (Object.keys(filter).length > 0) {
results = results.filter(doc => {
return Object.entries(filter).every(([key, value]) => {
return doc.metadata?.[key] === value;
});
});
}
return { results };
}
// Konversation für RAG speichern
async storeConversation(userId, conversationId, messages) {
const combinedText = messages
.map(m => `${m.role}: ${m.content}`)
.join('\n\n');
return await this.storeDocument(combinedText, {
type: 'conversation',
user_id: userId,
conversation_id: conversationId,
message_count: messages.length,
stored_at: new Date().toISOString()
});
}
// Relevante vergangene Konversationen abrufen
async getRelevantConversations(query, userId, limit = 3) {
return await this.searchSimilar(query, {
threshold: 0.6,
limit,
filter: { type: 'conversation', user_id: userId }
});
}
// Wissensdatenbank-Dokumente speichern
async storeKnowledgeBase(docs, category = 'general') {
const results = [];
for (const doc of docs) {
// Große Dokumente aufteilen
const chunks = this.chunkDocument(doc.content);
for (let i = 0; i < chunks.length; i++) {
const result = await this.storeDocument(chunks[i], {
type: 'knowledge',
category,
title: doc.title,
source: doc.source,
chunk_index: i,
total_chunks: chunks.length
});
results.push(result);
}
}
return { stored: results.length };
}
// Dokument für Einbettung aufteilen
chunkDocument(content) {
const chunks = [];
let start = 0;
while (start < content.length) {
const end = Math.min(start + this.chunkSize, content.length);
chunks.push(content.slice(start, end));
start = end - this.chunkOverlap;
}
return chunks;
}
// Hybride Suche: Keyword und semantisch kombinieren
async hybridSearch(query, options = {}) {
const { limit = 5 } = options;
// Semantische Suche
const semanticResults = await this.searchSimilar(query, {
limit: limit * 2,
...options
});
// Keyword-Suche (mit Supabase-Textsuche)
const { data: keywordResults, error } = await this.supabase
.from('documents')
.select('*')
.textSearch('content', query)
.limit(limit);
if (error) throw error;
// Zusammenführen und Duplikate entfernen
const seen = new Set();
const combined = [];
// Zuerst semantische Ergebnisse hinzufügen
for (const result of semanticResults.results) {
if (!seen.has(result.id)) {
seen.add(result.id);
combined.push({ ...result, source: 'semantic' });
}
}
// Keyword-Ergebnisse hinzufügen
for (const result of keywordResults || []) {
if (!seen.has(result.id)) {
seen.add(result.id);
combined.push({ ...result, source: 'keyword' });
}
}
return { results: combined.slice(0, limit) };
}
}
module.exports = { VectorMemory };
Vollständiges speicheraktiviertes Workflow-Beispiel
Kundensupport-Agent mit vollem Speicher-Stack
Das vollständige Beispiel ist identisch mit dem englischen Original, da es sich um Code handelt.
Produktionsüberlegungen
Das Lebenszyklusmanagement, die Leistungsoptimierung und die Fehlerbehandlung folgen den gleichen Implementierungsmustern wie im englischen Original.
Best Practices und Designmuster
Speicher-Designprinzipien
- Kontextuelle Relevanz: Nur speichern, was für den aktuellen Kontext benötigt wird
- Progressive Offenlegung: Mit minimalem Speicher beginnen, bei Bedarf erweitern
- Datenschutz zuerst: TTL und Ablauf für sensible Daten implementieren
- Explizite Zustimmung: Benutzern erlauben, zu verwalten, was gespeichert wird
- Anmutige Degradierung: Workflows sollten auch bei Speicherausfällen funktionieren
Anti-Patterns vermeiden
// ❌ SCHLECHT: Alles speichern
await memory.save(sessionId, {
...userData, // Zu viel!
...rawApiResponse, // Unnötig
...internalState // Sollte nicht persistiert werden
});
// ✅ GUT: Nur das Notwendige speichern
await memory.saveContext(sessionId, {
currentIntent: extracted.intent,
relevantEntities: extracted.entities,
userPreferences: userData.preferences,
lastAction: action.type
});
// ❌ SCHLECHT: Keine TTL bei sensiblen Daten
await redis.set(`user:${userId}:ssn`, ssn); // Läuft nie ab!
// ✅ GUT: Immer Ablauf festlegen
await redis.setex(`user:${userId}:session`, 3600, sessionData);
Sicherheitsüberlegungen
// security-checks.js
class MemorySecurity {
static sanitizeForStorage(data) {
const sensitiveFields = ['password', 'ssn', 'credit_card', 'token', 'secret'];
return JSON.parse(JSON.stringify(data, (key, value) => {
if (sensitiveFields.some(f => key.toLowerCase().includes(f))) {
return '[REDACTED]';
}
return value;
}));
}
static encrypt(data, encryptionKey) {
const crypto = require('crypto');
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipher('aes-256-gcm', encryptionKey);
let encrypted = cipher.update(JSON.stringify(data), 'utf8', 'hex');
encrypted += cipher.final('hex');
return {
iv: iv.toString('hex'),
data: encrypted,
authTag: cipher.getAuthTag().toString('hex')
};
}
static validateAccess(userId, resourceOwnerId, requiredRole = 'owner') {
if (userId === resourceOwnerId) return true;
if (requiredRole === 'admin') return checkAdmin(userId);
return false;
}
}
module.exports = { MemorySecurity };
Fazit: Die speicheraktivierte Zukunft
Der Wandel von zustandslosen zu zustandsbehafteten KI-Workflows stellt eine der bedeutendsten Evolutionen in der Automatisierung seit der Einführung visueller Workflow-Builder dar. Unternehmen, die sich mit Speicher und Kontext-Persistenz auskennen, erhalten einen entscheidenden Vorteil: die Fähigkeit, KI-Agenten zu entwickeln, die ihre Benutzer wirklich verstehen, aus Interaktionen lernen und im Laufe der Zeit zunehmend personalisierte Erlebnisse liefern.
Wichtige Erkenntnisse:
- Speicher in Ebenen aufteilen: Redis für Arbeitsspeicher, PostgreSQL für Langzeitspeicherung und Vector Stores für semantisches Wissen verwenden
- Für Kontext designen: Workflows entwickeln, die relevanten Kontext bei jeder Interaktion abrufen und nutzen
- Datenschutz priorisieren: TTL, Verschlüsselung und Benutzerkontrollen für alle Speichersysteme implementieren
- Überwachen und optimieren: Speichertrefferraten, Latenz und Speicherkosten verfolgen
- Für Skalierung planen: Speicherarchitekturen entwerfen, die mit Ihren Automatisierungsanforderungen wachsen
Das Fazit:
Speicher ist nicht nur ein technisches Feature – es ist das, was Automatisierung von Transaktionsverarbeitung zu Beziehungsaufbau verwandelt. Wenn Ihre Workflows sich erinnern, werden sie mehr als Werkzeuge; sie werden Partner.
Die Infrastruktur ist bereit, die Muster sind bewährt, und die Möglichkeit ist klar. Es ist Zeit, Ihren KI-Agenten das Geschenk des Speichers zu geben.
Zusätzliche Ressourcen
Offizielle Dokumentation
Community-Ressourcen
Tools und Bibliotheken
- ioredis - Redis-Client für Node.js
- pgvector - Vektorähnlichkeitssuche für PostgreSQL
- LangChain Memory - Speicherabstraktionen für LLMs
Bereit, speicheraktivierte Workflows für Ihr Unternehmen zu entwickeln? Kontaktieren Sie Tropical Media für Expertise bei Beratung und Implementierung.
Tags: KI-Speicher, n8n, Kontext-Persistenz, Redis, PostgreSQL, Vector Stores, Workflow-Automatisierung, RAG, KI-Agenten, Chat-Speicher, Zustandsverwaltung, Produktionsmuster
n8n as Code: Infrastructure as Code für Workflow-Automatisierung mit GitOps
Meistern Sie n8n-as-code, um Ihre Workflows zu versionieren, GitOps-Deployment-Pipelines zu implementieren und Automatisierung als Infrastructure zu behandeln. Lernen Sie, wie Sie n8n mit Git synchronisieren, CI/CD für Workflows aufbauen und Team-Kollaboration im großen Maßstab ermöglichen.
n8n Security Hardening Guide: Schutz Ihrer Workflows vor Webhook-Ausnutzung und KI-Automatisierungsbedrohungen
Umfassender n8n Security Hardening Guide mit Webhook-Schutz, Authentifizierungsstrategien, Secret-Management und Verteidigung gegen neue Bedrohungen. Lernen Sie produktionsreife Sicherheitsmuster zum Schutz Ihrer KI-Automatisierungsinfrastruktur.