Skip to content

Commit cab61d6

Browse files
committed
Fix stop generating: actually abort stream instead of draining in background
When user clicks Stop, worker_registry.stop_conversation() now marks the conversation as explicitly stopped. The CancelledError and GeneratorExit handlers in _execute_chat_completions_stream check this flag — if the conversation was user-stopped, the stream is closed immediately instead of being drained to completion in a background task. The background drain is preserved for passive disconnects (browser close, network drop) where the agent should continue processing so the response is saved for when the user returns.
1 parent a94d5d2 commit cab61d6

2 files changed

Lines changed: 39 additions & 0 deletions

File tree

agixt/WorkerRegistry.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def __init__(self):
2121
self._conversation_tasks: Dict[str, asyncio.Task] = (
2222
{}
2323
) # conversation_id -> task
24+
self._stopped_conversations: Set[str] = set() # explicitly stopped by user
2425
self._lock = threading.Lock()
2526

2627
def register_conversation(
@@ -76,8 +77,15 @@ def unregister_conversation(self, conversation_id: str) -> bool:
7677
if conversation_id in self._conversation_tasks:
7778
del self._conversation_tasks[conversation_id]
7879

80+
self._stopped_conversations.discard(conversation_id)
81+
7982
return removed
8083

84+
def is_stopped(self, conversation_id: str) -> bool:
85+
"""Check if a conversation was explicitly stopped by the user."""
86+
with self._lock:
87+
return conversation_id in self._stopped_conversations
88+
8189
def get_conversation_info(self, conversation_id: str) -> Optional[Dict]:
8290
"""
8391
Get information about an active conversation
@@ -147,6 +155,10 @@ async def stop_conversation(
147155
)
148156
return False
149157

158+
# Mark as explicitly stopped so the stream handler knows not to
159+
# continue processing in the background.
160+
self._stopped_conversations.add(conversation_id)
161+
150162
# Get the task if it exists
151163
task = self._conversation_tasks.get(conversation_id)
152164

agixt/XT.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4988,6 +4988,20 @@ async def _next_event(iterator):
49884988
yield f"data: {json.dumps(tts_end_chunk)}\n\n"
49894989

49904990
except asyncio.CancelledError:
4991+
# Check if user explicitly stopped this conversation
4992+
if worker_registry.is_stopped(conversation_id):
4993+
logging.info(
4994+
f"[_execute_chat_completions_stream] Conversation {conversation_id} "
4995+
f"stopped by user — aborting stream."
4996+
)
4997+
# Close the stream iterator without draining
4998+
if stream_iter is not None:
4999+
try:
5000+
await stream_iter.aclose()
5001+
except Exception:
5002+
pass
5003+
raise
5004+
49915005
logging.warning(
49925006
f"[_execute_chat_completions_stream] CancelledError for conversation {conversation_id}. "
49935007
f"Detaching run_stream to continue in background."
@@ -5032,6 +5046,19 @@ async def _drain_stream_background(stream, pending, conv_id, _SEND):
50325046
)
50335047
raise
50345048
except GeneratorExit:
5049+
# Check if user explicitly stopped this conversation
5050+
if worker_registry.is_stopped(conversation_id):
5051+
logging.info(
5052+
f"[_execute_chat_completions_stream] GeneratorExit: conversation {conversation_id} "
5053+
f"stopped by user — aborting stream."
5054+
)
5055+
if stream_iter is not None:
5056+
try:
5057+
await stream_iter.aclose()
5058+
except Exception:
5059+
pass
5060+
return
5061+
50355062
logging.warning(
50365063
f"[_execute_chat_completions_stream] GeneratorExit for conversation {conversation_id}. "
50375064
f"Detaching run_stream to continue in background."

0 commit comments

Comments
 (0)