Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import contextlib
import json
import logging
import re
import sys
import threading
import time
Expand Down Expand Up @@ -156,12 +157,9 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov
},
)
self._session_id = resp.get("sessionId") or session_id
ep = self._bridge.request("episode.open", {"sessionId": self._session_id})
self._episode_id = ep.get("episodeId", "")
logger.info(
"MemOS: bridge ready session=%s episode=%s platform=%s",
"MemOS: bridge ready session=%s platform=%s",
self._session_id,
self._episode_id,
self._platform,
)
except Exception as err:
Expand Down Expand Up @@ -212,10 +210,13 @@ def queue_prefetch(self, query: str, *, session_id: str = "") -> None: # type:

def _run() -> None:
try:
result = self._turn_start(query, session_id=session_id) if self._bridge else ""
if result:
with self._prefetch_lock:
self._prefetch_result = result
# Skip turn.start for auto-skill eval prompts to avoid
# creating a trace that _turn_end will never complete.
if not self._AUTO_SKILL_EVAL_RE.search(query):
result = self._turn_start(query, session_id=session_id) if self._bridge else ""
if result:
with self._prefetch_lock:
self._prefetch_result = result
except Exception as err:
logger.debug("MemOS: queue_prefetch failed — %s", err)

Expand Down Expand Up @@ -488,6 +489,17 @@ def _turn_start(self, query: str, *, session_id: str = "") -> str:
return ""
return f"## Recalled Memories\n{context}"

# Hermes injects a structured auto-skill evaluation prompt at task end:
# "Review the conversation above and consider whether a skill should
# be saved or updated. Work in this order… SURVEY … THINK CLASS-FIRST …"
# Capturing this system-level scaffolding as conversation content pollutes
# memory search, task summaries, and downstream skill generation.
_AUTO_SKILL_EVAL_RE = re.compile(
r"^Review the conversation above and consider whether a "
r"skill should be saved or updated\.",
re.MULTILINE,
)

def _turn_end(
self,
user_content: str,
Expand All @@ -497,6 +509,17 @@ def _turn_end(
) -> None:
if not self._bridge:
return
# Strip Hermes auto-skill evaluation blocks. The prompt may appear
# in assistant_content (main agent) or user_content (fork agent that
# receives the review prompt as a user_message).
m = self._AUTO_SKILL_EVAL_RE.search(assistant_content)
if m:
assistant_content = assistant_content[: m.start()].strip()
m = self._AUTO_SKILL_EVAL_RE.search(user_content)
if m:
user_content = user_content[: m.start()].strip()
if not assistant_content.strip() and not user_content.strip():
return
self._bridge.request(
"turn.end",
{
Expand Down
43 changes: 30 additions & 13 deletions apps/memos-local-plugin/core/pipeline/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,19 +284,36 @@ export function createMemoryCore(
for (const ep of openEpisodes) {
const epAge = nowMs - (ep.endedAt ?? ep.startedAt);
if (epAge > STALE_EPISODE_TIMEOUT_MS) {
log.info("stale_episode.auto_abandon", {
episodeId: ep.id,
sessionId: ep.sessionId,
ageMs: epAge,
thresholdMs: STALE_EPISODE_TIMEOUT_MS,
});
try {
handle.episodeManager.abandon(
ep.id as import("../../agent-contract/dto.js").EpisodeId,
`自动关闭:空闲 ${Math.round(epAge / 60_000)} 分钟(阈值 ${Math.round(STALE_EPISODE_TIMEOUT_MS / 60_000)} 分钟)`,
);
} catch {
// Episode may have been finalized concurrently — safe to ignore.
const idleReason = `自动关闭:空闲 ${Math.round(epAge / 60_000)} 分钟(阈值 ${Math.round(STALE_EPISODE_TIMEOUT_MS / 60_000)} 分钟)`;
if (ep.traceIds && ep.traceIds.length > 0) {
log.info("stale_episode.auto_finalize", {
episodeId: ep.id,
sessionId: ep.sessionId,
ageMs: epAge,
thresholdMs: STALE_EPISODE_TIMEOUT_MS,
traceCount: ep.traceIds.length,
reason: idleReason,
});
try {
handle.sessionManager.finalizeEpisode(ep.id);
} catch {
// Episode may have been finalized concurrently — safe to ignore.
}
} else {
log.info("stale_episode.auto_abandon", {
episodeId: ep.id,
sessionId: ep.sessionId,
ageMs: epAge,
thresholdMs: STALE_EPISODE_TIMEOUT_MS,
});
try {
handle.episodeManager.abandon(
ep.id as import("../../agent-contract/dto.js").EpisodeId,
idleReason,
);
} catch {
// Episode may have been finalized concurrently — safe to ignore.
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions apps/memos-local-plugin/core/reward/reward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ export function createRewardRunner(deps: RewardDeps): RewardRunner {

try {
deps.episodesRepo.updateMeta(input.episodeId, {
closeReason: "finalized",
abandonReason: undefined,
reward: {
rHuman: humanScore.rHuman,
source: humanScore.source,
Expand Down
32 changes: 23 additions & 9 deletions apps/memos-local-plugin/core/session/episode-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,16 +200,30 @@ export function createEpisodeManager(deps: EpisodeManagerDeps): EpisodeManager {
const endedAt = now();
snap.status = "closed";
snap.endedAt = endedAt;
snap.meta = { ...snap.meta, closeReason: "abandoned", abandonReason: reason };
const hasReward = snap.rTask != null;
if (hasReward) {
snap.meta = { ...snap.meta, closeReason: "finalized", abandonReason: undefined };
log.info("episode.abandon_finalized", {
episodeId: id,
sessionId: snap.sessionId,
turnCount: snap.turnCount,
rTask: snap.rTask,
reason,
});
} else {
snap.meta = { ...snap.meta, closeReason: "abandoned", abandonReason: reason };
log.warn("episode.abandoned", {
episodeId: id,
sessionId: snap.sessionId,
turnCount: snap.turnCount,
reason,
});
}
deps.episodesRepo.close(id, endedAt, snap.rTask ?? undefined, snap.meta);
log.warn("episode.abandoned", {
episodeId: id,
sessionId: snap.sessionId,
turnCount: snap.turnCount,
reason,
});
deps.bus.emit({ kind: "episode.finalized", episode: cloneSnapshot(snap), closedBy: "abandoned" });
deps.bus.emit({ kind: "episode.abandoned", episodeId: id, reason });
deps.bus.emit({ kind: "episode.finalized", episode: cloneSnapshot(snap), closedBy: hasReward ? "finalized" : "abandoned" });
if (!hasReward) {
deps.bus.emit({ kind: "episode.abandoned", episodeId: id, reason });
}
return cloneSnapshot(snap);
},

Expand Down
Loading