diff --git a/skillclaw/api_server.py b/skillclaw/api_server.py index 8655d99..949772f 100644 --- a/skillclaw/api_server.py +++ b/skillclaw/api_server.py @@ -1593,14 +1593,36 @@ async def responses( body = await request.json() if owner._responses_native_enabled(): + record_body = copy.deepcopy(body) turn_type = _resolve_turn_type(x_turn_type, body.get("turn_type"), default="main") - body = owner._prepare_native_responses_body(body, turn_type=turn_type) + injected_skills = owner._prepare_native_responses_body_inplace(body, turn_type=turn_type) + _raw_sid = x_session_id or codex_session_id or body.get("session_id") or "" + session_id = _raw_sid or await owner._resolve_tui_session( + body.get("model", owner._served_model), + len(body.get("input", []) if isinstance(body.get("input"), list) else []), + ) + session_done = _resolve_session_done(x_session_done, body.get("session_done")) if bool(body.get("stream", False)): return StreamingResponse( - owner._stream_llm_responses(body), + owner._stream_and_track_responses( + body, + record_body=record_body, + session_id=session_id, + turn_type=turn_type, + injected_skills=injected_skills, + session_done=session_done, + ), media_type="text/event-stream", ) response_payload = await owner._forward_to_llm_responses(body) + owner._record_responses_turn( + session_id, + record_body, + response_payload, + turn_type=turn_type, + injected_skills=injected_skills, + session_done=session_done, + ) return JSONResponse(content=response_payload) previous_response_id = str(body.get("previous_response_id") or "").strip() @@ -2517,8 +2539,13 @@ def _prepare_responses_forward( def _prepare_native_responses_body(self, body: dict[str, Any], *, turn_type: str) -> dict[str, Any]: """Apply non-destructive SkillClaw hooks before native Responses forwarding.""" prepared = dict(body) + self._prepare_native_responses_body_inplace(prepared, turn_type=turn_type) + return prepared + + def _prepare_native_responses_body_inplace(self, body: dict[str, Any], *, turn_type: str) -> list[str]: + """Inject skills into a Responses body in-place. Returns injected skill names.""" if not self.skill_manager or turn_type != "main": - return prepared + return [] try: self.skill_manager.refresh_if_changed() @@ -2529,7 +2556,7 @@ def _prepare_native_responses_body(self, body: dict[str, Any], *, turn_type: str max_chars=getattr(self.config, "max_skills_prompt_chars", 30_000), ) if not skill_text: - return prepared + return [] all_skills = self.skill_manager.get_all_skills() skill_names = [s.get("name", "unknown_skill") for s in all_skills if isinstance(s, dict)] @@ -2540,9 +2567,72 @@ def _prepare_native_responses_body(self, body: dict[str, Any], *, turn_type: str ) self.skill_manager.record_injection(skill_names) - existing = _normalize_responses_content(prepared.get("instructions", "")) - prepared["instructions"] = (existing + "\n\n" + skill_text).strip() if existing else skill_text - return prepared + existing = _normalize_responses_content(body.get("instructions", "")) + body["instructions"] = (existing + "\n\n" + skill_text).strip() if existing else skill_text + return skill_names + + def _record_responses_turn( + self, + session_id: str, + request_body: dict[str, Any], + response_payload: dict[str, Any], + *, + turn_type: str, + injected_skills: list[str], + session_done: bool, + ) -> None: + """Record a Responses API turn into the session tracking system.""" + if not session_id: + return + self._touch_session(session_id) + prompt_text = _normalize_responses_content(request_body.get("instructions", "")) + inp = request_body.get("input") + if isinstance(inp, str): + prompt_text = (prompt_text + "\n" + inp).strip() if prompt_text else inp + elif isinstance(inp, list): + user_parts = [] + for item in inp: + if isinstance(item, dict) and item.get("role") == "user": + user_parts.append(_normalize_responses_content(item.get("content", ""))) + if user_parts: + joined = " ".join(user_parts) + prompt_text = (prompt_text + "\n" + joined).strip() if prompt_text else joined + response_parts = [] + for item in response_payload.get("output", []): + if not isinstance(item, dict): + continue + if item.get("type") == "message": + for part in item.get("content", []): + if isinstance(part, dict) and part.get("type") == "output_text": + response_parts.append(part.get("text", "")) + elif item.get("type") == "function_call": + name = item.get("name", "") + args = str(item.get("arguments", ""))[:500] + response_parts.append(f"[tool:{name}] {args}") + response_text = "\n".join(response_parts) + turns = self._session_turns.setdefault(session_id, []) + turn_num = len(turns) + 1 + turns.append( + { + "turn_num": turn_num, + "prompt_text": prompt_text[:2000], + "response_text": response_text[:2000], + "injected_skills": injected_skills, + "prm_score": None, + } + ) + logger.info( + "[Codex] %s session=%s turn=%d prompt=%d chars response=%d chars skills=%s", + turn_type, + session_id, + turn_num, + len(prompt_text), + len(response_text), + ",".join(injected_skills) if injected_skills else "(none)", + ) + self._maybe_upload_session_snapshot(session_id, turn_num) + if session_done: + self._safe_create_task(self._close_session(session_id, reason="codex_session_done")) async def _forward_to_llm_responses(self, body: dict[str, Any]) -> dict[str, Any]: """Forward a Codex Responses payload to an upstream Responses API.""" @@ -2592,6 +2682,118 @@ async def _forward_to_llm_responses(self, body: dict[str, Any]) -> dict[str, Any logger.error("[OpenClaw] Responses forward failed: %s", e, exc_info=True) raise HTTPException(status_code=502, detail=f"Responses forward error: {e}") from e + async def _stream_and_track_responses( + self, + body: dict[str, Any], + *, + record_body: dict[str, Any] | None = None, + session_id: str, + turn_type: str, + injected_skills: list[str], + session_done: bool, + ): + """Wrap _stream_llm_responses: passthrough SSE + parse response.completed inline.""" + tracked = False + buf = "" + output_items: dict[int, dict[str, Any]] = {} + output_text_parts: dict[tuple[int, int], str] = {} + + def ensure_message_item(output_index: int) -> dict[str, Any]: + item = output_items.setdefault( + output_index, + { + "type": "message", + "role": "assistant", + "status": "completed", + "content": [], + }, + ) + content = item.setdefault("content", []) + if not isinstance(content, list): + item["content"] = [] + return item + + def apply_output_text(output_index: int, content_index: int, text: str) -> None: + item = ensure_message_item(output_index) + content = item.setdefault("content", []) + while len(content) <= content_index: + content.append({"type": "output_text", "text": "", "annotations": []}) + part = content[content_index] + if isinstance(part, dict): + part["type"] = part.get("type") or "output_text" + part["text"] = text + part.setdefault("annotations", []) + + def parse_responses_stream_event(data: dict[str, Any]) -> dict[str, Any] | None: + event_type = data.get("type") + output_index = int(data.get("output_index", 0) or 0) + content_index = int(data.get("content_index", 0) or 0) + + if event_type == "response.output_item.added": + item = data.get("item") + if isinstance(item, dict): + output_items[output_index] = item + elif event_type == "response.output_item.done": + item = data.get("item") + if isinstance(item, dict): + output_items[output_index] = item + elif event_type == "response.output_text.delta": + key = (output_index, content_index) + output_text_parts[key] = output_text_parts.get(key, "") + str(data.get("delta") or "") + apply_output_text(output_index, content_index, output_text_parts[key]) + elif event_type == "response.output_text.done": + text = str(data.get("text") or output_text_parts.get((output_index, content_index), "")) + output_text_parts[(output_index, content_index)] = text + apply_output_text(output_index, content_index, text) + elif event_type == "response.content_part.done": + part = data.get("part") + if isinstance(part, dict) and part.get("type") == "output_text": + text = str(part.get("text") or output_text_parts.get((output_index, content_index), "")) + output_text_parts[(output_index, content_index)] = text + apply_output_text(output_index, content_index, text) + elif event_type == "response.completed": + response_payload = data.get("response") if isinstance(data.get("response"), dict) else dict(data) + if output_items and not response_payload.get("output"): + response_payload = { + **response_payload, + "output": [item for _, item in sorted(output_items.items())], + } + return response_payload + return None + + async for chunk in self._stream_llm_responses(body): + if not tracked: + try: + text = chunk.decode("utf-8", errors="ignore") if isinstance(chunk, bytes) else chunk + buf += text + while "\n" in buf: + line, buf = buf.split("\n", 1) + stripped = line.strip() + if not stripped.startswith("data: "): + continue + raw = stripped[6:] + if raw == "[DONE]": + continue + try: + data = json.loads(raw) + except Exception: + continue + response_payload = parse_responses_stream_event(data) if isinstance(data, dict) else None + if response_payload is not None: + self._record_responses_turn( + session_id, + record_body or body, + response_payload, + turn_type=turn_type, + injected_skills=injected_skills, + session_done=session_done, + ) + tracked = True + break + except Exception: + pass + yield chunk + async def _stream_llm_responses(self, body: dict[str, Any]): """Passthrough upstream Responses SSE without aggregating or rewriting events.""" import httpx @@ -2601,7 +2803,7 @@ async def _stream_llm_responses(self, body: dict[str, Any]): async with httpx.AsyncClient(timeout=_llm_request_timeout_seconds()) as client: async with client.stream("POST", url, json=send_body, headers=headers) as resp: resp.raise_for_status() - async for chunk in resp.aiter_raw(): + async for chunk in resp.aiter_bytes(): if chunk: yield chunk except httpx.HTTPStatusError as e: diff --git a/skillclaw/claw_adapter.py b/skillclaw/claw_adapter.py index b0636a1..b054c94 100644 --- a/skillclaw/claw_adapter.py +++ b/skillclaw/claw_adapter.py @@ -45,6 +45,7 @@ _HERMES_BACKUP_DIR = Path.home() / ".skillclaw" / "backups" / "hermes" _CODEX_HOME = Path.home() / ".codex" _CODEX_CONFIG_PATH = _CODEX_HOME / "config.toml" +_CODEX_PROFILE_CONFIG_PATH = _CODEX_HOME / "skillclaw.config.toml" _CODEX_SKILLS_DIR = _CODEX_HOME / "skills" _CODEX_BACKUP_DIR = Path.home() / ".skillclaw" / "backups" / "codex" _CLAUDE_HOME = Path.home() / ".claude" @@ -656,7 +657,6 @@ def _build_codex_provider_block(base_url: str, api_key: str) -> str: def _build_codex_profile_block(model_id: str) -> str: lines = [ - "[profiles.skillclaw]", f"model = {_format_toml_value(model_id)}", 'model_provider = "skillclaw"', ] @@ -673,6 +673,7 @@ def _configure_codex(cfg: "SkillClawConfig") -> None: api_key = cfg.proxy_api_key or "skillclaw" base_url = f"http://127.0.0.1:{cfg.proxy_port}/v1" config_path = _CODEX_CONFIG_PATH + profile_config_path = _CODEX_PROFILE_CONFIG_PATH _prepare_external_skills_dir(_CODEX_SKILLS_DIR, "Codex") existing_text = "" @@ -687,16 +688,17 @@ def _configure_codex(cfg: "SkillClawConfig") -> None: updated = _remove_top_level_toml_keys(updated, {"model", "model_provider"}) updated = _remove_toml_table(updated, "model_providers.skillclaw").rstrip() + "\n\n" updated = _remove_toml_table(updated, "profiles.skillclaw").rstrip() + "\n\n" - updated += _build_codex_provider_block(base_url, api_key) - updated += "\n" + _build_codex_profile_block(model_id) + profile_text = _build_codex_profile_block(model_id) + "\n" + _build_codex_provider_block(base_url, api_key) _backup_codex_config_if_changed(config_path, updated) _write_text_atomic(config_path, updated, "Codex config") + _write_text_atomic(profile_config_path, profile_text, "Codex SkillClaw profile config") def inspect_codex_config(cfg: "SkillClawConfig") -> dict[str, object]: """Return a diagnostic snapshot of the local Codex integration state.""" config_path = _CODEX_CONFIG_PATH + profile_config_path = _CODEX_PROFILE_CONFIG_PATH expected_model = cfg.served_model_name or cfg.llm_model_id or "skillclaw-model" expected_base_url = f"http://127.0.0.1:{cfg.proxy_port}/v1" expected_api_key = cfg.proxy_api_key or "skillclaw" @@ -711,16 +713,21 @@ def inspect_codex_config(cfg: "SkillClawConfig") -> dict[str, object]: text = config_path.read_text(encoding="utf-8") except Exception as e: logger.warning("[ClawAdapter] Failed to read Codex config %s: %s", config_path, e) + profile_text = "" + if profile_config_path.exists(): + try: + profile_text = profile_config_path.read_text(encoding="utf-8") + except Exception as e: + logger.warning("[ClawAdapter] Failed to read Codex profile config %s: %s", profile_config_path, e) configured_model = str(_extract_top_level_toml_value(text, "model") or "") configured_provider = str(_extract_top_level_toml_value(text, "model_provider") or "") - provider_cfg = _extract_toml_table(text, "model_providers.skillclaw") + provider_cfg = _extract_toml_table(profile_text, "model_providers.skillclaw") configured_base_url = str(provider_cfg.get("base_url") or "") configured_wire_api = str(provider_cfg.get("wire_api") or "") configured_token = str(provider_cfg.get("experimental_bearer_token") or "") - profile_cfg = _extract_toml_table(text, "profiles.skillclaw") - configured_profile_model = str(profile_cfg.get("model") or "") - configured_profile_provider = str(profile_cfg.get("model_provider") or "") + configured_profile_model = str(_extract_top_level_toml_value(profile_text, "model") or "") + configured_profile_provider = str(_extract_top_level_toml_value(profile_text, "model_provider") or "") proxy_match = ( configured_profile_model == expected_model @@ -743,9 +750,13 @@ def inspect_codex_config(cfg: "SkillClawConfig") -> dict[str, object]: if not config_path.exists(): issues.append("Codex config is missing: ~/.codex/config.toml") + if not profile_config_path.exists(): + issues.append("Codex SkillClaw profile config is missing: ~/.codex/skillclaw.config.toml") if not proxy_match: issues.append("Codex SkillClaw profile is missing or not pointing at the local SkillClaw proxy.") - next_steps.append("Start SkillClaw once with `claw_type=codex` so it can register ~/.codex/config.toml.") + next_steps.append( + "Start SkillClaw once with `claw_type=codex` so it can register ~/.codex/skillclaw.config.toml." + ) if configured_provider == "skillclaw": issues.append("Codex global model_provider still points at SkillClaw; normal Codex runs may be intercepted.") next_steps.append('Remove top-level `model_provider = "skillclaw"` or run `skillclaw restore codex`.') @@ -797,7 +808,12 @@ def restore_codex_config(backup_path: Path | None = None) -> dict[str, str]: text = source.read_text(encoding="utf-8") target = _CODEX_CONFIG_PATH _write_text_atomic(target, text, "Codex config restore") - return {"source": str(source), "target": str(target)} + profile_target = _CODEX_PROFILE_CONFIG_PATH + removed_profile = False + if profile_target.exists(): + profile_target.unlink() + removed_profile = True + return {"source": str(source), "target": str(target), "removed_profile": str(removed_profile)} # ------------------------------------------------------------------ # diff --git a/skillclaw/cli.py b/skillclaw/cli.py index 89c99a0..529c702 100644 --- a/skillclaw/cli.py +++ b/skillclaw/cli.py @@ -502,6 +502,8 @@ def restore_codex(backup_path: str | None): raise click.ClickException(str(exc)) from None click.echo(f"Restored Codex config: {result['target']} <- {result['source']}") + if result.get("removed_profile") == "True": + click.echo("Removed Codex SkillClaw profile config: ~/.codex/skillclaw.config.toml") @restore.command(name="claude") diff --git a/tests/test_codex_profile_integration.py b/tests/test_codex_profile_integration.py index 832c365..6fcbcbb 100644 --- a/tests/test_codex_profile_integration.py +++ b/tests/test_codex_profile_integration.py @@ -27,12 +27,14 @@ def record_injection(self, names: list[str]) -> None: def test_configure_codex_registers_profile_without_replacing_global_defaults(monkeypatch, tmp_path: Path) -> None: config_path = tmp_path / ".codex" / "config.toml" + profile_config_path = tmp_path / ".codex" / "skillclaw.config.toml" config_path.parent.mkdir(parents=True) config_path.write_text( 'model = "gpt-5.5"\nmodel_provider = "openai"\n\n[profiles.default]\nmodel = "gpt-5.5"\n', encoding="utf-8", ) monkeypatch.setattr(claw_adapter, "_CODEX_CONFIG_PATH", config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_PROFILE_CONFIG_PATH", profile_config_path) monkeypatch.setattr(claw_adapter, "_CODEX_SKILLS_DIR", tmp_path / ".codex" / "skills") monkeypatch.setattr(claw_adapter, "_CODEX_BACKUP_DIR", tmp_path / "backups") @@ -47,24 +49,38 @@ def test_configure_codex_registers_profile_without_replacing_global_defaults(mon text = config_path.read_text(encoding="utf-8") assert 'model = "gpt-5.5"' in text assert 'model_provider = "openai"' in text - assert "[model_providers.skillclaw]" in text - assert 'base_url = "http://127.0.0.1:31000/v1"' in text - assert 'wire_api = "responses"' in text - assert 'experimental_bearer_token = "skillclaw-key"' in text - assert "[profiles.skillclaw]" in text - assert 'model = "skillclaw-model"' in text - assert 'model_provider = "skillclaw"' in text + assert "[profiles.skillclaw]" not in text + assert "[model_providers.skillclaw]" not in text + profile_text = profile_config_path.read_text(encoding="utf-8") + assert 'model = "skillclaw-model"' in profile_text + assert 'model_provider = "skillclaw"' in profile_text + assert "[model_providers.skillclaw]" in profile_text + assert 'base_url = "http://127.0.0.1:31000/v1"' in profile_text + assert 'wire_api = "responses"' in profile_text + assert 'experimental_bearer_token = "skillclaw-key"' in profile_text assert (tmp_path / ".codex" / "skills").is_dir() def test_configure_codex_removes_legacy_global_skillclaw_defaults(monkeypatch, tmp_path: Path) -> None: config_path = tmp_path / ".codex" / "config.toml" + profile_config_path = tmp_path / ".codex" / "skillclaw.config.toml" config_path.parent.mkdir(parents=True) config_path.write_text( - 'model = "skillclaw-model"\nmodel_provider = "skillclaw"\n\n[profiles.default]\nmodel = "gpt-5.5"\n', + ( + 'model = "skillclaw-model"\n' + 'model_provider = "skillclaw"\n\n' + "[model_providers.skillclaw]\n" + 'base_url = "http://127.0.0.1:30000/v1"\n\n' + "[profiles.skillclaw]\n" + 'model = "skillclaw-model"\n' + 'model_provider = "skillclaw"\n\n' + "[profiles.default]\n" + 'model = "gpt-5.5"\n' + ), encoding="utf-8", ) monkeypatch.setattr(claw_adapter, "_CODEX_CONFIG_PATH", config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_PROFILE_CONFIG_PATH", profile_config_path) monkeypatch.setattr(claw_adapter, "_CODEX_SKILLS_DIR", tmp_path / ".codex" / "skills") monkeypatch.setattr(claw_adapter, "_CODEX_BACKUP_DIR", tmp_path / "backups") @@ -73,7 +89,70 @@ def test_configure_codex_removes_legacy_global_skillclaw_defaults(monkeypatch, t top_level = config_path.read_text(encoding="utf-8").split("[", 1)[0] assert "model_provider" not in top_level assert "model =" not in top_level - assert "[profiles.skillclaw]" in config_path.read_text(encoding="utf-8") + text = config_path.read_text(encoding="utf-8") + assert "[profiles.skillclaw]" not in text + assert "[model_providers.skillclaw]" not in text + assert "[profiles.default]" in text + assert "[model_providers.skillclaw]" in profile_config_path.read_text(encoding="utf-8") + + +def test_inspect_codex_config_reads_split_profile_config(monkeypatch, tmp_path: Path) -> None: + config_path = tmp_path / ".codex" / "config.toml" + profile_config_path = tmp_path / ".codex" / "skillclaw.config.toml" + skills_dir = tmp_path / ".codex" / "skills" + config_path.parent.mkdir(parents=True) + skills_dir.mkdir() + config_path.write_text('model = "gpt-5.5"\nmodel_provider = "openai"\n', encoding="utf-8") + profile_config_path.write_text( + ( + 'model = "skillclaw-model"\n' + 'model_provider = "skillclaw"\n\n' + "[model_providers.skillclaw]\n" + 'name = "SkillClaw"\n' + 'base_url = "http://127.0.0.1:31000/v1"\n' + 'wire_api = "responses"\n' + 'experimental_bearer_token = "skillclaw-key"\n' + ), + encoding="utf-8", + ) + monkeypatch.setattr(claw_adapter, "_CODEX_CONFIG_PATH", config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_PROFILE_CONFIG_PATH", profile_config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_SKILLS_DIR", skills_dir) + monkeypatch.setattr(claw_adapter, "_CODEX_BACKUP_DIR", tmp_path / "backups") + + report = claw_adapter.inspect_codex_config( + SkillClawConfig( + served_model_name="skillclaw-model", + proxy_api_key="skillclaw-key", + proxy_port=31000, + skills_dir=str(skills_dir), + ) + ) + + assert report["status"] == "ok" + assert report["proxy_match"] is True + assert report["configured_profile_model"] == "skillclaw-model" + assert report["configured_base_url"] == "http://127.0.0.1:31000/v1" + + +def test_restore_codex_config_removes_split_profile_config(monkeypatch, tmp_path: Path) -> None: + config_path = tmp_path / ".codex" / "config.toml" + profile_config_path = tmp_path / ".codex" / "skillclaw.config.toml" + backup_path = tmp_path / "backups" / "config.latest.toml" + config_path.parent.mkdir(parents=True) + backup_path.parent.mkdir(parents=True) + config_path.write_text('model_provider = "skillclaw"\n', encoding="utf-8") + profile_config_path.write_text('model_provider = "skillclaw"\n', encoding="utf-8") + backup_path.write_text('model_provider = "openai"\n', encoding="utf-8") + monkeypatch.setattr(claw_adapter, "_CODEX_CONFIG_PATH", config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_PROFILE_CONFIG_PATH", profile_config_path) + monkeypatch.setattr(claw_adapter, "_CODEX_BACKUP_DIR", backup_path.parent) + + result = claw_adapter.restore_codex_config() + + assert config_path.read_text(encoding="utf-8") == 'model_provider = "openai"\n' + assert not profile_config_path.exists() + assert result["removed_profile"] == "True" def test_codex_config_defaults_to_responses_mode_and_codex_skills(tmp_path: Path) -> None: diff --git a/tests/test_responses_native.py b/tests/test_responses_native.py index b22140f..0547587 100644 --- a/tests/test_responses_native.py +++ b/tests/test_responses_native.py @@ -1,3 +1,6 @@ +import gzip +import json + import httpx import pytest @@ -122,6 +125,76 @@ async def fake_forward(body): assert seen["body"]["tools"] == [{"type": "custom", "name": "js_repl"}] +@pytest.mark.asyncio +async def test_native_responses_records_original_prompt_before_skill_injection(): + class FakeSkillManager: + def refresh_if_changed(self): + return None + + def build_injection_prompt(self, *, max_chars): + return "\n" + ("catalog filler " * 300) + "\n" + + def get_all_skills(self): + return [{"name": "demo-skill"}] + + def record_injection(self, names): + self.names = names + + server = SkillClawAPIServer( + SkillClawConfig( + llm_api_mode="responses", + llm_api_base="http://upstream.test/v1", + llm_model_id="upstream-model", + proxy_api_key="skillclaw", + record_enabled=False, + ), + skill_manager=FakeSkillManager(), + ) + seen = {} + + async def fake_forward(body): + seen["instructions"] = body.get("instructions", "") + return { + "id": "resp_native", + "object": "response", + "created_at": 0, + "status": "completed", + "model": "upstream-model", + "output": [ + { + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": "native ok", "annotations": []}], + } + ], + } + + server._forward_to_llm_responses = fake_forward + client = httpx.AsyncClient(transport=httpx.ASGITransport(app=server.app), base_url="http://test") + try: + response = await client.post( + "/v1/responses", + headers={"Authorization": "Bearer skillclaw", "Session_id": "codex-session-1"}, + json={ + "model": "skillclaw-model", + "instructions": "original instructions", + "input": "actual user task", + "stream": False, + }, + ) + finally: + await client.aclose() + + assert response.status_code == 200 + assert "" in seen["instructions"] + turn = server._session_turns["codex-session-1"][0] + assert turn["prompt_text"] == "original instructions\nactual user task" + assert "" not in turn["prompt_text"] + assert turn["injected_skills"] == ["demo-skill"] + + @pytest.mark.asyncio async def test_forward_to_llm_responses_stream_preserves_upstream_sse(monkeypatch): captured = {} @@ -130,7 +203,7 @@ class FakeStreamResponse: def raise_for_status(self): return None - async def aiter_raw(self): + async def aiter_bytes(self): yield b'data: {"type":"response.created"}\n\n' yield b'data: {"type":"response.completed"}\n\n' yield b"data: [DONE]\n\n" @@ -191,6 +264,202 @@ def stream(self, method, url, json, headers): assert captured["json"]["tools"] == body["tools"] +@pytest.mark.asyncio +async def test_forward_to_llm_responses_stream_decodes_compressed_upstream_sse(monkeypatch): + raw_sse = b'data: {"type":"response.created"}\n\ndata: {"type":"response.completed"}\n\ndata: [DONE]\n\n' + + class FakeStreamResponse: + def raise_for_status(self): + return None + + def aiter_bytes(self): + response = httpx.Response( + 200, + headers={"content-encoding": "gzip"}, + stream=httpx.ByteStream(gzip.compress(raw_sse)), + ) + return response.aiter_bytes() + + class FakeStreamContext: + async def __aenter__(self): + return FakeStreamResponse() + + async def __aexit__(self, exc_type, exc, tb): + return False + + class FakeAsyncClient: + def __init__(self, *args, **kwargs): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + def stream(self, method, url, json, headers): + return FakeStreamContext() + + monkeypatch.setattr(httpx, "AsyncClient", FakeAsyncClient) + server = object.__new__(SkillClawAPIServer) + server.config = SkillClawConfig( + llm_api_base="http://upstream.test/v1", + llm_api_key="upstream-key", + llm_model_id="upstream-model", + llm_api_mode="responses", + ) + + chunks = [] + async for chunk in server._stream_llm_responses({"model": "skillclaw-model", "input": "hi", "stream": True}): + chunks.append(chunk) + + assert b"".join(chunks) == raw_sse + + +@pytest.mark.asyncio +async def test_stream_and_track_responses_records_before_completed_chunk_is_consumed(): + server = object.__new__(SkillClawAPIServer) + server._session_turns = {} + server._safe_create_task = lambda coro: None + recorded = {} + + completed_event = { + "type": "response.completed", + "response": { + "id": "resp_native", + "object": "response", + "status": "completed", + "output": [ + { + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": "tracked ok", "annotations": []}], + } + ], + }, + } + + async def fake_stream(_body): + payload = ("data: " + json.dumps(completed_event) + "\n\n").encode() + yield payload[:20] + yield payload[20:] + yield b"data: [DONE]\n\n" + + def fake_record(session_id, request_body, response_payload, *, turn_type, injected_skills, session_done): + recorded["session_id"] = session_id + recorded["request_body"] = request_body + recorded["response_text"] = response_payload["output"][0]["content"][0]["text"] + recorded["turn_type"] = turn_type + recorded["injected_skills"] = injected_skills + recorded["session_done"] = session_done + + server._stream_llm_responses = fake_stream + server._record_responses_turn = fake_record + + stream = server._stream_and_track_responses( + {"model": "skillclaw-model", "instructions": "catalog", "stream": True}, + record_body={"model": "skillclaw-model", "instructions": "original instructions", "stream": True}, + session_id="codex-session-1", + turn_type="main", + injected_skills=["demo"], + session_done=False, + ) + first = await stream.__anext__() + second = await stream.__anext__() + assert first + second == ("data: " + json.dumps(completed_event) + "\n\n").encode() + assert recorded == { + "session_id": "codex-session-1", + "request_body": {"model": "skillclaw-model", "instructions": "original instructions", "stream": True}, + "response_text": "tracked ok", + "turn_type": "main", + "injected_skills": ["demo"], + "session_done": False, + } + await stream.aclose() + + +@pytest.mark.asyncio +async def test_stream_and_track_responses_records_output_from_stream_events_when_completed_lacks_output(): + server = object.__new__(SkillClawAPIServer) + recorded = {} + + events = [ + { + "type": "response.output_item.added", + "output_index": 0, + "item": { + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "in_progress", + "content": [], + }, + }, + { + "type": "response.output_text.delta", + "output_index": 0, + "content_index": 0, + "item_id": "msg_1", + "delta": "real ", + }, + { + "type": "response.output_text.delta", + "output_index": 0, + "content_index": 0, + "item_id": "msg_1", + "delta": "ok", + }, + { + "type": "response.output_text.done", + "output_index": 0, + "content_index": 0, + "item_id": "msg_1", + "text": "real ok", + }, + { + "type": "response.completed", + "response": { + "id": "resp_1", + "object": "response", + "status": "completed", + "model": "gpt-5.5", + }, + }, + ] + + async def fake_stream(_body): + for event in events: + yield ("event: " + event["type"] + "\n").encode() + yield ("data: " + json.dumps(event) + "\n\n").encode() + + def fake_record(session_id, request_body, response_payload, *, turn_type, injected_skills, session_done): + recorded["session_id"] = session_id + recorded["response_text"] = response_payload["output"][0]["content"][0]["text"] + recorded["turn_type"] = turn_type + + server._stream_llm_responses = fake_stream + server._record_responses_turn = fake_record + + chunks = [] + async for chunk in server._stream_and_track_responses( + {"model": "skillclaw-model", "input": "hi", "stream": True}, + session_id="codex-session-1", + turn_type="main", + injected_skills=[], + session_done=False, + ): + chunks.append(chunk) + + assert b"".join(chunks).startswith(b"event: response.output_item.added\n") + assert recorded == { + "session_id": "codex-session-1", + "response_text": "real ok", + "turn_type": "main", + } + + @pytest.mark.asyncio async def test_responses_endpoint_passthroughs_native_stream(): server = SkillClawAPIServer(