
Draft: WIP NixlConnector drop ZMQ in favor of HTTP metadata exchanges #19447


Open · wants to merge 17 commits into main

Conversation

@wseaton (Contributor) commented Jun 10, 2025

Essential Elements of an Effective PR Description Checklist

  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.

Purpose

Don't use a custom ZMQ side-channel socket for NIXL agent registration; instead, switch to HTTP transport.

TODO(s)

  • Completely deprecate env.NIXL_SIDE_CHANNEL_PORT
  • Scheduler changes to move the agent registration out of _add_blocks
  • Authentication on the route, move it out of the dev server args

Test Plan

TODO

Test Result

TODO

(Optional) Documentation Update


👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default. Instead, they only run fastcheck CI, a small and essential subset of tests that catches errors quickly. You can run other CI tests on top of those by going to your fastcheck build on the Buildkite UI (linked in the PR checks section) and unblocking them. If you do not have permission to unblock, ping simon-mo or khluu to add you to our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run CI, PR reviewers can either add the ready label to the PR or enable auto-merge.

🚀

@mergify mergify bot added the frontend label Jun 10, 2025
@gemini-code-assist (bot) left a comment

Summary of Changes

Hello @wseaton, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request is a work-in-progress draft aimed at replacing the custom ZMQ side channel used by the Nixl KV connector for exchanging metadata with an HTTP-based approach. This involves adding a new API endpoint to serve metadata, updating the Nixl connector to use HTTP requests, and plumbing the metadata collection and distribution through the system configuration.

Highlights

  • Replace ZMQ with HTTP for Nixl Metadata: The core change is migrating the Nixl KV connector's side channel for exchanging agent metadata from a custom ZMQ implementation to a standard HTTP endpoint. This simplifies the communication mechanism.
  • New HTTP Endpoint: A new GET endpoint /get_kv_connector_metadata is added to the API server (vllm/entrypoints/openai/api_server.py) to serve the necessary KV connector handshake metadata collected from the workers.
  • Metadata Propagation: KV connector handshake metadata is now collected from the workers by the executor, passed up to the engine, stored in the vllm_config.cache_config, and included in the initial handshake message sent back to the launcher/utils during engine startup.
  • Async HTTP Handshake: The Nixl worker's handshake logic (_nixl_handshake) is updated to use aiohttp for making asynchronous HTTP requests to fetch metadata from remote engines via the new API endpoint, replacing the previous ZMQ request/reply pattern.
  • Base Connector Updates: The base KV connector class (vllm/distributed/kv_transfer/kv_connector/v1/base.py) is updated to include a dedicated KVConnectorHandshakeMetadata struct and methods (set_handshake_metadata, get_handshake_metadata) to manage this out-of-band information.
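
To make the new flow concrete, here is a minimal client-side sketch of the HTTP handshake fetch described above. Only the route and the {tp_rank: {dp_rank: metadata}} response shape come from this PR; the function name, host, and port are illustrative.

```python
# Illustrative sketch: fetch a remote engine's handshake metadata over HTTP.
import asyncio

import aiohttp


async def fetch_remote_metadata(host: str, port: int,
                                timeout: float = 5.0) -> dict:
    url = f"http://{host}:{port}/get_kv_connector_metadata"
    client_timeout = aiohttp.ClientTimeout(total=timeout)
    async with aiohttp.ClientSession(timeout=client_timeout) as session:
        async with session.get(url) as resp:
            resp.raise_for_status()
            # JSON keys are strings: {"<tp_rank>": {"<dp_rank>": {...}}}
            return await resp.json()


# Example usage (TP1/DP1 case): the default remote agent lives at ["0"]["0"].
# res = asyncio.run(fetch_remote_metadata("prefill-host", 8000))
# agent_metadata = res["0"]["0"]["agent_metadata"]
```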

@mergify mergify bot added the v1 label Jun 10, 2025
mergify bot commented Jun 10, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @wseaton.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Jun 10, 2025
@gemini-code-assist (bot) left a comment

Code Review

This pull request makes significant changes to the NixlConnector by replacing ZMQ-based metadata exchange with an HTTP-based mechanism. This involves adding new metadata structures (KVConnectorHandshakeMetadata), modifying existing configuration classes (CacheConfig, EngineCore) to store and propagate this metadata, and introducing a new API endpoint (/get_kv_connector_metadata) to serve it. The core NIXL handshake logic in NixlConnectorWorker now uses aiohttp for asynchronous HTTP requests.

Key areas of feedback:

  • Security: The new HTTP endpoint needs authentication, as noted in a TODO. This is critical.
  • Asynchronous Operations: The use of asyncio.run() in a potentially performance-sensitive path (_read_blocks) should be reviewed. The TODO to move agent registration out of _add_blocks might address this.
  • DP Rank Logic: A FIXME comment in _nixl_handshake regarding Data Parallel rank determination needs to be addressed to ensure robustness across different deployment scenarios.
  • Error Handling and Robustness: Ensure robust error handling for HTTP requests and consider configurability for timeouts.

Overall, the changes seem to be a good step towards simplifying the inter-process communication for metadata exchange. Addressing the highlighted points, especially security and the FIXMEs, will be important.

Comment on lines 416 to 478
# FIXME(weaton): Need to figure out the logic for the right DP rank to
# query. Currently, we assume that the metadata is sent by the
# first DP rank in the TP group.
# Get the data for this tensor parallel rank
tp_data = res.get(str(self.tp_rank), {}).get("0", {})
metadata_bytes = tp_data.get("agent_metadata", None)
@gemini-code-assist (severity: high):

The FIXME(weaton) comment highlights a potential issue with determining the correct Data Parallel (DP) rank to query. This logic (res.get(str(self.tp_rank), {}).get("0", {})) assumes metadata is always provided by DP rank 0 within a TP group. This assumption needs to be robust or made configurable if different DP ranks might host the metadata. This could lead to incorrect handshakes if the assumption doesn't hold in all deployment scenarios.

Comment on lines 872 to 874
@router.get("/get_kv_connector_metadata")
async def get_kv_connector_metadata(raw_request: Request):
    kv_connector_metadata = raw_request.app.state.vllm_config.cache_config.transfer_handshake_metadata
    return JSONResponse(content=kv_connector_metadata)
@gemini-code-assist (severity: high):

A new endpoint /get_kv_connector_metadata is added. This endpoint exposes transfer_handshake_metadata from the CacheConfig.

Security Concern: The PR description includes a TODO: "Authentication on the route, move it out of the dev server args". This is critical. Exposing this metadata without authentication could be a security risk, as it might reveal internal cluster configuration or state. This endpoint should be protected, especially in production environments.

@@ -722,7 +713,7 @@ def _read_blocks(
 ):
     # NOTE(rob): this takes ~2s. We need to get this off the hotpath.
     if dst_engine_id not in self._remote_agents:
-        self._nixl_handshake(remote_host, remote_port)
+        asyncio.run(self._nixl_handshake(remote_host, remote_port))
@gemini-code-assist (severity: high):

Calling asyncio.run(self._nixl_handshake(remote_host, remote_port)) here is a significant change. Previously, _nixl_handshake was synchronous (using ZMQ). Now it's an async function.

asyncio.run() creates a new event loop, runs the coroutine, and then closes the loop. If _read_blocks (and by extension, this handshake) is called frequently in a performance-sensitive, synchronous path, repeatedly creating and tearing down event loops could introduce overhead.

Consider if the broader context calling _read_blocks can be made async to avoid asyncio.run(). If not, this might be acceptable if handshakes are infrequent. The PR description mentions a TODO: "Scheduler changes to move the agent registration out of _add_blocks", which might address this by making handshakes less frequent or moving them to a more appropriate (potentially async) context.
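
For illustration only, one standard way to avoid per-call asyncio.run() from a synchronous hot path is a single long-lived event loop on a background thread; this is a sketch of that pattern, not what the PR currently does.

```python
# Sketch: share one event loop across all handshakes instead of paying
# asyncio.run()'s loop setup/teardown on every call.
import asyncio
import threading

_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()


def run_handshake_sync(coro, timeout: float = 30.0):
    """Submit a coroutine to the shared loop and block until it finishes."""
    future = asyncio.run_coroutine_threadsafe(coro, _loop)
    return future.result(timeout=timeout)
```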

Comment on lines 442 to 519
# If metadata_bytes is None, it means the remote agent
# is not using NIXL, so we can skip the handshake.
logger.warning(
    "Received None metadata from %s:%s, skipping NIXL handshake",
    host, port)
@gemini-code-assist (severity: medium):

Handling the case where metadata_bytes is None by logging a warning and skipping the NIXL handshake is a good fallback. This allows graceful degradation if a remote agent isn't using NIXL or if there's an issue fetching its metadata.

Comment on lines +143 to +156
# Set handshake metadata using the base class method
if hasattr(self.connector_worker, 'xfer_metadata'):
    self.set_handshake_metadata(self.connector_worker.xfer_metadata)
@gemini-code-assist (severity: medium):

The logic to set handshake metadata after registering KV caches is good. The hasattr check for xfer_metadata on self.connector_worker is a safe way to ensure the attribute exists before accessing it, especially if NixlConnectorWorker.register_kv_caches might not always set it (though in the current code, it seems it always does if NixlWrapper is available).

Comment on lines 80 to 89
num_gpu_blocks, num_cpu_blocks, kv_cache_config, \
    transfer_handshake_metadata = self._initialize_kv_caches(vllm_config)

vllm_config.cache_config.num_gpu_blocks = num_gpu_blocks
vllm_config.cache_config.num_cpu_blocks = num_cpu_blocks

# Store KV connector metadata for handshake
self.transfer_handshake_metadata = transfer_handshake_metadata
@gemini-code-assist (severity: medium):

The _initialize_kv_caches method now also returns transfer_handshake_metadata, which is then stored in self.transfer_handshake_metadata. This metadata is collected from workers after KV cache registration. This is a key change to support the HTTP-based handshake mechanism.

Comment on lines 172 to 176
# Collect KV connector xfer metadata from workers (after KV cache registration)
transfer_handshake_metadata = (
    self.model_executor.get_kv_connector_handshake_metadata())
@gemini-code-assist (severity: medium):

Collecting transfer_handshake_metadata from self.model_executor.get_kv_connector_handshake_metadata() after initializing KV caches is the correct place to gather this information. This metadata is then propagated up and eventually exposed via the new API endpoint.

Comment on lines 424 to 460
# Include KV connector metadata if available
if hasattr(self,
           'transfer_handshake_metadata') and self.transfer_handshake_metadata:
    # self.transfer_handshake_metadata is a list of dicts from workers
    # Each dict already has structure {tp_rank: {dp_rank: metadata}}
    # Merge all worker dicts into a single dict
    content = {}
    for worker_dict in self.transfer_handshake_metadata:
        if worker_dict is not None:
            content.update(worker_dict)
    handshake_message["transfer_handshake_metadata"] = content
@gemini-code-assist (severity: medium):

The handshake message now includes transfer_handshake_metadata if available. This metadata is a list of dictionaries from workers, each structured as {tp_rank: {dp_rank: metadata}}. The code correctly merges these into a single content dictionary before encoding. This is how the metadata becomes available to other vLLM instances for the HTTP-based handshake.
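
Concretely, the merged message carries a nested mapping shaped like the sketch below (field names taken from snippets elsewhere in this PR; values abbreviated):

```python
handshake_message["transfer_handshake_metadata"] = {
    "0": {                                 # tp_rank
        "0": {                             # dp_rank
            "agent_metadata": "<base64-encoded NIXL agent blob>",
            "kv_caches_base_addr": [140234567890],
        },
    },
    "1": {
        "0": {
            "agent_metadata": "<base64-encoded NIXL agent blob>",
            "kv_caches_base_addr": [140234568000],
        },
    },
}
```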

vllm/v1/utils.py Outdated
Comment on lines 372 to 548
# stash KV connector metadata in vllm_config if passed in.
if "transfer_handshake_metadata" in msg and msg["transfer_handshake_metadata"]:
    logger.debug(
        "Received transfer handshake metadata from engine %s: %s",
        eng_index, msg["transfer_handshake_metadata"])
    # Merge the received metadata with existing cache config
    for tp_rank, dp_dict in msg["transfer_handshake_metadata"].items():
        for dp_rank, metadata in dp_dict.items():
            cache_config.transfer_handshake_metadata[tp_rank][
                dp_rank] = metadata
@gemini-code-assist (severity: medium):

The wait_for_engine_startup function now processes transfer_handshake_metadata if it's present in the handshake message from an engine. It correctly merges this metadata into the cache_config.transfer_handshake_metadata. This allows the orchestrator or client to gather all necessary handshake details from all engine instances. The nested loop structure for merging tp_rank and dp_rank specific metadata is appropriate.

Comment on lines 221 to 267
def get_kv_connector_handshake_metadata(self) -> Optional[dict]:
    """Get KV connector metadata from this worker if available."""

    connector = get_kv_transfer_group()
    if not is_v1_kv_transfer_group(connector):
        logger.warning(
            "The KV connector is not a v1 connector. "
            "This method is only supported for v1 connectors.")
        return None

    # Only return metadata if this is a worker role
    if connector.role == KVConnectorRole.WORKER:
        metadata = connector.get_handshake_metadata()
        if metadata is None:
            logger.warning(
                "KV connector metadata is not available. "
                "This may happen if the KV connector is not initialized "
                "or the worker is not part of a disaggregated KV cache setup.")
            return None

        tp_rank = get_tp_group().rank_in_group
        dp_rank = self.vllm_config.parallel_config.data_parallel_rank_local
        return {tp_rank: {dp_rank: msgspec.to_builtins(metadata)}}

    return None
@gemini-code-assist (severity: medium):

The get_kv_connector_handshake_metadata method is a new addition to the Worker class. It correctly retrieves handshake metadata from the KV connector if it's a v1 connector and the role is WORKER.

Key points:

  • It checks if the connector is v1 and logs a warning if not. This is good for compatibility.
  • It only returns metadata for WORKER roles, which is logical as schedulers typically don't expose their own NIXL-agent-like metadata in the same way.
  • It handles the case where metadata might be None (e.g., connector not initialized or not a disaggregated setup) by logging a warning and returning None.
  • The returned metadata is structured as {tp_rank: {dp_rank: msgspec.to_builtins(metadata)}}. This structure is consistent with how it's processed in vllm/v1/engine/core.py and vllm/v1/utils.py.

This method is crucial for enabling workers to provide their handshake details, which are then aggregated by the engine and made available for other instances.

@russellb (Member) left a comment

I only looked at this very quickly, but is putting this in the main API server just a temporary thing?

This seems like something that should be internal-facing only, not in the same API server exposed externally.

@wseaton (Author) commented Jun 10, 2025

@russellb this has to be externally accessible for P/D disagg, as one "api server" is basically handshaking with another. But it is important that it is accessible only by trusted peers (since the information can be used by an attacker to leak information about KV blocks, then pivot to potentially register NIXL agents, etc.). So that authentication mechanism needs to be figured out. Happy to chat about the path forward for this!
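
To make that TODO concrete, a hedged sketch of one possible shape for the authentication: a shared bearer token checked on the route. The env var name and the wiring are assumptions, not this PR's design.

```python
# Hypothetical sketch only: gate the metadata route behind a shared token.
# VLLM_KV_METADATA_TOKEN is an invented name for illustration.
import os

from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import JSONResponse

router = APIRouter()
EXPECTED_TOKEN = os.environ.get("VLLM_KV_METADATA_TOKEN")


@router.get("/get_kv_connector_metadata")
async def get_kv_connector_metadata(raw_request: Request):
    auth = raw_request.headers.get("Authorization", "")
    if not EXPECTED_TOKEN or auth != f"Bearer {EXPECTED_TOKEN}":
        raise HTTPException(status_code=401,
                            detail="missing or invalid token")
    metadata = (raw_request.app.state.vllm_config
                .cache_config.transfer_handshake_metadata)
    return JSONResponse(content=metadata)
```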

@mergify mergify bot removed the needs-rebase label Jun 11, 2025
Signed-off-by: Will Eaton <[email protected]>

 # Handshake only with the other TP remote the current local rank will
 # pull from. With homogeneous TP it happens to be the same rank_i.
-tp_ratio = self._tp_size[self.engine_id] // metadata.tp_size
+tp_ratio = self._tp_size[self.engine_id] // remote_tp_size
@wseaton (Author):
@NickLucche can you help review this for correctness? For HTTP metadata xfer, we store NIXL agent metadata in a dict tp_rank.dp_rank.metadata, so ["0"]["0"]["agent_metadata"] is the default remote agent in the TP1DP1 case.

Are remote_tp_size and the corresponding p_remote_rank calculation correct?

@NickLucche:
this looks correct. However I am not sure why we're bundling tp.dp here.
I am treating each DP as a separate remote for all intents and purposes, hence we would be able to support DP groups with different TP sizes.

@NickLucche:
(although you can't start DP with different TP right now)

@wseaton (Author):
DP is being bundled in the metadata because the api server runs a collective RPC to get metadata from all workers, including DP workers. For now that metadata is functionally ignored, but it might be useful in the future for us or other connectors.

@NickLucche (Contributor) left a comment

I think the http exchange logic in nixl is in a good state.

I am not convinced the increased code complexity in handling the threads is really beneficial though, particularly for readability.


try:
    req = Request(url)
    with urlopen(req, timeout=5.0) as response:
@NickLucche:
qq: are we putting timeout in a global var?

@wseaton (Author):
We can; this should probably get moved to something like envs.KV_METADATA_HANDSHAKE_TIMEOUT.
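
As a sketch of that suggestion (the variable name is the one floated here, not an existing vllm env):

```python
import os
from urllib.request import Request, urlopen

# Hypothetical: read the handshake timeout once from the environment.
KV_METADATA_HANDSHAKE_TIMEOUT = float(
    os.environ.get("VLLM_KV_METADATA_HANDSHAKE_TIMEOUT", "5.0"))

url = "http://localhost:8000/get_kv_connector_metadata"  # placeholder
with urlopen(Request(url), timeout=KV_METADATA_HANDSHAKE_TIMEOUT) as response:
    body = response.read()
```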

remote_tp_size = len(res.keys())
# Default case is that the remote TP size is 1, so we can
# directly access the metadata.
tp_data = res.get(str(self.tp_rank), {}).get("0", {})
@NickLucche:
nit: can't we maintain the same structure regardless of tp0 or tp>1?

raise


remote_tp_size = len(res.keys())
@NickLucche:
just fyi I am hoping I can transmit the tp size with the kvmetadata here #19413

@wseaton (Author):
Because of my comment above, and the fact that all ranks are bundled together right now we get this for free with keys... but it makes the size of this bundle grow with number of ranks. I think this means we should make the api something like get_kv_connector_metadata/{rank}, wdyt?

@NickLucche:
I think it makes sense, since rank_i only needs the data from rank_j
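
A sketch of that per-rank variant, assuming the same nested storage; the path shape and error handling are illustrative only:

```python
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import JSONResponse

router = APIRouter()


@router.get("/get_kv_connector_metadata/{tp_rank}")
async def get_kv_connector_metadata_for_rank(tp_rank: str,
                                             raw_request: Request):
    all_meta = (raw_request.app.state.vllm_config
                .cache_config.transfer_handshake_metadata)
    if tp_rank not in all_meta:
        raise HTTPException(status_code=404,
                            detail=f"no metadata for rank {tp_rank}")
    # Serve only the requested rank's slice, so the payload no longer
    # grows with the number of ranks.
    return JSONResponse(content={tp_rank: all_meta[tp_rank]})
```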



Comment on lines 488 to 489
agent_metadata=base64.b64decode(metadata_bytes),
kv_caches_base_addr=tp_data["kv_caches_base_addr"],
@NickLucche:
ok I see here the problem with sending a lot of memory pointers through JSON... a bit ugly, but it's a one-off

@wseaton (Author) commented Jun 12, 2025:

yeah... also json might not be ideal here since the agent_metadata is binary. we can revisit this encoding, since it's an internal API we could probably just use msgpack.

@NickLucche:
I agree
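
A quick sketch of why msgpack would help (illustrative values): it carries raw bytes natively, so agent_metadata could skip the base64 round-trip that JSON forces.

```python
import msgpack

payload = {
    "agent_metadata": b"\x00\x01\x02",  # raw NIXL agent bytes, no base64
    "kv_caches_base_addr": [140234567890],
}
wire = msgpack.packb(payload)
decoded = msgpack.unpackb(wire)
assert decoded["agent_metadata"] == payload["agent_metadata"]
```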


logger.debug("NIXL handshake: get metadata took: %s",
             pre_register - start_time)
logger.debug("NIXL handshake: add agent took: %s",
@NickLucche:
just curious, is this value changing significantly with http?

@wseaton (Author):

It's slightly slower with nixl 0.2; with nixl 0.3.1, agent registration is a lot faster, which makes the increase more noticeable. But since we are moving this onto a background thread, the engine is not blocked during the handshake. If we can reduce the amount of metadata needed for the handshake (like the optimization you already have), I think we can get closer to ZMQ speed.


# check for timeout (threads running > 30 seconds)
thread_age = time.time() - getattr(thread, '_start_time', time.time())
if thread.is_alive() and thread_age > 30.0:
@NickLucche:
same thing with constant

@@ -851,39 +883,138 @@ def _pop_done_transfers(
                         xfer_state)
        return done_req_ids

    def _process_completed_handshakes(self):
@NickLucche:
I don't quite understand the increased complexity in this method; I rather wish we had separated the async handshake from the HTTP exchange.

Is this meant for speeding up a warmup stage where requests are sent just to get every D to handshake with every other P?

@wseaton (Author):
This is for making sure we reprocess any requests that are deferred because they needed handshakes to happen (as part of the changes to make handshakes happen in the background). It might be my lack of knowledge of scheduler internals, but when I tried to just continue and have them reprocessed on the next engine step, I never saw _read_blocks retried.

@wseaton (Author):

Also definitely open to refactoring the background threading stuff... initially I had it working via aiohttp and concurrent.futures, which made the code a bit cleaner.

@NickLucche:
yeah I was expecting to see asyncio usage here too tbh

@wseaton (Author) commented Jun 12, 2025

There is a good opportunity here to align with other refactoring efforts, specifically proposed changes here #19330 (comment), which would allow pushing some of the retry logic explored here directly into the scheduler.

@chaunceyjiang (Contributor) commented:

If PP > 1, is this solution compatible?
For example, with PP = 4, how should PP1, PP2, and PP3 handle the metadata?

@njhill (Member) left a comment

Thanks @wseaton, great work!

As well as the inline comments, I think the async loading could be streamlined a bit, here are my thoughts on that:

  1. Instead of spawning threads, could use a ThreadPoolExecutor, with the returned futures tracking the handshake results instead of a separate dict
  2. Can have a dict of engine_id -> handshake future. When the future completes it removes itself from this dict and adds the entry to the _remote_agents dict
  3. In start_load_kv, if a particular engine_id isn't in the _remote_agents dict, the futures dict would first be checked, and if it also doesn't contain the engine_id then the handshake can be submitted and the returned future put into the futures dict. Future.add_done_callback on the future can be used to add the req_id + meta to a queue of requests that are now ready to load.
  4. After the loop over metadata in load_kv, it can loop over this queue to call read_blocks for any of the req_id, meta pairs in there.
  5. A lock can guard (2) and (3) to avoid any race between the two dicts (see the sketch after this comment).

We had been avoiding using asyncio in the core process, I think a thread approach is fine.

Also I'm wondering about the peer-to-peer metadata exchange vs something orchestrated by the router/sidecar, see comment in the related llm-d design doc here.
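
A minimal sketch of the pattern outlined in the list above; the class and argument names (HandshakeManager, do_handshake) are invented for illustration, and this is not the PR's code:

```python
import queue
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class HandshakeManager:
    """Deduplicate in-flight handshakes; queue requests that become ready."""

    def __init__(self, max_workers: int = 4):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._pending: dict[str, Future] = {}        # engine_id -> in-flight
        self._remote_agents: dict[str, object] = {}  # engine_id -> agent
        self._ready: queue.Queue = queue.Queue()     # (req_id, meta) to load
        self._lock = threading.Lock()

    def ensure_handshake(self, engine_id, host, port, req_id, meta,
                         do_handshake):
        """Return True if already handshaked; otherwise defer the request."""
        with self._lock:
            if engine_id in self._remote_agents:
                return True
            fut = self._pending.get(engine_id)
            if fut is None:
                fut = self._executor.submit(do_handshake, host, port)
                self._pending[engine_id] = fut

        def _done(f, engine_id=engine_id, req_id=req_id, meta=meta):
            with self._lock:
                self._pending.pop(engine_id, None)
                if f.exception() is None:
                    self._remote_agents[engine_id] = f.result()
                    self._ready.put((req_id, meta))

        fut.add_done_callback(_done)
        return False

    def drain_ready(self):
        """After the load_kv loop: yield (req_id, meta) pairs ready to read."""
        while not self._ready.empty():
            yield self._ready.get()
```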

.gitignore Outdated
@@ -202,3 +202,4 @@ shellcheck*/

# Ingore moe/marlin_moe gen code
csrc/moe/marlin_moe_wna16/kernel_*
uv.lock
A Collaborator commented:
Quick drop-by: I think we should move to uv.lock at some point, so we shouldn't ignore this lock file here.

@wseaton (Author):
definitely, will remove.

mergify bot commented Jun 19, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @wseaton.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork
