-
Notifications
You must be signed in to change notification settings - Fork 77
nixl: add lancher and proxy server for PD workloads #635
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds NIXL (Network Interconnect eXtension Layer) support for disaggregated prefill-decode workloads in vLLM, introducing a proxy server and launcher script to orchestrate distributed inference with separate prefill and decode instances.
Key changes:
- Adds a comprehensive proxy server implementation with round-robin and load-balanced scheduling policies
- Implements a bash launcher script for deploying disaggregated vLLM server instances
- Provides OpenAI-compatible API endpoints with KV cache transfer capabilities
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| examples/nixl/proxy_server.py | Implements FastAPI-based proxy server with request routing, load balancing, and KV cache transfer orchestration |
| examples/nixl/disaggregated_prefill_server_launcher | Bash script to launch and configure prefill/decode server instances with NIXL connectivity |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| async def D_first_token_generator( | ||
| generator_d, | ||
| callback_owner=None, | ||
| decode_instance: str = None, | ||
| req_len: int = None, | ||
| ): |
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function name 'D_first_token_generator' is unclear. Consider renaming to 'decode_first_token_generator' to make the purpose more explicit.
| with self.lock: | ||
| return next(cycler) | ||
|
|
||
| def schedule(self, cycler: itertools.cycle, request: Optional[dict[str, any]] = None) -> str: |
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type hint uses 'any' instead of 'Any'. This will cause a NameError at runtime as 'any' is not a valid type. Change to 'Any' and ensure it's imported from typing.
| try: | ||
| error_content = json.loads(error_content) | ||
| except json.JSONDecodeError: | ||
| error_content = error_content |
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is redundant as it assigns 'error_content' to itself. Remove this line as the JSONDecodeError exception already leaves 'error_content' with the text value.
| error_content = error_content | |
| pass |
| self, instance: str, endpoint: str, req_data: dict, request_id: str | ||
| ): # yapf: disable | ||
| """ | ||
| Send a request to a service using a client from the pool. |
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring only contains a generic description. It should document the parameters (instance, endpoint, req_data, request_id) and return value (response object), especially since this method modifies req_data with KV transfer parameters.
| Send a request to a service using a client from the pool. | |
| Send a request to a service using a client from the pool. | |
| Args: | |
| instance (str): The hostname or IP address of the service instance to send the request to. | |
| endpoint (str): The API endpoint path to send the request to. | |
| req_data (dict): The request payload. This method copies and modifies this dictionary with KV transfer parameters. | |
| request_id (str): A unique identifier for the request, used for tracking. | |
| Returns: | |
| httpx.Response: The response object returned by the service. | |
| Side Effects: | |
| The input req_data is copied and modified with KV transfer parameters before sending. |
| client = httpx.AsyncClient(timeout=None, base_url=prefiller_base_url) | ||
|
|
||
| response = await client.post( | ||
| endpoint, json=req_data, headers=headers | ||
| ) # yapf: disable | ||
| response.raise_for_status() | ||
|
|
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The httpx.AsyncClient is created but never explicitly closed, which can lead to resource leaks. Use an async context manager (async with) to ensure proper cleanup.
| client = httpx.AsyncClient(timeout=None, base_url=prefiller_base_url) | |
| response = await client.post( | |
| endpoint, json=req_data, headers=headers | |
| ) # yapf: disable | |
| response.raise_for_status() | |
| async with httpx.AsyncClient(timeout=None, base_url=prefiller_base_url) as client: | |
| response = await client.post( | |
| endpoint, json=req_data, headers=headers | |
| ) # yapf: disable | |
| response.raise_for_status() |
| print("Error occurred in disagg proxy server") | ||
| print(exc_info) |
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using print() for error reporting bypasses the configured logging system. Use 'logger.error()' instead for consistency with the rest of the code and to ensure errors are properly formatted and can be captured by logging infrastructure.
| print("Error occurred in disagg proxy server") | |
| print(exc_info) | |
| logger.error("Error occurred in disagg proxy server", exc_info=exc_info) |
| print("Error occurred in disagg proxy server") | ||
| print(error_messages) | ||
| return StreamingResponse(content=iter(error_messages), media_type="application/json") |
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using print() for error reporting bypasses the configured logging system. Use 'logger.error()' instead. Also, returning error_messages in StreamingResponse may not produce valid JSON; consider returning a proper JSONResponse with an error status code.
| print("Error occurred in disagg proxy server") | |
| print(error_messages) | |
| return StreamingResponse(content=iter(error_messages), media_type="application/json") | |
| logger.error("Error occurred in disagg proxy server: %s", error_messages) | |
| return JSONResponse(content={"error": error_messages}, status_code=500) |
| self.validate_instances(decodes) | ||
| self.verify_model_config(decodes, args.model) |
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Variable 'args' is not defined in this method scope. The method signature receives 'prefills' and 'decodes' lists, but 'args.model' references an undefined variable. This should likely be 'self.proxy_instance.model' or the model should be passed as a parameter.
| for i in $(seq 0 $((NUM_LOCAL_INSTANCES-1))); do | ||
| # Calculate port number (base port + instance number) | ||
| PORT=$((BASE_PORT+8*NODE_RANK+i)) | ||
| # Calculate side channel port. Avoid clash with with TP workers. |
Copilot
AI
Nov 26, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected spelling: 'with with' contains a duplicate word. Should be 'with'.
| # Calculate side channel port. Avoid clash with with TP workers. | |
| # Calculate side channel port. Avoid clash with TP workers. |
🚧 CI BlockedThe main CI workflow was not started for the following reason:
|
2e4d6d5 to
cf5ee2a
Compare
🚧 CI BlockedThe main CI workflow was not started for the following reason:
|
cf5ee2a to
072ce61
Compare
🚧 CI BlockedThe main CI workflow was not started for the following reason:
|
Signed-off-by: Xinyu Chen <[email protected]>
Uh oh!
There was an error while loading. Please reload this page.