Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions evolve_server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def _build_config_from_args(args: argparse.Namespace) -> EvolveServerConfig:
config.http_port = args.port
if args.publish_mode:
config.publish_mode = args.publish_mode
if args.nacos_publish_mode:
config.nacos_publish_mode = args.nacos_publish_mode
if args.use_skill_verifier is not None:
config.use_skill_verifier = args.use_skill_verifier
if args.skill_verifier_min_score is not None:
Expand Down Expand Up @@ -115,6 +117,12 @@ def build_parser() -> argparse.ArgumentParser:
default=None,
help="Direct publish to skills/ or stage jobs for client-side validation before publish.",
)
parser.add_argument(
"--nacos-publish-mode",
choices=["draft", "review", "direct"],
default=None,
help="Nacos Skill lifecycle mode: draft only, submit for review, or publish latest directly.",
)
skill_verifier_group = parser.add_mutually_exclusive_group()
skill_verifier_group.add_argument(
"--skill-verifier",
Expand Down
21 changes: 21 additions & 0 deletions evolve_server/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
_PACKAGE_DIR = Path(__file__).resolve().parent
_DEFAULT_AGENT_EVOLVE_BASE_URL = "https://api.openai.com/v1"
_DEFAULT_AGENT_EVOLVE_MODEL = "gpt-5.4"
_NACOS_PUBLISH_MODES = {"draft", "review", "direct"}
_SKILL_RELOAD_MODES = {"off", "poll", "callback"}


def _load_dotenv() -> None:
Expand Down Expand Up @@ -60,6 +62,11 @@ def _infer_storage_backend(endpoint: str, bucket: str, local_root: str) -> str:
return ""


def _normalize_choice(value: str, allowed: set[str], default: str) -> str:
normalized = str(value or "").strip().lower()
return normalized if normalized in allowed else default


@dataclass
class EvolveServerConfig:
engine: str = "workflow"
Expand Down Expand Up @@ -113,6 +120,10 @@ class EvolveServerConfig:
nacos_username: str = ""
nacos_password: str = ""
nacos_label: str = "latest"
nacos_publish_mode: str = "review"
skill_reload_mode: str = "poll"
proxy_reload_url: str = ""
proxy_reload_api_key: str = ""

# Scheduling
interval_seconds: int = 600
Expand All @@ -139,6 +150,8 @@ def __post_init__(self) -> None:
self.publish_mode = str(self.publish_mode or "direct").strip().lower() or "direct"
if self.publish_mode not in {"direct", "validated"}:
self.publish_mode = "direct"
self.nacos_publish_mode = _normalize_choice(self.nacos_publish_mode, _NACOS_PUBLISH_MODES, "review")
self.skill_reload_mode = _normalize_choice(self.skill_reload_mode, _SKILL_RELOAD_MODES, "poll")
self.validation_required_results = max(1, int(self.validation_required_results or 1))
self.validation_required_approvals = max(1, int(self.validation_required_approvals or 1))
self.validation_min_mean_score = max(
Expand Down Expand Up @@ -239,6 +252,10 @@ def from_env(cls) -> "EvolveServerConfig":
nacos_username=os.environ.get("EVOLVE_NACOS_USERNAME", ""),
nacos_password=os.environ.get("EVOLVE_NACOS_PASSWORD", ""),
nacos_label=os.environ.get("EVOLVE_NACOS_LABEL", "latest"),
nacos_publish_mode=os.environ.get("EVOLVE_NACOS_PUBLISH_MODE", "review"),
skill_reload_mode=os.environ.get("EVOLVE_SKILL_RELOAD_MODE", "poll"),
proxy_reload_url=os.environ.get("EVOLVE_PROXY_RELOAD_URL", ""),
proxy_reload_api_key=os.environ.get("EVOLVE_PROXY_RELOAD_API_KEY", ""),
interval_seconds=int(os.environ.get("EVOLVE_INTERVAL", "600")),
http_port=int(os.environ.get("EVOLVE_PORT", "8787")),
history_path=os.environ.get("EVOLVE_HISTORY_LOG", "evolve_history.jsonl"),
Expand Down Expand Up @@ -352,6 +369,10 @@ def from_skillclaw_config(cls, config) -> "EvolveServerConfig":
nacos_username=str(getattr(config, "sharing_nacos_username", "") or ""),
nacos_password=str(getattr(config, "sharing_nacos_password", "") or ""),
nacos_label=str(getattr(config, "sharing_nacos_label", "") or "latest"),
nacos_publish_mode=str(getattr(config, "sharing_nacos_publish_mode", "") or "review"),
skill_reload_mode=str(getattr(config, "sharing_skill_reload_mode", "") or "poll"),
proxy_reload_url=str(getattr(config, "evolve_proxy_reload_url", "") or ""),
proxy_reload_api_key=str(getattr(config, "proxy_api_key", "") or ""),
openclaw_bin=os.environ.get("AGENT_EVOLVE_OPENCLAW_BIN", "openclaw"),
openclaw_home=os.environ.get("AGENT_EVOLVE_OPENCLAW_HOME", ""),
fresh=os.environ.get("AGENT_EVOLVE_FRESH", "1").lower() not in {"0", "false", "no"},
Expand Down
2 changes: 1 addition & 1 deletion evolve_server/engines/EVOLVE_AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ compact format. Each file contains:
- `mean_score`: average ORM score across rollouts
- `success_count` / `fail_count`: how many rollouts passed / failed
- `stability`: `"all_success"`, `"all_fail"`, or `"unstable"`
- `_skills_referenced`: list of skill names the agent read or was injected
- `_skills_referenced`: list of skill names the agent concretely read or modified
- `_avg_prm`: mean PRM score across all turns (0.0–1.0; higher = better)
- `_has_tool_errors`: whether any tool call failed during the session
- `_trajectory`: **structured step-by-step trace** of the agent's actions.
Expand Down
66 changes: 60 additions & 6 deletions evolve_server/engines/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,36 @@ def _overlay_manifest_metadata(
skill["description"] = description
return skill

def _wait_nacos_publish(self, name: str, version: str, timeout: float = 30.0) -> bool:
"""Publish a Nacos skill version and wait for it to go online.

Nacos v3 runs a publish pipeline on submit. The pipeline must approve
before the publish call succeeds. This method polls for approval and
retries publish until the version goes online with the latest label.
"""
import time

deadline = time.monotonic() + timeout
published = False
while time.monotonic() < deadline:
try:
detail = self._nacos_skill_client.get_skill(name)
labels = detail.get("labels", {}) if detail else {}
if labels.get("latest") == version:
return True
if not published:
try:
self._nacos_skill_client.publish(name, version, update_latest_label=True)
logger.info("[EvolveServer] Nacos publish accepted for %s %s", name, version)
published = True
except Exception:
pass
except Exception:
pass
time.sleep(2.0)
logger.warning("[EvolveServer] Nacos publish not confirmed for %s %s within %.0fs", name, version, timeout)
return False

def _fetch_skill(self, name: str) -> Optional[str]:
if self._nacos_skill_client is not None:
try:
Expand Down Expand Up @@ -237,13 +267,25 @@ def _upload_skill(self, skill: dict, action: str) -> str:
overwrite=True,
target_version=target_version,
)
self._nacos_skill_client.submit(name, target_version)
publish_mode = str(getattr(self.config, "nacos_publish_mode", "") or "review").strip().lower()
if publish_mode in {"review", "direct"}:
self._nacos_skill_client.submit(name, target_version)
publish_confirmed = False
if publish_mode == "direct":
publish_confirmed = self._wait_nacos_publish(name, target_version)
logger.info(
"[EvolveServer] uploaded and submitted skill %s to Nacos as %s via action=%s",
"[EvolveServer] uploaded skill %s to Nacos as %s via action=%s publish_mode=%s",
name,
target_version,
action,
publish_mode,
)
if publish_mode == "draft":
return "uploaded_draft"
if publish_mode == "review":
return "uploaded_pending_review"
if not publish_confirmed:
return "uploaded_pending_publish"
return "uploaded"

skill_id = self._id_registry.get_or_create(name)
Expand Down Expand Up @@ -314,13 +356,13 @@ async def _resolve_and_upload(self, skill: dict, action_type: str) -> tuple[str,
has_conflict = await self._call_storage(self._detect_conflict, name, skill)
if not has_conflict:
upload_status = await self._call_storage(self._upload_skill, skill, action_type)
return (action_type, True) if upload_status == "uploaded" else (upload_status, False)
return self._upload_status_to_action(action_type, upload_status)

logger.info("[EvolveServer] conflict detected for '%s' - merging", name)
existing_md = await self._call_storage(self._fetch_skill, name)
if not existing_md:
upload_status = await self._call_storage(self._upload_skill, skill, action_type)
return (action_type, True) if upload_status == "uploaded" else (upload_status, False)
return self._upload_status_to_action(action_type, upload_status)

existing_skill = parse_skill_content(name, existing_md)
existing_skill = self._overlay_manifest_metadata(
Expand All @@ -332,11 +374,23 @@ async def _resolve_and_upload(self, skill: dict, action_type: str) -> tuple[str,
if merged and merged.get("name"):
merged["name"] = name
upload_status = await self._call_storage(self._upload_skill, merged, "merge")
return ("merge", True) if upload_status == "uploaded" else (upload_status, False)
return self._upload_status_to_action("merge", upload_status)

logger.warning("[EvolveServer] merge failed for '%s' - keeping incoming version", name)
upload_status = await self._call_storage(self._upload_skill, skill, action_type)
return (action_type, True) if upload_status == "uploaded" else (upload_status, False)
return self._upload_status_to_action(action_type, upload_status)

@staticmethod
def _upload_status_to_action(action_type: str, upload_status: str) -> tuple[str, bool]:
if upload_status == "uploaded":
return action_type, True
if upload_status == "uploaded_pending_review":
return f"{action_type}_pending_review", False
if upload_status == "uploaded_pending_publish":
return f"{action_type}_pending_publish", False
if upload_status == "uploaded_draft":
return f"{action_type}_draft", False
return upload_status, False

def _empty_judge_summary(self) -> dict[str, Any]:
return {
Expand Down
37 changes: 35 additions & 2 deletions skillclaw/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import asyncio
import base64
import copy
import json
import logging
import os
Expand Down Expand Up @@ -2387,6 +2388,7 @@ def _prompt_len(msgs):
"prm_score": None,
}
)
self._maybe_upload_session_snapshot(session_id, turn_num)
self._pending_turn_data.setdefault(session_id, {})[turn_num] = {
"prompt_text": prompt_text,
"response_text": response_text,
Expand Down Expand Up @@ -2736,7 +2738,7 @@ async def _upload_session_data(
self,
session_id: str,
turns: list[dict],
) -> None:
) -> bool:
"""Upload the complete session turn records to cloud storage.

Session data and skill data live in *separate* cloud paths so they
Expand All @@ -2753,7 +2755,7 @@ async def _upload_session_data(
"[SkillHub] session remote upload skipped: no local/OSS/S3 storage configured "
"(skill registry may still use Nacos)"
)
return
return False
session_payload = {
"session_id": session_id,
"timestamp": datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z"),
Expand All @@ -2771,8 +2773,39 @@ async def _upload_session_data(
len(turns),
len(content),
)
return True
except Exception as e:
logger.warning("[SkillHub] session upload failed: %s", e)
return False

def _maybe_upload_session_snapshot(self, session_id: str, turn_num: int) -> None:
interval = max(0, int(getattr(self.config, "sharing_session_upload_interval", 0) or 0))
if not self.config.sharing_enabled or interval <= 0:
return
if turn_num <= 0 or turn_num % interval != 0:
return
turns = copy.deepcopy(self._session_turns.get(session_id, []))
if not turns:
return
self._safe_create_task(self._upload_session_snapshot_and_trigger(session_id, turns))

async def _upload_session_snapshot_and_trigger(self, session_id: str, turns: list[dict]) -> None:
uploaded = await self._upload_session_data(session_id, turns)
if uploaded:
await self._trigger_evolve()

async def _trigger_evolve(self) -> None:
url = str(getattr(self.config, "evolve_server_url", "") or "").strip().rstrip("/")
if not url:
return
try:
import httpx

async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(f"{url}/trigger")
logger.info("[SkillHub] triggered evolve server: %s", url)
except Exception as e:
logger.warning("[SkillHub] evolve trigger failed: %s", e)

# ------------------------------------------------------------------ #
# Skill pull (cloud -> local) #
Expand Down
10 changes: 10 additions & 0 deletions skillclaw/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,22 @@ class SkillClawConfig:
sharing_nacos_username: str = ""
sharing_nacos_password: str = ""
sharing_nacos_label: str = "latest"
sharing_nacos_publish_mode: str = "review"

sharing_group_id: str = "default"
sharing_user_alias: str = ""
sharing_auto_pull_on_start: bool = False
sharing_push_min_injections: int = 5
sharing_push_min_effectiveness: float = 0.3
sharing_session_upload_interval: int = 0
sharing_skill_reload_mode: str = "poll"
sharing_skill_reload_interval_seconds: int = 30

# ------------------------------------------------------------------ #
# Evolve server integration #
# ------------------------------------------------------------------ #
evolve_server_url: str = ""
evolve_proxy_reload_url: str = ""

# ------------------------------------------------------------------ #
# Background validation #
Expand Down
Loading
Loading