From 10fca4ae41d9759c726f47339b120a45a6910043 Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 12:42:45 +0800 Subject: [PATCH 1/3] fix: add session tracking for native Responses API path The native Responses API path (_responses_native_enabled) bypassed _handle_request entirely, skipping session turn recording, skill injection tracking, and snapshot uploads. Codex sessions through the proxy were invisible to the evolve pipeline. Adds _record_responses_turn to capture prompt/response text and injected_skills for Responses API requests, enabling the full session -> evolve -> skill generation loop for Codex users. --- skillclaw/api_server.py | 136 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 7 deletions(-) diff --git a/skillclaw/api_server.py b/skillclaw/api_server.py index 8655d99..00b144e 100644 --- a/skillclaw/api_server.py +++ b/skillclaw/api_server.py @@ -1593,14 +1593,34 @@ async def responses( body = await request.json() if owner._responses_native_enabled(): + record_body = copy.deepcopy(body) turn_type = _resolve_turn_type(x_turn_type, body.get("turn_type"), default="main") - body = owner._prepare_native_responses_body(body, turn_type=turn_type) + injected_skills = owner._prepare_native_responses_body_inplace(body, turn_type=turn_type) + _raw_sid = x_session_id or codex_session_id or body.get("session_id") or "" + session_id = _raw_sid or await owner._resolve_tui_session( + body.get("model", owner._served_model), + len(body.get("input", []) if isinstance(body.get("input"), list) else []), + ) + session_done = _resolve_session_done(x_session_done, body.get("session_done")) if bool(body.get("stream", False)): return StreamingResponse( - owner._stream_llm_responses(body), + owner._stream_and_track_responses( + body, + record_body=record_body, + session_id=session_id, + turn_type=turn_type, + injected_skills=injected_skills, + session_done=session_done, + ), media_type="text/event-stream", ) response_payload = await owner._forward_to_llm_responses(body) + owner._record_responses_turn( + session_id, record_body, response_payload, + turn_type=turn_type, + injected_skills=injected_skills, + session_done=session_done, + ) return JSONResponse(content=response_payload) previous_response_id = str(body.get("previous_response_id") or "").strip() @@ -2517,8 +2537,13 @@ def _prepare_responses_forward( def _prepare_native_responses_body(self, body: dict[str, Any], *, turn_type: str) -> dict[str, Any]: """Apply non-destructive SkillClaw hooks before native Responses forwarding.""" prepared = dict(body) + self._prepare_native_responses_body_inplace(prepared, turn_type=turn_type) + return prepared + + def _prepare_native_responses_body_inplace(self, body: dict[str, Any], *, turn_type: str) -> list[str]: + """Inject skills into a Responses body in-place. Returns injected skill names.""" if not self.skill_manager or turn_type != "main": - return prepared + return [] try: self.skill_manager.refresh_if_changed() @@ -2529,7 +2554,7 @@ def _prepare_native_responses_body(self, body: dict[str, Any], *, turn_type: str max_chars=getattr(self.config, "max_skills_prompt_chars", 30_000), ) if not skill_text: - return prepared + return [] all_skills = self.skill_manager.get_all_skills() skill_names = [s.get("name", "unknown_skill") for s in all_skills if isinstance(s, dict)] @@ -2540,9 +2565,60 @@ def _prepare_native_responses_body(self, body: dict[str, Any], *, turn_type: str ) self.skill_manager.record_injection(skill_names) - existing = _normalize_responses_content(prepared.get("instructions", "")) - prepared["instructions"] = (existing + "\n\n" + skill_text).strip() if existing else skill_text - return prepared + existing = _normalize_responses_content(body.get("instructions", "")) + body["instructions"] = (existing + "\n\n" + skill_text).strip() if existing else skill_text + return skill_names + + def _record_responses_turn( + self, + session_id: str, + request_body: dict[str, Any], + response_payload: dict[str, Any], + *, + turn_type: str, + injected_skills: list[str], + session_done: bool, + ) -> None: + """Record a Responses API turn into the session tracking system.""" + if not session_id: + return + self._touch_session(session_id) + prompt_text = _normalize_responses_content(request_body.get("instructions", "")) + inp = request_body.get("input") + if isinstance(inp, str): + prompt_text = (prompt_text + "\n" + inp).strip() if prompt_text else inp + elif isinstance(inp, list): + user_parts = [] + for item in inp: + if isinstance(item, dict) and item.get("role") == "user": + user_parts.append(_normalize_responses_content(item.get("content", ""))) + if user_parts: + joined = " ".join(user_parts) + prompt_text = (prompt_text + "\n" + joined).strip() if prompt_text else joined + response_text = "" + for item in response_payload.get("output", []): + if isinstance(item, dict) and item.get("type") == "message": + for part in item.get("content", []): + if isinstance(part, dict) and part.get("type") == "output_text": + response_text += part.get("text", "") + turns = self._session_turns.setdefault(session_id, []) + turn_num = len(turns) + 1 + turns.append({ + "turn_num": turn_num, + "prompt_text": prompt_text[:2000], + "response_text": response_text[:2000], + "injected_skills": injected_skills, + "prm_score": None, + }) + logger.info( + "[Codex] %s session=%s turn=%d prompt=%d chars response=%d chars skills=%s", + turn_type, session_id, turn_num, + len(prompt_text), len(response_text), + ",".join(injected_skills) if injected_skills else "(none)", + ) + self._maybe_upload_session_snapshot(session_id, turn_num) + if session_done: + self._safe_create_task(self._close_session(session_id, reason="codex_session_done")) async def _forward_to_llm_responses(self, body: dict[str, Any]) -> dict[str, Any]: """Forward a Codex Responses payload to an upstream Responses API.""" @@ -2592,6 +2668,52 @@ async def _forward_to_llm_responses(self, body: dict[str, Any]) -> dict[str, Any logger.error("[OpenClaw] Responses forward failed: %s", e, exc_info=True) raise HTTPException(status_code=502, detail=f"Responses forward error: {e}") from e + async def _stream_and_track_responses( + self, + body: dict[str, Any], + *, + record_body: dict[str, Any] | None = None, + session_id: str, + turn_type: str, + injected_skills: list[str], + session_done: bool, + ): + """Wrap _stream_llm_responses: passthrough SSE + extract response.completed for tracking.""" + completed_payload: dict[str, Any] | None = None + buf = "" + async for chunk in self._stream_llm_responses(body): + yield chunk + if completed_payload is None: + try: + text = chunk.decode("utf-8", errors="ignore") if isinstance(chunk, bytes) else chunk + buf += text + except Exception: + pass + if buf and session_id: + for line in buf.split("\n"): + stripped = line.strip() + if not stripped.startswith("data: "): + continue + raw = stripped[6:] + if raw == "[DONE]": + continue + try: + data = json.loads(raw) + if isinstance(data, dict) and data.get("type") == "response.completed": + completed_payload = data.get("response", data) + break + except Exception: + continue + if completed_payload and session_id: + self._record_responses_turn( + session_id, record_body or body, completed_payload, + turn_type=turn_type, + injected_skills=injected_skills, + session_done=session_done, + ) + elif session_id: + logger.warning("[Codex] stream ended without response.completed event") + async def _stream_llm_responses(self, body: dict[str, Any]): """Passthrough upstream Responses SSE without aggregating or rewriting events.""" import httpx From c640389e671a3c026cc40cffc113fa661352d7ec Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 14:38:08 +0800 Subject: [PATCH 2/3] fix: track native Responses streams for Codex Parse native Responses streaming events before forwarding each chunk so session tracking is not lost if the client closes after response.completed. Reconstruct assistant output from response.output_text and response.output_item events because real Responses streams may omit output from the final response.completed payload. Update Codex integration to use the current split profile file at ~/.codex/skillclaw.config.toml while keeping ~/.codex/config.toml free of legacy [profiles.skillclaw] tables. --- skillclaw/api_server.py | 135 +++++++++--- skillclaw/claw_adapter.py | 32 ++- skillclaw/cli.py | 2 + tests/test_codex_profile_integration.py | 97 ++++++++- tests/test_responses_native.py | 275 +++++++++++++++++++++++- 5 files changed, 490 insertions(+), 51 deletions(-) diff --git a/skillclaw/api_server.py b/skillclaw/api_server.py index 00b144e..adff2e1 100644 --- a/skillclaw/api_server.py +++ b/skillclaw/api_server.py @@ -2595,12 +2595,19 @@ def _record_responses_turn( if user_parts: joined = " ".join(user_parts) prompt_text = (prompt_text + "\n" + joined).strip() if prompt_text else joined - response_text = "" + response_parts = [] for item in response_payload.get("output", []): - if isinstance(item, dict) and item.get("type") == "message": + if not isinstance(item, dict): + continue + if item.get("type") == "message": for part in item.get("content", []): if isinstance(part, dict) and part.get("type") == "output_text": - response_text += part.get("text", "") + response_parts.append(part.get("text", "")) + elif item.get("type") == "function_call": + name = item.get("name", "") + args = str(item.get("arguments", ""))[:500] + response_parts.append(f"[tool:{name}] {args}") + response_text = "\n".join(response_parts) turns = self._session_turns.setdefault(session_id, []) turn_num = len(turns) + 1 turns.append({ @@ -2678,41 +2685,105 @@ async def _stream_and_track_responses( injected_skills: list[str], session_done: bool, ): - """Wrap _stream_llm_responses: passthrough SSE + extract response.completed for tracking.""" - completed_payload: dict[str, Any] | None = None + """Wrap _stream_llm_responses: passthrough SSE + parse response.completed inline.""" + tracked = False buf = "" + output_items: dict[int, dict[str, Any]] = {} + output_text_parts: dict[tuple[int, int], str] = {} + + def ensure_message_item(output_index: int) -> dict[str, Any]: + item = output_items.setdefault( + output_index, + { + "type": "message", + "role": "assistant", + "status": "completed", + "content": [], + }, + ) + content = item.setdefault("content", []) + if not isinstance(content, list): + item["content"] = [] + return item + + def apply_output_text(output_index: int, content_index: int, text: str) -> None: + item = ensure_message_item(output_index) + content = item.setdefault("content", []) + while len(content) <= content_index: + content.append({"type": "output_text", "text": "", "annotations": []}) + part = content[content_index] + if isinstance(part, dict): + part["type"] = part.get("type") or "output_text" + part["text"] = text + part.setdefault("annotations", []) + + def parse_responses_stream_event(data: dict[str, Any]) -> dict[str, Any] | None: + event_type = data.get("type") + output_index = int(data.get("output_index", 0) or 0) + content_index = int(data.get("content_index", 0) or 0) + + if event_type == "response.output_item.added": + item = data.get("item") + if isinstance(item, dict): + output_items[output_index] = item + elif event_type == "response.output_item.done": + item = data.get("item") + if isinstance(item, dict): + output_items[output_index] = item + elif event_type == "response.output_text.delta": + key = (output_index, content_index) + output_text_parts[key] = output_text_parts.get(key, "") + str(data.get("delta") or "") + apply_output_text(output_index, content_index, output_text_parts[key]) + elif event_type == "response.output_text.done": + text = str(data.get("text") or output_text_parts.get((output_index, content_index), "")) + output_text_parts[(output_index, content_index)] = text + apply_output_text(output_index, content_index, text) + elif event_type == "response.content_part.done": + part = data.get("part") + if isinstance(part, dict) and part.get("type") == "output_text": + text = str(part.get("text") or output_text_parts.get((output_index, content_index), "")) + output_text_parts[(output_index, content_index)] = text + apply_output_text(output_index, content_index, text) + elif event_type == "response.completed": + response_payload = data.get("response") if isinstance(data.get("response"), dict) else dict(data) + if output_items and not response_payload.get("output"): + response_payload = { + **response_payload, + "output": [item for _, item in sorted(output_items.items())], + } + return response_payload + return None + async for chunk in self._stream_llm_responses(body): - yield chunk - if completed_payload is None: + if not tracked: try: text = chunk.decode("utf-8", errors="ignore") if isinstance(chunk, bytes) else chunk buf += text + while "\n" in buf: + line, buf = buf.split("\n", 1) + stripped = line.strip() + if not stripped.startswith("data: "): + continue + raw = stripped[6:] + if raw == "[DONE]": + continue + try: + data = json.loads(raw) + except Exception: + continue + response_payload = parse_responses_stream_event(data) if isinstance(data, dict) else None + if response_payload is not None: + self._record_responses_turn( + session_id, record_body or body, response_payload, + turn_type=turn_type, + injected_skills=injected_skills, + session_done=session_done, + ) + tracked = True + break except Exception: pass - if buf and session_id: - for line in buf.split("\n"): - stripped = line.strip() - if not stripped.startswith("data: "): - continue - raw = stripped[6:] - if raw == "[DONE]": - continue - try: - data = json.loads(raw) - if isinstance(data, dict) and data.get("type") == "response.completed": - completed_payload = data.get("response", data) - break - except Exception: - continue - if completed_payload and session_id: - self._record_responses_turn( - session_id, record_body or body, completed_payload, - turn_type=turn_type, - injected_skills=injected_skills, - session_done=session_done, - ) - elif session_id: - logger.warning("[Codex] stream ended without response.completed event") + yield chunk async def _stream_llm_responses(self, body: dict[str, Any]): """Passthrough upstream Responses SSE without aggregating or rewriting events.""" @@ -2723,7 +2794,7 @@ async def _stream_llm_responses(self, body: dict[str, Any]): async with httpx.AsyncClient(timeout=_llm_request_timeout_seconds()) as client: async with client.stream("POST", url, json=send_body, headers=headers) as resp: resp.raise_for_status() - async for chunk in resp.aiter_raw(): + async for chunk in resp.aiter_bytes(): if chunk: yield chunk except httpx.HTTPStatusError as e: diff --git a/skillclaw/claw_adapter.py b/skillclaw/claw_adapter.py index b0636a1..b8d4e4d 100644 --- a/skillclaw/claw_adapter.py +++ b/skillclaw/claw_adapter.py @@ -45,6 +45,7 @@ _HERMES_BACKUP_DIR = Path.home() / ".skillclaw" / "backups" / "hermes" _CODEX_HOME = Path.home() / ".codex" _CODEX_CONFIG_PATH = _CODEX_HOME / "config.toml" +_CODEX_PROFILE_CONFIG_PATH = _CODEX_HOME / "skillclaw.config.toml" _CODEX_SKILLS_DIR = _CODEX_HOME / "skills" _CODEX_BACKUP_DIR = Path.home() / ".skillclaw" / "backups" / "codex" _CLAUDE_HOME = Path.home() / ".claude" @@ -656,7 +657,6 @@ def _build_codex_provider_block(base_url: str, api_key: str) -> str: def _build_codex_profile_block(model_id: str) -> str: lines = [ - "[profiles.skillclaw]", f"model = {_format_toml_value(model_id)}", 'model_provider = "skillclaw"', ] @@ -673,6 +673,7 @@ def _configure_codex(cfg: "SkillClawConfig") -> None: api_key = cfg.proxy_api_key or "skillclaw" base_url = f"http://127.0.0.1:{cfg.proxy_port}/v1" config_path = _CODEX_CONFIG_PATH + profile_config_path = _CODEX_PROFILE_CONFIG_PATH _prepare_external_skills_dir(_CODEX_SKILLS_DIR, "Codex") existing_text = "" @@ -687,16 +688,17 @@ def _configure_codex(cfg: "SkillClawConfig") -> None: updated = _remove_top_level_toml_keys(updated, {"model", "model_provider"}) updated = _remove_toml_table(updated, "model_providers.skillclaw").rstrip() + "\n\n" updated = _remove_toml_table(updated, "profiles.skillclaw").rstrip() + "\n\n" - updated += _build_codex_provider_block(base_url, api_key) - updated += "\n" + _build_codex_profile_block(model_id) + profile_text = _build_codex_profile_block(model_id) + "\n" + _build_codex_provider_block(base_url, api_key) _backup_codex_config_if_changed(config_path, updated) _write_text_atomic(config_path, updated, "Codex config") + _write_text_atomic(profile_config_path, profile_text, "Codex SkillClaw profile config") def inspect_codex_config(cfg: "SkillClawConfig") -> dict[str, object]: """Return a diagnostic snapshot of the local Codex integration state.""" config_path = _CODEX_CONFIG_PATH + profile_config_path = _CODEX_PROFILE_CONFIG_PATH expected_model = cfg.served_model_name or cfg.llm_model_id or "skillclaw-model" expected_base_url = f"http://127.0.0.1:{cfg.proxy_port}/v1" expected_api_key = cfg.proxy_api_key or "skillclaw" @@ -711,16 +713,21 @@ def inspect_codex_config(cfg: "SkillClawConfig") -> dict[str, object]: text = config_path.read_text(encoding="utf-8") except Exception as e: logger.warning("[ClawAdapter] Failed to read Codex config %s: %s", config_path, e) + profile_text = "" + if profile_config_path.exists(): + try: + profile_text = profile_config_path.read_text(encoding="utf-8") + except Exception as e: + logger.warning("[ClawAdapter] Failed to read Codex profile config %s: %s", profile_config_path, e) configured_model = str(_extract_top_level_toml_value(text, "model") or "") configured_provider = str(_extract_top_level_toml_value(text, "model_provider") or "") - provider_cfg = _extract_toml_table(text, "model_providers.skillclaw") + provider_cfg = _extract_toml_table(profile_text, "model_providers.skillclaw") configured_base_url = str(provider_cfg.get("base_url") or "") configured_wire_api = str(provider_cfg.get("wire_api") or "") configured_token = str(provider_cfg.get("experimental_bearer_token") or "") - profile_cfg = _extract_toml_table(text, "profiles.skillclaw") - configured_profile_model = str(profile_cfg.get("model") or "") - configured_profile_provider = str(profile_cfg.get("model_provider") or "") + configured_profile_model = str(_extract_top_level_toml_value(profile_text, "model") or "") + configured_profile_provider = str(_extract_top_level_toml_value(profile_text, "model_provider") or "") proxy_match = ( configured_profile_model == expected_model @@ -743,9 +750,11 @@ def inspect_codex_config(cfg: "SkillClawConfig") -> dict[str, object]: if not config_path.exists(): issues.append("Codex config is missing: ~/.codex/config.toml") + if not profile_config_path.exists(): + issues.append("Codex SkillClaw profile config is missing: ~/.codex/skillclaw.config.toml") if not proxy_match: issues.append("Codex SkillClaw profile is missing or not pointing at the local SkillClaw proxy.") - next_steps.append("Start SkillClaw once with `claw_type=codex` so it can register ~/.codex/config.toml.") + next_steps.append("Start SkillClaw once with `claw_type=codex` so it can register ~/.codex/skillclaw.config.toml.") if configured_provider == "skillclaw": issues.append("Codex global model_provider still points at SkillClaw; normal Codex runs may be intercepted.") next_steps.append('Remove top-level `model_provider = "skillclaw"` or run `skillclaw restore codex`.') @@ -797,7 +806,12 @@ def restore_codex_config(backup_path: Path | None = None) -> dict[str, str]: text = source.read_text(encoding="utf-8") target = _CODEX_CONFIG_PATH _write_text_atomic(target, text, "Codex config restore") - return {"source": str(source), "target": str(target)} + profile_target = _CODEX_PROFILE_CONFIG_PATH + removed_profile = False + if profile_target.exists(): + profile_target.unlink() + removed_profile = True + return {"source": str(source), "target": str(target), "removed_profile": str(removed_profile)} # ------------------------------------------------------------------ # diff --git a/skillclaw/cli.py b/skillclaw/cli.py index 89c99a0..529c702 100644 --- a/skillclaw/cli.py +++ b/skillclaw/cli.py @@ -502,6 +502,8 @@ def restore_codex(backup_path: str | None): raise click.ClickException(str(exc)) from None click.echo(f"Restored Codex config: {result['target']} <- {result['source']}") + if result.get("removed_profile") == "True": + click.echo("Removed Codex SkillClaw profile config: ~/.codex/skillclaw.config.toml") @restore.command(name="claude") diff --git a/tests/test_codex_profile_integration.py b/tests/test_codex_profile_integration.py index 832c365..6fcbcbb 100644 --- a/tests/test_codex_profile_integration.py +++ b/tests/test_codex_profile_integration.py @@ -27,12 +27,14 @@ def record_injection(self, names: list[str]) -> None: def test_configure_codex_registers_profile_without_replacing_global_defaults(monkeypatch, tmp_path: Path) -> None: config_path = tmp_path / ".codex" / "config.toml" + profile_config_path = tmp_path / ".codex" / "skillclaw.config.toml" config_path.parent.mkdir(parents=True) config_path.write_text( 'model = "gpt-5.5"\nmodel_provider = "openai"\n\n[profiles.default]\nmodel = "gpt-5.5"\n', encoding="utf-8", ) monkeypatch.setattr(claw_adapter, "_CODEX_CONFIG_PATH", config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_PROFILE_CONFIG_PATH", profile_config_path) monkeypatch.setattr(claw_adapter, "_CODEX_SKILLS_DIR", tmp_path / ".codex" / "skills") monkeypatch.setattr(claw_adapter, "_CODEX_BACKUP_DIR", tmp_path / "backups") @@ -47,24 +49,38 @@ def test_configure_codex_registers_profile_without_replacing_global_defaults(mon text = config_path.read_text(encoding="utf-8") assert 'model = "gpt-5.5"' in text assert 'model_provider = "openai"' in text - assert "[model_providers.skillclaw]" in text - assert 'base_url = "http://127.0.0.1:31000/v1"' in text - assert 'wire_api = "responses"' in text - assert 'experimental_bearer_token = "skillclaw-key"' in text - assert "[profiles.skillclaw]" in text - assert 'model = "skillclaw-model"' in text - assert 'model_provider = "skillclaw"' in text + assert "[profiles.skillclaw]" not in text + assert "[model_providers.skillclaw]" not in text + profile_text = profile_config_path.read_text(encoding="utf-8") + assert 'model = "skillclaw-model"' in profile_text + assert 'model_provider = "skillclaw"' in profile_text + assert "[model_providers.skillclaw]" in profile_text + assert 'base_url = "http://127.0.0.1:31000/v1"' in profile_text + assert 'wire_api = "responses"' in profile_text + assert 'experimental_bearer_token = "skillclaw-key"' in profile_text assert (tmp_path / ".codex" / "skills").is_dir() def test_configure_codex_removes_legacy_global_skillclaw_defaults(monkeypatch, tmp_path: Path) -> None: config_path = tmp_path / ".codex" / "config.toml" + profile_config_path = tmp_path / ".codex" / "skillclaw.config.toml" config_path.parent.mkdir(parents=True) config_path.write_text( - 'model = "skillclaw-model"\nmodel_provider = "skillclaw"\n\n[profiles.default]\nmodel = "gpt-5.5"\n', + ( + 'model = "skillclaw-model"\n' + 'model_provider = "skillclaw"\n\n' + "[model_providers.skillclaw]\n" + 'base_url = "http://127.0.0.1:30000/v1"\n\n' + "[profiles.skillclaw]\n" + 'model = "skillclaw-model"\n' + 'model_provider = "skillclaw"\n\n' + "[profiles.default]\n" + 'model = "gpt-5.5"\n' + ), encoding="utf-8", ) monkeypatch.setattr(claw_adapter, "_CODEX_CONFIG_PATH", config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_PROFILE_CONFIG_PATH", profile_config_path) monkeypatch.setattr(claw_adapter, "_CODEX_SKILLS_DIR", tmp_path / ".codex" / "skills") monkeypatch.setattr(claw_adapter, "_CODEX_BACKUP_DIR", tmp_path / "backups") @@ -73,7 +89,70 @@ def test_configure_codex_removes_legacy_global_skillclaw_defaults(monkeypatch, t top_level = config_path.read_text(encoding="utf-8").split("[", 1)[0] assert "model_provider" not in top_level assert "model =" not in top_level - assert "[profiles.skillclaw]" in config_path.read_text(encoding="utf-8") + text = config_path.read_text(encoding="utf-8") + assert "[profiles.skillclaw]" not in text + assert "[model_providers.skillclaw]" not in text + assert "[profiles.default]" in text + assert "[model_providers.skillclaw]" in profile_config_path.read_text(encoding="utf-8") + + +def test_inspect_codex_config_reads_split_profile_config(monkeypatch, tmp_path: Path) -> None: + config_path = tmp_path / ".codex" / "config.toml" + profile_config_path = tmp_path / ".codex" / "skillclaw.config.toml" + skills_dir = tmp_path / ".codex" / "skills" + config_path.parent.mkdir(parents=True) + skills_dir.mkdir() + config_path.write_text('model = "gpt-5.5"\nmodel_provider = "openai"\n', encoding="utf-8") + profile_config_path.write_text( + ( + 'model = "skillclaw-model"\n' + 'model_provider = "skillclaw"\n\n' + "[model_providers.skillclaw]\n" + 'name = "SkillClaw"\n' + 'base_url = "http://127.0.0.1:31000/v1"\n' + 'wire_api = "responses"\n' + 'experimental_bearer_token = "skillclaw-key"\n' + ), + encoding="utf-8", + ) + monkeypatch.setattr(claw_adapter, "_CODEX_CONFIG_PATH", config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_PROFILE_CONFIG_PATH", profile_config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_SKILLS_DIR", skills_dir) + monkeypatch.setattr(claw_adapter, "_CODEX_BACKUP_DIR", tmp_path / "backups") + + report = claw_adapter.inspect_codex_config( + SkillClawConfig( + served_model_name="skillclaw-model", + proxy_api_key="skillclaw-key", + proxy_port=31000, + skills_dir=str(skills_dir), + ) + ) + + assert report["status"] == "ok" + assert report["proxy_match"] is True + assert report["configured_profile_model"] == "skillclaw-model" + assert report["configured_base_url"] == "http://127.0.0.1:31000/v1" + + +def test_restore_codex_config_removes_split_profile_config(monkeypatch, tmp_path: Path) -> None: + config_path = tmp_path / ".codex" / "config.toml" + profile_config_path = tmp_path / ".codex" / "skillclaw.config.toml" + backup_path = tmp_path / "backups" / "config.latest.toml" + config_path.parent.mkdir(parents=True) + backup_path.parent.mkdir(parents=True) + config_path.write_text('model_provider = "skillclaw"\n', encoding="utf-8") + profile_config_path.write_text('model_provider = "skillclaw"\n', encoding="utf-8") + backup_path.write_text('model_provider = "openai"\n', encoding="utf-8") + monkeypatch.setattr(claw_adapter, "_CODEX_CONFIG_PATH", config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_PROFILE_CONFIG_PATH", profile_config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_BACKUP_DIR", backup_path.parent) + + result = claw_adapter.restore_codex_config() + + assert config_path.read_text(encoding="utf-8") == 'model_provider = "openai"\n' + assert not profile_config_path.exists() + assert result["removed_profile"] == "True" def test_codex_config_defaults_to_responses_mode_and_codex_skills(tmp_path: Path) -> None: diff --git a/tests/test_responses_native.py b/tests/test_responses_native.py index b22140f..f98154b 100644 --- a/tests/test_responses_native.py +++ b/tests/test_responses_native.py @@ -1,3 +1,6 @@ +import gzip +import json + import httpx import pytest @@ -122,6 +125,76 @@ async def fake_forward(body): assert seen["body"]["tools"] == [{"type": "custom", "name": "js_repl"}] +@pytest.mark.asyncio +async def test_native_responses_records_original_prompt_before_skill_injection(): + class FakeSkillManager: + def refresh_if_changed(self): + return None + + def build_injection_prompt(self, *, max_chars): + return "\n" + ("catalog filler " * 300) + "\n" + + def get_all_skills(self): + return [{"name": "demo-skill"}] + + def record_injection(self, names): + self.names = names + + server = SkillClawAPIServer( + SkillClawConfig( + llm_api_mode="responses", + llm_api_base="http://upstream.test/v1", + llm_model_id="upstream-model", + proxy_api_key="skillclaw", + record_enabled=False, + ), + skill_manager=FakeSkillManager(), + ) + seen = {} + + async def fake_forward(body): + seen["instructions"] = body.get("instructions", "") + return { + "id": "resp_native", + "object": "response", + "created_at": 0, + "status": "completed", + "model": "upstream-model", + "output": [ + { + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": "native ok", "annotations": []}], + } + ], + } + + server._forward_to_llm_responses = fake_forward + client = httpx.AsyncClient(transport=httpx.ASGITransport(app=server.app), base_url="http://test") + try: + response = await client.post( + "/v1/responses", + headers={"Authorization": "Bearer skillclaw", "Session_id": "codex-session-1"}, + json={ + "model": "skillclaw-model", + "instructions": "original instructions", + "input": "actual user task", + "stream": False, + }, + ) + finally: + await client.aclose() + + assert response.status_code == 200 + assert "" in seen["instructions"] + turn = server._session_turns["codex-session-1"][0] + assert turn["prompt_text"] == "original instructions\nactual user task" + assert "" not in turn["prompt_text"] + assert turn["injected_skills"] == ["demo-skill"] + + @pytest.mark.asyncio async def test_forward_to_llm_responses_stream_preserves_upstream_sse(monkeypatch): captured = {} @@ -130,7 +203,7 @@ class FakeStreamResponse: def raise_for_status(self): return None - async def aiter_raw(self): + async def aiter_bytes(self): yield b'data: {"type":"response.created"}\n\n' yield b'data: {"type":"response.completed"}\n\n' yield b"data: [DONE]\n\n" @@ -191,6 +264,206 @@ def stream(self, method, url, json, headers): assert captured["json"]["tools"] == body["tools"] +@pytest.mark.asyncio +async def test_forward_to_llm_responses_stream_decodes_compressed_upstream_sse(monkeypatch): + raw_sse = ( + b'data: {"type":"response.created"}\n\n' + b'data: {"type":"response.completed"}\n\n' + b"data: [DONE]\n\n" + ) + + class FakeStreamResponse: + def raise_for_status(self): + return None + + def aiter_bytes(self): + response = httpx.Response( + 200, + headers={"content-encoding": "gzip"}, + stream=httpx.ByteStream(gzip.compress(raw_sse)), + ) + return response.aiter_bytes() + + class FakeStreamContext: + async def __aenter__(self): + return FakeStreamResponse() + + async def __aexit__(self, exc_type, exc, tb): + return False + + class FakeAsyncClient: + def __init__(self, *args, **kwargs): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + def stream(self, method, url, json, headers): + return FakeStreamContext() + + monkeypatch.setattr(httpx, "AsyncClient", FakeAsyncClient) + server = object.__new__(SkillClawAPIServer) + server.config = SkillClawConfig( + llm_api_base="http://upstream.test/v1", + llm_api_key="upstream-key", + llm_model_id="upstream-model", + llm_api_mode="responses", + ) + + chunks = [] + async for chunk in server._stream_llm_responses({"model": "skillclaw-model", "input": "hi", "stream": True}): + chunks.append(chunk) + + assert b"".join(chunks) == raw_sse + + +@pytest.mark.asyncio +async def test_stream_and_track_responses_records_before_completed_chunk_is_consumed(): + server = object.__new__(SkillClawAPIServer) + server._session_turns = {} + server._safe_create_task = lambda coro: None + recorded = {} + + completed_event = { + "type": "response.completed", + "response": { + "id": "resp_native", + "object": "response", + "status": "completed", + "output": [ + { + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": "tracked ok", "annotations": []}], + } + ], + }, + } + + async def fake_stream(_body): + payload = ("data: " + json.dumps(completed_event) + "\n\n").encode() + yield payload[:20] + yield payload[20:] + yield b"data: [DONE]\n\n" + + def fake_record(session_id, request_body, response_payload, *, turn_type, injected_skills, session_done): + recorded["session_id"] = session_id + recorded["request_body"] = request_body + recorded["response_text"] = response_payload["output"][0]["content"][0]["text"] + recorded["turn_type"] = turn_type + recorded["injected_skills"] = injected_skills + recorded["session_done"] = session_done + + server._stream_llm_responses = fake_stream + server._record_responses_turn = fake_record + + stream = server._stream_and_track_responses( + {"model": "skillclaw-model", "instructions": "catalog", "stream": True}, + record_body={"model": "skillclaw-model", "instructions": "original instructions", "stream": True}, + session_id="codex-session-1", + turn_type="main", + injected_skills=["demo"], + session_done=False, + ) + first = await stream.__anext__() + second = await stream.__anext__() + assert first + second == ("data: " + json.dumps(completed_event) + "\n\n").encode() + assert recorded == { + "session_id": "codex-session-1", + "request_body": {"model": "skillclaw-model", "instructions": "original instructions", "stream": True}, + "response_text": "tracked ok", + "turn_type": "main", + "injected_skills": ["demo"], + "session_done": False, + } + await stream.aclose() + + +@pytest.mark.asyncio +async def test_stream_and_track_responses_records_output_from_stream_events_when_completed_lacks_output(): + server = object.__new__(SkillClawAPIServer) + recorded = {} + + events = [ + { + "type": "response.output_item.added", + "output_index": 0, + "item": { + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "in_progress", + "content": [], + }, + }, + { + "type": "response.output_text.delta", + "output_index": 0, + "content_index": 0, + "item_id": "msg_1", + "delta": "real ", + }, + { + "type": "response.output_text.delta", + "output_index": 0, + "content_index": 0, + "item_id": "msg_1", + "delta": "ok", + }, + { + "type": "response.output_text.done", + "output_index": 0, + "content_index": 0, + "item_id": "msg_1", + "text": "real ok", + }, + { + "type": "response.completed", + "response": { + "id": "resp_1", + "object": "response", + "status": "completed", + "model": "gpt-5.5", + }, + }, + ] + + async def fake_stream(_body): + for event in events: + yield ("event: " + event["type"] + "\n").encode() + yield ("data: " + json.dumps(event) + "\n\n").encode() + + def fake_record(session_id, request_body, response_payload, *, turn_type, injected_skills, session_done): + recorded["session_id"] = session_id + recorded["response_text"] = response_payload["output"][0]["content"][0]["text"] + recorded["turn_type"] = turn_type + + server._stream_llm_responses = fake_stream + server._record_responses_turn = fake_record + + chunks = [] + async for chunk in server._stream_and_track_responses( + {"model": "skillclaw-model", "input": "hi", "stream": True}, + session_id="codex-session-1", + turn_type="main", + injected_skills=[], + session_done=False, + ): + chunks.append(chunk) + + assert b"".join(chunks).startswith(b"event: response.output_item.added\n") + assert recorded == { + "session_id": "codex-session-1", + "response_text": "real ok", + "turn_type": "main", + } + + @pytest.mark.asyncio async def test_responses_endpoint_passthroughs_native_stream(): server = SkillClawAPIServer( From 0347fcb680456ce73ccc4a138e967b7fd7bbbb17 Mon Sep 17 00:00:00 2001 From: StoneHanaMori Date: Wed, 27 May 2026 17:21:43 +0800 Subject: [PATCH 3/3] fix: format Codex Responses changes --- skillclaw/api_server.py | 31 ++++++++++++++++++++----------- skillclaw/claw_adapter.py | 4 +++- tests/test_responses_native.py | 6 +----- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/skillclaw/api_server.py b/skillclaw/api_server.py index adff2e1..949772f 100644 --- a/skillclaw/api_server.py +++ b/skillclaw/api_server.py @@ -1616,7 +1616,9 @@ async def responses( ) response_payload = await owner._forward_to_llm_responses(body) owner._record_responses_turn( - session_id, record_body, response_payload, + session_id, + record_body, + response_payload, turn_type=turn_type, injected_skills=injected_skills, session_done=session_done, @@ -2610,17 +2612,22 @@ def _record_responses_turn( response_text = "\n".join(response_parts) turns = self._session_turns.setdefault(session_id, []) turn_num = len(turns) + 1 - turns.append({ - "turn_num": turn_num, - "prompt_text": prompt_text[:2000], - "response_text": response_text[:2000], - "injected_skills": injected_skills, - "prm_score": None, - }) + turns.append( + { + "turn_num": turn_num, + "prompt_text": prompt_text[:2000], + "response_text": response_text[:2000], + "injected_skills": injected_skills, + "prm_score": None, + } + ) logger.info( "[Codex] %s session=%s turn=%d prompt=%d chars response=%d chars skills=%s", - turn_type, session_id, turn_num, - len(prompt_text), len(response_text), + turn_type, + session_id, + turn_num, + len(prompt_text), + len(response_text), ",".join(injected_skills) if injected_skills else "(none)", ) self._maybe_upload_session_snapshot(session_id, turn_num) @@ -2774,7 +2781,9 @@ def parse_responses_stream_event(data: dict[str, Any]) -> dict[str, Any] | None: response_payload = parse_responses_stream_event(data) if isinstance(data, dict) else None if response_payload is not None: self._record_responses_turn( - session_id, record_body or body, response_payload, + session_id, + record_body or body, + response_payload, turn_type=turn_type, injected_skills=injected_skills, session_done=session_done, diff --git a/skillclaw/claw_adapter.py b/skillclaw/claw_adapter.py index b8d4e4d..b054c94 100644 --- a/skillclaw/claw_adapter.py +++ b/skillclaw/claw_adapter.py @@ -754,7 +754,9 @@ def inspect_codex_config(cfg: "SkillClawConfig") -> dict[str, object]: issues.append("Codex SkillClaw profile config is missing: ~/.codex/skillclaw.config.toml") if not proxy_match: issues.append("Codex SkillClaw profile is missing or not pointing at the local SkillClaw proxy.") - next_steps.append("Start SkillClaw once with `claw_type=codex` so it can register ~/.codex/skillclaw.config.toml.") + next_steps.append( + "Start SkillClaw once with `claw_type=codex` so it can register ~/.codex/skillclaw.config.toml." + ) if configured_provider == "skillclaw": issues.append("Codex global model_provider still points at SkillClaw; normal Codex runs may be intercepted.") next_steps.append('Remove top-level `model_provider = "skillclaw"` or run `skillclaw restore codex`.') diff --git a/tests/test_responses_native.py b/tests/test_responses_native.py index f98154b..0547587 100644 --- a/tests/test_responses_native.py +++ b/tests/test_responses_native.py @@ -266,11 +266,7 @@ def stream(self, method, url, json, headers): @pytest.mark.asyncio async def test_forward_to_llm_responses_stream_decodes_compressed_upstream_sse(monkeypatch): - raw_sse = ( - b'data: {"type":"response.created"}\n\n' - b'data: {"type":"response.completed"}\n\n' - b"data: [DONE]\n\n" - ) + raw_sse = b'data: {"type":"response.created"}\n\ndata: {"type":"response.completed"}\n\ndata: [DONE]\n\n' class FakeStreamResponse: def raise_for_status(self):