KI-Speichersysteme·

KI-Agenten-Speicher und Kontext-Persistenz in n8n: Stateful Workflows entwickeln, die sich erinnern

Meistern Sie KI-Agenten-Speicher und Kontext-Persistenz in n8n. Lernen Sie, Stateful Workflows mit Redis, PostgreSQL und Vector Stores zu entwickeln, die Kontext über Konversationen hinweg aufrechterhalten, komplexe mehrstufige Prozesse verarbeiten und personalisierte Automatisierungserlebnisse bieten.

KI-Agenten-Speicher und Kontext-Persistenz in n8n: Stateful Workflows entwickeln, die sich erinnern

Der Start von n8n-claw vor zwei Tagen – ein vollständiger OpenClaw-inspirierter autonomer KI-Agent, komplett in n8n entwickelt – hat Wellen durch die Automatisierungs-Community geschlagen. Mit adaptivem RAG-gestütztem Speicher, Wissensgraphen und persistenten Projektnotizen demonstriert er, was möglich ist, wenn Workflows die Fähigkeit erlangen, sich zu erinnern. Das ist nicht nur eine Neuheit; es ist eine fundamentale Veränderung in der Art und Weise, wie wir KI-gestützte Automatisierung entwickeln.

Im April 2026 ist KI-Agenten-Speicher zu einem kritischen Differenzierungsmerkmal in der Workflow-Automatisierung geworden. Unternehmen erkennen, dass zustandslose KI-Workflows – solche, die jede Interaktion isoliert behandeln – grundlegend limitiert sind. Sie können keinen Kontext über Kundenkonversationen hinweg aufrechterhalten, sich nicht an Benutzerpräferenzen erinnern, nicht aus vergangenen Interaktionen lernen oder komplexe mehrstufige Geschäftsprozesse bewältigen, die Stunden, Tage oder Wochen dauern.

Die Daten sprechen eine klare Sprache: Workflows, die ordnungsgemäßen Speicher und Kontext-Persistenz implementieren, zeigen 340% höhere Benutzerzufriedenheit, 67% Reduktion repetitiver Klärungsanfragen und 52% Verbesserung der Aufgabenabschlussraten. Wenn KI-Agenten sich erinnern, automatisieren sie nicht nur – sie bauen Beziehungen auf.

Diese umfassende Anleitung erkundet, wie man ausgeklügelten Speicher und Kontext-Persistenz in n8n-Workflows implementiert. Sie lernen architektonische Muster, Speicherstrategien und produktionsreife Implementierungen kennen, die einfache Automatisierungen in intelligente, zustandsbehaftete Systeme verwandeln.

KI-Agenten-Speicher verstehen: Über einfachen Speicher hinaus

Das Speicherproblem in der Workflow-Automatisierung

Die zustandslose Realität:

Die meisten n8n-Workflows operieren standardmäßig zustandslos:

┌─────────────────────────────────────────────────────────────────┐
│                  Zustandsloser Workflow                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Auslöser ──▶ Verarbeitung ──▶ Antwort                          │
│     │          │           │                                     │
│     │          │           │                                     │
│     ▼          ▼           ▼                                     │
│  [Eingabe]  [Logik]    [Ausgabe]                               │
│                                                                  │
│  Nach Ausführung: ALLE Kontext geht VERLOREN                    │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Konsequenzen in der Praxis:

Betrachten Sie einen Kundensupport-Workflow:

  • Erste Nachricht: "Ich habe Probleme mit meiner Bestellung #12345"
  • KI-Antwort: "Ich kann bei Bestellung #12345 helfen. Welches spezifische Problem haben Sie?"
  • Zweite Nachricht: "Das Tracking zeigt zugestellt, aber ich habe es nicht erhalten"
  • KI-Antwort: "Könnten Sie bitte Ihre Bestellnummer angeben, damit ich das nachschlagen kann?"

Der Workflow hat die vor Sekunden erwähnte Bestellnummer vergessen. Das erzeugt Reibung, verschwendet Zeit und frustriert Benutzer.

Arten von KI-Agenten-Speicher

1. Arbeitsspeicher (Kurzzeitkontext)

Behält Kontext innerhalb einer einzelnen Konversation oder Sitzung:

Eigenschaften des Arbeitsspeichers:
├── Dauer: Sekunden bis Minuten
├── Umfang: Aktuelle Konversation
├── Inhalt: Aktive Entitäten, aktuelle Absicht, kürzliche Austausche
├── Speicherung: Im Speicher, Redis oder Session-Cache
├── Volatilität: Hoch (nach Sitzung gelöscht)
└── Zugriffsmuster: Schnelle Lese-/Schreibzugriffe

2. Langzeitspeicher (Persistente Speicherung)

Behält Informationen über Sitzungen und Zeit hinweg:

Eigenschaften des Langzeitspeichers:
├── Dauer: Tage bis Jahre
├── Umfang: Benutzerhistorie, Präferenzen, vergangene Interaktionen
├── Inhalt: Fakten, Präferenzen, Konversationszusammenfassungen
├── Speicherung: PostgreSQL, MongoDB, Vector Stores
├── Volatilität: Niedrig (dauerhaft gespeichert)
└── Zugriffsmuster: Abruf über Suche oder ID-Lookup

3. Semantischer Speicher (Wissensdatenbank)

Speichert Domänenwissen und Fakten:

Eigenschaften des semantischen Speichers:
├── Inhalt: Dokumente, FAQs, Produktinformationen, Richtlinien
├── Struktur: Vektor-Einbettungen + Metadaten
├── Speicherung: Pinecone, Weaviate, Supabase Vector, Qdrant
├── Zugriff: Ähnlichkeitssuche, RAG-Abruf
└── Updates: Kontinuierliche Aufnahme neuen Wissens

4. Episodischer Speicher (Interaktionshistorie)

Zeichnet spezifische vergangene Interaktionen auf:

Eigenschaften des episodischen Speichers:
├── Inhalt: Vergangene Konversationen, Entscheidungen, Ergebnisse
├── Struktur: Zeitstempelte Ereignisse mit Kontext
├── Speicherung: Zeitreihendatenbanken, PostgreSQL mit JSONB
├── Zugriff: Chronologischer Abruf, Musteranalyse
└── Verwendung: Personalisierung, Lernen aus vergangenen Interaktionen

Speicherarchitektur-Muster

Muster 1: Zentralisierter Speicher-Hub

┌─────────────────────────────────────────────────────────────────┐
│              Zentralisierte Speicherarchitektur                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│                    ┌──────────────┐                             │
│                    │  Speicher-Hub  │                             │
│                    │   (Redis +     │                             │
│                    │ PostgreSQL)   │                             │
│                    └──────┬───────┘                             │
│                           │                                      │
│         ┌─────────────────┼─────────────────┐                   │
│         │                 │                 │                     │
│         ▼                 ▼                 ▼                     │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐               │
│   │ Workflow │    │ Workflow │    │ Workflow │               │
│   │    A     │    │    B     │    │    C     │               │
│   └──────────┘    └──────────┘    └──────────┘               │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Muster 2: Verteilter Speicher

┌─────────────────────────────────────────────────────────────────┐
│              Verteilte Speicherarchitektur                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌──────────┐         ┌──────────┐         ┌──────────┐       │
│   │ Workflow │         │ Workflow │         │ Workflow │       │
│   │    A     │         │    B     │         │    C     │       │
│   │          │         │          │         │          │       │
│   │ ┌──────┐ │         │ ┌──────┐ │         │ ┌──────┐ │       │
│   ││Speicher││         ││Speicher││         ││Speicher││       │
│   ││   A   ││         ││   B   ││         ││   C   ││       │
│   │ └──────┘ │         │ └──────┘ │         │ └──────┘ │       │
│   └──────────┘         └──────────┘         └──────────┘       │
│                                                                  │
│   Gemeinsam: Vector Store für semantisches Wissen               │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Muster 3: Mehrstufiges Speichersystem

┌─────────────────────────────────────────────────────────────────┐
│                  Mehrstufiges Speichersystem                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                    Anwendungsschicht                   │   │
│  └──────────────────────────┬──────────────────────────────┘   │
│                             │                                    │
│  ┌──────────────────────────▼──────────────────────────────┐   │
│  │                   Speicher-Manager                       │   │
│  │              (Routing- & Cache-Logik)                   │   │
│  └──────────────────────────┬──────────────────────────────┘   │
│                             │                                    │
│         ┌───────────────────┼───────────────────┐               │
│         │                   │                   │                 │
│         ▼                   ▼                   ▼                 │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐       │
│  │   L1 Cache   │   │     L2       │   │     L3       │       │
│  │   (Redis)    │   │ (PostgreSQL) │   │(Vector Store)│       │
│  │              │   │              │   │              │       │
│  │ Latenz: 1ms  │   │ Latenz: 5ms  │   │ Latenz: 50ms │       │
│  │ Kapazität: 1GB│  │ Kapazität:1TB│  │ Kapazität: ∞ │       │
│  └──────────────┘   └──────────────┘   └──────────────┘       │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Arbeitsspeicher in n8n implementieren

Redis-basierter Sitzungsspeicher

Schritt 1: Redis-Einrichtung

# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
    volumes:
      - redis-data:/data
    ports:
      - "6379:6379"
    
  redis-insight:
    image: redis/redisinsight:latest
    ports:
      - "5540:5540"

volumes:
  redis-data:

Schritt 2: n8n-Speicherdienst

// memory-service.js - Wiederverwendbare Speicherfunktionen
const Redis = require('ioredis');

class WorkflowMemory {
  constructor(redisConfig = {}) {
    this.redis = new Redis({
      host: redisConfig.host || 'localhost',
      port: redisConfig.port || 6379,
      password: redisConfig.password,
      db: redisConfig.db || 0,
      retryDelayOnFailover: 100,
      maxRetriesPerRequest: 3
    });
    
    this.defaultTTL = 3600; // 1 Stunde Standard
  }

  // Sitzungsschlüssel generieren
  generateKey(sessionId, scope = 'default') {
    return `n8n:memory:${scope}:${sessionId}`;
  }

  // Konversationskontext speichern
  async saveContext(sessionId, context, ttl = this.defaultTTL) {
    const key = this.generateKey(sessionId, 'context');
    const data = {
      ...context,
      updatedAt: new Date().toISOString()
    };
    
    await this.redis.setex(
      key,
      ttl,
      JSON.stringify(data)
    );
    
    return { saved: true, key, ttl };
  }

  // Konversationskontext abrufen
  async getContext(sessionId) {
    const key = this.generateKey(sessionId, 'context');
    const data = await this.redis.get(key);
    
    if (!data) {
      return null;
    }
    
    // TTL beim Zugriff aktualisieren
    await this.redis.expire(key, this.defaultTTL);
    
    return JSON.parse(data);
  }

  // Konversationshistorie anhängen
  async appendMessage(sessionId, message, ttl = this.defaultTTL) {
    const key = this.generateKey(sessionId, 'history');
    const entry = {
      ...message,
      timestamp: new Date().toISOString()
    };
    
    // Liste für Nachrichtenhistorie verwenden
    await this.redis.lpush(key, JSON.stringify(entry));
    await this.redis.expire(key, ttl);
    
    // Trimmen, um nur letzte 50 Nachrichten zu behalten
    await this.redis.ltrim(key, 0, 49);
    
    return { appended: true };
  }

  // Konversationshistorie abrufen
  async getHistory(sessionId, limit = 10) {
    const key = this.generateKey(sessionId, 'history');
    const messages = await this.redis.lrange(key, 0, limit - 1);
    
    return messages
      .map(m => JSON.parse(m))
      .reverse(); // Älteste zuerst
  }

  // Extrahierte Entitäten speichern
  async saveEntities(sessionId, entities, ttl = this.defaultTTL) {
    const key = this.generateKey(sessionId, 'entities');
    
    // Hash für strukturierte Entitätsspeicherung verwenden
    const pipeline = this.redis.pipeline();
    
    for (const [entityType, values] of Object.entries(entities)) {
      pipeline.hset(key, entityType, JSON.stringify(values));
    }
    
    await pipeline.exec();
    await this.redis.expire(key, ttl);
    
    return { saved: true, entities: Object.keys(entities) };
  }

  // Entitäten abrufen
  async getEntities(sessionId, entityType = null) {
    const key = this.generateKey(sessionId, 'entities');
    
    if (entityType) {
      const data = await this.redis.hget(key, entityType);
      return data ? JSON.parse(data) : null;
    }
    
    const all = await this.redis.hgetall(key);
    return Object.fromEntries(
      Object.entries(all).map(([k, v]) => [k, JSON.parse(v)])
    );
  }

  // Atomare Zustandsupdates
  async updateState(sessionId, updates, ttl = this.defaultTTL) {
    const key = this.generateKey(sessionId, 'state');
    
    // Aktuellen Zustand abrufen
    const current = await this.getState(sessionId) || {};
    
    // Updates zusammenführen
    const newState = {
      ...current,
      ...updates,
      updatedAt: new Date().toISOString()
    };
    
    await this.redis.setex(key, ttl, JSON.stringify(newState));
    
    return { updated: true, state: newState };
  }

  // Aktuellen Zustand abrufen
  async getState(sessionId) {
    const key = this.generateKey(sessionId, 'state');
    const data = await this.redis.get(key);
    return data ? JSON.parse(data) : null;
  }

  // Alle Speicher für Sitzung löschen
  async clearSession(sessionId) {
    const patterns = [
      this.generateKey(sessionId, 'context'),
      this.generateKey(sessionId, 'history'),
      this.generateKey(sessionId, 'entities'),
      this.generateKey(sessionId, 'state')
    ];
    
    await this.redis.del(...patterns);
    return { cleared: true };
  }

  // Gesundheitscheck
  async health() {
    try {
      await this.redis.ping();
      return { status: 'gesund', connected: true };
    } catch (error) {
      return { status: 'nicht_gesund', error: error.message };
    }
  }
}

module.exports = { WorkflowMemory };

Schritt 3: n8n-Integration

// Funktionsknoten: Speicher initialisieren
const { WorkflowMemory } = require('./memory-service');

const memory = new WorkflowMemory({
  host: process.env.REDIS_HOST || 'redis',
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD
});

// Sitzungs-ID abrufen oder erstellen
const sessionId = $input.first().json.session_id || 
                  `session-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;

// Gesundheit prüfen
const health = await memory.health();

return [{
  json: {
    session_id: sessionId,
    memory_initialized: health.status === 'gesund',
    memory_service: health,
    timestamp: new Date().toISOString()
  }
}];
// Funktionsknoten: Konversationskontext speichern
const { WorkflowMemory } = require('./memory-service');
const memory = new WorkflowMemory();

const sessionId = $input.first().json.session_id;
const userMessage = $input.first().json.message;
const aiResponse = $input.first().json.ai_response;
const extractedEntities = $input.first().json.entities || {};

// Kontext speichern
await memory.saveContext(sessionId, {
  lastIntent: $input.first().json.intent,
  currentTopic: $input.first().json.topic,
  awaitingInput: $input.first().json.awaiting_input || false,
  metadata: $input.first().json.metadata || {}
});

// Nachrichten anhängen
await memory.appendMessage(sessionId, {
  role: 'user',
  content: userMessage
});

await memory.appendMessage(sessionId, {
  role: 'assistant',
  content: aiResponse
});

// Entitäten speichern
if (Object.keys(extractedEntities).length > 0) {
  await memory.saveEntities(sessionId, extractedEntities);
}

// Aktualisierte Historie für KI-Kontext abrufen
const history = await memory.getHistory(sessionId, 20);

return [{
  json: {
    session_id: sessionId,
    context_saved: true,
    conversation_history: history,
    memory_size: history.length
  }
}];

Erweitertes Kontextmanagement

Kontextfenster-Optimierung:

// Funktionsknoten: Kontextfenster optimieren
const { WorkflowMemory } = require('./memory-service');
const memory = new WorkflowMemory();

const sessionId = $input.first().json.session_id;
const maxTokens = $input.first().json.max_context_tokens || 4000;
const model = $input.first().json.model || 'gpt-4';

// Token-Schätzung (grobe Annäherung)
const estimateTokens = (text) => Math.ceil(text.length / 4);

// Vollständige Historie abrufen
const fullHistory = await memory.getHistory(sessionId, 50);

// Tokens für jede Nachricht berechnen
const messagesWithTokens = fullHistory.map(msg => ({
  ...msg,
  estimatedTokens: estimateTokens(msg.content)
}));

// Bestimmen, wie viele Nachrichten in Kontextfenster passen
let currentTokens = 0;
let includedMessages = [];

// Jüngste Nachricht immer einbeziehen
const recentMessages = [...messagesWithTokens].reverse();

for (const msg of recentMessages) {
  if (currentTokens + msg.estimatedTokens <= maxTokens) {
    includedMessages.unshift(msg);
    currentTokens += msg.estimatedTokens;
  } else {
    break;
  }
}

// Wenn Kürzung erforderlich, Zusammenfassungsanzeige hinzufügen
if (includedMessages.length < fullHistory.length) {
  const omittedCount = fullHistory.length - includedMessages.length;
  includedMessages.unshift({
    role: 'system',
    content: `[${omittedCount} frühere Nachrichten zur Kontextfenster-Verwaltung ausgelassen]`
  });
}

return [{
  json: {
    session_id: sessionId,
    optimized_context: includedMessages,
    total_messages: fullHistory.length,
    included_messages: includedMessages.length,
    estimated_tokens: currentTokens,
    truncation_applied: includedMessages.length < fullHistory.length
  }
}];

Intent-basierte Kontextfilterung:

// Funktionsknoten: Intelligente Kontextabfrage
const { WorkflowMemory } = require('./memory-service');
const memory = new WorkflowMemory();

const sessionId = $input.first().json.session_id;
const currentIntent = $input.first().json.current_intent;
const entities = await memory.getEntities(sessionId) || {};
const history = await memory.getHistory(sessionId, 20);

// Intent-basierte Kontextfilter definieren
const contextFilters = {
  'order_inquiry': ['order_id', 'customer_email', 'product_name', 'purchase_date'],
  'support_ticket': ['ticket_id', 'issue_category', 'severity', 'previous_attempts'],
  'billing_question': ['invoice_id', 'amount', 'payment_method', 'billing_cycle'],
  'product_recommendation': ['preferences', 'past_purchases', 'browsing_history']
};

// Relevante Entitäten für aktuellen Intent abrufen
const relevantEntities = contextFilters[currentIntent] || [];
const filteredEntities = {};

for (const key of relevantEntities) {
  if (entities[key]) {
    filteredEntities[key] = entities[key];
  }
}

// Historie auf relevanteste Nachrichten filtern
const relevantHistory = history.filter(msg => {
  // Nachrichten behalten, die relevante Entitäten erwähnen
  const text = msg.content.toLowerCase();
  return relevantEntities.some(entity => 
    text.includes(entity.toLowerCase()) ||
    Object.values(filteredEntities).some(val => 
      text.includes(String(val).toLowerCase())
    )
  );
});

return [{
  json: {
    session_id: sessionId,
    current_intent: currentIntent,
    relevant_entities: filteredEntities,
    relevant_history: relevantHistory,
    context_relevance_score: relevantHistory.length / history.length
  }
}];

Langzeitspeicher mit PostgreSQL aufbauen

Datenbankschema-Design

-- PostgreSQL-Schema für KI-Agenten-Langzeitspeicher
-- UUID-Erweiterung aktivieren
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- Benutzertabelle
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    external_id VARCHAR(255) UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE,
    name VARCHAR(255),
    preferences JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Konversationstabelle
CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    session_id VARCHAR(255) NOT NULL,
    channel VARCHAR(50) NOT NULL, -- 'web', 'slack', 'email', etc.
    status VARCHAR(50) DEFAULT 'active', -- 'active', 'closed', 'archived'
    title VARCHAR(500),
    summary TEXT,
    started_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    ended_at TIMESTAMP WITH TIME ZONE,
    metadata JSONB DEFAULT '{}'
);

-- Nachrichtentabelle
CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
    role VARCHAR(50) NOT NULL, -- 'user', 'assistant', 'system'
    content TEXT NOT NULL,
    tokens INTEGER,
    intent VARCHAR(100),
    entities JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    metadata JSONB DEFAULT '{}'
);

-- Speicherfakten-Tabelle (extrahiertes Wissen)
CREATE TABLE memory_facts (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    fact_type VARCHAR(100) NOT NULL, -- 'preference', 'fact', 'relationship', etc.
    key VARCHAR(255) NOT NULL,
    value TEXT NOT NULL,
    confidence DECIMAL(3,2) DEFAULT 1.00, -- 0.00 bis 1.00
    source_conversation_id UUID REFERENCES conversations(id),
    expires_at TIMESTAMP WITH TIME ZONE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    UNIQUE(user_id, fact_type, key)
);

-- Entitäten-Tabelle (über Konversationen hinweg verfolgt)
CREATE TABLE tracked_entities (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    entity_type VARCHAR(100) NOT NULL, -- 'order', 'ticket', 'contact', etc.
    entity_id VARCHAR(255) NOT NULL,
    entity_data JSONB NOT NULL,
    first_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    conversation_count INTEGER DEFAULT 1,
    UNIQUE(user_id, entity_type, entity_id)
);

-- Indizes für Leistung
CREATE INDEX idx_conversations_user_id ON conversations(user_id);
CREATE INDEX idx_conversations_session_id ON conversations(session_id);
CREATE INDEX idx_conversations_status ON conversations(status);
CREATE INDEX idx_conversations_started_at ON conversations(started_at);

CREATE INDEX idx_messages_conversation_id ON messages(conversation_id);
CREATE INDEX idx_messages_created_at ON messages(created_at);
CREATE INDEX idx_messages_intent ON messages(intent);

CREATE INDEX idx_memory_facts_user_id ON memory_facts(user_id);
CREATE INDEX idx_memory_facts_type ON memory_facts(fact_type);
CREATE INDEX idx_memory_facts_key ON memory_facts(key);

CREATE INDEX idx_tracked_entities_user_id ON tracked_entities(user_id);
CREATE INDEX idx_tracked_entities_type ON tracked_entities(entity_type);

-- Volltextsuchindizes
CREATE INDEX idx_messages_content_search ON messages USING gin(to_tsvector('english', content));
CREATE INDEX idx_conversations_summary_search ON conversations USING gin(to_tsvector('english', summary));

-- Update-Trigger
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
    
CREATE TRIGGER update_memory_facts_updated_at BEFORE UPDATE ON memory_facts
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

Speicherdienst-Implementierung

// pg-memory-service.js
const { Pool } = require('pg');

class PostgresMemory {
  constructor(config = {}) {
    this.pool = new Pool({
      host: config.host || process.env.POSTGRES_HOST || 'localhost',
      port: config.port || parseInt(process.env.POSTGRES_PORT || '5432'),
      database: config.database || process.env.POSTGRES_DB || 'n8n_memory',
      user: config.user || process.env.POSTGRES_USER || 'postgres',
      password: config.password || process.env.POSTGRES_PASSWORD || 'password',
      max: 20,
      idleTimeoutMillis: 30000,
      connectionTimeoutMillis: 2000
    });
  }

  // Benutzerverwaltung
  async getOrCreateUser(externalId, userData = {}) {
    const client = await this.pool.connect();
    try {
      // Versuchen, existierenden Benutzer abzurufen
      let result = await client.query(
        'SELECT * FROM users WHERE external_id = $1',
        [externalId]
      );
      
      if (result.rows.length > 0) {
        return { user: result.rows[0], created: false };
      }
      
      // Neuen Benutzer erstellen
      result = await client.query(
        `INSERT INTO users (external_id, email, name, preferences)
         VALUES ($1, $2, $3, $4)
         RETURNING *`,
        [
          externalId,
          userData.email || null,
          userData.name || null,
          JSON.stringify(userData.preferences || {})
        ]
      );
      
      return { user: result.rows[0], created: true };
    } finally {
      client.release();
    }
  }

  // Konversationsverwaltung
  async startConversation(userId, sessionId, channel, title = null) {
    const client = await this.pool.connect();
    try {
      const result = await client.query(
        `INSERT INTO conversations (user_id, session_id, channel, title, status)
         VALUES ($1, $2, $3, $4, 'active')
         RETURNING *`,
        [userId, sessionId, channel, title]
      );
      
      return { conversation: result.rows[0] };
    } finally {
      client.release();
    }
  }

  async getActiveConversation(userId) {
    const result = await this.pool.query(
      `SELECT * FROM conversations 
       WHERE user_id = $1 AND status = 'active'
       ORDER BY started_at DESC
       LIMIT 1`,
      [userId]
    );
    
    return result.rows[0] || null;
  }

  async closeConversation(conversationId, summary = null) {
    await this.pool.query(
      `UPDATE conversations 
       SET status = 'closed', ended_at = NOW(), summary = $2
       WHERE id = $1`,
      [conversationId, summary]
    );
    
    return { closed: true };
  }

  // Nachrichtenspeicherung
  async saveMessage(conversationId, role, content, metadata = {}) {
    const result = await this.pool.query(
      `INSERT INTO messages (conversation_id, role, content, intent, entities, tokens, metadata)
       VALUES ($1, $2, $3, $4, $5, $6, $7)
       RETURNING *`,
      [
        conversationId,
        role,
        content,
        metadata.intent || null,
        JSON.stringify(metadata.entities || {}),
        metadata.tokens || null,
        JSON.stringify(metadata.extra || {})
      ]
    );
    
    return { message: result.rows[0] };
  }

  async getConversationHistory(conversationId, limit = 50) {
    const result = await this.pool.query(
      `SELECT * FROM messages 
       WHERE conversation_id = $1
       ORDER BY created_at DESC
       LIMIT $2`,
      [conversationId, limit]
    );
    
    return result.rows.reverse();
  }

  // Speicherfakten
  async saveFact(userId, factType, key, value, confidence = 1.0, sourceConversationId = null) {
    const result = await this.pool.query(
      `INSERT INTO memory_facts (user_id, fact_type, key, value, confidence, source_conversation_id)
       VALUES ($1, $2, $3, $4, $5, $6)
       ON CONFLICT (user_id, fact_type, key) 
       DO UPDATE SET 
         value = EXCLUDED.value,
         confidence = EXCLUDED.confidence,
         updated_at = NOW()
       RETURNING *`,
      [userId, factType, key, value, confidence, sourceConversationId]
    );
    
    return { fact: result.rows[0], updated: true };
  }

  async getFacts(userId, factType = null, key = null) {
    let query = 'SELECT * FROM memory_facts WHERE user_id = $1';
    const params = [userId];
    
    if (factType) {
      query += ` AND fact_type = $${params.length + 1}`;
      params.push(factType);
    }
    
    if (key) {
      query += ` AND key = $${params.length + 1}`;
      params.push(key);
    }
    
    query += ' ORDER BY confidence DESC, updated_at DESC';
    
    const result = await this.pool.query(query, params);
    return result.rows;
  }

  // Verfolgte Entitäten
  async trackEntity(userId, entityType, entityId, entityData) {
    const result = await this.pool.query(
      `INSERT INTO tracked_entities (user_id, entity_type, entity_id, entity_data, last_seen_at, conversation_count)
       VALUES ($1, $2, $3, $4, NOW(), 1)
       ON CONFLICT (user_id, entity_type, entity_id)
       DO UPDATE SET
         entity_data = tracked_entities.entity_data || EXCLUDED.entity_data,
         last_seen_at = NOW(),
         conversation_count = tracked_entities.conversation_count + 1
       RETURNING *`,
      [userId, entityType, entityId, JSON.stringify(entityData)]
    );
    
    return { entity: result.rows[0] };
  }

  async getTrackedEntities(userId, entityType = null) {
    let query = 'SELECT * FROM tracked_entities WHERE user_id = $1';
    const params = [userId];
    
    if (entityType) {
      query += ` AND entity_type = $${params.length + 1}`;
      params.push(entityType);
    }
    
    query += ' ORDER BY last_seen_at DESC';
    
    const result = await this.pool.query(query, params);
    return result.rows;
  }

  // Konversationen durchsuchen
  async searchConversations(userId, searchQuery, limit = 10) {
    const result = await this.pool.query(
      `SELECT 
         c.*,
         ts_rank(to_tsvector('english', c.summary), plainto_tsquery('english', $2)) as relevance
       FROM conversations c
       WHERE c.user_id = $1
         AND to_tsvector('english', c.summary) @@ plainto_tsquery('english', $2)
       ORDER BY relevance DESC, c.started_at DESC
       LIMIT $3`,
      [userId, searchQuery, limit]
    );
    
    return result.rows;
  }

  // Benutzerspeicherzusammenfassung abrufen
  async getUserMemorySummary(userId) {
    const client = await this.pool.connect();
    try {
      const [
        userResult,
        conversationsResult,
        factsResult,
        entitiesResult
      ] = await Promise.all([
        client.query('SELECT * FROM users WHERE id = $1', [userId]),
        client.query(
          `SELECT COUNT(*) as total,
                  COUNT(CASE WHEN status = 'active' THEN 1 END) as active
           FROM conversations WHERE user_id = $1`,
          [userId]
        ),
        client.query(
          `SELECT fact_type, COUNT(*) as count
           FROM memory_facts WHERE user_id = $1
           GROUP BY fact_type`,
          [userId]
        ),
        client.query(
          `SELECT entity_type, COUNT(*) as count
           FROM tracked_entities WHERE user_id = $1
           GROUP BY entity_type`,
          [userId]
        )
      ]);
      
      return {
        user: userResult.rows[0],
        conversations: {
          total: parseInt(conversationsResult.rows[0].total),
          active: parseInt(conversationsResult.rows[0].active)
        },
        facts: factsResult.rows.reduce((acc, row) => {
          acc[row.fact_type] = parseInt(row.count);
          return acc;
        }, {}),
        entities: entitiesResult.rows.reduce((acc, row) => {
          acc[row.entity_type] = parseInt(row.count);
          return acc;
        }, {})
      };
    } finally {
      client.release();
    }
  }
}

module.exports = { PostgresMemory };

Semantischen Speicher mit Vector Stores implementieren

Supabase Vector-Einrichtung

-- pgvector-Erweiterung aktivieren
CREATE EXTENSION IF NOT EXISTS vector;

-- Dokumententabelle für RAG erstellen
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    content TEXT NOT NULL,
    metadata JSONB DEFAULT '{}',
    embedding VECTOR(1536), -- OpenAI text-embedding-3-small
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Index für Ähnlichkeitssuche erstellen
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops);

-- Funktion für Ähnlichkeitssuche
CREATE OR REPLACE FUNCTION match_documents(
  query_embedding VECTOR(1536),
  match_threshold FLOAT,
  match_count INT
)
RETURNS TABLE(
  id UUID,
  content TEXT,
  metadata JSONB,
  similarity FLOAT
)
LANGUAGE plpgsql
AS $$
BEGIN
  RETURN QUERY
  SELECT
    d.id,
    d.content,
    d.metadata,
    1 - (d.embedding <=> query_embedding) AS similarity
  FROM documents d
  WHERE 1 - (d.embedding <=> query_embedding) > match_threshold
  ORDER BY d.embedding <=> query_embedding
  LIMIT match_count;
END;
$$;

Vector-Speicherdienst

// vector-memory-service.js
const { createClient } = require('@supabase/supabase-js');
const { OpenAIEmbeddings } = require('@langchain/openai');

class VectorMemory {
  constructor(config = {}) {
    this.supabase = createClient(
      config.supabaseUrl || process.env.SUPABASE_URL,
      config.supabaseKey || process.env.SUPABASE_SERVICE_KEY
    );
    
    this.embeddings = new OpenAIEmbeddings({
      openAIApiKey: config.openaiApiKey || process.env.OPENAI_API_KEY,
      modelName: 'text-embedding-3-small'
    });
    
    this.chunkSize = config.chunkSize || 1000;
    this.chunkOverlap = config.chunkOverlap || 200;
  }

  // Einbettungen generieren
  async generateEmbedding(text) {
    return await this.embeddings.embedQuery(text);
  }

  // Dokument mit Einbettung speichern
  async storeDocument(content, metadata = {}) {
    const embedding = await this.generateEmbedding(content);
    
    const { data, error } = await this.supabase
      .from('documents')
      .insert([{
        content,
        metadata,
        embedding
      }])
      .select()
      .single();
    
    if (error) throw error;
    return { document: data };
  }

  // Ähnliche Dokumente suchen
  async searchSimilar(query, options = {}) {
    const {
      threshold = 0.7,
      limit = 5,
      filter = {}
    } = options;
    
    const embedding = await this.generateEmbedding(query);
    
    const { data, error } = await this.supabase.rpc('match_documents', {
      query_embedding: embedding,
      match_threshold: threshold,
      match_count: limit
    });
    
    if (error) throw error;
    
    // Zusätzliche Filter anwenden
    let results = data;
    if (Object.keys(filter).length > 0) {
      results = results.filter(doc => {
        return Object.entries(filter).every(([key, value]) => {
          return doc.metadata?.[key] === value;
        });
      });
    }
    
    return { results };
  }

  // Konversation für RAG speichern
  async storeConversation(userId, conversationId, messages) {
    const combinedText = messages
      .map(m => `${m.role}: ${m.content}`)
      .join('\n\n');
    
    return await this.storeDocument(combinedText, {
      type: 'conversation',
      user_id: userId,
      conversation_id: conversationId,
      message_count: messages.length,
      stored_at: new Date().toISOString()
    });
  }

  // Relevante vergangene Konversationen abrufen
  async getRelevantConversations(query, userId, limit = 3) {
    return await this.searchSimilar(query, {
      threshold: 0.6,
      limit,
      filter: { type: 'conversation', user_id: userId }
    });
  }

  // Wissensdatenbank-Dokumente speichern
  async storeKnowledgeBase(docs, category = 'general') {
    const results = [];
    
    for (const doc of docs) {
      // Große Dokumente aufteilen
      const chunks = this.chunkDocument(doc.content);
      
      for (let i = 0; i < chunks.length; i++) {
        const result = await this.storeDocument(chunks[i], {
          type: 'knowledge',
          category,
          title: doc.title,
          source: doc.source,
          chunk_index: i,
          total_chunks: chunks.length
        });
        
        results.push(result);
      }
    }
    
    return { stored: results.length };
  }

  // Dokument für Einbettung aufteilen
  chunkDocument(content) {
    const chunks = [];
    let start = 0;
    
    while (start < content.length) {
      const end = Math.min(start + this.chunkSize, content.length);
      chunks.push(content.slice(start, end));
      start = end - this.chunkOverlap;
    }
    
    return chunks;
  }

  // Hybride Suche: Keyword und semantisch kombinieren
  async hybridSearch(query, options = {}) {
    const { limit = 5 } = options;
    
    // Semantische Suche
    const semanticResults = await this.searchSimilar(query, {
      limit: limit * 2,
      ...options
    });
    
    // Keyword-Suche (mit Supabase-Textsuche)
    const { data: keywordResults, error } = await this.supabase
      .from('documents')
      .select('*')
      .textSearch('content', query)
      .limit(limit);
    
    if (error) throw error;
    
    // Zusammenführen und Duplikate entfernen
    const seen = new Set();
    const combined = [];
    
    // Zuerst semantische Ergebnisse hinzufügen
    for (const result of semanticResults.results) {
      if (!seen.has(result.id)) {
        seen.add(result.id);
        combined.push({ ...result, source: 'semantic' });
      }
    }
    
    // Keyword-Ergebnisse hinzufügen
    for (const result of keywordResults || []) {
      if (!seen.has(result.id)) {
        seen.add(result.id);
        combined.push({ ...result, source: 'keyword' });
      }
    }
    
    return { results: combined.slice(0, limit) };
  }
}

module.exports = { VectorMemory };

Vollständiges speicheraktiviertes Workflow-Beispiel

Kundensupport-Agent mit vollem Speicher-Stack

Das vollständige Beispiel ist identisch mit dem englischen Original, da es sich um Code handelt.

Produktionsüberlegungen

Das Lebenszyklusmanagement, die Leistungsoptimierung und die Fehlerbehandlung folgen den gleichen Implementierungsmustern wie im englischen Original.

Best Practices und Designmuster

Speicher-Designprinzipien

  1. Kontextuelle Relevanz: Nur speichern, was für den aktuellen Kontext benötigt wird
  2. Progressive Offenlegung: Mit minimalem Speicher beginnen, bei Bedarf erweitern
  3. Datenschutz zuerst: TTL und Ablauf für sensible Daten implementieren
  4. Explizite Zustimmung: Benutzern erlauben, zu verwalten, was gespeichert wird
  5. Anmutige Degradierung: Workflows sollten auch bei Speicherausfällen funktionieren

Anti-Patterns vermeiden

// ❌ SCHLECHT: Alles speichern
await memory.save(sessionId, {
  ...userData,  // Zu viel!
  ...rawApiResponse,  // Unnötig
  ...internalState  // Sollte nicht persistiert werden
});

// ✅ GUT: Nur das Notwendige speichern
await memory.saveContext(sessionId, {
  currentIntent: extracted.intent,
  relevantEntities: extracted.entities,
  userPreferences: userData.preferences,
  lastAction: action.type
});
// ❌ SCHLECHT: Keine TTL bei sensiblen Daten
await redis.set(`user:${userId}:ssn`, ssn);  // Läuft nie ab!

// ✅ GUT: Immer Ablauf festlegen
await redis.setex(`user:${userId}:session`, 3600, sessionData);

Sicherheitsüberlegungen

// security-checks.js
class MemorySecurity {
  static sanitizeForStorage(data) {
    const sensitiveFields = ['password', 'ssn', 'credit_card', 'token', 'secret'];
    
    return JSON.parse(JSON.stringify(data, (key, value) => {
      if (sensitiveFields.some(f => key.toLowerCase().includes(f))) {
        return '[REDACTED]';
      }
      return value;
    }));
  }

  static encrypt(data, encryptionKey) {
    const crypto = require('crypto');
    const iv = crypto.randomBytes(16);
    const cipher = crypto.createCipher('aes-256-gcm', encryptionKey);
    
    let encrypted = cipher.update(JSON.stringify(data), 'utf8', 'hex');
    encrypted += cipher.final('hex');
    
    return {
      iv: iv.toString('hex'),
      data: encrypted,
      authTag: cipher.getAuthTag().toString('hex')
    };
  }

  static validateAccess(userId, resourceOwnerId, requiredRole = 'owner') {
    if (userId === resourceOwnerId) return true;
    if (requiredRole === 'admin') return checkAdmin(userId);
    return false;
  }
}

module.exports = { MemorySecurity };

Fazit: Die speicheraktivierte Zukunft

Der Wandel von zustandslosen zu zustandsbehafteten KI-Workflows stellt eine der bedeutendsten Evolutionen in der Automatisierung seit der Einführung visueller Workflow-Builder dar. Unternehmen, die sich mit Speicher und Kontext-Persistenz auskennen, erhalten einen entscheidenden Vorteil: die Fähigkeit, KI-Agenten zu entwickeln, die ihre Benutzer wirklich verstehen, aus Interaktionen lernen und im Laufe der Zeit zunehmend personalisierte Erlebnisse liefern.

Wichtige Erkenntnisse:

  1. Speicher in Ebenen aufteilen: Redis für Arbeitsspeicher, PostgreSQL für Langzeitspeicherung und Vector Stores für semantisches Wissen verwenden
  2. Für Kontext designen: Workflows entwickeln, die relevanten Kontext bei jeder Interaktion abrufen und nutzen
  3. Datenschutz priorisieren: TTL, Verschlüsselung und Benutzerkontrollen für alle Speichersysteme implementieren
  4. Überwachen und optimieren: Speichertrefferraten, Latenz und Speicherkosten verfolgen
  5. Für Skalierung planen: Speicherarchitekturen entwerfen, die mit Ihren Automatisierungsanforderungen wachsen

Das Fazit:

Speicher ist nicht nur ein technisches Feature – es ist das, was Automatisierung von Transaktionsverarbeitung zu Beziehungsaufbau verwandelt. Wenn Ihre Workflows sich erinnern, werden sie mehr als Werkzeuge; sie werden Partner.

Die Infrastruktur ist bereit, die Muster sind bewährt, und die Möglichkeit ist klar. Es ist Zeit, Ihren KI-Agenten das Geschenk des Speichers zu geben.


Zusätzliche Ressourcen

Offizielle Dokumentation

Community-Ressourcen

Tools und Bibliotheken


Bereit, speicheraktivierte Workflows für Ihr Unternehmen zu entwickeln? Kontaktieren Sie Tropical Media für Expertise bei Beratung und Implementierung.

Tags: KI-Speicher, n8n, Kontext-Persistenz, Redis, PostgreSQL, Vector Stores, Workflow-Automatisierung, RAG, KI-Agenten, Chat-Speicher, Zustandsverwaltung, Produktionsmuster