
Add a CLI, fix compaction #6


Merged · 4 commits · May 13, 2025
17 changes: 11 additions & 6 deletions README.md
@@ -228,10 +228,10 @@ First, you'll need to download this repository. After you've downloaded it, you
#### MCP Server
The MCP server can run in either SSE mode or stdio:
```bash
python -m agent_memory_server.mcp <sse|stdio>
agent-memory mcp --mode <sse|stdio>
```

**NOTE:** With uv, just prefix the command with `uv`, e.g.: `uv run python -m agent_memory_server.mcp sse`.
**NOTE:** With uv, prefix the command with `uv`, e.g.: `uv run agent-memory mcp --mode sse`. If you installed from source, you'll probably need to add `--directory` to tell uv where to find the code: `uv run --directory <path/to/checkout> agent-memory mcp --mode stdio`.

### Docker Compose

@@ -332,12 +332,12 @@ uv sync --all-extras

3. Run the API server:
```bash
python -m agent_memory_server.main
agent-memory api
```
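
The CLI (see `agent_memory_server/cli.py` below) also accepts `--port`, `--host`, and `--reload` for the API server. Assuming the options mirror the `api(port, host, reload)` signature shown in the diff, an illustrative invocation:

```bash
agent-memory api --port 8000 --host 0.0.0.0 --reload
```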

4. In a separate terminal, run the MCP server (use either the "stdio" or "sse" options to set the running mode):
4. In a separate terminal, if you want to test with tools like Cursor or Claude, run the MCP server (pass `--mode stdio` or `--mode sse` to set the running mode):
```bash
python -m agent_memory_server.mcp [stdio|sse]
agent-memory mcp --mode <stdio|sse>
```

### Running Tests
@@ -374,7 +374,12 @@ The memory compaction functionality optimizes storage by merging duplicate and s

### Running Compaction

Currently, memory compaction is only available as a function in `agent_memory_server.long_term_memory.compact_long_term_memories`. You can run it manually or trigger it (manually, via code) to run as a background task.
Memory compaction is available as a task function in `agent_memory_server.long_term_memory.compact_long_term_memories`. You can trigger it manually
by running the `agent-memory schedule-task` command:

```bash
agent-memory schedule-task "agent_memory_server.long_term_memory.compact_long_term_memories"
```
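
The same task can also be awaited directly from Python. A minimal sketch, assuming the function's Redis client and filter arguments are all optional:

```python
import asyncio

from agent_memory_server.long_term_memory import compact_long_term_memories

# Run one compaction pass outside the docket scheduler (sketch; the real
# signature may require a Redis client or namespace/user filters).
asyncio.run(compact_long_term_memories())
```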

### Key Features

44 changes: 22 additions & 22 deletions agent_memory_server/cli.py
@@ -5,6 +5,7 @@

import datetime
import importlib
import logging
import sys

import click
@@ -39,17 +40,8 @@ def version():
@click.option("--reload", is_flag=True, help="Enable auto-reload")
def api(port: int, host: str, reload: bool):
"""Run the REST API server."""
import asyncio

from agent_memory_server.main import app, on_start_logger

async def setup_redis():
redis = await get_redis_conn()
await ensure_search_index_exists(redis)

# Run the async setup
asyncio.run(setup_redis())

on_start_logger(port)
uvicorn.run(
app,
@@ -61,8 +53,13 @@ async def setup_redis():

@cli.command()
@click.option("--port", default=settings.mcp_port, help="Port to run the MCP server on")
@click.option("--sse", is_flag=True, help="Run the MCP server in SSE mode")
def mcp(port: int, sse: bool):
@click.option(
"--mode",
default="stdio",
help="Run the MCP server in SSE or stdio mode",
type=click.Choice(["stdio", "sse"]),
)
def mcp(port: int, mode: str):
"""Run the MCP server."""
import asyncio

@@ -73,25 +70,28 @@ def mcp(port: int, sse: bool):
from agent_memory_server.mcp import mcp_app

async def setup_and_run():
redis = await get_redis_conn()
await ensure_search_index_exists(redis)
# Redis setup is handled by the MCP app before it starts

# Run the MCP server
if sse:
if mode == "sse":
logger.info(f"Starting MCP server on port {port}\n")
await mcp_app.run_sse_async()
else:
elif mode == "stdio":
# Try to force all logging to stderr because stdio-mode MCP servers
# use standard output for the protocol.
logging.basicConfig(
level=settings.log_level,
stream=sys.stderr,
force=True, # remove any existing handlers
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
await mcp_app.run_stdio_async()
else:
raise ValueError(f"Invalid mode: {mode}")

# Update the port in settings
settings.mcp_port = port

click.echo(f"Starting MCP server on port {port}")

if sse:
click.echo("Running in SSE mode")
else:
click.echo("Running in stdio mode")

asyncio.run(setup_and_run())
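
A quick smoke test of the reworked command in both modes (the port value is illustrative; it defaults to `settings.mcp_port`):

```bash
agent-memory mcp                        # stdio is the default mode
agent-memory mcp --mode sse --port 9000
```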


4 changes: 4 additions & 0 deletions agent_memory_server/config.py
@@ -1,4 +1,5 @@
import os
from typing import Literal

from dotenv import load_dotenv
from pydantic_settings import BaseSettings
@@ -34,5 +35,8 @@ class Settings(BaseSettings):
docket_name: str = "memory-server"
use_docket: bool = True

# Other Application settings
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"


settings = Settings()
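
Because `Settings` extends pydantic's `BaseSettings`, the new field should be settable from the environment under the default case-insensitive mapping (assuming no custom `env_prefix` is configured):

```bash
LOG_LEVEL=DEBUG agent-memory api
```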
18 changes: 17 additions & 1 deletion agent_memory_server/logging.py
@@ -1,5 +1,10 @@
import logging
import sys

import structlog

from agent_memory_server.config import settings


_configured = False

@@ -10,14 +15,25 @@ def configure_logging():
if _configured:
return

# Configure standard library logging based on settings.log_level
level = getattr(logging, settings.log_level.upper(), logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level)
logging.basicConfig(level=level, handlers=[handler], format="%(message)s")

# Configure structlog with processors honoring the log level and structured output
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
_configured = True
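
A sketch of how application code would pick up this configuration (the event name and fields are illustrative):

```python
import structlog

from agent_memory_server.logging import configure_logging

configure_logging()
logger = structlog.get_logger(__name__)

# Emitted as a JSON line at INFO and above, per the processors configured here
logger.info("mcp_server_started", mode="sse", port=9000)
```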

97 changes: 47 additions & 50 deletions agent_memory_server/long_term_memory.py
@@ -253,17 +253,6 @@ async def compact_long_term_memories(
f"semantic_duplicates={compact_semantic_duplicates}"
)

# Get all memory keys using scan
memory_keys = []
pattern = "memory:*"
# Scan for memory keys
cursor = 0
while True:
cursor, keys = await redis_client.scan(cursor, match=pattern, count=limit)
memory_keys.extend(keys)
if cursor == 0 or len(memory_keys) >= limit:
break

# Build filters for memory queries
filters = []
if namespace:
@@ -372,21 +361,30 @@
if compact_semantic_duplicates:
logger.info("Starting semantic duplicate compaction")
try:
# Check if the index exists before proceeding
# Get the correct index name
index_name = Keys.search_index_name()
logger.info(
f"Using index '{index_name}' for semantic duplicate compaction."
)

# Check if the index exists before proceeding
try:
await redis_client.execute_command(f"FT.INFO {index_name}")
except Exception as info_e:
if "unknown index name" in str(info_e).lower():
# Index doesn't exist, create it
logger.info(f"Search index {index_name} doesn't exist, creating it")
await ensure_search_index_exists(redis_client)
# Ensure 'get_search_index' is called with the correct name to create it if needed
await ensure_search_index_exists(
redis_client, index_name=index_name
)
else:
logger.warning(f"Error checking index: {info_e}")
logger.warning(
f"Error checking index '{index_name}': {info_e} - attempting to proceed."
)

# Get all memories matching the filters
index = get_search_index(redis_client)
query_str = filter_str if filter_str != "*" else ""
# Get all memories matching the filters, using the correct index name
index = get_search_index(redis_client, index_name=index_name)
query_str = filter_str if filter_str != "*" else "*"

# Create a query to get all memories
q = Query(query_str).paging(0, limit)
@@ -509,10 +507,9 @@
if filter_expression:
vector_query.set_filter(filter_expression)

# Execute the vector search
similar_results = None
# Execute the vector search using the AsyncSearchIndex
try:
similar_results = await index.search(vector_query)
vector_search_result = await index.search(vector_query)
except Exception as e:
logger.error(
f"Error in vector search for memory {memory_id}: {e}"
@@ -521,14 +518,14 @@

# Filter out the current memory and already processed memories
similar_memories = []
if similar_results:
for doc in similar_results.docs:
similar_id = doc.id.replace("memory:", "")
if (
similar_id != memory_id
and similar_id not in processed_ids
):
similar_memories.append(doc)
for doc in getattr(vector_search_result, "docs", []):
# Extract the ID field safely
similar_id = safe_get(doc, "id_").replace("memory:", "")
if (
similar_id != memory_id
and similar_id not in processed_ids
):
similar_memories.append(doc)

# If we found similar memories, merge them
if similar_memories:
@@ -541,7 +538,7 @@
similar_memory_keys = []

for similar_memory in similar_memories:
similar_id = similar_memory.id.replace(
similar_id = similar_memory["id_"].replace(
"memory:", ""
)
similar_key = Keys.memory_key(
@@ -552,30 +549,30 @@
# Get similar memory data with error handling
similar_data = {}
try:
# Use pipeline for Redis operations - only await the execute() method
pipeline = redis_client.pipeline()
pipeline.hgetall(similar_key)
# Execute the pipeline and await the result
similar_data_raw = await pipeline.execute()
similar_data_raw = await redis_client.hgetall(
similar_key # type: ignore
)

if similar_data_raw and similar_data_raw[0]:
# hgetall returns a dict of field to value
if similar_data_raw:
# Convert from bytes to strings
similar_data = {
k.decode()
if isinstance(k, bytes)
else k: v.decode()
if isinstance(v, bytes)
and k != b"vector"
else v
for k, v in similar_data_raw[0].items()
(
k.decode()
if isinstance(k, bytes)
else k
): (
v.decode()
if isinstance(v, bytes)
else v
)
for k, v in similar_data_raw.items()
}
similar_memory_data_list.append(
similar_data
)
similar_memory_keys.append(similar_key)
processed_ids.add(
similar_id
) # Mark as processed
similar_memory_data_list.append(similar_data)
similar_memory_keys.append(similar_key)
processed_ids.add(
similar_id
) # Mark as processed
except Exception as e:
logger.error(
f"Error retrieving similar memory {similar_id}: {e}"