From 599cc83e7aeee062f71227a86eaaa64159280353 Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 00:55:56 +0800 Subject: [PATCH 1/3] feat: poll for shared skill reloads --- skillclaw/api_server.py | 37 ++++++++++++++++++ tests/test_session_upload_trigger.py | 57 ++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/skillclaw/api_server.py b/skillclaw/api_server.py index 283724e..a645cb7 100644 --- a/skillclaw/api_server.py +++ b/skillclaw/api_server.py @@ -1428,6 +1428,7 @@ def __init__( self._background_tasks: set[asyncio.Task] = set() # transient async tasks (upload, submit) self._responses_store: dict[str, dict[str, Any]] = {} # response_id -> stored response/history self._session_sweeper_task: Optional[asyncio.Task] = None + self._skill_reload_task: Optional[asyncio.Task] = None self._session_idle_close_seconds = max( 0, int(getattr(config, "session_idle_close_seconds", _SESSION_IDLE_CLOSE_SECONDS)), @@ -1440,6 +1441,10 @@ def __init__( 1, int(getattr(config, "shutdown_drain_timeout_seconds", _SHUTDOWN_DRAIN_TIMEOUT_SECONDS)), ) + self._skill_reload_interval_seconds = max( + 5, + int(getattr(config, "sharing_skill_reload_interval_seconds", 30) or 30), + ) # Session boundary detection for non-OpenClaw agents (QwenPaw, IronClaw, etc.) # Maps pseudo-session key (e.g. "tui-model") to tracking metadata. @@ -1478,6 +1483,7 @@ def _build_app(self) -> FastAPI: async def lifespan(_app: FastAPI): owner._ready_event.set() owner._start_session_idle_sweeper() + owner._start_skill_reload_polling() try: yield finally: @@ -1909,6 +1915,33 @@ async def _await_background_tasks(self, timeout_seconds: float) -> None: else: logger.info("[OpenClaw] background drain complete (%d task(s))", len(done)) + def _start_skill_reload_polling(self) -> None: + if not self.config.sharing_enabled: + return + mode = str(getattr(self.config, "sharing_skill_reload_mode", "") or "poll").strip().lower() + if mode != "poll": + return + if self._skill_reload_task is not None and not self._skill_reload_task.done(): + return + self._skill_reload_task = asyncio.create_task(self._skill_reload_poll_loop()) + self._skill_reload_task.add_done_callback(self._task_done_cb) + logger.info( + "[SkillHub] skill reload polling enabled interval=%ds", + self._skill_reload_interval_seconds, + ) + + async def _skill_reload_poll_loop(self) -> None: + try: + while True: + await asyncio.sleep(self._skill_reload_interval_seconds) + try: + await self._pull_skills_from_cloud() + except Exception as exc: + logger.warning("[SkillHub] skill reload poll failed: %s", exc) + except asyncio.CancelledError: + logger.info("[SkillHub] skill reload polling stopped") + raise + async def _drain_active_sessions(self, reason: str) -> None: active_ids = self._collect_active_session_ids() if not active_ids: @@ -1918,6 +1951,10 @@ async def _drain_active_sessions(self, reason: str) -> None: await self._close_session(sid, reason=reason) async def _shutdown_cleanup(self) -> None: + if self._skill_reload_task is not None: + self._skill_reload_task.cancel() + await asyncio.gather(self._skill_reload_task, return_exceptions=True) + self._skill_reload_task = None if self._session_sweeper_task is not None: self._session_sweeper_task.cancel() await asyncio.gather(self._session_sweeper_task, return_exceptions=True) diff --git a/tests/test_session_upload_trigger.py b/tests/test_session_upload_trigger.py index 11daf73..718fdc2 100644 --- a/tests/test_session_upload_trigger.py +++ b/tests/test_session_upload_trigger.py @@ -92,3 +92,60 @@ async def fake_trigger(): await server._upload_session_snapshot_and_trigger("session-a", [{"turn_num": 2}]) assert calls == {"upload": 2, "trigger": 1} + + +def test_skill_reload_polling_starts_only_in_poll_mode(monkeypatch) -> None: + server = object.__new__(SkillClawAPIServer) + server.config = SkillClawConfig(sharing_enabled=True, sharing_skill_reload_mode="poll") + server._skill_reload_task = None + server._skill_reload_interval_seconds = 30 + created = [] + + class FakeTask: + def done(self): + return False + + def add_done_callback(self, _callback): + return None + + def fake_create_task(coro): + created.append(coro) + return FakeTask() + + import asyncio + + monkeypatch.setattr(asyncio, "create_task", fake_create_task) + server._start_skill_reload_polling() + + assert len(created) == 1 + created[0].close() + + +def test_skill_reload_polling_does_not_start_when_disabled_or_callback(monkeypatch) -> None: + created = [] + + class FakeTask: + def done(self): + return False + + def fake_create_task(coro): + created.append(coro) + return FakeTask() + + import asyncio + + monkeypatch.setattr(asyncio, "create_task", fake_create_task) + for mode in ("off", "callback"): + server = object.__new__(SkillClawAPIServer) + server.config = SkillClawConfig(sharing_enabled=True, sharing_skill_reload_mode=mode) + server._skill_reload_task = None + server._skill_reload_interval_seconds = 30 + server._start_skill_reload_polling() + + server = object.__new__(SkillClawAPIServer) + server.config = SkillClawConfig(sharing_enabled=False, sharing_skill_reload_mode="poll") + server._skill_reload_task = None + server._skill_reload_interval_seconds = 30 + server._start_skill_reload_polling() + + assert created == [] From a2876196496089e3e93d3cb36c2dc9cd0f450283 Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 00:57:36 +0800 Subject: [PATCH 2/3] feat: reload proxy skills after evolve uploads --- evolve_server/engines/workflow.py | 26 +++++++ skillclaw/api_server.py | 43 ++++++++--- tests/test_evolve_proxy_reload.py | 107 +++++++++++++++++++++++++++ tests/test_session_upload_trigger.py | 39 ++++++++++ 4 files changed, 206 insertions(+), 9 deletions(-) create mode 100644 tests/test_evolve_proxy_reload.py diff --git a/evolve_server/engines/workflow.py b/evolve_server/engines/workflow.py index 601728d..86a6026 100644 --- a/evolve_server/engines/workflow.py +++ b/evolve_server/engines/workflow.py @@ -984,8 +984,34 @@ async def run_once(self) -> dict: queued_candidates, elapsed, ) + if uploaded_skills > 0: + await self._notify_proxy_reload() return summary + async def _notify_proxy_reload(self) -> None: + mode = str(getattr(self.config, "skill_reload_mode", "") or "poll").strip().lower() + url = str(getattr(self.config, "proxy_reload_url", "") or "").strip().rstrip("/") + if mode != "callback" or not url: + return + headers: dict[str, str] = {} + api_key = str(getattr(self.config, "proxy_reload_api_key", "") or "") + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + import httpx + + for attempt in range(3): + try: + async with httpx.AsyncClient(timeout=5.0) as client: + resp = await client.post(f"{url}/internal/reload-skills", headers=headers) + resp.raise_for_status() + logger.info("[EvolveServer] notified proxy to reload skills: %s", url) + return + except Exception as exc: + if attempt < 2: + await asyncio.sleep(1.0 * (attempt + 1)) + else: + logger.warning("[EvolveServer] proxy reload notify failed after 3 attempts: %s", exc) + async def run_periodic(self) -> None: self._running = True logger.info("[EvolveServer] periodic mode: interval=%ds", self.config.interval_seconds) diff --git a/skillclaw/api_server.py b/skillclaw/api_server.py index a645cb7..8655d99 100644 --- a/skillclaw/api_server.py +++ b/skillclaw/api_server.py @@ -1497,6 +1497,17 @@ async def lifespan(_app: FastAPI): async def healthz(): return {"ok": True} + @app.post("/internal/reload-skills") + async def reload_skills( + request: Request, + authorization: Optional[str] = Header(default=None), + ): + owner: SkillClawAPIServer = request.app.state.owner + await owner._check_auth(authorization) + await owner._pull_skills_from_cloud() + skill_count = len(owner.skill_manager.get_all_skills()) if owner.skill_manager else 0 + return {"ok": True, "skills": skill_count} + @app.get("/v1/models") async def list_models( request: Request, @@ -1931,13 +1942,22 @@ def _start_skill_reload_polling(self) -> None: ) async def _skill_reload_poll_loop(self) -> None: + consecutive_failures = 0 try: while True: - await asyncio.sleep(self._skill_reload_interval_seconds) + jitter = random.uniform(0, self._skill_reload_interval_seconds * 0.1) + backoff = min(consecutive_failures * 5.0, 60.0) + await asyncio.sleep(self._skill_reload_interval_seconds + jitter + backoff) try: await self._pull_skills_from_cloud() + consecutive_failures = 0 except Exception as exc: - logger.warning("[SkillHub] skill reload poll failed: %s", exc) + consecutive_failures += 1 + logger.warning( + "[SkillHub] skill reload poll failed (streak=%d): %s", + consecutive_failures, + exc, + ) except asyncio.CancelledError: logger.info("[SkillHub] skill reload polling stopped") raise @@ -2835,14 +2855,19 @@ async def _trigger_evolve(self) -> None: url = str(getattr(self.config, "evolve_server_url", "") or "").strip().rstrip("/") if not url: return - try: - import httpx + import httpx - async with httpx.AsyncClient(timeout=5.0) as client: - await client.post(f"{url}/trigger") - logger.info("[SkillHub] triggered evolve server: %s", url) - except Exception as e: - logger.warning("[SkillHub] evolve trigger failed: %s", e) + for attempt in range(3): + try: + async with httpx.AsyncClient(timeout=5.0) as client: + await client.post(f"{url}/trigger") + logger.info("[SkillHub] triggered evolve server: %s", url) + return + except Exception as e: + if attempt < 2: + await asyncio.sleep(1.0 * (attempt + 1)) + else: + logger.warning("[SkillHub] evolve trigger failed after 3 attempts: %s", e) # ------------------------------------------------------------------ # # Skill pull (cloud -> local) # diff --git a/tests/test_evolve_proxy_reload.py b/tests/test_evolve_proxy_reload.py new file mode 100644 index 0000000..ff1bcfb --- /dev/null +++ b/tests/test_evolve_proxy_reload.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import types + +import pytest + +from evolve_server.core.config import EvolveServerConfig +from evolve_server.engines.workflow import EvolveServer + + +@pytest.mark.anyio +async def test_notify_proxy_reload_posts_callback_with_auth(monkeypatch) -> None: + server = EvolveServer.__new__(EvolveServer) + server.config = EvolveServerConfig( + skill_reload_mode="callback", + proxy_reload_url="http://proxy.test/", + proxy_reload_api_key="secret", + ) + captured = {} + + class FakeAsyncClient: + def __init__(self, *args, **kwargs): + captured["timeout"] = kwargs.get("timeout") + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + async def post(self, url, headers): + captured["url"] = url + captured["headers"] = headers + return types.SimpleNamespace(raise_for_status=lambda: None) + + fake_httpx = types.SimpleNamespace(AsyncClient=FakeAsyncClient) + monkeypatch.setitem(__import__("sys").modules, "httpx", fake_httpx) + + await server._notify_proxy_reload() + + assert captured == { + "timeout": 5.0, + "url": "http://proxy.test/internal/reload-skills", + "headers": {"Authorization": "Bearer secret"}, + } + + +@pytest.mark.anyio +async def test_notify_proxy_reload_retries_on_http_error(monkeypatch) -> None: + server = EvolveServer.__new__(EvolveServer) + server.config = EvolveServerConfig( + skill_reload_mode="callback", + proxy_reload_url="http://proxy.test", + proxy_reload_api_key="secret", + ) + attempts = {"count": 0} + + class FakeResponse: + def raise_for_status(self): + raise RuntimeError("401 Unauthorized") + + class FakeAsyncClient: + def __init__(self, *args, **kwargs): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + async def post(self, url, headers): + attempts["count"] += 1 + return FakeResponse() + + async def fake_sleep(_delay): + return None + + fake_httpx = types.SimpleNamespace(AsyncClient=FakeAsyncClient) + monkeypatch.setitem(__import__("sys").modules, "httpx", fake_httpx) + monkeypatch.setattr("evolve_server.engines.workflow.asyncio.sleep", fake_sleep) + + await server._notify_proxy_reload() + + assert attempts == {"count": 3} + + +@pytest.mark.anyio +async def test_notify_proxy_reload_skips_non_callback_modes(monkeypatch) -> None: + server = EvolveServer.__new__(EvolveServer) + server.config = EvolveServerConfig( + skill_reload_mode="poll", + proxy_reload_url="http://proxy.test", + proxy_reload_api_key="secret", + ) + called = {"http": False} + + class FakeAsyncClient: + def __init__(self, *args, **kwargs): + called["http"] = True + + fake_httpx = types.SimpleNamespace(AsyncClient=FakeAsyncClient) + monkeypatch.setitem(__import__("sys").modules, "httpx", fake_httpx) + + await server._notify_proxy_reload() + + assert called == {"http": False} diff --git a/tests/test_session_upload_trigger.py b/tests/test_session_upload_trigger.py index 718fdc2..4b6cf13 100644 --- a/tests/test_session_upload_trigger.py +++ b/tests/test_session_upload_trigger.py @@ -1,6 +1,7 @@ from __future__ import annotations import pytest +import httpx from skillclaw.api_server import SkillClawAPIServer from skillclaw.config import SkillClawConfig @@ -149,3 +150,41 @@ def fake_create_task(coro): server._start_skill_reload_polling() assert created == [] + + +@pytest.mark.anyio +async def test_internal_reload_skills_endpoint_requires_auth_and_pulls(tmp_path) -> None: + server = SkillClawAPIServer( + SkillClawConfig( + proxy_api_key="secret", + record_enabled=False, + record_dir=str(tmp_path), + ) + ) + calls = {"pull": 0} + + async def fake_pull(skip_names=None): + assert skip_names is None + calls["pull"] += 1 + + class FakeSkillManager: + def get_all_skills(self): + return [{"name": "weekly-report"}, {"name": "demo"}] + + server._pull_skills_from_cloud = fake_pull + server.skill_manager = FakeSkillManager() + + client = httpx.AsyncClient(transport=httpx.ASGITransport(app=server.app), base_url="http://test") + try: + unauthorized = await client.post("/internal/reload-skills") + authorized = await client.post( + "/internal/reload-skills", + headers={"Authorization": "Bearer secret"}, + ) + finally: + await client.aclose() + + assert unauthorized.status_code == 401 + assert authorized.status_code == 200 + assert authorized.json() == {"ok": True, "skills": 2} + assert calls == {"pull": 1} From c1a771693eb5d42da434407ed805ef0e4cb3bc06 Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 17:20:19 +0800 Subject: [PATCH 3/3] fix: sort session upload test imports --- tests/test_session_upload_trigger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_session_upload_trigger.py b/tests/test_session_upload_trigger.py index 4b6cf13..6e570ec 100644 --- a/tests/test_session_upload_trigger.py +++ b/tests/test_session_upload_trigger.py @@ -1,7 +1,7 @@ from __future__ import annotations -import pytest import httpx +import pytest from skillclaw.api_server import SkillClawAPIServer from skillclaw.config import SkillClawConfig