[None][feat] Add performance alignment to layer-wise benchmarks (#11018)

Signed-off-by: Tailing Yuan <yuantailing@gmail.com>
This commit is contained in:
Tailing Yuan 2026-01-29 14:01:51 +08:00 committed by GitHub
parent 34a730aaf7
commit 91528365a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 1908 additions and 153 deletions

View File

@ -171,6 +171,121 @@ You will receive three reports, each containing kernel timing statistics grouped
2. A CSV report at `profiles/report_np4_rank0.csv`
3. An HTML report at `profiles/report_np4_rank0.html`
## Performance alignment between end-to-end performance and layer-wise benchmarks
A complete example can be found in `sample_performance_alignment.sh`. Here is an outline of the main steps.
1. Run end-to-end serving in **COLLECT** mode, and capture nsys profiles. This step generates a calibration file.
Please meet the following requirements.
1. Add the following fields to `config.yaml`.
```yaml
layer_wise_benchmarks_config:
calibration_mode: COLLECT
calibration_file_path: profiles/calibration_data.json
```
2. Set `TLLM_PROFILE_START_STOP` to a range that captures a number of iterations (typically tens of iterations) of the GEN phase. Ensure every iteration has the same batch size. Capture 5 extra iterations at the beginning, because the first 5 captured iterations are regarded as warm-ups and are dropped by the parser by default.
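For example (illustrative values taken from `sample_performance_alignment.sh`; adjust the range to your run):
```bash
# Capture iterations 42-67; the first 5 captured iterations are treated as warm-ups by the parser
export TLLM_PROFILE_START_STOP=42-67
```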
3. Capture per-rank nsys profiles; every rank should produce a separate file.
Place `nsys profile` after `mpirun` or `srun` on the command line. To minimize profiling overhead and file size, there is no need to capture samples or GPU metrics.
If you use `trtllm-serve` or `trtllm-bench`, order the commands as shown below. If you use `examples/disaggregated/slurm/benchmark/submit.py`, setting `gen_profile_range` is enough.
```bash
NP=$NP ./mpi_launch.sh middleware/mpi_env_from_ompi \
nsys profile \
-t cuda,nvtx \
--cpuctxsw none --cuda-event-trace false \
--cuda-graph-trace node \
-c cudaProfilerApi --capture-range-end stop \
-o profiles/report_e2e_collect_rank%q{RANK}.nsys-rep \
--force-overwrite true \
trtllm-llmapi-launch \
trtllm-bench \
--model ...
```
4. For more precise results, set the same `TLLM_AUTOTUNER_CACHE_PATH` for all steps. The autotuner cache file is generated by Step 1 and reused by Steps 2 and 3.
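For example (a hypothetical path; any location shared by all steps works):
```bash
export TLLM_AUTOTUNER_CACHE_PATH=profiles/autotuner_cache.json
```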
2. If the end-to-end serving uses CUDA Graphs, run Step 1 again in **MARK** mode with CUDA Graphs disabled, and capture nsys profiles in the same way.
The differences from Step 1 are as follows.
1. Add the following fields to `config.yaml`.
```yaml
cuda_graph_config: null
layer_wise_benchmarks_config:
calibration_mode: MARK
```
2. Change the profile output paths. The recommended argument is `-o profiles/report_e2e_mark_rank%q{RANK}.nsys-rep`.
3. Run layer-wise benchmarks with the calibration file obtained by Step 1.
```bash
NP=4 ./mpi_launch.sh ./run.sh config_gen.yaml \
--model "$LLM_MODELS_ROOT/DeepSeek-R1/DeepSeek-R1-0528-FP4-v2" \
--load-format AUTO \
--layer-indices 5,6,7 \
--batch-size 32 \
--seq-len-q 1 \
--seq-len-kv-cache 2090 \
--balance-method NotModified \
--replay-file-path profiles/calibration_data.json \
--replay-start 47 \
--replay-stop 67
```
Here is an explanation of each argument:
| Argument/Parameter | Explanation |
|-------------------|-------------|
| `NP=4` | Should match the end-to-end run. |
| `--load-format AUTO` | Instructs the benchmark to load model weights instead of initializing random weights. |
| `--layer-indices 5,6,7` | A list of contiguous layers you want to calibrate. |
| `--batch-size 32` | Should match the end-to-end run. |
| `--seq-len-q 1` | Should match (1+MTP) of the end-to-end run. |
| `--seq-len-kv-cache 2090` | An estimate of the average context length over the captured iterations (see the sketch after this table). Exclude the first 5 iterations from the estimate, because they are dropped by the parser. |
| `--replay-file-path` | The calibration file obtained by Step 1. |
| `--replay-start` and `--replay-stop` | Should match the end-to-end `TLLM_PROFILE_START_STOP`. Do not replay the first 5 iterations, because they are dropped by the parser. |
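As a rough sketch of how the `--seq-len-kv-cache` value can be estimated (the input length and the average number of generated tokens below are assumptions; substitute the numbers from your own run):
```bash
# seq_len_kv_cache ~= input_len + 1 + average tokens generated per request
# over the captured (non-warm-up) iterations
echo $((2048 + 1 + 41))   # 2090, the value used in the command above
```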
4. Parse the end-to-end profiles with `parse_e2e.py`, and parse the layer-wise benchmark profiles with `parse.py`.
```bash
seq 0 $((NP - 1)) | xargs -I% python3 parse_e2e.py \
--eager-trace profiles/report_e2e_mark_rank%.nsys-rep \
--graph-trace profiles/report_e2e_collect_rank%.nsys-rep \
--layer-indices 5,6,7 \
--warmup-times 5 \
-o profiles/report_e2e_collect_rank%.json
seq 0 $((NP - 1)) | xargs -I% python3 parse.py \
--world-size $NP \
--rank %
```
5. Run `correlation.py` to generate the correlation report.
```bash
python3 correlation.py \
--reference profiles/report_e2e_collect_rank0.json \
$(seq 1 $((NP - 1)) | xargs -I% echo "--target profiles/report_e2e_collect_rank%.json") \
$(seq 0 $((NP - 1)) | xargs -I% echo "--target profiles/report_np${NP}_rank%.json") \
-o profiles/correlation.html
```
Open `profiles/correlation.html` to view the report.
Limitations:
1. Pipeline parallelism is not supported.
2. Only the CUTLASS and WIDEEP MoE backends are supported.
3. Only tested with the GEN phase and attention DP.
## Developer utilities
1. Less startup time when debugging a model

View File

@ -0,0 +1,96 @@
import argparse
import json
from pathlib import Path
import jinja2
from parser_utils import kernel_short_name, shortest_common_supersequence
# Parse cmdline
parser = argparse.ArgumentParser()
parser.add_argument("--reference", type=str, required=True)
parser.add_argument("--target", action="append", type=str, required=True)
parser.add_argument("--output", "-o", type=str, required=True)
args = parser.parse_args()
print(args)
with open(args.reference) as f:
ref_data_all = json.load(f)
if len(ref_data_all) != 1:
raise ValueError("Ambiguous reference data")
ref_data = ref_data_all[0]
ref_kernel_names = [o["name"] for o in ref_data["timeline"]]
data = []
data.append(
{
"series": f"reference: {ref_data['name']}",
"points": [
{
"x": i + 1,
"name": kernel_short_name(o["name"]),
"duration": o["duration"] / 1000,
"end": o["end"] / 1000,
}
for i, o in enumerate(ref_data["timeline"])
],
}
)
for target_id, target in enumerate(args.target):
with open(target) as f:
tgt_data_all = json.load(f)
for timeline_id, tgt_data in enumerate(tgt_data_all):
tgt_kernel_names = [o["name"] for o in tgt_data["timeline"]]
sup_kernel_names = shortest_common_supersequence(ref_kernel_names, tgt_kernel_names)
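# Map each supersequence element to its x position on the reference timeline
# (None for elements that appear only in the target).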
x_sup = []
j = 0
for sup_kernel_name in sup_kernel_names:
if j < len(ref_kernel_names) and sup_kernel_name == ref_kernel_names[j]:
x_sup.append(j + 1)
j += 1
else:
x_sup.append(None)
print(f"target {target_id} timeline {timeline_id} {x_sup=}")
x_tgt = []
j = 0
for tgt_kernel_name in tgt_kernel_names:
while sup_kernel_names[j] != tgt_kernel_name:
j += 1
x_tgt.append(x_sup[j])
j += 1
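# Fill in x positions for target-only kernels: extrapolate at the two ends,
# then interpolate linearly between known neighbors.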
if x_tgt[0] is None:
x_tgt[0] = min(1, min(x for x in x_tgt if x is not None) - 1)
if x_tgt[-1] is None:
x_tgt[-1] = max(len(ref_kernel_names), max(x for x in x_tgt if x is not None) + 1)
top = 0
while top < len(x_tgt) - 1:
next_top = top + 1
while x_tgt[next_top] is None:
next_top += 1
for i in range(top + 1, next_top):
x_tgt[i] = x_tgt[top] + (x_tgt[next_top] - x_tgt[top]) * (i - top) / (
next_top - top
)
top = next_top
print(f"target {target_id} timeline {timeline_id} {x_tgt=}")
data.append(
{
"series": f"target{target_id}: {tgt_data['name']}",
"points": [
{
"x": x,
"name": kernel_short_name(o["name"]),
"duration": o["duration"] / 1000,
"end": o["end"] / 1000,
}
for x, o in zip(x_tgt, tgt_data["timeline"])
],
}
)
loader = jinja2.FileSystemLoader(Path(__file__).parent)
template = jinja2.Environment(loader=loader).get_template("correlation_template.html")
with open(args.output, "w") as f:
f.write(template.render(rawData=data))

View File

@ -0,0 +1,152 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Correlation</title>
<script
src="https://cdn.jsdelivr.net/npm/echarts@6.0.0/dist/echarts.min.js"
integrity="sha384-F07Cpw5v8spSU0H113F33m2NQQ/o6GqPTnTjf45ssG4Q6q58ZwhxBiQtIaqvnSpR"
crossorigin="anonymous">
</script>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 20px;
background-color: #f5f5f5;
display: flex;
flex-direction: column;
align-items: center;
}
.chart-container {
width: 95%;
max-width: 1600px;
min-width: 800px;
height: 800px;
background-color: white;
box-shadow: 0 4px 8px rgba(0,0,0,0.1);
border-radius: 8px;
margin-bottom: 30px;
padding: 20px;
box-sizing: border-box;
}
h1 {
text-align: center;
color: #333;
}
</style>
</head>
<body>
<h1>CUDA Kernel Correlation</h1>
<div id="durationChart" class="chart-container"></div>
<div id="endChart" class="chart-container"></div>
<script type="text/javascript">
const rawData = {{ rawData }};
const referenceData = rawData.find(s => s.series.startsWith('reference:'));
const xLabelMap = {};
let xMax = -Infinity;
referenceData.points.forEach(p => {
xLabelMap[p.x] = p.name;
if (p.x > xMax) xMax = p.x;
});
const xAxisMin = 0;
const xAxisMax = Math.ceil(xMax) + 1;
function getOption(title, valueKey) {
const seriesList = rawData.map(item => {
return {
name: item.series,
type: 'line',
symbolSize: 5,
data: item.points.map(p => {
return [p.x, p[valueKey], p.name];
})
};
});
return {
title: {
text: title,
left: 'center',
top: 5
},
tooltip: {
trigger: 'item',
formatter: function (params) {
return `<b>${params.seriesName}</b><br/>` +
`Kernel: ${params.data[2]}<br/>` +
`X Index: ${params.data[0].toFixed(2)}<br/>` +
`${valueKey}: ${params.data[1].toFixed(1)}`;
}
},
legend: {
data: rawData.map(d => d.series),
top: 40,
left: 'center'
},
grid: {
left: '3%',
right: '4%',
top: 100,
bottom: 60,
containLabel: true
},
xAxis: {
type: 'value',
min: xAxisMin,
max: xAxisMax,
interval: 1,
axisLabel: {
formatter: function (value) {
const label = xLabelMap[value] || '';
return label.length > 30 ? label.substring(0, 30) + '...' : label;
},
rotate: 60,
fontSize: 11,
hideOverlap: false
},
splitLine: {
lineStyle: {
type: 'dashed'
}
}
},
yAxis: {
type: 'value',
name: valueKey,
scale: true
},
dataZoom: [
{
type: 'slider',
show: true,
start: 0,
end: 100,
bottom: 10,
height: 25
},
{
type: 'inside',
start: 0,
end: 100
}
],
series: seriesList
};
}
const durationChart = echarts.init(document.getElementById('durationChart'));
durationChart.setOption(getOption('Kernel Duration', 'duration'));
const endChart = echarts.init(document.getElementById('endChart'));
endChart.setOption(getOption('Kernel End Time', 'end'));
window.addEventListener('resize', function() {
durationChart.resize();
endChart.resize();
});
</script>
</body>
</html>

View File

@ -0,0 +1,10 @@
#!/bin/bash
set -euo pipefail
export WORLD_SIZE=$OMPI_COMM_WORLD_SIZE
export RANK=$OMPI_COMM_WORLD_RANK
export LOCAL_RANK=$OMPI_COMM_WORLD_LOCAL_RANK
export NODE_RANK=$OMPI_COMM_WORLD_NODE_RANK
"$@"

View File

@ -4,13 +4,17 @@ import csv
import json
import re
import sqlite3
import subprocess
import sys
from pathlib import Path
import jinja2
import numpy as np
import pandas as pd
from parser_utils import (
kernel_short_name,
lazy_convert_sqlite,
shortest_common_supersequence,
warned_names,
)
# Parse cmdline
parser = argparse.ArgumentParser()
@ -32,66 +36,6 @@ if (args.file_path is None) == (args.world_size is None):
parser.error("Please specify exactly one of --file-path and --world-size.")
print(args)
def lazy_convert_sqlite(nsys_rep_file_path, sqlite_file_path):
if (
not sqlite_file_path.is_file()
or nsys_rep_file_path.stat().st_mtime > sqlite_file_path.stat().st_mtime
):
subprocess.check_call(
[
"nsys",
"export",
"--type",
"sqlite",
"-o",
sqlite_file_path,
"--force-overwrite=true",
nsys_rep_file_path,
]
)
def shortest_common_supersequence(a, b):
# Merge two lists into their shortest common supersequence,
# so that both `a` and `b` are subsequences of the result.
# Uses dynamic programming to compute the shortest common supersequence, then reconstructs it.
m, n = len(a), len(b)
dp = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(m + 1):
dp[i][0] = i
for j in range(n + 1):
dp[0][j] = j
for i in range(1, m + 1):
for j in range(1, n + 1):
if a[i - 1] == b[j - 1]:
dp[i][j] = dp[i - 1][j - 1] + 1
else:
dp[i][j] = min(dp[i - 1][j] + 1, dp[i][j - 1] + 1)
# Backtrack to build the merged sequence
res = []
i, j = m, n
while i > 0 and j > 0:
if a[i - 1] == b[j - 1]:
res.append(a[i - 1])
i -= 1
j -= 1
elif dp[i - 1][j] < dp[i][j - 1]:
res.append(a[i - 1])
i -= 1
else:
res.append(b[j - 1])
j -= 1
while i > 0:
res.append(a[i - 1])
i -= 1
while j > 0:
res.append(b[j - 1])
j -= 1
res.reverse()
return res
if args.file_path is not None:
nsys_rep_file_path = Path(args.file_path)
if not nsys_rep_file_path.name.endswith(".nsys-rep"):
@ -106,6 +50,9 @@ csv_file_path = nsys_rep_file_path.parent / (nsys_rep_file_path.name[: -len(".ns
html_file_path = nsys_rep_file_path.parent / (
nsys_rep_file_path.name[: -len(".nsys-rep")] + ".html"
)
json_file_path = nsys_rep_file_path.parent / (
nsys_rep_file_path.name[: -len(".nsys-rep")] + ".json"
)
lazy_convert_sqlite(nsys_rep_file_path, sqlite_file_path)
conn = sqlite3.connect(f"file:{sqlite_file_path}?mode=ro", uri=True)
@ -133,7 +80,7 @@ for start, text in df.itertuples(index=False):
problem_start.append(start)
problem_set.append(
{
"spec": json.loads(text[len("layer_wise_benchmarks problem_spec") :]),
"spec": json.loads(text[len("layer_wise_benchmarks problem_spec ") :]),
"text": "",
"runs": [],
"runs_end": [],
@ -145,15 +92,19 @@ for start, text in df.itertuples(index=False):
query = """SELECT T1.start, T1.end, T2.value AS text
FROM NVTX_EVENTS AS T1
JOIN StringIds AS T2 ON T1.textId = T2.id
WHERE eventType = ? AND T2.value NOT LIKE ? AND T2.value NOT LIKE ? AND domainId != ?"""
WHERE eventType = ? AND T2.value NOT LIKE ? AND domainId != ?"""
df = pd.read_sql_query(
query,
conn,
params=(event_id_NvtxPushPopRange, "layer_wise_benchmarks %", "[DG]%", nccl_domain_id),
params=(event_id_NvtxPushPopRange, "[DG]%", nccl_domain_id),
)
for start, end, text in df.itertuples(index=False):
problem_id = bisect.bisect(problem_start, start) - 1
assert problem_id != -1
if text.startswith("layer_wise_benchmarks "):
if text != "layer_wise_benchmarks ignore":
continue
else:
assert problem_id != -1
if re.match(r"b=\d+ s=\d+ ", text):
problem_set[problem_id]["text"] = text
problem_set[problem_id]["runs"].append(start)
@ -216,7 +167,9 @@ for (
for range_id in ranges:
problem["kernel_count_per_range"][range_id] += 1
range_names = [problem["ranges"][i][2] for i in ranges]
if args.module is None or args.module in range_names:
if (
args.module is None or args.module in range_names
) and "layer_wise_benchmarks ignore" not in range_names:
kernel_list.append(
(
problem_id,
@ -235,6 +188,7 @@ for (
query = "SELECT * FROM StringIds"
df = pd.read_sql_query(query, conn)
string_ids = dict(zip(df["id"], df["value"]))
string_ids.update({-2: "Memcpy", -3: "Memset"})
conn.close()
@ -275,75 +229,13 @@ for problem_id in range(len(kernels)):
seq = [demangledName for demangledName, _, _, _ in kernels[problem_id][run_id]]
assert seq == required_seq
parser_keywords = [
("cuBLASGemm", "nvjet"),
("cutlassGroupGemm", "cutlass::device_kernel<cutlass::gemm::kernel::GemmUniversal"),
("cutlassGemm", "GemmUniversal"),
("CuteDSLMoePermute", "cute_dsl::moePermuteKernel"),
(
"CuteDSLGemm",
["cute_dsl_kernels", "blockscaled_gemm_persistent"],
),
(
"CuteDSLGroupedGemmSwiglu",
["cute_dsl_kernels", "blockscaled_contiguous_grouped_gemm_swiglu_fusion"],
),
(
"CuteDSLGroupedGemmFinalize",
["cute_dsl_kernels", "blockscaled_contiguous_grouped_gemm_finalize_fusion"],
),
("torchAdd", "at::native::CUDAFunctorOnSelf_add"),
("torchAdd", "CUDAFunctor_add"),
("torchClamp", "at::native::<unnamed>::launch_clamp_scalar("),
("torchCompare", "at::native::<unnamed>::CompareFunctor<"),
("torchCopy", "at::native::bfloat16_copy_kernel_cuda"),
("torchCopy", "at::native::direct_copy_kernel_cuda("),
("torchFill", "at::native::FillFunctor"),
("torchIndexPut", "at::native::index_put_kernel_impl<"),
("torchMul", "at::native::binary_internal::MulFunctor<"),
("torchPow", "at::native::<unnamed>::pow_tensor_scalar_kernel_impl<"),
("torchReduceSum", ["at::native::reduce_kernel<", "at::native::sum_functor<"]),
("torchSigmoid", "at::native::sigmoid_kernel_cuda"),
("torchWhere", "at::native::<unnamed>::where_kernel_impl("),
]
warned_names = set()
def parse_kernel_name(demangledName):
if demangledName == -2:
return "Memcpy"
if demangledName == -3:
return "Memset"
name = string_ids[demangledName]
for dst, src in parser_keywords:
if not isinstance(src, (tuple, list)):
src = [src]
if all(keyword in name for keyword in src):
return dst
if re.search(r"at::native::.*elementwise_kernel<", name):
if name not in warned_names:
print(f"Not parsed torch kernel name: {name}", file=sys.stderr)
warned_names.add(name)
assert "!unnamed!" not in name
name = name.replace("<unnamed>", "!unnamed!")
if "<" in name:
name = name[: name.index("<")]
if "(" in name:
name = name[: name.index("(")]
if "::" in name:
name = name[name.rindex("::") + 2 :]
name = name.replace("!unnamed!", "<unnamed>")
return name
converted_seqs = []
warmup_times = run_args["warmup_times"] if args.warmup_times is None else args.warmup_times
for runs in kernels:
warmup_times = run_args["warmup_times"] if args.warmup_times is None else args.warmup_times
converted_seq = []
# Kernel time
for i, (demangledName, _, _, ranges) in enumerate(runs[0]):
name = parse_kernel_name(demangledName)
name = kernel_short_name(string_ids[demangledName])
category = (*ranges, name)
time_list = [run[i][2] - run[i][1] for run in runs]
t = np.mean(time_list[warmup_times:]).tolist()
@ -399,6 +291,7 @@ csv_data = [["", *[problem["text"] for problem in problem_set]]]
js_data = []
js_stack = [js_data]
max_title_len = max((len(title) - 1) * 3 + len(title[-1][:40]) for title in merged_title)
print("-" * (max_title_len + 1 + 6 * len(problem_set)))
for title, time_data in zip(merged_title, merged_data):
while stack != list(title[: len(stack)]):
level_title = stack[-1]
@ -458,7 +351,7 @@ for problem in problem_set:
innermost_children = innermost_children[-1]["children"]
innermost_children.append({"name": problem["text"]})
loader = jinja2.FileSystemLoader(Path(__file__).parent)
template = jinja2.Environment(loader=loader).get_template("template.html")
template = jinja2.Environment(loader=loader).get_template("breakdown_template.html")
with html_file_path.open("w") as f:
configText = (
"Run:\n"
@ -481,3 +374,26 @@ if args.query is not None:
query + " " * (max_title_len - len(query)),
*[f"{x / 1000:-6.1f}" for x in query_matched],
)
correlation = []
for problem, runs in zip(problem_set, kernels):
timeline = []
for i, (demangledName, _, _, _) in enumerate(runs[0]):
name = string_ids[demangledName]
duration_list = [run[i][2] - run[i][1] for run in runs]
end_list = [run[i][2] - run[0][1] for run in runs]
timeline.append(
{
"name": name,
"duration": np.mean(duration_list[warmup_times:]).tolist(),
"end": np.mean(end_list[warmup_times:]).tolist(),
}
)
correlation.append(
{
"name": problem["text"],
"timeline": timeline,
}
)
with json_file_path.open("w") as f:
json.dump(correlation, f)

View File

@ -0,0 +1,247 @@
import argparse
import bisect
import json
import re
import sqlite3
from pathlib import Path
import numpy as np
import pandas as pd
from parser_utils import (
kernel_short_name,
lazy_convert_sqlite,
shortest_common_supersequence,
warned_names,
)
def comma_separated_ints(s):
return [int(x) for x in s.split(",")]
parser = argparse.ArgumentParser()
parser.add_argument("--eager-trace", type=str, required=True)
parser.add_argument("--graph-trace", type=str)
parser.add_argument("--target-ctx-reqs", type=int, default=0)
parser.add_argument("--target-gen-reqs", type=int)
parser.add_argument("--layer-indices", type=comma_separated_ints, required=True)
parser.add_argument("--warmup-times", type=int, default=5)
group = parser.add_mutually_exclusive_group()
group.add_argument("--error-on-unknown-kernel", action="store_true", dest="error_on_unknown_kernel")
group.add_argument(
"--no-error-on-unknown-kernel", action="store_false", dest="error_on_unknown_kernel"
)
parser.set_defaults(error_on_unknown_kernel=False)
parser.add_argument("--output", "-o", type=str)
args = parser.parse_args()
if not args.eager_trace.endswith(".nsys-rep"):
parser.error("Please provide a .nsys-rep file for the --eager-trace option.")
if args.graph_trace is not None and not args.graph_trace.endswith(".nsys-rep"):
parser.error("Please provide a .nsys-rep file for the --graph-trace option.")
print(args)
def is_gemm(name):
return "nvjet" in name or "gemm" in name.lower()
eager_nsys_rep_file_path = Path(args.eager_trace)
# For the CTX phase, which does not use CUDA Graphs, analyze the eager trace instead.
# For convenience, the "graph_*" identifier names are kept unchanged.
graph_nsys_rep_file_path = Path(args.graph_trace or args.eager_trace)
eager_sqlite_file_path = eager_nsys_rep_file_path.parent / (
eager_nsys_rep_file_path.name[: -len(".nsys-rep")] + ".sqlite"
)
graph_sqlite_file_path = graph_nsys_rep_file_path.parent / (
graph_nsys_rep_file_path.name[: -len(".nsys-rep")] + ".sqlite"
)
lazy_convert_sqlite(eager_nsys_rep_file_path, eager_sqlite_file_path)
lazy_convert_sqlite(graph_nsys_rep_file_path, graph_sqlite_file_path)
eager_conn = sqlite3.connect(f"file:{eager_sqlite_file_path}?mode=ro", uri=True)
graph_conn = sqlite3.connect(f"file:{graph_sqlite_file_path}?mode=ro", uri=True)
query = "SELECT * FROM ENUM_NSYS_EVENT_TYPE"
df = pd.read_sql_query(query, eager_conn)
eager_event_id_NvtxPushPopRange = df[df["name"] == "NvtxPushPopRange"].iloc[0]["id"].tolist()
df = pd.read_sql_query(query, graph_conn)
graph_event_id_NvtxPushPopRange = df[df["name"] == "NvtxPushPopRange"].iloc[0]["id"].tolist()
query = """SELECT T1.start, T1.end, T2.value AS text
FROM NVTX_EVENTS AS T1
JOIN StringIds AS T2 ON T1.textId = T2.id
WHERE eventType = ?"""
df = pd.read_sql_query(query, eager_conn, params=(eager_event_id_NvtxPushPopRange,))
target_ctx_reqs = args.target_ctx_reqs
target_gen_reqs = args.target_gen_reqs
if target_gen_reqs is None:
if target_ctx_reqs == 0:
for _, _, text in df.itertuples(index=False):
if m := re.match(
r"^\[Executor\] _forward_step (\d+): (\d+) ctx reqs, (\d+) gen reqs", text
):
ctx_reqs = int(m.group(2))
gen_reqs = int(m.group(3))
if ctx_reqs == target_ctx_reqs:
target_gen_reqs = gen_reqs
break
else:
raise ValueError("Cannot determine target_gen_reqs")
else:
target_gen_reqs = 0
print(f"{target_ctx_reqs=} {target_gen_reqs=}")
eager_iters = []
for start, end, text in df.itertuples(index=False):
if m := re.match(r"^\[Executor\] _forward_step (\d+): (\d+) ctx reqs, (\d+) gen reqs", text):
it = int(m.group(1))
ctx_reqs = int(m.group(2))
gen_reqs = int(m.group(3))
if ctx_reqs == target_ctx_reqs and gen_reqs == target_gen_reqs:
eager_iters.append((start, end, it))
eager_iters = sorted(eager_iters)[args.warmup_times :]
iter_list = [t[2] for t in eager_iters]
print("Iters (eager)", *iter_list)
per_iter_eager_layers = [[] for _ in iter_list]
for start, end, text in df.itertuples(index=False):
if m := re.match(r"^layer_wise_benchmarks layer_idx (\d+)$", text):
layer_idx = int(m.group(1))
it_idx = bisect.bisect(eager_iters, (start,)) - 1
if it_idx < 0 or end > eager_iters[it_idx][1]:
continue
assert end <= eager_iters[it_idx][1], "Not belong to any iter"
per_iter_eager_layers[it_idx].append((start, end, it_idx, layer_idx))
layer_list = [t[3] for t in per_iter_eager_layers[0]]
print("Layers (eager)", *layer_list)
for eager_layers in per_iter_eager_layers:
assert [t[3] for t in eager_layers] == layer_list, "inconsistent layer idx"
df = pd.read_sql_query(query, graph_conn, params=(graph_event_id_NvtxPushPopRange,))
graph_iters = []
for start, end, text in df.itertuples(index=False):
if m := re.match(r"^\[Executor\] _forward_step (\d+): (\d+) ctx reqs, (\d+) gen reqs", text):
it = int(m.group(1))
ctx_reqs = int(m.group(2))
gen_reqs = int(m.group(3))
if ctx_reqs == target_ctx_reqs and gen_reqs == target_gen_reqs:
graph_iters.append((start, end, it))
graph_iters = sorted(graph_iters)[args.warmup_times :]
graph_iter_list = [t[2] for t in graph_iters]
print("Iters (graph)", *graph_iter_list)
if iter_list != graph_iter_list:
raise ValueError("The ID of iterations do not match")
def query_kernels(conn, iters):
query = """SELECT name FROM sqlite_master WHERE type = ?"""
df = pd.read_sql_query(query, conn, params=("table",))
tables = df["name"].tolist()
unified_subquery = """SELECT T1.start, T1.end, T1.demangledName, T1.correlationId, T1.graphNodeId
FROM CUPTI_ACTIVITY_KIND_KERNEL AS T1"""
if "CUPTI_ACTIVITY_KIND_MEMCPY" in tables:
unified_subquery += """ UNION ALL
SELECT T2.start, T2.end, -2 AS demangledName, T2.correlationId, T2.graphNodeId
FROM CUPTI_ACTIVITY_KIND_MEMCPY AS T2"""
if "CUPTI_ACTIVITY_KIND_MEMSET" in tables:
unified_subquery += """ UNION ALL
SELECT T3.start, T3.end, -3 AS demangledName, T3.correlationId, T3.graphNodeId
FROM CUPTI_ACTIVITY_KIND_MEMSET AS T3"""
query = f"""SELECT unified.start, unified.end, unified.graphNodeId, unified.demangledName,
R.start AS runtime_start, R.end AS runtime_end
FROM ({unified_subquery}) AS unified
JOIN CUPTI_ACTIVITY_KIND_RUNTIME AS R ON unified.correlationId = R.correlationId"""
df = pd.read_sql_query(query, conn)
per_iter_kernels = [[] for _ in iters]
for start, end, graphNodeId, demangledName, runtime_start, runtime_end in df.itertuples(
index=False
):
it_idx = bisect.bisect(iters, (runtime_start,)) - 1
if it_idx < 0 or runtime_end > iters[it_idx][1]:
continue
per_iter_kernels[it_idx].append((runtime_start, graphNodeId, start, end, demangledName))
for kernels in per_iter_kernels:
kernels.sort()
return per_iter_kernels
eager_per_iter_kernels = query_kernels(eager_conn, eager_iters)
graph_per_iter_kernels = query_kernels(graph_conn, graph_iters)
print("#Kernels (eager)", *[len(kernels) for kernels in eager_per_iter_kernels])
print("#Kernels (graph)", *[len(kernels) for kernels in graph_per_iter_kernels])
for eager_kernels, graph_kernels in zip(eager_per_iter_kernels, graph_per_iter_kernels):
assert all(a[4] == eager_per_iter_kernels[0][i][4] for i, a in enumerate(eager_kernels)), (
"eager kernels change across iterations"
)
assert all(a[4] == graph_per_iter_kernels[0][i][4] for i, a in enumerate(graph_kernels)), (
"graph kernels change across iterations"
)
query = "SELECT * FROM StringIds"
df = pd.read_sql_query(query, eager_conn)
eager_string_ids = dict(zip(df["id"], df["value"]))
eager_string_ids.update({-2: "Memcpy", -3: "Memset"})
df = pd.read_sql_query(query, graph_conn)
graph_string_ids = dict(zip(df["id"], df["value"]))
graph_string_ids.update({-2: "Memcpy", -3: "Memset"})
eager_conn.close()
graph_conn.close()
eager_kernel_names = [eager_string_ids[kernel[4]] for kernel in eager_per_iter_kernels[0]]
graph_kernel_names = [graph_string_ids[kernel[4]] for kernel in graph_per_iter_kernels[0]]
super_kernel_names = shortest_common_supersequence(eager_kernel_names, graph_kernel_names)
print(f"#Kernels (supersequence) {len(super_kernel_names)}")
eager_per_layer_kernels = [[] for _ in layer_list]
for i, eager_kernel in enumerate(eager_per_iter_kernels[0]):
eager_layers_idx = bisect.bisect(per_iter_eager_layers[0], (eager_kernel[0],)) - 1
if eager_layers_idx < 0 or eager_kernel[0] > per_iter_eager_layers[0][eager_layers_idx][1]:
continue
eager_per_layer_kernels[eager_layers_idx].append(i)
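# Map each eager-trace kernel to its index in the kernel-name supersequence.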
eager2super = []
j = 0
for i, eager_kernel_name in enumerate(eager_kernel_names):
while eager_kernel_name != super_kernel_names[j]:
j += 1
eager2super.append(j)
j += 1
super_per_layer_starts = [eager2super[a[0]] for a in eager_per_layer_kernels]
super_per_layer_ends = [eager2super[a[-1]] for a in eager_per_layer_kernels]
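# Assign each graph-trace kernel to a layer by locating its supersequence index
# within that layer's [start, end] span.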
graph_per_layer_kernels = [[] for _ in layer_list]
j = 0
for i, graph_kernel_name in enumerate(graph_kernel_names):
while graph_kernel_name != super_kernel_names[j]:
j += 1
layer_idx = bisect.bisect(super_per_layer_starts, j) - 1
if layer_idx >= 0 and j <= super_per_layer_ends[layer_idx]:
graph_per_layer_kernels[layer_idx].append(i)
j += 1
timeline = []
first_kernel_idx = min(graph_per_layer_kernels[layer_idx][0] for layer_idx in args.layer_indices)
for layer_idx in args.layer_indices:
for kernel_idx in graph_per_layer_kernels[layer_idx]:
duration_list = []
end_list = []
for it_idx in range(len(graph_per_iter_kernels)):
layer_start_time = graph_per_iter_kernels[it_idx][first_kernel_idx][2]
kernel_start_time = graph_per_iter_kernels[it_idx][kernel_idx][2]
kernel_end_time = graph_per_iter_kernels[it_idx][kernel_idx][3]
duration_list.append(kernel_end_time - kernel_start_time)
end_list.append(kernel_end_time - layer_start_time)
timeline.append(
{
"name": graph_kernel_names[kernel_idx],
"duration": np.mean(duration_list).tolist(),
"end": np.mean(end_list).tolist(),
}
)
print(f"{'Kernel':40s} {'Duration':>8s} {'End':>8s}")
print("-" * (40 + 1 + 8 + 1 + 8))
for o in timeline:
print(
f"{kernel_short_name(o['name'])[:40]:40s} {o['duration'] / 1000.0:-8.1f} {o['end'] / 1000.0:-8.1f}"
)
if args.error_on_unknown_kernel and warned_names:
raise ValueError("Unknown kernel names encountered")
if args.output:
if not args.output.endswith(".json"):
raise ValueError("Output file name must be *.json")
with open(args.output, "w") as f:
json.dump([{"name": "parse_e2e", "timeline": timeline}], f)

View File

@ -0,0 +1,216 @@
import re
import subprocess
import sys
import numpy as np
def lazy_convert_sqlite(nsys_rep_file_path, sqlite_file_path):
if (
not sqlite_file_path.is_file()
or nsys_rep_file_path.stat().st_mtime > sqlite_file_path.stat().st_mtime
):
subprocess.check_call(
[
"nsys",
"export",
"--type",
"sqlite",
"-o",
sqlite_file_path,
"--force-overwrite=true",
nsys_rep_file_path,
]
)
parser_keywords = [
("cuBLASGemm", "nvjet"),
("cutlassGroupGemm", "cutlass::device_kernel<cutlass::gemm::kernel::GemmUniversal"),
("cutlassGemm", "GemmUniversal"),
("CuteDSLMoePermute", "cute_dsl::moePermuteKernel"),
(
"CuteDSLGemm",
["cute_dsl_kernels", "blockscaled_gemm_persistent"],
),
(
"CuteDSLGroupedGemmSwiglu",
["cute_dsl_kernels", "blockscaled_contiguous_grouped_gemm_swiglu_fusion"],
),
(
"CuteDSLGroupedGemmFinalize",
["cute_dsl_kernels", "blockscaled_contiguous_grouped_gemm_finalize_fusion"],
),
("torchAdd", "at::native::CUDAFunctorOnSelf_add"),
("torchAdd", "CUDAFunctor_add"),
("torchClamp", "at::native::<unnamed>::launch_clamp_scalar("),
("torchCompare", "at::native::<unnamed>::CompareFunctor<"),
("torchCopy", "at::native::bfloat16_copy_kernel_cuda"),
("torchCopy", "at::native::direct_copy_kernel_cuda("),
("torchDiv", "at::native::binary_internal::DivFunctor<"),
("torchFill", "at::native::FillFunctor"),
("torchIndexPut", "at::native::index_put_kernel_impl<"),
("torchMul", "at::native::binary_internal::MulFunctor<"),
("torchPow", "at::native::<unnamed>::pow_tensor_scalar_kernel_impl<"),
("torchReduceSum", ["at::native::reduce_kernel<", "at::native::sum_functor<"]),
("torchScatterGather", "void at::native::_scatter_gather_elementwise_kernel<"),
("torchSigmoid", "at::native::sigmoid_kernel_cuda"),
("torchWhere", "at::native::<unnamed>::where_kernel_impl("),
]
warned_names = set()
def kernel_short_name(name):
for dst, src in parser_keywords:
if not isinstance(src, (tuple, list)):
src = [src]
if all(keyword in name for keyword in src):
return dst
if re.search(r"at::native::.*elementwise_kernel<", name):
if name not in warned_names:
print(f"Not parsed torch kernel name: {name}", file=sys.stderr)
warned_names.add(name)
assert "!unnamed!" not in name
name = name.replace("<unnamed>", "!unnamed!")
if "<" in name:
name = name[: name.index("<")]
if "(" in name:
name = name[: name.index("(")]
if "::" in name:
name = name[name.rindex("::") + 2 :]
name = name.replace("!unnamed!", "<unnamed>")
return name
def shortest_common_supersequence(a, b):
# Merge two lists into their shortest common supersequence,
# so that both `a` and `b` are subsequences of the result.
# Uses dynamic programming to compute the shortest common supersequence, then reconstructs it.
m, n = len(a), len(b)
dp = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(m + 1):
dp[i][0] = i
for j in range(n + 1):
dp[0][j] = j
for i in range(1, m + 1):
for j in range(1, n + 1):
if a[i - 1] == b[j - 1]:
dp[i][j] = dp[i - 1][j - 1] + 1
else:
dp[i][j] = min(dp[i - 1][j] + 1, dp[i][j - 1] + 1)
# Backtrack to build the merged sequence
res = []
i, j = m, n
while i > 0 and j > 0:
if a[i - 1] == b[j - 1]:
res.append(a[i - 1])
i -= 1
j -= 1
elif dp[i - 1][j] < dp[i][j - 1]:
res.append(a[i - 1])
i -= 1
else:
res.append(b[j - 1])
j -= 1
while i > 0:
res.append(a[i - 1])
i -= 1
while j > 0:
res.append(b[j - 1])
j -= 1
res.reverse()
return res
try:
import numba
numba_installed = True
except ImportError:
numba_installed = False
if numba_installed:
# The core computation function: compiled to machine code by Numba.
# 'nopython=True' ensures it runs entirely without the Python interpreter for max speed.
@numba.jit(nopython=True)
def _core_scs(a_ids, b_ids):
m = len(a_ids)
n = len(b_ids)
# Use a NumPy array instead of a Python list of lists.
# This creates a continuous memory block, similar to int dp[m+1][n+1] in C.
dp = np.zeros((m + 1, n + 1), dtype=np.int32)
# 1. Initialize boundaries
# Corresponds to: dp[i][0] = i
for i in range(m + 1):
dp[i, 0] = i
# Corresponds to: dp[0][j] = j
for j in range(n + 1):
dp[0, j] = j
# 2. Fill the DP table
for i in range(1, m + 1):
for j in range(1, n + 1):
if a_ids[i - 1] == b_ids[j - 1]:
dp[i, j] = dp[i - 1, j - 1] + 1
else:
val1 = dp[i - 1, j] + 1
val2 = dp[i, j - 1] + 1
if val1 < val2:
dp[i, j] = val1
else:
dp[i, j] = val2
# 3. Backtrack to reconstruct the result
# dp[m, n] holds the total length of the shortest common supersequence.
res_len = dp[m, n]
# Pre-allocate the result array.
# Filling a pre-allocated array is much faster than appending to a list.
res_ids = np.empty(res_len, dtype=np.int32)
k = res_len - 1 # Index for writing into res_ids
i, j = m, n
while i > 0 and j > 0:
if a_ids[i - 1] == b_ids[j - 1]:
res_ids[k] = a_ids[i - 1]
i -= 1
j -= 1
elif dp[i - 1, j] < dp[i, j - 1]:
res_ids[k] = a_ids[i - 1]
i -= 1
else:
res_ids[k] = b_ids[j - 1]
j -= 1
k -= 1
while i > 0:
res_ids[k] = a_ids[i - 1]
i -= 1
k -= 1
while j > 0:
res_ids[k] = b_ids[j - 1]
j -= 1
k -= 1
return res_ids
def shortest_common_supersequence(a, b):
# 1. Build a mapping table (String -> Int)
# Extract unique tokens from both lists
unique_tokens = list(set(a) | set(b))
token_to_id = {token: i for i, token in enumerate(unique_tokens)}
id_to_token = {i: token for i, token in enumerate(unique_tokens)}
# 2. Convert input lists to NumPy integer arrays
a_ids = np.array([token_to_id[x] for x in a], dtype=np.int32)
b_ids = np.array([token_to_id[x] for x in b], dtype=np.int32)
# 3. Call the JIT-compiled core function
# The first time this runs, it will compile (takes ~200ms). Subsequent runs are instant.
res_ids = _core_scs(a_ids, b_ids)
# 4. Convert the result back to strings (Int -> String)
return [id_to_token[idx] for idx in res_ids]

View File

@ -13,7 +13,9 @@ from tensorrt_llm._torch.autotuner import AutoTuner, autotune
from tensorrt_llm._torch.modules.multi_stream_utils import with_multi_stream
from tensorrt_llm._utils import local_mpi_rank, mpi_rank, mpi_world_size
from tensorrt_llm.logger import logger
from tensorrt_llm.tools.layer_wise_benchmarks import BalanceMethod, Runner, mark_ranges
from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator
from tensorrt_llm.tools.layer_wise_benchmarks.mark_utils import mark_ranges
from tensorrt_llm.tools.layer_wise_benchmarks.runner import BalanceMethod, Runner
def comma_separated_ints(s):
@ -79,6 +81,15 @@ parser.add_argument("--seq-len-q", type=comma_separated_ints, dest="seq_len_q_li
parser.add_argument("--seq-len-kv-cache", type=comma_separated_ints, dest="seq_len_kv_cache_list")
parser.add_argument("--balance-method", type=str)
parser.add_argument("--balance-ratio", type=comma_separated_floats, dest="balance_ratio_list")
# Calibration
parser.add_argument("--replay-file-path", type=str)
parser.add_argument("--replay-start-iter", type=int)
parser.add_argument("--replay-stop-iter", type=int)
group = parser.add_mutually_exclusive_group()
group.add_argument("--replay-verify-metadata", action="store_true", dest="replay_verify_metadata")
group.add_argument(
"--no-replay-verify-metadata", action="store_false", dest="replay_verify_metadata"
)
# Schedule
parser.add_argument("--warmup-times", type=int, default=20)
parser.add_argument("--run-times", type=int, default=100)
@ -131,6 +142,10 @@ if args.enable_autotuner is None:
args.enable_autotuner = True
if args.use_cuda_graph is None:
args.use_cuda_graph = False
if (args.replay_start_iter is None) != (args.replay_stop_iter is None):
parser.error("Both --replay-start-iter and --replay-stop-iter must be provided or none")
if args.replay_verify_metadata is None:
args.replay_verify_metadata = True
print(args)
# MPI args
@ -179,6 +194,27 @@ runner = Runner(
)
logger.info("Layer-wise benchmarks: Create runner ... Done")
calibrator = get_calibrator()
if args.replay_file_path:
calibrator.init(
"REPLAY",
args.replay_file_path,
args.layer_indices,
replay_verify_metadata=args.replay_verify_metadata,
mapping=mapping,
)
if args.replay_start_iter is None:
replay_start_iter, replay_stop_iter = calibrator.get_replay_iteration_range()
else:
replay_start_iter, replay_stop_iter = args.replay_start_iter, args.replay_stop_iter
logger.info(
f"Layer-wise benchmarks: Replay iteration range [{replay_start_iter}, {replay_stop_iter}]"
)
else:
calibrator.init("NONE", None, None)
replay_start_iter, replay_stop_iter = 1, 1  # Avoid None in the arithmetic below
calibrator.maybe_wrap_model(runner.model)
# Autotune
run_pack = runner.create_run_pack(
args.run_type,
@ -320,14 +356,19 @@ for batch_size, seq_len_q, seq_len_kv_cache, balance_ratio in itertools.product(
balance_ratio_str = "" if balance_ratio is None else f" balance={balance_ratio:.2g}"
nvtx_message = f"b={batch_size} s={seq_len_q} past={seq_len_kv_cache}{balance_ratio_str} NP{world_size}"
calibrator.start()
for i in range(args.warmup_times + args.run_times):
events[i].record()
replay_iter = replay_start_iter + i % (replay_stop_iter - replay_start_iter + 1)
calibrator.pre_step(replay_iter)
with nvtx.annotate(nvtx_message):
if args.use_cuda_graph:
g.replay()
else:
run_pack()
calibrator.post_step(replay_iter)
events[-1].record()
calibrator.stop()
torch.cuda.synchronize()
# Print statistics

View File

@ -0,0 +1,144 @@
#!/bin/bash
set -euo pipefail
# Common settings and preparation
MODEL="${MODEL:-$LLM_MODELS_ROOT/DeepSeek-R1/DeepSeek-R1-0528-FP4-v2}"
NP=${NP:-4}
BATCH_SIZE=32
export PROFILE_DIR="${PROFILE_DIR:-profiles}"
export TLLM_AUTOTUNER_CACHE_PATH="$PROFILE_DIR/sample_performance_alignment_cache.json"
mkdir -p -- "$PROFILE_DIR"
mkdir -p -- "$(dirname "$TLLM_AUTOTUNER_CACHE_PATH")"
python3 ../../benchmarks/cpp/prepare_dataset.py \
--tokenizer "$MODEL" \
--stdout \
--random-seed 42 \
token-norm-dist \
--num-requests $((BATCH_SIZE*NP)) \
--input-mean 2048 \
--input-stdev 0 \
--output-mean 256 \
--output-stdev 0 \
>/tmp/dataset.jsonl
# Step 1
rm -f -- "$TLLM_AUTOTUNER_CACHE_PATH"
cat <<EOF >/tmp/config_collect.yaml
enable_attention_dp: true
layer_wise_benchmarks_config:
calibration_mode: COLLECT
calibration_file_path: "$PROFILE_DIR/calibration_data.json"
moe_config:
backend: CUTLASS
print_iter_log: true
EOF
TLLM_PROFILE_START_STOP=$((BATCH_SIZE + 10))-$((BATCH_SIZE + 35)) \
NP=$NP ./mpi_launch.sh middleware/mpi_env_from_ompi \
nsys profile \
-t cuda,nvtx \
--cpuctxsw none --cuda-event-trace false \
--cuda-graph-trace node \
-c cudaProfilerApi --capture-range-end stop \
-o "$PROFILE_DIR/report_e2e_collect_rank%q{RANK}.nsys-rep" \
--force-overwrite true \
trtllm-llmapi-launch \
trtllm-bench \
--model deepseek-ai/DeepSeek-V3 \
--model_path "$MODEL" \
throughput \
--tp $NP \
--ep $NP \
--warmup 0 \
--dataset /tmp/dataset.jsonl \
--max_batch_size $BATCH_SIZE \
--max_num_tokens 3072 \
--disable_chunked_context \
--num_requests $((BATCH_SIZE*NP)) \
--concurrency $((BATCH_SIZE*NP)) \
--config /tmp/config_collect.yaml
# Step 2
cat <<EOF >/tmp/config_mark.yaml
cuda_graph_config: null
enable_attention_dp: true
layer_wise_benchmarks_config:
calibration_mode: MARK
moe_config:
backend: CUTLASS
print_iter_log: true
EOF
TLLM_PROFILE_START_STOP=$((BATCH_SIZE + 10))-$((BATCH_SIZE + 35)) \
NP=$NP ./mpi_launch.sh middleware/mpi_env_from_ompi \
nsys profile \
-t cuda,nvtx \
--cpuctxsw none --cuda-event-trace false \
--cuda-graph-trace node \
-c cudaProfilerApi --capture-range-end stop \
-o "$PROFILE_DIR/report_e2e_mark_rank%q{RANK}.nsys-rep" \
--force-overwrite true \
trtllm-llmapi-launch \
trtllm-bench \
--model deepseek-ai/DeepSeek-V3 \
--model_path "$MODEL" \
throughput \
--tp $NP \
--ep $NP \
--warmup 0 \
--dataset /tmp/dataset.jsonl \
--max_batch_size $BATCH_SIZE \
--max_num_tokens 3072 \
--disable_chunked_context \
--num_requests $((BATCH_SIZE*NP)) \
--concurrency $((BATCH_SIZE*NP)) \
--config /tmp/config_mark.yaml
# Step 3
NP=$NP ./mpi_launch.sh ./run.sh config_gen.yaml \
--model "$MODEL" \
--load-format AUTO \
--layer-indices 5,6,7 \
--batch-size $BATCH_SIZE \
--seq-len-q 1 \
--seq-len-kv-cache $((2049 + (BATCH_SIZE / 2 + 25) * 1)) \
--balance-method NotModified \
--replay-file-path "$PROFILE_DIR/calibration_data.json" \
--replay-start $((BATCH_SIZE + 10 + 5)) \
--replay-stop $((BATCH_SIZE + 35))
# Step 4
seq 0 $((NP - 1)) | xargs -I% python3 parse_e2e.py \
--eager-trace "$PROFILE_DIR/report_e2e_mark_rank%.nsys-rep" \
--graph-trace "$PROFILE_DIR/report_e2e_collect_rank%.nsys-rep" \
--layer-indices 5,6,7 \
--warmup-times 5 \
-o "$PROFILE_DIR/report_e2e_collect_rank%.json"
seq 0 $((NP - 1)) | xargs -I% python3 parse.py \
--profile-dir "$PROFILE_DIR" \
--world-size $NP \
--rank %
# Step 5
targets=()
for i in $(seq 1 $((NP - 1))); do
targets+=(--target "$PROFILE_DIR/report_e2e_collect_rank$i.json")
done
for i in $(seq 0 $((NP - 1))); do
targets+=(--target "$PROFILE_DIR/report_np${NP}_rank$i.json")
done
python3 correlation.py \
--reference "$PROFILE_DIR/report_e2e_collect_rank0.json" \
"${targets[@]}" \
-o "$PROFILE_DIR/correlation.html"

View File

@ -12,6 +12,7 @@ salloc -A "$ACCOUNT" \
-p "$PARTITION" \
-N "$NODES" \
--segment "$NODES" \
-J "$ACCOUNT-tensorrt_llm.layer_wise_benchmarks" \
$EXTRA_ARGS \
-t "$TIME" \
--no-shell \

View File

@ -16,7 +16,7 @@ NODES=$(squeue -j $SLURM_JOB_ID -h -o "%D")
if [ -z "${CONTAINER_IMAGE:-}" ]; then
# Read Docker image from current_image_tags.properties
MACHINE="$(uname -m)"
MACHINE="$(srun -N 1 uname -m)"
if [ "$MACHINE" == "x86_64" ]; then
DOCKER_IMAGE=$(source "$TRTLLM_ROOT/jenkins/current_image_tags.properties" && echo $LLM_DOCKER_IMAGE)
elif [ "$MACHINE" == "aarch64" ]; then

View File

@ -39,6 +39,7 @@ from tensorrt_llm._torch.modules.fused_moe.routing import BaseMoeRoutingMethod
from tensorrt_llm._torch.utils import AuxStreamType, EventType, Fp4QuantizedTensor
from tensorrt_llm.logger import logger
from tensorrt_llm.models.modeling_utils import QuantConfig
from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator
from .communication import (
AllGatherReduceScatter,
@ -609,6 +610,9 @@ class ConfigurableMoE(MoE):
if token_selected_slots is not None:
ExpertStatistic.set_layer(self.layer_idx)
ExpertStatistic.maybe_add_info(self.num_slots, token_selected_slots)
token_selected_slots = get_calibrator().maybe_collect_or_replay_slots(
self.num_slots, token_selected_slots
)
# ========== Step 3.5: Communication Prepare Phase (BEFORE quantization) ==========
# NVLINK two-sided has a prepare phase to gather EPLB statistics

View File

@ -8,6 +8,7 @@ import torch
from tensorrt_llm._mnnvl_utils import MnnvlMemory, MnnvlMoe
from tensorrt_llm._torch.distributed.moe_alltoall import MoeAlltoAll
from tensorrt_llm.logger import logger
from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator
from ...distributed import allgather
from ...expert_statistic import ExpertStatistic
@ -539,6 +540,8 @@ class CutlassFusedMoE(MoE):
# If load balancer is enabled, the statistics are collected from expert slot IDs.
ExpertStatistic.set_layer(self.layer_idx)
ExpertStatistic.maybe_add_info(self.num_slots, token_selected_slots)
token_selected_slots = get_calibrator().maybe_collect_or_replay_slots(
self.num_slots, token_selected_slots)
if self.apply_router_weight_on_input:
assert x.dtype != torch.float8_e4m3fn, "Current workaround for apply_router_weight_on_input does not support fp8 input"

View File

@ -8,6 +8,7 @@ from tensorrt_llm._mnnvl_utils import MnnvlMemory, MnnvlMoe, MoEAlltoallInfo
from tensorrt_llm._utils import is_sm_100f, local_mpi_size
from tensorrt_llm.logger import logger
from tensorrt_llm.mapping import Mapping
from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator
from ...distributed import allgather, reducescatter
from ...expert_statistic import ExpertStatistic
@ -439,6 +440,8 @@ class WideEPMoE(MoE):
# If load balancer is enabled, the statistics are collected from expert slot IDs.
ExpertStatistic.set_layer(self.layer_idx)
ExpertStatistic.maybe_add_info(self.num_slots, token_selected_slots)
token_selected_slots = get_calibrator().maybe_collect_or_replay_slots(
self.num_slots, token_selected_slots)
use_allgather = not use_all_to_all

View File

@ -35,6 +35,7 @@ from tensorrt_llm.llmapi.llm_args import PeftCacheConfig
from tensorrt_llm.logger import logger
from tensorrt_llm.mapping import CpType
from tensorrt_llm.runtime.generation import CUASSERT
from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator
from ..distributed import Distributed
from ..models.modeling_utils import DecoderModelForCausalLM
@ -736,8 +737,11 @@ class PyExecutor:
record_shapes=True,
with_modules=True)
calibrator = get_calibrator()
def profile_step():
nonlocal it, enabled, start_time, start_event_1, end_event_1, start_event_2, end_event_2, prev_device_step_time
calibrator.post_step(it)
if it in self.profile_stop_iters and not self.is_warmup:
assert enabled, "Inconsistent CUDA profiling state"
if enable_torch_trace:
@ -746,6 +750,7 @@ class PyExecutor:
logger.info(f"Profiling stopped at iteration {it}, "
f"trace saved to {torch_trace_path}")
torch.cuda.cudart().cudaProfilerStop()
calibrator.stop()
enabled = False
if start_time is not None and self.print_log and self.dist.rank == 0:
@ -786,11 +791,13 @@ class PyExecutor:
if it in self.profile_start_iters and not self.is_warmup:
assert not enabled, "Inconsistent CUDA profiling state"
calibrator.start()
torch.cuda.cudart().cudaProfilerStart()
if enable_torch_trace:
torch_profiler.start()
logger.info(f"Profiling started at iteration {it}.")
enabled = True
calibrator.pre_step(it)
start_time = time.time()
if it % 2 == 0:
if start_event_1 is None:
@ -812,6 +819,7 @@ class PyExecutor:
logger.info(f"Profiling stopped at iteration {it}, "
f"trace saved to {torch_trace_path}")
torch.cuda.cudart().cudaProfilerStop()
calibrator.stop()
def _get_init_iter_stats(self, num_new_active_requests,
new_active_requests_queue_latency_ms):

View File

@ -24,6 +24,7 @@ from tensorrt_llm.llmapi.tokenizer import (TokenizerBase,
from tensorrt_llm.logger import logger
from tensorrt_llm.mapping import Mapping
from tensorrt_llm.quantization import QuantAlgo
from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator
from ..attention_backend.interface import AttentionRuntimeFeatures
from ..attention_backend.trtllm import TrtllmAttention
@ -354,6 +355,15 @@ def create_py_executor(
validate_feature_combination(llm_args, model_engine, llm_args.sampler_type)
calibrator = get_calibrator()
layer_wise_benchmarks_config = llm_args.layer_wise_benchmarks_config
calibrator.init(layer_wise_benchmarks_config.calibration_mode,
layer_wise_benchmarks_config.calibration_file_path,
layer_wise_benchmarks_config.calibration_layer_indices,
mapping=mapping,
dist=dist)
model_engine.model = calibrator.maybe_wrap_model(model_engine.model)
if has_draft_model_engine:
with allocation_scope(ExecutorMemoryType.MODEL_ENGINE_DRAFT,
RestoreMode.PINNED):

View File

@ -840,6 +840,37 @@ class KvCacheConnectorConfig(StrictBaseModel):
..., description="The class name of the worker within the module.")
class LayerwiseBenchmarksConfig(StrictBaseModel):
"""
Configuration for layer-wise benchmarks calibration.
"""
calibration_mode: Literal["NONE", "MARK", "COLLECT"] = Field(
default="NONE",
description=
"Instruct the layer-wise benchmarks calibrator to work on MARK mode, or COLLECT mode",
status="prototype")
calibration_file_path: Optional[str] = Field(
default=None,
description=
"The file path which the layer-wise benchmarks calibrator saves to or loads from",
status="prototype")
calibration_layer_indices: Optional[List[int]] = Field(
default=None,
description=
"Layer indices to filter. If None, all layers are collected in COLLECT mode.",
status="prototype")
@model_validator(mode='after')
def validate_calibration_file_path(self) -> 'LayerwiseBenchmarksConfig':
if self.calibration_mode == "COLLECT" and not self.calibration_file_path:
raise ValueError(
f"Expect calibration_file_path not to be empty when work on {self.calibration_mode} mode"
)
return self
class MedusaDecodingConfig(DecodingBaseConfig):
medusa_choices: Optional[List[List[int]]] = None
num_medusa_heads: Optional[int] = None
@ -2935,6 +2966,7 @@ class TorchLlmArgs(BaseLlmArgs):
'NCCL_SYMMETRIC']] = Field(default='AUTO',
description="Allreduce strategy to use.",
status="beta")
checkpoint_loader: Optional[object] = Field(
default=None,
description=
@ -3011,6 +3043,11 @@ class TorchLlmArgs(BaseLlmArgs):
status="prototype",
)
layer_wise_benchmarks_config: LayerwiseBenchmarksConfig = Field(
default_factory=LayerwiseBenchmarksConfig,
description="Configuration for layer-wise benchmarks calibration.",
status="prototype")
@property
def quant_config(self) -> QuantConfig:
if self._quant_config is None:

View File

@ -1,4 +1,3 @@
from .mark_utils import mark_ranges
from .runner import BalanceMethod, Runner
from .calibrator import get_calibrator
__all__ = ["BalanceMethod", "Runner", "mark_ranges"]
__all__ = ["get_calibrator"]

View File

@ -0,0 +1,716 @@
import base64
import functools
import json
import zlib
from enum import Enum
from typing import Optional
import nvtx
import torch
from tensorrt_llm.logger import logger
class Mode(Enum):
NONE = 1
MARK = 2
COLLECT = 3
REPLAY = 4
class Calibrator:
"""Calibrator for layer-wise benchmarks with MoE expert routing data.
The calibrator operates in one of the following modes:
NONE: Disabled, no-op.
MARK: Add NVTX markers for correlating E2E and layer-wise benchmarks.
COLLECT: Collect `token_selected_slots` data and save to a file.
REPLAY: Load `token_selected_slots` data from a file for replay.
Lifecycle:
init() -> maybe_wrap_model() -> [CUDA Graph capture] -> start() ->
[pre_step() -> forward() -> post_step()] x N -> stop()
Design Notes:
To ensure CUDA Graphs compatibility, `token_selected_slots` tensors are
copied to a fixed GPU buffer during graph capture/replay. The actual data
is then transferred to per-iteration storage in `post_step()`.
Since `model.forward()` does not execute during CUDA Graphs replay, we
cannot access Python metadata objects directly. Instead, we copy an integer
index (`metadata_idx`) pointing to a pre-built metadata list.
For REPLAY mode, metadata verification is deferred to `stop()` to avoid
GPU synchronization during profiling. Metadata indices are recorded on GPU
in `post_step()` and compared after `cuda.synchronize()` in `stop()`.
Tensor Locations (COLLECT mode):
- `_metadata_idx_gpu`: GPU scalar, current iteration's index into `_metadata_list`
- `_metadata_idx_range_gpu`: GPU [0..MAX_NUM_METADATA), pre-generated indices for GPU copy
- `_slots_buffer_gpu`: GPU, fixed buffer for CUDA Graphs compatibility
- `_collected_metadata_idx`: GPU [MAX_COLLECT_ITERATIONS], per-iteration metadata indices
- `_collected_slots_cpu`: CPU (pinned), per-iteration slots data
- `_eager_slots_gpu`: GPU, list of tensors for non-CG (context) phase
Tensor Locations (REPLAY mode):
- `_actual_metadata_idx_gpu`: GPU scalar, current iteration's actual metadata index
- `_metadata_idx_range_gpu`: GPU [0..MAX_NUM_METADATA), pre-generated indices
- `_slots_buffer_gpu`: GPU, fixed buffer for CUDA Graphs compatibility
- `_collected_actual_metadata_idx`: GPU [MAX_REPLAY_ITERATIONS], actual metadata indices
- `_collected_iterations`: Python list, iterations for expected metadata lookup
- `_replay_eager_slots_gpu`: GPU, eager slots data for large context phase
"""
# Maximum number of int32 elements per iteration for CUDA Graphs buffer
MAX_SLOTS_BUFFER_SIZE = 4 * 1024 * 1024
# Maximum number of metadata supported
MAX_NUM_METADATA = 1024 * 1024
# Maximum iterations during collect phase
MAX_COLLECT_ITERATIONS = 101
# Maximum iterations during replay phase
MAX_REPLAY_ITERATIONS = 1024 * 1024
# Data type for token_selected_slots
SLOTS_DTYPE = torch.int32
def __init__(self):
self.mode = Mode.NONE
self._started = False
def init(
self,
mode: str,
file_path: str,
layer_indices: list[int],
*,
replay_verify_metadata: Optional[bool] = None,
mapping=None,
dist=None,
):
"""Initialize the calibrator.
Args:
mode: One of "NONE", "MARK", "COLLECT", "REPLAY".
file_path: Path to the calibration data file.
layer_indices: Optional list of layer indices to filter.
COLLECT mode: If None, all layers are collected.
REPLAY mode: Cannot be None.
replay_verify_metadata: Whether to verify actual metadata in REPLAY mode matches calibration data.
mapping: Tensor parallel mapping containing rank and world_size.
dist: Distributed communication wrapper.
"""
if self.mode != Mode.NONE:
raise ValueError("double init")
self.mode = Mode[mode]
if self.mode == Mode.COLLECT:
self._init_collect_mode(file_path, layer_indices, dist)
if self.mode == Mode.REPLAY:
if replay_verify_metadata is None:
raise ValueError("missing replay_verify_metadata")
self._init_replay_mode(file_path, layer_indices, replay_verify_metadata, mapping)
def _init_collect_mode(self, file_path, layer_indices, dist):
"""Initialize buffers for COLLECT mode."""
self._file_path = file_path
self._layer_indices = layer_indices
self._dist = dist
# Metadata list that `_metadata_idx_gpu` indexes into
self._metadata_list = []
# GPU buffers for CUDA Graphs compatibility:
# - Copy from `_metadata_idx_range_gpu[idx]` to `_metadata_idx_gpu`
# - Copy flattened slots to `_slots_buffer_gpu`
self._metadata_idx_gpu = torch.empty((), dtype=torch.long, device="cuda")
self._metadata_idx_range_gpu = torch.arange(
self.MAX_NUM_METADATA, dtype=torch.long, device="cuda"
)
self._slots_buffer_gpu = torch.empty(
self.MAX_SLOTS_BUFFER_SIZE,
dtype=self.SLOTS_DTYPE,
device="cuda",
)
def _init_replay_mode(self, file_path, layer_indices, replay_verify_metadata, mapping):
"""Initialize replay database from file."""
with open(file_path) as f:
data = json.load(f)
if data["world_size"] != mapping.world_size:
raise ValueError(
f"World size mismatch: file has {data['world_size']}, "
f"but current world_size is {mapping.world_size}"
)
# Verify all ranks have the same iterations (using data from file, not dist)
all_ranks_iterations = [
[record["iteration"] for record in rank_records]
for rank_records in data["all_ranks_records"]
]
if any(iters != all_ranks_iterations[0] for iters in all_ranks_iterations):
raise ValueError("Iterations mismatch across ranks in calibration file")
start_layer_idx = layer_indices[0]
end_layer_idx = layer_indices[-1]
if list(layer_indices) != list(range(start_layer_idx, end_layer_idx + 1)):
raise ValueError("Invalid layer_indices")
self._replay_db = {}
for record in data["all_ranks_records"][mapping.rank]:
iteration = record["iteration"]
metadata = record["metadata"]
raw_data = torch.frombuffer(
zlib.decompress(base64.b64decode(record["raw_data"])),
dtype=self.SLOTS_DTYPE,
)
# Filter metadata for target layers
start_idx = 0
while metadata[start_idx]["layer_idx"] < start_layer_idx:
start_idx += 1
end_idx = start_idx
while end_idx < len(metadata) and metadata[end_idx]["layer_idx"] <= end_layer_idx:
end_idx += 1
# Calculate data offsets for filtered range
offset = sum(
torch.Size(m["token_selected_slots_shape"]).numel() for m in metadata[:start_idx]
)
length = sum(
torch.Size(m["token_selected_slots_shape"]).numel()
for m in metadata[start_idx:end_idx]
)
filtered_slots = raw_data[offset : offset + length]
self._replay_db[iteration] = {
"metadata": metadata[start_idx:end_idx],
"slots_data_gpu": filtered_slots.to("cuda"),
}
# Placeholder buffer for CUDA Graphs (actual data copied in pre_step)
self.MAX_TOPK_AND_MIN_SLOTS = 32
self._slots_buffer_gpu = (
torch.arange(
self.MAX_SLOTS_BUFFER_SIZE,
dtype=self.SLOTS_DTYPE,
device="cuda",
)
% self.MAX_TOPK_AND_MIN_SLOTS
)
self._use_eager_mode = False
# GPU buffers for deferred metadata verification (must be created before CUDA Graph capture)
# These are used during forward() which may be captured into CUDA Graphs
self._replay_verify_metadata = replay_verify_metadata
if self._replay_verify_metadata:
self._actual_metadata_list = []
self._actual_metadata_idx_gpu = torch.empty((), dtype=torch.long, device="cuda")
self._metadata_idx_range_gpu = torch.arange(
self.MAX_NUM_METADATA, dtype=torch.long, device="cuda"
)
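# For orientation, a sketch of the calibration JSON consumed above, reconstructed from the
# reads in this method and the writes in _save_collected_data(); the concrete values are
# illustrative only:
#
#   {
#     "world_size": 4,
#     "all_ranks_records": [
#       [                                   # one list of records per rank
#         {
#           "iteration": 47,
#           "metadata": [
#             {"layer_idx": 5, "num_slots": 256, "token_selected_slots_shape": [32, 8]},
#             ...
#           ],
#           "raw_data": "<base64(zlib(raw SLOTS_DTYPE tensor bytes, flattened in metadata order))>"
#         },
#         ...
#       ],
#       ...
#     ]
#   }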
def maybe_wrap_model(self, model):
"""Wrap model and layer forward methods for data collection/replay.
Args:
model: The model to wrap.
Returns:
The wrapped model.
"""
if self.mode == Mode.COLLECT:
self._wrap_model_forward_for_collect(model)
if self.mode == Mode.REPLAY:
self._wrap_model_forward_for_replay(model)
# Wrap layer forward methods for all active modes
if self.mode in [Mode.MARK, Mode.COLLECT, Mode.REPLAY]:
self._wrap_layer_forward(model)
return model
def _wrap_model_forward_for_collect(self, model):
"""Wrap model.forward() for COLLECT mode."""
def make_forward(forward_orig):
@functools.wraps(forward_orig)
def forward(*args, **kwargs):
# Initialize per-iteration collection buffer
self._cur_iter_records = []
output = forward_orig(*args, **kwargs)
# Build metadata from collected records
metadata = [
{
"layer_idx": layer_idx,
"num_slots": num_slots,
"token_selected_slots_shape": list(slots.shape),
}
for layer_idx, num_slots, slots in self._cur_iter_records
]
# Store metadata and copy index to GPU (for CUDA Graphs compatibility)
metadata_idx = len(self._metadata_list)
self._metadata_list.append(metadata)
if metadata_idx >= len(self._metadata_idx_range_gpu):
raise ValueError(
f"Metadata index overflow: {metadata_idx} >= {len(self._metadata_idx_range_gpu)}. "
f"Increase MAX_NUM_METADATA if more iterations are needed."
)
self._metadata_idx_gpu.copy_(self._metadata_idx_range_gpu[metadata_idx])
# Copy slots data to fixed buffer or use eager mode
total_size = sum(slots.numel() for _, _, slots in self._cur_iter_records)
if total_size <= self.MAX_SLOTS_BUFFER_SIZE:
# Small data: copy to fixed GPU buffer (CUDA Graphs compatible)
if self._cur_iter_records:
torch.cat(
[slots.flatten() for _, _, slots in self._cur_iter_records],
out=self._slots_buffer_gpu[:total_size],
)
else:
# Large data (context phase): use eager collection
# Context phase does not use CUDA Graphs, so this is safe
assert not torch.cuda.is_current_stream_capturing()
self._use_eager_mode = True
self._eager_slots_gpu = [
slots.flatten() for _, _, slots in self._cur_iter_records
]
del self._cur_iter_records
return output
return forward
model.forward = make_forward(model.forward)
def _wrap_model_forward_for_replay(self, model):
"""Wrap model.forward() for REPLAY mode."""
def make_forward(forward_orig):
@functools.wraps(forward_orig)
def forward(*args, **kwargs):
if torch.cuda.is_current_stream_capturing():
assert not self._use_eager_mode, (
"Eager mode is not compatible with CUDA Graphs capturing"
)
self._replay_chunk_idx = 0
self._replay_offset = 0
# Build metadata for verification during this forward pass
self._cur_iter_actual_metadata = []
output = forward_orig(*args, **kwargs)
# Store metadata and copy index to GPU (for deferred verification)
if self._replay_verify_metadata:
metadata_idx = len(self._actual_metadata_list)
self._actual_metadata_list.append(self._cur_iter_actual_metadata)
if metadata_idx >= len(self._metadata_idx_range_gpu):
raise ValueError(
f"Metadata index overflow: {metadata_idx} >= {len(self._metadata_idx_range_gpu)}. "
f"Increase MAX_NUM_METADATA if more iterations are needed."
)
with nvtx.annotate("layer_wise_benchmarks ignore"):
self._actual_metadata_idx_gpu.copy_(
self._metadata_idx_range_gpu[metadata_idx]
)
# Verify all replay data was consumed
if self._started and self._replay_chunk_idx != len(self._cur_iter_metadata):
raise ValueError(
f"Unused replay data: chunks [{self._replay_chunk_idx}:{len(self._cur_iter_metadata)}] "
f"were not consumed during forward pass"
)
if torch.cuda.is_current_stream_capturing():
if self._replay_offset > len(self._slots_buffer_gpu):
raise ValueError(
f"Slots buffer overflow: required {self._replay_offset} elements, "
f"but buffer size is {len(self._slots_buffer_gpu)}"
)
return output
return forward
model.forward = make_forward(model.forward)
def _wrap_layer_forward(self, model):
"""Wrap layer forward methods to track layer index and add NVTX markers."""
def make_forward(layer_idx, forward_orig):
@functools.wraps(forward_orig)
def forward(*args, **kwargs):
self._current_layer_idx = layer_idx
if self.mode in [Mode.MARK, Mode.COLLECT]:
with nvtx.annotate(f"layer_wise_benchmarks layer_idx {layer_idx}"):
output = forward_orig(*args, **kwargs)
else:
output = forward_orig(*args, **kwargs)
del self._current_layer_idx
return output
return forward
for idx, layer in enumerate(model.model.layers):
layer.forward = make_forward(idx, layer.forward)
def maybe_collect_or_replay_slots(self, num_slots, token_selected_slots):
"""Collect or replay token_selected_slots data.
Args:
num_slots: Number of slots.
token_selected_slots: Tensor of selected expert slots.
Returns:
Original tensor in COLLECT mode, or replayed tensor in REPLAY mode.
"""
if self.mode == Mode.COLLECT:
return self._collect_slots(num_slots, token_selected_slots)
if self.mode == Mode.REPLAY:
return self._replay_slots(num_slots, token_selected_slots)
return token_selected_slots
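# Illustrative call site (hedged; the actual MoE routing code lives outside this file):
# after top-k routing produces `token_selected_slots`, passing it through the calibrator
# records it in COLLECT mode or substitutes the calibrated tensor in REPLAY mode.
#
#   token_selected_slots = get_calibrator().maybe_collect_or_replay_slots(
#       num_slots, token_selected_slots
#   )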
def _collect_slots(self, num_slots, token_selected_slots):
"""Collect slots data for current iteration."""
# Skip if model was not wrapped (no layer context available)
if not hasattr(self, "_current_layer_idx"):
return token_selected_slots
if token_selected_slots.dtype != self.SLOTS_DTYPE:
raise ValueError(
f"Unexpected dtype for token_selected_slots: expected {self.SLOTS_DTYPE}, "
f"got {token_selected_slots.dtype}"
)
if self._layer_indices is None or self._current_layer_idx in self._layer_indices:
self._cur_iter_records.append(
(self._current_layer_idx, num_slots, token_selected_slots)
)
return token_selected_slots
def _replay_slots(self, num_slots, token_selected_slots):
"""Replay slots data from recorded buffer."""
# Skip if model was not wrapped (no layer context available)
if not hasattr(self, "_current_layer_idx"):
return token_selected_slots
if token_selected_slots.dtype != self.SLOTS_DTYPE:
raise ValueError(
f"Unexpected dtype for token_selected_slots: expected {self.SLOTS_DTYPE}, "
f"got {token_selected_slots.dtype}"
)
# Record actual metadata for deferred verification in stop()
self._cur_iter_actual_metadata.append(
{
"layer_idx": self._current_layer_idx,
"num_slots": num_slots,
"token_selected_slots_shape": list(token_selected_slots.shape),
}
)
# Immediate validation (does not depend on GPU data, safe during CUDA Graphs)
if self._started:
chunk_metadata = self._cur_iter_metadata[self._replay_chunk_idx]
expected_layer_idx = chunk_metadata["layer_idx"]
expected_num_slots = chunk_metadata["num_slots"]
expected_shape = chunk_metadata["token_selected_slots_shape"]
if self._current_layer_idx != expected_layer_idx:
raise ValueError(
f"Layer index mismatch during replay: expected {expected_layer_idx}, "
f"got {self._current_layer_idx}"
)
if num_slots != expected_num_slots:
raise ValueError(
f"num_slots mismatch during replay: expected {expected_num_slots}, "
f"got {num_slots}"
)
if list(token_selected_slots.shape) != expected_shape:
raise ValueError(
f"Shape mismatch during replay: expected {expected_shape}, "
f"got {list(token_selected_slots.shape)}"
)
else:
if (
num_slots < self.MAX_TOPK_AND_MIN_SLOTS
or token_selected_slots.shape[-1] > self.MAX_TOPK_AND_MIN_SLOTS
):
raise ValueError(
"Invalid initial replayed_slots, please adjust `MAX_TOPK_AND_MIN_SLOTS`"
)
if self._started or torch.cuda.is_current_stream_capturing():
n = token_selected_slots.numel()
buffer = (
self._replay_eager_slots_gpu if self._use_eager_mode else self._slots_buffer_gpu
)
replayed_slots = buffer[self._replay_offset : self._replay_offset + n].view_as(
token_selected_slots
)
self._replay_chunk_idx += 1
self._replay_offset += n
return replayed_slots
return token_selected_slots
def start(self):
"""Start calibration. Call before the profiling loop."""
assert not self._started
self._started = True
if self.mode != Mode.NONE:
logger.info(f"Layer-wise benchmarks: Calibrator started in {self.mode.name} mode")
if self.mode == Mode.COLLECT:
# Per-iteration storage buffers
# CUDA Graphs reuse fixed buffers, so we must copy data out after each step
self._collected_metadata_idx = torch.empty(
self.MAX_COLLECT_ITERATIONS, dtype=torch.long, device="cuda"
)
self._collected_slots_cpu = torch.empty(
self.MAX_COLLECT_ITERATIONS,
self.MAX_SLOTS_BUFFER_SIZE,
dtype=torch.int32,
device="cpu",
pin_memory=True,
)
self._collected_records = []
# Eager mode flag: True when data exceeds MAX_SLOTS_BUFFER_SIZE
# (context phase, not using CUDA Graphs)
self._use_eager_mode = False
if self.mode == Mode.REPLAY and self._replay_verify_metadata:
# Per-iteration storage buffers for deferred metadata verification
# Note: _actual_metadata_list, _actual_metadata_idx_gpu, _metadata_idx_range_gpu
# are created in _init_replay_mode() before CUDA Graph capture
self._collected_actual_metadata_idx = torch.empty(
self.MAX_REPLAY_ITERATIONS, dtype=torch.long, device="cuda"
)
# post_step() runs outside CUDA Graphs, so we can use Python list for iterations
self._collected_iterations = []
def pre_step(self, iteration: int):
"""Prepare for an iteration. Call before model.forward().
Args:
iteration: Current iteration number.
"""
if self.mode == Mode.REPLAY and self._started:
self._cur_iter_metadata = self._replay_db[iteration]["metadata"]
slots_gpu = self._replay_db[iteration]["slots_data_gpu"]
expected_size = sum(
torch.Size(m["token_selected_slots_shape"]).numel() for m in self._cur_iter_metadata
)
if len(slots_gpu) != expected_size:
raise ValueError(
f"Slots data size mismatch for iteration {iteration}: "
f"expected {expected_size}, got {len(slots_gpu)}"
)
if expected_size <= self.MAX_SLOTS_BUFFER_SIZE:
self._use_eager_mode = False
self._slots_buffer_gpu[:expected_size].copy_(slots_gpu, non_blocking=True)
else:
self._use_eager_mode = True
self._replay_eager_slots_gpu = slots_gpu
def post_step(self, iteration: int):
"""Finalize an iteration. Call after model.forward().
Args:
iteration: Current iteration number.
"""
if self.mode == Mode.COLLECT and self._started:
record_idx = len(self._collected_records)
if record_idx >= self.MAX_COLLECT_ITERATIONS:
raise ValueError(
f"Exceeded MAX_COLLECT_ITERATIONS={self.MAX_COLLECT_ITERATIONS}. "
"Increase the limit or reduce the profiling iterations."
)
self._collected_metadata_idx[record_idx].copy_(self._metadata_idx_gpu)
if self._use_eager_mode:
# TODO: Avoid synchronization by using async copy
slots_cpu = torch.cat(self._eager_slots_gpu).to("cpu")
else:
slots_cpu = self._collected_slots_cpu[record_idx]
# TODO: Copy only required elements instead of entire buffer
slots_cpu.copy_(self._slots_buffer_gpu, non_blocking=True)
self._collected_records.append(
{
"iteration": iteration,
"slots_cpu": slots_cpu,
}
)
# Reset eager mode for next step
self._use_eager_mode = False
if self.mode == Mode.REPLAY and self._started and self._replay_verify_metadata:
# Record metadata index on GPU (no sync), verification deferred to stop()
record_idx = len(self._collected_iterations)
if record_idx >= self.MAX_REPLAY_ITERATIONS:
raise ValueError(
f"Exceeded MAX_REPLAY_ITERATIONS={self.MAX_REPLAY_ITERATIONS}. "
"Increase the limit or reduce the profiling iterations."
)
self._collected_actual_metadata_idx[record_idx].copy_(
self._actual_metadata_idx_gpu, non_blocking=True
)
# post_step() runs outside CUDA Graphs, so we can use Python list
self._collected_iterations.append(iteration)
def stop(self):
"""Stop calibration and save data. Call after the profiling loop."""
assert self._started
if self.mode == Mode.COLLECT:
self._save_collected_data()
if self.mode == Mode.REPLAY and self._replay_verify_metadata:
self._verify_replay_metadata()
self._started = False
def _save_collected_data(self):
"""Save collected calibration data to file."""
torch.cuda.synchronize()
metadata_idx_list = self._collected_metadata_idx.tolist()
output_records = []
for record_idx, record in enumerate(self._collected_records):
metadata = self._metadata_list[metadata_idx_list[record_idx]]
slots_size = sum(torch.Size(m["token_selected_slots_shape"]).numel() for m in metadata)
slots_data = record["slots_cpu"][:slots_size]
output_records.append(
{
"iteration": record["iteration"],
"metadata": metadata,
"raw_data": base64.b64encode(
zlib.compress(slots_data.numpy().tobytes())
).decode(),
}
)
# Gather from all ranks and save on rank 0
all_records = self._dist.allgather(output_records)
if self._dist.rank == 0:
with open(self._file_path, "w") as f:
json.dump(
{
"world_size": self._dist.world_size,
"all_ranks_records": all_records,
},
f,
)
if self.mode != Mode.NONE:
logger.info(f"Layer-wise benchmarks: Calibrator saved data to {self._file_path}")
def _verify_replay_metadata(self):
"""Verify that replayed metadata matches actual execution."""
torch.cuda.synchronize()
record_count = len(self._collected_iterations)
chunk_count = 0
actual_idx_list = self._collected_actual_metadata_idx[:record_count].tolist()
for record_idx, (actual_idx, iteration) in enumerate(
zip(actual_idx_list, self._collected_iterations)
):
actual_metadata = self._actual_metadata_list[actual_idx]
expected_metadata = self._replay_db[iteration]["metadata"]
if len(actual_metadata) != len(expected_metadata):
raise ValueError(
f"Metadata length mismatch at record {record_idx}: "
f"actual {len(actual_metadata)} chunks, expected {len(expected_metadata)} chunks"
)
for chunk_idx, (actual_chunk, expected_chunk) in enumerate(
zip(actual_metadata, expected_metadata)
):
if actual_chunk["layer_idx"] != expected_chunk["layer_idx"]:
raise ValueError(
f"Layer index mismatch at record {record_idx}, chunk {chunk_idx}: "
f"actual layer_idx={actual_chunk['layer_idx']}, "
f"expected layer_idx={expected_chunk['layer_idx']}"
)
if actual_chunk["num_slots"] != expected_chunk["num_slots"]:
raise ValueError(
f"num_slots mismatch at record {record_idx}, chunk {chunk_idx}: "
f"actual num_slots={actual_chunk['num_slots']}, "
f"expected num_slots={expected_chunk['num_slots']}"
)
if (
actual_chunk["token_selected_slots_shape"]
!= expected_chunk["token_selected_slots_shape"]
):
raise ValueError(
f"Shape mismatch at record {record_idx}, chunk {chunk_idx}: "
f"actual shape={actual_chunk['token_selected_slots_shape']}, "
f"expected shape={expected_chunk['token_selected_slots_shape']}"
)
chunk_count += 1
logger.info(
f"Layer-wise benchmarks: Replay metadata verification passed for {record_count} iterations"
f" and {chunk_count} chunks"
)
def get_replay_iteration_range(self):
"""Get the valid iteration range for REPLAY mode.
Returns a tuple (start_iter, stop_iter) representing the range of iterations
that can be replayed. This method verifies that the iterations form a
contiguous range. Cross-rank iteration consistency is verified in init().
Returns:
tuple: (start_iter, stop_iter) where iterations are in [start_iter, stop_iter]
Raises:
ValueError: If mode is not REPLAY or iterations are not a contiguous range.
"""
if self.mode != Mode.REPLAY:
raise ValueError(
f"get_replay_iteration_range() is only valid in REPLAY mode, "
f"current mode is {self.mode.name}"
)
# Verify iterations form a contiguous range
local_iterations = sorted(self._replay_db.keys())
start_iter = local_iterations[0]
stop_iter = local_iterations[-1]
if local_iterations != list(range(start_iter, stop_iter + 1)):
raise ValueError("Iterations are not a contiguous range")
return start_iter, stop_iter
_calibrator = Calibrator()
def get_calibrator():
"""Get the global calibrator instance."""
return _calibrator
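# Minimal usage sketch (assumes a benchmark driver; `mapping`, `dist`, `model`, and
# `run_one_iteration` are hypothetical names, and keyword arguments follow the parameters
# documented in init() above):
#
#   calibrator = get_calibrator()
#   calibrator.init(
#       mode="REPLAY",
#       file_path="profiles/calibration_data.json",
#       layer_indices=[5, 6, 7],
#       replay_verify_metadata=True,
#       mapping=mapping,
#       dist=dist,
#   )
#   model = calibrator.maybe_wrap_model(model)
#   start_iter, stop_iter = calibrator.get_replay_iteration_range()
#   calibrator.start()
#   for iteration in range(start_iter, stop_iter + 1):
#       calibrator.pre_step(iteration)
#       run_one_iteration(model)  # model.forward() consumes the replayed slots
#       calibrator.post_step(iteration)
#   calibrator.stop()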

View File

@ -441,8 +441,24 @@ class Runner:
checkpoint_dir=pretrained_model_name_or_path, checkpoint_loader=checkpoint_loader
)
self.layers = [model.model.layers[i] for i in layer_indices]
def forward(position_ids, hidden_states, attn_metadata, residual, **kwargs):
# TODO: to be more general, we should call DecoderModel.forward
residual_fusion = hasattr(model.model.layers[layer_indices[0]], "next_layer_layernorm")
for layer_idx in layer_indices:
layer = model.model.layers[layer_idx]
if residual_fusion:
hidden_states, residual = layer(
position_ids, hidden_states, attn_metadata, residual, **kwargs
)
else:
hidden_states = layer(position_ids, hidden_states, attn_metadata, **kwargs)
return hidden_states, residual
model.forward = forward
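# Exposing the selected layer range as `model.forward` allows model-level hooks (for
# example, the calibrator's maybe_wrap_model(), which wraps model.forward) to intercept
# the benchmarked layers the same way they intercept a full model.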
self.model_config = model.model_config
self.model = model
self.layer_indices = layer_indices
@staticmethod
@contextlib.contextmanager
@ -643,27 +659,20 @@ class Runner:
kwargs["mamba_metadata"] = mamba_metadata
def run_pack(*, check=False):
- output = hidden_states, residual
with model_extra_attrs(self.model_config.extra_attrs):
get_model_extra_attrs()["attention_metadata"] = weakref.ref(attn_metadata)
with torch.inference_mode():
- # TODO: to be more general, we should call DecoderModel.forward
- for layer in self.layers:
- residual_fusion = hasattr(layer, "next_layer_layernorm")
- if residual_fusion:
- output = layer(
- position_ids, output[0], attn_metadata, output[1], **kwargs
- )
- else:
- output = layer(position_ids, output[0], attn_metadata, **kwargs), None
+ hidden_states_out, residual_out = self.model(
+ position_ids, hidden_states, attn_metadata, residual, **kwargs
+ )
if check:
- if output[0].isnan().any():
+ if hidden_states_out.isnan().any():
raise ValueError("Has nan, please fix weights initialization")
- if output[0].isinf().any():
+ if hidden_states_out.isinf().any():
raise ValueError("Has inf, please fix weights initialization")
- if (output[0] == 0).sum() > 0.5 * output[0].numel():
+ if (hidden_states_out == 0).sum() > 0.5 * hidden_states_out.numel():
raise ValueError("Too many zeros, please fix weights initialization")
- return output
+ return hidden_states_out, residual_out
return run_pack
@ -690,10 +699,13 @@ class Runner:
else 0
)
moe_modules = []
- for layer in self.layers:
+ for layer_idx in self.layer_indices:
+ layer = self.model.model.layers[layer_idx]
if layer.__class__.__name__ == "NemotronHLayer":
if layer.layer_type == "E":
moe_modules.append(layer.mixer.experts)
+ elif layer.__class__.__name__ in ["GatedMLP"]:
+ pass
else:
moe_modules.append(layer.mlp.experts)

View File

@ -87,6 +87,7 @@ l0_b200:
- unittest/tools/test_layer_wise_benchmarks.py::test_deepseek_r1_ctx_dep[1]
- unittest/tools/test_layer_wise_benchmarks.py::test_nemotron_gen_dep[1]
- unittest/tools/test_layer_wise_benchmarks.py::test_qwen3_next_gen_tep[1]
- unittest/tools/test_layer_wise_benchmarks.py::test_performance_alignment[1]
- unittest/_torch/modeling/test_modeling_exaone4.py::TestEXAONE4::test_llm_load_1_FP8
- unittest/kv_cache_manager_v2_tests/
- condition:

View File

@ -231,6 +231,10 @@ methods:
annotation: int
default: 1000
status: prototype
layer_wise_benchmarks_config:
annotation: tensorrt_llm.llmapi.llm_args.LayerwiseBenchmarksConfig
default: null
status: prototype
model_kwargs:
annotation: Optional[Dict[str, Any]]
default: null

View File

@ -179,3 +179,23 @@ def test_qwen3_next_gen_tep(llm_root, world_size):
["python3", "parse.py", "--profile-dir", profile_dir, f"--world-size={world_size}"],
cwd=llm_root / "examples" / "layer_wise_benchmarks",
)
@pytest.mark.parametrize("world_size", [1, 4])
def test_performance_alignment(llm_root, world_size):
if torch.cuda.device_count() < world_size:
pytest.skip(f"needs {world_size:d} GPUs to run this test")
model_root = llm_models_root(check=True)
profile_dir = f"profiles/test_performance_alignment_{world_size}"
check_call(
[
"./sample_performance_alignment.sh",
],
cwd=llm_root / "examples" / "layer_wise_benchmarks",
env={
**os.environ,
"MODEL": model_root / "DeepSeek-V3-Lite" / "nvfp4_moe_only",
"NP": f"{world_size:d}",
"PROFILE_DIR": profile_dir,
},
)