[V1] - Enable worker -> scheduler connector metadata #19555
Conversation
👋 Hi! Thank you for contributing to the vLLM project. 💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels. Just a reminder: PRs do not trigger a full CI run by default; only a reduced set of checks runs automatically. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.
Summary of Changes
Hello @orozery, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request refactors the communication mechanism between workers and the scheduler for KV cache transfer status. It introduces a generic metadata channel from workers to the scheduler, allowing workers to send connector-specific information. The scheduler is now responsible for collecting and aggregating this metadata from all workers to determine the global state of KV transfers, particularly for distributed connectors like Nixl where completion requires coordination across multiple ranks.
Highlights
- New Worker-to-Scheduler Metadata API: Introduced a new base method `build_worker_connector_meta` on the worker side of the KV connector interface. This method allows workers to build and return arbitrary metadata (`KVConnectorMetadata`) to the scheduler during each execution step.
- Scheduler-side Aggregation of KV Transfer Status: Moved the responsibility of aggregating finished KV transfers (sending and receiving) from the worker side to the scheduler side. The scheduler-side base connector now has a `get_finished` method that takes the collected metadata from all workers and determines which requests have completed their transfers across the entire distributed setup.
- ModelRunnerOutput Update: The `ModelRunnerOutput` dataclass, which workers return to the scheduler, has been updated to include a list of `kv_connector_metadata` instead of separate `finished_sending` and `finished_recving` sets. This allows workers to pass richer, connector-specific information.
- Executor Aggregation: The multiprocess executor (`MultiprocExecutor`) now collects `ModelRunnerOutput` from all workers and aggregates the `kv_connector_metadata` from each worker into a single list before passing the combined output to the scheduler (a simplified sketch of this flow follows the list).
- Nixl Connector Adaptation: The Nixl connector implementation has been updated to use the new metadata mechanism. It defines `NixlWorkerConnectorMetadata` to carry finished transfer IDs from the worker. The logic for tracking completion across all tensor parallel ranks has been moved from the worker-side `NixlConnectorWorker` to the scheduler-side `NixlConnectorScheduler`.
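For orientation, here is a minimal sketch of the data flow summarized above. The class and helper names are simplified stand-ins; only the `kv_connector_metadata` field and the list-concatenation behavior are taken from this PR, so treat this as an illustration rather than the actual vLLM implementation.

```python
# Simplified sketch of the worker -> scheduler metadata flow (not vLLM code).
from dataclasses import dataclass
from typing import Optional


class KVConnectorMetadata:
    """Opaque, connector-specific payload produced by each worker."""


@dataclass
class ModelRunnerOutputSketch:
    # Per this PR, each worker attaches connector metadata here instead of
    # the old finished_sending / finished_recving sets.
    kv_connector_metadata: Optional[list[KVConnectorMetadata]] = None


def aggregate_outputs(
        outputs: list[ModelRunnerOutputSketch]) -> ModelRunnerOutputSketch:
    """Roughly what the multiprocess executor does: concatenate the metadata
    lists from all workers into the single output handed to the scheduler."""
    combined: list[KVConnectorMetadata] = []
    for output in outputs:
        combined += output.kv_connector_metadata or []
    merged = outputs[0]
    merged.kv_connector_metadata = combined
    return merged
```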
@sdavidbd we can change the ...
Code Review
This pull request introduces a mechanism for workers to send metadata to the scheduler via the KV connector, which is a significant architectural change. The core idea is to centralize the aggregation of KV transfer events and status on the scheduler side. Key changes include introducing `build_worker_connector_meta` on the worker side and moving/refining `get_finished` to the scheduler side.
Overall, the changes seem to implement the described functionality. However, there are a few critical areas to address:
- A potential logic swap in `vllm/v1/core/sched/scheduler.py` regarding how `finished_sending` and `finished_recving` statuses from the connector are interpreted and acted upon. This could lead to incorrect behavior like freeing blocks prematurely or not marking requests as ready when they are.
- Type mismatches and potential runtime errors in `NixlConnectorMetadata`, `ModelRunnerOutput`, and `gpu_worker.py` related to handling `None` values and list assignments for `kv_connector_metadata`.
Addressing these points will be crucial for the correctness and stability of this new metadata flow.
@njhill @robertgshaw2-redhat putting this up for preliminary review before the weekend starts over here.
Force-pushed from 4af6c58 to f4351d0.
This commit makes the following changes:
1. Add a new kv_connector_metadata field to ModelRunnerOutput to allow arbitrary connector metadata to flow from workers to the scheduler.
2. Add a new worker-side connector API to build the above metadata.
3. Change MultiprocExecutor to get ModelRunnerOutput from all workers and aggregate the kv_connector_metadata from all of them.
4. Move the get_finished connector API from the worker side to the scheduler side.
5. Change the nixl and multi connectors to match the above API changes.
Signed-off-by: Or Ozeri <[email protected]>
Force-pushed from f4351d0 to 916c8e2.
assert isinstance(output, ModelRunnerOutput)
return output if self.is_driver_worker else None
if has_kv_transfer_group():
It's safer to use `is_v1_kv_transfer_group` until V0 is officially deprecated.
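A small sketch of the suggested guard, for clarity. The helper names come from this comment and the surrounding diff; the import path and exact signatures are assumptions and may differ from the actual codebase.

```python
# Sketch only: prefer the stricter V1 check suggested above so V0 KV-transfer
# setups are left untouched. Import path and helper signatures are assumed.
from vllm.distributed.kv_transfer import (get_kv_transfer_group,
                                          is_v1_kv_transfer_group)


def maybe_build_worker_connector_meta(scheduler_output):
    # Only the V1 connector path should build worker -> scheduler metadata.
    if not is_v1_kv_transfer_group():
        return None
    return get_kv_transfer_group().build_worker_connector_meta(
        scheduler_output)
```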
finished_sending: Optional[set[str]] = None
finished_recving: Optional[set[str]] = None
# KV Cache Connector metadata.
kv_connector_metadata: Optional[list["KVConnectorMetadata"]] = None
`KVConnectorMetadata` was originally intended for scheduler-to-worker signaling. Using it in the opposite direction (worker-to-scheduler) could blur its semantics. It might be cleaner to introduce a separate class like `KVConnectorOutput` for this purpose.
Also, as mentioned above, I think aggregation in multi-worker setups should be handled at the `MultiprocExecutor` level rather than in the `Scheduler`. In that case `kv_connector_metadata` should be typed as `Optional["KVConnectorMetadata"]`.
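A rough sketch of the split being proposed. `KVConnectorOutput` is the reviewer's suggested name and does not exist in the codebase; the fields shown are illustrative only.

```python
# Hypothetical sketch of the proposed split; KVConnectorOutput is the
# reviewer's suggested name and does not exist in the current codebase.
from dataclasses import dataclass
from typing import Any, Optional


class KVConnectorMetadata:
    """Keeps its original meaning: scheduler -> worker signaling only."""


@dataclass
class KVConnectorOutput:
    """Worker -> scheduler direction, kept as a distinct type so the two
    flows cannot be confused."""
    payload: Any = None  # still opaque and connector-specific


@dataclass
class ModelRunnerOutputAlternative:
    # A single aggregated instance, assuming the executor does the merging
    # as suggested above.
    kv_connector_output: Optional[KVConnectorOutput] = None
```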
I want the scheduler connector to get access to all of the `KVConnectorMetadata` from all workers. Only the connector knows what's inside the metadata; from the `MultiprocExecutor` perspective it's opaque. The `Executor` returns a single `ModelRunnerOutput`, so the way I found to let the scheduler connector access all metadatas is having the executor simply compose all of the metadatas into a list.
I understand your approach, but I still believe the alternative I suggested - employing connector-specific aggregation logic at the executor - is cleaner. This way, the executor can return a single, aggregated `KVConnectorMetadata` object, and the scheduler connector continues to work with a unified metadata instance rather than a list. It keeps the interface consistent and offloads connector-specific logic to where it belongs.
Also, could we revisit the idea of separating the metadata classes for scheduler-to-worker and worker-to-scheduler communication?
return EMPTY_MODEL_RUNNER_OUTPUT
if has_kv_transfer_group():
    with set_forward_context(None, self.vllm_config):
        self.maybe_setup_kv_connector(scheduler_output)
We're missing a call to `clear_connector_metadata` in this case (also before this change).
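A hedged sketch of the suggested fix, phrased as a free-standing function. The method and constant names come from the surrounding diff; the import paths and the overall function shape are assumptions.

```python
# Sketch of the suggestion above: also reset the per-step connector state on
# the early-return (no forward pass) path. Import paths are assumed.
from vllm.distributed.kv_transfer import (get_kv_transfer_group,
                                          has_kv_transfer_group)
from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT


def no_forward_step(worker, scheduler_output):
    if has_kv_transfer_group():
        worker.maybe_setup_kv_connector(scheduler_output)
        # Proposed addition: clear the worker connector's per-step state
        # before returning, mirroring the normal execution path.
        get_kv_transfer_group().clear_connector_metadata()
    return EMPTY_MODEL_RUNNER_OUTPUT
```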
return output if self.is_driver_worker else None
if has_kv_transfer_group():
    kv_connector_metadata = \
        get_kv_transfer_group().build_worker_connector_meta(
`build_worker_connector_meta` should be called in `gpu_model_runner.execute_model`, before invoking `clear_connector_metadata`.
Alternatively, we could delegate the state clearing to `build_worker_connector_meta` itself and remove the `clear_connector_metadata` API. This would also make it symmetric with `build_connector_meta`, which is responsible for resetting the scheduler connector's state.
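An illustrative sketch of this alternative, where building the metadata and resetting the worker-side state happen in one call; the class name, field names, and dict payload are assumptions.

```python
# Illustrative sketch: build_worker_connector_meta both builds the metadata
# and resets the worker connector's per-step state, so a separate
# clear_connector_metadata() call is no longer needed.
class WorkerConnectorSketch:
    def __init__(self) -> None:
        self._finished_sending: set[str] = set()
        self._finished_recving: set[str] = set()

    def build_worker_connector_meta(self, scheduler_output) -> dict:
        # scheduler_output is accepted to match the API under discussion,
        # even though this sketch does not need it.
        meta = {
            "finished_sending": set(self._finished_sending),
            "finished_recving": set(self._finished_recving),
        }
        self._finished_sending.clear()
        self._finished_recving.clear()
        return meta
```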
@@ -1028,21 +1028,27 @@ def _update_waiting_for_remote_kv(self, request: Request) -> bool:
        self.finished_recving_kv_req_ids.remove(request.request_id)
        return True

-   def _update_from_kv_xfer_finished(self,
-                                     model_runner_output: ModelRunnerOutput):
+   def _update_from_kv_connector_metadata(
`kv_connector_metadata` may carry distinct signals for different code paths. I think it would be cleaner if `update_from_output` used dedicated connector APIs to extract the relevant information from `kv_connector_metadata`, e.g.:

finished_sending, finished_recving = self.connector.get_finished(kv_connector_metadata)
self._update_from_kv_xfer_finished(finished_sending, finished_recving)
I think it's better if the scheduler connector aggregates the metadatas only once, and outputs everything the scheduler needs back (finished reqs, invalid blocks, etc.).
So obviously `get_finished` is not a good name. Maybe something like `process_worker_output(..) -> ConnectorOutput`, where `ConnectorOutput` is a new struct that will contain all relevant fields (which were previously laid out flat on `ModelRunnerOutput`).
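A hypothetical sketch of the API shape proposed here. `ConnectorOutput` and `process_worker_output` are names suggested in this thread, not existing vLLM classes, and the fields are illustrative.

```python
# Hypothetical sketch of the proposed scheduler-side aggregation API.
from dataclasses import dataclass, field


@dataclass
class ConnectorOutput:
    """Everything the scheduler needs back from the connector in one struct."""
    finished_sending: set[str] = field(default_factory=set)
    finished_recving: set[str] = field(default_factory=set)
    invalid_block_ids: set[int] = field(default_factory=set)


class SchedulerConnectorSketch:
    def process_worker_output(self, worker_metadatas: list) -> ConnectorOutput:
        """Aggregate the per-worker metadata exactly once per step."""
        out = ConnectorOutput()
        for meta in worker_metadatas:
            out.finished_sending |= getattr(meta, "finished_sending", set())
            out.finished_recving |= getattr(meta, "finished_recving", set())
        return out
```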
I agree that metadata aggregation should happen only once (ideally at the executor level). After that, the scheduler would hold a single aggregated instance of the worker-side `KVConnectorMetadata` (which is still opaque and connector-specific).
From there, the scheduler can use dedicated connector APIs (e.g., `get_finished`) to extract only the information it needs. This keeps the design more flexible and scalable, rather than relying on a single API to unpack all possible data upfront.
def get_finished(
        self,
        model_runner_output: ModelRunnerOutput,
Why `ModelRunnerOutput` and not `KVConnectorMetadata`?
To allow the connector full awareness of the model output (maybe someone will want `sampled_token_ids`). The same way the connector gets full access to the `SchedulerOutput`.
The scheduler connector only gets access to `SchedulerOutput` for the purpose of creating metadata for the worker connector. Similarly, the worker connector should only access `ModelRunnerOutput` to generate metadata for the scheduler connector.
In any case, I don't think the scheduler connector should have access to `ModelRunnerOutput`. That separation helps keep responsibilities clear and avoids unnecessary coupling.
        self, finished_req_ids: set[str]
    ) -> tuple[Optional[set[str]], Optional[set[str]]]:
    def build_worker_connector_meta(
            self, scheduler_output: SchedulerOutput,
Why do we need to pass in `SchedulerOutput`? I think we should make it symmetric with the scheduler-side `build_connector_meta` and pass just `ModelRunnerOutput`.
Your suggestion will work assuming the connector already got the `SchedulerOutput` (via `bind_connector_metadata`). But there's also `clear_connector_metadata` in the way, so it seems more fragile to me to try to match up with the correct scheduler output. I would prefer to directly pass in the scheduler output here to make it easier and more explicit for the worker-side connector to build its metadata.
I believe `clear_connector_metadata` should always be the last worker-side API invoked in each step, after which the worker connector's state is reset. As I mentioned earlier, a cleaner alternative is to make `build_worker_connector_meta` responsible for both building the worker-side metadata and resetting the state - mirroring the behavior of `build_connector_meta` on the scheduler side - thereby removing the need for a separate `clear_connector_metadata` API.
In any case, the worker connector shouldn't need access to the `SchedulerOutput`. As you said, it should already receive everything it needs via `bind_connector_metadata`.
This keeps the design symmetric (see the sketch after this list):
- The scheduler connector builds the metadata for the worker connector from `SchedulerOutput` and resets its state.
- The worker connector builds the metadata for the scheduler connector from `ModelRunnerOutput` and resets its state.
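A hedged sketch of this symmetric shape; the abstract classes below are illustrative, not the actual vLLM connector interfaces.

```python
# Sketch of the symmetric design described above (illustrative only).
from abc import ABC, abstractmethod


class SchedulerSideConnector(ABC):
    @abstractmethod
    def build_connector_meta(self, scheduler_output):
        """Build the metadata for the worker connector from SchedulerOutput
        and reset the scheduler-side per-step state."""


class WorkerSideConnector(ABC):
    @abstractmethod
    def build_worker_connector_meta(self, model_runner_output):
        """Build the metadata for the scheduler connector from
        ModelRunnerOutput and reset the worker-side per-step state."""
```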
kv_connector_metadata = []
for i, output in enumerate(outputs):
    kv_connector_metadata += output.kv_connector_metadata or []
I think it would be cleaner to use connector-specific aggregation logic here - for example, by introducing a new worker-side `KVConnector` API dedicated to aggregating the metadata.
I thought about this, but I did not want to introduce the connector inside the executor. Currently it's only in `scheduler.py`.
You're already introducing `KVConnectorMetadata` into the executor - so it seems reasonable to also give the executor its own `KVConnector` instance. This could be a new `EXECUTOR` role with a single API dedicated to aggregating worker-side metadata.
Personally, I find this cleaner than having each worker return a list with a single metadata object and adding ad-hoc logic in the executor to manually merge those lists. Delegating aggregation to the connector keeps the logic encapsulated and consistent.
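A hypothetical sketch of the EXECUTOR-role idea. The role value, the aggregate() API, and the dict-based metadata are illustrative assumptions; the aggregation shows a Nixl-style policy where a transfer counts as finished only once every rank has reported it.

```python
# Hypothetical sketch: connector-specific aggregation owned by an
# executor-side connector instance (not existing vLLM code).
from enum import Enum


class KVConnectorRole(Enum):
    SCHEDULER = "scheduler"
    WORKER = "worker"
    EXECUTOR = "executor"  # proposed new role


class ExecutorConnectorSketch:
    """Connector-specific aggregation, invoked once per step by the executor."""

    def aggregate(self, per_worker_metadata: list[dict]) -> dict:
        world_size = len(per_worker_metadata)
        counts: dict[str, int] = {}
        for meta in per_worker_metadata:
            for req_id in meta.get("finished_sending", set()):
                counts[req_id] = counts.get(req_id, 0) + 1
        # Finished only when every rank reported the transfer as finished.
        finished = {r for r, c in counts.items() if c == world_size}
        return {"finished_sending": finished}
```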
@njhill What are your thoughts on introducing the connector to the executor to allow aggregation of the workers' output there?
As suggested in my review, I think we should introduce a new connector API to extract the invalid block IDs from the worker-side connector metadata.
This pull request has merge conflicts that must be resolved before it can be merged.
Thanks @orozery. I like this but am just trying to think through implications/alternatives, in the interest of keeping the interface as simple as possible and minimizing concerns on the connector impl side. One idea is whether it would make sense to abstract this return flow of information in the form of events, i.e. a generalization of what we already have with the lists of finished request ids. It may be that we can then still encapsulate the TP aggregation of these within the framework, since we would require a positive response from all workers. One or more negative results (failures) would translate to a negative result when aggregated.
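A hedged sketch of what this event-based generalization might look like: workers emit per-request KV events, and the framework aggregates them across TP ranks, requiring a positive result from every worker. All names here are illustrative.

```python
# Illustrative sketch of framework-level aggregation of per-worker KV events.
from dataclasses import dataclass
from enum import Enum


class KVEventType(Enum):
    SEND_FINISHED = "send_finished"
    RECV_FINISHED = "recv_finished"


@dataclass(frozen=True)
class KVEvent:
    req_id: str
    event_type: KVEventType
    success: bool = True


def aggregate_events(
        per_worker_events: list[list[KVEvent]]
) -> dict[tuple[str, KVEventType], bool]:
    """An event is complete only if every worker reported it and none
    reported a failure."""
    world_size = len(per_worker_events)
    results: dict[tuple[str, KVEventType], list[bool]] = {}
    for events in per_worker_events:
        for ev in events:
            results.setdefault((ev.req_id, ev.event_type), []).append(ev.success)
    return {key: len(oks) == world_size and all(oks)
            for key, oks in results.items()}
```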
@njhill IIUC (please correct me) your suggestion is as follows: @sdavidbd your thoughts on this?
I really like the idea of making the worker-side connector metadata explicit rather than opaque - especially since it's ultimately consumed by the framework. Given the choice between the two, I'd strongly prefer the latter.
Regarding aggregation, I think we can keep it simple and sufficient by following two principles.
This PR enables the worker-side KV connector to pass arbitrary metadata on to the scheduler-side connector. This provides a standard and easy mechanism to aggregate KV-connector events from all workers.
In a nutshell, we introduce the following connector APIs (sketched below):
- `build_worker_connector_meta` on the worker side, allowing the worker to build metadata to be sent back to the scheduler.
- `get_finished` on the scheduler side (previously a worker-side API), which takes the connector metadata from all workers and yields the finished request transfers.
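A minimal usage sketch of how the scheduler side could consume these APIs. The `kv_cache_manager` object and its `free_blocks` / `mark_ready` methods are hypothetical stand-ins; only `get_finished` and its tuple return come from this PR.

```python
# Illustrative scheduler-step flow built around the new get_finished API.
def scheduler_step(connector, model_runner_output, kv_cache_manager):
    finished_sending, finished_recving = connector.get_finished(
        model_runner_output)
    for req_id in finished_sending or ():
        # The remote side has finished pulling these KV blocks; free them.
        kv_cache_manager.free_blocks(req_id)
    for req_id in finished_recving or ():
        # The KV cache for this request has fully arrived; it can be scheduled.
        kv_cache_manager.mark_ready(req_id)
```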