# SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Stress test script for model inference using TensorRT-LLM with the PyTorch or TRT backend.

This script stress-tests inference performance using trtllm-serve and genai-perf.
"""
import contextlib
import json
import os
import re
import subprocess
import tempfile
import threading
import time
from dataclasses import dataclass, field
from glob import glob
from typing import List, Optional, Tuple

import pandas as pd
import pytest
import requests
import yaml
from defs.conftest import get_device_count, get_device_memory, llm_models_root
from defs.trt_test_alternative import (Popen, cleanup_process_tree, print_info,
                                       print_warning)

# Installing genai-perf via requirements-dev.txt would introduce a triton/pytorch
# version mismatch, so the helper below is kept commented out.
# def genai_perf_install():
#     """Ensures genai-perf is installed without affecting the global environment"""

#     import os
#     import subprocess
#     import sys

#     current_dir = os.path.dirname(os.path.abspath(__file__))
#     requirements_file = os.path.join(current_dir,
#                                      "requirements-stress-test.txt")

#     if not os.path.exists(requirements_file):
#         with open(requirements_file, "w") as f:
#             f.write("genai-perf\n")

#     subprocess.check_call(
#         [sys.executable, "-m", "pip", "install", "-r", requirements_file])

# Define a constant for process termination timeouts
GRACEFUL_TERMINATION_TIMEOUT = 300  # seconds - set longer when stressing a large model


@dataclass(frozen=True)
class ServerConfig:
    """Dataclass to store server configuration for trtllm-serve"""
    port: int = 8000
    host: str = "localhost"
    pp_size: int = 1
    ep_size: Optional[int] = 1
    max_batch_size: Optional[int] = 1024  # 2048 is the default value in BuildConfig
    max_num_tokens: Optional[int] = 8192  # 8192 is the default value in BuildConfig
    kv_cache_free_gpu_memory_fraction: Optional[
        float] = 0.9  # 0.9 is the default value in BuildConfig
    capacity_scheduler_policy: str = "GUARANTEED_NO_EVICT"
    wait_interval: int = 10  # seconds
    max_wait_seconds: int = 600  # 10 mins <- larger models need longer loading time
    health_check_timeout: float = 8  # seconds <- keep this smaller than wait_interval

    @property
    def url(self) -> str:
        """Get the server URL"""
        return f"http://localhost:{self.port}"


@dataclass(frozen=True)
class ModelConfig:
    """Dataclass to store model configuration for stress tests"""
    model_dir: str
    tp_size: int
    memory_requirement: int
    backend: Optional[str] = None

    def __str__(self) -> str:
        model_name = os.path.basename(self.model_dir)
        backend_str = f"_{self.backend}" if self.backend else ""
        return f"{model_name}_tp{self.tp_size}{backend_str}"

    @property
    def model_name(self) -> str:
        """Extract model name from model_dir for genai-perf"""
        return os.path.basename(self.model_dir)


@dataclass(frozen=True)
class StressTestConfig:
    """Dataclass to store stress test configuration"""
    model_config: ModelConfig
    server_config: ServerConfig
    # Stress test parameters for stress-test mode
    # stress_time:
    #   Used as a control parameter to derive the request count for the stage 3 stress test
    stress_time: int = 180  # 3 mins default, can be overridden
    # stress_timeout:
    #   Maximum time allowed for the stress test to run, to prevent hanging tests
    #   Must be greater than stress_time to account for initialization, warmup, etc.
    stress_timeout: int = 300  # 5 mins default, can be overridden

    # Customized stress test parameters for stress-stage-alone mode
    customized_stress_test: bool = True
    # customized_stress_time:
    #   Used as a control parameter to derive the request count when stage 3 runs alone
    customized_stress_time: int = 60  # 1 min
    # customized_stress_timeout:
    #   Maximum time allowed for the customized stress test to complete
    #   Must be greater than customized_stress_time to account for initialization,
    #   warmup, etc., and to prevent the test from running indefinitely
    customized_stress_timeout: int = 180  # 3 mins
    customized_stress_concurrency: int = 128
    customized_stress_request_rate: int = 20

    @property
    def request_count_stress_test(self) -> int:
        """Calculate request count for stress test"""
        # genai-perf cannot be given an exact stress duration, so as a workaround the
        # customized stress time and request rate are used to derive a request count
        stress_request_count = self.customized_stress_request_rate * self.customized_stress_time
        return stress_request_count
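
# For reference: with the defaults above (customized_stress_request_rate=20,
# customized_stress_time=60), request_count_stress_test evaluates to 20 * 60 = 1200 requests.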


@dataclass(frozen=True)
class PerformanceParams:
    """Dataclass to store test parameters for genai-perf"""
    input_len_mean: int = 64  # customized for tinyllama and llama-v3-8b-instruct-hf
    input_len_std: int = 16
    output_len_mean: int = 128  # customized for tinyllama and llama-v3-8b-instruct-hf
    output_len_std: int = 32
    # test_timeout:
    #   Maximum time allowed for the entire performance test to complete
    #   Prevents indefinite runs, especially across the different concurrency values
    test_timeout: int = 3600  # 1 hour for tinyllama and llama-v3-8b-instruct-hf
    concurrency_list: List[int] = field(
        default_factory=lambda:
        [8, 16, 32, 64, 128, 256, 384, 512, 640, 768, 896, 1024])

    @property
    def request_count_list(self) -> List[int]:
        """Calculate request count based on concurrency"""
        # Keep a fair amount of requests even when concurrency is low
        result = []
        for concurrency in self.concurrency_list:
            if concurrency <= 128:
                result.append(128)
            else:
                result.append(concurrency * 2)
        return result
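
# For reference, with the default concurrency_list this yields 128 requests for every
# concurrency up to 128, and 2 * concurrency requests above that (e.g. 256 -> 512, 1024 -> 2048).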


class RequestCounter:
    """Thread-safe counter for tracking completion requests"""

    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.count += 1

    def get_count(self):
        with self.lock:
            return self.count

    def reset(self):
        with self.lock:
            self.count = 0


def filter_server_output(
        pipe,
        pattern_to_exclude=r'INFO: .+ - "POST /v1/completions HTTP/1.1" 200 OK',
        counter=None):
    """
    Filter function that reads from pipe and writes to stdout,
    excluding lines that match the given pattern.

    If a counter is provided, counts occurrences of the pattern.
    """
    pattern = re.compile(pattern_to_exclude)
    try:
        for line in iter(pipe.readline, ''):
            # Count matches if counter is provided
            if counter is not None and pattern.search(line):
                counter.increment()

            # Print lines that don't match the pattern
            if not pattern.search(line):
                print(line, end='', flush=True)
    except (BrokenPipeError, IOError, ValueError) as e:
        print_warning(f"Pipe error in filter_server_output: {str(e)}")


@contextlib.contextmanager
def launch_process(cmd,
                   start_new_session=True,
                   filter_pattern=None,
                   request_counter=None):
    """
    Context manager to handle process execution and filter output.

    Args:
        cmd: Command list to execute
        start_new_session: Whether to start the process in a new session
        filter_pattern: Optional regex pattern to exclude from output
        request_counter: Optional counter to track requests

    Yields:
        The process object
    """
    process = None
    stdout_reader = None
    stderr_reader = None

    try:
        # Only create pipes if we plan to filter output
        if filter_pattern:
            process = Popen(
                cmd,
                start_new_session=start_new_session,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                bufsize=1,  # Line buffered
                universal_newlines=True)  # Text mode

            print_info(f"Process started with PID: {process.pid}")

            # Start threads to filter and process output
            stdout_reader = threading.Thread(
                target=filter_server_output,
                args=(process.stdout, filter_pattern, request_counter),
                daemon=True  # Make sure thread doesn't block program exit
            )
            stdout_reader.start()

            stderr_reader = threading.Thread(
                target=filter_server_output,
                args=(process.stderr, filter_pattern, request_counter),
                daemon=True  # Make sure thread doesn't block program exit
            )
            stderr_reader.start()
        else:
            process = Popen(cmd,
                            start_new_session=start_new_session,
                            stdout=None,
                            stderr=None)

            print_info(f"Process started with PID: {process.pid}")

        yield process
    finally:
        if process:
            print_info(f"Stopping process with PID: {process.pid}")
            if process.poll() is None:
                # Send termination signal
                process.terminate()

                try:
                    process.wait(timeout=GRACEFUL_TERMINATION_TIMEOUT)
                    print_info("Process terminated gracefully")
                except subprocess.TimeoutExpired:
                    # Process didn't exit within timeout, force kill
                    print_warning(
                        "Process did not terminate gracefully, force killing..."
                    )
                    cleanup_process_tree(process, has_session=True)
                    print_info("Process killed forcefully")

            if process.stdout:
                process.stdout.close()
            if process.stderr:
                process.stderr.close()


def get_model_path(model_dir: str) -> str:
    """Get the full path to a model using llm_models_root"""
    return os.path.join(llm_models_root(), model_dir)


def check_server_health(server_url: str,
                        timeout: float = 10.0) -> Tuple[bool, Optional[str]]:
    """
    Check if the server is healthy by making a request to its health endpoint.

    Args:
        server_url: The base URL of the server
        timeout: Timeout in seconds for the health check request

    Returns:
        A tuple of (is_healthy, error_message)
        - is_healthy: True if server is healthy, False otherwise
        - error_message: None if healthy, error message string if not
    """
    try:
        # Increase timeout if needed
        response = requests.get(f"{server_url}/health", timeout=timeout)

        if response.status_code == 200:
            return True, None
        else:
            return False, f"Server health check failed with status code: {response.status_code}"
    except requests.RequestException as e:
        return False, f"Server health check failed: {str(e)}"
    except Exception as e:
        return False, f"Unexpected error during health check: {str(e)}"
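
# For reference: check_server_health("http://localhost:8000") returns (True, None) when the
# server's /health endpoint answers with HTTP 200, and (False, "<error message>") otherwise.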


@pytest.mark.parametrize("test_mode", ["stress-test", "stress-stage-alone"],
                         ids=lambda x: x)
@pytest.mark.parametrize("backend", ["trt", "pytorch"], ids=lambda x: x)
@pytest.mark.parametrize("capacity_scheduler_policy",
                         ["GUARANTEED_NO_EVICT", "MAX_UTILIZATION"],
                         ids=lambda x: x)
@pytest.mark.parametrize("stress_time_timeout", [(180, 300), (300, 450),
                                                 (600, 900), (3600, 5400)],
                         ids=lambda x: f"stress_time_{x[0]}s_timeout_{x[1]}s")
@pytest.mark.parametrize(
    "config",
    [
        # Configuration for TinyLlama model
        ModelConfig(model_dir="llama-models-v2/TinyLlama-1.1B-Chat-v1.0",
                    tp_size=1,
                    memory_requirement=12),
        # Configuration for Llama-v3 model
        ModelConfig(model_dir="llama-models-v3/llama-v3-8b-instruct-hf",
                    tp_size=1,
                    memory_requirement=12),
        # Configuration for DeepSeek-V3 model
        ModelConfig(model_dir="DeepSeek-V3", tp_size=8, memory_requirement=96),
    ],
    ids=lambda x: f"{os.path.basename(x.model_dir)}_tp{x.tp_size}")
def test_run_stress_test(config, stress_time_timeout, backend,
                         capacity_scheduler_policy, test_mode):
    """Run the stress test with the provided configuration, backend, and test mode.

    This test function calls the stress_test function with the given parameters.
    The function name must start with the test_ prefix so that pytest recognizes
    it as a test function.

    Args:
        config: Model configuration for the test (injected by pytest.mark.parametrize)
        stress_time_timeout: Tuple of (stress_time, stress_timeout) in seconds
        backend: Backend to use ("trt" or "pytorch")
        capacity_scheduler_policy: Scheduler policy ("GUARANTEED_NO_EVICT", "MAX_UTILIZATION")
        test_mode: Test mode ("stress-test" or "stress-stage-alone")
    """
    # Create a new ModelConfig with the backend parameter
    # Convert 'trt' to None as expected by the ModelConfig
    backend_param = None if backend == "trt" else backend

    new_config = ModelConfig(model_dir=config.model_dir,
                             tp_size=config.tp_size,
                             memory_requirement=config.memory_requirement,
                             backend=backend_param)

    # Extract stress_time and stress_timeout from the tuple
    stress_time, stress_timeout = stress_time_timeout

    # Initialize server config with the specified capacity scheduler policy
    server_config = ServerConfig(
        capacity_scheduler_policy=capacity_scheduler_policy)

    # Call the existing stress_test function with the new config and test mode
    stress_test(new_config, test_mode, server_config, stress_time,
                stress_timeout)


def stress_test(config,
                test_mode,
                server_config=None,
                stress_time=None,
                stress_timeout=None):
    """Test LLM model performance using trtllm-serve and genai-perf.

    This function supports multiple testing modes controlled by the --test-mode option:
    - "stress-test": Runs the measure capacity stage first, then the stress stage,
      using the same server instance.
    - "stress-stage-alone": Performs only the stress stage with the customized
      stress_concurrency and calculated request count.

    Args:
        config: Model configuration for the test (injected by pytest.mark.parametrize)
        test_mode: Test mode from the --test-mode option
            ("stress-test" or "stress-stage-alone")
        server_config: Optional server configuration to use; if None, a default
            will be created
        stress_time: Optional stress time in seconds, overrides the default in StressTestConfig
        stress_timeout: Optional stress timeout in seconds, overrides the default in StressTestConfig
    """
    # Ensure genai-perf is installed
    # genai_perf_install()
    # Import genai-perf - needed after installation to make sure it's available
    # import genai_perf  # noqa: F401

    # Test mode handling - determine which tests to run
    if test_mode == "stress-test":
        run_performance = True
        run_stress = True
    elif test_mode == "stress-stage-alone":
        run_performance = False
        run_stress = True
    else:
        pytest.skip(f"Skipping test for unsupported mode: {test_mode}. "
                    f"Supported modes: stress-test, stress-stage-alone")
        return

    # Skip if not enough GPU memory
    if get_device_memory() < config.memory_requirement:
        pytest.skip(
            f"Not enough GPU memory. Required: {config.memory_requirement}GB")

    # Skip if not enough GPUs for tensor parallelism
    if get_device_count() < config.tp_size:
        pytest.skip(f"Not enough GPUs. Required: {config.tp_size}")

    # Get full model path
    model_path = get_model_path(config.model_dir)
    model_name = config.model_name

    # Initialize the server config that will be used for all tests if not provided
    test_server_config = server_config if server_config is not None else ServerConfig(
    )

    # Define test configurations
    performance_config = None
    if run_performance:
        performance_config = PerformanceParams()

        # DeepSeek-V3 specific performance parameters
        if "DeepSeek-V3" in config.model_dir:
            performance_config = PerformanceParams(
                test_timeout=
                36000  # 10 hours for DeepSeek-V3, change this value if needed
            )

    # DeepSeek-V3 specific server parameters
    if "DeepSeek-V3" in config.model_dir:
        test_server_config = ServerConfig(
            port=test_server_config.port,
            host=test_server_config.host,
            pp_size=test_server_config.pp_size,
            ep_size=8,  # DeepSeek-V3 specific ep_size
            max_batch_size=161,  # DeepSeek-V3 specific max_batch_size
            max_num_tokens=1160,  # DeepSeek-V3 specific max_num_tokens
            kv_cache_free_gpu_memory_fraction=
            0.7,  # DeepSeek-V3 specific kv_cache fraction
            capacity_scheduler_policy=test_server_config.
            capacity_scheduler_policy,
            wait_interval=test_server_config.wait_interval,
            max_wait_seconds=14400,  # DeepSeek-V3 specific wait time (4 hours)
            health_check_timeout=test_server_config.health_check_timeout)

    # Create a StressTestConfig with customized time parameters if provided
    if run_stress:
        stress_config = StressTestConfig(model_config=config,
                                         server_config=test_server_config)

        # Override stress_time and stress_timeout if provided
        if stress_time is not None:
            stress_config = StressTestConfig(
                model_config=config,
                server_config=test_server_config,
                stress_time=stress_time,
                stress_timeout=stress_timeout
                if stress_timeout is not None else stress_time * 2)
    else:
        stress_config = None

    # Check if a server is already running
    is_healthy, _ = check_server_health(test_server_config.url,
                                        test_server_config.health_check_timeout)
    if is_healthy:
        raise RuntimeError(
            f"Server is already running at {test_server_config.url}. Please stop it manually before running the stress test."
        )

    # Start server
    print_info("Starting trtllm-serve server...")
    print_info(f"Model path: {model_path}")

    # Verify that the model path exists
    if not os.path.exists(model_path):
        raise RuntimeError(f"Model path does not exist: {model_path}")

    # Create a temporary YAML file for extra_llm_options
    extra_llm_options = {
        "scheduler_config": {
            "capacity_scheduler_policy":
            test_server_config.capacity_scheduler_policy
        },
        "pytorch_backend_config": {
            "enable_overlap_scheduler": True,
        },
    }
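
    # For reference, with the default GUARANTEED_NO_EVICT policy the dictionary above
    # serializes to YAML roughly as:
    #   scheduler_config:
    #     capacity_scheduler_policy: GUARANTEED_NO_EVICT
    #   pytorch_backend_config:
    #     enable_overlap_scheduler: true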

    # Add DeepSeek-V3 specific configuration
    if "DeepSeek-V3" in config.model_dir:
        extra_llm_options["enable_attention_dp"] = True

        if config.backend == "pytorch":
            extra_llm_options["pytorch_backend_config"] = {
                "use_cuda_graph": True,
                "cuda_graph_padding_enabled": True,
                "cuda_graph_batch_sizes":
                [1, 2, 4, 8, 16, 32, 64, 128, 256, 384],
                "print_iter_log": True,
                "enable_overlap_scheduler": True
            }

    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml',
                                     delete=False) as temp_file:
        yaml.dump(extra_llm_options, temp_file)
        extra_llm_options_path = temp_file.name

    # Build server command
    server_cmd = [
        "trtllm-serve",
        model_path,
        "--port",
        str(test_server_config.port),
        "--host",
        test_server_config.host,
        "--tp_size",
        str(config.tp_size),
        "--pp_size",
        str(test_server_config.pp_size),
    ]

    # Only add the ep_size parameter if it's not None
    if test_server_config.ep_size is not None:
        server_cmd.extend(["--ep_size", str(test_server_config.ep_size)])

    # Add remaining parameters
    server_cmd.extend([
        "--max_batch_size",
        str(test_server_config.max_batch_size),
        "--max_num_tokens",
        str(test_server_config.max_num_tokens),
        "--kv_cache_free_gpu_memory_fraction",
        str(test_server_config.kv_cache_free_gpu_memory_fraction),
        "--extra_llm_api_options",
        extra_llm_options_path,
    ])

    # Add the backend option only if specified
    # backend = None means the trt backend
    # backend = "pytorch" means the pytorch backend
    if config.backend:
        server_cmd.extend(["--backend", config.backend])

    # Log the command we're about to run
    print_info(f"Running command: {' '.join(server_cmd)}")
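
    # For reference, with the default ServerConfig this assembles a command roughly like:
    #   trtllm-serve <model_path> --port 8000 --host localhost --tp_size 1 --pp_size 1 \
    #       --ep_size 1 --max_batch_size 1024 --max_num_tokens 8192 \
    #       --kv_cache_free_gpu_memory_fraction 0.9 --extra_llm_api_options <tmp>.yaml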

    try:
        # Create a request counter to track completions
        request_counter = RequestCounter()

        # Start the server with the launch_process context manager and filtered output
        # HTTP access log pattern to filter out
        http_log_pattern = r'INFO: .+ - "POST /v1/completions HTTP/1.1" 200 OK'
        with launch_process(server_cmd,
                            start_new_session=True,
                            filter_pattern=http_log_pattern,
                            request_counter=request_counter) as server_process:
            server_pid = server_process.pid
            print_info(f"Server started with PID: {server_pid}")

            # Wait for server to initialize
            print_info("Waiting for server to initialize...")
            server_ready = False
            for wait_sec in range(0, test_server_config.max_wait_seconds,
                                  test_server_config.wait_interval):
                deadline = time.time() + test_server_config.wait_interval
                is_healthy, error_msg = check_server_health(
                    test_server_config.url,
                    test_server_config.health_check_timeout)

                if is_healthy:
                    print_info(f"Server is ready after {wait_sec} seconds!")
                    server_ready = True
                    break
                else:
                    if wait_sec >= test_server_config.max_wait_seconds - test_server_config.wait_interval:
                        print_warning(error_msg)

                    # Check if the process is still running
                    if server_process.poll() is not None:
                        print_warning(
                            f"ERROR: Server process died. Exit code: {server_process.returncode}"
                        )
                        try:
                            # Try to get process stderr if available
                            stderr_output = server_process.stderr.read(
                            ) if server_process.stderr else "No stderr available"
                            print_warning(f"Server stderr output: {stderr_output}")
                        except Exception:
                            pass
                        raise RuntimeError(
                            f"Server process died. Exit code: {server_process.returncode}"
                        )

                    print_info(
                        f"Still waiting for server... ({wait_sec} seconds elapsed)")

                    time.sleep(max(0, deadline - time.time()))

            # Final check if we didn't already confirm the server is ready
            if not server_ready:
                is_healthy, error_msg = check_server_health(
                    test_server_config.url,
                    test_server_config.health_check_timeout)
                if not is_healthy:
                    print_warning(
                        f"ERROR: Server failed to start properly after {test_server_config.max_wait_seconds} seconds."
                    )
                    raise RuntimeError(f"Server failed to start: {error_msg}")

            # Run performance tests only if the server is healthy
            print_info(
                f"Server is running with model {model_name}. Starting tests...")

            # Run the performance test first if enabled
            stage2_output = None  # Initialize stage2_output to None
            if run_performance:
                print_info("=== Running STAGE 1 PERFORMANCE TEST ===")
                measure_capacity_stage(model_name,
                                       model_path,
                                       test_server_config,
                                       performance_config,
                                       request_counter=request_counter)
                print_info("=== Running STAGE 2 ANALYSIS ===")
                stage2_output = extract_stress_test_metrics(
                    current_model=model_name)
                print_info(f"Stage 2 output: {stage2_output}")
                print_info("=== Running STAGE 3 STRESS TEST ===")
                stress_stage(model_name,
                             model_path,
                             test_server_config,
                             stress_config,
                             stage2_output,
                             request_counter=request_counter)

            # Then run the stress test if enabled (it runs after the performance test if both are enabled)
            if run_stress and not run_performance:  # Only run here if not already run above
                print_info(
                    "=== Running STAGE 3 STRESS TEST WITH CUSTOMIZED PARAMETERS ==="
                )
                stress_stage(model_name,
                             model_path,
                             test_server_config,
                             stress_config,
                             None,
                             request_counter=request_counter)
    finally:
        # Clean up the temp yaml file
        if os.path.exists(extra_llm_options_path):
            os.unlink(extra_llm_options_path)


def create_genai_perf_command(model_name,
                              model_path,
                              request_count,
                              concurrency,
                              input_len_mean=PerformanceParams.input_len_mean,
                              input_len_std=PerformanceParams.input_len_std,
                              output_len_mean=PerformanceParams.output_len_mean,
                              output_len_std=PerformanceParams.output_len_std,
                              warmup_request_count=10):
    """
    Create a command list for genai-perf with standardized parameters.

    Args:
        model_name: Name of the model
        model_path: Path to the model
        request_count: Number of requests to send
        concurrency: Number of concurrent requests
        input_len_mean: Mean input length
        input_len_std: Standard deviation of input length
        output_len_mean: Mean output length
        output_len_std: Standard deviation of output length
        warmup_request_count: Number of warmup requests

    Returns:
        List of command-line arguments for genai-perf
    """
    return [
        "genai-perf",
        "profile",
        "-m",
        model_name,
        "--tokenizer",
        model_path,
        "--endpoint-type",
        "completions",
        "--random-seed",
        "123",
        "--synthetic-input-tokens-mean",
        str(input_len_mean),
        "--synthetic-input-tokens-stddev",
        str(input_len_std),
        "--output-tokens-mean",
        str(output_len_mean),
        "--output-tokens-stddev",
        str(output_len_std),
        "--request-count",
        str(request_count),
        "--concurrency",
        str(concurrency),
        "--warmup-request-count",
        str(warmup_request_count),
        "--verbose",
    ]
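
# For reference, a capacity-stage call such as
#   create_genai_perf_command("TinyLlama-1.1B-Chat-v1.0", "<model_path>",
#                             request_count=128, concurrency=8)
# produces roughly:
#   genai-perf profile -m TinyLlama-1.1B-Chat-v1.0 --tokenizer <model_path> \
#       --endpoint-type completions --random-seed 123 \
#       --synthetic-input-tokens-mean 64 --synthetic-input-tokens-stddev 16 \
#       --output-tokens-mean 128 --output-tokens-stddev 32 \
#       --request-count 128 --concurrency 8 --warmup-request-count 10 --verbose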


def run_genai_perf_process(cmd,
                           test_start_time,
                           test_timeout,
                           server_config,
                           request_counter=None):
    """
    Run a genai-perf process and monitor both the process and server health.

    Args:
        cmd: Command list to execute genai-perf
        test_start_time: Start time of the test
        test_timeout: Timeout for the test in seconds
        server_config: Server configuration object
        request_counter: Optional counter to track requests

    Returns:
        Boolean indicating whether the process completed successfully
    """
    # Start genai-perf process with our context manager
    with launch_process(cmd,
                        start_new_session=True,
                        filter_pattern=None,
                        request_counter=request_counter) as process:
        # Set monitoring parameters
        last_health_check = time.time()
        process_completed = False

        # Monitor both the server and the genai-perf process
        while process.poll() is None:
            current_time = time.time()

            # Check if genai-perf is still running but has exceeded the timeout
            elapsed_time = current_time - test_start_time
            if elapsed_time > test_timeout:
                cleanup_process_tree(process, has_session=True)
                raise RuntimeError(
                    f"genai-perf test timed out after {test_timeout} seconds")

            # Check server health periodically
            if current_time - last_health_check > server_config.health_check_timeout:
                is_healthy, error_msg = check_server_health(
                    server_config.url, server_config.health_check_timeout)

                if is_healthy:
                    print_info(
                        f"Server health check passed after {elapsed_time:.1f} seconds of test"
                    )
                else:
                    # Raise an exception to stop the test
                    print_warning(f"Server health check failed: {error_msg}")
                    cleanup_process_tree(process, has_session=True)
                    raise RuntimeError(
                        f"Server health check failed during test: {error_msg}")

                # Update last health check time
                last_health_check = current_time

            time.sleep(0.5)

        # Check the final status of the genai-perf process
        retcode = process.poll()
        if retcode is not None:
            if retcode != 0:
                cleanup_process_tree(process, has_session=True)
                raise RuntimeError(
                    f"genai-perf exited with non-zero code: {retcode}")
            else:
                print_info("genai-perf completed successfully")
                process_completed = True
        else:
            cleanup_process_tree(process, has_session=True)
            raise RuntimeError(
                "genai-perf did not complete normally, will terminate")

    return process_completed


def measure_capacity_stage(model_name,
                           model_path,
                           server_config,
                           performance_params,
                           request_counter=None):
    """Run the performance test with multiple concurrency levels"""
    total_start_time = time.time()
    total_tests = len(performance_params.concurrency_list)
    completed_tests = 0
    test_times = []

    print("Test Parameters (constant for all runs):")
    print("----------------------------------------")
    print(f"Input Length Mean: {performance_params.input_len_mean}")
    print(f"Input Length Std: {performance_params.input_len_std}")
    print(f"Output Length Mean: {performance_params.output_len_mean}")
    print(f"Output Length Std: {performance_params.output_len_std}")
    print(f"Test Timeout: {performance_params.test_timeout} seconds")
    print("----------------------------------------")

    # Reset the counter before starting tests
    if request_counter:
        request_counter.reset()

    # Iterate through concurrency levels and corresponding request counts
    for test_index, (concurrency, request_count) in enumerate(
            zip(performance_params.concurrency_list,
                performance_params.request_count_list)):
        test_start_time = time.time()

        print_info(
            f"Running test {test_index+1}/{total_tests}: concurrency={concurrency}, request_count={request_count}"
        )

        # Prepare genai-perf command
        cmd = create_genai_perf_command(
            model_name=model_name,
            model_path=model_path,
            request_count=request_count,
            concurrency=concurrency,
            input_len_mean=performance_params.input_len_mean,
            input_len_std=performance_params.input_len_std,
            output_len_mean=performance_params.output_len_mean,
            output_len_std=performance_params.output_len_std,
            warmup_request_count=10)

        # Run genai-perf process
        process_completed = run_genai_perf_process(
            cmd, test_start_time, performance_params.test_timeout,
            server_config, request_counter)

        # Increment the completed tests counter if the process completed successfully
        if process_completed:
            completed_tests += 1

        test_end_time = time.time()
        duration = int(test_end_time - test_start_time)
        print_info(
            f"Test {test_index+1}/{total_tests} completed in {duration} seconds"
        )
        test_times.append((concurrency, request_count, duration))

    total_time = time.time() - total_start_time

    # Print summary
    print("\n========== Performance Test Summary ==========")
    print(f"Total tests run: {total_tests}")
    print(f"Successfully completed tests: {completed_tests}")
    print(f"Total time spent: {format_time(int(total_time))}")
    print("\nDetailed test times:")
    print("Concurrency  Request Count  Time Spent")
    print("----------------------------------------")
    for concurrency, request_count, duration in test_times:
        print(
            f"{concurrency:10d} {request_count:12d}  {format_time(duration)}")

    if request_counter:
        print(
            f"Total successful completion requests: {request_counter.get_count()}"
        )


def stress_stage(model_name,
                 model_path,
                 server_config,
                 stress_config,
                 stage2_output=None,
                 request_counter=None):
    """Run a single stress test with the configured parameters"""
    # Validate inputs
    if not model_name or not model_path:
        raise ValueError("model_name and model_path must be provided")

    if not os.path.exists(model_path):
        raise ValueError(f"Model path does not exist: {model_path}")

    # Determine stress test parameters
    if stage2_output is None:
        if stress_config.customized_stress_test:
            # Use customized parameters when stage2_output is None but the customized test is enabled
            stress_concurrency = stress_config.customized_stress_concurrency
            request_count = stress_config.request_count_stress_test
            test_timeout = stress_config.customized_stress_timeout
        else:
            raise ValueError(
                "stage2_output is required when not using customized stress test"
            )
    else:
        if model_name not in stage2_output:
            raise ValueError(f"No data for model {model_name} in stage2_output")

        model_results = stage2_output[model_name]
        stress_concurrency = model_results["concurrency"]
        stress_request_rate = model_results["request_rate"]
        stress_time = stress_config.stress_time
        # Ensure request_count is an integer by using int() conversion
        request_count = int(stress_request_rate * stress_time)
        test_timeout = stress_config.stress_timeout

    print_info(
        f"Running stress test with concurrency={stress_concurrency}, request_count={request_count}"
    )

    test_start_time = time.time()

    # Reset the counter before starting the stress test
    if request_counter:
        request_counter.reset()

    # Prepare genai-perf command
    cmd = create_genai_perf_command(
        model_name=model_name,
        model_path=model_path,
        request_count=request_count,
        concurrency=stress_concurrency,
        input_len_mean=PerformanceParams.input_len_mean,
        input_len_std=PerformanceParams.input_len_std,
        output_len_mean=PerformanceParams.output_len_mean,
        output_len_std=PerformanceParams.output_len_std,
        warmup_request_count=10)

    # Start genai-perf process
    process_completed = run_genai_perf_process(cmd, test_start_time,
                                               test_timeout, server_config,
                                               request_counter)

    test_end_time = time.time()
    duration = int(test_end_time - test_start_time)

    # Now print the counter results after the test has completed
    if request_counter:
        print(
            f"Total successful completion requests: {request_counter.get_count()}"
        )

    print_info(
        f"Stress test completed in {duration} seconds. Success: {process_completed}"
    )

    # Display summary for the stress test
    if process_completed:
        print("\n========== Stress Test Summary ==========")
        print(f"Model: {model_name}")
        print(f"Concurrency: {stress_concurrency}")
        print(f"Request Count: {request_count}")
        print(f"Time Spent: {format_time(duration)}")


# Helper function to format time
def format_time(seconds: int) -> str:
    """Format time in seconds to a human-readable string"""
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    if hours > 0:
        return f"{hours}h {minutes}m {seconds}s"
    elif minutes > 0:
        return f"{minutes}m {seconds}s"
    else:
        return f"{seconds}s"
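
# For reference: format_time(3723) == "1h 2m 3s", format_time(75) == "1m 15s",
# and format_time(42) == "42s".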


def extract_stress_test_metrics(artifacts_dir="./artifacts",
                                current_model=None):
    """
    Extract stress test metrics from the artifacts directory

    Args:
        artifacts_dir (str): Path to the artifacts directory
        current_model (str, optional): If provided, only analyze artifacts for this model
    """
    # Find all profile_export_genai_perf.json files in the artifacts directory
    json_files = glob(os.path.join(artifacts_dir,
                                   "**/profile_export_genai_perf.json"),
                      recursive=True)

    if not json_files:
        raise RuntimeError(
            "No profile_export_genai_perf.json files found in the artifacts directory"
        )

    # Get a list of directory names in the artifacts directory
    directories = [
        d for d in os.listdir(artifacts_dir)
        if os.path.isdir(os.path.join(artifacts_dir, d))
    ]

    # Extract model names from directory names (before "-openai-completions-")
    model_name_map = {}
    for directory in directories:
        if "-openai-completions-" in directory:
            model_name = directory.split("-openai-completions-")[0]
            model_name_map[directory] = model_name
            print(f"Found model: {model_name} in directory: {directory}")

    if not model_name_map and current_model:
        raise RuntimeError(
            f"No model directories found with the expected naming pattern for model: {current_model}"
        )

    # Initialize lists to store metrics
    output_token_throughput = []
    concurrency = []
    request_throughput = []
    model_name = []

    # Process each JSON file
    for json_file in json_files:
        try:
            # Extract the directory containing the JSON file
            # Get the first directory name after artifacts_dir
            rel_path = os.path.relpath(json_file, artifacts_dir)
            first_dir = rel_path.split(os.sep)[0]

            # Skip this file if it's not from the current model we're analyzing
            if current_model and first_dir in model_name_map:
                if model_name_map[first_dir] != current_model:
                    continue

            print(f"Processing {json_file}")

            with open(json_file, "r") as f:
                results = json.load(f)

            reqThroughput = results.get("request_throughput",
                                        {}).get("avg", 0)
            tokThroughput = results.get("output_token_throughput",
                                        {}).get("avg", 0)
            conCurrency = results.get("input_config", {}).get(
                "perf_analyzer", {}).get("stimulus",
                                         {}).get("concurrency", 0)

            # Try to determine the model name from the directory structure first
            if first_dir in model_name_map:
                modelName = model_name_map[first_dir]
            else:
                # Fall back to the model name from the JSON if we can't extract it from the directory
                modelName = results.get("input_config",
                                        {}).get("model", ["unknown"])
                modelName = modelName[0] if isinstance(modelName,
                                                       list) else modelName

            # Check that values are valid before appending
            if reqThroughput and tokThroughput and conCurrency and modelName:
                request_throughput.append(reqThroughput)
                output_token_throughput.append(tokThroughput)
                concurrency.append(conCurrency)
                model_name.append(modelName)
            else:
                raise ValueError(
                    f"Please check {json_file} due to missing or invalid metrics"
                )

        except Exception as e:
            print(f"Error processing {json_file}: {e}")

    # Check if we have any valid data
    if not model_name:
        if current_model:
            raise RuntimeError(
                f"No valid data extracted for model: {current_model}")
        else:
            raise RuntimeError("No valid data extracted from the JSON files")

    # Create a DataFrame to store the metrics
    metrics_df = pd.DataFrame({
        "Model": model_name,
        "Concurrency": concurrency,
        "RequestThroughput": request_throughput,
        "OutputTokenThroughput": output_token_throughput
    })

    # Sort by Model, Concurrency, and RequestThroughput
    metrics_df = metrics_df.sort_values(
        by=["Model", "Concurrency", "RequestThroughput"])

    print("\n========== Stress Test Metrics Summary ==========")
    print(metrics_df.to_string(index=False))

    # Define the high performance threshold
    throughput_threshold = 0.5  # value range [0, 1]; 0.95 may be too high, 0.5 is suggested
    concurrency_no_gain_count_threshold = 5  # adjust this value per model and throughput threshold
    high_perf_results = {}

    # Calculate normalized throughput for each model
    normalized_df = metrics_df.copy()

    for model_name in normalized_df["Model"].unique():
        # Get min and max values for the model
        model_data = normalized_df[normalized_df["Model"] == model_name]
        min_val = model_data["OutputTokenThroughput"].min()
        max_val = model_data["OutputTokenThroughput"].max()

        range_val = max_val - min_val
        if range_val == 0:
            raise ValueError(
                "Please check OutputTokenThroughput from genai-perf")
        else:
            normalized_df.loc[
                normalized_df["Model"] == model_name,
                "NormalizedThroughput"] = (
                    (normalized_df.loc[normalized_df["Model"] == model_name,
                                       "OutputTokenThroughput"] - min_val) /
                    range_val)

        # Find rows where normalized throughput exceeds the threshold
        high_perf_rows = normalized_df[(normalized_df["Model"] == model_name)
                                       & (normalized_df["NormalizedThroughput"]
                                          > throughput_threshold)].sort_values(
                                              by="Concurrency")

        high_perf_indices = high_perf_rows.index.tolist()

        # Rule setup to get the highest throughput point
        if len(high_perf_rows) >= concurrency_no_gain_count_threshold:
            optimized_idx = high_perf_indices[
                -concurrency_no_gain_count_threshold]
            optimized_row = normalized_df.loc[optimized_idx]

            high_perf_results[model_name] = {
                "concurrency":
                int(optimized_row["Concurrency"]),
                "normalized_throughput":
                float(optimized_row["NormalizedThroughput"]),
                "throughput":
                float(optimized_row["OutputTokenThroughput"]),
                "request_rate":
                float(optimized_row["RequestThroughput"])
            }
        elif len(high_perf_rows) > 0:
            optimized_idx = high_perf_indices[0]
            optimized_row = normalized_df.loc[optimized_idx]

            high_perf_results[model_name] = {
                "concurrency":
                int(optimized_row["Concurrency"]),
                "normalized_throughput":
                float(optimized_row["NormalizedThroughput"]),
                "throughput":
                float(optimized_row["OutputTokenThroughput"]),
                "request_rate":
                float(optimized_row["RequestThroughput"]),
                "note":
                f"Only {len(high_perf_indices)} values above threshold {throughput_threshold}, using the first one"
            }
        else:
            raise ValueError(
                f"No high performance point found for {model_name}")

    # Print the normalized results
    print(
        "\n========== Normalized Token Throughput by Model and Concurrency =========="
    )
    print(normalized_df[[
        "Model", "Concurrency", "OutputTokenThroughput", "NormalizedThroughput"
    ]].to_string(index=False))
    # Print the high performance results
    print(
        f"\n========== High Performance Results (Threshold = {throughput_threshold}) =========="
    )
    for model, results in high_perf_results.items():
        print(f"\nModel: {model}")
        for key, value in results.items():
            print(f"{key}: {value}")

    # Return the high performance concurrency values to potentially use for stress testing
    return high_perf_results