Database Design Guide #2
## Layer-by-Layer Implementation Guide

### Layer 1: Instructions (System Configuration & Core Directives)

**Database Type:** NoSQL Document Store (MongoDB, DynamoDB, CosmosDB)

**Schema Design:**

```json
{
  "_id": "inst_v2.1.0_customer_support",
  "version": "2.1.0",
  "agent_id": "customer_support_v2",
  "created_at": "2024-12-15T10:00:00Z",
  "active": true,
  "system_role": "You are a helpful customer service agent...",
  "behavioral_constraints": [
    "Never process payments directly",
    "Escalate security concerns immediately",
    "Maintain professional tone"
  ],
  "capabilities": [
    "Order status lookup",
    "Return processing",
    "FAQ responses"
  ],
  "tone_and_style": "Professional, empathetic, solution-oriented"
}
```

**Implementation Notes:**
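At request time, a Layer 1 document like the one above gets flattened into the system prompt. As a minimal sketch (the dict here stands in for a document fetched from the store, e.g. via `collection.find_one({"agent_id": ..., "active": True})` with pymongo; all names are illustrative):

```python
# Sketch: turning a Layer 1 instruction document into system-prompt text.
# In practice the document comes from the document store; here it is an
# in-memory dict so the assembly logic is the focus.

INSTRUCTION_DOC = {
    "agent_id": "customer_support_v2",
    "system_role": "You are a helpful customer service agent...",
    "behavioral_constraints": [
        "Never process payments directly",
        "Escalate security concerns immediately",
    ],
    "tone_and_style": "Professional, empathetic, solution-oriented",
}

def build_system_prompt(doc: dict) -> str:
    """Flatten the instruction document into the Layer 1 prompt text."""
    lines = [doc["system_role"]]
    if doc.get("behavioral_constraints"):
        lines.append("Constraints:")
        lines.extend(f"- {c}" for c in doc["behavioral_constraints"])
    if doc.get("tone_and_style"):
        lines.append(f"Tone: {doc['tone_and_style']}")
    return "\n".join(lines)
```

Keeping the `_id` versioned (`inst_v2.1.0_...`) means older prompt versions stay queryable for audits and rollbacks.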
### Layer 2: User Info (Personalization & User Profile Context)

**Database Type:** Hybrid SQL + Vector

**SQL Schema (PostgreSQL):**

```sql
-- Core user profile
CREATE TABLE user_profiles (
    user_id UUID PRIMARY KEY,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    language_preference VARCHAR(10) DEFAULT 'en',
    technical_level VARCHAR(20) DEFAULT 'intermediate',
    communication_preferences JSONB DEFAULT '{}'::jsonb
);

-- User preferences with history
CREATE TABLE user_preferences (
    id SERIAL PRIMARY KEY,
    user_id UUID REFERENCES user_profiles(user_id),
    preference_key VARCHAR(100),
    preference_value TEXT,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- PostgreSQL has no inline INDEX clause in CREATE TABLE;
-- indexes are created separately:
CREATE INDEX idx_user_pref ON user_preferences (user_id, preference_key);
```

**Vector Storage Component:**

```python
# Store user interest embeddings for personalization
# (generate_embedding is the embedding helper provided elsewhere)
user_interest_embedding = {
    "user_id": "user_123",
    "embedding": generate_embedding(user_interaction_history),
    "metadata": {
        "last_updated": "2024-12-15",
        "interaction_count": 145,
        "primary_topics": ["data_science", "python", "automation"]
    }
}
```
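On the SQL side, the profile and preference rows are typically folded into a short context block for the prompt. A sketch, with hand-written rows standing in for query results (field names follow the schema above; everything else is illustrative):

```python
# Sketch: folding user_profiles / user_preferences rows into a compact
# Layer 2 context string. The row dicts mimic fetched SQL rows.

def build_user_context(profile: dict, preferences: list) -> str:
    parts = [
        f"Language: {profile.get('language_preference', 'en')}",
        f"Technical level: {profile.get('technical_level', 'intermediate')}",
    ]
    for pref in preferences:
        parts.append(f"{pref['preference_key']}: {pref['preference_value']}")
    return "; ".join(parts)

profile_row = {"language_preference": "en", "technical_level": "advanced"}
preference_rows = [
    {"preference_key": "response_length", "preference_value": "concise"},
]
```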
The interest embeddings themselves are stored in Pinecone, Qdrant, or Weaviate.

### Layer 3: Curated Knowledge Context (Domain-Specific Grounding)

**Database Type:** Vector Database (Pinecone, Qdrant, Weaviate)

**Implementation Pattern:**

```python
from typing import List, Dict
import hashlib

class KnowledgeStore:
    def __init__(self, vector_db_client):
        self.db = vector_db_client
        self.collection = "knowledge_base"

    def add_document(self, text: str, metadata: Dict) -> str:
        # Chunk document
        chunks = self.chunk_text(text, max_tokens=500, overlap=50)
        # Generate unique IDs
        doc_id = hashlib.md5(text.encode()).hexdigest()
        # Store each chunk
        for i, chunk in enumerate(chunks):
            chunk_id = f"{doc_id}_{i}"
            embedding = generate_embedding(chunk)
            self.db.upsert(
                collection=self.collection,
                id=chunk_id,
                values=embedding,
                metadata={
                    **metadata,
                    "chunk_index": i,
                    "doc_id": doc_id,
                    "text": chunk  # Store original text
                }
            )
        return doc_id
```
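`add_document` calls a `chunk_text` helper that isn't shown. A minimal word-based stand-in (words as a rough proxy for tokens; a production version would use a real tokenizer such as tiktoken) might look like:

```python
# Minimal stand-in for the chunk_text helper used by KnowledgeStore.
# Splits on whitespace; each chunk holds up to max_tokens words, and
# consecutive chunks share `overlap` words of context.

def chunk_text(text: str, max_tokens: int = 500, overlap: int = 50) -> list:
    words = text.split()
    if not words:
        return []
    step = max_tokens - overlap  # must be positive
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + max_tokens]))
        if start + max_tokens >= len(words):
            break
    return chunks
```

The overlap keeps sentences that straddle a chunk boundary retrievable from either side.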
**Metadata Schema:**

```jsonc
{
  "source": "product_manual_v3.pdf",
  "category": "technical_documentation",
  "last_updated": "2024-12-01",
  "access_level": "public",
  "relevance_decay": 0.95, // For time-based relevance
  "language": "en"
}
```

### Layer 4: Task/Goal State Context (Multi-Turn Task Management)

**Database Type:** NoSQL with TTL (Redis, DynamoDB)

**Redis Implementation:**
```python
import json
from datetime import datetime
from typing import Optional

class TaskStateManager:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.default_ttl = 86400  # 24 hours

    def save_task_state(self, user_id: str, task_data: dict, ttl: Optional[int] = None):
        key = f"task:{user_id}:{task_data['task_id']}"
        task_state = {
            **task_data,
            "last_updated": datetime.utcnow().isoformat(),
            # default 0 so the first save lands on version 1
            "version": task_data.get("version", 0) + 1
        }
        self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(task_state)
        )
        # Add to user's active tasks set
        self.redis.sadd(f"active_tasks:{user_id}", task_data['task_id'])
```
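The key layout and versioning can be seen without a Redis server by substituting a plain dict for the client. A runnable sketch of the same pattern (the `fake_redis` dict is an illustrative stand-in, not part of the design):

```python
# Sketch of the task-state key layout, with an in-memory dict standing in
# for Redis so the pattern is runnable as-is.
import json

fake_redis = {}

def save_task_state(user_id: str, task_data: dict, ttl: int = 86400) -> str:
    key = f"task:{user_id}:{task_data['task_id']}"
    state = {**task_data, "version": task_data.get("version", 0) + 1}
    fake_redis[key] = (ttl, json.dumps(state))  # setex: value with a TTL
    return key
```

Because every write bumps `version`, concurrent agents can detect stale task state before overwriting it.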
**Complex Task Structure:**

```json
{
  "task_id": "deploy_v2.3",
  "user_id": "user_123",
  "created_at": "2024-12-15T10:00:00Z",
  "status": "in_progress",
  "progress": {
    "completed_steps": [
      {"id": "code_review", "completed_at": "2024-12-15T11:00:00Z"},
      {"id": "unit_tests", "completed_at": "2024-12-15T12:00:00Z"}
    ],
    "current_step": "integration_tests",
    "next_steps": ["security_review", "deployment"],
    "blockers": ["pending_security_approval"]
  },
  "context": {
    "repository": "main-app",
    "branch": "release/v2.3",
    "assigned_team": "platform"
  }
}
```

### Layer 5: Chat History Summary (Long-Term Conversational Memory)

**Database Type:** NoSQL + Vector Hybrid

**Document Structure:**
```jsonc
{
  "_id": "summary_2024_12_15_user123",
  "user_id": "user_123",
  "session_ids": ["sess_001", "sess_002"],
  "date_range": {
    "start": "2024-12-15T09:00:00Z",
    "end": "2024-12-15T17:00:00Z"
  },
  "summary": "User discussed migrating database from MySQL to PostgreSQL...",
  "key_topics": [
    {"topic": "database_migration", "weight": 0.8},
    {"topic": "performance_optimization", "weight": 0.6}
  ],
  "important_decisions": [
    {
      "decision": "Use pg_dump for data migration",
      "timestamp": "2024-12-15T14:30:00Z",
      "confidence": 0.9
    }
  ],
  "entities_mentioned": ["PostgreSQL", "AWS RDS", "pg_dump"],
  "sentiment_trend": [0.6, 0.7, 0.8, 0.9], // Improving satisfaction
  "vector_ref": "vec_summary_123" // Reference to vector DB
}
```

**Vector Storage for Semantic Search:**
```python
# Store summary embeddings for similarity search
summary_vector = {
    "id": "vec_summary_123",
    "embedding": generate_embedding(summary_text),
    "metadata": {
        "user_id": "user_123",
        "date": "2024-12-15",
        "topic_tags": ["database", "migration", "postgresql"]
    }
}
```

### Layer 6: Chat History (Recent Conversational Flow)

**Database Type:** Time-Series NoSQL (Cassandra, InfluxDB, TimescaleDB)

**Cassandra Schema:**
```sql
CREATE TABLE chat_messages (
    user_id UUID,
    session_id UUID,
    timestamp TIMESTAMP,
    message_id UUID,
    role TEXT,  -- 'user', 'assistant', 'system'
    content TEXT,
    metadata MAP<TEXT, TEXT>,
    PRIMARY KEY ((user_id, session_id), timestamp, message_id)
) WITH CLUSTERING ORDER BY (timestamp DESC, message_id ASC);

-- Separate table for session metadata
CREATE TABLE chat_sessions (
    user_id UUID,
    session_id UUID,
    started_at TIMESTAMP,
    last_activity TIMESTAMP,
    message_count INT,
    session_metadata MAP<TEXT, TEXT>,
    PRIMARY KEY (user_id, session_id)
);
```
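The `DESC` clustering order exists so that "the latest N messages" is a cheap read. When assembling Layer 6 of the context window, those newest-first rows then need trimming to a token budget. A sketch (rows mirror the `chat_messages` shape; word count is a crude stand-in for real token counting):

```python
# Sketch: selecting the most recent messages that fit a rough token budget.
# `messages` is newest-first, as the DESC clustering order returns rows.

def recent_window(messages: list, max_tokens: int) -> list:
    window, used = [], 0
    for msg in messages:
        cost = len(msg["content"].split())  # word count as token proxy
        if used + cost > max_tokens:
            break
        window.append(msg)
        used += cost
    return list(reversed(window))  # oldest-first for the prompt

messages_newest_first = [
    {"content": "three words here"},
    {"content": "two words"},
    {"content": "a b c d"},
]
```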
**Retention Policy:**

```python
from datetime import datetime, timedelta

class ChatHistoryManager:
    def __init__(self, db_client):
        self.db = db_client
        self.retention_days = 90
        self.max_messages_per_session = 1000

    def cleanup_old_messages(self):
        cutoff_date = datetime.utcnow() - timedelta(days=self.retention_days)
        # Archive before deletion
        old_messages = self.db.query(
            "SELECT * FROM chat_messages WHERE timestamp < ?",
            cutoff_date
        )
        self.archive_to_cold_storage(old_messages)
        # Delete from hot storage.
        # Note: Cassandra range deletes need the partition key; in practice
        # a table-level default TTL is the simpler retention mechanism.
        self.db.execute(
            "DELETE FROM chat_messages WHERE timestamp < ?",
            cutoff_date
        )
```

### Layer 7: Tool Explanation (System Capabilities & Affordances)

**Database Type:** SQL (PostgreSQL)

**Complete Schema:**
```sql
-- Tool definitions
CREATE TABLE tools (
    tool_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(100) UNIQUE NOT NULL,
    category VARCHAR(50),
    description TEXT,
    version VARCHAR(20) NOT NULL,
    parameters JSONB NOT NULL,
    response_schema JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    deprecated BOOLEAN DEFAULT FALSE
);

-- PostgreSQL indexes are created outside CREATE TABLE
CREATE INDEX idx_tool_category ON tools (category);
CREATE INDEX idx_tool_active ON tools (deprecated);

-- Tool permissions
CREATE TABLE tool_permissions (
    permission_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tool_id UUID REFERENCES tools(tool_id),
    role VARCHAR(50),
    access_level VARCHAR(20) CHECK (access_level IN ('none', 'read', 'execute', 'admin')),
    UNIQUE(tool_id, role)
);

-- Tool usage tracking
CREATE TABLE tool_usage_logs (
    log_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tool_id UUID REFERENCES tools(tool_id),
    user_id UUID,
    executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    execution_time_ms INT,
    success BOOLEAN,
    error_message TEXT
);

CREATE INDEX idx_usage_time ON tool_usage_logs (executed_at);
CREATE INDEX idx_usage_user ON tool_usage_logs (user_id);
```
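At request time, these tables answer one question: which tools may this role execute? A pure-Python sketch of what the `tools` / `tool_permissions` join resolves to (the rows are hand-written stand-ins shaped like the tables above):

```python
# Sketch: role-based tool resolution, mimicking a join of tools and
# tool_permissions filtered on access_level and deprecated.

TOOLS = [
    {"tool_id": "t1", "name": "search_recall_memory", "deprecated": False},
    {"tool_id": "t2", "name": "legacy_lookup", "deprecated": True},
]
PERMISSIONS = [
    {"tool_id": "t1", "role": "support_agent", "access_level": "execute"},
    {"tool_id": "t2", "role": "support_agent", "access_level": "execute"},
]

def executable_tools(role: str) -> list:
    allowed = {
        p["tool_id"] for p in PERMISSIONS
        if p["role"] == role and p["access_level"] in ("execute", "admin")
    }
    return [t["name"] for t in TOOLS
            if t["tool_id"] in allowed and not t["deprecated"]]
```

Filtering out deprecated tools here is what makes the `deprecated` flag safer than deleting rows: old usage logs keep their foreign keys.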
```sql
-- Memory management functions as special tools
INSERT INTO tools (name, category, description, version, parameters) VALUES
(
    'search_recall_memory',
    'memory_management',
    'Search through conversation history',
    '1.0.0',
    '{
        "type": "object",
        "properties": {
            "query": {"type": "string"},
            "date_range": {"type": "object"},
            "limit": {"type": "integer", "default": 10}
        },
        "required": ["query"]
    }'::jsonb
);
```

### Layer 8: Function Call Results (Feedback from External Actions)

**Database Type:** NoSQL Cache (Redis)

**Caching Strategy:**
```python
import json
import hashlib
from datetime import datetime
from typing import Optional, Any

class FunctionResultCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.default_ttl = 3600  # 1 hour

    def _generate_cache_key(self, function_name: str, params: dict) -> str:
        # Create deterministic cache key
        param_str = json.dumps(params, sort_keys=True)
        param_hash = hashlib.md5(param_str.encode()).hexdigest()
        return f"func_result:{function_name}:{param_hash}"

    def get_cached_result(self, function_name: str, params: dict) -> Optional[Any]:
        key = self._generate_cache_key(function_name, params)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def cache_result(self, function_name: str, params: dict,
                     result: Any, ttl: Optional[int] = None):
        key = self._generate_cache_key(function_name, params)
        cache_data = {
            "result": result,
            "cached_at": datetime.utcnow().isoformat(),
            "function": function_name,
            "params": params
        }
        self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(cache_data)
        )
```
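The reason `_generate_cache_key` uses `json.dumps(params, sort_keys=True)` is that it canonicalises parameter order, so logically identical calls always hash to the same key. A standalone demonstration of that property:

```python
# Why the cache key is deterministic: sort_keys=True canonicalises the
# params dict before hashing, so key order does not matter.
import hashlib
import json

def cache_key(function_name: str, params: dict) -> str:
    param_str = json.dumps(params, sort_keys=True)
    digest = hashlib.md5(param_str.encode()).hexdigest()
    return f"func_result:{function_name}:{digest}"

a = cache_key("weather_api", {"city": "Oslo", "units": "metric"})
b = cache_key("weather_api", {"units": "metric", "city": "Oslo"})
# a == b: same logical call, same cache entry
```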
**Cache Configuration by Function Type:**

```python
CACHE_CONFIGS = {
    "weather_api": {"ttl": 3600, "cache": True},   # 1 hour
    "stock_price": {"ttl": 60, "cache": True},     # 1 minute
    "user_data": {"ttl": 0, "cache": False},       # Never cache
    "search_web": {"ttl": 86400, "cache": True},   # 24 hours
    "send_email": {"ttl": 0, "cache": False}       # Never cache
}
```

### Layer 9: Few-Shot Examples (Behavioral Guidance & Pattern Recognition)

**Database Type:** Vector Database

**Dynamic Example Selection:**
```python
import hashlib
from datetime import datetime
from typing import List

class ExampleStore:
    def __init__(self, vector_db):
        self.db = vector_db
        self.collection = "few_shot_examples"

    def add_example(self, input_text: str, output_text: str,
                    category: str, metadata: dict = None):
        example_id = f"ex_{hashlib.md5(input_text.encode()).hexdigest()[:8]}"
        # Generate embedding from input
        input_embedding = generate_embedding(input_text)
        self.db.upsert(
            collection=self.collection,
            id=example_id,
            values=input_embedding,
            metadata={
                "input_text": input_text,
                "output_text": output_text,
                "category": category,
                "effectiveness_score": 0.5,  # Initial score
                "usage_count": 0,
                "created_at": datetime.utcnow().isoformat(),
                **(metadata or {})
            }
        )

    def get_relevant_examples(self, query: str, k: int = 3,
                              category: str = None) -> List[dict]:
        query_embedding = generate_embedding(query)
        # Build filter
        filter_dict = {}
        if category:
            filter_dict["category"] = category
        # Search for similar examples
        results = self.db.query(
            collection=self.collection,
            query_vector=query_embedding,
            top_k=k * 2,  # Get extra for filtering
            filter=filter_dict
        )
        # Sort by effectiveness and similarity
        examples = sorted(
            results,
            key=lambda x: (
                x["metadata"]["effectiveness_score"] * 0.7 +
                x["score"] * 0.3  # Combine effectiveness with similarity
            ),
            reverse=True
        )[:k]
        return examples
```
### Layer 10: Dynamic Output Formatting & Constraints (Immediate Response Specification)

**Database Type:** NoSQL (Template Store)

**Conditional Template Management:**

```jsonc
// Template document structure
{
  "_id": "tmpl_email_support_response",
  "name": "Email Support Response",
  "channel": "email",
  "format_type": "structured",
  "template": {
    "structure": [
      {"type": "greeting", "template": "Dear {{customer_name}},"},
      {"type": "acknowledgment", "template": "Thank you for contacting us regarding {{issue_summary}}."},
      {"type": "body", "min_length": 100, "max_length": 500},
      {"type": "next_steps", "required": true},
      {"type": "signature", "template": "Best regards,\n{{agent_name}}\n{{company}} Support Team"}
    ]
  },
  "constraints": {
    "total_max_length": 800,
    "forbidden_phrases": ["unfortunately", "unable to help"],
    "required_elements": ["greeting", "body", "signature"],
    "tone": "professional_empathetic"
  },
  "variables": ["customer_name", "issue_summary", "agent_name", "company"],
  "active": true,
  "created_at": "2024-12-01T10:00:00Z"
}
```

### Layer 11: User's Latest Question (Immediate Input Trigger)

**Database Type:** None (Ephemeral)

This layer contains only the immediate input being processed. It is already captured in Layer 6 (Chat History) once processing completes.
## Architecture Patterns

### 1. Unified Memory Service

```python
import asyncio

class UnifiedMemoryService:
    def __init__(self):
        self.sql_db = PostgreSQLClient()         # Layers 2, 7
        self.vector_db = QdrantClient()          # Layers 3, 5, 9
        self.nosql_db = MongoDBClient()          # Layers 1, 4, 10
        self.time_series_db = CassandraClient()  # Layer 6
        self.cache = RedisClient()               # Layer 8

    async def prepare_context_window(self, user_id: str, query: str) -> dict:
        """Assemble all layers for the context window"""
        # Parallel retrieval for efficiency
        # (await requires an async method; the getters must be coroutines)
        tasks = [
            self.get_instructions(),
            self.get_user_info(user_id),
            self.retrieve_knowledge(query),
            self.get_active_tasks(user_id),
            self.get_chat_summary(user_id),
            self.get_recent_chat(user_id),
            self.get_tools(user_id),
            self.get_cached_results(user_id),
            self.get_examples(query),
            self.get_output_format()
        ]
        results = await asyncio.gather(*tasks)
        return self.assemble_layers(*results, query)
```

### 2. Memory Hierarchy
### 3. Data Lifecycle Management

```python
from datetime import timedelta

class MemoryLifecycleManager:
    def __init__(self):
        self.hot_ttl = timedelta(hours=1)
        self.warm_ttl = timedelta(days=30)
        self.cool_ttl = timedelta(days=90)

    def age_data(self):
        """Move data through storage tiers based on age and access patterns"""
        # Hot → Warm: Active sessions to summaries
        self.summarize_old_sessions()
        # Warm → Cool: Compress and move to vector storage
        self.compress_old_summaries()
        # Cool → Cold: Archive to object storage
        self.archive_old_data()
```

## Performance Optimization

### 1. Caching Strategy
```python
from typing import Callable

# Multi-level caching
L1_CACHE = {}        # In-memory (process-level)
L2_CACHE = Redis()   # Distributed cache
L3_CACHE = CDN()     # Edge cache for static content

def get_with_cache(key: str, fetcher: Callable):
    # Check L1
    if key in L1_CACHE:
        return L1_CACHE[key]
    # Check L2
    cached = L2_CACHE.get(key)
    if cached:
        L1_CACHE[key] = cached
        return cached
    # Fetch and cache
    result = fetcher()
    L2_CACHE.setex(key, 3600, result)
    L1_CACHE[key] = result
    return result
```

### 2. Batch Operations
```python
# Batch vector operations for efficiency
# (generate_embeddings_batch and vector_db are provided elsewhere)
from typing import List

def batch_retrieve_knowledge(queries: List[str], batch_size: int = 100):
    embeddings = generate_embeddings_batch(queries)
    results = []
    for i in range(0, len(embeddings), batch_size):
        batch = embeddings[i:i + batch_size]
        batch_results = vector_db.search_batch(batch)
        results.extend(batch_results)
    return results
```

## Monitoring and Observability

### Key Metrics to Track
```python
# Target values; latency figures are in milliseconds
METRICS = {
    "latency": {
        "vector_search_p50": 5,
        "vector_search_p99": 50,
        "sql_query_p50": 2,
        "sql_query_p99": 20,
        "context_assembly_p50": 100,
        "context_assembly_p99": 500
    },
    "throughput": {
        "queries_per_second": 1000,
        "writes_per_second": 500
    },
    "storage": {
        "vector_db_size_gb": 100,
        "sql_db_size_gb": 50,
        "cache_hit_rate": 0.85
    }
}
```
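Targets like these are only useful if something compares them against observed values. A minimal sketch of that check (the function and the `TARGETS` subset are illustrative; in practice this would feed an alerting system):

```python
# Sketch: flagging observed latency metrics that exceed their targets.

TARGETS = {
    "vector_search_p99": 50,
    "sql_query_p99": 20,
    "context_assembly_p99": 500,
}

def latency_violations(observed: dict, targets: dict = None) -> list:
    targets = targets or TARGETS
    return sorted(
        name for name, value in observed.items()
        if name in targets and value > targets[name]
    )
```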
## Overview

This guide provides recommendations for implementing persistent storage for each layer of the Context Window Architecture (CWA). It analyzes which layers benefit from database storage, the optimal database types, and practical implementation patterns.

## Quick Reference Table

## Optimization Strategies

## Implementation Checklist

### Phase 1: Core Storage

### Phase 2: Vector Search

### Phase 3: Time-Series & NoSQL

### Phase 4: Integration & Optimization

## Conclusion

The CWA's effectiveness depends heavily on proper database architecture. Start with the core layers that provide immediate value for your use case, then expand to additional layers as your system matures.