[ci] small multigpu speedups (#5643)

Signed-off-by: Omer Ullman Argov <118735753+omera-nv@users.noreply.github.com>
Omer Ullman Argov 2025-07-03 15:06:10 +03:00 committed by GitHub
parent dccbfc8b1e
commit c72856188c
10 changed files with 124 additions and 139 deletions

View File

@@ -15,7 +15,7 @@ l0_dgx_h100:
auto_trigger: others
tests:
# ------------- PyTorch tests ---------------
- unittest/_torch/multi_gpu TIMEOUT (90)
- unittest/_torch/multi_gpu -m "not post_merge" TIMEOUT (90)
- unittest/_torch/auto_deploy/unit/multigpu
- unittest/llmapi/test_llm_multi_gpu_pytorch.py -m "gpu4 or gpu2"
- accuracy/test_llm_api_pytorch.py::TestLlama3_1_8BInstruct::test_bfloat16_4gpus[tp4-attn_backend=TRTLLM-torch_compile=False]

View File

@@ -89,6 +89,7 @@ l0_dgx_h200:
- accuracy/test_llm_api_pytorch.py::TestLlama3_1_8BInstruct::test_guided_decoding_4gpus[llguidance]
- test_e2e.py::test_trtllm_bench_llmapi_launch[pytorch_backend-llama-v3-llama3-8b]
- test_e2e.py::test_trtllm_bench_mgmn
- unittest/_torch/multi_gpu -m "post_merge" TIMEOUT (90)
- condition:
ranges:
system_gpu_count:
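The two test-list hunks above split the multi-GPU unit tests by pytest marker: the pre-merge l0_dgx_h100 job now runs only tests without the new post_merge marker, while the l0_dgx_h200 list picks up the marked ones with a separate entry. As a rough sketch of how that selection works (the test names below are hypothetical; only the marker name and the -m expressions come from this commit):

```python
# Illustrative only -- hypothetical tests; the post_merge marker and the
# "-m" filter expressions are the ones added by this commit.
import pytest


@pytest.mark.post_merge          # marker declared in pytest.ini at the end of this diff
def test_expensive_multi_gpu():  # selected by: pytest -m "post_merge"
    ...


def test_cheap_multi_gpu():      # selected by: pytest -m "not post_merge"
    ...
```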

View File

@@ -6,7 +6,6 @@ import cloudpickle
import pytest
import torch
from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor
from torch import nn
import tensorrt_llm
@@ -21,6 +20,9 @@ MPI.pickle.__init__(
pickle.HIGHEST_PROTOCOL,
)
# needed since we reuse the mpi executor pool, first test running will leak a thread
pytestmark = pytest.mark.threadleak(enabled=False)
def run_single_rank(tensor_parallel_size, single_rank_forward_func, input,
weights, vocab_size, hidden_size, dtype):
@@ -193,92 +195,81 @@ def row_lm_head_forward(x, vocab_size, hidden_size, dtype, tensor_parallel_size,
reason='needs 2 GPUs to run this test')
@pytest.mark.parametrize("vocab_size", [128, 127],
ids=["balanced", "unbalanced"])
def test_column_embedding(vocab_size):
@pytest.mark.parametrize("mpi_pool_executor", [2], indirect=True)
def test_column_embedding(vocab_size, mpi_pool_executor):
torch.manual_seed(42)
seq_len = 10
hidden_size = 16
dtype = torch.bfloat16
tensor_parallel_size = 2
tensor_parallel_size = mpi_pool_executor.num_workers
input = torch.randint(0, vocab_size, (seq_len, ))
weight = torch.randn((vocab_size, hidden_size), dtype=dtype)
with MPIPoolExecutor(max_workers=tensor_parallel_size) as executor:
results = executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, column_embedding_forward, input,
weight, vocab_size, hidden_size, dtype)] * 2))
for r in results:
assert r is True
results = mpi_pool_executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, column_embedding_forward, input, weight,
vocab_size, hidden_size, dtype)] * 2))
for r in results:
assert r is True
@pytest.mark.skipif(torch.cuda.device_count() < 2,
reason='needs 2 GPUs to run this test')
@pytest.mark.parametrize("hidden_size", [16, 15],
ids=["balanced", "unbalanced"])
def test_row_embedding(hidden_size):
@pytest.mark.parametrize("mpi_pool_executor", [2], indirect=True)
def test_row_embedding(hidden_size, mpi_pool_executor):
torch.manual_seed(42)
seq_len = 2
vocab_size = 128
dtype = torch.bfloat16
tensor_parallel_size = 2
tensor_parallel_size = mpi_pool_executor.num_workers
input = torch.randint(0, vocab_size, (seq_len, ))
weight = torch.randn((vocab_size, hidden_size), dtype=dtype)
with MPIPoolExecutor(max_workers=tensor_parallel_size) as executor:
results = executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, row_embedding_forward, input, weight,
vocab_size, hidden_size, dtype)] * 2))
for r in results:
assert r is True
results = mpi_pool_executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, row_embedding_forward, input, weight,
vocab_size, hidden_size, dtype)] * 2))
for r in results:
assert r is True
@pytest.mark.skipif(torch.cuda.device_count() < 2,
reason='needs 2 GPUs to run this test')
@pytest.mark.parametrize("vocab_size", [128, 127],
ids=["balanced", "unbalanced"])
def test_column_lm_head(vocab_size):
@pytest.mark.parametrize("mpi_pool_executor", [2], indirect=True)
def test_column_lm_head(vocab_size, mpi_pool_executor):
torch.manual_seed(42)
seq_len = 10
hidden_size = 16
dtype = torch.bfloat16
tensor_parallel_size = 2
tensor_parallel_size = mpi_pool_executor.num_workers
input = torch.randn((seq_len, hidden_size), dtype=dtype)
weight = torch.randn((vocab_size, hidden_size), dtype=dtype)
with MPIPoolExecutor(max_workers=tensor_parallel_size) as executor:
results = executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, column_lm_head_forward, input, weight,
vocab_size, hidden_size, dtype)] * 2))
for r in results:
assert r is True
results = mpi_pool_executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, column_lm_head_forward, input, weight,
vocab_size, hidden_size, dtype)] * 2))
for r in results:
assert r is True
@pytest.mark.skipif(torch.cuda.device_count() < 2,
reason='needs 2 GPUs to run this test')
@pytest.mark.parametrize("hidden_size", [16, 15],
ids=["balanced", "unbalanced"])
def test_row_lm_head(hidden_size):
@pytest.mark.parametrize("mpi_pool_executor", [2], indirect=True)
def test_row_lm_head(hidden_size, mpi_pool_executor):
torch.manual_seed(42)
seq_len = 2
vocab_size = 128
dtype = torch.bfloat16
tensor_parallel_size = 2
tensor_parallel_size = mpi_pool_executor.num_workers
input = torch.randn((seq_len, hidden_size), dtype=dtype)
weight = torch.randn((vocab_size, hidden_size), dtype=dtype)
with MPIPoolExecutor(max_workers=tensor_parallel_size) as executor:
results = executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, row_lm_head_forward, input, weight,
vocab_size, hidden_size, dtype)] * 2))
for r in results:
assert r is True
if __name__ == '__main__':
test_column_embedding(128)
test_column_embedding(127)
test_row_embedding(16)
test_row_embedding(15)
test_column_lm_head(128)
test_column_lm_head(127)
test_row_lm_head(16)
test_row_lm_head(15)
results = mpi_pool_executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, row_lm_head_forward, input, weight,
vocab_size, hidden_size, dtype)] * 2))
for r in results:
assert r is True
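The changes above replace the per-test MPIPoolExecutor context manager with a shared mpi_pool_executor fixture that receives the worker count through indirect parametrization. The fixture itself lives in a conftest that is not part of this diff; a minimal sketch of what such a fixture could look like (the session scope, the cleanup, and the num_workers attribute are assumptions, not taken from the commit):

```python
# conftest.py -- hypothetical sketch of the shared fixture used by the tests above.
# The real implementation is not shown in this commit.
import pytest
from mpi4py.futures import MPIPoolExecutor


@pytest.fixture(scope="session")
def mpi_pool_executor(request):
    # Worker count arrives via @pytest.mark.parametrize("mpi_pool_executor", [2], indirect=True);
    # pytest caches one pool per distinct parameter value, so tests reuse the same pool.
    num_workers = getattr(request, "param", 2)
    executor = MPIPoolExecutor(max_workers=num_workers)
    executor.num_workers = num_workers  # convenience attribute read by the tests (assumed)
    try:
        yield executor
    finally:
        executor.shutdown(wait=True)
```

Reusing one pool across tests is also why the file disables the thread-leak check: as the comment in the diff notes, the first test that touches the pool leaves its dispatch thread alive for later tests.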

View File

@@ -6,7 +6,6 @@ import cloudpickle
import pytest
import torch
from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor
from torch import nn
import tensorrt_llm
@@ -21,6 +20,9 @@ MPI.pickle.__init__(
pickle.HIGHEST_PROTOCOL,
)
# needed since we reuse the mpi executor pool, first test running will leak a thread
pytestmark = pytest.mark.threadleak(enabled=False)
def rms_norm(x: torch.Tensor, weight: torch.Tensor = None, eps: float = 1e-6):
y = x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + eps)
@@ -246,100 +248,88 @@ def row_linear_norm_fusion_forward(x, hidden_size, dtype, tensor_parallel_size,
@pytest.mark.skipif(torch.cuda.device_count() < 2,
reason='needs 2 GPUs to run this test')
def test_mlp():
@pytest.mark.parametrize("mpi_pool_executor", [2], indirect=True)
def test_mlp(mpi_pool_executor):
torch.manual_seed(42)
seq_len = 2
hidden_size = 16
dtype = torch.bfloat16
tensor_parallel_size = 2
tensor_parallel_size = mpi_pool_executor.num_workers
x = torch.randn((seq_len, hidden_size), dtype=dtype)
l0_weight = torch.randn((4 * hidden_size, hidden_size), dtype=dtype)
l1_weight = torch.randn((hidden_size, 4 * hidden_size), dtype=dtype)
with MPIPoolExecutor(max_workers=tensor_parallel_size) as executor:
results = executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, mlp_forward, x,
[l0_weight, l1_weight], hidden_size, dtype)] * 2))
for r in results:
assert r is True
results = mpi_pool_executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, mlp_forward, x, [l0_weight, l1_weight],
hidden_size, dtype)] * 2))
for r in results:
assert r is True
@pytest.mark.skipif(torch.cuda.device_count() < 2,
reason='needs 2 GPUs to run this test')
@pytest.mark.parametrize("hidden_size", [128, 127],
ids=["balanced", "unbalanced"])
def test_column_linear(hidden_size):
@pytest.mark.parametrize("mpi_pool_executor", [2], indirect=True)
def test_column_linear(hidden_size, mpi_pool_executor):
torch.manual_seed(42)
seq_len = 10
dtype = torch.bfloat16
tensor_parallel_size = 2
tensor_parallel_size = mpi_pool_executor.num_workers
x = torch.randn((seq_len, hidden_size), dtype=dtype)
l0_weight = torch.randn((hidden_size, hidden_size), dtype=dtype)
with MPIPoolExecutor(max_workers=tensor_parallel_size) as executor:
results = executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, column_linear_forward, x, [l0_weight],
hidden_size, dtype)] * 2))
if hidden_size % 2 != 0:
with pytest.raises(AssertionError):
for r in results:
assert r is True
else:
results = mpi_pool_executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, column_linear_forward, x, [l0_weight],
hidden_size, dtype)] * 2))
if hidden_size % 2 != 0:
with pytest.raises(AssertionError):
for r in results:
assert r is True
else:
for r in results:
assert r is True
@pytest.mark.skipif(torch.cuda.device_count() < 2,
reason='needs 2 GPUs to run this test')
@pytest.mark.parametrize("hidden_size", [16, 15],
ids=["balanced", "unbalanced"])
def test_row_linear(hidden_size):
@pytest.mark.parametrize("mpi_pool_executor", [2], indirect=True)
def test_row_linear(hidden_size, mpi_pool_executor):
torch.manual_seed(42)
seq_len = 2
dtype = torch.bfloat16
tensor_parallel_size = 2
tensor_parallel_size = mpi_pool_executor.num_workers
x = torch.randn((seq_len, hidden_size), dtype=dtype)
l0_weight = torch.randn((hidden_size, hidden_size), dtype=dtype)
with MPIPoolExecutor(max_workers=tensor_parallel_size) as executor:
results = executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, row_linear_forward, x, [l0_weight],
hidden_size, dtype)] * 2))
if hidden_size % 2 != 0:
with pytest.raises(AssertionError):
for r in results:
assert r is True
else:
results = mpi_pool_executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, row_linear_forward, x, [l0_weight],
hidden_size, dtype)] * 2))
if hidden_size % 2 != 0:
with pytest.raises(AssertionError):
for r in results:
assert r is True
else:
for r in results:
assert r is True
@pytest.mark.skipif(torch.cuda.device_count() < 2,
reason='needs 2 GPUs to run this test')
@pytest.mark.parametrize("seq_len", [2, 32], ids=lambda x: f"seqlen:{x}")
@pytest.mark.parametrize("hidden_size", [16, 256], ids=lambda x: f"hidden:{x}")
def test_row_linear_norm_fusion(seq_len, hidden_size):
@pytest.mark.parametrize("mpi_pool_executor", [2], indirect=True)
def test_row_linear_norm_fusion(seq_len, hidden_size, mpi_pool_executor):
torch.manual_seed(42)
dtype = torch.bfloat16
tensor_parallel_size = 2
tensor_parallel_size = mpi_pool_executor.num_workers
x = torch.randn((seq_len, hidden_size), dtype=dtype)
l0_weight = torch.randn((hidden_size, hidden_size), dtype=dtype)
with MPIPoolExecutor(max_workers=tensor_parallel_size) as executor:
results = executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, row_linear_norm_fusion_forward, x,
[l0_weight], hidden_size, dtype)] * 2))
for r in results:
assert r is True
if __name__ == '__main__':
test_column_linear(128)
test_column_linear(127)
test_row_linear(16)
test_row_linear(15)
test_mlp()
test_row_linear_norm_fusion(32, 256)
test_row_linear_norm_fusion(32, 16)
test_row_linear_norm_fusion(2, 16)
test_row_linear_norm_fusion(2, 256)
results = mpi_pool_executor.map(
run_single_rank,
*zip(*[(tensor_parallel_size, row_linear_norm_fusion_forward, x,
[l0_weight], hidden_size, dtype)] * 2))
for r in results:
assert r is True
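One detail worth noting in the unbalanced cases above: map on the executor returns its results lazily, so a failure inside a worker (or a False return value hitting `assert r is True`) only surfaces while the results are consumed. That is why the pytest.raises(AssertionError) block wraps the consumption loop rather than the map call. A generic sketch of that behaviour, using the standard-library executor rather than the MPI pool:

```python
# Illustrative only: the same lazy-result behaviour applies to
# mpi4py.futures.MPIPoolExecutor, which implements the same Executor API.
from concurrent.futures import ThreadPoolExecutor

import pytest


def worker(hidden_size: int) -> bool:
    # Stand-in for the per-rank forward pass: an uneven split trips an assertion.
    assert hidden_size % 2 == 0, "hidden_size must divide evenly across ranks"
    return True


with ThreadPoolExecutor(max_workers=2) as ex:
    results = ex.map(worker, [15, 15])   # nothing is raised here yet
    with pytest.raises(AssertionError):  # the worker's error re-raises on iteration
        for r in results:
            assert r is True
```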

View File

@@ -13,6 +13,7 @@ from tensorrt_llm.models.modeling_utils import QuantAlgo, QuantConfig
MAX_SEQ_LEN = 4096 + 1024
@pytest.mark.post_merge
@pytest.mark.parametrize("backend", ["pytorch"])
@pytest.mark.parametrize("model_name",
["llama-models-v3/Llama-3-8B-Instruct-Gradient-1048k"],

View File

@@ -1289,7 +1289,7 @@ def test_executor_lookahead_decoding_config():
assert sampling_params.lookahead_config.max_verification_set_size == 8
def llama_v2_13b_lora_test_harness(**llm_kwargs):
def llama_v2_13b_lora_from_dir_test_harness(**llm_kwargs):
# Shahar- perhaps disable build config
hf_model_dir = get_model_path("llama-models-v2/llama-v2-13b-hf")
hf_lora_dir = get_model_path("llama-models-v2/chinese-llama-2-lora-13b")
@@ -1321,7 +1321,7 @@ def llama_v2_13b_lora_test_harness(**llm_kwargs):
assert similar(output.outputs[0].text, ref)
def llama_7b_multi_lora_test_harness(**llm_kwargs):
def llama_7b_multi_lora_from_request_test_harness(**llm_kwargs):
hf_model_dir = get_model_path("llama-models/llama-7b-hf")
hf_lora_dir1 = get_model_path("llama-models/luotuo-lora-7b-0.1")
hf_lora_dir2 = get_model_path("llama-models/Japanese-Alpaca-LoRA-7b-v0")
@@ -1376,12 +1376,12 @@ def llama_7b_multi_lora_test_harness(**llm_kwargs):
@skip_gpu_memory_less_than_40gb
def test_llama_v2_13b_lora():
llama_v2_13b_lora_test_harness()
llama_v2_13b_lora_from_dir_test_harness()
@skip_gpu_memory_less_than_40gb
def test_llama_7b_multi_lora():
llama_7b_multi_lora_test_harness(max_loras=1, max_cpu_loras=8)
llama_7b_multi_lora_from_request_test_harness(max_loras=1, max_cpu_loras=8)
def llama_v2_7b_prompt_adapter_test_harness(**llm_kwargs):

View File

@@ -21,9 +21,10 @@ from .test_llm import (
DummyError, DummyExecutorWorker3, _test_llm_capture_request_error,
_test_llm_generate_async, check_llm_return_context_logits,
check_llm_return_generation_logits, llm_return_logprobs_test_harness,
default_model_name, get_model_path, llama_7b_multi_lora_test_harness,
llama_model_path, llama_v2_7b_prompt_adapter_test_harness,
llama_v2_13b_lora_test_harness, llm_check_output,
default_model_name, get_model_path,
llama_7b_multi_lora_from_request_test_harness, llama_model_path,
llama_v2_7b_prompt_adapter_test_harness,
llama_v2_13b_lora_from_dir_test_harness, llm_check_output,
llm_get_stats_async_test_harness, llm_get_stats_test_harness,
llm_test_harness, mixtral_model_name, prompts, test_llm_api_eagle,
tinyllama_logits_processor_test_harness, run_llm_with_postprocess_parallel,
@@ -253,17 +254,18 @@ def test_tinyllama_logits_processor_tp2pp2():
@pytest.mark.gpu2
@pytest.mark.part3
def test_llama_v2_13b_lora_tp2():
llama_v2_13b_lora_test_harness(tensor_parallel_size=2,
kv_cache_config=global_kv_cache_config)
llama_v2_13b_lora_from_dir_test_harness(
tensor_parallel_size=2, kv_cache_config=global_kv_cache_config)
@pytest.mark.gpu2
@pytest.mark.part3
def test_llama_7b_multi_lora_tp2():
llama_7b_multi_lora_test_harness(tensor_parallel_size=2,
max_loras=1,
max_cpu_loras=8,
kv_cache_config=global_kv_cache_config)
llama_7b_multi_lora_from_request_test_harness(
tensor_parallel_size=2,
max_loras=1,
max_cpu_loras=8,
kv_cache_config=global_kv_cache_config)
@pytest.mark.skip(reason="https://nvbugs/5362426")

View File

@@ -3,8 +3,8 @@ import pytest
# isort: off
from .test_llm import tinyllama_logits_processor_test_harness
from tensorrt_llm.llmapi import KvCacheConfig
from .test_llm_pytorch import (llama_v2_13b_lora_test_harness,
llama_7b_multi_lora_test_harness)
from .test_llm_pytorch import (llama_7b_lora_from_dir_test_harness,
llama_7b_multi_lora_from_request_test_harness)
# isort: on
@@ -28,12 +28,12 @@ def test_tinyllama_logits_processor_2gpu(tp_size: int, pp_size: int):
@pytest.mark.gpu2
def test_llama_v2_13b_lora_tp2():
llama_v2_13b_lora_test_harness(tensor_parallel_size=2,
kv_cache_config=global_kv_cache_config)
def test_llama_7b_lora_tp2():
llama_7b_lora_from_dir_test_harness(tensor_parallel_size=2,
kv_cache_config=global_kv_cache_config)
@pytest.mark.gpu2
def test_llama_7b_multi_lora_tp2():
llama_7b_multi_lora_test_harness(tensor_parallel_size=2,
kv_cache_config=global_kv_cache_config)
llama_7b_multi_lora_from_request_test_harness(
tensor_parallel_size=2, kv_cache_config=global_kv_cache_config)

View File

@@ -128,25 +128,23 @@ def test_llm_with_postprocess_parallel_and_result_handler(streaming):
tp_size=1)
def llama_v2_13b_lora_test_harness(**llm_kwargs) -> None:
lora_config = LoraConfig(lora_dir=[
f"{llm_models_root()}/llama-models-v2/chinese-llama-2-lora-13b"
],
max_lora_rank=64)
llm = LLM(model=f"{llm_models_root()}/llama-models-v2/llama-v2-13b-hf",
def llama_7b_lora_from_dir_test_harness(**llm_kwargs) -> None:
lora_config = LoraConfig(
lora_dir=[f"{llm_models_root()}/llama-models/luotuo-lora-7b-0.1"],
max_lora_rank=8)
llm = LLM(model=f"{llm_models_root()}/llama-models/llama-7b-hf",
lora_config=lora_config,
**llm_kwargs)
prompts = [
"今天天气很好,我到公园的时候,",
"美国的首都在哪里? \n答案:",
]
references = [
"发现公园里到处都是人,有的在跑步,有的在打羽毛球,还有",
"美国的首都是华盛顿。\n\n美国",
]
sampling_params = SamplingParams(max_tokens=20, add_special_tokens=False)
sampling_params = SamplingParams(max_tokens=20)
lora_req = LoRARequest(
"task-0", 0,
f"{llm_models_root()}/llama-models-v2/chinese-llama-2-lora-13b")
"task-0", 0, f"{llm_models_root()}/llama-models/luotuo-lora-7b-0.1")
lora_request = [lora_req]
outputs = llm.generate(prompts, sampling_params, lora_request=lora_request)
@@ -154,7 +152,7 @@ def llama_v2_13b_lora_test_harness(**llm_kwargs) -> None:
assert similar(outputs[0].outputs[0].text, references[0])
def llama_7b_multi_lora_test_harness(**llm_kwargs) -> None:
def llama_7b_multi_lora_from_request_test_harness(**llm_kwargs) -> None:
hf_model_dir = f"{llm_models_root()}/llama-models/llama-7b-hf"
hf_lora_dir1 = f"{llm_models_root()}/llama-models/luotuo-lora-7b-0.1"
hf_lora_dir2 = f"{llm_models_root()}/llama-models/Japanese-Alpaca-LoRA-7b-v0"
@@ -164,6 +162,7 @@ def llama_7b_multi_lora_test_harness(**llm_kwargs) -> None:
# (2) provide a lora_dir to infer the lora_target_modules.
lora_config = LoraConfig(lora_target_modules=['attn_q', 'attn_k', 'attn_v'],
max_lora_rank=8)
llm = LLM(hf_model_dir, lora_config=lora_config, **llm_kwargs)
prompts = [
@@ -194,8 +193,8 @@ def llama_7b_multi_lora_test_harness(**llm_kwargs) -> None:
@skip_gpu_memory_less_than_40gb
def test_llama_v2_13b_lora():
llama_v2_13b_lora_test_harness()
def test_llama_7b_lora():
llama_7b_lora_from_dir_test_harness()
@skip_gpu_memory_less_than_40gb
@@ -224,7 +223,7 @@ def test_llama_7b_lora_default_modules() -> None:
@skip_gpu_memory_less_than_40gb
def test_llama_7b_multi_lora():
llama_7b_multi_lora_test_harness()
llama_7b_multi_lora_from_request_test_harness()
# TODO smor: currently Nemotron-Super-49B-v1 with LoRA memory consumption is overly high

View File

@@ -18,3 +18,4 @@ markers =
part4
gpu2: this test uses 2 GPUs
gpu4: this test uses 4 GPUs
post_merge: this test should only run in post merge