Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions examples/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,18 @@ struct server_response_reader {
return !cancelled && received_count < id_tasks.size();
}

// cancel-cascade fix: reports whether at least one slot that is currently
// processing is running a task owned by THIS reader (i.e. slot.id_task is in
// id_tasks). Callers check this before llama_decode_stop(), so that a
// disconnect of a queued/deferred task does not abort a decode belonging to
// a different task through the process-global stop_internal_decode flag.
// NOTE: this is a best-effort, unsynchronized cross-thread read of the slots
// (slots are not resized at runtime; same race class as the global flag).
bool any_task_on_slot() const {
    for (const auto & slot : ctx_server.slots) {
        // idle slots cannot be running one of our tasks
        if (!slot.is_processing()) {
            continue;
        }
        // busy slot: does the task it runs belong to this reader?
        if (id_tasks.find(slot.id_task) != id_tasks.end()) {
            return true;
        }
    }
    // no processing slot is running one of this reader's tasks
    return false;
}

// return nullptr if should_stop() is true before receiving a result
// note: if one error is received, it will stop further processing and return error result
server_task_result_ptr next(const std::function<bool()>& should_stop) {
Expand Down Expand Up @@ -1127,7 +1139,7 @@ int main(int argc, char ** argv) {
// non-stream, wait for the results
auto all_results = rd->wait_for_all(is_connection_closed);
if (all_results.is_terminated) {
llama_decode_stop(); // send a signal to stop decode process
if (rd->any_task_on_slot()) llama_decode_stop(); // cancel-cascade fix: stop only if OUR task is the active decode
return; // connection is closed
}
else if (all_results.error) {
Expand All @@ -1150,7 +1162,7 @@ int main(int argc, char ** argv) {
// ref: https://github.com/ggml-org/llama.cpp/pull/16486#discussion_r2419657309
server_task_result_ptr first_result = rd->next(is_connection_closed);
if (first_result == nullptr) {
llama_decode_stop(); // send a signal to stop decode process
if (rd->any_task_on_slot()) llama_decode_stop(); // cancel-cascade fix: stop only if OUR task is the active decode
return; // connection is closed
}
else if (first_result->is_error()) {
Expand Down Expand Up @@ -1479,7 +1491,7 @@ int main(int argc, char ** argv) {

// collect results
if (all_results.is_terminated) {
llama_decode_stop();
if (rd.any_task_on_slot()) llama_decode_stop(); // cancel-cascade fix: stop only if OUR task is the active decode
return; // connection is closed
}
else if (all_results.error) {
Expand Down