Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions evolve_server/engines/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -984,8 +984,34 @@ async def run_once(self) -> dict:
queued_candidates,
elapsed,
)
if uploaded_skills > 0:
await self._notify_proxy_reload()
return summary

async def _notify_proxy_reload(self) -> None:
    """Ask the proxy to reload its skills via an HTTP callback (best effort).

    Returns immediately unless ``config.skill_reload_mode`` is ``"callback"``
    (a missing or empty value is treated as ``"poll"``) and
    ``config.proxy_reload_url`` is non-empty. Otherwise a POST is sent to
    ``<proxy_reload_url>/internal/reload-skills``; when
    ``config.proxy_reload_api_key`` is set it is sent as a bearer token.

    The request is attempted up to ``max_attempts`` times, pausing 1s, 2s, ...
    between attempts. Any ``Exception`` raised by the request or by
    ``raise_for_status()`` is caught: intermediate failures are logged at
    debug level and the final one as a warning, so nothing is re-raised.
    """
    # Single source of truth for the retry limit (loop bound, last-attempt
    # check and log message all derive from it).
    max_attempts = 3

    mode = str(getattr(self.config, "skill_reload_mode", "") or "poll").strip().lower()
    url = str(getattr(self.config, "proxy_reload_url", "") or "").strip().rstrip("/")
    if mode != "callback" or not url:
        return
    headers: dict[str, str] = {}
    api_key = str(getattr(self.config, "proxy_reload_api_key", "") or "")
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    # Imported lazily so httpx is only touched when callback mode is active.
    import httpx

    for attempt in range(1, max_attempts + 1):
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                resp = await client.post(f"{url}/internal/reload-skills", headers=headers)
                resp.raise_for_status()
        except Exception as exc:
            if attempt == max_attempts:
                logger.warning(
                    "[EvolveServer] proxy reload notify failed after %d attempts: %s",
                    max_attempts,
                    exc,
                )
                return
            logger.debug(
                "[EvolveServer] proxy reload notify attempt %d/%d failed: %s",
                attempt,
                max_attempts,
                exc,
            )
            # Linear backoff: 1s after the first failure, 2s after the second.
            await asyncio.sleep(1.0 * attempt)
        else:
            logger.info("[EvolveServer] notified proxy to reload skills: %s", url)
            return

async def run_periodic(self) -> None:
self._running = True
logger.info("[EvolveServer] periodic mode: interval=%ds", self.config.interval_seconds)
Expand Down
76 changes: 69 additions & 7 deletions skillclaw/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,7 @@ def __init__(
self._background_tasks: set[asyncio.Task] = set() # transient async tasks (upload, submit)
self._responses_store: dict[str, dict[str, Any]] = {} # response_id -> stored response/history
self._session_sweeper_task: Optional[asyncio.Task] = None
self._skill_reload_task: Optional[asyncio.Task] = None
self._session_idle_close_seconds = max(
0,
int(getattr(config, "session_idle_close_seconds", _SESSION_IDLE_CLOSE_SECONDS)),
Expand All @@ -1440,6 +1441,10 @@ def __init__(
1,
int(getattr(config, "shutdown_drain_timeout_seconds", _SHUTDOWN_DRAIN_TIMEOUT_SECONDS)),
)
self._skill_reload_interval_seconds = max(
5,
int(getattr(config, "sharing_skill_reload_interval_seconds", 30) or 30),
)

# Session boundary detection for non-OpenClaw agents (QwenPaw, IronClaw, etc.)
# Maps pseudo-session key (e.g. "tui-model") to tracking metadata.
Expand Down Expand Up @@ -1478,6 +1483,7 @@ def _build_app(self) -> FastAPI:
async def lifespan(_app: FastAPI):
owner._ready_event.set()
owner._start_session_idle_sweeper()
owner._start_skill_reload_polling()
try:
yield
finally:
Expand All @@ -1491,6 +1497,17 @@ async def lifespan(_app: FastAPI):
async def healthz():
    """Health-check handler: unconditionally returns ``{"ok": True}``."""
    status = {"ok": True}
    return status

@app.post("/internal/reload-skills")
async def reload_skills(
    request: Request,
    authorization: Optional[str] = Header(default=None),
):
    """Re-pull skills from the cloud on demand and report how many are loaded.

    The ``Authorization`` header is validated through the server's
    ``_check_auth`` before ``_pull_skills_from_cloud`` is awaited. The
    response carries the number of skills returned by the skill manager, or
    0 when the server has no skill manager.
    """
    server: SkillClawAPIServer = request.app.state.owner
    await server._check_auth(authorization)
    await server._pull_skills_from_cloud()

    manager = server.skill_manager
    if manager:
        loaded = len(manager.get_all_skills())
    else:
        loaded = 0
    return {"ok": True, "skills": loaded}

@app.get("/v1/models")
async def list_models(
request: Request,
Expand Down Expand Up @@ -1909,6 +1926,42 @@ async def _await_background_tasks(self, timeout_seconds: float) -> None:
else:
logger.info("[OpenClaw] background drain complete (%d task(s))", len(done))

def _start_skill_reload_polling(self) -> None:
    """Start the background skill-reload poll task when polling applies.

    Nothing is started when sharing is disabled, when
    ``config.sharing_skill_reload_mode`` (missing/empty means ``"poll"``) is
    not ``"poll"``, or when a previously started poll task has not finished.
    """
    if not self.config.sharing_enabled:
        return

    raw_mode = getattr(self.config, "sharing_skill_reload_mode", "") or "poll"
    if str(raw_mode).strip().lower() != "poll":
        return

    existing = self._skill_reload_task
    if existing is not None and not existing.done():
        # A poll loop is already running; do not spawn a second one.
        return

    task = asyncio.create_task(self._skill_reload_poll_loop())
    self._skill_reload_task = task
    task.add_done_callback(self._task_done_cb)
    logger.info(
        "[SkillHub] skill reload polling enabled interval=%ds",
        self._skill_reload_interval_seconds,
    )

async def _skill_reload_poll_loop(self) -> None:
    """Periodically pull skills from the cloud until the task is cancelled.

    Each cycle sleeps for the configured interval plus up to 10% random
    jitter plus a failure backoff (5s per consecutive failure, capped at
    60s), then awaits ``_pull_skills_from_cloud``. A failed pull is logged
    and counted; a successful one resets the failure streak. Cancellation
    is logged and re-raised.
    """
    failure_streak = 0
    try:
        while True:
            base = self._skill_reload_interval_seconds
            delay = base + random.uniform(0, base * 0.1) + min(failure_streak * 5.0, 60.0)
            # Sleep first, so the first pull happens one interval after start.
            await asyncio.sleep(delay)
            try:
                await self._pull_skills_from_cloud()
            except Exception as exc:
                failure_streak += 1
                logger.warning(
                    "[SkillHub] skill reload poll failed (streak=%d): %s",
                    failure_streak,
                    exc,
                )
            else:
                failure_streak = 0
    except asyncio.CancelledError:
        logger.info("[SkillHub] skill reload polling stopped")
        raise

async def _drain_active_sessions(self, reason: str) -> None:
active_ids = self._collect_active_session_ids()
if not active_ids:
Expand All @@ -1918,6 +1971,10 @@ async def _drain_active_sessions(self, reason: str) -> None:
await self._close_session(sid, reason=reason)

async def _shutdown_cleanup(self) -> None:
if self._skill_reload_task is not None:
self._skill_reload_task.cancel()
await asyncio.gather(self._skill_reload_task, return_exceptions=True)
self._skill_reload_task = None
if self._session_sweeper_task is not None:
self._session_sweeper_task.cancel()
await asyncio.gather(self._session_sweeper_task, return_exceptions=True)
Expand Down Expand Up @@ -2798,14 +2855,19 @@ async def _trigger_evolve(self) -> None:
url = str(getattr(self.config, "evolve_server_url", "") or "").strip().rstrip("/")
if not url:
return
try:
import httpx
import httpx

async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(f"{url}/trigger")
logger.info("[SkillHub] triggered evolve server: %s", url)
except Exception as e:
logger.warning("[SkillHub] evolve trigger failed: %s", e)
for attempt in range(3):
try:
async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(f"{url}/trigger")
logger.info("[SkillHub] triggered evolve server: %s", url)
return
except Exception as e:
if attempt < 2:
await asyncio.sleep(1.0 * (attempt + 1))
else:
logger.warning("[SkillHub] evolve trigger failed after 3 attempts: %s", e)

# ------------------------------------------------------------------ #
# Skill pull (cloud -> local) #
Expand Down
107 changes: 107 additions & 0 deletions tests/test_evolve_proxy_reload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from __future__ import annotations

import types

import pytest

from evolve_server.core.config import EvolveServerConfig
from evolve_server.engines.workflow import EvolveServer


@pytest.mark.anyio
async def test_notify_proxy_reload_posts_callback_with_auth(monkeypatch) -> None:
    """Callback mode POSTs to the reload endpoint with bearer auth and a 5s timeout."""
    recorded = {}

    class RecordingClient:
        """Stand-in for ``httpx.AsyncClient`` that records how it was used."""

        def __init__(self, *args, **kwargs):
            recorded["timeout"] = kwargs.get("timeout")

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return False

        async def post(self, url, headers):
            recorded["url"] = url
            recorded["headers"] = headers
            return types.SimpleNamespace(raise_for_status=lambda: None)

    server = EvolveServer.__new__(EvolveServer)
    server.config = EvolveServerConfig(
        skill_reload_mode="callback",
        proxy_reload_url="http://proxy.test/",
        proxy_reload_api_key="secret",
    )

    # The method imports httpx lazily, so swapping the sys.modules entry is enough.
    stub_module = types.SimpleNamespace(AsyncClient=RecordingClient)
    monkeypatch.setitem(__import__("sys").modules, "httpx", stub_module)

    await server._notify_proxy_reload()

    expected = {
        "timeout": 5.0,
        "url": "http://proxy.test/internal/reload-skills",
        "headers": {"Authorization": "Bearer secret"},
    }
    assert recorded == expected


@pytest.mark.anyio
async def test_notify_proxy_reload_retries_on_http_error(monkeypatch) -> None:
    """A response whose ``raise_for_status`` fails is retried three times in total."""
    post_calls = []

    class FailingResponse:
        def raise_for_status(self):
            raise RuntimeError("401 Unauthorized")

    class CountingClient:
        """Stand-in for ``httpx.AsyncClient`` whose every POST yields a failing response."""

        def __init__(self, *args, **kwargs):
            pass

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return False

        async def post(self, url, headers):
            post_calls.append(url)
            return FailingResponse()

    async def instant_sleep(_delay):
        # Skip the real backoff so the test stays fast.
        return None

    server = EvolveServer.__new__(EvolveServer)
    server.config = EvolveServerConfig(
        skill_reload_mode="callback",
        proxy_reload_url="http://proxy.test",
        proxy_reload_api_key="secret",
    )

    stub_module = types.SimpleNamespace(AsyncClient=CountingClient)
    monkeypatch.setitem(__import__("sys").modules, "httpx", stub_module)
    monkeypatch.setattr("evolve_server.engines.workflow.asyncio.sleep", instant_sleep)

    await server._notify_proxy_reload()

    assert len(post_calls) == 3


@pytest.mark.anyio
async def test_notify_proxy_reload_skips_non_callback_modes(monkeypatch) -> None:
    """In ``poll`` mode no HTTP client is ever constructed."""
    constructed = []

    class TripwireClient:
        """Stand-in for ``httpx.AsyncClient`` that records any instantiation."""

        def __init__(self, *args, **kwargs):
            constructed.append(True)

    server = EvolveServer.__new__(EvolveServer)
    server.config = EvolveServerConfig(
        skill_reload_mode="poll",
        proxy_reload_url="http://proxy.test",
        proxy_reload_api_key="secret",
    )

    stub_module = types.SimpleNamespace(AsyncClient=TripwireClient)
    monkeypatch.setitem(__import__("sys").modules, "httpx", stub_module)

    await server._notify_proxy_reload()

    assert constructed == []
96 changes: 96 additions & 0 deletions tests/test_session_upload_trigger.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import httpx
import pytest

from skillclaw.api_server import SkillClawAPIServer
Expand Down Expand Up @@ -92,3 +93,98 @@ async def fake_trigger():
await server._upload_session_snapshot_and_trigger("session-a", [{"turn_num": 2}])

assert calls == {"upload": 2, "trigger": 1}


def test_skill_reload_polling_starts_only_in_poll_mode(monkeypatch) -> None:
    """With sharing enabled and mode ``poll``, exactly one poll task is created."""
    import asyncio

    scheduled = []

    class StubTask:
        """Minimal task double exposing only what the method under test calls."""

        def done(self):
            return False

        def add_done_callback(self, _callback):
            return None

    def record_create_task(coro):
        scheduled.append(coro)
        return StubTask()

    monkeypatch.setattr(asyncio, "create_task", record_create_task)

    server = object.__new__(SkillClawAPIServer)
    server.config = SkillClawConfig(sharing_enabled=True, sharing_skill_reload_mode="poll")
    server._skill_reload_task = None
    server._skill_reload_interval_seconds = 30

    server._start_skill_reload_polling()

    assert len(scheduled) == 1
    # Close the never-awaited coroutine to avoid a "never awaited" warning.
    scheduled[0].close()


def test_skill_reload_polling_does_not_start_when_disabled_or_callback(monkeypatch) -> None:
    """No poll task is created for non-poll modes or when sharing is disabled."""
    import asyncio

    scheduled = []

    class StubTask:
        def done(self):
            return False

    def record_create_task(coro):
        scheduled.append(coro)
        return StubTask()

    monkeypatch.setattr(asyncio, "create_task", record_create_task)

    # Two non-poll modes with sharing on, then poll mode with sharing off.
    scenarios = (
        {"sharing_enabled": True, "sharing_skill_reload_mode": "off"},
        {"sharing_enabled": True, "sharing_skill_reload_mode": "callback"},
        {"sharing_enabled": False, "sharing_skill_reload_mode": "poll"},
    )
    for config_kwargs in scenarios:
        server = object.__new__(SkillClawAPIServer)
        server.config = SkillClawConfig(**config_kwargs)
        server._skill_reload_task = None
        server._skill_reload_interval_seconds = 30
        server._start_skill_reload_polling()

    assert scheduled == []


@pytest.mark.anyio
async def test_internal_reload_skills_endpoint_requires_auth_and_pulls(tmp_path) -> None:
    """The reload endpoint rejects missing auth and pulls once for a valid bearer token."""
    config = SkillClawConfig(
        proxy_api_key="secret",
        record_enabled=False,
        record_dir=str(tmp_path),
    )
    server = SkillClawAPIServer(config)
    pull_count = {"pull": 0}

    async def counting_pull(skip_names=None):
        assert skip_names is None
        pull_count["pull"] += 1

    class TwoSkillManager:
        """Skill manager double that always reports two skills."""

        def get_all_skills(self):
            return [{"name": "weekly-report"}, {"name": "demo"}]

    server._pull_skills_from_cloud = counting_pull
    server.skill_manager = TwoSkillManager()

    transport = httpx.ASGITransport(app=server.app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        unauthorized = await client.post("/internal/reload-skills")
        authorized = await client.post(
            "/internal/reload-skills",
            headers={"Authorization": "Bearer secret"},
        )

    assert unauthorized.status_code == 401
    assert authorized.status_code == 200
    assert authorized.json() == {"ok": True, "skills": 2}
    # Only the authorized request may reach the pull.
    assert pull_count == {"pull": 1}
Loading