TensorRT-LLMs/tests/unittest/llmapi/test_llm_multi_gpu.py
Yan Chunwei 63f3fba679
waive test_llm_multi_node_pytorch (#3592)
Signed-off-by: Superjomn <328693+Superjomn@users.noreply.github.com>
2025-04-16 10:49:07 +08:00

517 lines
18 KiB
Python

import asyncio
import json
import os
import subprocess # nosec B404
import tempfile
import pytest
from parameterized import parameterized
from tensorrt_llm.executor import ExecutorBindingsProxy
from tensorrt_llm.llmapi import LLM, BuildConfig, KvCacheConfig, SamplingParams
from tensorrt_llm.llmapi.tokenizer import TransformersTokenizer
from tensorrt_llm.mapping import Mapping
from tensorrt_llm.models import PretrainedConfig
from tensorrt_llm.models.llama.model import LLaMAForCausalLM
# isort: off
from .test_llm import (
DummyError, DummyExecutorWorker3, _test_llm_capture_request_error,
_test_llm_generate_async, check_llm_return_context_logits,
check_llm_return_generation_logits, default_model_name, get_model_path,
llama_7b_multi_lora_test_harness, llama_model_path,
llama_v2_7b_prompt_adapter_test_harness, llama_v2_13b_lora_test_harness,
llm_check_output, llm_get_stats_async_test_harness,
llm_get_stats_test_harness, llm_test_harness, mixtral_model_name, prompts,
tinyllama_guided_decoding_test_harness,
tinyllama_logits_processor_test_harness, run_llm_with_postprocess_parallel,
run_llm_with_postprocess_parallel_and_result_handler, run_llm_abort_request,
sampling_params_for_aborting_request)
from .test_llm_kv_cache_events import create_llm
from utils.util import (skip_gpu_memory_less_than, skip_single_gpu,
unittest_name_func, force_ampere)
# isort: on
pytestmark = pytest.mark.threadleak(enabled=False)
# shrink the kv_cache_config to avoid OOM in CI
global_kv_cache_config = KvCacheConfig(free_gpu_memory_fraction=0.4)
# python api does not seem to support extra tokens needed for prompt tuning + reuse.
# disable block reuse for those tests.
# TODO: Add extra tokens to prompt tuning unit tests.
global_kv_cache_config_no_reuse = KvCacheConfig(free_gpu_memory_fraction=0.4,
enable_block_reuse=False)
@pytest.fixture(scope="module")
def engine_from_checkpoint() -> tempfile.TemporaryDirectory:
tokenizer = TransformersTokenizer.from_pretrained(llama_model_path)
assert tokenizer is not None
tp_size = 2
with tempfile.TemporaryDirectory() as ckpt_dir:
for rank in range(tp_size):
mapping = Mapping(world_size=tp_size, tp_size=tp_size, rank=rank)
llama = LLaMAForCausalLM.from_hugging_face(llama_model_path,
mapping=mapping)
llama.save_checkpoint(ckpt_dir, save_config=(rank == 0))
del llama
llm = LLM(
ckpt_dir,
tokenizer=tokenizer,
kv_cache_config=global_kv_cache_config,
)
assert llm.args.parallel_config.tp_size == tp_size
tmpdir = tempfile.TemporaryDirectory()
with llm:
llm.save(tmpdir.name)
return tmpdir
@pytest.mark.gpu2
@pytest.mark.part0
def test_llm_loading_from_ckpt_for_tp2(
engine_from_checkpoint: tempfile.TemporaryDirectory):
tokenizer = TransformersTokenizer.from_pretrained(llama_model_path)
llm_test_harness(engine_from_checkpoint.name,
prompts, ["D E F G H I J K"],
sampling_params=SamplingParams(max_tokens=8),
tokenizer=tokenizer,
kv_cache_config=global_kv_cache_config)
@pytest.mark.gpu2
@pytest.mark.part0
def test_llm_generate_tp2():
llm_test_harness(llama_model_path,
prompts, ["D E F G H I J K"],
sampling_params=SamplingParams(max_tokens=8),
tensor_parallel_size=2,
kv_cache_config=global_kv_cache_config)
def test_llm_explicit_shutdown():
# with-statement will invoke `shutdown()` explicitly
with LLM(model=llama_model_path,
tensor_parallel_size=2,
kv_cache_config=global_kv_cache_config,
fast_build=True) as llm:
llm_check_output(llm,
prompts, ["D E F G H I J K"],
sampling_params=SamplingParams(max_tokens=8))
@skip_single_gpu
def test_llm_return_context_logits_tp2():
check_llm_return_context_logits(tp_size=2)
@skip_single_gpu
def test_llm_return_generation_logits_tp2():
check_llm_return_generation_logits(tp_size=2)
@pytest.mark.parametrize("use_auto_parallel", [True, False],
ids=["enable_auto_parallel", "disable_auto_parallel"])
@pytest.mark.parametrize("from_ckpt", [True, False],
ids=["from_ckpt", "from_hf"])
@pytest.mark.gpu2
@pytest.mark.part0
def test_llm_generate_async_tp2(
engine_from_checkpoint: tempfile.TemporaryDirectory, from_ckpt: bool,
use_auto_parallel: bool):
if use_auto_parallel and from_ckpt:
pytest.skip("Skip auto parallel for TP2 checkpoint")
model_dir = engine_from_checkpoint.name if from_ckpt else get_model_path(
llama_model_path)
tokenizer_dir = get_model_path(llama_model_path)
tokenizer = TransformersTokenizer.from_pretrained(tokenizer_dir)
_test_llm_generate_async(model_dir,
tp_size=2,
use_auto_parallel=use_auto_parallel,
tokenizer=tokenizer)
@skip_gpu_memory_less_than(70 * 1024**3)
@pytest.mark.gpu2
@pytest.mark.part1
def test_llm_generate_mixtral_for_tp2():
llm = LLM(get_model_path(mixtral_model_name),
tensor_parallel_size=2,
kv_cache_config=global_kv_cache_config)
for output in llm.generate(prompts):
print(output)
@skip_gpu_memory_less_than(70 * 1024**3)
@pytest.mark.gpu2
@pytest.mark.part1
def test_llm_generate_mixtral_for_ep2():
llm = LLM(get_model_path(mixtral_model_name),
tensor_parallel_size=2,
moe_expert_parallel_size=2,
moe_tensor_parallel_size=1,
kv_cache_config=global_kv_cache_config)
for output in llm.generate(prompts):
print(output)
tmpdir = tempfile.TemporaryDirectory()
with llm:
llm.save(tmpdir.name)
with open(os.path.join(tmpdir.name, "config.json"), "r") as f:
# read the build_config and check if the parameters are correctly saved
engine_config = json.load(f)
pretrained_config = PretrainedConfig.from_dict(
engine_config["pretrained_config"])
assert pretrained_config.mapping.moe_tp_size == 1
assert pretrained_config.mapping.moe_ep_size == 2
@pytest.mark.gpu2
@pytest.mark.part2
def test_llm_pp2():
llm_test_harness(llama_model_path,
prompts, ["D E F G H I J K"],
sampling_params=SamplingParams(max_tokens=8),
pipeline_parallel_size=2,
auto_parallel=False,
kv_cache_config=global_kv_cache_config)
def llm_end2end_tp2_cases():
yield ({}, ) # Default options
yield ({'embedding_parallel_mode': 'NONE'}, )
yield ({'embedding_parallel_mode': 'SHARDING_ALONG_HIDDEN'}, )
@parameterized.expand(llm_end2end_tp2_cases(), name_func=unittest_name_func)
@pytest.mark.gpu2
@pytest.mark.part2
def test_llm_end2end_tp2(llm_additional_options):
model_path = get_model_path(default_model_name)
llm = LLM(model_path,
tensor_parallel_size=2,
**llm_additional_options,
kv_cache_config=global_kv_cache_config)
assert llm.args._convert_checkpoint_options
embedding_parallel_mode = llm_additional_options.pop(
'embedding_parallel_mode', 'SHARDING_ALONG_VOCAB')
if embedding_parallel_mode == 'NONE':
assert llm.args._convert_checkpoint_options[
'use_parallel_embedding'] is False
elif embedding_parallel_mode == 'SHARDING_ALONG_VOCAB':
assert llm.args._convert_checkpoint_options[
'use_parallel_embedding'] is True
assert llm.args._convert_checkpoint_options[
'embedding_sharding_dim'] == 0
elif embedding_parallel_mode == 'SHARDING_ALONG_HIDDEN':
assert llm.args._convert_checkpoint_options[
'use_parallel_embedding'] is True
assert llm.args._convert_checkpoint_options[
'embedding_sharding_dim'] == 1
assert not llm_additional_options
llm_check_output(llm,
prompts, ["D E F G H I J K"],
sampling_params=SamplingParams(max_tokens=8))
@pytest.mark.gpu4
@pytest.mark.part0
def test_tinyllama_logits_processor_tp2pp2():
tinyllama_logits_processor_test_harness(tensor_parallel_size=2,
pipeline_parallel_size=2)
@pytest.mark.gpu4
@pytest.mark.part0
@pytest.mark.parametrize("backend", ['tensorrt', 'pytorch'])
def test_tinyllama_guided_decoding_tp2pp2(backend: str):
llm_kwargs = {}
if backend == 'pytorch':
llm_kwargs['backend'] = 'pytorch'
tinyllama_guided_decoding_test_harness(
tensor_parallel_size=2,
pipeline_parallel_size=2,
kv_cache_config=global_kv_cache_config,
**llm_kwargs)
@pytest.mark.gpu2
@pytest.mark.part3
def test_llama_v2_13b_lora_tp2():
llama_v2_13b_lora_test_harness(tensor_parallel_size=2,
kv_cache_config=global_kv_cache_config)
@pytest.mark.gpu2
@pytest.mark.part3
def test_llama_7b_multi_lora_tp2():
llama_7b_multi_lora_test_harness(tensor_parallel_size=2,
max_loras=1,
max_cpu_loras=8,
kv_cache_config=global_kv_cache_config)
@pytest.mark.gpu2
@pytest.mark.part3
def test_llama_v2_7b_prompt_adapter_tp2():
llama_v2_7b_prompt_adapter_test_harness(
tensor_parallel_size=2, kv_cache_config=global_kv_cache_config_no_reuse)
def run_command(command: str):
try:
result = subprocess.run(command,
shell=True,
check=True,
env=os.environ,
capture_output=True,
text=True) # nosec B603
print("Command output:")
print(result.stdout)
except subprocess.CalledProcessError as e:
print("Command failed with exit code:", e.returncode)
print("Error output:")
print(e.stderr)
print("Standard output:")
print(e.stdout)
raise e
@skip_single_gpu
def test_llm_multi_node(engine_from_checkpoint: tempfile.TemporaryDirectory):
# TODO[chunweiy]: reactivate this later
nworkers = 2
test_case_file = os.path.join(os.path.dirname(__file__), "run_llm.py")
os.path.join(os.path.dirname(__file__), "launch.py")
command = f"mpirun --allow-run-as-root -n {nworkers} trtllm-llmapi-launch python3 {test_case_file} --model_dir {engine_from_checkpoint.name} --tp_size {nworkers}"
print(f"Command: {command}")
run_command(command)
@pytest.mark.skip(reason="https://nvbugspro.nvidia.com/bug/5223608: timeout")
@skip_single_gpu
def test_llm_multi_node_pytorch():
nworkers = 2
test_case_file = os.path.join(os.path.dirname(__file__), "run_llm.py")
os.path.join(os.path.dirname(__file__), "launch.py")
command = f"mpirun --allow-run-as-root -n {nworkers} trtllm-llmapi-launch python3 {test_case_file} --model_dir {llama_model_path} --tp_size {nworkers} --use_pytorch"
print(f"Command: {command}")
run_command(command)
@skip_single_gpu
def test_llm_multi_node_with_postproc():
# TODO[chunweiy]: reactivate this later
nworkers = 2
test_case_file = os.path.join(os.path.dirname(__file__),
"run_llm_with_postproc.py")
os.path.join(os.path.dirname(__file__), "launch.py")
command = f"mpirun --allow-run-as-root -n {nworkers} trtllm-llmapi-launch python3 {test_case_file} --model_dir {llama_model_path} --tp_size {nworkers}"
print(f"Command: {command}")
run_command(command)
@skip_single_gpu
def test_executor_results_cleanup():
llm = LLM(model=llama_model_path,
kv_cache_config=global_kv_cache_config,
tensor_parallel_size=2)
sampling_params = SamplingParams(max_new_tokens=6)
for i in range(20):
llm.generate(prompts, sampling_params=sampling_params)
num_remaining_results = len(llm._executor._results)
assert num_remaining_results == 0
class DummyExecutorMeta(type):
def __new__(cls, name, bases, dic, proxy_class):
new_cls = super().__new__(cls, name, bases, dic)
@staticmethod
def create(engine,
executor_config,
model_world_size: int = 1,
world_size: int = 0,
mpi_session=None,
reuse_mpi_comm: bool = False):
worker_kwargs = {
"engine": engine,
"executor_config": executor_config,
}
return proxy_class(worker_kwargs,
model_world_size=model_world_size,
mpi_session=mpi_session)
new_cls.create = create
return new_cls
class DummyExecutorProxy2(ExecutorBindingsProxy):
''' This is for testing the error occur in the thread in the Proxy. '''
def __init__(
self,
workers_kwargs,
model_world_size: int = 1,
mpi_session=None,
) -> None:
super().__init__(workers_kwargs, model_world_size, mpi_session)
self.counter = 0
def dispatch_result_task(self) -> bool:
self.counter += 1
# This will raise error in dispatch_result_thread in the main process, it will be captured by ManagedThread and
# redirect to the error_queue
if self.counter == 2:
raise DummyError("Test error")
return super().dispatch_result_task()
DummyExecutor2 = DummyExecutorMeta("DummyExecutor2", (), {},
proxy_class=DummyExecutorProxy2)
# TODO[chunweiy]: This test is not stable, need to investigate
def _test_executor_handle_background_error_in_dispatch_result_thread():
llm = LLM(model=llama_model_path,
executor_cls=DummyExecutor2,
kv_cache_config=global_kv_cache_config,
fast_build=True)
# The dummy executor will delay the responses
sampling_params = SamplingParams(max_tokens=6)
# test in streaming mode
async def task():
with pytest.raises(DummyError):
async for output in llm.generate_async(
prompts[0], streaming=True,
sampling_params=sampling_params):
print(output)
asyncio.run(task())
class DummyExecutorProxy3(ExecutorBindingsProxy):
''' This is for testing the error occur in a Worker process in the Proxy. '''
def __init__(
self,
workers_kwargs,
model_world_size: int = 1,
mpi_session=None,
) -> None:
super().__init__(
workers_kwargs,
model_world_size,
mpi_session,
# The worker process will raise error, and be captured by mpi4py done handler, and redirect to
# the global error queue.
worker_cls=DummyExecutorWorker3)
DummyExecutor3 = DummyExecutorMeta("DummyExecutor3", (), {},
proxy_class=DummyExecutorProxy3)
@skip_single_gpu
@pytest.mark.parametrize("pytorch_backend", [False, True])
def test_llm_get_stats_tp2(pytorch_backend):
if pytorch_backend:
pytest.skip("https://nvbugs/5150466: Flaky hang")
return
llm_get_stats_test_harness(tp_size=2, pytorch_backend=pytorch_backend)
@skip_single_gpu
@pytest.mark.parametrize("pytorch_backend", [False, True])
def test_llm_get_stats_async_tp2(pytorch_backend):
if pytorch_backend:
pytest.skip("https://nvbugs/5150466: Flaky hang")
return
llm_get_stats_async_test_harness(tp_size=2, pytorch_backend=pytorch_backend)
def test_llm_capture_request_error():
_test_llm_capture_request_error(tp_size=2)
def test_llm_with_postprocess_parallel_tp2():
run_llm_with_postprocess_parallel(tp_size=2)
@pytest.mark.parametrize("streaming", [True, False])
@pytest.mark.parametrize("backend", [None, "pytorch"])
def test_llm_with_postprocess_parallel_and_result_handler_tp2(
streaming: bool, backend: str):
run_llm_with_postprocess_parallel_and_result_handler(streaming,
backend,
tp_size=2)
@force_ampere
@skip_single_gpu
def test_llm_get_kv_cache_events_tp2():
llm = create_llm(tensor_parallel_size=2)
sampling_params = SamplingParams(max_tokens=6, temperature=0.01)
prompt = "Hello, my name is"
_ = llm.generate(prompt, sampling_params=sampling_params)
events = llm.get_kv_cache_events(5)
# created + stored events
assert events and len(events) >= 2
for event in events:
if event:
if event[0]["event_id"] == 0:
assert event[0]["data"]["type"] == "created"
elif event[0]["event_id"] == 1:
assert event[0]["data"]["type"] == "stored"
@pytest.fixture(scope="module")
def llm_for_sampling_params_tp2():
build_config = BuildConfig(max_beam_width=3)
llm = LLM(
model=llama_model_path,
build_config=build_config,
fast_build=True,
kv_cache_config=global_kv_cache_config,
tensor_parallel_size=2,
)
yield llm
llm.shutdown()
@pytest.mark.parametrize("sampling_params",
sampling_params_for_aborting_request)
def test_llm_abort_request_tp2(llm_for_sampling_params_tp2: LLM,
sampling_params: SamplingParams):
run_llm_abort_request(llm=llm_for_sampling_params_tp2,
sampling_params=sampling_params)
if __name__ == '__main__':
#test_llm_capture_request_error()
test_llm_generate_tp2()