From 76b1485f3a004ce8b7e778b7338ca3528d0db15a Mon Sep 17 00:00:00 2001 From: Nadeem Date: Thu, 28 May 2026 16:02:17 +0400 Subject: [PATCH] Support live Responses previous_response_id reuse --- README.md | 10 +- ds4_server.c | 339 +++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 271 insertions(+), 78 deletions(-) diff --git a/README.md b/README.md index d0f4aa9d6..f1f6360cc 100644 --- a/README.md +++ b/README.md @@ -454,9 +454,13 @@ tool calls are mapped back to OpenAI tool calls. `/v1/responses` accepts OpenAI Responses-style `input`, `instructions`, `tools`, `tool_choice`, `max_output_tokens`, `temperature`, `top_p`, `stream`, -and `reasoning`. It is the preferred endpoint for Codex CLI. The server keeps -Responses continuations bound to live state when possible, and can fall back to -the same DSML rendering and KV prefix reuse used by chat completions. +`previous_response_id`, and `reasoning`. It is the preferred endpoint for Codex +CLI. The server keeps Responses continuations bound to live state when possible, +and can fall back to the same DSML rendering and KV prefix reuse used by chat +completions. `previous_response_id` is supported for the live in-memory +continuation case: if the referenced response is no longer the current live KV +frontier, the server returns `409` and the client should replay the full input +history. `/v1/messages` is the Anthropic-compatible endpoint used by Claude Code style clients. It accepts `system`, `messages`, `tools`, `tool_choice`, `max_tokens`, diff --git a/ds4_server.c b/ds4_server.c index a9930d603..0ccd06d28 100644 --- a/ds4_server.c +++ b/ds4_server.c @@ -623,6 +623,8 @@ typedef struct { * opaque reasoning state from a future implementation). */ bool responses_requires_live_tool_state; bool responses_requires_live_reasoning; + char *responses_previous_response_id; + char *responses_previous_suffix_text; stop_list responses_live_call_ids; char *responses_live_suffix_text; bool anthropic_requires_live_tool_state; @@ -762,6 +764,8 @@ static void request_free(request *r) { free(r->stops.v); free(r->raw_body); free(r->prompt_text); + free(r->responses_previous_response_id); + free(r->responses_previous_suffix_text); stop_list_clear(&r->responses_live_call_ids); free(r->responses_live_call_ids.v); free(r->responses_live_suffix_text); @@ -2268,86 +2272,107 @@ static bool chat_history_uses_tool_context(const chat_msgs *msgs, return false; } -static char *render_chat_prompt_text(const chat_msgs *msgs, const char *tool_schemas, - const tool_schema_orders *tool_orders, - ds4_think_mode think_mode) { - (void)tool_orders; +static void render_chat_turns_into(buf *out, const chat_msgs *msgs, + ds4_think_mode think_mode, + bool tool_context) { const bool think = ds4_think_mode_enabled(think_mode); - const bool tool_context = chat_history_uses_tool_context(msgs, tool_schemas); int last_user_idx = -1; - buf system = {0}; - /* Render tool schemas before the client system content so - * --kv-cache-boundary-trim-tokens chops a dynamic tail from the client - * message instead of the much larger tool-schema region. */ - if (tool_schemas && tool_schemas[0]) { - append_tools_prompt_text(&system, tool_schemas); - } - for (int i = 0; i < msgs->len; i++) { - const chat_msg *m = &msgs->v[i]; - if (!role_is_system(m->role)) continue; - if (system.len) buf_puts(&system, "\n\n"); - buf_puts(&system, m->content ? m->content : ""); - } - for (int i = 0; i < msgs->len; i++) { + for (int i = 0; msgs && i < msgs->len; i++) { const chat_msg *m = &msgs->v[i]; if (role_is_user_like(m->role)) last_user_idx = i; } - buf out = {0}; - buf_puts(&out, "<|begin▁of▁sentence|>"); - if (think_mode == DS4_THINK_MAX) buf_puts(&out, ds4_think_max_prefix()); - buf_puts(&out, system.ptr ? system.ptr : ""); - bool pending_assistant = false; bool pending_tool_result = false; - for (int i = 0; i < msgs->len; i++) { + for (int i = 0; msgs && i < msgs->len; i++) { const chat_msg *m = &msgs->v[i]; if (role_is_system(m->role)) { continue; } else if (!strcmp(m->role, "user")) { - buf_puts(&out, "<|User|>"); - buf_puts(&out, m->content ? m->content : ""); + buf_puts(out, "<|User|>"); + buf_puts(out, m->content ? m->content : ""); pending_assistant = true; pending_tool_result = false; } else if (!strcmp(m->role, "tool") || !strcmp(m->role, "function")) { - if (!pending_tool_result) buf_puts(&out, "<|User|>"); - buf_puts(&out, ""); - append_tool_result_text(&out, m->content); - buf_puts(&out, ""); + if (!pending_tool_result) buf_puts(out, "<|User|>"); + buf_puts(out, ""); + append_tool_result_text(out, m->content); + buf_puts(out, ""); pending_assistant = true; pending_tool_result = true; } else if (!strcmp(m->role, "assistant")) { if (pending_assistant) { - buf_puts(&out, "<|Assistant|>"); + buf_puts(out, "<|Assistant|>"); if (think) { if (tool_context || i > last_user_idx) { - buf_puts(&out, ""); - buf_puts(&out, m->reasoning ? m->reasoning : ""); - buf_puts(&out, ""); + buf_puts(out, ""); + buf_puts(out, m->reasoning ? m->reasoning : ""); + buf_puts(out, ""); } else { - buf_puts(&out, ""); + buf_puts(out, ""); } } else { - buf_puts(&out, ""); + buf_puts(out, ""); } } - buf_puts(&out, m->content ? m->content : ""); - append_dsml_tool_calls_text(&out, &m->calls); - buf_puts(&out, "<|end▁of▁sentence|>"); + buf_puts(out, m->content ? m->content : ""); + append_dsml_tool_calls_text(out, &m->calls); + buf_puts(out, "<|end▁of▁sentence|>"); pending_assistant = false; pending_tool_result = false; } } if (pending_assistant) { - buf_puts(&out, "<|Assistant|>"); - buf_puts(&out, think ? "" : ""); + buf_puts(out, "<|Assistant|>"); + buf_puts(out, think ? "" : ""); } +} + +static char *render_chat_prompt_text(const chat_msgs *msgs, const char *tool_schemas, + const tool_schema_orders *tool_orders, + ds4_think_mode think_mode) { + (void)tool_orders; + const bool tool_context = chat_history_uses_tool_context(msgs, tool_schemas); + buf system = {0}; + /* Render tool schemas before the client system content so + * --kv-cache-boundary-trim-tokens chops a dynamic tail from the client + * message instead of the much larger tool-schema region. */ + if (tool_schemas && tool_schemas[0]) { + append_tools_prompt_text(&system, tool_schemas); + } + for (int i = 0; i < msgs->len; i++) { + const chat_msg *m = &msgs->v[i]; + if (!role_is_system(m->role)) continue; + if (system.len) buf_puts(&system, "\n\n"); + buf_puts(&system, m->content ? m->content : ""); + } + + buf out = {0}; + buf_puts(&out, "<|begin▁of▁sentence|>"); + if (think_mode == DS4_THINK_MAX) buf_puts(&out, ds4_think_max_prefix()); + buf_puts(&out, system.ptr ? system.ptr : ""); + render_chat_turns_into(&out, msgs, think_mode, tool_context); buf_free(&system); return buf_take(&out); } +static bool previous_response_tail_supported(const chat_msgs *msgs) { + for (int i = 0; msgs && i < msgs->len; i++) { + const chat_msg *m = &msgs->v[i]; + if (role_is_system(m->role) || !strcmp(m->role, "assistant")) return false; + } + return true; +} + +static char *render_previous_response_suffix_text(const chat_msgs *msgs, + ds4_think_mode think_mode) { + buf out = {0}; + render_chat_turns_into(&out, msgs, think_mode, true); + return buf_take(&out); +} + /* Render only the semantic tail that must be appended to the live KV for a * tool-result continuation. * @@ -3692,6 +3717,7 @@ static bool parse_responses_request(ds4_engine *e, server *s, const char *body, buf loaded_tool_schemas = {0}; char *instructions = NULL; char *tool_schemas = NULL; + bool saw_previous_response_id = false; json_ws(&p); if (*p != '{') goto bad; @@ -3831,19 +3857,20 @@ static bool parse_responses_request(ds4_engine *e, server *s, const char *body, * thinking. Other effort values choose between HIGH and MAX. */ if (reasoning_effort == DS4_THINK_NONE) thinking_enabled = false; } - } else if (!strcmp(key, "previous_response_id") || - !strcmp(key, "conversation")) - { - /* Official Responses state can be durable: - * previous_response_id chains to a stored prior response, and - * conversation points at a persistent Conversations object. - * - * DS4 does not yet implement that durable store. The supported - * modes are either (a) a live in-memory continuation checked by - * visible transcript / tool call ids, or (b) stateless replay of - * the full input items. Accepting a non-null durable reference - * without loading the referenced items would silently truncate the - * prompt, so reject it explicitly. */ + } else if (!strcmp(key, "previous_response_id")) { + free(r->responses_previous_response_id); + r->responses_previous_response_id = NULL; + json_ws(&p); + if (json_lit(&p, "null")) { + /* Stateless full replay. */ + } else if (!json_string(&p, &r->responses_previous_response_id)) { + free(key); + goto bad; + } else { + saw_previous_response_id = true; + } + } else if (!strcmp(key, "conversation")) { + /* Durable Conversations storage is not implemented yet. */ json_ws(&p); if (!json_lit(&p, "null")) { snprintf(err, errlen, @@ -3924,6 +3951,33 @@ static bool parse_responses_request(ds4_engine *e, server *s, const char *body, r->prompt_preserves_reasoning = chat_history_uses_tool_context(&msgs, active_tool_schemas); responses_prepare_live_continuation(r, &msgs); + if (saw_previous_response_id) { + if (!previous_response_tail_supported(&msgs)) { + snprintf(err, errlen, + "previous_response_id live continuation only supports new user/tool input; replay full input instead"); + chat_msgs_free(&msgs); + buf_free(&combined_tool_schemas); + buf_free(&loaded_tool_schemas); + free(instructions); + free(tool_schemas); + request_free(r); + return false; + } + free(r->responses_previous_suffix_text); + r->responses_previous_suffix_text = + render_previous_response_suffix_text(&msgs, r->think_mode); + if (!r->responses_previous_suffix_text || !r->responses_previous_suffix_text[0]) { + snprintf(err, errlen, + "previous_response_id live continuation has no input suffix; replay full input instead"); + chat_msgs_free(&msgs); + buf_free(&combined_tool_schemas); + buf_free(&loaded_tool_schemas); + free(instructions); + free(tool_schemas); + request_free(r); + return false; + } + } r->prompt_text = render_chat_prompt_text(&msgs, active_tool_schemas, &r->tool_orders, r->think_mode); ds4_tokenize_rendered_chat(e, r->prompt_text, &r->prompt); @@ -6018,10 +6072,11 @@ typedef struct { int sequence; /* monotonic per-event sequence_number Codex consumes */ } responses_stream; -static void responses_stream_init(const request *r, responses_stream *st) { +static void responses_stream_init(const request *r, responses_stream *st, const char *response_id) { memset(st, 0, sizeof(*st)); st->mode = ds4_think_mode_enabled(r->think_mode) ? RESP_STREAM_THINKING : RESP_STREAM_TEXT; - responses_random_id(st->response_id, sizeof(st->response_id), "resp_"); + if (response_id && response_id[0]) snprintf(st->response_id, sizeof(st->response_id), "%s", response_id); + else responses_random_id(st->response_id, sizeof(st->response_id), "resp_"); responses_random_id(st->reasoning_id, sizeof(st->reasoning_id), "rs_"); responses_random_id(st->message_id, sizeof(st->message_id), "msg_"); st->reasoning_index = -1; @@ -6705,9 +6760,9 @@ static bool responses_final_response(int fd, bool enable_cors, const char *text, const char *reasoning, const tool_calls *calls, const char *finish, int prompt_tokens, int completion_tokens) { - (void)id; char response_id[40], reasoning_id[40], message_id[40]; - responses_random_id(response_id, sizeof(response_id), "resp_"); + if (id && id[0]) snprintf(response_id, sizeof(response_id), "%s", id); + else responses_random_id(response_id, sizeof(response_id), "resp_"); responses_random_id(reasoning_id, sizeof(reasoning_id), "rs_"); responses_random_id(message_id, sizeof(message_id), "msg_"); @@ -7692,6 +7747,12 @@ typedef struct { size_t visible_len; } visible_live_state; +typedef struct { + bool valid; + char *response_id; + int live_tokens; +} responses_previous_state; + static bool id_list_contains(const stop_list *ids, const char *id); static void id_list_push_unique(stop_list *ids, const char *id); @@ -7704,6 +7765,7 @@ struct server { live_tool_state responses_live; live_tool_state anthropic_live; visible_live_state thinking_live; + responses_previous_state responses_previous; bool disable_exact_dsml_tool_replay; bool enable_cors; pthread_mutex_t tool_mu; @@ -7959,6 +8021,43 @@ static void visible_live_free(visible_live_state *st) { memset(st, 0, sizeof(*st)); } +static void responses_previous_clear_locked(responses_previous_state *st) { + if (!st) return; + free(st->response_id); + st->response_id = NULL; + st->live_tokens = 0; + st->valid = false; +} + +static void responses_previous_clear(server *s) { + if (!s) return; + pthread_mutex_lock(&s->tool_mu); + responses_previous_clear_locked(&s->responses_previous); + pthread_mutex_unlock(&s->tool_mu); +} + +static void responses_previous_remember(server *s, const char *response_id) { + if (!s || !response_id || !response_id[0]) return; + pthread_mutex_lock(&s->tool_mu); + responses_previous_clear_locked(&s->responses_previous); + s->responses_previous.response_id = xstrdup(response_id); + s->responses_previous.live_tokens = ds4_session_pos(s->session); + s->responses_previous.valid = true; + pthread_mutex_unlock(&s->tool_mu); +} + +static bool responses_previous_matches(server *s, const char *response_id, + int live_tokens) { + if (!s || !response_id || !response_id[0]) return false; + pthread_mutex_lock(&s->tool_mu); + bool ok = s->responses_previous.valid && + s->responses_previous.live_tokens == live_tokens && + s->responses_previous.response_id && + !strcmp(s->responses_previous.response_id, response_id); + pthread_mutex_unlock(&s->tool_mu); + return ok; +} + static void thinking_live_clear(server *s) { if (!s) return; pthread_mutex_lock(&s->tool_mu); @@ -8881,6 +8980,23 @@ static int responses_live_continuation_prompt(server *s, const request *req, return live_tokens->len; } +static int responses_previous_response_prompt(server *s, const request *req, + int live_pos, + ds4_tokens *effective_prompt) { + if (!s || !req || !effective_prompt) return 0; + if (req->api != API_RESPONSES || !req->responses_previous_response_id) return 0; + if (!req->responses_previous_suffix_text || !req->responses_previous_suffix_text[0]) return -1; + if (!responses_previous_matches(s, req->responses_previous_response_id, live_pos)) return -1; + + const ds4_tokens *live_tokens = ds4_session_tokens(s->session); + if (!live_tokens || live_tokens->len != live_pos) return -1; + + build_prompt_from_exact_prefix_and_text_suffix( + s->engine, live_tokens, req->responses_previous_suffix_text, + effective_prompt); + return live_tokens->len; +} + /* Tool-result Anthropic continuation. * * /v1/messages has no server-side response object like the OpenAI Responses @@ -9922,19 +10038,35 @@ static void generate_job(server *s, job *j) { int anthropic_live_match_ids = 0; /* Responses gets the first chance to continue from live state. This is * the whole point of the API shape: a request that is bound to prior live - * output by visible transcript or tool call ids does not need to prove an - * exact token-prefix match. Exact token/text/disk matching remains the - * fallback when the live state is absent or no longer describes the - * request. */ - int cached = responses_live_visible_prefix_prompt(s, &j->req, old_pos, + * output by previous_response_id, visible transcript, or tool call ids does + * not need to prove an exact token-prefix match. Exact token/text/disk + * matching remains the fallback when the live state is absent or no longer + * describes the request. */ + int cached = responses_previous_response_prompt(s, &j->req, old_pos, + &effective_prompt); + const char *cache_source = cached > 0 ? "responses-previous-id" : "none"; + if (cached < 0) { + ds4_tokens_free(&effective_prompt); + http_error(j->fd, s->enable_cors, 409, + "previous_response_id state is not available; retry by replaying the full input history"); + return; + } + int previous_id_cached = cached; + if (cached == 0) { + cached = responses_live_visible_prefix_prompt(s, &j->req, old_pos, &effective_prompt); - const char *cache_source = cached > 0 ? "responses-visible" : "none"; + cache_source = cached > 0 ? "responses-visible" : "none"; + } if (cached > 0) { - responses_live_match = "visible-prefix"; - if (responses_live_matches_request(s, &j->req.responses_live_call_ids, - old_pos)) - { - responses_live_match_ids = j->req.responses_live_call_ids.len; + if (previous_id_cached > 0) { + responses_live_match = "previous-response-id"; + } else { + responses_live_match = "visible-prefix"; + if (responses_live_matches_request(s, &j->req.responses_live_call_ids, + old_pos)) + { + responses_live_match_ids = j->req.responses_live_call_ids.len; + } } } if (cached == 0) { @@ -10201,9 +10333,14 @@ static void generate_job(server *s, job *j) { } } char id[96]; - snprintf(id, sizeof(id), "%s-%llu", - j->req.kind == REQ_CHAT ? "chatcmpl" : "cmpl", - (unsigned long long)++s->seq); + if (j->req.api == API_RESPONSES) { + responses_random_id(id, sizeof(id), "resp_"); + ++s->seq; + } else { + snprintf(id, sizeof(id), "%s-%llu", + j->req.kind == REQ_CHAT ? "chatcmpl" : "cmpl", + (unsigned long long)++s->seq); + } bool structured_stream = request_uses_structured_stream(&j->req); anthropic_stream anthropic_live = {0}; @@ -10252,7 +10389,7 @@ static void generate_job(server *s, job *j) { } if (openai_live_chat) openai_stream_start(&j->req, &openai_live); if (responses_live_chat) { - responses_stream_init(&j->req, &responses_live); + responses_stream_init(&j->req, &responses_live, id); responses_live.active = true; if (!responses_sse_created(j->fd, &j->req, &responses_live, responses_created_at)) { server_log(DS4_LOG_GENERATION, @@ -10749,11 +10886,15 @@ static void generate_job(server *s, job *j) { buf_puts(&visible, visible_suffix ? visible_suffix : ""); responses_live_remember(s, visible.ptr ? visible.ptr : "", parsed_calls.len ? &parsed_calls : NULL); + responses_previous_remember(s, id); buf_free(&visible); free(visible_suffix); } else { responses_live_clear(s); + responses_previous_clear(s); } + } else { + responses_previous_clear(s); } if (j->req.api == API_ANTHROPIC) { if (parsed_calls.len && strcmp(final_finish, "error") && @@ -11347,6 +11488,7 @@ static void server_close_resources(server *s) { live_tool_state_free(&s->responses_live); live_tool_state_free(&s->anthropic_live); visible_live_free(&s->thinking_live); + responses_previous_clear_locked(&s->responses_previous); pthread_mutex_destroy(&s->tool_mu); pthread_mutex_destroy(&s->trace_mu); pthread_cond_destroy(&s->clients_cv); @@ -12436,6 +12578,7 @@ static void test_responses_usage_reports_cache_details(void) { shutdown(sv[0], SHUT_WR); char *out = read_socket_text(sv[1]); + TEST_ASSERT(strstr(out, "\"id\":\"resp_usage\"") != NULL); TEST_ASSERT(strstr(out, "\"usage\":{\"input_tokens\":10") != NULL); TEST_ASSERT(strstr(out, "\"input_tokens_details\":{") != NULL); TEST_ASSERT(strstr(out, "\"cached_tokens\":7") != NULL); @@ -12454,7 +12597,7 @@ static void test_responses_usage_reports_cache_details(void) { } responses_stream st; - responses_stream_init(&r, &st); + responses_stream_init(&r, &st, NULL); TEST_ASSERT(responses_sse_completed(sv[0], &r, &st, NULL, NULL, "stop", 10, 2, 1234)); shutdown(sv[0], SHUT_WR); @@ -13930,6 +14073,49 @@ static void test_tool_checkpoint_canonicalization_gate_exact_replay(void) { tool_calls_free(&calls); } +static void test_responses_previous_response_suffix_renders_user_tail(void) { + chat_msgs msgs = {0}; + chat_msg user = {0}; + user.role = xstrdup("user"); + user.content = xstrdup("Continue with one test."); + chat_msgs_push(&msgs, user); + + TEST_ASSERT(previous_response_tail_supported(&msgs)); + char *suffix = render_previous_response_suffix_text(&msgs, DS4_THINK_HIGH); + TEST_ASSERT(suffix != NULL); + TEST_ASSERT(!strcmp(suffix, + "<|User|>Continue with one test.<|Assistant|>")); + + free(suffix); + chat_msgs_free(&msgs); +} + +static void test_responses_previous_response_suffix_rejects_assistant_replay(void) { + chat_msgs msgs = {0}; + chat_msg assistant = {0}; + assistant.role = xstrdup("assistant"); + assistant.content = xstrdup("already answered"); + chat_msgs_push(&msgs, assistant); + + TEST_ASSERT(!previous_response_tail_supported(&msgs)); + chat_msgs_free(&msgs); +} + +static void test_responses_previous_state_matches_live_id_and_pos(void) { + server s = {0}; + pthread_mutex_init(&s.tool_mu, NULL); + s.responses_previous.valid = true; + s.responses_previous.response_id = xstrdup("resp_live"); + s.responses_previous.live_tokens = 42; + + TEST_ASSERT(responses_previous_matches(&s, "resp_live", 42)); + TEST_ASSERT(!responses_previous_matches(&s, "resp_other", 42)); + TEST_ASSERT(!responses_previous_matches(&s, "resp_live", 41)); + + responses_previous_clear_locked(&s.responses_previous); + pthread_mutex_destroy(&s.tool_mu); +} + static void test_responses_live_tail_renders_tool_outputs_only(void) { request r; request_init(&r, REQ_CHAT, 128); @@ -15563,6 +15749,9 @@ static void ds4_server_unit_tests_run(void) { test_anthropic_full_replay_allows_unknown_live_id(); test_anthropic_tool_use_parses_before_role(); test_tool_checkpoint_canonicalization_gate_exact_replay(); + test_responses_previous_response_suffix_renders_user_tail(); + test_responses_previous_response_suffix_rejects_assistant_replay(); + test_responses_previous_state_matches_live_id_and_pos(); test_responses_live_tail_renders_tool_outputs_only(); test_responses_tool_output_id_validation(); test_responses_stateless_tool_replay_requires_reasoning();