
Conversation

@runzhezhang11

Important

  1. Make sure you have read our contribution guidelines
  2. Ensure there is an associated issue and you have been assigned to it
  3. Use the correct syntax to link this PR: Fixes #<issue number>.

Summary

Details

  • Add has_pending_sessions() method to ResponseStreamCoordinator to check for active/waiting/pending response sessions
  • Update GraphStateManager.is_execution_complete() to consider pending response sessions
  • Pass response_coordinator to GraphStateManager in GraphEngine
  • Add comprehensive unit tests for the fix

This fixes an issue where workflows would terminate prematurely when:

  1. IF/ELSE node has parallel branches
  2. One branch contains an Answer node alongside other nodes (e.g., LLM)
  3. Answer node completes before the parallel nodes

The workflow now correctly waits for all parallel nodes to complete before terminating.

Checklist

  • This change requires a documentation update, included: Dify Document
  • I understand that this PR may be closed in case there was no previous discussion or issues. (This doesn't apply to typos!)
  • I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
  • I've updated the documentation accordingly.
  • I ran dev/reformat (backend) and cd web && npx lint-staged (frontend) to appease the lint gods

check pending session
Optional response stream coordinator for checking pending sessions
add check pending session in execution
Copilot AI review requested due to automatic review settings October 29, 2025 09:58
@dosubot dosubot bot added the size:M This PR changes 30-99 lines, ignoring generated files. label Oct 29, 2025
@gemini-code-assist
Contributor

Summary of Changes

Hello @runzhezhang11, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical bug in workflow execution where workflows with parallel branches, particularly those involving an Answer node, could terminate prematurely. By enhancing the GraphStateManager to consider the state of response sessions, the system now correctly waits for all parallel operations to conclude, ensuring robust and complete workflow execution.

Highlights

  • Workflow Termination Logic: Modified the GraphStateManager to prevent premature workflow termination, ensuring that all parallel nodes in an IF/ELSE branch complete before the workflow finishes, even if an Answer node finishes early.
  • Response Session Tracking: Introduced a new method has_pending_sessions() in ResponseStreamCoordinator to accurately check for active, waiting, or pending response sessions.
  • State Manager Integration: The ResponseStreamCoordinator is now passed to the GraphStateManager during initialization, allowing the state manager to account for pending response sessions when determining workflow completion.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Contributor

Copilot AI left a comment


Pull Request Overview

This PR fixes a premature workflow termination issue that occurs when an Answer node in one branch of an IF/ELSE statement completes before parallel nodes in another branch. The fix ensures workflows properly wait for all parallel nodes to complete by tracking pending response sessions.

Key Changes:

  • Added response session tracking to prevent early termination when Answer nodes complete before parallel nodes
  • Enhanced completion check to consider pending response sessions alongside queue and execution state
  • Introduced comprehensive unit tests to validate the fix

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

File Description

  • api/core/workflow/graph_engine/response_coordinator/coordinator.py — Adds a has_pending_sessions() method to check for active, waiting, or pending response sessions
  • api/core/workflow/graph_engine/graph_state_manager.py — Updates the completion logic to consider pending response sessions and accepts the response coordinator as a constructor parameter
  • api/core/workflow/graph_engine/graph_engine.py — Passes the response coordinator to GraphStateManager during initialization


Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request addresses a critical bug where workflows were prematurely terminating when an Answer node completed before parallel nodes in an IF/ELSE branch. The changes include adding a has_pending_sessions() method to ResponseStreamCoordinator, updating GraphStateManager.is_execution_complete() to consider pending response sessions, and passing the response_coordinator to GraphStateManager in GraphEngine. Comprehensive unit tests have also been added to validate the fix.

  # === State Management ===
  # Unified state manager handles all node state transitions and queue operations
- self._state_manager = GraphStateManager(self._graph, self._ready_queue)
+ self._state_manager = GraphStateManager(self._graph, self._ready_queue, self._response_coordinator)
Contributor


critical

The _response_coordinator is not initialized before being passed to the GraphStateManager. This could raise an AttributeError if _response_coordinator has not yet been assigned when this line runs. Ensure _response_coordinator is initialized before _state_manager.
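The ordering fix the reviewer asks for can be sketched as follows. This is a hypothetical, simplified excerpt — the stand-in classes below omit the real constructors' other arguments — illustrating that each attribute must be assigned before another initializer consumes it:

```python
class ReadyQueue:
    """Minimal stand-in for the engine's ready queue."""


class ResponseStreamCoordinator:
    """Minimal stand-in; the real class tracks response sessions."""

    def has_pending_sessions(self) -> bool:
        return False


class GraphStateManager:
    def __init__(self, ready_queue, response_coordinator=None) -> None:
        self._ready_queue = ready_queue
        self._response_coordinator = response_coordinator


class GraphEngine:
    def __init__(self) -> None:
        self._ready_queue = ReadyQueue()
        # Assign the coordinator first...
        self._response_coordinator = ResponseStreamCoordinator()
        # ...so it already exists when the state manager is constructed.
        self._state_manager = GraphStateManager(
            self._ready_queue, self._response_coordinator
        )
```

With this ordering, the state manager always receives a fully constructed coordinator rather than an attribute that may not yet exist.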

Comment on lines +29 to +30
self, graph: Graph, ready_queue: ReadyQueue, response_coordinator: "ResponseStreamCoordinator | None" = None
) -> None:
Contributor


medium

Consider adding a type annotation for response_coordinator to improve code readability and maintainability. This makes it explicit that the coordinator is optional.

    def __init__(
        self, graph: Graph, ready_queue: ReadyQueue, response_coordinator: "ResponseStreamCoordinator | None" = None
    ) -> None:

@runzhezhang11
Author

@copilot open a new pull request to apply changes based on the comments in this thread

