Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/core/workflow/graph_engine/graph_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def __init__(

# === State Management ===
# Unified state manager handles all node state transitions and queue operations
self._state_manager = GraphStateManager(self._graph, self._ready_queue)
self._state_manager = GraphStateManager(self._graph, self._ready_queue, self._response_coordinator)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The _response_coordinator is not initialized before being passed to the GraphStateManager. This could lead to a NameError if GraphStateManager tries to use it before it's assigned. Ensure _response_coordinator is initialized before _state_manager.


# === Response Coordination ===
# Coordinates response streaming from response nodes
Expand Down
22 changes: 19 additions & 3 deletions api/core/workflow/graph_engine/graph_state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

import threading
from collections.abc import Sequence
from typing import TypedDict, final
from typing import TYPE_CHECKING, TypedDict, final

from core.workflow.enums import NodeState
from core.workflow.graph import Edge, Graph

from .ready_queue import ReadyQueue

if TYPE_CHECKING:
from .response_coordinator import ResponseStreamCoordinator


class EdgeStateAnalysis(TypedDict):
"""Analysis result for edge states."""
Expand All @@ -22,16 +25,20 @@ class EdgeStateAnalysis(TypedDict):

@final
class GraphStateManager:
def __init__(self, graph: Graph, ready_queue: ReadyQueue) -> None:
def __init__(
self, graph: Graph, ready_queue: ReadyQueue, response_coordinator: "ResponseStreamCoordinator | None" = None
) -> None:
Comment on lines +29 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Consider adding a type annotation for response_coordinator to improve code readability and maintainability. This makes it explicit that the coordinator is optional.

    def __init__(
        self, graph: Graph, ready_queue: ReadyQueue, response_coordinator: "ResponseStreamCoordinator | None" = None
    ) -> None:

"""
Initialize the state manager.

Args:
graph: The workflow graph
ready_queue: Queue for nodes ready to execute
response_coordinator: Optional response stream coordinator for checking pending sessions
"""
self._graph = graph
self._ready_queue = ready_queue
self._response_coordinator = response_coordinator
self._lock = threading.RLock()

# Execution tracking state
Expand Down Expand Up @@ -251,12 +258,21 @@ def is_execution_complete(self) -> bool:
Execution is complete when:
- Ready queue is empty
- No nodes are executing
- No pending response sessions (if response coordinator is available)

Returns:
True if execution is complete
"""
with self._lock:
return self._ready_queue.empty() and len(self._executing_nodes) == 0
basic_complete = self._ready_queue.empty() and len(self._executing_nodes) == 0

if not basic_complete:
return False

if self._response_coordinator and self._response_coordinator.has_pending_sessions():
return False

return True

def get_queue_depth(self) -> int:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,20 @@ def end_session(self, node_id: str) -> list[NodeRunStreamChunkEvent]:
events = self.try_flush()

return events

def has_pending_sessions(self) -> bool:
"""
Check if there are any pending response sessions.

Returns:
True if there are active, waiting, or未激活的 response sessions
"""
with self._lock:
return (
self._active_session is not None
or len(self._waiting_sessions) > 0
or len(self._response_sessions) > 0
)

# ============= Internal Stream Management Methods =============

Expand Down
Loading