From 5ab0d1edec8d4aff4ae2be5b068c193736a2c40e Mon Sep 17 00:00:00 2001 From: Zheyu Fu Date: Fri, 19 Dec 2025 02:01:38 +0000 Subject: [PATCH] Fix thread leak for test_draft_len_schedule. Enhance stability for test_spec_gate. Signed-off-by: Zheyu Fu --- tensorrt_llm/llmapi/utils.py | 3 +- tests/integration/test_lists/waives.txt | 1 - .../speculative/test_draft_len_schedule.py | 20 +++--- .../_torch/speculative/test_spec_gate.py | 63 ++++++++++--------- 4 files changed, 41 insertions(+), 46 deletions(-) diff --git a/tensorrt_llm/llmapi/utils.py b/tensorrt_llm/llmapi/utils.py index bfc81f7cfd..f03e1b532f 100644 --- a/tensorrt_llm/llmapi/utils.py +++ b/tensorrt_llm/llmapi/utils.py @@ -15,7 +15,7 @@ import time import traceback import warnings import weakref -from functools import cache, wraps +from functools import wraps from pathlib import Path from queue import Queue from typing import (Any, Callable, Iterable, List, Optional, Tuple, Type, @@ -353,7 +353,6 @@ def enable_llmapi_debug() -> bool: return _enable_llmapi_debug_ -@cache def enable_worker_single_process_for_tp1() -> bool: ''' Tell whether to make worker use single process for TP1. This is helpful for return-logits performance and debugging. ''' diff --git a/tests/integration/test_lists/waives.txt b/tests/integration/test_lists/waives.txt index d8ceef7084..a1bbd30a32 100644 --- a/tests/integration/test_lists/waives.txt +++ b/tests/integration/test_lists/waives.txt @@ -361,7 +361,6 @@ accuracy/test_llm_api_autodeploy.py::TestLlama3_1_8B::test_auto_dtype[False-2] S unittest/_torch/auto_deploy/unit/multigpu/test_ad_build_small_multi.py::test_build_ad[meta-llama/Meta-Llama-3.1-8B-Instruct-llm_extra_args0-2] SKIP (https://nvbugs/5680755) examples/test_ray.py::test_ray_disaggregated_serving[tp2] SKIP (https://nvbugs/5683039) full:H100_PCIe/unittest/llmapi/test_llm_pytorch.py::test_llama_7b_multi_lora_evict_and_reload_lora_gpu_cache SKIP (https://nvbugs/5682551) -unittest/_torch/speculative/test_draft_len_schedule.py::test_correctness_across_batch_sizes[model_drafter-schedule1] SKIP (https://nvbugs/5680911) test_e2e.py::test_openai_responses SKIP (https://nvbugs/5635153) accuracy/test_llm_api_pytorch.py::TestSeedOss_36B::test_auto_dtype SKIP (https://nvbugs/5612438) accuracy/test_llm_api_autodeploy.py::TestNemotronH::test_auto_dtype[True] SKIP (https://nvbugs/5688721) diff --git a/tests/unittest/_torch/speculative/test_draft_len_schedule.py b/tests/unittest/_torch/speculative/test_draft_len_schedule.py index dc4aa57764..9dc8a7149c 100644 --- a/tests/unittest/_torch/speculative/test_draft_len_schedule.py +++ b/tests/unittest/_torch/speculative/test_draft_len_schedule.py @@ -13,23 +13,18 @@ from utils.util import similar # # ============================================================================ -# # Fixture: Force single-worker mode for all tests in this module +# # Fixture: Force single-worker mode (only for tests that use mocking) # # ============================================================================ -@pytest.fixture(scope="module", autouse=True) -def enforce_single_worker(): - """Force single-worker mode for all tests in this module.""" - import os - - os.environ["TLLM_WORKER_USE_SINGLE_PROCESS"] = "1" +@pytest.fixture(scope="function") +def enforce_single_worker(monkeypatch): + """Mock functions don't work with multiple processes, so we enforce single worker.""" + monkeypatch.setenv("TLLM_WORKER_USE_SINGLE_PROCESS", "1") yield - if "TLLM_WORKER_USE_SINGLE_PROCESS" in os.environ: - del os.environ["TLLM_WORKER_USE_SINGLE_PROCESS"] # # ============================================================================ # # test 1: Generation correctness check # # ============================================================================ -@pytest.mark.skip("https://nvbugspro.nvidia.com/bug/5680911") @pytest.mark.parametrize( "drafter_type,schedule", [ @@ -151,8 +146,9 @@ def test_correctness_across_batch_sizes(drafter_type: str, schedule: dict): ], ) @pytest.mark.high_cuda_memory -@pytest.mark.skip("https://nvbugspro.nvidia.com/bug/5680911") -def test_draft_len_schedule_functionality(drafter_type: str, draft_schedule: dict): +def test_draft_len_schedule_functionality( + enforce_single_worker, drafter_type: str, draft_schedule: dict +): if not torch.cuda.is_available(): pytest.skip("CUDA not available") diff --git a/tests/unittest/_torch/speculative/test_spec_gate.py b/tests/unittest/_torch/speculative/test_spec_gate.py index bc9e2b95f1..82d00ff853 100644 --- a/tests/unittest/_torch/speculative/test_spec_gate.py +++ b/tests/unittest/_torch/speculative/test_spec_gate.py @@ -101,42 +101,43 @@ def test_spec_gate_e2e(enforce_single_worker): llm_spec = LLM(**llm_common_config, speculative_config=spec_config) - with patch.object(SpeculationGate, 'record_avg_decoded', - mock_record_avg_decoded): - llm_spec.generate(prompts, sampling_params) + try: + with patch.object(SpeculationGate, 'record_avg_decoded', + mock_record_avg_decoded): + llm_spec.generate(prompts, sampling_params) - # Verify the mock was called (requests completed) - assert len(gate_state["record_calls"] - ) > 0, "record_avg_decoded should have been called" + # Verify the mock was called (requests completed) + assert len(gate_state["record_calls"] + ) > 0, "record_avg_decoded should have been called" - # Verify the gate was disabled after enough requests with low acceptance - assert gate_state["gate_disabled"], \ - f"Gate should have been disabled with simulated low acceptance. Calls: {gate_state['record_calls']}" + # Verify the gate was disabled after enough requests with low acceptance + assert gate_state["gate_disabled"], \ + f"Gate should have been disabled with simulated low acceptance. Calls: {gate_state['record_calls']}" - # Verify the gate triggered at the right time (after window is filled) - # The gate should trigger on the `acceptance_window`-th call (index = window - 1) - disable_indices = [ - i for i, call in enumerate(gate_state["record_calls"]) - if call["disabled_now"] - ] - assert len(disable_indices) == 1, \ - f"Gate should have triggered exactly once, but triggered at indices: {disable_indices}" - assert disable_indices[0] >= acceptance_window - 1, \ - f"Gate should trigger after window ({acceptance_window}) is filled, but triggered at index {disable_indices[0]}" + # Verify the gate triggered at the right time (after window is filled) + # The gate should trigger on the `acceptance_window`-th call (index = window - 1) + disable_indices = [ + i for i, call in enumerate(gate_state["record_calls"]) + if call["disabled_now"] + ] + assert len(disable_indices) == 1, \ + f"Gate should have triggered exactly once, but triggered at indices: {disable_indices}" + assert disable_indices[0] >= acceptance_window - 1, \ + f"Gate should trigger after window ({acceptance_window}) is filled, but triggered at index {disable_indices[0]}" - # Verify the average acceptance was below threshold when disabled - disable_call = gate_state["record_calls"][disable_indices[0]] - assert disable_call["avg_accept"] is not None - assert disable_call["avg_accept"] < acceptance_threshold, \ - f"Avg acceptance ({disable_call['avg_accept']}) should be below threshold ({acceptance_threshold})" + # Verify the average acceptance was below threshold when disabled + disable_call = gate_state["record_calls"][disable_indices[0]] + assert disable_call["avg_accept"] is not None + assert disable_call["avg_accept"] < acceptance_threshold, \ + f"Avg acceptance ({disable_call['avg_accept']}) should be below threshold ({acceptance_threshold})" - logger.debug( - f"Gate correctly triggered after {disable_indices[0] + 1} requests") - logger.debug( - f"Final avg acceptance: {disable_call['avg_accept']:.3f} < threshold {acceptance_threshold}" - ) - - llm_spec.shutdown() + logger.debug( + f"Gate correctly triggered after {disable_indices[0] + 1} requests") + logger.debug( + f"Final avg acceptance: {disable_call['avg_accept']:.3f} < threshold {acceptance_threshold}" + ) + finally: + llm_spec.shutdown() def test_returns_none_until_window_and_enabled_when_above_threshold():