From 33c5d11341e216145bc8c251e5ebd59a8544a5e6 Mon Sep 17 00:00:00 2001 From: Runzhe Zhang Date: Wed, 29 Oct 2025 17:51:25 +0800 Subject: [PATCH 1/6] Update coordinator.py check pending session --- .../response_coordinator/coordinator.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/api/core/workflow/graph_engine/response_coordinator/coordinator.py b/api/core/workflow/graph_engine/response_coordinator/coordinator.py index 98e0ea91efbcde..13b4d69aac67d5 100644 --- a/api/core/workflow/graph_engine/response_coordinator/coordinator.py +++ b/api/core/workflow/graph_engine/response_coordinator/coordinator.py @@ -510,6 +510,20 @@ def end_session(self, node_id: str) -> list[NodeRunStreamChunkEvent]: events = self.try_flush() return events + + def has_pending_sessions(self) -> bool: + """ + Check if there are any pending response sessions. + + Returns: + True if there are active, waiting, or未激活的 response sessions + """ + with self._lock: + return ( + self._active_session is not None + or len(self._waiting_sessions) > 0 + or len(self._response_sessions) > 0 + ) # ============= Internal Stream Management Methods ============= From 2b90123d6e473a4594b2c497cd213df032343416 Mon Sep 17 00:00:00 2001 From: Runzhe Zhang Date: Wed, 29 Oct 2025 17:53:08 +0800 Subject: [PATCH 2/6] Update graph_state_manager.py Optional response stream coordinator for checking pending sessions --- api/core/workflow/graph_engine/graph_state_manager.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/api/core/workflow/graph_engine/graph_state_manager.py b/api/core/workflow/graph_engine/graph_state_manager.py index 22a3a826fcd1c9..a6645bbeb1363e 100644 --- a/api/core/workflow/graph_engine/graph_state_manager.py +++ b/api/core/workflow/graph_engine/graph_state_manager.py @@ -4,13 +4,16 @@ import threading from collections.abc import Sequence -from typing import TypedDict, final +from typing import TYPE_CHECKING, TypedDict, final from core.workflow.enums import NodeState from core.workflow.graph import Edge, Graph from .ready_queue import ReadyQueue +if TYPE_CHECKING: + from .response_coordinator import ResponseStreamCoordinator + class EdgeStateAnalysis(TypedDict): """Analysis result for edge states.""" @@ -22,16 +25,20 @@ class EdgeStateAnalysis(TypedDict): @final class GraphStateManager: - def __init__(self, graph: Graph, ready_queue: ReadyQueue) -> None: + def __init__( + self, graph: Graph, ready_queue: ReadyQueue, response_coordinator: "ResponseStreamCoordinator | None" = None + ) -> None: """ Initialize the state manager. Args: graph: The workflow graph ready_queue: Queue for nodes ready to execute + response_coordinator: Optional response stream coordinator for checking pending sessions """ self._graph = graph self._ready_queue = ready_queue + self._response_coordinator = response_coordinator self._lock = threading.RLock() # Execution tracking state From 635e55e7ca1aaceb172265ad5a7bd3b3ca11cd88 Mon Sep 17 00:00:00 2001 From: Runzhe Zhang Date: Wed, 29 Oct 2025 17:55:27 +0800 Subject: [PATCH 3/6] Update graph_state_manager.py add check pending sesstion in executuion --- api/core/workflow/graph_engine/graph_state_manager.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/api/core/workflow/graph_engine/graph_state_manager.py b/api/core/workflow/graph_engine/graph_state_manager.py index a6645bbeb1363e..fabf8fb0ed46e6 100644 --- a/api/core/workflow/graph_engine/graph_state_manager.py +++ b/api/core/workflow/graph_engine/graph_state_manager.py @@ -258,12 +258,21 @@ def is_execution_complete(self) -> bool: Execution is complete when: - Ready queue is empty - No nodes are executing + - No pending response sessions (if response coordinator is available) Returns: True if execution is complete """ with self._lock: - return self._ready_queue.empty() and len(self._executing_nodes) == 0 + basic_complete = self._ready_queue.empty() and len(self._executing_nodes) == 0 + + if not basic_complete: + return False + + if self._response_coordinator and self._response_coordinator.has_pending_sessions(): + return False + + return True def get_queue_depth(self) -> int: """ From 7d9200d2a260edfef6948d43e3d41324b015a727 Mon Sep 17 00:00:00 2001 From: Runzhe Zhang Date: Wed, 29 Oct 2025 17:56:38 +0800 Subject: [PATCH 4/6] Update graph_engine.py --- api/core/workflow/graph_engine/graph_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index dd2ca3f93b4237..8127d82557e1cc 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -96,7 +96,7 @@ def __init__( # === State Management === # Unified state manager handles all node state transitions and queue operations - self._state_manager = GraphStateManager(self._graph, self._ready_queue) + self._state_manager = GraphStateManager(self._graph, self._ready_queue, self._response_coordinator) # === Response Coordination === # Coordinates response streaming from response nodes From 95e3dd4c0e7fd63699ddbcd482c06a2fb5fc071a Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 30 Oct 2025 01:35:46 +0000 Subject: [PATCH 5/6] [autofix.ci] apply automated fixes --- .../graph_engine/response_coordinator/coordinator.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/api/core/workflow/graph_engine/response_coordinator/coordinator.py b/api/core/workflow/graph_engine/response_coordinator/coordinator.py index 13b4d69aac67d5..378a2fa983a61d 100644 --- a/api/core/workflow/graph_engine/response_coordinator/coordinator.py +++ b/api/core/workflow/graph_engine/response_coordinator/coordinator.py @@ -510,7 +510,7 @@ def end_session(self, node_id: str) -> list[NodeRunStreamChunkEvent]: events = self.try_flush() return events - + def has_pending_sessions(self) -> bool: """ Check if there are any pending response sessions. @@ -520,9 +520,7 @@ def has_pending_sessions(self) -> bool: """ with self._lock: return ( - self._active_session is not None - or len(self._waiting_sessions) > 0 - or len(self._response_sessions) > 0 + self._active_session is not None or len(self._waiting_sessions) > 0 or len(self._response_sessions) > 0 ) # ============= Internal Stream Management Methods ============= From 82421dc7ce50a1772d81c3b3d78e03f56843585a Mon Sep 17 00:00:00 2001 From: Runzhe Zhang Date: Mon, 3 Nov 2025 19:33:21 +0800 Subject: [PATCH 6/6] Update api/core/workflow/graph_engine/response_coordinator/coordinator.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../workflow/graph_engine/response_coordinator/coordinator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/core/workflow/graph_engine/response_coordinator/coordinator.py b/api/core/workflow/graph_engine/response_coordinator/coordinator.py index 378a2fa983a61d..0995dfdca4f027 100644 --- a/api/core/workflow/graph_engine/response_coordinator/coordinator.py +++ b/api/core/workflow/graph_engine/response_coordinator/coordinator.py @@ -516,7 +516,7 @@ def has_pending_sessions(self) -> bool: Check if there are any pending response sessions. Returns: - True if there are active, waiting, or未激活的 response sessions + True if there are active, waiting, or inactive response sessions """ with self._lock: return (