Skip to content

Add a CLI #5

Merged (6 commits) on May 9, 2025
README.md (87 additions, 5 deletions)

@@ -120,6 +120,91 @@ Agent Memory Server offers an MCP (Model Context Protocol) server interface powe…
- **search_memory**: Perform semantic search across long-term memories.
- **memory_prompt**: Generate prompts enriched with session context and long-term memories.

## Command Line Interface

The `agent-memory-server` package provides a command-line interface (CLI) for managing the server and related tasks. The CLI is available through the `agent-memory` command, assuming the package is installed in a way that puts the script on your PATH (e.g., via `pip install ...`).
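
If the `agent-memory` script isn't on your PATH (for example, in some virtual-environment setups), the CLI can likely also be run as a module, because `agent_memory_server/cli.py` (added in this PR, shown further down) calls `cli()` under an `if __name__ == "__main__"` guard. This is a sketch rather than a documented invocation:

```bash
# Assumes the package is importable; equivalent to `agent-memory --help`
python -m agent_memory_server.cli --help
```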

### Available Commands

Here's a list of available commands and their functions:

#### `version`
Displays the current version of `agent-memory-server`.
```bash
agent-memory version
```

#### `api`
Starts the REST API server.
```bash
agent-memory api [OPTIONS]
```
**Options:**
* `--port INTEGER`: Port to run the server on. (Default: value from `settings.port`, usually 8000)
* `--host TEXT`: Host to run the server on. (Default: "0.0.0.0")
* `--reload`: Enable auto-reload for development.

Example:
```bash
agent-memory api --port 8080 --reload
```

#### `mcp`
Starts the Model Context Protocol (MCP) server.
```bash
agent-memory mcp [OPTIONS]
```
**Options:**
* `--port INTEGER`: Port to run the MCP server on. (Default: value from `settings.mcp_port`, usually 9000)
* `--sse`: Run the MCP server in Server-Sent Events (SSE) mode. If not provided, it runs in stdio mode.

Example (SSE mode):
```bash
agent-memory mcp --port 9001 --sse
```
Example (stdio mode):
```bash
agent-memory mcp --port 9001
```

#### `schedule-task`
Schedules a background task to be processed by a Docket worker.
```bash
agent-memory schedule-task <TASK_PATH> [OPTIONS]
```
**Arguments:**
* `TASK_PATH`: The Python import path to the task function. For example: `"agent_memory_server.long_term_memory.compact_long_term_memories"`

**Options:**
* `--args TEXT` / `-a TEXT`: Arguments to pass to the task in `key=value` format. Can be specified multiple times. Values are automatically converted to booleans, integers, or floats where possible (for example, `limit=500` becomes the integer `500` and `compact_semantic_duplicates=false` becomes the boolean `False`); anything else is passed through as a string.

Example:
```bash
agent-memory schedule-task "agent_memory_server.long_term_memory.compact_long_term_memories" -a limit=500 -a namespace=my_namespace -a compact_semantic_duplicates=false
```

#### `task-worker`
Starts a Docket worker to process background tasks from the queue. This worker uses the Docket name configured in settings.
```bash
agent-memory task-worker [OPTIONS]
```
**Options:**
* `--concurrency INTEGER`: Number of tasks to process concurrently. (Default: 10)
* `--redelivery-timeout INTEGER`: Seconds to wait before a task is redelivered to another worker if the current worker fails or times out. (Default: 30)

Example:
```bash
agent-memory task-worker --concurrency 5 --redelivery-timeout 60
```
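
Putting the last two commands together, a typical setup (sketched with an illustrative namespace value; both commands are documented above) keeps a worker running in one terminal while tasks are enqueued from another:

```bash
# Terminal 1: start a worker that consumes background tasks from the queue
agent-memory task-worker --concurrency 5

# Terminal 2: enqueue a compaction task for the worker to pick up
agent-memory schedule-task "agent_memory_server.long_term_memory.compact_long_term_memories" \
  -a namespace=my_namespace
```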

To see help for any command, you can use `--help`:
```bash
agent-memory --help
agent-memory api --help
agent-memory mcp --help
# etc.
```

## Getting Started

### Local Install
@@ -297,7 +382,7 @@ Currently, memory compaction is only available as a function in `agent_memory_se…`
- **Semantic Deduplication**: Finds and merges memories with similar meaning using vector search
- **LLM-powered Merging**: Uses language models to intelligently combine memories

-### Contributing
+## Contributing
1. Fork the repository
2. Create a feature branch
3. Commit your changes
@@ -308,8 +393,5 @@ Currently, memory compaction is only available as a function in `agent_memory_se…`

```bash
# Run all tests
-python -m pytest tests/test_memory_compaction.py
-
-# Run specific integration test
-python -m pytest tests/test_memory_compaction.py::TestMemoryCompaction::test_compact_memories_integration -v
+pytest tests
```
agent_memory_server/cli.py (198 additions, new file)

@@ -0,0 +1,198 @@
#!/usr/bin/env python
"""
Command-line interface for agent-memory-server.
"""

import datetime
import importlib
import sys

import click
import uvicorn

from agent_memory_server.config import settings
from agent_memory_server.logging import configure_logging, get_logger
from agent_memory_server.utils.redis import ensure_search_index_exists, get_redis_conn


configure_logging()
logger = get_logger(__name__)

VERSION = "0.2.0"


@click.group()
def cli():
"""Command-line interface for agent-memory-server."""
pass


@cli.command()
def version():
"""Show the version of agent-memory-server."""
click.echo(f"agent-memory-server version {VERSION}")


@cli.command()
@click.option("--port", default=settings.port, help="Port to run the server on")
@click.option("--host", default="0.0.0.0", help="Host to run the server on")
@click.option("--reload", is_flag=True, help="Enable auto-reload")
def api(port: int, host: str, reload: bool):
"""Run the REST API server."""
import asyncio

from agent_memory_server.main import app, on_start_logger

async def setup_redis():
redis = await get_redis_conn()
await ensure_search_index_exists(redis)

# Run the async setup
asyncio.run(setup_redis())

on_start_logger(port)
uvicorn.run(
app,
host=host,
port=port,
reload=reload,
)


@cli.command()
@click.option("--port", default=settings.mcp_port, help="Port to run the MCP server on")
@click.option("--sse", is_flag=True, help="Run the MCP server in SSE mode")
def mcp(port: int, sse: bool):
"""Run the MCP server."""
import asyncio

# Update the port in settings FIRST
settings.mcp_port = port

# Import mcp_app AFTER settings have been updated
from agent_memory_server.mcp import mcp_app

async def setup_and_run():
redis = await get_redis_conn()
await ensure_search_index_exists(redis)

# Run the MCP server
if sse:
await mcp_app.run_sse_async()
else:
await mcp_app.run_stdio_async()

# Update the port in settings
settings.mcp_port = port

click.echo(f"Starting MCP server on port {port}")

if sse:
click.echo("Running in SSE mode")
else:
click.echo("Running in stdio mode")

asyncio.run(setup_and_run())


@cli.command()
@click.argument("task_path")
@click.option(
"--args",
"-a",
multiple=True,
help="Arguments to pass to the task in the format key=value",
)
def schedule_task(task_path: str, args: list[str]):

> **Review thread on `schedule_task`:**
>
> **Contributor:** Ooo I may very well steal this for Docket :D
>
> **Contributor (author):** Hah! Nice! This was my MVP for scheduling recurring tasks. Cron, here we come!
>
> **Contributor:** Oh! Check out Perpetual, it's not documented that awesomely, but I think it's what you're looking for: https://github.com/chrisguidry/docket/blob/main/tests/test_fundamentals.py#L973-L997
> Docket doesn't have cron syntax, but it can reschedule tasks on intervals forever.
>
> **Contributor (author):** Oh nice, I'll check that out!

"""
Schedule a background task by path.

TASK_PATH is the import path to the task function, e.g.,
"agent_memory_server.long_term_memory.compact_long_term_memories"
"""
import asyncio

from docket import Docket

# Parse the arguments
task_args = {}
for arg in args:
try:
key, value = arg.split("=", 1)
# Try to convert to appropriate type
if value.lower() == "true":
task_args[key] = True
elif value.lower() == "false":
task_args[key] = False
elif value.isdigit():
task_args[key] = int(value)
elif value.replace(".", "", 1).isdigit() and value.count(".") <= 1:
task_args[key] = float(value)
else:
task_args[key] = value
except ValueError:
click.echo(f"Invalid argument format: {arg}. Use key=value format.")
sys.exit(1)

async def setup_and_run_task():
redis = await get_redis_conn()
await ensure_search_index_exists(redis)

# Import the task function
module_path, function_name = task_path.rsplit(".", 1)
try:
module = importlib.import_module(module_path)
task_func = getattr(module, function_name)
except (ImportError, AttributeError) as e:
click.echo(f"Error importing task: {e}")
sys.exit(1)

# Initialize Docket client
async with Docket(
name=settings.docket_name,
url=settings.redis_url,
) as docket:
click.echo(f"Scheduling task {task_path} with arguments: {task_args}")
await docket.add(task_func)(**task_args)
click.echo("Task scheduled successfully")

asyncio.run(setup_and_run_task())


@cli.command()
@click.option(
"--concurrency", default=10, help="Number of tasks to process concurrently"
)
@click.option(
"--redelivery-timeout",
default=30,
help="Seconds to wait before redelivering a task to another worker",
)
def task_worker(concurrency: int, redelivery_timeout: int):
"""
Start a Docket worker using the Docket name from settings.

This command starts a worker that processes background tasks registered
with Docket. The worker uses the Docket name from settings.
"""
import asyncio

from docket import Worker

if not settings.use_docket:
click.echo("Docket is disabled in settings. Cannot run worker.")
sys.exit(1)

asyncio.run(
Worker.run(
docket_name=settings.docket_name,
url=settings.redis_url,
concurrency=concurrency,
redelivery_timeout=datetime.timedelta(seconds=redelivery_timeout),
tasks=["agent_memory_server.docket_tasks:task_collection"],
)
)


if __name__ == "__main__":
    cli()
agent_memory_server/docket_tasks.py (2 additions)

@@ -8,6 +8,7 @@

 from agent_memory_server.config import settings
 from agent_memory_server.long_term_memory import (
+    compact_long_term_memories,
     extract_memory_structure,
     index_long_term_memories,
 )
@@ -22,6 +23,7 @@
     extract_memory_structure,
     summarize_session,
     index_long_term_memories,
+    compact_long_term_memories,
 ]


5 changes: 1 addition & 4 deletions agent_memory_server/long_term_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ async def compact_long_term_memories(
     if compact_hash_duplicates:
         logger.info("Starting hash-based duplicate compaction")
         try:
-            # TODO: Use RedisVL index
             index_name = Keys.search_index_name()
 
             # Create aggregation query to group by memory_hash and find duplicates
@@ -386,7 +385,7 @@ async def compact_long_term_memories(
logger.warning(f"Error checking index: {info_e}")

# Get all memories matching the filters
index = await get_search_index(redis_client)
index = get_search_index(redis_client)
query_str = filter_str if filter_str != "*" else ""

# Create a query to get all memories
@@ -675,8 +674,6 @@ async def index_long_term_memories(
         redis_client: Optional Redis client to use. If None, a new connection will be created.
     """
     redis = redis_client or await get_redis_conn()
-    # Ensure search index exists before indexing memories
-    await ensure_search_index_exists(redis)
     background_tasks = get_background_tasks()
     vectorizer = OpenAITextVectorizer()
     embeddings = await vectorizer.aembed_many(
agent_memory_server/utils/keys.py (3 additions, 1 deletion)

@@ -2,6 +2,8 @@

 import logging
 
+from agent_memory_server.config import settings
+
 
 logger = logging.getLogger(__name__)
 
@@ -56,4 +58,4 @@ def metadata_key(session_id: str, namespace: str | None = None) -> str:
     @staticmethod
     def search_index_name() -> str:
         """Return the name of the search index."""
-        return "memory_idx"
+        return settings.redisvl_index_name
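
The practical effect of the `keys.py` change is that the search index name is no longer hardcoded to `memory_idx` but read from settings. Assuming `settings` is a pydantic-style settings object that maps fields to environment variables (an assumption; the config module is not shown in this diff), the index name could presumably be overridden like this:

```bash
# Hypothetical override; the exact variable name depends on how settings are loaded
REDISVL_INDEX_NAME=my_custom_idx agent-memory api
```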