test: conditional disagg and cache aware balancing for deepseek v3 (#4522)

Signed-off-by: Zheng Duan <200704041+zhengd-nv@users.noreply.github.com>
Zheng Duan 2025-06-11 09:44:29 +08:00 committed by GitHub
parent 1b79041f5d
commit 580a92521e
6 changed files with 233 additions and 16 deletions


@@ -0,0 +1,35 @@
hostname: localhost
port: 8000
model: DeepSeek-V3-Lite/bf16
backend: "pytorch"
use_cuda_graph: False
disable_overlap_scheduler: True
autotuner_enabled: False
context_servers:
  num_instances: 2
  router:
    type: kv_cache_aware
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
    free_gpu_memory_fraction: 0.1
  urls:
    - "localhost:8001"
    - "localhost:8002"
generation_servers:
  num_instances: 2
  router:
    type: kv_cache_aware
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
    free_gpu_memory_fraction: 0.1
  urls:
    - "localhost:8003"
    - "localhost:8004"


@@ -0,0 +1,31 @@
hostname: localhost
port: 8000
model: DeepSeek-V3-Lite/bf16
free_gpu_memory_fraction: 0.15
backend: "pytorch"
use_cuda_graph: False
disable_overlap_scheduler: True
autotuner_enabled: False
context_servers:
  num_instances: 1
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
  urls:
    - "localhost:8001"
generation_servers:
  num_instances: 1
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  router:
    type: kv_cache_aware
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
    free_gpu_memory_fraction: 0.05
  urls:
    - "localhost:8002"


@@ -0,0 +1,34 @@
hostname: localhost
port: 8000
model: DeepSeek-V3-Lite/bf16
backend: "pytorch"
free_gpu_memory_fraction: 0.15
conditional_disagg_config:
  max_local_prefill_length: 100
use_cuda_graph: False
disable_overlap_scheduler: True
autotuner_enabled: False
context_servers:
  num_instances: 1
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
    free_gpu_memory_fraction: 0.15
  urls:
    - "localhost:8001"
generation_servers:
  num_instances: 1
  tensor_parallel_size: 1
  pipeline_parallel_size: 1
  router:
    type: kv_cache_aware
  kv_cache_config:
    enable_block_reuse: True
    enable_partial_reuse: True
    event_buffer_max_size: 1024
    free_gpu_memory_fraction: 0.15
  urls:
    - "localhost:8002"


@@ -100,6 +100,12 @@ def get_test_config(test_desc, example_dir, test_root):
        (2,
         f"{test_configs_root}/disagg_config_ctxtp1_gentp1_deepseek_v3_lite_one_mtp_attention_dp_overlap.yaml"
         ),
        "deepseek_v3_lite_bf16_cache_aware_balance":
        (4,
         f"{test_configs_root}/disagg_config_cache_aware_balance_deepseek_v3.yaml"
         ),
        "deepseek_v3_lite_bf16_conditional":
        (2, f"{test_configs_root}/disagg_config_conditional_deepseek_v3.yaml"),
    }
    if test_desc not in config_map:
@@ -757,3 +763,45 @@
                           "deepseek_v3_lite_fp8_tp1_attention_dp_overlap_one_mtp",
                           env=llm_venv._new_env,
                           cwd=llm_venv.get_working_directory())


@skip_no_hopper
@pytest.mark.parametrize("deepseek_v3_model_root", ['DeepSeek-V3-Lite-bf16'],
                         indirect=True)
def test_disaggregated_deepseek_v3_lite_bf16_cache_aware_balance(
        disaggregated_test_root, disaggregated_example_root, llm_venv,
        deepseek_v3_model_root):
    src_dst_dict = {
        deepseek_v3_model_root:
        f"{llm_venv.get_working_directory()}/DeepSeek-V3-Lite/bf16",
    }
    for src, dst in src_dst_dict.items():
        if not os.path.islink(dst):
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            os.symlink(src, dst, target_is_directory=True)

    run_disaggregated_test(disaggregated_example_root,
                           "deepseek_v3_lite_bf16_cache_aware_balance",
                           env=llm_venv._new_env,
                           cwd=llm_venv.get_working_directory())


@skip_no_hopper
@pytest.mark.parametrize("deepseek_v3_model_root", ['DeepSeek-V3-Lite-bf16'],
                         indirect=True)
def test_disaggregated_deepseek_v3_lite_bf16_conditional(
        disaggregated_test_root, disaggregated_example_root, llm_venv,
        deepseek_v3_model_root):
    src_dst_dict = {
        deepseek_v3_model_root:
        f"{llm_venv.get_working_directory()}/DeepSeek-V3-Lite/bf16",
    }
    for src, dst in src_dst_dict.items():
        if not os.path.islink(dst):
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            os.symlink(src, dst, target_is_directory=True)

    run_disaggregated_test(disaggregated_example_root,
                           "deepseek_v3_lite_bf16_conditional",
                           env=llm_venv._new_env,
                           cwd=llm_venv.get_working_directory())


@@ -9,6 +9,9 @@ from typing import Generator, List, Optional, Tuple
import aiohttp
import pytest
import yaml
from defs.conftest import skip_no_hopper
from defs.disaggregated.test_disaggregated_single_gpu import \
    model_path as get_model_path
from defs.trt_test_alternative import popen
from transformers import AutoTokenizer
@@ -19,8 +22,6 @@ from tensorrt_llm.serve.router import (KvCacheAwareRouter,
                                       KvCacheAwareServerState, ServerRole,
                                       block_key_hasher)

MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

def get_ctx_gen_server_urls_from_cfg(config_file: str):
    with open(config_file, 'r') as file:
@@ -184,15 +185,17 @@ class ConditionalWorkerTester(BasicWorkerTester):
                 ctx_servers: List[str],
                 gen_servers: List[str],
                 req_timeout_secs: int = 180,
                 server_start_timeout_secs: int = 180):
                 server_start_timeout_secs: int = 180,
                 model_name: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"):
        super().__init__(ctx_servers, gen_servers, req_timeout_secs,
                         server_start_timeout_secs)
        self.model_name = model_name

    async def multi_round_request(self, session: aiohttp.ClientSession,
                                  init_prompt: str, max_rounds: int,
                                  threshold: float):
        request = {
            "model": MODEL_NAME,
            "model": self.model_name,
            "prompt": init_prompt,
            "max_tokens": 10,
            "ignore_eos": True,
@@ -235,10 +238,15 @@ class KvCacheEventWorkerTester(BasicWorkerTester):
                 ctx_servers: List[str],
                 gen_servers: List[str],
                 req_timeout_secs: int = 180,
                 server_start_timeout_secs: int = 240):
                 server_start_timeout_secs: int = 240,
                 model_name: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
                 model_path: Optional[str] = None):
        super().__init__(ctx_servers, gen_servers, req_timeout_secs,
                         server_start_timeout_secs)
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        if model_path is None:
            model_path = get_model_path(model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model_name = model_name
        self.kv_cache_block_maps: dict[str, KvCacheAwareServerState] = {}
        self.kv_cache_event_maps: dict[str, list[dict]] = {}
        for ctx_server in ctx_servers:
@@ -266,7 +274,7 @@ class KvCacheEventWorkerTester(BasicWorkerTester):
                                  max_rounds: int,
                                  check_match_count: bool = True):
        request = {
            "model": MODEL_NAME,
            "model": self.model_name,
            "prompt": init_prompt,
            "max_tokens": 64,
            "ignore_eos": True,
@@ -347,13 +355,18 @@ class KvCacheAwareRouterTester(BasicWorkerTester):
                 ctx_servers: List[str],
                 gen_servers: List[str],
                 req_timeout_secs: int = 180,
                 server_start_timeout_secs: int = 180):
                 server_start_timeout_secs: int = 180,
                 model_name: str = "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
                 tokens_per_block: int = 32):
        super().__init__(ctx_servers, gen_servers, req_timeout_secs,
                         server_start_timeout_secs)
        self.ctx_router = KvCacheAwareRouter(server_role=ServerRole.CONTEXT,
                                             servers=ctx_servers)
                                             servers=ctx_servers,
                                             tokens_per_block=tokens_per_block)
        self.gen_router = KvCacheAwareRouter(server_role=ServerRole.GENERATION,
                                             servers=gen_servers)
                                             servers=gen_servers,
                                             tokens_per_block=tokens_per_block)
        self.model_name = model_name

    async def multi_round_request(self,
session: aiohttp.ClientSession,
@@ -361,7 +374,7 @@ class KvCacheAwareRouterTester(BasicWorkerTester):
                                  max_rounds: int = 8,
                                  check_server_match: bool = True):
        request = {
            "model": MODEL_NAME,
            "model": self.model_name,
            "prompt": init_prompt,
            "max_tokens": 64,
            "ignore_eos": True,
@@ -373,7 +386,7 @@ class KvCacheAwareRouterTester(BasicWorkerTester):
        gen_match = 0
        for i in range(max_rounds):
            openai_request = CompletionRequest(
                model=MODEL_NAME,
                model=self.model_name,
                prompt=request["prompt"],
                disaggregated_params=DisaggregatedParams(
                    request_type="context_only"))
@@ -425,7 +438,7 @@ class KvCacheAwareRouterTester(BasicWorkerTester):
        async with await self.new_session() as session:
            # send a dummy request for initialization
            dummy_request = {
                "model": MODEL_NAME,
                "model": self.model_name,
                "prompt": [3] * 200,
                "max_tokens": 1,
                "ignore_eos": True,
@@ -447,7 +460,7 @@ class KvCacheAwareRouterTester(BasicWorkerTester):
            logger.info(f"Block pool size: {block_pool_size}")

            # the dummy request can be reused
            openai_request = CompletionRequest(model=MODEL_NAME,
            openai_request = CompletionRequest(model=self.model_name,
                                               prompt=dummy_request["prompt"])
            server, info = await self.gen_router.get_next_server(openai_request)
            first_match = info["matches"][0]
@@ -503,8 +516,7 @@ def load_default_prompts(disaggregated_example_root: str):
@contextlib.contextmanager
def background_workers(llm_venv, config_file: str, num_ranks: int = None):
    cwd = llm_venv.get_working_directory()
    with open(os.path.join(cwd, 'output_workers.log'), 'w') as log_file:
    with open(os.path.join(cwd, 'output_workers.log'), 'w+') as log_file:
        workers_proc, ctx_servers, gen_servers = run_disaggregated_workers(
            config_file=config_file,
            stdout=log_file,
@@ -537,6 +549,30 @@ def test_workers_conditional_disaggregation(disaggregated_test_root,
        asyncio.run(tester.test_multi_round_request(prompts))


@pytest.mark.parametrize("deepseek_v3_model_root", ['DeepSeek-V3-Lite-bf16'],
                         indirect=True)
def test_workers_conditional_disaggregation_deepseek_v3_lite_bf16(
        disaggregated_test_root, disaggregated_example_root, llm_venv,
        deepseek_v3_model_root):
    config_file = os.path.join(
        disaggregated_test_root,
        'test_configs/disagg_config_cache_reuse_deepseek_v3.yaml')
    model_root = f"{llm_venv.get_working_directory()}/DeepSeek-V3-Lite/bf16"
    src_dst_dict = {
        deepseek_v3_model_root: model_root,
    }
    for src, dst in src_dst_dict.items():
        if not os.path.islink(dst):
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            os.symlink(src, dst, target_is_directory=True)

    with background_workers(llm_venv, config_file,
                            2) as (ctx_servers, gen_servers):
        tester = ConditionalWorkerTester(ctx_servers, gen_servers)
        prompts = load_default_prompts(disaggregated_example_root)
        asyncio.run(tester.test_multi_round_request(prompts))


@pytest.mark.parametrize("llama_model_root", ['TinyLlama-1.1B-Chat-v1.0'],
                         indirect=True)
def test_workers_kv_cache_events(disaggregated_test_root,
@@ -570,6 +606,35 @@ def test_workers_kv_cache_aware_router(disaggregated_test_root,
        asyncio.run(tester.test_multi_round_request(prompts, 16, 4))


@skip_no_hopper
@pytest.mark.parametrize("deepseek_v3_model_root", ['DeepSeek-V3-Lite-bf16'],
                         indirect=True)
def test_workers_kv_cache_aware_router_deepseek_v3_lite_bf16(
        disaggregated_test_root, disaggregated_example_root, llm_venv,
        deepseek_v3_model_root):
    config_file = os.path.join(
        disaggregated_test_root,
        'test_configs/disagg_config_cache_aware_balance_deepseek_v3.yaml')
    model_root = f"{llm_venv.get_working_directory()}/DeepSeek-V3-Lite/bf16"
    src_dst_dict = {
        deepseek_v3_model_root: model_root,
    }
    for src, dst in src_dst_dict.items():
        if not os.path.islink(dst):
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            os.symlink(src, dst, target_is_directory=True)

    with background_workers(llm_venv, config_file,
                            4) as (ctx_servers, gen_servers):
        os.chdir(llm_venv.get_working_directory())
        tester = KvCacheAwareRouterTester(ctx_servers,
                                          gen_servers,
                                          model_name="DeepSeek-V3-Lite/bf16",
                                          tokens_per_block=64)
        prompts = load_default_prompts(disaggregated_example_root)
        asyncio.run(tester.test_multi_round_request(prompts, 8, 4))


@pytest.mark.parametrize("llama_model_root", ['TinyLlama-1.1B-Chat-v1.0'],
                         indirect=True)
def test_workers_kv_cache_aware_router_eviction(disaggregated_test_root,


@@ -114,6 +114,10 @@ l0_dgx_h100:
  - disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_fp8_tp1_attention_dp_overlap_one_mtp[DeepSeek-V3-Lite-fp8]
  - disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_fp8_attention_dp_overlap_cuda_graph[DeepSeek-V3-Lite-fp8]
  - disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_fp8_overlap_cuda_graph[DeepSeek-V3-Lite-fp8]
  - disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_bf16_cache_aware_balance[DeepSeek-V3-Lite-bf16]
  - disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_bf16_conditional[DeepSeek-V3-Lite-bf16]
  - disaggregated/test_workers.py::test_workers_conditional_disaggregation_deepseek_v3_lite_bf16[DeepSeek-V3-Lite-bf16]
  - disaggregated/test_workers.py::test_workers_kv_cache_aware_router_deepseek_v3_lite_bf16[DeepSeek-V3-Lite-bf16]
- condition:
    ranges:
      system_gpu_count: