Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 210 additions & 8 deletions skillclaw/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1593,14 +1593,36 @@ async def responses(

body = await request.json()
if owner._responses_native_enabled():
record_body = copy.deepcopy(body)
turn_type = _resolve_turn_type(x_turn_type, body.get("turn_type"), default="main")
body = owner._prepare_native_responses_body(body, turn_type=turn_type)
injected_skills = owner._prepare_native_responses_body_inplace(body, turn_type=turn_type)
_raw_sid = x_session_id or codex_session_id or body.get("session_id") or ""
session_id = _raw_sid or await owner._resolve_tui_session(
body.get("model", owner._served_model),
len(body.get("input", []) if isinstance(body.get("input"), list) else []),
)
session_done = _resolve_session_done(x_session_done, body.get("session_done"))
if bool(body.get("stream", False)):
return StreamingResponse(
owner._stream_llm_responses(body),
owner._stream_and_track_responses(
body,
record_body=record_body,
session_id=session_id,
turn_type=turn_type,
injected_skills=injected_skills,
session_done=session_done,
),
media_type="text/event-stream",
)
response_payload = await owner._forward_to_llm_responses(body)
owner._record_responses_turn(
session_id,
record_body,
response_payload,
turn_type=turn_type,
injected_skills=injected_skills,
session_done=session_done,
)
return JSONResponse(content=response_payload)

previous_response_id = str(body.get("previous_response_id") or "").strip()
Expand Down Expand Up @@ -2517,8 +2539,13 @@ def _prepare_responses_forward(
def _prepare_native_responses_body(self, body: dict[str, Any], *, turn_type: str) -> dict[str, Any]:
"""Apply non-destructive SkillClaw hooks before native Responses forwarding."""
prepared = dict(body)
self._prepare_native_responses_body_inplace(prepared, turn_type=turn_type)
return prepared

def _prepare_native_responses_body_inplace(self, body: dict[str, Any], *, turn_type: str) -> list[str]:
"""Inject skills into a Responses body in-place. Returns injected skill names."""
if not self.skill_manager or turn_type != "main":
return prepared
return []

try:
self.skill_manager.refresh_if_changed()
Expand All @@ -2529,7 +2556,7 @@ def _prepare_native_responses_body(self, body: dict[str, Any], *, turn_type: str
max_chars=getattr(self.config, "max_skills_prompt_chars", 30_000),
)
if not skill_text:
return prepared
return []

all_skills = self.skill_manager.get_all_skills()
skill_names = [s.get("name", "unknown_skill") for s in all_skills if isinstance(s, dict)]
Expand All @@ -2540,9 +2567,72 @@ def _prepare_native_responses_body(self, body: dict[str, Any], *, turn_type: str
)
self.skill_manager.record_injection(skill_names)

existing = _normalize_responses_content(prepared.get("instructions", ""))
prepared["instructions"] = (existing + "\n\n" + skill_text).strip() if existing else skill_text
return prepared
existing = _normalize_responses_content(body.get("instructions", ""))
body["instructions"] = (existing + "\n\n" + skill_text).strip() if existing else skill_text
return skill_names

def _record_responses_turn(
self,
session_id: str,
request_body: dict[str, Any],
response_payload: dict[str, Any],
*,
turn_type: str,
injected_skills: list[str],
session_done: bool,
) -> None:
"""Record a Responses API turn into the session tracking system."""
if not session_id:
return
self._touch_session(session_id)
prompt_text = _normalize_responses_content(request_body.get("instructions", ""))
inp = request_body.get("input")
if isinstance(inp, str):
prompt_text = (prompt_text + "\n" + inp).strip() if prompt_text else inp
elif isinstance(inp, list):
user_parts = []
for item in inp:
if isinstance(item, dict) and item.get("role") == "user":
user_parts.append(_normalize_responses_content(item.get("content", "")))
if user_parts:
joined = " ".join(user_parts)
prompt_text = (prompt_text + "\n" + joined).strip() if prompt_text else joined
response_parts = []
for item in response_payload.get("output", []):
if not isinstance(item, dict):
continue
if item.get("type") == "message":
for part in item.get("content", []):
if isinstance(part, dict) and part.get("type") == "output_text":
response_parts.append(part.get("text", ""))
elif item.get("type") == "function_call":
name = item.get("name", "")
args = str(item.get("arguments", ""))[:500]
response_parts.append(f"[tool:{name}] {args}")
response_text = "\n".join(response_parts)
turns = self._session_turns.setdefault(session_id, [])
turn_num = len(turns) + 1
turns.append(
{
"turn_num": turn_num,
"prompt_text": prompt_text[:2000],
"response_text": response_text[:2000],
"injected_skills": injected_skills,
"prm_score": None,
}
)
logger.info(
"[Codex] %s session=%s turn=%d prompt=%d chars response=%d chars skills=%s",
turn_type,
session_id,
turn_num,
len(prompt_text),
len(response_text),
",".join(injected_skills) if injected_skills else "(none)",
)
self._maybe_upload_session_snapshot(session_id, turn_num)
if session_done:
self._safe_create_task(self._close_session(session_id, reason="codex_session_done"))

async def _forward_to_llm_responses(self, body: dict[str, Any]) -> dict[str, Any]:
"""Forward a Codex Responses payload to an upstream Responses API."""
Expand Down Expand Up @@ -2592,6 +2682,118 @@ async def _forward_to_llm_responses(self, body: dict[str, Any]) -> dict[str, Any
logger.error("[OpenClaw] Responses forward failed: %s", e, exc_info=True)
raise HTTPException(status_code=502, detail=f"Responses forward error: {e}") from e

async def _stream_and_track_responses(
self,
body: dict[str, Any],
*,
record_body: dict[str, Any] | None = None,
session_id: str,
turn_type: str,
injected_skills: list[str],
session_done: bool,
):
"""Wrap _stream_llm_responses: passthrough SSE + parse response.completed inline."""
tracked = False
buf = ""
output_items: dict[int, dict[str, Any]] = {}
output_text_parts: dict[tuple[int, int], str] = {}

def ensure_message_item(output_index: int) -> dict[str, Any]:
item = output_items.setdefault(
output_index,
{
"type": "message",
"role": "assistant",
"status": "completed",
"content": [],
},
)
content = item.setdefault("content", [])
if not isinstance(content, list):
item["content"] = []
return item

def apply_output_text(output_index: int, content_index: int, text: str) -> None:
item = ensure_message_item(output_index)
content = item.setdefault("content", [])
while len(content) <= content_index:
content.append({"type": "output_text", "text": "", "annotations": []})
part = content[content_index]
if isinstance(part, dict):
part["type"] = part.get("type") or "output_text"
part["text"] = text
part.setdefault("annotations", [])

def parse_responses_stream_event(data: dict[str, Any]) -> dict[str, Any] | None:
event_type = data.get("type")
output_index = int(data.get("output_index", 0) or 0)
content_index = int(data.get("content_index", 0) or 0)

if event_type == "response.output_item.added":
item = data.get("item")
if isinstance(item, dict):
output_items[output_index] = item
elif event_type == "response.output_item.done":
item = data.get("item")
if isinstance(item, dict):
output_items[output_index] = item
elif event_type == "response.output_text.delta":
key = (output_index, content_index)
output_text_parts[key] = output_text_parts.get(key, "") + str(data.get("delta") or "")
apply_output_text(output_index, content_index, output_text_parts[key])
elif event_type == "response.output_text.done":
text = str(data.get("text") or output_text_parts.get((output_index, content_index), ""))
output_text_parts[(output_index, content_index)] = text
apply_output_text(output_index, content_index, text)
elif event_type == "response.content_part.done":
part = data.get("part")
if isinstance(part, dict) and part.get("type") == "output_text":
text = str(part.get("text") or output_text_parts.get((output_index, content_index), ""))
output_text_parts[(output_index, content_index)] = text
apply_output_text(output_index, content_index, text)
elif event_type == "response.completed":
response_payload = data.get("response") if isinstance(data.get("response"), dict) else dict(data)
if output_items and not response_payload.get("output"):
response_payload = {
**response_payload,
"output": [item for _, item in sorted(output_items.items())],
}
return response_payload
return None

async for chunk in self._stream_llm_responses(body):
if not tracked:
try:
text = chunk.decode("utf-8", errors="ignore") if isinstance(chunk, bytes) else chunk
buf += text
while "\n" in buf:
line, buf = buf.split("\n", 1)
stripped = line.strip()
if not stripped.startswith("data: "):
continue
raw = stripped[6:]
if raw == "[DONE]":
continue
try:
data = json.loads(raw)
except Exception:
continue
response_payload = parse_responses_stream_event(data) if isinstance(data, dict) else None
if response_payload is not None:
self._record_responses_turn(
session_id,
record_body or body,
response_payload,
turn_type=turn_type,
injected_skills=injected_skills,
session_done=session_done,
)
tracked = True
break
except Exception:
pass
yield chunk

async def _stream_llm_responses(self, body: dict[str, Any]):
"""Passthrough upstream Responses SSE without aggregating or rewriting events."""
import httpx
Expand All @@ -2601,7 +2803,7 @@ async def _stream_llm_responses(self, body: dict[str, Any]):
async with httpx.AsyncClient(timeout=_llm_request_timeout_seconds()) as client:
async with client.stream("POST", url, json=send_body, headers=headers) as resp:
resp.raise_for_status()
async for chunk in resp.aiter_raw():
async for chunk in resp.aiter_bytes():
if chunk:
yield chunk
except httpx.HTTPStatusError as e:
Expand Down
34 changes: 25 additions & 9 deletions skillclaw/claw_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
_HERMES_BACKUP_DIR = Path.home() / ".skillclaw" / "backups" / "hermes"
_CODEX_HOME = Path.home() / ".codex"
_CODEX_CONFIG_PATH = _CODEX_HOME / "config.toml"
_CODEX_PROFILE_CONFIG_PATH = _CODEX_HOME / "skillclaw.config.toml"
_CODEX_SKILLS_DIR = _CODEX_HOME / "skills"
_CODEX_BACKUP_DIR = Path.home() / ".skillclaw" / "backups" / "codex"
_CLAUDE_HOME = Path.home() / ".claude"
Expand Down Expand Up @@ -656,7 +657,6 @@ def _build_codex_provider_block(base_url: str, api_key: str) -> str:

def _build_codex_profile_block(model_id: str) -> str:
lines = [
"[profiles.skillclaw]",
f"model = {_format_toml_value(model_id)}",
'model_provider = "skillclaw"',
]
Expand All @@ -673,6 +673,7 @@ def _configure_codex(cfg: "SkillClawConfig") -> None:
api_key = cfg.proxy_api_key or "skillclaw"
base_url = f"http://127.0.0.1:{cfg.proxy_port}/v1"
config_path = _CODEX_CONFIG_PATH
profile_config_path = _CODEX_PROFILE_CONFIG_PATH
_prepare_external_skills_dir(_CODEX_SKILLS_DIR, "Codex")

existing_text = ""
Expand All @@ -687,16 +688,17 @@ def _configure_codex(cfg: "SkillClawConfig") -> None:
updated = _remove_top_level_toml_keys(updated, {"model", "model_provider"})
updated = _remove_toml_table(updated, "model_providers.skillclaw").rstrip() + "\n\n"
updated = _remove_toml_table(updated, "profiles.skillclaw").rstrip() + "\n\n"
updated += _build_codex_provider_block(base_url, api_key)
updated += "\n" + _build_codex_profile_block(model_id)
profile_text = _build_codex_profile_block(model_id) + "\n" + _build_codex_provider_block(base_url, api_key)

_backup_codex_config_if_changed(config_path, updated)
_write_text_atomic(config_path, updated, "Codex config")
_write_text_atomic(profile_config_path, profile_text, "Codex SkillClaw profile config")


def inspect_codex_config(cfg: "SkillClawConfig") -> dict[str, object]:
"""Return a diagnostic snapshot of the local Codex integration state."""
config_path = _CODEX_CONFIG_PATH
profile_config_path = _CODEX_PROFILE_CONFIG_PATH
expected_model = cfg.served_model_name or cfg.llm_model_id or "skillclaw-model"
expected_base_url = f"http://127.0.0.1:{cfg.proxy_port}/v1"
expected_api_key = cfg.proxy_api_key or "skillclaw"
Expand All @@ -711,16 +713,21 @@ def inspect_codex_config(cfg: "SkillClawConfig") -> dict[str, object]:
text = config_path.read_text(encoding="utf-8")
except Exception as e:
logger.warning("[ClawAdapter] Failed to read Codex config %s: %s", config_path, e)
profile_text = ""
if profile_config_path.exists():
try:
profile_text = profile_config_path.read_text(encoding="utf-8")
except Exception as e:
logger.warning("[ClawAdapter] Failed to read Codex profile config %s: %s", profile_config_path, e)

configured_model = str(_extract_top_level_toml_value(text, "model") or "")
configured_provider = str(_extract_top_level_toml_value(text, "model_provider") or "")
provider_cfg = _extract_toml_table(text, "model_providers.skillclaw")
provider_cfg = _extract_toml_table(profile_text, "model_providers.skillclaw")
configured_base_url = str(provider_cfg.get("base_url") or "")
configured_wire_api = str(provider_cfg.get("wire_api") or "")
configured_token = str(provider_cfg.get("experimental_bearer_token") or "")
profile_cfg = _extract_toml_table(text, "profiles.skillclaw")
configured_profile_model = str(profile_cfg.get("model") or "")
configured_profile_provider = str(profile_cfg.get("model_provider") or "")
configured_profile_model = str(_extract_top_level_toml_value(profile_text, "model") or "")
configured_profile_provider = str(_extract_top_level_toml_value(profile_text, "model_provider") or "")

proxy_match = (
configured_profile_model == expected_model
Expand All @@ -743,9 +750,13 @@ def inspect_codex_config(cfg: "SkillClawConfig") -> dict[str, object]:

if not config_path.exists():
issues.append("Codex config is missing: ~/.codex/config.toml")
if not profile_config_path.exists():
issues.append("Codex SkillClaw profile config is missing: ~/.codex/skillclaw.config.toml")
if not proxy_match:
issues.append("Codex SkillClaw profile is missing or not pointing at the local SkillClaw proxy.")
next_steps.append("Start SkillClaw once with `claw_type=codex` so it can register ~/.codex/config.toml.")
next_steps.append(
"Start SkillClaw once with `claw_type=codex` so it can register ~/.codex/skillclaw.config.toml."
)
if configured_provider == "skillclaw":
issues.append("Codex global model_provider still points at SkillClaw; normal Codex runs may be intercepted.")
next_steps.append('Remove top-level `model_provider = "skillclaw"` or run `skillclaw restore codex`.')
Expand Down Expand Up @@ -797,7 +808,12 @@ def restore_codex_config(backup_path: Path | None = None) -> dict[str, str]:
text = source.read_text(encoding="utf-8")
target = _CODEX_CONFIG_PATH
_write_text_atomic(target, text, "Codex config restore")
return {"source": str(source), "target": str(target)}
profile_target = _CODEX_PROFILE_CONFIG_PATH
removed_profile = False
if profile_target.exists():
profile_target.unlink()
removed_profile = True
return {"source": str(source), "target": str(target), "removed_profile": str(removed_profile)}


# ------------------------------------------------------------------ #
Expand Down
2 changes: 2 additions & 0 deletions skillclaw/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,8 @@ def restore_codex(backup_path: str | None):
raise click.ClickException(str(exc)) from None

click.echo(f"Restored Codex config: {result['target']} <- {result['source']}")
if result.get("removed_profile") == "True":
click.echo("Removed Codex SkillClaw profile config: ~/.codex/skillclaw.config.toml")


@restore.command(name="claude")
Expand Down
Loading
Loading