From 7d42dca105b4c697d0370655689da50ca169eb58 Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 00:46:58 +0800 Subject: [PATCH 1/5] feat: add Nacos lifecycle and reload config --- evolve_server/__main__.py | 8 ++++ evolve_server/core/config.py | 21 ++++++++++ skillclaw/config.py | 10 +++++ skillclaw/config_store.py | 62 ++++++++++++++++++++++++++++++ tests/test_skill_backend_config.py | 49 ++++++++++++++++++++++- 5 files changed, 149 insertions(+), 1 deletion(-) diff --git a/evolve_server/__main__.py b/evolve_server/__main__.py index b59d21d..78fee75 100644 --- a/evolve_server/__main__.py +++ b/evolve_server/__main__.py @@ -66,6 +66,8 @@ def _build_config_from_args(args: argparse.Namespace) -> EvolveServerConfig: config.http_port = args.port if args.publish_mode: config.publish_mode = args.publish_mode + if args.nacos_publish_mode: + config.nacos_publish_mode = args.nacos_publish_mode if args.use_skill_verifier is not None: config.use_skill_verifier = args.use_skill_verifier if args.skill_verifier_min_score is not None: @@ -115,6 +117,12 @@ def build_parser() -> argparse.ArgumentParser: default=None, help="Direct publish to skills/ or stage jobs for client-side validation before publish.", ) + parser.add_argument( + "--nacos-publish-mode", + choices=["draft", "review", "direct"], + default=None, + help="Nacos Skill lifecycle mode: draft only, submit for review, or publish latest directly.", + ) skill_verifier_group = parser.add_mutually_exclusive_group() skill_verifier_group.add_argument( "--skill-verifier", diff --git a/evolve_server/core/config.py b/evolve_server/core/config.py index a9f14c4..f9d40e4 100644 --- a/evolve_server/core/config.py +++ b/evolve_server/core/config.py @@ -15,6 +15,8 @@ _PACKAGE_DIR = Path(__file__).resolve().parent _DEFAULT_AGENT_EVOLVE_BASE_URL = "https://api.openai.com/v1" _DEFAULT_AGENT_EVOLVE_MODEL = "gpt-5.4" +_NACOS_PUBLISH_MODES = {"draft", "review", "direct"} +_SKILL_RELOAD_MODES = {"off", "poll", "callback"} def _load_dotenv() -> None: @@ -60,6 +62,11 @@ def _infer_storage_backend(endpoint: str, bucket: str, local_root: str) -> str: return "" +def _normalize_choice(value: str, allowed: set[str], default: str) -> str: + normalized = str(value or "").strip().lower() + return normalized if normalized in allowed else default + + @dataclass class EvolveServerConfig: engine: str = "workflow" @@ -113,6 +120,10 @@ class EvolveServerConfig: nacos_username: str = "" nacos_password: str = "" nacos_label: str = "latest" + nacos_publish_mode: str = "review" + skill_reload_mode: str = "poll" + proxy_reload_url: str = "" + proxy_reload_api_key: str = "" # Scheduling interval_seconds: int = 600 @@ -139,6 +150,8 @@ def __post_init__(self) -> None: self.publish_mode = str(self.publish_mode or "direct").strip().lower() or "direct" if self.publish_mode not in {"direct", "validated"}: self.publish_mode = "direct" + self.nacos_publish_mode = _normalize_choice(self.nacos_publish_mode, _NACOS_PUBLISH_MODES, "review") + self.skill_reload_mode = _normalize_choice(self.skill_reload_mode, _SKILL_RELOAD_MODES, "poll") self.validation_required_results = max(1, int(self.validation_required_results or 1)) self.validation_required_approvals = max(1, int(self.validation_required_approvals or 1)) self.validation_min_mean_score = max( @@ -239,6 +252,10 @@ def from_env(cls) -> "EvolveServerConfig": nacos_username=os.environ.get("EVOLVE_NACOS_USERNAME", ""), nacos_password=os.environ.get("EVOLVE_NACOS_PASSWORD", ""), nacos_label=os.environ.get("EVOLVE_NACOS_LABEL", "latest"), + nacos_publish_mode=os.environ.get("EVOLVE_NACOS_PUBLISH_MODE", "review"), + skill_reload_mode=os.environ.get("EVOLVE_SKILL_RELOAD_MODE", "poll"), + proxy_reload_url=os.environ.get("EVOLVE_PROXY_RELOAD_URL", ""), + proxy_reload_api_key=os.environ.get("EVOLVE_PROXY_RELOAD_API_KEY", ""), interval_seconds=int(os.environ.get("EVOLVE_INTERVAL", "600")), http_port=int(os.environ.get("EVOLVE_PORT", "8787")), history_path=os.environ.get("EVOLVE_HISTORY_LOG", "evolve_history.jsonl"), @@ -352,6 +369,10 @@ def from_skillclaw_config(cls, config) -> "EvolveServerConfig": nacos_username=str(getattr(config, "sharing_nacos_username", "") or ""), nacos_password=str(getattr(config, "sharing_nacos_password", "") or ""), nacos_label=str(getattr(config, "sharing_nacos_label", "") or "latest"), + nacos_publish_mode=str(getattr(config, "sharing_nacos_publish_mode", "") or "review"), + skill_reload_mode=str(getattr(config, "sharing_skill_reload_mode", "") or "poll"), + proxy_reload_url=str(getattr(config, "evolve_proxy_reload_url", "") or ""), + proxy_reload_api_key=str(getattr(config, "proxy_api_key", "") or ""), openclaw_bin=os.environ.get("AGENT_EVOLVE_OPENCLAW_BIN", "openclaw"), openclaw_home=os.environ.get("AGENT_EVOLVE_OPENCLAW_HOME", ""), fresh=os.environ.get("AGENT_EVOLVE_FRESH", "1").lower() not in {"0", "false", "no"}, diff --git a/skillclaw/config.py b/skillclaw/config.py index bbd5a6c..9bb065a 100644 --- a/skillclaw/config.py +++ b/skillclaw/config.py @@ -99,12 +99,22 @@ class SkillClawConfig: sharing_nacos_username: str = "" sharing_nacos_password: str = "" sharing_nacos_label: str = "latest" + sharing_nacos_publish_mode: str = "review" sharing_group_id: str = "default" sharing_user_alias: str = "" sharing_auto_pull_on_start: bool = False sharing_push_min_injections: int = 5 sharing_push_min_effectiveness: float = 0.3 + sharing_session_upload_interval: int = 0 + sharing_skill_reload_mode: str = "poll" + sharing_skill_reload_interval_seconds: int = 30 + + # ------------------------------------------------------------------ # + # Evolve server integration # + # ------------------------------------------------------------------ # + evolve_server_url: str = "" + evolve_proxy_reload_url: str = "" # ------------------------------------------------------------------ # # Background validation # diff --git a/skillclaw/config_store.py b/skillclaw/config_store.py index 53669e9..b2aa7c9 100644 --- a/skillclaw/config_store.py +++ b/skillclaw/config_store.py @@ -23,6 +23,9 @@ "codex": "responses", } _FALLBACK_LLM_API_MODE = "chat" +_NACOS_PUBLISH_MODES = {"draft", "review", "direct"} +_SKILL_RELOAD_MODES = {"off", "poll", "callback"} +_MIN_SKILL_RELOAD_INTERVAL_SECONDS = 5 _DEFAULTS: dict = { "llm": { @@ -77,11 +80,19 @@ "nacos_username": "", "nacos_password": "", "nacos_label": "latest", + "nacos_publish_mode": "review", "group_id": "default", "user_alias": "", "auto_pull_on_start": False, "push_min_injections": 5, "push_min_effectiveness": 0.3, + "session_upload_interval": 0, + "skill_reload_mode": "poll", + "skill_reload_interval_seconds": 30, + }, + "evolve": { + "server_url": "", + "proxy_reload_url": "", }, "validation": { "enabled": False, @@ -159,6 +170,26 @@ def _normalize_validation_mode(value: Any) -> str: return "replay" +def _normalize_choice(value: Any, allowed: set[str], default: str) -> str: + normalized = str(value or "").strip().lower() + return normalized if normalized in allowed else default + + +def _normalize_non_negative_int(value: Any, default: int = 0) -> int: + try: + return max(0, int(value or 0)) + except (TypeError, ValueError): + return default + + +def _normalize_reload_interval(value: Any) -> int: + try: + interval = int(value or 30) + except (TypeError, ValueError): + interval = 30 + return max(_MIN_SKILL_RELOAD_INTERVAL_SECONDS, interval) + + def default_skills_dir_for_claw(claw_type: str) -> Path: """Return the default local skills directory for the selected agent.""" normalized = str(claw_type or "").strip().lower() @@ -284,6 +315,7 @@ def to_skillclaw_config(self) -> SkillClawConfig: raw_claw_type = "none" sharing = data.get("sharing", {}) + evolve = data.get("evolve", {}) validation = data.get("validation", {}) dashboard = data.get("dashboard", {}) sharing_backend = _infer_sharing_backend(sharing) @@ -372,11 +404,30 @@ def to_skillclaw_config(self) -> SkillClawConfig: sharing_nacos_username=str(sharing.get("nacos_username", "") or sharing.get("username", "") or ""), sharing_nacos_password=str(sharing.get("nacos_password", "") or sharing.get("password", "") or ""), sharing_nacos_label=str(sharing.get("nacos_label", "") or sharing.get("label", "") or "latest"), + sharing_nacos_publish_mode=_normalize_choice( + sharing.get("nacos_publish_mode", "review"), + _NACOS_PUBLISH_MODES, + "review", + ), sharing_group_id=str(sharing.get("group_id", "default") or "default"), sharing_user_alias=str(sharing.get("user_alias", "") or ""), sharing_auto_pull_on_start=bool(sharing.get("auto_pull_on_start", False)), sharing_push_min_injections=int(sharing.get("push_min_injections", 5)), sharing_push_min_effectiveness=float(sharing.get("push_min_effectiveness", 0.3)), + sharing_session_upload_interval=_normalize_non_negative_int( + sharing.get("session_upload_interval", 0), + default=0, + ), + sharing_skill_reload_mode=_normalize_choice( + sharing.get("skill_reload_mode", "poll"), + _SKILL_RELOAD_MODES, + "poll", + ), + sharing_skill_reload_interval_seconds=_normalize_reload_interval( + sharing.get("skill_reload_interval_seconds", 30), + ), + evolve_server_url=str(evolve.get("server_url", "") or ""), + evolve_proxy_reload_url=str(evolve.get("proxy_reload_url", "") or ""), validation_enabled=bool(validation.get("enabled", False)), validation_mode=_normalize_validation_mode(validation.get("mode", "replay")), validation_idle_after_seconds=int(validation.get("idle_after_seconds", 300)), @@ -400,6 +451,7 @@ def describe(self) -> str: llm = data.get("llm", {}) skills = data.get("skills", {}) prm = data.get("prm", {}) + evolve = data.get("evolve", {}) dashboard = data.get("dashboard", {}) claw_type = str(data.get("claw_type", "openclaw") or "openclaw") effective_skills_dir = resolve_skills_dir( @@ -458,6 +510,8 @@ def describe(self) -> str: "sharing.nacos_namespace: " f"{sharing.get('nacos_namespace_id') or sharing.get('namespace_id', 'public')}", f"sharing.nacos_label: {sharing.get('nacos_label') or sharing.get('label', 'latest')}", + "sharing.nacos_publish_mode: " + f"{_normalize_choice(sharing.get('nacos_publish_mode', 'review'), _NACOS_PUBLISH_MODES, 'review')}", "sharing.nacos_lifecycle: upload -> submit; review/publish policy is controlled by Nacos", "sharing.session_backend: " f"{sharing.get('session_backend') or ('local' if sharing.get('local_root') else 'not configured')}", @@ -466,10 +520,18 @@ def describe(self) -> str: f"sharing.group: {sharing.get('group_id', 'default')}", f"sharing.alias: {sharing.get('user_alias', '?')}", f"sharing.auto_pull: {sharing.get('auto_pull_on_start', False)}", + "sharing.session_upload_interval: " + f"{_normalize_non_negative_int(sharing.get('session_upload_interval', 0), default=0)}", + "sharing.skill_reload_mode: " + f"{_normalize_choice(sharing.get('skill_reload_mode', 'poll'), _SKILL_RELOAD_MODES, 'poll')}", + "sharing.skill_reload_interval: " + f"{_normalize_reload_interval(sharing.get('skill_reload_interval_seconds', 30))}", ] else: lines.append("sharing.enabled: False") lines += [ + f"evolve.server_url: {evolve.get('server_url', '') or '(not set)'}", + f"evolve.proxy_reload_url: {evolve.get('proxy_reload_url', '') or '(not set)'}", f"validation.enabled: {validation.get('enabled', False)}", f"validation.mode: {_normalize_validation_mode(validation.get('mode', 'replay'))}", f"validation.idle_after: {validation.get('idle_after_seconds', 300)}", diff --git a/tests/test_skill_backend_config.py b/tests/test_skill_backend_config.py index 7042bab..a82e437 100644 --- a/tests/test_skill_backend_config.py +++ b/tests/test_skill_backend_config.py @@ -15,6 +15,10 @@ def test_skill_backend_overrides_skill_storage_without_changing_session_storage( sharing_access_key_id="ak", sharing_secret_access_key="sk", sharing_nacos_server="http://nacos.test", + sharing_nacos_publish_mode="direct", + sharing_skill_reload_mode="callback", + evolve_proxy_reload_url="http://proxy.test", + proxy_api_key="proxy-secret", sharing_group_id="team-a", ) @@ -22,6 +26,10 @@ def test_skill_backend_overrides_skill_storage_without_changing_session_storage( assert evolve_config.skill_storage_backend == "nacos" assert evolve_config.nacos_server == "http://nacos.test" + assert evolve_config.nacos_publish_mode == "direct" + assert evolve_config.skill_reload_mode == "callback" + assert evolve_config.proxy_reload_url == "http://proxy.test" + assert evolve_config.proxy_reload_api_key == "proxy-secret" assert evolve_config.storage_backend == "oss" assert evolve_config.storage_endpoint == "https://oss-cn-hangzhou.aliyuncs.com" assert evolve_config.storage_bucket == "skillclaw-sessions" @@ -39,6 +47,8 @@ def test_skill_backend_empty_keeps_legacy_nacos_backend_behavior(monkeypatch) -> assert evolve_config.skill_storage_backend == "nacos" assert evolve_config.nacos_server == "http://legacy-nacos.test" + assert evolve_config.nacos_publish_mode == "review" + assert evolve_config.skill_reload_mode == "poll" assert evolve_config.storage_backend == "" assert evolve_config.storage_endpoint == "" @@ -54,7 +64,15 @@ def load(self) -> dict: "endpoint": "https://oss-cn-hangzhou.aliyuncs.com", "bucket": "skillclaw-sessions", "nacos_server": "http://nacos.test", - } + "nacos_publish_mode": "direct", + "session_upload_interval": "3", + "skill_reload_mode": "callback", + "skill_reload_interval_seconds": "10", + }, + "evolve": { + "server_url": "http://evolve.test", + "proxy_reload_url": "http://proxy.test", + }, } cfg = InlineConfigStore().to_skillclaw_config() @@ -62,3 +80,32 @@ def load(self) -> dict: assert cfg.sharing_backend == "oss" assert cfg.sharing_skill_backend == "nacos" assert cfg.sharing_nacos_server == "http://nacos.test" + assert cfg.sharing_nacos_publish_mode == "direct" + assert cfg.sharing_session_upload_interval == 3 + assert cfg.sharing_skill_reload_mode == "callback" + assert cfg.sharing_skill_reload_interval_seconds == 10 + assert cfg.evolve_server_url == "http://evolve.test" + assert cfg.evolve_proxy_reload_url == "http://proxy.test" + + +def test_config_store_normalizes_new_nacos_and_reload_options() -> None: + class InlineConfigStore(ConfigStore): + def load(self) -> dict: + return { + "sharing": { + "enabled": True, + "backend": "nacos", + "endpoint": "http://nacos.test", + "nacos_publish_mode": "bad-mode", + "session_upload_interval": "-2", + "skill_reload_mode": "bad-mode", + "skill_reload_interval_seconds": "1", + } + } + + cfg = InlineConfigStore().to_skillclaw_config() + + assert cfg.sharing_nacos_publish_mode == "review" + assert cfg.sharing_session_upload_interval == 0 + assert cfg.sharing_skill_reload_mode == "poll" + assert cfg.sharing_skill_reload_interval_seconds == 5 From 8b5d302d931051a74721c7843ed086f25fd707fb Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 00:50:03 +0800 Subject: [PATCH 2/5] feat: support Nacos publish lifecycle modes --- evolve_server/engines/workflow.py | 66 ++++++++++-- tests/test_nacos_working_version_push.py | 127 +++++++++++++++++++++++ 2 files changed, 187 insertions(+), 6 deletions(-) diff --git a/evolve_server/engines/workflow.py b/evolve_server/engines/workflow.py index 2a77af8..601728d 100644 --- a/evolve_server/engines/workflow.py +++ b/evolve_server/engines/workflow.py @@ -136,6 +136,36 @@ def _overlay_manifest_metadata( skill["description"] = description return skill + def _wait_nacos_publish(self, name: str, version: str, timeout: float = 30.0) -> bool: + """Publish a Nacos skill version and wait for it to go online. + + Nacos v3 runs a publish pipeline on submit. The pipeline must approve + before the publish call succeeds. This method polls for approval and + retries publish until the version goes online with the latest label. + """ + import time + + deadline = time.monotonic() + timeout + published = False + while time.monotonic() < deadline: + try: + detail = self._nacos_skill_client.get_skill(name) + labels = detail.get("labels", {}) if detail else {} + if labels.get("latest") == version: + return True + if not published: + try: + self._nacos_skill_client.publish(name, version, update_latest_label=True) + logger.info("[EvolveServer] Nacos publish accepted for %s %s", name, version) + published = True + except Exception: + pass + except Exception: + pass + time.sleep(2.0) + logger.warning("[EvolveServer] Nacos publish not confirmed for %s %s within %.0fs", name, version, timeout) + return False + def _fetch_skill(self, name: str) -> Optional[str]: if self._nacos_skill_client is not None: try: @@ -237,13 +267,25 @@ def _upload_skill(self, skill: dict, action: str) -> str: overwrite=True, target_version=target_version, ) - self._nacos_skill_client.submit(name, target_version) + publish_mode = str(getattr(self.config, "nacos_publish_mode", "") or "review").strip().lower() + if publish_mode in {"review", "direct"}: + self._nacos_skill_client.submit(name, target_version) + publish_confirmed = False + if publish_mode == "direct": + publish_confirmed = self._wait_nacos_publish(name, target_version) logger.info( - "[EvolveServer] uploaded and submitted skill %s to Nacos as %s via action=%s", + "[EvolveServer] uploaded skill %s to Nacos as %s via action=%s publish_mode=%s", name, target_version, action, + publish_mode, ) + if publish_mode == "draft": + return "uploaded_draft" + if publish_mode == "review": + return "uploaded_pending_review" + if not publish_confirmed: + return "uploaded_pending_publish" return "uploaded" skill_id = self._id_registry.get_or_create(name) @@ -314,13 +356,13 @@ async def _resolve_and_upload(self, skill: dict, action_type: str) -> tuple[str, has_conflict = await self._call_storage(self._detect_conflict, name, skill) if not has_conflict: upload_status = await self._call_storage(self._upload_skill, skill, action_type) - return (action_type, True) if upload_status == "uploaded" else (upload_status, False) + return self._upload_status_to_action(action_type, upload_status) logger.info("[EvolveServer] conflict detected for '%s' - merging", name) existing_md = await self._call_storage(self._fetch_skill, name) if not existing_md: upload_status = await self._call_storage(self._upload_skill, skill, action_type) - return (action_type, True) if upload_status == "uploaded" else (upload_status, False) + return self._upload_status_to_action(action_type, upload_status) existing_skill = parse_skill_content(name, existing_md) existing_skill = self._overlay_manifest_metadata( @@ -332,11 +374,23 @@ async def _resolve_and_upload(self, skill: dict, action_type: str) -> tuple[str, if merged and merged.get("name"): merged["name"] = name upload_status = await self._call_storage(self._upload_skill, merged, "merge") - return ("merge", True) if upload_status == "uploaded" else (upload_status, False) + return self._upload_status_to_action("merge", upload_status) logger.warning("[EvolveServer] merge failed for '%s' - keeping incoming version", name) upload_status = await self._call_storage(self._upload_skill, skill, action_type) - return (action_type, True) if upload_status == "uploaded" else (upload_status, False) + return self._upload_status_to_action(action_type, upload_status) + + @staticmethod + def _upload_status_to_action(action_type: str, upload_status: str) -> tuple[str, bool]: + if upload_status == "uploaded": + return action_type, True + if upload_status == "uploaded_pending_review": + return f"{action_type}_pending_review", False + if upload_status == "uploaded_pending_publish": + return f"{action_type}_pending_publish", False + if upload_status == "uploaded_draft": + return f"{action_type}_draft", False + return upload_status, False def _empty_judge_summary(self) -> dict[str, Any]: return { diff --git a/tests/test_nacos_working_version_push.py b/tests/test_nacos_working_version_push.py index 0970e79..f373a24 100644 --- a/tests/test_nacos_working_version_push.py +++ b/tests/test_nacos_working_version_push.py @@ -44,6 +44,7 @@ def __init__(self, items: list[dict], downloads: dict[tuple[str, str], bytes]) - self.downloads = downloads self.uploads: list[dict] = [] self.submits: list[tuple[str, str]] = [] + self.publishes: list[dict] = [] self.download_calls: list[dict] = [] def list_skills(self) -> list[dict]: @@ -74,6 +75,15 @@ def submit(self, name: str, version: str) -> str: self.submits.append((name, version)) return version + def publish(self, name: str, version: str, *, update_latest_label: bool = True) -> None: + self.publishes.append( + { + "name": name, + "version": version, + "update_latest_label": update_latest_label, + } + ) + def _write_skill(root: Path, body: str = SKILL_MD) -> None: skill_dir = root / "demo-skill" @@ -217,11 +227,128 @@ def test_evolve_upload_creates_version_when_nacos_skill_has_no_versions() -> Non "create_skill", ) + assert status == "uploaded_pending_review" + assert client.uploads[0]["target_version"] == "0.0.1" + assert client.submits == [("demo-skill", "0.0.1")] + assert client.publishes == [] + + +def test_evolve_upload_leaves_nacos_version_as_draft_in_draft_mode() -> None: + client = FakeNacosClient( + [{"name": "demo-skill", "labels": {}, "versions": [], "editingVersion": None, "reviewingVersion": None}], + {}, + ) + server = EvolveServer.__new__(EvolveServer) + server.config = EvolveServerConfig(skill_storage_backend="nacos", nacos_publish_mode="draft") + server._nacos_skill_client = client + server._load_remote_skill_record = lambda name: client.get_skill(name) + + status = server._upload_skill( + {"name": "demo-skill", "description": "Demo skill", "content": "New content."}, + "create_skill", + ) + + assert status == "uploaded_draft" + assert client.uploads[0]["target_version"] == "0.0.1" + assert client.submits == [] + assert client.publishes == [] + + +def test_evolve_upload_publishes_nacos_version_in_direct_mode() -> None: + client = FakeNacosClient( + [{"name": "demo-skill", "labels": {}, "versions": [], "editingVersion": None, "reviewingVersion": None}], + {}, + ) + server = EvolveServer.__new__(EvolveServer) + server.config = EvolveServerConfig(skill_storage_backend="nacos", nacos_publish_mode="direct") + server._nacos_skill_client = client + server._load_remote_skill_record = lambda name: client.get_skill(name) + server._wait_nacos_publish = lambda name, version: True + + status = server._upload_skill( + {"name": "demo-skill", "description": "Demo skill", "content": "New content."}, + "create_skill", + ) + assert status == "uploaded" assert client.uploads[0]["target_version"] == "0.0.1" assert client.submits == [("demo-skill", "0.0.1")] +def test_evolve_upload_marks_direct_nacos_publish_without_confirmation_as_pending() -> None: + client = FakeNacosClient( + [{"name": "demo-skill", "labels": {}, "versions": [], "editingVersion": None, "reviewingVersion": None}], + {}, + ) + server = EvolveServer.__new__(EvolveServer) + server.config = EvolveServerConfig(skill_storage_backend="nacos", nacos_publish_mode="direct") + server._nacos_skill_client = client + server._load_remote_skill_record = lambda name: client.get_skill(name) + server._wait_nacos_publish = lambda name, version: False + + status = server._upload_skill( + {"name": "demo-skill", "description": "Demo skill", "content": "New content."}, + "create_skill", + ) + + assert status == "uploaded_pending_publish" + assert client.uploads[0]["target_version"] == "0.0.1" + assert client.submits == [("demo-skill", "0.0.1")] + + +@pytest.mark.anyio +async def test_evolve_resolve_marks_nacos_review_upload_as_not_runtime_visible() -> None: + client = FakeNacosClient( + [{"name": "demo-skill", "labels": {}, "versions": [], "editingVersion": None, "reviewingVersion": None}], + {}, + ) + server = EvolveServer.__new__(EvolveServer) + server.config = EvolveServerConfig(skill_storage_backend="nacos", nacos_publish_mode="review") + server._nacos_skill_client = client + server._load_remote_skill_record = lambda name: client.get_skill(name) + + async def fake_call_storage(func, *args): + return func(*args) + + server._call_storage = fake_call_storage + server._detect_conflict = lambda _name, _skill: False + + action, uploaded = await server._resolve_and_upload( + {"name": "demo-skill", "description": "Demo skill", "content": "New content."}, + "create_skill", + ) + + assert action == "create_skill_pending_review" + assert uploaded is False + + +@pytest.mark.anyio +async def test_evolve_resolve_marks_unconfirmed_nacos_direct_publish_as_not_runtime_visible() -> None: + client = FakeNacosClient( + [{"name": "demo-skill", "labels": {}, "versions": [], "editingVersion": None, "reviewingVersion": None}], + {}, + ) + server = EvolveServer.__new__(EvolveServer) + server.config = EvolveServerConfig(skill_storage_backend="nacos", nacos_publish_mode="direct") + server._nacos_skill_client = client + server._load_remote_skill_record = lambda name: client.get_skill(name) + server._wait_nacos_publish = lambda name, version: False + + async def fake_call_storage(func, *args): + return func(*args) + + server._call_storage = fake_call_storage + server._detect_conflict = lambda _name, _skill: False + + action, uploaded = await server._resolve_and_upload( + {"name": "demo-skill", "description": "Demo skill", "content": "New content."}, + "create_skill", + ) + + assert action == "create_skill_pending_publish" + assert uploaded is False + + def test_evolve_fetch_returns_none_when_nacos_skill_has_no_published_label() -> None: client = FakeNacosClient( [{"name": "demo-skill", "labels": {}, "editingVersion": None, "reviewingVersion": None}], From 6267325910c6f80c08e1c435603ea007895ba077 Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 00:52:31 +0800 Subject: [PATCH 3/5] feat: trigger evolve from session snapshots --- evolve_server/engines/EVOLVE_AGENTS.md | 2 +- skillclaw/api_server.py | 37 ++++++++- tests/test_session_upload_trigger.py | 95 +++++++++++++++++++++++ tests/test_summarizer_skill_references.py | 41 ++++++++++ 4 files changed, 172 insertions(+), 3 deletions(-) create mode 100644 tests/test_session_upload_trigger.py create mode 100644 tests/test_summarizer_skill_references.py diff --git a/evolve_server/engines/EVOLVE_AGENTS.md b/evolve_server/engines/EVOLVE_AGENTS.md index 28ffb8c..98e243e 100644 --- a/evolve_server/engines/EVOLVE_AGENTS.md +++ b/evolve_server/engines/EVOLVE_AGENTS.md @@ -60,7 +60,7 @@ compact format. Each file contains: - `mean_score`: average ORM score across rollouts - `success_count` / `fail_count`: how many rollouts passed / failed - `stability`: `"all_success"`, `"all_fail"`, or `"unstable"` -- `_skills_referenced`: list of skill names the agent read or was injected +- `_skills_referenced`: list of skill names the agent concretely read or modified - `_avg_prm`: mean PRM score across all turns (0.0–1.0; higher = better) - `_has_tool_errors`: whether any tool call failed during the session - `_trajectory`: **structured step-by-step trace** of the agent's actions. diff --git a/skillclaw/api_server.py b/skillclaw/api_server.py index 0fcc4b7..283724e 100644 --- a/skillclaw/api_server.py +++ b/skillclaw/api_server.py @@ -10,6 +10,7 @@ import asyncio import base64 +import copy import json import logging import os @@ -2387,6 +2388,7 @@ def _prompt_len(msgs): "prm_score": None, } ) + self._maybe_upload_session_snapshot(session_id, turn_num) self._pending_turn_data.setdefault(session_id, {})[turn_num] = { "prompt_text": prompt_text, "response_text": response_text, @@ -2736,7 +2738,7 @@ async def _upload_session_data( self, session_id: str, turns: list[dict], - ) -> None: + ) -> bool: """Upload the complete session turn records to cloud storage. Session data and skill data live in *separate* cloud paths so they @@ -2753,7 +2755,7 @@ async def _upload_session_data( "[SkillHub] session remote upload skipped: no local/OSS/S3 storage configured " "(skill registry may still use Nacos)" ) - return + return False session_payload = { "session_id": session_id, "timestamp": datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z"), @@ -2771,8 +2773,39 @@ async def _upload_session_data( len(turns), len(content), ) + return True except Exception as e: logger.warning("[SkillHub] session upload failed: %s", e) + return False + + def _maybe_upload_session_snapshot(self, session_id: str, turn_num: int) -> None: + interval = max(0, int(getattr(self.config, "sharing_session_upload_interval", 0) or 0)) + if not self.config.sharing_enabled or interval <= 0: + return + if turn_num <= 0 or turn_num % interval != 0: + return + turns = copy.deepcopy(self._session_turns.get(session_id, [])) + if not turns: + return + self._safe_create_task(self._upload_session_snapshot_and_trigger(session_id, turns)) + + async def _upload_session_snapshot_and_trigger(self, session_id: str, turns: list[dict]) -> None: + uploaded = await self._upload_session_data(session_id, turns) + if uploaded: + await self._trigger_evolve() + + async def _trigger_evolve(self) -> None: + url = str(getattr(self.config, "evolve_server_url", "") or "").strip().rstrip("/") + if not url: + return + try: + import httpx + + async with httpx.AsyncClient(timeout=5.0) as client: + await client.post(f"{url}/trigger") + logger.info("[SkillHub] triggered evolve server: %s", url) + except Exception as e: + logger.warning("[SkillHub] evolve trigger failed: %s", e) # ------------------------------------------------------------------ # # Skill pull (cloud -> local) # diff --git a/tests/test_session_upload_trigger.py b/tests/test_session_upload_trigger.py new file mode 100644 index 0000000..858f72b --- /dev/null +++ b/tests/test_session_upload_trigger.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import httpx +import pytest + +from skillclaw.api_server import SkillClawAPIServer +from skillclaw.config import SkillClawConfig + + +def _server_for_snapshot_tests() -> SkillClawAPIServer: + server = object.__new__(SkillClawAPIServer) + server.config = SkillClawConfig( + sharing_enabled=True, + sharing_session_upload_interval=2, + evolve_server_url="http://evolve.test", + ) + server._session_turns = { + "session-a": [ + {"turn_num": 1, "prompt_text": "one"}, + {"turn_num": 2, "prompt_text": "two"}, + ] + } + return server + + +def test_session_snapshot_upload_only_queues_on_configured_interval() -> None: + server = _server_for_snapshot_tests() + queued = [] + + def fake_create_task(coro): + queued.append(coro) + return None + + server._safe_create_task = fake_create_task + + server._maybe_upload_session_snapshot("session-a", 1) + assert queued == [] + + server._maybe_upload_session_snapshot("session-a", 2) + assert len(queued) == 1 + + queued[0].close() + + +def test_session_snapshot_upload_uses_stable_deep_copy() -> None: + server = _server_for_snapshot_tests() + queued = [] + captured = {} + + class DummyCoro: + def close(self): + return None + + def fake_create_task(coro): + queued.append(coro) + return None + + def fake_upload_snapshot(session_id, turns): + captured["session_id"] = session_id + captured["turns"] = turns + return DummyCoro() + + server._safe_create_task = fake_create_task + server._upload_session_snapshot_and_trigger = fake_upload_snapshot + server._session_turns["session-a"][1]["tool_results"] = [{"text": "before"}] + + server._maybe_upload_session_snapshot("session-a", 2) + server._session_turns["session-a"][1]["tool_results"][0]["text"] = "after" + + assert len(queued) == 1 + assert queued[0] is not None + queued[0].close() + assert captured["session_id"] == "session-a" + assert captured["turns"][1]["tool_results"][0]["text"] == "before" + + +@pytest.mark.anyio +async def test_session_snapshot_triggers_evolve_only_after_successful_upload() -> None: + server = _server_for_snapshot_tests() + calls = {"upload": 0, "trigger": 0} + + async def fake_upload(_session_id, _turns): + calls["upload"] += 1 + return calls["upload"] == 1 + + async def fake_trigger(): + calls["trigger"] += 1 + + server._upload_session_data = fake_upload + server._trigger_evolve = fake_trigger + + await server._upload_session_snapshot_and_trigger("session-a", [{"turn_num": 1}]) + await server._upload_session_snapshot_and_trigger("session-a", [{"turn_num": 2}]) + + assert calls == {"upload": 2, "trigger": 1} diff --git a/tests/test_summarizer_skill_references.py b/tests/test_summarizer_skill_references.py new file mode 100644 index 0000000..69bbbe5 --- /dev/null +++ b/tests/test_summarizer_skill_references.py @@ -0,0 +1,41 @@ +from evolve_server.core.constants import NO_SKILL_KEY +from evolve_server.pipeline.aggregation import aggregate_sessions_by_skill +from evolve_server.pipeline.summarizer import _extract_session_metadata + + +def test_injected_skill_catalog_does_not_count_as_skill_reference(): + session = { + "session_id": "s1", + "turns": [ + { + "prompt_text": "task", + "response_text": "answer", + "injected_skills": ["api-helper", "debug-helper"], + } + ], + } + + _extract_session_metadata(session) + grouped = aggregate_sessions_by_skill([session]) + + assert session["_skills_referenced"] == set() + assert grouped == {NO_SKILL_KEY: [session]} + + +def test_read_or_modified_skills_count_as_skill_references(): + session = { + "session_id": "s1", + "turns": [ + { + "read_skills": [{"skill_name": "api-helper"}], + "modified_skills": [{"skill_name": "debug-helper"}], + "injected_skills": ["catalog-only"], + } + ], + } + + _extract_session_metadata(session) + grouped = aggregate_sessions_by_skill([session]) + + assert session["_skills_referenced"] == {"api-helper", "debug-helper"} + assert set(grouped) == {"api-helper", "debug-helper"} From 7ecf0954b05c4b6de5a6f5b636a1ce7018a14e61 Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 00:53:48 +0800 Subject: [PATCH 4/5] feat: use local session storage for Nacos demos --- skillclaw/skill_hub.py | 2 ++ tests/test_skill_backend_config.py | 26 ++++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/skillclaw/skill_hub.py b/skillclaw/skill_hub.py index 632dd11..3bd16d7 100644 --- a/skillclaw/skill_hub.py +++ b/skillclaw/skill_hub.py @@ -128,6 +128,8 @@ def object_storage_from_config(cls, config) -> Optional["SkillHub"]: backend = str(getattr(config, "sharing_session_backend", "") or "").strip().lower() if not backend and sharing_backend != "nacos": backend = sharing_backend + if not backend and sharing_backend == "nacos" and str(getattr(config, "sharing_local_root", "") or ""): + backend = "local" endpoint = str(getattr(config, "sharing_endpoint", "") or "") bucket = str(getattr(config, "sharing_bucket", "") or "") diff --git a/tests/test_skill_backend_config.py b/tests/test_skill_backend_config.py index a82e437..afdf900 100644 --- a/tests/test_skill_backend_config.py +++ b/tests/test_skill_backend_config.py @@ -3,6 +3,7 @@ from evolve_server.core.config import EvolveServerConfig from skillclaw.config import SkillClawConfig from skillclaw.config_store import ConfigStore +from skillclaw.skill_hub import SkillHub def test_skill_backend_overrides_skill_storage_without_changing_session_storage(monkeypatch) -> None: @@ -53,6 +54,31 @@ def test_skill_backend_empty_keeps_legacy_nacos_backend_behavior(monkeypatch) -> assert evolve_config.storage_endpoint == "" +def test_nacos_backend_uses_local_root_for_session_object_storage(tmp_path) -> None: + cfg = SkillClawConfig( + sharing_backend="nacos", + sharing_endpoint="http://legacy-nacos.test", + sharing_local_root=str(tmp_path / "share"), + sharing_group_id="team-a", + ) + + hub = SkillHub.object_storage_from_config(cfg) + + assert hub is not None + hub._bucket.put_object("team-a/sessions/demo.json", b"{}") + assert (tmp_path / "share" / "team-a" / "sessions" / "demo.json").read_text() == "{}" + + +def test_nacos_backend_without_local_root_has_no_session_object_storage() -> None: + cfg = SkillClawConfig( + sharing_backend="nacos", + sharing_endpoint="http://legacy-nacos.test", + sharing_group_id="team-a", + ) + + assert SkillHub.object_storage_from_config(cfg) is None + + def test_config_store_reads_skill_backend() -> None: class InlineConfigStore(ConfigStore): def load(self) -> dict: From 46ccd13117348e575ad51d61af59048066259dbb Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 17:17:52 +0800 Subject: [PATCH 5/5] fix: remove unused session upload test import --- tests/test_session_upload_trigger.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_session_upload_trigger.py b/tests/test_session_upload_trigger.py index 858f72b..11daf73 100644 --- a/tests/test_session_upload_trigger.py +++ b/tests/test_session_upload_trigger.py @@ -1,6 +1,5 @@ from __future__ import annotations -import httpx import pytest from skillclaw.api_server import SkillClawAPIServer