Skip to content

Add a CLI #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 9, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ Currently, memory compaction is only available as a function in `agent_memory_se
- **Semantic Deduplication**: Finds and merges memories with similar meaning using vector search
- **LLM-powered Merging**: Uses language models to intelligently combine memories

### Contributing
## Contributing
1. Fork the repository
2. Create a feature branch
3. Commit your changes
Expand All @@ -308,8 +308,5 @@ Currently, memory compaction is only available as a function in `agent_memory_se

```bash
# Run all tests
python -m pytest tests/test_memory_compaction.py

# Run specific integration test
python -m pytest tests/test_memory_compaction.py::TestMemoryCompaction::test_compact_memories_integration -v
pytest tests
```
200 changes: 200 additions & 0 deletions agent_memory_server/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
#!/usr/bin/env python
"""
Command-line interface for agent-memory-server.
"""

import datetime
import importlib
import sys

import click
import uvicorn

from agent_memory_server.config import settings
from agent_memory_server.logging import configure_logging, get_logger
from agent_memory_server.utils.redis import ensure_search_index_exists, get_redis_conn


# Set up logging
configure_logging()
logger = get_logger(__name__)

# Define the version
VERSION = "0.2.0" # Matches the version in pyproject.toml


@click.group()
def cli():
"""Command-line interface for agent-memory-server."""
pass


@cli.command()
def version():
"""Show the version of agent-memory-server."""
click.echo(f"agent-memory-server version {VERSION}")


@cli.command()
@click.option("--port", default=settings.port, help="Port to run the server on")
@click.option("--host", default="0.0.0.0", help="Host to run the server on")
@click.option("--reload", is_flag=True, help="Enable auto-reload")
def api(port: int, host: str, reload: bool):
"""Run the REST API server."""
import asyncio

from agent_memory_server.main import app, on_start_logger

async def setup_redis():
redis = await get_redis_conn()
await ensure_search_index_exists(redis)

# Run the async setup
asyncio.run(setup_redis())

on_start_logger(port)
uvicorn.run(
app,
host=host,
port=port,
reload=reload,
)


@cli.command()
@click.option("--port", default=settings.mcp_port, help="Port to run the MCP server on")
@click.option("--sse", is_flag=True, help="Run the MCP server in SSE mode")
def mcp(port: int, sse: bool):
"""Run the MCP server."""
import asyncio

from agent_memory_server.mcp import mcp_app

async def setup_and_run():
redis = await get_redis_conn()
await ensure_search_index_exists(redis)

# Run the MCP server
if sse:
await mcp_app.run_sse_async()
else:
await mcp_app.run_stdio_async()

# Update the port in settings
settings.mcp_port = port

# Update the port in the mcp_app
mcp_app._port = port

click.echo(f"Starting MCP server on port {port}")

if sse:
click.echo("Running in SSE mode")
else:
click.echo("Running in stdio mode")

asyncio.run(setup_and_run())


@cli.command()
@click.argument("task_path")
@click.option(
"--args",
"-a",
multiple=True,
help="Arguments to pass to the task in the format key=value",
)
def schedule_task(task_path: str, args: list[str]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooo I may very well steal this for Docket :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah! Nice! This was my MVP for scheduling recurring tasks. Cron, here we come!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! Check out Perpetual, it's not documented that awesomely, but I think it's what you're looking for: https://github.com/chrisguidry/docket/blob/main/tests/test_fundamentals.py#L973-L997

Docket doesn't have cron syntax, but it can reschedule tasks on intervals forever

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice, I'll check that out!

"""
Schedule a background task by path.

TASK_PATH is the import path to the task function, e.g.,
"agent_memory_server.long_term_memory.compact_long_term_memories"
"""
import asyncio

from docket import Docket

# Parse the arguments
task_args = {}
for arg in args:
try:
key, value = arg.split("=", 1)
# Try to convert to appropriate type
if value.lower() == "true":
task_args[key] = True
elif value.lower() == "false":
task_args[key] = False
elif value.isdigit():
task_args[key] = int(value)
elif value.replace(".", "", 1).isdigit() and value.count(".") <= 1:
task_args[key] = float(value)
else:
task_args[key] = value
except ValueError:
click.echo(f"Invalid argument format: {arg}. Use key=value format.")
sys.exit(1)

async def setup_and_run_task():
redis = await get_redis_conn()
await ensure_search_index_exists(redis)

# Import the task function
module_path, function_name = task_path.rsplit(".", 1)
try:
module = importlib.import_module(module_path)
task_func = getattr(module, function_name)
except (ImportError, AttributeError) as e:
click.echo(f"Error importing task: {e}")
sys.exit(1)

# Initialize Docket client
async with Docket(
name=settings.docket_name,
url=settings.redis_url,
) as docket:
click.echo(f"Scheduling task {task_path} with arguments: {task_args}")
await docket.add(task_func)(**task_args)
click.echo("Task scheduled successfully")

asyncio.run(setup_and_run_task())


@cli.command()
@click.option(
"--concurrency", default=10, help="Number of tasks to process concurrently"
)
@click.option(
"--redelivery-timeout",
default=30,
help="Seconds to wait before redelivering a task to another worker",
)
def task_worker(concurrency: int, redelivery_timeout: int):
"""
Start a Docket worker using the Docket name from settings.

This command starts a worker that processes background tasks registered
with Docket. The worker uses the Docket name from settings.
"""
import asyncio

from docket import Worker

if not settings.use_docket:
click.echo("Docket is disabled in settings. Cannot run worker.")
sys.exit(1)

asyncio.run(
Worker.run(
docket_name=settings.docket_name,
url=settings.redis_url,
name="agent-memory-worker",
concurrency=concurrency,
redelivery_timeout=datetime.timedelta(seconds=redelivery_timeout),
tasks=["agent_memory_server.docket_tasks:task_collection"],
)
)


if __name__ == "__main__":
cli()
2 changes: 2 additions & 0 deletions agent_memory_server/docket_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from agent_memory_server.config import settings
from agent_memory_server.long_term_memory import (
compact_long_term_memories,
extract_memory_structure,
index_long_term_memories,
)
Expand All @@ -22,6 +23,7 @@
extract_memory_structure,
summarize_session,
index_long_term_memories,
compact_long_term_memories,
]


Expand Down
5 changes: 1 addition & 4 deletions agent_memory_server/long_term_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ async def compact_long_term_memories(
if compact_hash_duplicates:
logger.info("Starting hash-based duplicate compaction")
try:
# TODO: Use RedisVL index
index_name = Keys.search_index_name()

# Create aggregation query to group by memory_hash and find duplicates
Expand Down Expand Up @@ -386,7 +385,7 @@ async def compact_long_term_memories(
logger.warning(f"Error checking index: {info_e}")

# Get all memories matching the filters
index = await get_search_index(redis_client)
index = get_search_index(redis_client)
query_str = filter_str if filter_str != "*" else ""

# Create a query to get all memories
Expand Down Expand Up @@ -675,8 +674,6 @@ async def index_long_term_memories(
redis_client: Optional Redis client to use. If None, a new connection will be created.
"""
redis = redis_client or await get_redis_conn()
# Ensure search index exists before indexing memories
await ensure_search_index_exists(redis)
background_tasks = get_background_tasks()
vectorizer = OpenAITextVectorizer()
embeddings = await vectorizer.aembed_many(
Expand Down
4 changes: 3 additions & 1 deletion agent_memory_server/utils/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import logging

from agent_memory_server.config import settings


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -56,4 +58,4 @@ def metadata_key(session_id: str, namespace: str | None = None) -> str:
@staticmethod
def search_index_name() -> str:
"""Return the name of the search index."""
return "memory_idx"
return settings.redisvl_index_name
110 changes: 0 additions & 110 deletions agent_memory_server/worker.py

This file was deleted.

Loading