Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,17 @@ skillclaw skills pull

If your team uses a mounted local shared directory instead of OSS/S3, use `sharing.backend local` plus `sharing.local_root /path/to/shared/root` instead of the remote storage keys.

To store skill assets in Nacos while keeping session and validation artifacts in the existing shared storage, set a
skill backend override:

```bash
skillclaw config sharing.backend oss
skillclaw config sharing.skill_backend nacos
skillclaw config sharing.nacos_server http://nacos.example.com
```

When `sharing.skill_backend` is empty, SkillClaw keeps the legacy behavior and uses `sharing.backend` for skill assets.

When you join a shared group:

- you still run only the local client proxy on your machine
Expand Down
17 changes: 16 additions & 1 deletion evolve_server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ def _build_config_from_args(args: argparse.Namespace) -> EvolveServerConfig:
if not config.storage_backend:
if args.oss_endpoint or args.oss_bucket:
config.storage_backend = "oss"
elif config.storage_bucket or config.storage_endpoint:
elif config.storage_bucket or (
config.storage_endpoint
and str(getattr(config, "skill_storage_backend", "") or "").strip().lower() != "nacos"
):
config.storage_backend = "s3"
if args.group_id:
config.group_id = args.group_id
Expand Down Expand Up @@ -240,6 +243,18 @@ def main() -> None:
"or use --use-skillclaw-config."
)
raise SystemExit(1)
elif not backend:
if str(getattr(config, "skill_storage_backend", "") or "").strip().lower() == "nacos":
logger.error(
"sharing.skill_backend=nacos stores skill assets only. Configure session storage with "
"sharing.backend, sharing.session_backend, sharing.local_root, EVOLVE_STORAGE_*, or use --mock."
)
raise SystemExit(1)
logger.error(
"Storage backend is not configured. Set EVOLVE_STORAGE_BACKEND, use --use-skillclaw-config, "
"use --local-root for local mode, or use --mock."
)
raise SystemExit(1)
else:
if not config.storage_bucket:
logger.error(
Expand Down
41 changes: 21 additions & 20 deletions evolve_server/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,12 @@ def from_skillclaw_config(cls, config) -> "EvolveServerConfig":
"""Build from an existing ``SkillClawConfig`` (reuse sharing + LLM settings)."""
engine = _first_env("EVOLVE_ENGINE", default="workflow").strip().lower() or "workflow"
sharing_backend = str(getattr(config, "sharing_backend", "") or "").strip().lower()
skill_backend = str(getattr(config, "sharing_skill_backend", "") or "").strip().lower() or sharing_backend
session_backend = str(getattr(config, "sharing_session_backend", "") or "").strip().lower()
storage_endpoint = str(
sharing_endpoint = str(
getattr(config, "sharing_endpoint", "") or getattr(config, "sharing_oss_endpoint", "") or ""
)
storage_endpoint = "" if sharing_backend == "nacos" and not session_backend else sharing_endpoint
storage_bucket = str(getattr(config, "sharing_bucket", "") or getattr(config, "sharing_oss_bucket", "") or "")
storage_access_key_id = str(
getattr(config, "sharing_access_key_id", "") or getattr(config, "sharing_oss_access_key_id", "") or ""
Expand Down Expand Up @@ -297,25 +299,24 @@ def from_skillclaw_config(cls, config) -> "EvolveServerConfig":
default="openai-completions",
)

storage_backend = _first_env("EVOLVE_STORAGE_BACKEND", default="")
if not storage_backend:
if session_backend:
storage_backend = session_backend
elif local_root:
storage_backend = "local"
elif sharing_backend and sharing_backend != "nacos":
storage_backend = sharing_backend
elif (storage_bucket or storage_endpoint) and sharing_backend != "nacos":
storage_backend = "oss" if "aliyuncs.com" in storage_endpoint else "s3"

nacos_server = str(getattr(config, "sharing_nacos_server", "") or "")
if not nacos_server and sharing_backend == "nacos" and skill_backend == "nacos":
nacos_server = sharing_endpoint

return cls(
engine=engine,
storage_backend=_first_env("EVOLVE_STORAGE_BACKEND", default="")
or (
session_backend
if session_backend
else "local"
if local_root
else "s3"
if (storage_bucket or storage_endpoint) and sharing_backend != "nacos"
else ""
)
or (
"local"
if local_root
else "oss"
if sharing_backend != "nacos"
else ""
),
storage_backend=storage_backend,
storage_endpoint=storage_endpoint,
storage_bucket=storage_bucket,
storage_access_key_id=storage_access_key_id,
Expand Down Expand Up @@ -344,8 +345,8 @@ def from_skillclaw_config(cls, config) -> "EvolveServerConfig":
validation_required_approvals=int(os.environ.get("EVOLVE_VALIDATION_REQUIRED_APPROVALS", "1")),
validation_min_mean_score=float(os.environ.get("EVOLVE_VALIDATION_MIN_MEAN_SCORE", "0.75")),
validation_max_rejections=int(os.environ.get("EVOLVE_VALIDATION_MAX_REJECTIONS", "1")),
skill_storage_backend="nacos" if sharing_backend == "nacos" else "",
nacos_server=str(getattr(config, "sharing_nacos_server", "") or storage_endpoint),
skill_storage_backend="nacos" if skill_backend == "nacos" else "",
nacos_server=nacos_server,
nacos_namespace_id=str(getattr(config, "sharing_nacos_namespace_id", "") or "public"),
nacos_access_token=str(getattr(config, "sharing_nacos_access_token", "") or ""),
nacos_username=str(getattr(config, "sharing_nacos_username", "") or ""),
Expand Down
8 changes: 8 additions & 0 deletions evolve_server/engines/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ def _build_bucket(
"""Create the object-store adapter for an engine."""
if mock:
return LocalBucket(root=mock_root)
if (
str(getattr(config, "skill_storage_backend", "") or "").strip().lower() == "nacos"
and not str(getattr(config, "storage_backend", "") or "").strip()
):
raise ValueError(
"sharing.skill_backend=nacos stores skill assets only. Configure session storage with "
"sharing.backend, sharing.session_backend, sharing.local_root, EVOLVE_STORAGE_*, or use --mock."
)
return build_object_store(
backend=config.storage_backend,
endpoint=config.storage_endpoint,
Expand Down
124 changes: 92 additions & 32 deletions evolve_server/engines/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,36 @@ def _overlay_manifest_metadata(
def _fetch_skill(self, name: str) -> Optional[str]:
if self._nacos_skill_client is not None:
try:
from skillclaw.nacos_skill_hub import _nacos_zip_to_bundle
from skillclaw.nacos_skill_hub import (
_nacos_published_version,
_nacos_working_version,
_nacos_zip_to_bundle,
)

record = self._load_remote_skill_record(name) or {}
labels = record.get("labels") if isinstance(record.get("labels"), dict) else {}
version = labels.get(str(getattr(self.config, "nacos_label", "") or "latest"))
zip_bytes = self._nacos_skill_client.download_skill_zip(
name,
version=version,
label=str(getattr(self.config, "nacos_label", "") or "latest"),
)
try:
detail = self._nacos_skill_client.get_skill(name) if record else {}
except Exception:
detail = {}
working = _nacos_working_version(record, detail)
if working:
_status, version = working
zip_bytes = self._nacos_skill_client.download_skill_zip(name, version=version, admin=True)
else:
label = str(getattr(self.config, "nacos_label", "") or "latest")
version = _nacos_published_version(record, detail, label=label)
if not version:
logger.info(
"[EvolveServer] Nacos skill %s has no published %s version",
name,
label,
)
return None
zip_bytes = self._nacos_skill_client.download_skill_zip(
name,
version=version,
label=label,
)
bundle = _nacos_zip_to_bundle(zip_bytes)
data = bundle.get("SKILL.md")
return data.decode("utf-8") if data is not None else None
Expand All @@ -157,23 +177,60 @@ def _fetch_skill(self, name: str) -> Optional[str]:
return None
return fetch_skill_content(self._bucket, self._prefix, name)

def _upload_skill(self, skill: dict, action: str) -> None:
def _upload_skill(self, skill: dict, action: str) -> str:
name = skill.get("name", "")
if not name:
return
return "skipped_missing_name"

if self._nacos_skill_client is not None:
from skillclaw.nacos_skill_hub import _bundle_to_nacos_zip, _next_version
from skillclaw.nacos_skill_hub import (
_bundle_matches_remote,
_bundle_to_nacos_zip,
_nacos_working_version,
_nacos_zip_to_bundle,
_next_version,
)

md_content = build_skill_md(skill)
md_bytes = md_content.encode("utf-8")
bundle_files = {"SKILL.md": md_bytes}
record = self._load_remote_skill_record(name) or {}
try:
detail = self._nacos_skill_client.get_skill(name) if record else {}
except Exception:
detail = {}
working = _nacos_working_version(record, detail)
if working:
status, version = working
try:
zip_bytes = self._nacos_skill_client.download_skill_zip(name, version=version, admin=True)
remote_bundle = _nacos_zip_to_bundle(zip_bytes)
except Exception as exc:
logger.warning(
"[EvolveServer] skipping Nacos skill %s: failed to inspect %s version %s: %s",
name,
status,
version,
exc,
)
return f"skipped_existing_{status}"
if _bundle_matches_remote(bundle_files, remote_bundle):
logger.info(
"[EvolveServer] skipped Nacos skill %s: %s version %s already matches",
name,
status,
version,
)
return f"skipped_existing_{status}"
logger.info(
"[EvolveServer] skipped Nacos skill %s: %s version %s already exists",
name,
status,
version,
)
return f"skipped_existing_{status}"
target_version = _next_version(record, detail)
zip_bytes = _bundle_to_nacos_zip(name, {"SKILL.md": md_bytes})
zip_bytes = _bundle_to_nacos_zip(name, bundle_files)
self._nacos_skill_client.upload_skill_zip(
zip_bytes=zip_bytes,
filename=f"{name}-{target_version}.zip",
Expand All @@ -187,7 +244,7 @@ def _upload_skill(self, skill: dict, action: str) -> None:
target_version,
action,
)
return
return "uploaded"

skill_id = self._id_registry.get_or_create(name)
md_content = build_skill_md(skill)
Expand Down Expand Up @@ -234,6 +291,7 @@ def _upload_skill(self, skill: dict, action: str) -> None:
version,
object_key,
)
return "uploaded"

def _detect_conflict(self, name: str, incoming_skill: dict) -> bool:
if self._nacos_skill_client is not None:
Expand All @@ -251,18 +309,18 @@ def _detect_conflict(self, name: str, incoming_skill: dict) -> bool:
incoming_sha = hashlib.sha256(incoming_md.encode("utf-8")).hexdigest()
return existing_sha != incoming_sha

async def _resolve_and_upload(self, skill: dict, action_type: str) -> str:
async def _resolve_and_upload(self, skill: dict, action_type: str) -> tuple[str, bool]:
name = skill.get("name", "")
has_conflict = await self._call_storage(self._detect_conflict, name, skill)
if not has_conflict:
await self._call_storage(self._upload_skill, skill, action_type)
return action_type
upload_status = await self._call_storage(self._upload_skill, skill, action_type)
return (action_type, True) if upload_status == "uploaded" else (upload_status, False)

logger.info("[EvolveServer] conflict detected for '%s' - merging", name)
existing_md = await self._call_storage(self._fetch_skill, name)
if not existing_md:
await self._call_storage(self._upload_skill, skill, action_type)
return action_type
upload_status = await self._call_storage(self._upload_skill, skill, action_type)
return (action_type, True) if upload_status == "uploaded" else (upload_status, False)

existing_skill = parse_skill_content(name, existing_md)
existing_skill = self._overlay_manifest_metadata(
Expand All @@ -273,12 +331,12 @@ async def _resolve_and_upload(self, skill: dict, action_type: str) -> str:
merged = await execute_merge(self._llm, existing_skill, skill)
if merged and merged.get("name"):
merged["name"] = name
await self._call_storage(self._upload_skill, merged, "merge")
return "merge"
upload_status = await self._call_storage(self._upload_skill, merged, "merge")
return ("merge", True) if upload_status == "uploaded" else (upload_status, False)

logger.warning("[EvolveServer] merge failed for '%s' - keeping incoming version", name)
await self._call_storage(self._upload_skill, skill, action_type)
return action_type
upload_status = await self._call_storage(self._upload_skill, skill, action_type)
return (action_type, True) if upload_status == "uploaded" else (upload_status, False)

def _empty_judge_summary(self) -> dict[str, Any]:
return {
Expand Down Expand Up @@ -354,6 +412,7 @@ def _empty_validation_publish_summary(self) -> dict[str, Any]:
"pending": 0,
"published": 0,
"rejected": 0,
"skipped": 0,
}

def _build_validation_evidence(self, sessions: list[dict[str, Any]]) -> list[dict[str, Any]]:
Expand Down Expand Up @@ -518,30 +577,33 @@ async def _finalize_validation_jobs(self) -> tuple[list[dict[str, Any]], dict[st
summary["rejected"] += 1
continue
action_type = str(job.get("proposed_action", DecisionAction.CREATE) or DecisionAction.CREATE)
actual_action = await self._resolve_and_upload(candidate_skill, action_type)
actual_action, uploaded = await self._resolve_and_upload(candidate_skill, action_type)
self._validation_store.save_decision(
job_id,
{
"status": "published",
"status": "published" if uploaded else "skipped",
"published_action": actual_action,
"result_count": len(results),
"accepted_count": accepted,
"rejected_count": rejected,
"mean_score": mean_score,
},
)
summary["published"] += 1
if uploaded:
summary["published"] += 1
else:
summary["skipped"] += 1
records.append(
{
"action": "published_after_validation",
"action": "published_after_validation" if uploaded else actual_action,
"published_action": actual_action,
"skill_name": str(candidate_skill.get("name", "")),
"skill_id": self._id_registry.get_or_create(str(candidate_skill.get("name", ""))),
"version": self._id_registry.get_version(str(candidate_skill.get("name", ""))),
"session_ids": list(job.get("session_ids") or []),
"rationale": str(job.get("rationale", "") or ""),
"source": "validation_publish",
"uploaded": True,
"uploaded": uploaded,
"validation_job_id": job_id,
"validation_results": {
"result_count": len(results),
Expand Down Expand Up @@ -688,7 +750,7 @@ async def _materialize_skill(
record["verification"] = verification
return record

actual_action = await self._resolve_and_upload(evolved_skill, action_type)
actual_action, uploaded = await self._resolve_and_upload(evolved_skill, action_type)
logger.info(
"[EvolveServer] %s skill '%s' (id=%s, v%d)",
actual_action,
Expand All @@ -705,7 +767,7 @@ async def _materialize_skill(
"rationale": rationale,
"source": source,
"edit_summary": evolved_skill.get("edit_summary"),
"uploaded": True,
"uploaded": uploaded,
"verification": verification,
}

Expand Down Expand Up @@ -896,9 +958,7 @@ async def trigger_evolve():
@app.get("/status")
async def status():
entries = (
self._load_remote_skills()
if self._uses_nacos_skill_registry()
else self._id_registry.all_entries()
self._load_remote_skills() if self._uses_nacos_skill_registry() else self._id_registry.all_entries()
)
pending_keys = await self._call_storage(list_session_keys, self._bucket, self._prefix)
return JSONResponse(
Expand Down
3 changes: 2 additions & 1 deletion scripts/demo_nacos_skill_lifecycle.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python
# ruff: noqa: I001
"""Demo SkillClaw's Nacos-backed skill lifecycle without a real Nacos server.

The demo uses httpx.MockTransport to show the exact lifecycle SkillClaw now
Expand All @@ -13,8 +14,8 @@
from __future__ import annotations

import tempfile
from pathlib import Path
import sys
from pathlib import Path

import httpx

Expand Down
Loading
Loading