Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions src/agents/judge.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ def _format_similar_block(
return "\n".join(lines)


def _active_memory_results(results: List[SearchResult], limit: int) -> List[SearchResult]:
active = [
result for result in results
if result.metadata.get("is_current") is not False
and not result.metadata.get("forgotten_at")
]
return active[:limit]


# ---------------------------------------------------------------------------
# Type alias for the Neo4j event search callable that the pipeline injects.
#
Expand Down Expand Up @@ -264,7 +273,7 @@ async def _search_one(item_str: str) -> tuple[str, List[SearchResult]]:
query_text=item_str,
filters=filters if filters else None,
)
return item_str, results
return item_str, _active_memory_results(results, self.top_k)
except Exception as exc:
self.logger.warning(
"Vector search failed for '%s': %s", item_str[:60], exc
Expand Down Expand Up @@ -306,9 +315,9 @@ async def _lookup_one(idx: int, item_str: str) -> tuple[str, List[SearchResult]]
if search_fn is not None:
# search_by_metadata is sync — run in thread pool
results = await asyncio.to_thread(
search_fn, filters=filters, top_k=self.top_k,
search_fn, filters=filters, top_k=max(self.top_k * 5, 10),
)
return item_str, results if results else []
return item_str, _active_memory_results(results or [], self.top_k)
else:
self.logger.debug(
"Vector store has no search_by_metadata — "
Expand All @@ -318,7 +327,7 @@ async def _lookup_one(idx: int, item_str: str) -> tuple[str, List[SearchResult]]
query_text=item_str,
filters={"user_id": user_id, "domain": "profile"} if user_id else None,
)
return item_str, results
return item_str, _active_memory_results(results, self.top_k)
except Exception as exc:
self.logger.warning(
"Profile metadata search failed for '%s': %s",
Expand All @@ -344,7 +353,11 @@ async def _search_vector_store(

search_fn = getattr(self.vector_store, "search_by_text", None)
if search_fn is not None:
return await search_fn(query_text, top_k=self.top_k, filters=filters)
return await search_fn(
query_text,
top_k=max(self.top_k * 5, 10),
filters=filters,
)

self.logger.debug(
"Vector store has no search_by_text — skipping search for this item."
Expand Down
12 changes: 11 additions & 1 deletion src/pipelines/retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
logger = logging.getLogger("xmem.pipelines.retrieval")


def _is_active_memory(metadata: Dict[str, Any]) -> bool:
return metadata.get("is_current") is not False and not metadata.get("forgotten_at")


# ═══════════════════════════════════════════════════════════════════════════
# Tool schemas — These are the "function signatures" exposed to the LLM
# ═══════════════════════════════════════════════════════════════════════════
Expand Down Expand Up @@ -417,7 +421,7 @@ async def _search_summary(

results = await self.vector_store.search_by_text(
query_text=query,
top_k=top_k,
top_k=max(top_k * 2, 10),
filters={
"user_id": user_id,
"domain": "summary",
Expand All @@ -426,12 +430,16 @@ async def _search_summary(

records = []
for r in results:
if not _is_active_memory(r.metadata):
continue
records.append(SourceRecord(
domain="summary",
content=r.content,
score=r.score,
metadata={"id": r.id, **r.metadata},
))
if len(records) >= top_k:
break

logger.info(" → Summary [%s]: %d results", query, len(records))
return records
Expand Down Expand Up @@ -507,6 +515,8 @@ def _fetch_profile_catalog(self, user_id: str):
seen = set()

for r in results:
if not _is_active_memory(r.metadata):
continue
main_content = r.metadata.get("main_content", "")
if not main_content or main_content in seen:
continue
Expand Down
Loading
Loading