
WIP [P/D] Use ThreadPoolExecutor to do handshake for each P-D pair #19823


Closed
wants to merge 5 commits

Conversation

lk-chen
Collaborator

@lk-chen lk-chen commented Jun 19, 2025

Essential Elements of an Effective PR Description Checklist

  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results.
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.

Purpose

Split from #19447, this PR keeps using zmq for nixl metadata transfer, but uses a ThreadPoolExecutor to run _nixl_handshake in the background.

closes #19777
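
For orientation, a minimal sketch of the mechanism, with the caveat that only _nixl_handshake and _handshake_futures are names confirmed by this PR; the executor, lock, and callback below are assumptions:

from concurrent.futures import Future, ThreadPoolExecutor
import threading

class NixlConnectorWorker:

    def __init__(self) -> None:
        # Executor and lock names are assumptions for this sketch.
        self._handshake_executor = ThreadPoolExecutor(max_workers=4)
        self._handshake_futures: dict[str, Future] = {}
        self._lock = threading.Lock()

    def _nixl_handshake(self, host: str, port: int) -> None:
        ...  # unchanged: zmq round-trip exchanging nixl agent metadata

    def _on_handshake_done(self, engine_id: str, fut: Future) -> None:
        ...  # move the engine's queued requests to the ready queue

    def _start_handshake(self, engine_id: str, host: str, port: int) -> None:
        # Non-blocking: the handshake runs in the pool while the main
        # thread keeps serving other P-D pairs.
        with self._lock:
            if engine_id in self._handshake_futures:
                return  # handshake already in flight for this pair
            fut = self._handshake_executor.submit(
                self._nixl_handshake, host, port)
            fut.add_done_callback(
                lambda f: self._on_handshake_done(engine_id, f))
            self._handshake_futures[engine_id] = fut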

Test Plan

Unit test WIP

Test Result

(Optional) Documentation Update

Signed-off-by: Linkun Chen <[email protected]>

👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default. Instead, they only run fastcheck CI, which runs a small and essential subset of CI tests to quickly catch errors. You can run other CI tests on top of those by going to your fastcheck build on Buildkite UI (linked in the PR checks section) and unblocking them. If you do not have permission to unblock, ping simon-mo or khluu to add you to our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run full CI, PR reviewers can either add the ready label to the PR or enable auto-merge.

🚀

@lk-chen lk-chen changed the title [P/D] Use ThreadPoolExecutor to do handshake for each P-D pair WIP [P/D] Use ThreadPoolExecutor to do handshake for each P-D pair Jun 19, 2025
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Summary of Changes

Hello @lk-chen, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the efficiency of KV cache transfer by offloading the nixl_handshake process to a background thread pool. This change ensures that handshake operations, which can be time-consuming, no longer block the main execution flow, leading to smoother and more responsive KV transfer initiation. It also refines the reporting of KV transfer statuses to include requests that are pending handshake completion.

Highlights

  • Asynchronous Handshake: Implemented _nixl_handshake using ThreadPoolExecutor to perform handshakes in the background, preventing blocking of the main thread during KV transfer operations. This allows the system to initiate handshakes for new P-D (Prefill-Decode) pairs without waiting for completion.
  • Enhanced KVTransferResult: The KVTransferResult dataclass has been extended to include pending_handshake information, providing a more comprehensive status of KV transfer requests, including those awaiting handshake completion.
  • Request Management for Handshakes: Introduced mechanisms (_handshake_futures, _pending_requests, _ready_requests) to manage requests that are waiting for a handshake to complete or are ready to proceed after a successful handshake. Requests are queued and processed asynchronously; a sketch of these structures follows this list.
  • Multi-Rank Coordination Update: The coordination logic for get_finished across multiple Tensor Parallel (TP) ranks has been updated to correctly aggregate and report pending_handshake statuses in addition to finished sending and receiving requests.
  • Type Hinting Improvements: Added EngineId and ReqId type aliases and applied them consistently across relevant classes and methods (ReqMeta, NixlConnectorMetadata, NixlConnectorWorker, etc.) for improved code clarity and maintainability.
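
To make the request lifecycle above concrete, here is an illustrative (not verbatim) view of the three structures; EngineId and ReqId are the type aliases the PR adds, while the container shapes are assumptions:

from concurrent.futures import Future

EngineId = str
ReqId = str

# One in-flight handshake per remote engine.
_handshake_futures: dict[EngineId, Future] = {}

# Requests that arrived while their engine's handshake is still running,
# paired with their (assumed) transfer metadata.
_pending_requests: dict[EngineId, list[tuple[ReqId, "ReqMeta"]]] = {}

# Requests whose handshake finished; drained on the next get_finished().
_ready_requests: list[tuple[ReqId, "ReqMeta"]] = []
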
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e., a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | /gemini review | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request refactors the NIXL connector to perform handshakes asynchronously using a ThreadPoolExecutor. This is a positive change for non-blocking behavior. The get_finished interface is updated to return KVTransferResult, incorporating pending handshake information. My review focuses on the correctness of the asynchronous handshake logic, thread safety, error handling, and overall clarity.

Comment on lines +487 to +489
# Clean up futures. In case of failure, requests will remain
# pending and be reported to scheduler for retry.
del self._handshake_futures[engine_id]

high

The comment "In case of failure, requests will remain pending and be reported to scheduler for retry" seems to conflict with the action in the except block (line 504) where self._pending_requests.pop(engine_id, None) is called. Please clarify if the intent is for these specific requests to be retried by the scheduler or if they should remain in _pending_requests.

Comment on lines 160 to 165
def get_pending_handshake_req_ids(self) -> set[str]:
    """Get request IDs that are currently pending handshake completion."""
    if self.connector_worker is not None:
        result = self.connector_worker.get_finished(set())
        return result.pending_handshake
    return set()

medium

This method calls self.connector_worker.get_finished(set()) primarily to extract pending_handshake IDs. Consider if NixlConnectorWorker could offer a more direct way to query only the handshake-related state, which would make the intent clearer and potentially more efficient.
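
A direct accessor along those lines might look like this (hypothetical method, reusing the PR's _pending_requests and lock):

def get_pending_handshake_req_ids(self) -> set[str]:
    """Return request IDs awaiting handshake completion, without
    computing the full get_finished() result."""
    with self._lock:
        return {
            req_id
            for pending in self._pending_requests.values()
            for req_id, _ in pending
        }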

@@ -178,7 +190,7 @@ class NixlConnectorScheduler:
def __init__(self, vllm_config: VllmConfig, engine_id: str):

medium

For consistency with the type hint of self.engine_id (which is EngineId), consider typing the engine_id parameter in the __init__ signature as EngineId as well.

Suggested change
def __init__(self, vllm_config: VllmConfig, engine_id: str):
def __init__(self, vllm_config: VllmConfig, engine_id: EngineId):

Comment on lines +438 to +439
if self._nixl_handshake_listener_t:
    self._nixl_handshake_listener_t.join(timeout=0)

medium

Joining the _nixl_handshake_listener_t (a daemon thread) with timeout=0 in __del__ is a best-effort cleanup. Consider implementing an explicit stop mechanism for _nixl_handshake_listener_t for a more graceful shutdown.
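
One graceful-stop pattern, sketched with a threading.Event and a bounded zmq poll (the socket address and loop body are placeholders, not the connector's actual code):

import threading
import zmq

class NixlConnectorWorker:

    def __init__(self) -> None:
        self._listener_stop = threading.Event()
        self._nixl_handshake_listener_t = threading.Thread(
            target=self._listener_loop, daemon=True)
        self._nixl_handshake_listener_t.start()

    def _listener_loop(self) -> None:
        sock = zmq.Context.instance().socket(zmq.ROUTER)
        sock.bind("tcp://*:5555")  # placeholder address
        try:
            while not self._listener_stop.is_set():
                # Bounded poll so the stop flag is re-checked every 100 ms.
                if sock.poll(timeout=100):
                    frames = sock.recv_multipart()
                    # ... reply with nixl agent metadata ...
        finally:
            sock.close()

    def shutdown(self) -> None:
        self._listener_stop.set()
        self._nixl_handshake_listener_t.join(timeout=1.0)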

Comment on lines +469 to +472
"""Start handshake using ThreadPoolExecutor.

This method is non-blocking and submits `_nixl_handshake` to the
background thread pool.

medium

The docstring is helpful. It could be slightly more explicit by mentioning that self._pending_requests should ideally be populated with any requests for this engine_id before calling _start_handshake.
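
For instance (illustrative wording only):

"""Start handshake using ThreadPoolExecutor.

This method is non-blocking and submits `_nixl_handshake` to the
background thread pool. Add any requests destined for `engine_id` to
`self._pending_requests` *before* calling this method, so the
completion callback can move them to the ready queue.
"""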

Comment on lines +497 to +500
logger.debug(
    "Handshake completed for engine %s. "
    "Moved %d requests to ready queue for processing",
    engine_id, len(completed_reqs))

medium

The logging message here has extra leading spaces in the format string. This might be unintentional and could affect log formatting.

Suggested change (the stray leading spaces removed from the format string):
logger.debug(
    "Handshake completed for engine %s. "
    "Moved %d requests to ready queue for processing",
    engine_id, len(completed_reqs))

Comment on lines +506 to +509
logger.warning(
    "Handshake failed for engine %s, leaving"
    "%d requests pending for scheduler retry",
    engine_id, len(failed_reqs))

medium

The log message "leaving %d requests pending for scheduler retry" could be slightly misleading. Since _pending_requests.pop(engine_id, None) was called on line 504, these failed_reqs are no longer in self._pending_requests. Consider rephrasing for clarity.
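
One possible rewording that matches the pop on line 504 (and restores the missing space before %d):

logger.warning(
    "Handshake with engine %s failed; removed %d requests from the "
    "pending queue", engine_id, len(failed_reqs))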

Comment on lines +832 to +835
with self._lock:
    pending_handshake = set()
    for pending_reqs in self._pending_requests.values():
        pending_handshake.update({req_id for req_id, _ in pending_reqs})

medium

Accessing self._pending_requests is correctly done under self._lock. The logic to collect all req_id from the values of _pending_requests (which are lists of tuples) into the pending_handshake set is also correct.

Comment on lines +861 to +863
rank_data = self.tp_group.recv_object(src=i)
other_rank_result = KVTransferResult.from_tuple(
    rank_data)

medium

The code now expects self.tp_group.recv_object(src=i) to return data that can be converted into a KVTransferResult using KVTransferResult.from_tuple(). Ensure these methods are robust and correctly handle all fields of KVTransferResult, including the new pending_handshake set.
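
The shape this implies for KVTransferResult, sketched with assumed field names alongside the pending_handshake set the PR adds:

from dataclasses import dataclass, field

@dataclass
class KVTransferResult:
    finished_sending: set[str] = field(default_factory=set)
    finished_recving: set[str] = field(default_factory=set)
    pending_handshake: set[str] = field(default_factory=set)

    def to_tuple(self) -> tuple[set[str], set[str], set[str]]:
        # Plain sets in a tuple stay compatible with recv_object's
        # pickle-based transport across TP ranks.
        return (self.finished_sending, self.finished_recving,
                self.pending_handshake)

    @classmethod
    def from_tuple(cls, data: tuple) -> "KVTransferResult":
        return cls(*data)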

lk-chen added 2 commits June 18, 2025 17:40
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Linkun Chen <[email protected]>
lk-chen added 2 commits June 18, 2025 19:02
Signed-off-by: Linkun Chen <[email protected]>
Signed-off-by: Linkun Chen <[email protected]>

mergify bot commented Jun 19, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @lk-chen.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Jun 19, 2025
@lk-chen
Collaborator Author

lk-chen commented Jun 19, 2025

close in favor of #19836

@lk-chen lk-chen closed this Jun 19, 2025
Development

Successfully merging this pull request may close these issues.

[Bug]: nixl handshake is slow and can accumulate in a batch of requests where each request is pulling from a different prefill node