Commit 80299e0

feat: support offline expert load distribution recording

Signed-off-by: Jade Zheng <[email protected]>
1 parent 3d330c4

File tree

7 files changed, +89 -1 lines changed

vllm/engine/protocol.py

Lines changed: 15 additions & 0 deletions

```diff
@@ -284,6 +284,21 @@ async def start_profile(self) -> None:
     async def stop_profile(self) -> None:
         """Stop profiling the engine"""
         ...
+
+    @abstractmethod
+    async def start_expert_distribution_record(self) -> None:
+        """Start recording expert distribution"""
+        ...
+
+    @abstractmethod
+    async def stop_expert_distribution_record(self) -> None:
+        """Stop recording expert distribution"""
+        ...
+
+    @abstractmethod
+    async def dump_expert_distribution_record(self) -> None:
+        """Dump expert distribution record"""
+        ...
 
     @abstractmethod
     async def reset_mm_cache(self) -> None:
```
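Any concrete engine client now has to provide these three coroutines. Below is a minimal sketch of what satisfying the new contract looks like, with a trimmed stand-in for the real `EngineClient` ABC and a hypothetical `NoopClient`; neither class is part of the commit.

```python
# Minimal sketch, not part of the commit: a trimmed stand-in for the
# EngineClient ABC plus a hypothetical stub satisfying the new contract.
import asyncio
from abc import ABC, abstractmethod


class EngineClientSketch(ABC):
    @abstractmethod
    async def start_expert_distribution_record(self) -> None: ...

    @abstractmethod
    async def stop_expert_distribution_record(self) -> None: ...

    @abstractmethod
    async def dump_expert_distribution_record(self) -> None: ...


class NoopClient(EngineClientSketch):
    """Hypothetical client that logs instead of recording."""

    async def start_expert_distribution_record(self) -> None:
        print("recording started")

    async def stop_expert_distribution_record(self) -> None:
        print("recording stopped")

    async def dump_expert_distribution_record(self) -> None:
        print("record dumped")


asyncio.run(NoopClient().start_expert_distribution_record())
```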

vllm/entrypoints/openai/api_server.py

Lines changed: 35 additions & 0 deletions

```diff
@@ -972,6 +972,41 @@ async def stop_profile(raw_request: Request):
     return Response(status_code=200)
 
 
+if envs.VLLM_EXPERT_DISTRIBUTION_RECORDER_DIR:
+    @router.get("/start_expert_distribution_record")
+    async def start_expert_distribution_record(raw_request: Request):
+        """Start recording the expert distribution. Clear the previous record if any."""
+        logger.info("Starting expert distribution record...")
+        await engine_client(raw_request).start_expert_distribution_record()
+        logger.info("Expert distribution record started.")
+        return Response(
+            content="Start recording the expert distribution.\n",
+            status_code=200,
+        )
+
+    @router.get("/stop_expert_distribution_record")
+    async def stop_expert_distribution_record(raw_request: Request):
+        """Stop recording the expert distribution."""
+        logger.info("Stopping expert distribution record...")
+        await engine_client(raw_request).stop_expert_distribution_record()
+        logger.info("Expert distribution record stopped.")
+        return Response(
+            content="Stop recording the expert distribution.\n",
+            status_code=200,
+        )
+
+    @router.get("/dump_expert_distribution_record")
+    async def dump_expert_distribution_record(raw_request: Request):
+        """Dump expert distribution record."""
+        logger.info("Dumping expert distribution record...")
+        await engine_client(raw_request).dump_expert_distribution_record()
+        logger.info("Expert distribution record dumped.")
+        return Response(
+            content="Dump expert distribution record.\n",
+            status_code=200,
+        )
+
+
 if envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING:
     logger.warning(
         "LoRA dynamic loading & unloading is enabled in the API server. "
```

vllm/envs.py

Lines changed: 6 additions & 1 deletion

```diff
@@ -128,7 +128,7 @@
     VLLM_TOOL_PARSE_REGEX_TIMEOUT_SECONDS: int = 1
     VLLM_SLEEP_WHEN_IDLE: bool = False
     VLLM_MQ_MAX_CHUNK_BYTES_MB: int = 16
-
+    VLLM_EXPERT_DISTRIBUTION_RECORDER_DIR: Optional[str] = None
 
 def get_default_cache_root():
     return os.getenv(
@@ -879,6 +879,11 @@ def get_vllm_port() -> Optional[int]:
     # processes via zmq.
     "VLLM_MQ_MAX_CHUNK_BYTES_MB":
     lambda: int(os.getenv("VLLM_MQ_MAX_CHUNK_BYTES_MB", "16")),
+
+    # Directory to store expert distribution recorder files.
+    "VLLM_EXPERT_DISTRIBUTION_RECORDER_DIR":
+    lambda: os.path.expanduser(
+        os.getenv("VLLM_EXPERT_DISTRIBUTION_RECORDER_DIR") or "") or None,
 }
 
 # --8<-- [end:env-vars-definition]
```
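The resolver is evaluated lazily, each time `envs.VLLM_EXPERT_DISTRIBUTION_RECORDER_DIR` is read, and the `or` fallbacks keep it `None` when the variable is unset, since `os.path.expanduser(None)` would raise a `TypeError`. A standalone check of that behavior; the path value is illustrative and only the variable name comes from the commit:

```python
# Standalone check of the resolver's guard behavior; the path is
# illustrative, and only the variable name comes from the commit.
import os


def resolver():
    # Mirrors the lambda registered in envs.py.
    return os.path.expanduser(
        os.getenv("VLLM_EXPERT_DISTRIBUTION_RECORDER_DIR") or "") or None


os.environ.pop("VLLM_EXPERT_DISTRIBUTION_RECORDER_DIR", None)
print(resolver())  # None: the unset case stays None instead of raising

os.environ["VLLM_EXPERT_DISTRIBUTION_RECORDER_DIR"] = "~/eplb_records"
print(resolver())  # e.g. /home/user/eplb_records ("~" is expanded)
```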

vllm/v1/engine/async_llm.py

Lines changed: 9 additions & 0 deletions

```diff
@@ -498,6 +498,15 @@ async def reset_mm_cache(self) -> None:
         self.processor.mm_input_cache_client.reset()
         await self.engine_core.reset_mm_cache_async()
 
+    async def start_expert_distribution_record(self):
+        await self.engine_core.expert_distribution_record_async(is_start=True)
+
+    async def stop_expert_distribution_record(self):
+        await self.engine_core.expert_distribution_record_async(is_start=False)
+
+    async def dump_expert_distribution_record(self):
+        await self.engine_core.dump_expert_distribution_record_async()
+
     async def reset_prefix_cache(self,
                                  device: Optional[Device] = None) -> None:
         if device == Device.CPU:
```
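`AsyncLLM` folds the start/stop pair into a single flagged call on the engine core, mirroring how `profile(is_start)` is already plumbed. A self-contained sketch of that folding pattern, with a hypothetical in-process stub in place of the real engine-core client:

```python
# Sketch of the start/stop folding pattern; CoreStub is a hypothetical
# stand-in for the real engine-core client.
import asyncio


class CoreStub:
    async def expert_distribution_record_async(self, is_start: bool) -> None:
        print("recording on" if is_start else "recording off")


class Frontend:
    def __init__(self) -> None:
        self.engine_core = CoreStub()

    async def start_expert_distribution_record(self) -> None:
        await self.engine_core.expert_distribution_record_async(is_start=True)

    async def stop_expert_distribution_record(self) -> None:
        await self.engine_core.expert_distribution_record_async(is_start=False)


async def main() -> None:
    frontend = Frontend()
    await frontend.start_expert_distribution_record()
    await frontend.stop_expert_distribution_record()


asyncio.run(main())
```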

vllm/v1/engine/core.py

Lines changed: 6 additions & 0 deletions

```diff
@@ -302,6 +302,12 @@ def reset_mm_cache(self):
 
         self.mm_input_cache_server.reset()
 
+    def expert_distribution_record(self, is_start: bool) -> None:
+        self.model_executor.expert_distribution_record(is_start)
+
+    def dump_expert_distribution_record(self) -> None:
+        self.model_executor.dump_expert_distribution_record()
+
     def reset_prefix_cache(self):
         self.scheduler.reset_prefix_cache()
```
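`EngineCore` only forwards to the model executor; the worker-side recorder that ultimately counts expert activations is not part of this commit. A hedged sketch of a recorder with the same start/stop/dump shape (the class, counting logic, and JSON output are all assumptions, only the three entry points mirror the commit):

```python
# Hypothetical worker-side recorder matching the start/stop/dump shape;
# counting logic and output format are assumptions, not the commit's code.
import json
import time
from collections import Counter
from pathlib import Path


class ExpertDistributionRecorder:
    def __init__(self, output_dir: str) -> None:
        self._dir = Path(output_dir)
        self._counts = Counter()  # expert id -> activation count
        self._recording = False

    def record(self, is_start: bool) -> None:
        if is_start:
            self._counts.clear()  # starting clears the previous record
        self._recording = is_start

    def on_experts_selected(self, expert_ids: list) -> None:
        if self._recording:
            self._counts.update(expert_ids)

    def dump(self) -> None:
        self._dir.mkdir(parents=True, exist_ok=True)
        out = self._dir / f"expert_distribution_{int(time.time())}.json"
        out.write_text(json.dumps(dict(self._counts)))


rec = ExpertDistributionRecorder("/tmp/eplb_records")
rec.record(is_start=True)
rec.on_experts_selected([0, 3, 3, 7])  # e.g. per-token top-k expert ids
rec.record(is_start=False)
rec.dump()
```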

vllm/v1/engine/core_client.py

Lines changed: 12 additions & 0 deletions

```diff
@@ -105,6 +105,12 @@ def add_request(self, request: EngineCoreRequest) -> None:
 
     def profile(self, is_start: bool = True) -> None:
         raise NotImplementedError
+
+    def expert_distribution_record(self, is_start: bool) -> None:
+        raise NotImplementedError
+
+    def dump_expert_distribution_record(self) -> None:
+        raise NotImplementedError
 
     def reset_mm_cache(self) -> None:
         raise NotImplementedError
@@ -857,6 +863,12 @@ async def profile_async(self, is_start: bool = True) -> None:
     async def reset_mm_cache_async(self) -> None:
         await self.call_utility_async("reset_mm_cache")
 
+    async def expert_distribution_record_async(self, is_start: bool) -> None:
+        await self.call_utility_async("expert_distribution_record", is_start)
+
+    async def dump_expert_distribution_record_async(self) -> None:
+        await self.call_utility_async("dump_expert_distribution_record")
+
     async def reset_prefix_cache_async(self) -> None:
         await self.call_utility_async("reset_prefix_cache")
```
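`call_utility_async` ships a method name plus arguments to the engine-core process, which resolves the method by name (the real transport is zmq). A simplified in-process sketch of that name-based dispatch; both stub classes are hypothetical:

```python
# Simplified in-process sketch of name-based utility dispatch; the real
# client serializes these calls over zmq to the EngineCore process.
import asyncio
from typing import Any


class FakeCore:
    def expert_distribution_record(self, is_start: bool) -> None:
        print(f"expert_distribution_record(is_start={is_start})")

    def dump_expert_distribution_record(self) -> None:
        print("dump_expert_distribution_record()")


class InProcClient:
    def __init__(self, core: FakeCore) -> None:
        self._core = core

    async def call_utility_async(self, method: str, *args: Any) -> Any:
        # Resolve the utility by name and invoke it, as the core side does.
        return getattr(self._core, method)(*args)


async def main() -> None:
    client = InProcClient(FakeCore())
    await client.call_utility_async("expert_distribution_record", True)
    await client.call_utility_async("dump_expert_distribution_record")


asyncio.run(main())
```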

vllm/v1/executor/abstract.py

Lines changed: 6 additions & 0 deletions

```diff
@@ -95,6 +95,12 @@ def max_concurrent_batches(self) -> int:
     def profile(self, is_start: bool = True):
         self.collective_rpc("profile", args=(is_start, ))
 
+    def expert_distribution_record(self, is_start: bool):
+        self.collective_rpc("expert_distribution_record",
+                            args=(is_start, ))
+
+    def dump_expert_distribution_record(self):
+        self.collective_rpc("dump_expert_distribution_record")
 
 class UniProcExecutor(UniProcExecutorV0, Executor):
     pass
```
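`collective_rpc` fans a named method out to every worker, so each rank records (and later dumps) its own expert loads. A minimal sketch of the fan-out pattern with hypothetical stand-in workers:

```python
# Minimal sketch of the collective_rpc fan-out; Worker is a hypothetical
# stand-in for vLLM's per-GPU workers.


class Worker:
    def __init__(self, rank: int) -> None:
        self.rank = rank

    def expert_distribution_record(self, is_start: bool) -> None:
        print(f"rank {self.rank}: recording {'on' if is_start else 'off'}")


class MiniExecutor:
    def __init__(self, workers: list) -> None:
        self.workers = workers

    def collective_rpc(self, method: str, args: tuple = ()) -> list:
        # Invoke the named method on every worker and gather the results.
        return [getattr(worker, method)(*args) for worker in self.workers]


executor = MiniExecutor([Worker(0), Worker(1)])
executor.collective_rpc("expert_distribution_record", args=(True, ))
```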
