-
Notifications
You must be signed in to change notification settings - Fork 269
router and infer parrall. #965
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @hiworldwzj, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a significant architectural refactoring to enhance the parallelism and efficiency of the LightLLM serving system. It transitions the communication paradigm between the router and inference components from a request-response RPC model to a continuous, shared-memory-driven approach. This change aims to reduce communication latency, streamline request processing, and provide a more robust mechanism for handling request lifecycle events like abortion.
Highlights
- Inter-process Communication Overhaul: Replaced RPC-based communication between the router and inference processes with a shared-memory-based
ShmReqsIOBuffer
for improved parallelism and reduced overhead. This new mechanism allows for more direct and efficient data exchange. - Continuous Inference Loop: Introduced a persistent
infer_loop
within the inference backend that continuously processes requests and commands from the shared memory. This eliminates the need for explicit RPC calls for prefill, decode, and pause operations, streamlining the inference pipeline. - Enhanced Abort Handling: Implemented a dedicated
AbortedReqCmd
dataclass and a shared memory mechanism for robustly signaling and processing aborted requests. The router now explicitly marks requests asrouter_aborted
via the shared buffer, ensuring timely and consistent cleanup by the inference processes. - Simplified Scheduling and Pausing: Removed the complex, event-driven pausing and scheduling logic, including
pause_strategy
andschedule_event
. The new batch generation in the router manager now proceeds only when no requests are paused, simplifying the overall scheduling mechanism. - Node-level Synchronization: Added new distributed group and broadcast primitives (
node_broadcast_tensor
,node_nccl_group
) to facilitate efficient synchronization of shared memory operations across inference processes within the same node.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command>
or @gemini-code-assist <command>
. Below is a summary of the supported commands.
Feature | Command | Description |
---|---|---|
Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/
folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a significant architectural change by refactoring the interaction between the router and the inference engine. It moves from an RPC-based prefill/decode cycle to a continuous batching model where the inference engine runs its own loop. Communication is now handled via a shared memory buffer (ShmReqsIOBuffer
), which should improve performance by reducing RPC overhead. The changes are extensive, touching routing, request queuing, and the model inference backend.
My review has identified a few areas for improvement, primarily concerning code clarity and robustness. I've suggested simplifying some logic to be more idiomatic and readable. More critically, I've pointed out the use of bare except
blocks which can hide bugs and should be replaced with specific exception handling.
try: | ||
shm = shared_memory.SharedMemory(name=self.name, create=True, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) | ||
except: | ||
shm = shared_memory.SharedMemory(name=self.name, create=False, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a bare except:
is a bad practice as it can catch unexpected exceptions (like KeyboardInterrupt
or SystemExit
) and hide bugs. It's better to catch specific exceptions. In this case, you should catch FileExistsError
.
try: | |
shm = shared_memory.SharedMemory(name=self.name, create=True, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) | |
except: | |
shm = shared_memory.SharedMemory(name=self.name, create=False, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) | |
try: | |
shm = shared_memory.SharedMemory(name=self.name, create=True, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) | |
except FileExistsError: | |
shm = shared_memory.SharedMemory(name=self.name, create=False, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) |
try: | ||
shm = shared_memory.SharedMemory(name=self.name, create=True, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) | ||
logger.info(f"create shm {self.name}") | ||
except: | ||
shm = shared_memory.SharedMemory(name=self.name, create=False, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) | ||
logger.info(f"link shm {self.name}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the previous point, the bare except:
here should be replaced with a specific exception, FileExistsError
, to avoid masking other potential errors and improve code robustness.
try: | |
shm = shared_memory.SharedMemory(name=self.name, create=True, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) | |
logger.info(f"create shm {self.name}") | |
except: | |
shm = shared_memory.SharedMemory(name=self.name, create=False, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) | |
logger.info(f"link shm {self.name}") | |
try: | |
shm = shared_memory.SharedMemory(name=self.name, create=True, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) | |
logger.info(f"create shm {self.name}") | |
except FileExistsError: | |
shm = shared_memory.SharedMemory(name=self.name, create=False, size=LIGHTLLM_REQS_BUFFER_BYTE_SIZE) | |
logger.info(f"link shm {self.name}") |
def _get_paused_req_num(self) -> int: | ||
if self.running_batch is None: | ||
return 0 | ||
else: | ||
count = 0 | ||
for req in self.running_batch.reqs: | ||
if req.is_paused: | ||
count += 1 | ||
return count |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def _get_paused_req_num_in_dp_index(self, dp_index: int) -> int: | ||
if self.running_batch is None: | ||
return 0 | ||
else: | ||
count = 0 | ||
for req in self.running_batch.reqs: | ||
if req.is_paused and req.sample_params.suggested_dp_index == dp_index: | ||
count += 1 | ||
return count |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to _get_paused_req_num
, this function can be made more concise and readable by using sum()
with a generator expression.
def _get_paused_req_num_in_dp_index(self, dp_index: int) -> int:
if self.running_batch is None:
return 0
return sum(
req.is_paused and req.sample_params.suggested_dp_index == dp_index for req in self.running_batch.reqs
)
No description provided.