Skip to content

Commit d8a67ce

Browse files
authored
Upgrade AMS to Docket 0.18.2 for Redis Cluster support (#216)
* Upgrade to Docket 0.18.2 so we can support Redis Cluster
1 parent 8097900 commit d8a67ce

File tree

14 files changed

+261
-96
lines changed

14 files changed

+261
-96
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# Redis connection
22
REDIS_URL=redis://localhost:6379
3+
# For Redis Cluster with Docket 0.18.2+, use redis+cluster://host:port
34

45
# Server port
56
PORT=8000

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ expects a separate `agent-memory task-worker` process for non-blocking
5454
background tasks. The example above shows how to override this to use the
5555
asyncio backend for a single-container development setup.
5656

57+
If you are connecting to a Redis Cluster and want Docket-backed workers, set
58+
`REDIS_URL` to a `redis+cluster://...` or `rediss+cluster://...` URL. AMS will
59+
translate that URL for its other Redis clients internally.
60+
5761
**Production Deployment**:
5862

5963
For production, run separate containers for the API and background workers:

agent_memory_server/cli.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@
2323
migrate_add_memory_type_3,
2424
migrate_normalize_tag_separators_4,
2525
)
26-
from agent_memory_server.utils.redis import get_redis_conn
26+
from agent_memory_server.utils.redis import (
27+
docket_stream_key,
28+
get_redis_conn,
29+
redis_url_for_docket,
30+
)
2731

2832

2933
logger = get_logger(__name__)
@@ -435,7 +439,7 @@ async def setup_and_run_task():
435439
# Initialize Docket client
436440
async with Docket(
437441
name=settings.docket_name,
438-
url=settings.redis_url,
442+
url=redis_url_for_docket(settings.redis_url),
439443
) as docket:
440444
click.echo(f"Scheduling task {task_path} with arguments: {task_args}")
441445
await docket.add(task_func)(**task_args)
@@ -475,7 +479,7 @@ async def _ensure_stream_and_group():
475479
from redis.exceptions import ResponseError
476480

477481
redis = await get_redis_conn()
478-
stream_key = f"{settings.docket_name}:stream"
482+
stream_key = docket_stream_key(settings.docket_name, settings.redis_url)
479483
group_name = "docket-workers"
480484

481485
try:
@@ -493,7 +497,7 @@ async def _run_worker():
493497
await get_redis_conn()
494498
await Worker.run(
495499
docket_name=settings.docket_name,
496-
url=settings.redis_url,
500+
url=redis_url_for_docket(settings.redis_url),
497501
concurrency=concurrency,
498502
redelivery_timeout=timedelta(seconds=redelivery_timeout),
499503
tasks=["agent_memory_server.docket_tasks:task_collection"],

agent_memory_server/dependencies.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from agent_memory_server.config import settings
1010
from agent_memory_server.logging import get_logger
11+
from agent_memory_server.utils.redis import redis_url_for_docket
1112

1213

1314
logger = get_logger(__name__)
@@ -49,7 +50,7 @@ def run_in_thread():
4950
async def schedule_task():
5051
async with Docket(
5152
name=settings.docket_name,
52-
url=settings.redis_url,
53+
url=redis_url_for_docket(settings.redis_url),
5354
) as docket:
5455
# Schedule task in Docket's queue
5556
await docket.add(func)(*args, **kwargs)

agent_memory_server/docket_tasks.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
periodic_refresh_summary_views,
2727
refresh_summary_view,
2828
)
29+
from agent_memory_server.utils.redis import redis_url_for_docket
2930

3031

3132
logger = logging.getLogger(__name__)
@@ -58,7 +59,7 @@ async def register_tasks() -> None:
5859
# Initialize Docket client
5960
async with Docket(
6061
name=settings.docket_name,
61-
url=settings.redis_url,
62+
url=redis_url_for_docket(settings.redis_url),
6263
) as docket:
6364
# Register all tasks
6465
for task in task_collection:

agent_memory_server/long_term_memory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
rerank_with_recency,
4747
update_memory_hash_if_text_changed,
4848
)
49-
from agent_memory_server.utils.redis import get_redis_conn
49+
from agent_memory_server.utils.redis import get_redis_conn, redis_url_for_docket
5050
from agent_memory_server.utils.tag_codec import encode_tag_values
5151

5252

@@ -249,7 +249,7 @@ async def schedule_trailing_extraction(
249249
try:
250250
async with Docket(
251251
name=settings.docket_name,
252-
url=settings.redis_url,
252+
url=redis_url_for_docket(settings.redis_url),
253253
) as docket:
254254
# Schedule with a unique key per session
255255
# If there's already a pending task for this session, the new one

agent_memory_server/memory_vector_db_factory.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
MemoryVectorDatabase,
3232
RedisVLMemoryVectorDatabase,
3333
)
34+
from agent_memory_server.utils.redis import redis_url_for_redisvl
3435

3536

3637
logger = logging.getLogger(__name__)
@@ -187,7 +188,10 @@ def create_redis_memory_vector_db(
187188
"""
188189
try:
189190
schema = _build_redis_schema()
190-
index = AsyncSearchIndex.from_dict(schema, redis_url=settings.redis_url)
191+
index = AsyncSearchIndex.from_dict(
192+
schema,
193+
redis_url=redis_url_for_redisvl(settings.redis_url),
194+
)
191195
return RedisVLMemoryVectorDatabase(index, embeddings)
192196
except Exception as e:
193197
logger.error(f"Error creating Redis memory vector database: {e}")

agent_memory_server/utils/redis.py

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,118 @@
22

33
import logging
44
from typing import Any
5+
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
56

67
from redis.asyncio import Redis
8+
from redis.asyncio.cluster import RedisCluster
79

810
from agent_memory_server.config import settings
911

1012

1113
logger = logging.getLogger(__name__)
12-
_redis_pool: Redis | None = None
14+
_redis_pool: Redis | RedisCluster | None = None
15+
_CLUSTER_SCHEMES = {"redis+cluster", "rediss+cluster"}
1316

1417

15-
async def get_redis_conn(url: str = settings.redis_url, **kwargs) -> Redis:
18+
def _netloc_has_multiple_hosts(netloc: str) -> bool:
19+
host_part = netloc.rsplit("@", 1)[-1]
20+
return "," in host_part
21+
22+
23+
def is_redis_cluster_url(url: str) -> bool:
    """Return True when the URL targets a Redis Cluster deployment.

    A URL counts as a cluster URL when any of the following holds:
    an explicit ``redis+cluster``/``rediss+cluster`` scheme, a
    ``cluster=true`` query parameter, or a comma-separated multi-host
    netloc.
    """
    parts = urlparse(url)

    # Explicit cluster scheme wins immediately.
    if parts.scheme in _CLUSTER_SCHEMES:
        return True

    # Fall back to the ?cluster=true query flag.
    flags = dict(parse_qsl(parts.query, keep_blank_values=True))
    if flags.get("cluster", "").lower() == "true":
        return True

    # Last resort: a multi-host netloc implies cluster mode.
    return _netloc_has_multiple_hosts(parts.netloc)
34+
35+
36+
def _strip_cluster_query(url: str) -> str:
37+
parsed = urlparse(url)
38+
query_items = [
39+
(key, value)
40+
for key, value in parse_qsl(parsed.query, keep_blank_values=True)
41+
if key.lower() != "cluster"
42+
]
43+
return urlunparse(parsed._replace(query=urlencode(query_items, doseq=True)))
44+
45+
46+
def redis_url_for_docket(url: str) -> str:
    """Normalize a Redis URL for Docket's cluster-aware URL scheme.

    Non-cluster URLs pass through untouched. Cluster URLs lose any
    ``cluster`` query flag and gain a ``+cluster`` scheme suffix if they
    do not already carry one.
    """
    if not is_redis_cluster_url(url):
        return url

    parts = urlparse(_strip_cluster_query(url))
    if parts.scheme not in _CLUSTER_SCHEMES:
        # Promote plain redis:// / rediss:// to the +cluster variant.
        parts = parts._replace(scheme=parts.scheme + "+cluster")
    return urlunparse(parts)
56+
57+
58+
def redis_url_for_redisvl(url: str) -> str:
    """Normalize a Redis URL for RedisVL's cluster detection.

    Cluster URLs are rewritten to a plain scheme (no ``+cluster`` suffix)
    with a single ``cluster=true`` query parameter appended; other URLs
    are returned unchanged.
    """
    if not is_redis_cluster_url(url):
        return url

    parts = urlparse(_strip_cluster_query(url))
    params = [*parse_qsl(parts.query, keep_blank_values=True), ("cluster", "true")]
    plain_scheme = parts.scheme.replace("+cluster", "")
    return urlunparse(
        parts._replace(scheme=plain_scheme, query=urlencode(params, doseq=True))
    )
72+
73+
74+
def redis_url_for_async_redis(url: str) -> str:
    """Normalize a Redis URL for redis-py's async standalone/cluster clients.

    Cluster URLs are rewritten to a plain scheme with the ``cluster``
    query flag removed; non-cluster URLs are returned as-is.
    """
    if not is_redis_cluster_url(url):
        return url

    parts = urlparse(_strip_cluster_query(url))
    plain_scheme = parts.scheme.replace("+cluster", "")
    return urlunparse(parts._replace(scheme=plain_scheme))
81+
82+
83+
def docket_prefix(name: str, redis_url: str) -> str:
    """Return the Docket key prefix for the given deployment mode.

    Cluster deployments wrap the name in braces (``{name}``) so that every
    Docket key hashes to the same cluster slot.
    """
    if not is_redis_cluster_url(redis_url):
        return name
    return "{" + name + "}"
88+
89+
90+
def docket_stream_key(name: str, redis_url: str) -> str:
    """Return the Docket stream key, hash-tagged when targeting a cluster."""
    prefix = docket_prefix(name, redis_url)
    return prefix + ":stream"
93+
94+
95+
async def get_redis_conn(
    url: str = settings.redis_url, **kwargs
) -> Redis | RedisCluster:
    """Get (or lazily create) the process-wide Redis connection.

    Args:
        url: Redis connection URL; defaults to settings.redis_url.
        **kwargs: Additional arguments forwarded to the client constructor.

    Returns:
        A Redis or RedisCluster client instance.

    Note:
        The first call creates and caches the client; every later call
        returns the cached client regardless of the ``url`` argument, so
        the process reuses a single connection pool.
    """
    global _redis_pool

    if _redis_pool is None:
        normalized = redis_url_for_async_redis(url)
        # Pick the cluster client when the original URL indicates a cluster.
        client_cls = RedisCluster if is_redis_cluster_url(url) else Redis
        _redis_pool = client_cls.from_url(normalized, **kwargs)
    return _redis_pool
32118

33119

docs/configuration.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ log_level: INFO
3636
REDIS_URL=redis://localhost:6379 # Redis connection string
3737
```
3838

39+
For Redis Cluster deployments, use `redis+cluster://host:port` or
40+
`rediss+cluster://host:port`. AMS translates that setting for Docket,
41+
RedisVL, and direct Redis connections internally.
42+
3943
### AI Model Configuration
4044
```bash
4145
# Generation models for LLM tasks
@@ -306,7 +310,7 @@ compaction_every_minutes: 15
306310
### High-Performance Setup
307311
```yaml
308312
# config-performance.yaml
309-
redis_url: redis://redis-cluster:6379
313+
redis_url: redis+cluster://redis-cluster:6379
310314
fast_model: gpt-4o-mini
311315
slow_model: gpt-4o
312316
redisvl_indexing_algorithm: HNSW

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ dependencies = [
2626
"pydantic>=2.5.2",
2727
"pydantic-settings>=2.8.1",
2828
"python-dotenv>=1.0.0",
29-
"pydocket>=0.6.3",
29+
"pydocket>=0.18.2",
3030
"redisvl>=0.6.0",
3131
"structlog>=25.2.0",
3232
"tiktoken>=0.5.1",

0 commit comments

Comments
 (0)