diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py
index 7071a1f33a9a5a..5571052203927b 100644
--- a/api/core/workflow/graph_engine/graph_engine.py
+++ b/api/core/workflow/graph_engine/graph_engine.py
@@ -96,7 +96,7 @@ def __init__(
 
         # === State Management ===
         # Unified state manager handles all node state transitions and queue operations
-        self._state_manager = GraphStateManager(self._graph, self._ready_queue)
+        self._state_manager = GraphStateManager(self._graph, self._ready_queue, self._response_coordinator)
 
         # === Response Coordination ===
         # Coordinates response streaming from response nodes
diff --git a/api/core/workflow/graph_engine/graph_state_manager.py b/api/core/workflow/graph_engine/graph_state_manager.py
index 22a3a826fcd1c9..fabf8fb0ed46e6 100644
--- a/api/core/workflow/graph_engine/graph_state_manager.py
+++ b/api/core/workflow/graph_engine/graph_state_manager.py
@@ -4,13 +4,16 @@
 import threading
 from collections.abc import Sequence
-from typing import TypedDict, final
+from typing import TYPE_CHECKING, TypedDict, final
 
 from core.workflow.enums import NodeState
 from core.workflow.graph import Edge, Graph
 
 from .ready_queue import ReadyQueue
 
+if TYPE_CHECKING:
+    from .response_coordinator import ResponseStreamCoordinator
+
 
 class EdgeStateAnalysis(TypedDict):
     """Analysis result for edge states."""
 
@@ -22,16 +25,20 @@ class EdgeStateAnalysis(TypedDict):
 
 @final
 class GraphStateManager:
-    def __init__(self, graph: Graph, ready_queue: ReadyQueue) -> None:
+    def __init__(
+        self, graph: Graph, ready_queue: ReadyQueue, response_coordinator: "ResponseStreamCoordinator | None" = None
+    ) -> None:
         """
         Initialize the state manager.
 
         Args:
             graph: The workflow graph
             ready_queue: Queue for nodes ready to execute
+            response_coordinator: Optional response stream coordinator for checking pending sessions
         """
         self._graph = graph
         self._ready_queue = ready_queue
+        self._response_coordinator = response_coordinator
         self._lock = threading.RLock()
 
         # Execution tracking state
@@ -251,12 +258,21 @@ def is_execution_complete(self) -> bool:
         Execution is complete when:
         - Ready queue is empty
         - No nodes are executing
+        - No pending response sessions (if response coordinator is available)
 
         Returns:
             True if execution is complete
         """
         with self._lock:
-            return self._ready_queue.empty() and len(self._executing_nodes) == 0
+            basic_complete = self._ready_queue.empty() and len(self._executing_nodes) == 0
+
+            if not basic_complete:
+                return False
+
+            if self._response_coordinator and self._response_coordinator.has_pending_sessions():
+                return False
+
+            return True
 
     def get_queue_depth(self) -> int:
         """
diff --git a/api/core/workflow/graph_engine/response_coordinator/coordinator.py b/api/core/workflow/graph_engine/response_coordinator/coordinator.py
index 98e0ea91efbcde..0995dfdca4f027 100644
--- a/api/core/workflow/graph_engine/response_coordinator/coordinator.py
+++ b/api/core/workflow/graph_engine/response_coordinator/coordinator.py
@@ -511,6 +511,18 @@ def end_session(self, node_id: str) -> list[NodeRunStreamChunkEvent]:
 
         return events
 
+    def has_pending_sessions(self) -> bool:
+        """
+        Check if there are any pending response sessions.
+
+        Returns:
+            True if there are active, waiting, or inactive response sessions
+        """
+        with self._lock:
+            return (
+                self._active_session is not None or len(self._waiting_sessions) > 0 or len(self._response_sessions) > 0
+            )
+
     # ============= Internal Stream Management Methods =============
 
     def _append_stream_chunk(self, selector: Sequence[str], event: NodeRunStreamChunkEvent) -> None: