diff --git a/examples/layer_wise_benchmarks/README.md b/examples/layer_wise_benchmarks/README.md
index 426dee02fd..7bdc794a9b 100644
--- a/examples/layer_wise_benchmarks/README.md
+++ b/examples/layer_wise_benchmarks/README.md
@@ -171,6 +171,121 @@ You will receive three reports, each containing kernel timing statistics grouped
 2. A CSV report at `profiles/report_np4_rank0.csv`
 3. An HTML report at `profiles/report_np4_rank0.html`
 
+## Performance alignment between end-to-end runs and layer-wise benchmarks
+
+A complete example can be found in `sample_performance_alignment.sh`. Here is a summary of the main steps.
+
+1. Run end-to-end serving in **COLLECT** mode and capture nsys profiles. This step generates a calibration file.
+
+   Meet the following requirements.
+
+   1. Add the following fields to `config.yaml`.
+
+      ```yaml
+      layer_wise_benchmarks_config:
+        calibration_mode: COLLECT
+        calibration_file_path: profiles/calibration_data.json
+      ```
+
+   2. Set `TLLM_PROFILE_START_STOP` to a range that captures some iterations (typically tens of iterations) of the GEN phase. Ensure every iteration has the same batch size. Capture 5 extra iterations at the beginning, because the first 5 iterations are treated as warm-up and dropped by the parser by default.
+
+   3. Capture per-rank nsys profiles; every rank should produce a separate file.
+
+      Place `nsys profile` after `mpirun` or `srun`. To minimize profiling overhead and file size, there is no need to capture CPU samples or GPU metrics.
+
+      If you use `trtllm-serve` or `trtllm-bench`, follow the command order below. If you use `examples/disaggregated/slurm/benchmark/submit.py`, setting `gen_profile_range` is enough.
+
+      ```bash
+      NP=$NP ./mpi_launch.sh middleware/mpi_env_from_ompi \
+          nsys profile \
+              -t cuda,nvtx \
+              --cpuctxsw none --cuda-event-trace false \
+              --cuda-graph-trace node \
+              -c cudaProfilerApi --capture-range-end stop \
+              -o profiles/report_e2e_collect_rank%q{RANK}.nsys-rep \
+              --force-overwrite true \
+          trtllm-llmapi-launch \
+          trtllm-bench \
+              --model ...
+      ```
+
+   4. For more precise alignment, set the same `TLLM_AUTOTUNER_CACHE_PATH` for all steps. The autotuner cache file is generated by Step 1 and reused by Steps 2 and 3.
+
+2. If the end-to-end serving uses CUDA Graphs, run Step 1 again in **MARK** mode without CUDA Graphs, and also capture nsys profiles.
+
+   The differences are as follows.
+
+   1. Add the following fields to `config.yaml`.
+
+      ```yaml
+      cuda_graph_config: null
+      layer_wise_benchmarks_config:
+        calibration_mode: MARK
+      ```
+
+   2. Change the profile output paths. The recommended argument is `-o profiles/report_e2e_mark_rank%q{RANK}.nsys-rep`.
+
+3. Run the layer-wise benchmarks with the calibration file obtained in Step 1.
+
+   ```bash
+   NP=4 ./mpi_launch.sh ./run.sh config_gen.yaml \
+       --model "$LLM_MODELS_ROOT/DeepSeek-R1/DeepSeek-R1-0528-FP4-v2" \
+       --load-format AUTO \
+       --layer-indices 5,6,7 \
+       --batch-size 32 \
+       --seq-len-q 1 \
+       --seq-len-kv-cache 2090 \
+       --balance-method NotModified \
+       --replay-file-path profiles/calibration_data.json \
+       --replay-start 47 \
+       --replay-stop 67
+   ```
+
+   Here is an explanation of each argument:
+
+   | Argument/Parameter | Explanation |
+   |-------------------|-------------|
+   | `NP=4` | Should match the end-to-end run. |
+   | `--load-format AUTO` | Instructs the benchmark to load real model weights instead of initializing random weights. |
+   | `--layer-indices 5,6,7` | A list of contiguous layers you want to calibrate. |
+   | `--batch-size 32` | Should match the end-to-end run. |
+   | `--seq-len-q 1` | Should match 1 + the number of MTP draft tokens of the end-to-end run. |
+   | `--seq-len-kv-cache 2090` | An estimate of the average context length over the captured iterations. Exclude the first 5 iterations from the estimate, because they are dropped by the parser. |
+   | `--replay-file-path` | The calibration file obtained in Step 1. |
+   | `--replay-start` and `--replay-stop` | Should match the end-to-end `TLLM_PROFILE_START_STOP`. Do not replay the first 5 iterations, because they are dropped by the parser. |
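+
+   For reference, the calibration file passed via `--replay-file-path` can be inspected with a short standalone script. The following is a minimal sketch (not part of the toolkit); the field names follow the REPLAY-mode loader in `calibrator.py` later in this diff, and the path is the one produced in Step 1.
+
+   ```python
+   import base64
+   import json
+   import zlib
+
+   import numpy as np
+
+   with open("profiles/calibration_data.json") as f:
+       data = json.load(f)
+
+   print("world_size:", data["world_size"])
+   records = data["all_ranks_records"][0]  # records collected by rank 0
+   print("iterations:", [r["iteration"] for r in records])
+
+   # Each record stores per-layer MoE routing metadata plus the raw
+   # token_selected_slots data, zlib-compressed and base64-encoded int32.
+   record = records[0]
+   for m in record["metadata"][:3]:
+       print(m["layer_idx"], m["num_slots"], m["token_selected_slots_shape"])
+   raw = zlib.decompress(base64.b64decode(record["raw_data"]))
+   slots = np.frombuffer(raw, dtype=np.int32)
+   print("total collected slot entries:", slots.size)
+   ```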
+
+4. Parse the end-to-end profiles with `parse_e2e.py`, and parse the layer-wise benchmark profiles with `parse.py`.
+
+   ```bash
+   seq 0 $((NP - 1)) | xargs -I% python3 parse_e2e.py \
+       --eager-trace profiles/report_e2e_mark_rank%.nsys-rep \
+       --graph-trace profiles/report_e2e_collect_rank%.nsys-rep \
+       --layer-indices 5,6,7 \
+       --warmup-times 5 \
+       -o profiles/report_e2e_collect_rank%.json
+   seq 0 $((NP - 1)) | xargs -I% python3 parse.py \
+       --world-size $NP \
+       --rank %
+   ```
+
+5. Run `correlation.py` to generate the correlation report.
+
+   ```bash
+   python3 correlation.py \
+       --reference profiles/report_e2e_collect_rank0.json \
+       $(seq 1 $((NP - 1)) | xargs -I% echo "--target profiles/report_e2e_collect_rank%.json") \
+       $(seq 0 $((NP - 1)) | xargs -I% echo "--target profiles/report_np${NP}_rank%.json") \
+       -o profiles/correlation.html
+   ```
+
+   Open `profiles/correlation.html` to view the report.
+
+Limitations:
+
+1. Pipeline parallelism is not supported.
+2. Only the CUTLASS and WIDEEP MoE backends are supported.
+3. Only tested with the GEN phase and attention DP.
+
 ## Developer utilities
 
 1. Less startup time when debug a model
diff --git a/examples/layer_wise_benchmarks/template.html b/examples/layer_wise_benchmarks/breakdown_template.html
similarity index 100%
rename from examples/layer_wise_benchmarks/template.html
rename to examples/layer_wise_benchmarks/breakdown_template.html
diff --git a/examples/layer_wise_benchmarks/correlation.py b/examples/layer_wise_benchmarks/correlation.py
new file mode 100644
index 0000000000..9d7ce8f32b
--- /dev/null
+++ b/examples/layer_wise_benchmarks/correlation.py
@@ -0,0 +1,96 @@
+import argparse
+import json
+from pathlib import Path
+
+import jinja2
+from parser_utils import kernel_short_name, shortest_common_supersequence
+
+# Parse cmdline
+parser = argparse.ArgumentParser()
+parser.add_argument("--reference", type=str, required=True)
+parser.add_argument("--target", action="append", type=str, required=True)
+parser.add_argument("--output", "-o", type=str, required=True)
+args = parser.parse_args()
+print(args)
+
+with open(args.reference) as f:
+    ref_data_all = json.load(f)
+if len(ref_data_all) != 1:
+    raise ValueError("Ambiguous reference data")
+ref_data = ref_data_all[0]
+ref_kernel_names = [o["name"] for o in ref_data["timeline"]]
+data = []
+data.append(
+    {
+        "series": f"reference: {ref_data['name']}",
+        "points": [
+            {
+                "x": i + 1,
+                "name": kernel_short_name(o["name"]),
+                "duration": o["duration"] / 1000,
+                "end": o["end"] / 1000,
+            }
+            for i, o in enumerate(ref_data["timeline"])
+        ],
+    }
+)
+
+for target_id, target in enumerate(args.target):
+    with open(target) as f:
+        tgt_data_all = json.load(f)
+    for timeline_id, tgt_data in enumerate(tgt_data_all):
+        tgt_kernel_names = [o["name"] for o in tgt_data["timeline"]]
+        sup_kernel_names = shortest_common_supersequence(ref_kernel_names, tgt_kernel_names)
+
+        x_sup = []
+        j = 0
+        for sup_kernel_name in sup_kernel_names:
+            if j < len(ref_kernel_names) and sup_kernel_name == ref_kernel_names[j]:
+                x_sup.append(j + 1)
+                j += 1
+            else:
+                x_sup.append(None)
+        print(f"target {target_id} timeline {timeline_id} {x_sup=}")
+
+        x_tgt = []
+        j = 0
+        for tgt_kernel_name in tgt_kernel_names:
+            while sup_kernel_names[j] != tgt_kernel_name:
+                j += 1
+            x_tgt.append(x_sup[j])
+            j += 1
+        if x_tgt[0] is None:
+            x_tgt[0] = min(1, min(x for x in x_tgt if x is not None) - 1)
+        if x_tgt[-1] is None:
+            x_tgt[-1] = max(len(ref_kernel_names), max(x for x in x_tgt if x is not None) + 1)
+        top = 0
+        while top < len(x_tgt) - 1:
+            next_top = top + 1
+            while x_tgt[next_top] is None:
+                next_top += 1
+            for i in range(top + 1, next_top):
+                x_tgt[i] = x_tgt[top] + (x_tgt[next_top] - x_tgt[top]) * (i - top) / (
+                    next_top - top
+                )
+            top = next_top
+        print(f"target {target_id} timeline {timeline_id} {x_tgt=}")
+
+        data.append(
+            {
+                "series": f"target{target_id}: {tgt_data['name']}",
+                "points": [
+                    {
+                        "x": x,
+                        "name": kernel_short_name(o["name"]),
+                        "duration": o["duration"] / 1000,
+                        "end": o["end"] / 1000,
+                    }
+                    for x, o in zip(x_tgt, tgt_data["timeline"])
+                ],
+            }
+        )
+
+loader = jinja2.FileSystemLoader(Path(__file__).parent)
+template = jinja2.Environment(loader=loader).get_template("correlation_template.html")
+with open(args.output, "w") as f:
+    f.write(template.render(rawData=data))
diff --git a/examples/layer_wise_benchmarks/correlation_template.html b/examples/layer_wise_benchmarks/correlation_template.html
new file mode 100644
index 0000000000..2fa14cd0ee
--- /dev/null
+++ b/examples/layer_wise_benchmarks/correlation_template.html
@@ -0,0 +1,152 @@

[152 lines of HTML/JavaScript omitted: the new "Correlation" page template that renders the "CUDA Kernel Correlation" chart; its markup was lost during text extraction.]
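Both `correlation.py` above and `parse_e2e.py` below align kernel-name sequences with `shortest_common_supersequence` from `parser_utils.py` (included later in this diff). A minimal sketch of how that helper merges two sequences, assuming it is run from `examples/layer_wise_benchmarks` so `parser_utils` is importable:

```python
from parser_utils import shortest_common_supersequence

ref = ["kernel_A", "kernel_B", "kernel_C"]  # e.g. reference kernel names
tgt = ["kernel_B", "kernel_D"]              # e.g. target kernel names

# Both inputs are subsequences of the merged result, which correlation.py
# uses to assign a shared x position to matching kernels.
merged = shortest_common_supersequence(ref, tgt)
print(merged)  # ['kernel_A', 'kernel_B', 'kernel_C', 'kernel_D'] (one shortest merge)
```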
+ + + diff --git a/examples/layer_wise_benchmarks/middleware/mpi_env_from_ompi b/examples/layer_wise_benchmarks/middleware/mpi_env_from_ompi new file mode 100755 index 0000000000..0dcdb57cc8 --- /dev/null +++ b/examples/layer_wise_benchmarks/middleware/mpi_env_from_ompi @@ -0,0 +1,10 @@ +#!/bin/bash + +set -euo pipefail + +export WORLD_SIZE=$OMPI_COMM_WORLD_SIZE +export RANK=$OMPI_COMM_WORLD_RANK +export LOCAL_RANK=$OMPI_COMM_WORLD_LOCAL_RANK +export NODE_RANK=$OMPI_COMM_WORLD_NODE_RANK + +"$@" diff --git a/examples/layer_wise_benchmarks/parse.py b/examples/layer_wise_benchmarks/parse.py index c878574da6..086d6fcd19 100644 --- a/examples/layer_wise_benchmarks/parse.py +++ b/examples/layer_wise_benchmarks/parse.py @@ -4,13 +4,17 @@ import csv import json import re import sqlite3 -import subprocess -import sys from pathlib import Path import jinja2 import numpy as np import pandas as pd +from parser_utils import ( + kernel_short_name, + lazy_convert_sqlite, + shortest_common_supersequence, + warned_names, +) # Parse cmdline parser = argparse.ArgumentParser() @@ -32,66 +36,6 @@ if (args.file_path is None) == (args.world_size is None): parser.error("Please specify exactly one of --file-path and --world-size.") print(args) - -def lazy_convert_sqlite(nsys_rep_file_path, sqlite_file_path): - if ( - not sqlite_file_path.is_file() - or nsys_rep_file_path.stat().st_mtime > sqlite_file_path.stat().st_mtime - ): - subprocess.check_call( - [ - "nsys", - "export", - "--type", - "sqlite", - "-o", - sqlite_file_path, - "--force-overwrite=true", - nsys_rep_file_path, - ] - ) - - -def shortest_common_supersequence(a, b): - # Merge two lists into their shortest common supersequence, - # so that both `a` and `b` are subsequences of the result. - # Uses dynamic programming to compute the shortest common supersequence, then reconstructs it. 
- m, n = len(a), len(b) - dp = [[0] * (n + 1) for _ in range(m + 1)] - for i in range(m + 1): - dp[i][0] = i - for j in range(n + 1): - dp[0][j] = j - for i in range(1, m + 1): - for j in range(1, n + 1): - if a[i - 1] == b[j - 1]: - dp[i][j] = dp[i - 1][j - 1] + 1 - else: - dp[i][j] = min(dp[i - 1][j] + 1, dp[i][j - 1] + 1) - # Backtrack to build the merged sequence - res = [] - i, j = m, n - while i > 0 and j > 0: - if a[i - 1] == b[j - 1]: - res.append(a[i - 1]) - i -= 1 - j -= 1 - elif dp[i - 1][j] < dp[i][j - 1]: - res.append(a[i - 1]) - i -= 1 - else: - res.append(b[j - 1]) - j -= 1 - while i > 0: - res.append(a[i - 1]) - i -= 1 - while j > 0: - res.append(b[j - 1]) - j -= 1 - res.reverse() - return res - - if args.file_path is not None: nsys_rep_file_path = Path(args.file_path) if not nsys_rep_file_path.name.endswith(".nsys-rep"): @@ -106,6 +50,9 @@ csv_file_path = nsys_rep_file_path.parent / (nsys_rep_file_path.name[: -len(".ns html_file_path = nsys_rep_file_path.parent / ( nsys_rep_file_path.name[: -len(".nsys-rep")] + ".html" ) +json_file_path = nsys_rep_file_path.parent / ( + nsys_rep_file_path.name[: -len(".nsys-rep")] + ".json" +) lazy_convert_sqlite(nsys_rep_file_path, sqlite_file_path) conn = sqlite3.connect(f"file:{sqlite_file_path}?mode=ro", uri=True) @@ -133,7 +80,7 @@ for start, text in df.itertuples(index=False): problem_start.append(start) problem_set.append( { - "spec": json.loads(text[len("layer_wise_benchmarks problem_spec") :]), + "spec": json.loads(text[len("layer_wise_benchmarks problem_spec ") :]), "text": "", "runs": [], "runs_end": [], @@ -145,15 +92,19 @@ for start, text in df.itertuples(index=False): query = """SELECT T1.start, T1.end, T2.value AS text FROM NVTX_EVENTS AS T1 JOIN StringIds AS T2 ON T1.textId = T2.id - WHERE eventType = ? AND T2.value NOT LIKE ? AND T2.value NOT LIKE ? AND domainId != ?""" + WHERE eventType = ? AND T2.value NOT LIKE ? 
AND domainId != ?""" df = pd.read_sql_query( query, conn, - params=(event_id_NvtxPushPopRange, "layer_wise_benchmarks %", "[DG]%", nccl_domain_id), + params=(event_id_NvtxPushPopRange, "[DG]%", nccl_domain_id), ) for start, end, text in df.itertuples(index=False): problem_id = bisect.bisect(problem_start, start) - 1 - assert problem_id != -1 + if text.startswith("layer_wise_benchmarks "): + if text != "layer_wise_benchmarks ignore": + continue + else: + assert problem_id != -1 if re.match(r"b=\d+ s=\d+ ", text): problem_set[problem_id]["text"] = text problem_set[problem_id]["runs"].append(start) @@ -216,7 +167,9 @@ for ( for range_id in ranges: problem["kernel_count_per_range"][range_id] += 1 range_names = [problem["ranges"][i][2] for i in ranges] - if args.module is None or args.module in range_names: + if ( + args.module is None or args.module in range_names + ) and "layer_wise_benchmarks ignore" not in range_names: kernel_list.append( ( problem_id, @@ -235,6 +188,7 @@ for ( query = "SELECT * FROM StringIds" df = pd.read_sql_query(query, conn) string_ids = dict(zip(df["id"], df["value"])) +string_ids.update({-2: "Memcpy", -3: "Memset"}) conn.close() @@ -275,75 +229,13 @@ for problem_id in range(len(kernels)): seq = [demangledName for demangledName, _, _, _ in kernels[problem_id][run_id]] assert seq == required_seq - -parser_keywords = [ - ("cuBLASGemm", "nvjet"), - ("cutlassGroupGemm", "cutlass::device_kernel::launch_clamp_scalar("), - ("torchCompare", "at::native::::CompareFunctor<"), - ("torchCopy", "at::native::bfloat16_copy_kernel_cuda"), - ("torchCopy", "at::native::direct_copy_kernel_cuda("), - ("torchFill", "at::native::FillFunctor"), - ("torchIndexPut", "at::native::index_put_kernel_impl<"), - ("torchMul", "at::native::binary_internal::MulFunctor<"), - ("torchPow", "at::native::::pow_tensor_scalar_kernel_impl<"), - ("torchReduceSum", ["at::native::reduce_kernel<", "at::native::sum_functor<"]), - ("torchSigmoid", "at::native::sigmoid_kernel_cuda"), - ("torchWhere", "at::native::::where_kernel_impl("), -] -warned_names = set() - - -def parse_kernel_name(demangledName): - if demangledName == -2: - return "Memcpy" - if demangledName == -3: - return "Memset" - name = string_ids[demangledName] - for dst, src in parser_keywords: - if not isinstance(src, (tuple, list)): - src = [src] - if all(keyword in name for keyword in src): - return dst - if re.search(r"at::native::.*elementwise_kernel<", name): - if name not in warned_names: - print(f"Not parsed torch kernel name: {name}", file=sys.stderr) - warned_names.add(name) - assert "!unnamed!" 
not in name - name = name.replace("", "!unnamed!") - if "<" in name: - name = name[: name.index("<")] - if "(" in name: - name = name[: name.index("(")] - if "::" in name: - name = name[name.rindex("::") + 2 :] - name = name.replace("!unnamed!", "") - return name - - converted_seqs = [] +warmup_times = run_args["warmup_times"] if args.warmup_times is None else args.warmup_times for runs in kernels: - warmup_times = run_args["warmup_times"] if args.warmup_times is None else args.warmup_times converted_seq = [] # Kernel time for i, (demangledName, _, _, ranges) in enumerate(runs[0]): - name = parse_kernel_name(demangledName) + name = kernel_short_name(string_ids[demangledName]) category = (*ranges, name) time_list = [run[i][2] - run[i][1] for run in runs] t = np.mean(time_list[warmup_times:]).tolist() @@ -399,6 +291,7 @@ csv_data = [["", *[problem["text"] for problem in problem_set]]] js_data = [] js_stack = [js_data] max_title_len = max((len(title) - 1) * 3 + len(title[-1][:40]) for title in merged_title) +print("-" * (max_title_len + 1 + 6 * len(problem_set))) for title, time_data in zip(merged_title, merged_data): while stack != list(title[: len(stack)]): level_title = stack[-1] @@ -458,7 +351,7 @@ for problem in problem_set: innermost_children = innermost_children[-1]["children"] innermost_children.append({"name": problem["text"]}) loader = jinja2.FileSystemLoader(Path(__file__).parent) -template = jinja2.Environment(loader=loader).get_template("template.html") +template = jinja2.Environment(loader=loader).get_template("breakdown_template.html") with html_file_path.open("w") as f: configText = ( "Run:\n" @@ -481,3 +374,26 @@ if args.query is not None: query + " " * (max_title_len - len(query)), *[f"{x / 1000:-6.1f}" for x in query_matched], ) + +correlation = [] +for problem, runs in zip(problem_set, kernels): + timeline = [] + for i, (demangledName, _, _, _) in enumerate(runs[0]): + name = string_ids[demangledName] + duration_list = [run[i][2] - run[i][1] for run in runs] + end_list = [run[i][2] - run[0][1] for run in runs] + timeline.append( + { + "name": name, + "duration": np.mean(duration_list[warmup_times:]).tolist(), + "end": np.mean(end_list[warmup_times:]).tolist(), + } + ) + correlation.append( + { + "name": problem["text"], + "timeline": timeline, + } + ) +with json_file_path.open("w") as f: + json.dump(correlation, f) diff --git a/examples/layer_wise_benchmarks/parse_e2e.py b/examples/layer_wise_benchmarks/parse_e2e.py new file mode 100644 index 0000000000..d55d5aae7d --- /dev/null +++ b/examples/layer_wise_benchmarks/parse_e2e.py @@ -0,0 +1,247 @@ +import argparse +import bisect +import json +import re +import sqlite3 +from pathlib import Path + +import numpy as np +import pandas as pd +from parser_utils import ( + kernel_short_name, + lazy_convert_sqlite, + shortest_common_supersequence, + warned_names, +) + + +def comma_separated_ints(s): + return [int(x) for x in s.split(",")] + + +parser = argparse.ArgumentParser() +parser.add_argument("--eager-trace", type=str, required=True) +parser.add_argument("--graph-trace", type=str) +parser.add_argument("--target-ctx-reqs", type=int, default=0) +parser.add_argument("--target-gen-reqs", type=int) +parser.add_argument("--layer-indices", type=comma_separated_ints, required=True) +parser.add_argument("--warmup-times", type=int, default=5) +group = parser.add_mutually_exclusive_group() +group.add_argument("--error-on-unknown-kernel", action="store_true", dest="error_on_unknown_kernel") +group.add_argument( + 
"--no-error-on-unknown-kernel", action="store_false", dest="error_on_unknown_kernel" +) +parser.set_defaults(error_on_unknown_kernel=False) +parser.add_argument("--output", "-o", type=str) +args = parser.parse_args() +if not args.eager_trace.endswith(".nsys-rep"): + parser.error("Please provide a .nsys-rep file for the --eager-trace option.") +if args.graph_trace is not None and not args.graph_trace.endswith(".nsys-rep"): + parser.error("Please provide a .nsys-rep file for the --graph-trace option.") +print(args) + + +def is_gemm(name): + return "nvjet" in name or "gemm" in name.lower() + + +eager_nsys_rep_file_path = Path(args.eager_trace) +# For CTX phase which does not use CUDA Graphs, analysis the eager trace instead. +# Here we do not change the identifier name "graph_*" for convenience. +graph_nsys_rep_file_path = Path(args.graph_trace or args.eager_trace) +eager_sqlite_file_path = eager_nsys_rep_file_path.parent / ( + eager_nsys_rep_file_path.name[: -len(".nsys-rep")] + ".sqlite" +) +graph_sqlite_file_path = graph_nsys_rep_file_path.parent / ( + graph_nsys_rep_file_path.name[: -len(".nsys-rep")] + ".sqlite" +) +lazy_convert_sqlite(eager_nsys_rep_file_path, eager_sqlite_file_path) +lazy_convert_sqlite(graph_nsys_rep_file_path, graph_sqlite_file_path) +eager_conn = sqlite3.connect(f"file:{eager_sqlite_file_path}?mode=ro", uri=True) +graph_conn = sqlite3.connect(f"file:{graph_sqlite_file_path}?mode=ro", uri=True) + +query = "SELECT * FROM ENUM_NSYS_EVENT_TYPE" +df = pd.read_sql_query(query, eager_conn) +eager_event_id_NvtxPushPopRange = df[df["name"] == "NvtxPushPopRange"].iloc[0]["id"].tolist() +df = pd.read_sql_query(query, graph_conn) +graph_event_id_NvtxPushPopRange = df[df["name"] == "NvtxPushPopRange"].iloc[0]["id"].tolist() + +query = """SELECT T1.start, T1.end, T2.value AS text + FROM NVTX_EVENTS AS T1 + JOIN StringIds AS T2 ON T1.textId = T2.id + WHERE eventType = ?""" +df = pd.read_sql_query(query, eager_conn, params=(eager_event_id_NvtxPushPopRange,)) +target_ctx_reqs = args.target_ctx_reqs +target_gen_reqs = args.target_gen_reqs +if target_gen_reqs is None: + if target_ctx_reqs == 0: + for _, _, text in df.itertuples(index=False): + if m := re.match( + r"^\[Executor\] _forward_step (\d+): (\d+) ctx reqs, (\d+) gen reqs", text + ): + ctx_reqs = int(m.group(2)) + gen_reqs = int(m.group(3)) + if ctx_reqs == target_ctx_reqs: + target_gen_reqs = gen_reqs + break + else: + raise ValueError("Cannot determine target_gen_reqs") + else: + target_gen_reqs = 0 +print(f"{target_ctx_reqs=} {target_gen_reqs=}") +eager_iters = [] +for start, end, text in df.itertuples(index=False): + if m := re.match(r"^\[Executor\] _forward_step (\d+): (\d+) ctx reqs, (\d+) gen reqs", text): + it = int(m.group(1)) + ctx_reqs = int(m.group(2)) + gen_reqs = int(m.group(3)) + if ctx_reqs == target_ctx_reqs and gen_reqs == target_gen_reqs: + eager_iters.append((start, end, it)) +eager_iters = sorted(eager_iters)[args.warmup_times :] +iter_list = [t[2] for t in eager_iters] +print("Iters (eager)", *iter_list) +per_iter_eager_layers = [[] for _ in iter_list] +for start, end, text in df.itertuples(index=False): + if m := re.match(r"^layer_wise_benchmarks layer_idx (\d+)$", text): + layer_idx = int(m.group(1)) + it_idx = bisect.bisect(eager_iters, (start,)) - 1 + if it_idx < 0 or end > eager_iters[it_idx][1]: + continue + assert end <= eager_iters[it_idx][1], "Not belong to any iter" + per_iter_eager_layers[it_idx].append((start, end, it_idx, layer_idx)) +layer_list = [t[3] for t in per_iter_eager_layers[0]] 
+print("Layers (eager)", *layer_list) +for eager_layers in per_iter_eager_layers: + assert [t[3] for t in eager_layers] == layer_list, "inconsistent layer idx" +df = pd.read_sql_query(query, graph_conn, params=(graph_event_id_NvtxPushPopRange,)) +graph_iters = [] +for start, end, text in df.itertuples(index=False): + if m := re.match(r"^\[Executor\] _forward_step (\d+): (\d+) ctx reqs, (\d+) gen reqs", text): + it = int(m.group(1)) + ctx_reqs = int(m.group(2)) + gen_reqs = int(m.group(3)) + if ctx_reqs == target_ctx_reqs and gen_reqs == target_gen_reqs: + graph_iters.append((start, end, it)) +graph_iters = sorted(graph_iters)[args.warmup_times :] +graph_iter_list = [t[2] for t in graph_iters] +print("Iters (graph)", *graph_iter_list) +if iter_list != graph_iter_list: + raise ValueError("The ID of iterations do not match") + + +def query_kernels(conn, iters): + query = """SELECT name FROM sqlite_master WHERE type = ?""" + df = pd.read_sql_query(query, conn, params=("table",)) + tables = df["name"].tolist() + unified_subquery = """SELECT T1.start, T1.end, T1.demangledName, T1.correlationId, T1.graphNodeId + FROM CUPTI_ACTIVITY_KIND_KERNEL AS T1""" + if "CUPTI_ACTIVITY_KIND_MEMCPY" in tables: + unified_subquery += """ UNION ALL + SELECT T2.start, T2.end, -2 AS demangledName, T2.correlationId, T2.graphNodeId + FROM CUPTI_ACTIVITY_KIND_MEMCPY AS T2""" + if "CUPTI_ACTIVITY_KIND_MEMSET" in tables: + unified_subquery += """ UNION ALL + SELECT T3.start, T3.end, -3 AS demangledName, T3.correlationId, T3.graphNodeId + FROM CUPTI_ACTIVITY_KIND_MEMSET AS T3""" + query = f"""SELECT unified.start, unified.end, unified.graphNodeId, unified.demangledName, + R.start AS runtime_start, R.end AS runtime_end + FROM ({unified_subquery}) AS unified + JOIN CUPTI_ACTIVITY_KIND_RUNTIME AS R ON unified.correlationId = R.correlationId""" + df = pd.read_sql_query(query, conn) + per_iter_kernels = [[] for _ in iters] + for start, end, graphNodeId, demangledName, runtime_start, runtime_end in df.itertuples( + index=False + ): + it_idx = bisect.bisect(iters, (runtime_start,)) - 1 + if it_idx < 0 or runtime_end > iters[it_idx][1]: + continue + per_iter_kernels[it_idx].append((runtime_start, graphNodeId, start, end, demangledName)) + for kernels in per_iter_kernels: + kernels.sort() + return per_iter_kernels + + +eager_per_iter_kernels = query_kernels(eager_conn, eager_iters) +graph_per_iter_kernels = query_kernels(graph_conn, graph_iters) +print("#Kernels (eager)", *[len(kernels) for kernels in eager_per_iter_kernels]) +print("#Kernels (graph)", *[len(kernels) for kernels in graph_per_iter_kernels]) +for eager_kernels, graph_kernels in zip(eager_per_iter_kernels, graph_per_iter_kernels): + assert all(a[4] == eager_per_iter_kernels[0][i][4] for i, a in enumerate(eager_kernels)), ( + "eager kernels change across iterations" + ) + assert all(a[4] == graph_per_iter_kernels[0][i][4] for i, a in enumerate(graph_kernels)), ( + "graph kernels change across iterations" + ) + +query = "SELECT * FROM StringIds" +df = pd.read_sql_query(query, eager_conn) +eager_string_ids = dict(zip(df["id"], df["value"])) +eager_string_ids.update({-2: "Memcpy", -3: "Memset"}) +df = pd.read_sql_query(query, graph_conn) +graph_string_ids = dict(zip(df["id"], df["value"])) +graph_string_ids.update({-2: "Memcpy", -3: "Memset"}) + +eager_conn.close() +graph_conn.close() + +eager_kernel_names = [eager_string_ids[kernel[4]] for kernel in eager_per_iter_kernels[0]] +graph_kernel_names = [graph_string_ids[kernel[4]] for kernel in graph_per_iter_kernels[0]] 
+super_kernel_names = shortest_common_supersequence(eager_kernel_names, graph_kernel_names) +print(f"#Kernels (supersequence) {len(super_kernel_names)}") +eager_per_layer_kernels = [[] for _ in layer_list] +for i, eager_kernel in enumerate(eager_per_iter_kernels[0]): + eager_layers_idx = bisect.bisect(per_iter_eager_layers[0], (eager_kernel[0],)) - 1 + if eager_layers_idx < 0 or eager_kernel[0] > per_iter_eager_layers[0][eager_layers_idx][1]: + continue + eager_per_layer_kernels[eager_layers_idx].append(i) +eager2super = [] +j = 0 +for i, eager_kernel_name in enumerate(eager_kernel_names): + while eager_kernel_name != super_kernel_names[j]: + j += 1 + eager2super.append(j) + j += 1 +super_per_layer_starts = [eager2super[a[0]] for a in eager_per_layer_kernels] +super_per_layer_ends = [eager2super[a[-1]] for a in eager_per_layer_kernels] +graph_per_layer_kernels = [[] for _ in layer_list] +j = 0 +for i, graph_kernel_name in enumerate(graph_kernel_names): + while graph_kernel_name != super_kernel_names[j]: + j += 1 + layer_idx = bisect.bisect(super_per_layer_starts, j) - 1 + if layer_idx >= 0 and j <= super_per_layer_ends[layer_idx]: + graph_per_layer_kernels[layer_idx].append(i) + j += 1 +timeline = [] +first_kernel_idx = min(graph_per_layer_kernels[layer_idx][0] for layer_idx in args.layer_indices) +for layer_idx in args.layer_indices: + for kernel_idx in graph_per_layer_kernels[layer_idx]: + duration_list = [] + end_list = [] + for it_idx in range(len(graph_per_iter_kernels)): + layer_start_time = graph_per_iter_kernels[it_idx][first_kernel_idx][2] + kernel_start_time = graph_per_iter_kernels[it_idx][kernel_idx][2] + kernel_end_time = graph_per_iter_kernels[it_idx][kernel_idx][3] + duration_list.append(kernel_end_time - kernel_start_time) + end_list.append(kernel_end_time - layer_start_time) + timeline.append( + { + "name": graph_kernel_names[kernel_idx], + "duration": np.mean(duration_list).tolist(), + "end": np.mean(end_list).tolist(), + } + ) +print(f"{'Kernel':40s} {'Duration':>8s} {'End':>8s}") +print("-" * (40 + 1 + 8 + 1 + 8)) +for o in timeline: + print( + f"{kernel_short_name(o['name'])[:40]:40s} {o['duration'] / 1000.0:-8.1f} {o['end'] / 1000.0:-8.1f}" + ) +if args.error_on_unknown_kernel and warned_names: + raise ValueError("Unknown kernel names encountered") + +if args.output: + if not args.output.endswith(".json"): + raise ValueError("Output file name must be *.json") + with open(args.output, "w") as f: + json.dump([{"name": "parse_e2e", "timeline": timeline}], f) diff --git a/examples/layer_wise_benchmarks/parser_utils.py b/examples/layer_wise_benchmarks/parser_utils.py new file mode 100644 index 0000000000..95491cc2f7 --- /dev/null +++ b/examples/layer_wise_benchmarks/parser_utils.py @@ -0,0 +1,216 @@ +import re +import subprocess +import sys + +import numpy as np + + +def lazy_convert_sqlite(nsys_rep_file_path, sqlite_file_path): + if ( + not sqlite_file_path.is_file() + or nsys_rep_file_path.stat().st_mtime > sqlite_file_path.stat().st_mtime + ): + subprocess.check_call( + [ + "nsys", + "export", + "--type", + "sqlite", + "-o", + sqlite_file_path, + "--force-overwrite=true", + nsys_rep_file_path, + ] + ) + + +parser_keywords = [ + ("cuBLASGemm", "nvjet"), + ("cutlassGroupGemm", "cutlass::device_kernel::launch_clamp_scalar("), + ("torchCompare", "at::native::::CompareFunctor<"), + ("torchCopy", "at::native::bfloat16_copy_kernel_cuda"), + ("torchCopy", "at::native::direct_copy_kernel_cuda("), + ("torchDiv", "at::native::binary_internal::DivFunctor<"), + ("torchFill", 
"at::native::FillFunctor"), + ("torchIndexPut", "at::native::index_put_kernel_impl<"), + ("torchMul", "at::native::binary_internal::MulFunctor<"), + ("torchPow", "at::native::::pow_tensor_scalar_kernel_impl<"), + ("torchReduceSum", ["at::native::reduce_kernel<", "at::native::sum_functor<"]), + ("torchScatterGather", "void at::native::_scatter_gather_elementwise_kernel<"), + ("torchSigmoid", "at::native::sigmoid_kernel_cuda"), + ("torchWhere", "at::native::::where_kernel_impl("), +] +warned_names = set() + + +def kernel_short_name(name): + for dst, src in parser_keywords: + if not isinstance(src, (tuple, list)): + src = [src] + if all(keyword in name for keyword in src): + return dst + if re.search(r"at::native::.*elementwise_kernel<", name): + if name not in warned_names: + print(f"Not parsed torch kernel name: {name}", file=sys.stderr) + warned_names.add(name) + assert "!unnamed!" not in name + name = name.replace("", "!unnamed!") + if "<" in name: + name = name[: name.index("<")] + if "(" in name: + name = name[: name.index("(")] + if "::" in name: + name = name[name.rindex("::") + 2 :] + name = name.replace("!unnamed!", "") + return name + + +def shortest_common_supersequence(a, b): + # Merge two lists into their shortest common supersequence, + # so that both `a` and `b` are subsequences of the result. + # Uses dynamic programming to compute the shortest common supersequence, then reconstructs it. + m, n = len(a), len(b) + dp = [[0] * (n + 1) for _ in range(m + 1)] + for i in range(m + 1): + dp[i][0] = i + for j in range(n + 1): + dp[0][j] = j + for i in range(1, m + 1): + for j in range(1, n + 1): + if a[i - 1] == b[j - 1]: + dp[i][j] = dp[i - 1][j - 1] + 1 + else: + dp[i][j] = min(dp[i - 1][j] + 1, dp[i][j - 1] + 1) + # Backtrack to build the merged sequence + res = [] + i, j = m, n + while i > 0 and j > 0: + if a[i - 1] == b[j - 1]: + res.append(a[i - 1]) + i -= 1 + j -= 1 + elif dp[i - 1][j] < dp[i][j - 1]: + res.append(a[i - 1]) + i -= 1 + else: + res.append(b[j - 1]) + j -= 1 + while i > 0: + res.append(a[i - 1]) + i -= 1 + while j > 0: + res.append(b[j - 1]) + j -= 1 + res.reverse() + return res + + +try: + import numba + + numba_installed = True +except ImportError: + numba_installed = False + +if numba_installed: + # The core computation function: compiled to machine code by Numba. + # 'nopython=True' ensures it runs entirely without the Python interpreter for max speed. + @numba.jit(nopython=True) + def _core_scs(a_ids, b_ids): + m = len(a_ids) + n = len(b_ids) + + # Use a NumPy array instead of a Python list of lists. + # This creates a continuous memory block, similar to int dp[m+1][n+1] in C. + dp = np.zeros((m + 1, n + 1), dtype=np.int32) + + # 1. Initialize boundaries + # Corresponds to: dp[i][0] = i + for i in range(m + 1): + dp[i, 0] = i + # Corresponds to: dp[0][j] = j + for j in range(n + 1): + dp[0, j] = j + + # 2. Fill the DP table + for i in range(1, m + 1): + for j in range(1, n + 1): + if a_ids[i - 1] == b_ids[j - 1]: + dp[i, j] = dp[i - 1, j - 1] + 1 + else: + val1 = dp[i - 1, j] + 1 + val2 = dp[i, j - 1] + 1 + if val1 < val2: + dp[i, j] = val1 + else: + dp[i, j] = val2 + + # 3. Backtrack to reconstruct the result + # dp[m, n] holds the total length of the shortest common supersequence. + res_len = dp[m, n] + + # Pre-allocate the result array. + # Filling a pre-allocated array is much faster than appending to a list. 
+ res_ids = np.empty(res_len, dtype=np.int32) + k = res_len - 1 # Index for writing into res_ids + + i, j = m, n + while i > 0 and j > 0: + if a_ids[i - 1] == b_ids[j - 1]: + res_ids[k] = a_ids[i - 1] + i -= 1 + j -= 1 + elif dp[i - 1, j] < dp[i, j - 1]: + res_ids[k] = a_ids[i - 1] + i -= 1 + else: + res_ids[k] = b_ids[j - 1] + j -= 1 + k -= 1 + + while i > 0: + res_ids[k] = a_ids[i - 1] + i -= 1 + k -= 1 + + while j > 0: + res_ids[k] = b_ids[j - 1] + j -= 1 + k -= 1 + + return res_ids + + def shortest_common_supersequence(a, b): + # 1. Build a mapping table (String -> Int) + # Extract unique tokens from both lists + unique_tokens = list(set(a) | set(b)) + token_to_id = {token: i for i, token in enumerate(unique_tokens)} + id_to_token = {i: token for i, token in enumerate(unique_tokens)} + + # 2. Convert input lists to NumPy integer arrays + a_ids = np.array([token_to_id[x] for x in a], dtype=np.int32) + b_ids = np.array([token_to_id[x] for x in b], dtype=np.int32) + + # 3. Call the JIT-compiled core function + # The first time this runs, it will compile (takes ~200ms). Subsequent runs are instant. + res_ids = _core_scs(a_ids, b_ids) + + # 4. Convert the result back to strings (Int -> String) + return [id_to_token[idx] for idx in res_ids] diff --git a/examples/layer_wise_benchmarks/run.py b/examples/layer_wise_benchmarks/run.py index a560cff958..5a82610ffb 100644 --- a/examples/layer_wise_benchmarks/run.py +++ b/examples/layer_wise_benchmarks/run.py @@ -13,7 +13,9 @@ from tensorrt_llm._torch.autotuner import AutoTuner, autotune from tensorrt_llm._torch.modules.multi_stream_utils import with_multi_stream from tensorrt_llm._utils import local_mpi_rank, mpi_rank, mpi_world_size from tensorrt_llm.logger import logger -from tensorrt_llm.tools.layer_wise_benchmarks import BalanceMethod, Runner, mark_ranges +from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator +from tensorrt_llm.tools.layer_wise_benchmarks.mark_utils import mark_ranges +from tensorrt_llm.tools.layer_wise_benchmarks.runner import BalanceMethod, Runner def comma_separated_ints(s): @@ -79,6 +81,15 @@ parser.add_argument("--seq-len-q", type=comma_separated_ints, dest="seq_len_q_li parser.add_argument("--seq-len-kv-cache", type=comma_separated_ints, dest="seq_len_kv_cache_list") parser.add_argument("--balance-method", type=str) parser.add_argument("--balance-ratio", type=comma_separated_floats, dest="balance_ratio_list") +# Calibration +parser.add_argument("--replay-file-path", type=str) +parser.add_argument("--replay-start-iter", type=int) +parser.add_argument("--replay-stop-iter", type=int) +group = parser.add_mutually_exclusive_group() +group.add_argument("--replay-verify-metadata", action="store_true", dest="replay_verify_metadata") +group.add_argument( + "--no-replay-verify-metadata", action="store_false", dest="replay_verify_metadata" +) # Schedule parser.add_argument("--warmup-times", type=int, default=20) parser.add_argument("--run-times", type=int, default=100) @@ -131,6 +142,10 @@ if args.enable_autotuner is None: args.enable_autotuner = True if args.use_cuda_graph is None: args.use_cuda_graph = False +if (args.replay_start_iter is None) != (args.replay_stop_iter is None): + parser.error("Both --replay-start-iter and --replay-stop-iter must be provided or none") +if args.replay_verify_metadata is None: + args.replay_verify_metadata = True print(args) # MPI args @@ -179,6 +194,27 @@ runner = Runner( ) logger.info("Layer-wise benchmarks: Create runner ... 
Done") +calibrator = get_calibrator() +if args.replay_file_path: + calibrator.init( + "REPLAY", + args.replay_file_path, + args.layer_indices, + replay_verify_metadata=args.replay_verify_metadata, + mapping=mapping, + ) + if args.replay_start_iter is None: + replay_start_iter, replay_stop_iter = calibrator.get_replay_iteration_range() + else: + replay_start_iter, replay_stop_iter = args.replay_start_iter, args.replay_stop_iter + logger.info( + f"Layer-wise benchmarks: Replay iteration range [{replay_start_iter}, {replay_stop_iter}]" + ) +else: + calibrator.init("NONE", None, None) + replay_start_iter, replay_stop_iter = 1, 1 # To avoid None in mathematics +calibrator.maybe_wrap_model(runner.model) + # Autotune run_pack = runner.create_run_pack( args.run_type, @@ -320,14 +356,19 @@ for batch_size, seq_len_q, seq_len_kv_cache, balance_ratio in itertools.product( balance_ratio_str = "" if balance_ratio is None else f" balance={balance_ratio:.2g}" nvtx_message = f"b={batch_size} s={seq_len_q} past={seq_len_kv_cache}{balance_ratio_str} NP{world_size}" + calibrator.start() for i in range(args.warmup_times + args.run_times): events[i].record() + replay_iter = replay_start_iter + i % (replay_stop_iter - replay_start_iter + 1) + calibrator.pre_step(replay_iter) with nvtx.annotate(nvtx_message): if args.use_cuda_graph: g.replay() else: run_pack() + calibrator.post_step(replay_iter) events[-1].record() + calibrator.stop() torch.cuda.synchronize() # Print statistics diff --git a/examples/layer_wise_benchmarks/sample_performance_alignment.sh b/examples/layer_wise_benchmarks/sample_performance_alignment.sh new file mode 100755 index 0000000000..b39267c60a --- /dev/null +++ b/examples/layer_wise_benchmarks/sample_performance_alignment.sh @@ -0,0 +1,144 @@ +#!/bin/bash + +set -euo pipefail + +# Common settings and preparation + +MODEL="${MODEL:-$LLM_MODELS_ROOT/DeepSeek-R1/DeepSeek-R1-0528-FP4-v2}" +NP=${NP:-4} +BATCH_SIZE=32 + +export PROFILE_DIR="${PROFILE_DIR:-profiles}" +export TLLM_AUTOTUNER_CACHE_PATH="$PROFILE_DIR/sample_performance_alignment_cache.json" + +mkdir -p -- "$PROFILE_DIR" +mkdir -p -- "$(dirname "$TLLM_AUTOTUNER_CACHE_PATH")" + +python3 ../../benchmarks/cpp/prepare_dataset.py \ + --tokenizer "$MODEL" \ + --stdout \ + --random-seed 42 \ + token-norm-dist \ + --num-requests $((BATCH_SIZE*NP)) \ + --input-mean 2048 \ + --input-stdev 0 \ + --output-mean 256 \ + --output-stdev 0 \ + >/tmp/dataset.jsonl + +# Step 1 + +rm -f -- "$TLLM_AUTOTUNER_CACHE_PATH" + +cat </tmp/config_collect.yaml +enable_attention_dp: true +layer_wise_benchmarks_config: + calibration_mode: COLLECT + calibration_file_path: "$PROFILE_DIR/calibration_data.json" +moe_config: + backend: CUTLASS +print_iter_log: true +EOF + +TLLM_PROFILE_START_STOP=$((BATCH_SIZE + 10))-$((BATCH_SIZE + 35)) \ +NP=$NP ./mpi_launch.sh middleware/mpi_env_from_ompi \ +nsys profile \ + -t cuda,nvtx \ + --cpuctxsw none --cuda-event-trace false \ + --cuda-graph-trace node \ + -c cudaProfilerApi --capture-range-end stop \ + -o "$PROFILE_DIR/report_e2e_collect_rank%q{RANK}.nsys-rep" \ + --force-overwrite true \ +trtllm-llmapi-launch \ +trtllm-bench \ + --model deepseek-ai/DeepSeek-V3 \ + --model_path "$MODEL" \ + throughput \ + --tp $NP \ + --ep $NP \ + --warmup 0 \ + --dataset /tmp/dataset.jsonl \ + --max_batch_size $BATCH_SIZE \ + --max_num_tokens 3072 \ + --disable_chunked_context \ + --num_requests $((BATCH_SIZE*NP)) \ + --concurrency $((BATCH_SIZE*NP)) \ + --config /tmp/config_collect.yaml + +# Step 2 + +cat </tmp/config_mark.yaml 
+cuda_graph_config: null +enable_attention_dp: true +layer_wise_benchmarks_config: + calibration_mode: MARK +moe_config: + backend: CUTLASS +print_iter_log: true +EOF + +TLLM_PROFILE_START_STOP=$((BATCH_SIZE + 10))-$((BATCH_SIZE + 35)) \ +NP=$NP ./mpi_launch.sh middleware/mpi_env_from_ompi \ +nsys profile \ + -t cuda,nvtx \ + --cpuctxsw none --cuda-event-trace false \ + --cuda-graph-trace node \ + -c cudaProfilerApi --capture-range-end stop \ + -o "$PROFILE_DIR/report_e2e_mark_rank%q{RANK}.nsys-rep" \ + --force-overwrite true \ +trtllm-llmapi-launch \ +trtllm-bench \ + --model deepseek-ai/DeepSeek-V3 \ + --model_path "$MODEL" \ + throughput \ + --tp $NP \ + --ep $NP \ + --warmup 0 \ + --dataset /tmp/dataset.jsonl \ + --max_batch_size $BATCH_SIZE \ + --max_num_tokens 3072 \ + --disable_chunked_context \ + --num_requests $((BATCH_SIZE*NP)) \ + --concurrency $((BATCH_SIZE*NP)) \ + --config /tmp/config_mark.yaml + +# Step 3 + +NP=$NP ./mpi_launch.sh ./run.sh config_gen.yaml \ + --model "$MODEL" \ + --load-format AUTO \ + --layer-indices 5,6,7 \ + --batch-size $BATCH_SIZE \ + --seq-len-q 1 \ + --seq-len-kv-cache $((2049 + (BATCH_SIZE / 2 + 25) * 1)) \ + --balance-method NotModified \ + --replay-file-path "$PROFILE_DIR/calibration_data.json" \ + --replay-start $((BATCH_SIZE + 10 + 5)) \ + --replay-stop $((BATCH_SIZE + 35)) + +# Step 4 + +seq 0 $((NP - 1)) | xargs -I% python3 parse_e2e.py \ + --eager-trace "$PROFILE_DIR/report_e2e_mark_rank%.nsys-rep" \ + --graph-trace "$PROFILE_DIR/report_e2e_collect_rank%.nsys-rep" \ + --layer-indices 5,6,7 \ + --warmup-times 5 \ + -o "$PROFILE_DIR/report_e2e_collect_rank%.json" +seq 0 $((NP - 1)) | xargs -I% python3 parse.py \ + --profile-dir "$PROFILE_DIR" \ + --world-size $NP \ + --rank % + +# Step 5 + +targets=() +for i in $(seq 1 $((NP - 1))); do + targets+=(--target "$PROFILE_DIR/report_e2e_collect_rank$i.json") +done +for i in $(seq 0 $((NP - 1))); do + targets+=(--target "$PROFILE_DIR/report_np${NP}_rank$i.json") +done +python3 correlation.py \ + --reference "$PROFILE_DIR/report_e2e_collect_rank0.json" \ + "${targets[@]}" \ + -o "$PROFILE_DIR/correlation.html" diff --git a/examples/layer_wise_benchmarks/slurm_alloc.sh b/examples/layer_wise_benchmarks/slurm_alloc.sh index cb25f57fc3..2350fa1dae 100755 --- a/examples/layer_wise_benchmarks/slurm_alloc.sh +++ b/examples/layer_wise_benchmarks/slurm_alloc.sh @@ -12,6 +12,7 @@ salloc -A "$ACCOUNT" \ -p "$PARTITION" \ -N "$NODES" \ --segment "$NODES" \ + -J "$ACCOUNT-tensorrt_llm.layer_wise_benchmarks" \ $EXTRA_ARGS \ -t "$TIME" \ --no-shell \ diff --git a/examples/layer_wise_benchmarks/slurm_init_containers.sh b/examples/layer_wise_benchmarks/slurm_init_containers.sh index 7e97505bc1..83215561c7 100755 --- a/examples/layer_wise_benchmarks/slurm_init_containers.sh +++ b/examples/layer_wise_benchmarks/slurm_init_containers.sh @@ -16,7 +16,7 @@ NODES=$(squeue -j $SLURM_JOB_ID -h -o "%D") if [ -z "${CONTAINER_IMAGE:-}" ]; then # Read Docker image from current_image_tags.properties - MACHINE="$(uname -m)" + MACHINE="$(srun -N 1 uname -m)" if [ "$MACHINE" == "x86_64" ]; then DOCKER_IMAGE=$(source "$TRTLLM_ROOT/jenkins/current_image_tags.properties" && echo $LLM_DOCKER_IMAGE) elif [ "$MACHINE" == "aarch64" ]; then diff --git a/tensorrt_llm/_torch/modules/fused_moe/configurable_moe.py b/tensorrt_llm/_torch/modules/fused_moe/configurable_moe.py index f1de1b752b..31eb7b5ac5 100644 --- a/tensorrt_llm/_torch/modules/fused_moe/configurable_moe.py +++ b/tensorrt_llm/_torch/modules/fused_moe/configurable_moe.py @@ -39,6 
+39,7 @@ from tensorrt_llm._torch.modules.fused_moe.routing import BaseMoeRoutingMethod from tensorrt_llm._torch.utils import AuxStreamType, EventType, Fp4QuantizedTensor from tensorrt_llm.logger import logger from tensorrt_llm.models.modeling_utils import QuantConfig +from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator from .communication import ( AllGatherReduceScatter, @@ -609,6 +610,9 @@ class ConfigurableMoE(MoE): if token_selected_slots is not None: ExpertStatistic.set_layer(self.layer_idx) ExpertStatistic.maybe_add_info(self.num_slots, token_selected_slots) + token_selected_slots = get_calibrator().maybe_collect_or_replay_slots( + self.num_slots, token_selected_slots + ) # ========== Step 3.5: Communication Prepare Phase (BEFORE quantization) ========== # NVLINK two-sided has a prepare phase to gather EPLB statistics diff --git a/tensorrt_llm/_torch/modules/fused_moe/fused_moe_cutlass.py b/tensorrt_llm/_torch/modules/fused_moe/fused_moe_cutlass.py index 446cbec3a0..64bbeb7481 100755 --- a/tensorrt_llm/_torch/modules/fused_moe/fused_moe_cutlass.py +++ b/tensorrt_llm/_torch/modules/fused_moe/fused_moe_cutlass.py @@ -8,6 +8,7 @@ import torch from tensorrt_llm._mnnvl_utils import MnnvlMemory, MnnvlMoe from tensorrt_llm._torch.distributed.moe_alltoall import MoeAlltoAll from tensorrt_llm.logger import logger +from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator from ...distributed import allgather from ...expert_statistic import ExpertStatistic @@ -539,6 +540,8 @@ class CutlassFusedMoE(MoE): # If load balancer is enabled, the statistics are collected from expert slot IDs. ExpertStatistic.set_layer(self.layer_idx) ExpertStatistic.maybe_add_info(self.num_slots, token_selected_slots) + token_selected_slots = get_calibrator().maybe_collect_or_replay_slots( + self.num_slots, token_selected_slots) if self.apply_router_weight_on_input: assert x.dtype != torch.float8_e4m3fn, "Current workaround for apply_router_weight_on_input does not support fp8 input" diff --git a/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py b/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py index cdcee92236..f5345b61b5 100755 --- a/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py +++ b/tensorrt_llm/_torch/modules/fused_moe/fused_moe_wide_ep.py @@ -8,6 +8,7 @@ from tensorrt_llm._mnnvl_utils import MnnvlMemory, MnnvlMoe, MoEAlltoallInfo from tensorrt_llm._utils import is_sm_100f, local_mpi_size from tensorrt_llm.logger import logger from tensorrt_llm.mapping import Mapping +from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator from ...distributed import allgather, reducescatter from ...expert_statistic import ExpertStatistic @@ -439,6 +440,8 @@ class WideEPMoE(MoE): # If load balancer is enabled, the statistics are collected from expert slot IDs. 
ExpertStatistic.set_layer(self.layer_idx) ExpertStatistic.maybe_add_info(self.num_slots, token_selected_slots) + token_selected_slots = get_calibrator().maybe_collect_or_replay_slots( + self.num_slots, token_selected_slots) use_allgather = not use_all_to_all diff --git a/tensorrt_llm/_torch/pyexecutor/py_executor.py b/tensorrt_llm/_torch/pyexecutor/py_executor.py index 59ce223d0d..0f5b53bb20 100644 --- a/tensorrt_llm/_torch/pyexecutor/py_executor.py +++ b/tensorrt_llm/_torch/pyexecutor/py_executor.py @@ -35,6 +35,7 @@ from tensorrt_llm.llmapi.llm_args import PeftCacheConfig from tensorrt_llm.logger import logger from tensorrt_llm.mapping import CpType from tensorrt_llm.runtime.generation import CUASSERT +from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator from ..distributed import Distributed from ..models.modeling_utils import DecoderModelForCausalLM @@ -736,8 +737,11 @@ class PyExecutor: record_shapes=True, with_modules=True) + calibrator = get_calibrator() + def profile_step(): nonlocal it, enabled, start_time, start_event_1, end_event_1, start_event_2, end_event_2, prev_device_step_time + calibrator.post_step(it) if it in self.profile_stop_iters and not self.is_warmup: assert enabled, "Inconsistent CUDA profiling state" if enable_torch_trace: @@ -746,6 +750,7 @@ class PyExecutor: logger.info(f"Profiling stopped at iteration {it}, " f"trace saved to {torch_trace_path}") torch.cuda.cudart().cudaProfilerStop() + calibrator.stop() enabled = False if start_time is not None and self.print_log and self.dist.rank == 0: @@ -786,11 +791,13 @@ class PyExecutor: if it in self.profile_start_iters and not self.is_warmup: assert not enabled, "Inconsistent CUDA profiling state" + calibrator.start() torch.cuda.cudart().cudaProfilerStart() if enable_torch_trace: torch_profiler.start() logger.info(f"Profiling started at iteration {it}.") enabled = True + calibrator.pre_step(it) start_time = time.time() if it % 2 == 0: if start_event_1 is None: @@ -812,6 +819,7 @@ class PyExecutor: logger.info(f"Profiling stopped at iteration {it}, " f"trace saved to {torch_trace_path}") torch.cuda.cudart().cudaProfilerStop() + calibrator.stop() def _get_init_iter_stats(self, num_new_active_requests, new_active_requests_queue_latency_ms): diff --git a/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py b/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py index 50feb71943..43fd9c6898 100644 --- a/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py +++ b/tensorrt_llm/_torch/pyexecutor/py_executor_creator.py @@ -24,6 +24,7 @@ from tensorrt_llm.llmapi.tokenizer import (TokenizerBase, from tensorrt_llm.logger import logger from tensorrt_llm.mapping import Mapping from tensorrt_llm.quantization import QuantAlgo +from tensorrt_llm.tools.layer_wise_benchmarks import get_calibrator from ..attention_backend.interface import AttentionRuntimeFeatures from ..attention_backend.trtllm import TrtllmAttention @@ -354,6 +355,15 @@ def create_py_executor( validate_feature_combination(llm_args, model_engine, llm_args.sampler_type) + calibrator = get_calibrator() + layer_wise_benchmarks_config = llm_args.layer_wise_benchmarks_config + calibrator.init(layer_wise_benchmarks_config.calibration_mode, + layer_wise_benchmarks_config.calibration_file_path, + layer_wise_benchmarks_config.calibration_layer_indices, + mapping=mapping, + dist=dist) + model_engine.model = calibrator.maybe_wrap_model(model_engine.model) + if has_draft_model_engine: with allocation_scope(ExecutorMemoryType.MODEL_ENGINE_DRAFT, RestoreMode.PINNED): 
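The next file adds the `LayerwiseBenchmarksConfig` consumed by the calibrator wiring above. A minimal sketch of how the new config validates, based on the field definitions and validator added to `llm_args.py` below (assumes `tensorrt_llm` is importable):

```python
from tensorrt_llm.llmapi.llm_args import LayerwiseBenchmarksConfig

# COLLECT mode requires a calibration_file_path.
cfg = LayerwiseBenchmarksConfig(
    calibration_mode="COLLECT",
    calibration_file_path="profiles/calibration_data.json",
)
print(cfg.calibration_layer_indices)  # None -> collect all layers

# The model validator rejects COLLECT mode without a file path.
try:
    LayerwiseBenchmarksConfig(calibration_mode="COLLECT")
except ValueError as err:
    print(err)
```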
diff --git a/tensorrt_llm/llmapi/llm_args.py b/tensorrt_llm/llmapi/llm_args.py index 48940d10e1..19d345d0d6 100644 --- a/tensorrt_llm/llmapi/llm_args.py +++ b/tensorrt_llm/llmapi/llm_args.py @@ -840,6 +840,37 @@ class KvCacheConnectorConfig(StrictBaseModel): ..., description="The class name of the worker within the module.") +class LayerwiseBenchmarksConfig(StrictBaseModel): + """ + Configuration for layer-wise benchmarks calibration. + """ + calibration_mode: Literal["NONE", "MARK", "COLLECT"] = Field( + default="NONE", + description= + "Instruct the layer-wise benchmarks calibrator to work on MARK mode, or COLLECT mode", + status="prototype") + + calibration_file_path: Optional[str] = Field( + default=None, + description= + "The file path which the layer-wise benchmarks calibrator saves to or loads from", + status="prototype") + + calibration_layer_indices: Optional[List[int]] = Field( + default=None, + description= + "Layer indices to filter. If None, all layers are collected in COLLECT mode.", + status="prototype") + + @model_validator(mode='after') + def validate_calibration_file_path(self) -> 'LayerwiseBenchmarksConfig': + if self.calibration_mode == "COLLECT" and not self.calibration_file_path: + raise ValueError( + f"Expect calibration_file_path not to be empty when work on {self.calibration_mode} mode" + ) + return self + + class MedusaDecodingConfig(DecodingBaseConfig): medusa_choices: Optional[List[List[int]]] = None num_medusa_heads: Optional[int] = None @@ -2935,6 +2966,7 @@ class TorchLlmArgs(BaseLlmArgs): 'NCCL_SYMMETRIC']] = Field(default='AUTO', description="Allreduce strategy to use.", status="beta") + checkpoint_loader: Optional[object] = Field( default=None, description= @@ -3011,6 +3043,11 @@ class TorchLlmArgs(BaseLlmArgs): status="prototype", ) + layer_wise_benchmarks_config: LayerwiseBenchmarksConfig = Field( + default_factory=LayerwiseBenchmarksConfig, + description="Configuration for layer-wise benchmarks calibration.", + status="prototype") + @property def quant_config(self) -> QuantConfig: if self._quant_config is None: diff --git a/tensorrt_llm/tools/layer_wise_benchmarks/__init__.py b/tensorrt_llm/tools/layer_wise_benchmarks/__init__.py index f68e088422..50e8f1e83d 100644 --- a/tensorrt_llm/tools/layer_wise_benchmarks/__init__.py +++ b/tensorrt_llm/tools/layer_wise_benchmarks/__init__.py @@ -1,4 +1,3 @@ -from .mark_utils import mark_ranges -from .runner import BalanceMethod, Runner +from .calibrator import get_calibrator -__all__ = ["BalanceMethod", "Runner", "mark_ranges"] +__all__ = ["get_calibrator"] diff --git a/tensorrt_llm/tools/layer_wise_benchmarks/calibrator.py b/tensorrt_llm/tools/layer_wise_benchmarks/calibrator.py new file mode 100644 index 0000000000..187336cec0 --- /dev/null +++ b/tensorrt_llm/tools/layer_wise_benchmarks/calibrator.py @@ -0,0 +1,716 @@ +import base64 +import functools +import json +import zlib +from enum import Enum +from typing import Optional + +import nvtx +import torch + +from tensorrt_llm.logger import logger + + +class Mode(Enum): + NONE = 1 + MARK = 2 + COLLECT = 3 + REPLAY = 4 + + +class Calibrator: + """Calibrator for layer-wise benchmarks with MoE expert routing data. + + The calibrator operates in one of the following modes: + NONE: Disabled, no-op. + MARK: Add NVTX markers for correlating E2E and layer-wise benchmarks. + COLLECT: Collect `token_selected_slots` data and save to a file. + REPLAY: Load `token_selected_slots` data from a file for replay. 
+ + Lifecycle: + init() -> maybe_wrap_model() -> [CUDA Graph capture] -> start() -> + [pre_step() -> forward() -> post_step()] x N -> stop() + + Design Notes: + To ensure CUDA Graphs compatibility, `token_selected_slots` tensors are + copied to a fixed GPU buffer during graph capture/replay. The actual data + is then transferred to per-iteration storage in `post_step()`. + + Since `model.forward()` does not execute during CUDA Graphs replay, we + cannot access Python metadata objects directly. Instead, we copy an integer + index (`metadata_idx`) pointing to a pre-built metadata list. + + For REPLAY mode, metadata verification is deferred to `stop()` to avoid + GPU synchronization during profiling. Metadata indices are recorded on GPU + in `post_step()` and compared after `cuda.synchronize()` in `stop()`. + + Tensor Locations (COLLECT mode): + - `_metadata_idx_gpu`: GPU scalar, current iteration's index into `_metadata_list` + - `_metadata_idx_range_gpu`: GPU [0..MAX_NUM_METADATA), pre-generated indices for GPU copy + - `_slots_buffer_gpu`: GPU, fixed buffer for CUDA Graphs compatibility + - `_collected_metadata_idx`: GPU [MAX_COLLECT_ITERATIONS], per-iteration metadata indices + - `_collected_slots_cpu`: CPU (pinned), per-iteration slots data + - `_eager_slots_gpu`: GPU, list of tensors for non-CG (context) phase + + Tensor Locations (REPLAY mode): + - `_actual_metadata_idx_gpu`: GPU scalar, current iteration's actual metadata index + - `_metadata_idx_range_gpu`: GPU [0..MAX_NUM_METADATA), pre-generated indices + - `_slots_buffer_gpu`: GPU, fixed buffer for CUDA Graphs compatibility + - `_collected_actual_metadata_idx`: GPU [MAX_REPLAY_ITERATIONS], actual metadata indices + - `_collected_iterations`: Python list, iterations for expected metadata lookup + - `_replay_eager_slots_gpu`: GPU, eager slots data for large context phase + """ + + # Maximum number of int32 elements per iteration for CUDA Graphs buffer + MAX_SLOTS_BUFFER_SIZE = 4 * 1024 * 1024 + # Maximum number of metadata supported + MAX_NUM_METADATA = 1024 * 1024 + # Maximum iterations during collect phase + MAX_COLLECT_ITERATIONS = 101 + # Maximum iterations during replay phase + MAX_REPLAY_ITERATIONS = 1024 * 1024 + # Data type for token_selected_slots + SLOTS_DTYPE = torch.int32 + + def __init__(self): + self.mode = Mode.NONE + self._started = False + + def init( + self, + mode: str, + file_path: str, + layer_indices: list[int], + *, + replay_verify_metadata: Optional[bool] = None, + mapping=None, + dist=None, + ): + """Initialize the calibrator. + + Args: + mode: One of "NONE", "MARK", "COLLECT", "REPLAY". + file_path: Path to the calibration data file. + layer_indices: Optional list of layer indices to filter. + COLLECT mode: If None, all layers are collected. + REPLAY mode: Cannot be None. + replay_verify_metadata: Whether to verify actual metadata in REPLAY mode matches calibration data. + mapping: Tensor parallel mapping containing rank and world_size. + dist: Distributed communication wrapper. 
+ """ + if self.mode != Mode.NONE: + raise ValueError("double init") + + self.mode = Mode[mode] + + if self.mode == Mode.COLLECT: + self._init_collect_mode(file_path, layer_indices, dist) + + if self.mode == Mode.REPLAY: + if replay_verify_metadata is None: + raise ValueError("missing replay_verify_metadata") + self._init_replay_mode(file_path, layer_indices, replay_verify_metadata, mapping) + + def _init_collect_mode(self, file_path, layer_indices, dist): + """Initialize buffers for COLLECT mode.""" + self._file_path = file_path + self._layer_indices = layer_indices + self._dist = dist + + # Metadata list that `_metadata_idx_gpu` indexes into + self._metadata_list = [] + + # GPU buffers for CUDA Graphs compatibility: + # - Copy from `_metadata_idx_range_gpu[idx]` to `_metadata_idx_gpu` + # - Copy flattened slots to `_slots_buffer_gpu` + self._metadata_idx_gpu = torch.empty((), dtype=torch.long, device="cuda") + self._metadata_idx_range_gpu = torch.arange( + self.MAX_NUM_METADATA, dtype=torch.long, device="cuda" + ) + self._slots_buffer_gpu = torch.empty( + self.MAX_SLOTS_BUFFER_SIZE, + dtype=self.SLOTS_DTYPE, + device="cuda", + ) + + def _init_replay_mode(self, file_path, layer_indices, replay_verify_metadata, mapping): + """Initialize replay database from file.""" + with open(file_path) as f: + data = json.load(f) + + if data["world_size"] != mapping.world_size: + raise ValueError( + f"World size mismatch: file has {data['world_size']}, " + f"but current world_size is {mapping.world_size}" + ) + + # Verify all ranks have the same iterations (using data from file, not dist) + all_ranks_iterations = [ + [record["iteration"] for record in rank_records] + for rank_records in data["all_ranks_records"] + ] + if any(iters != all_ranks_iterations[0] for iters in all_ranks_iterations): + raise ValueError("Iterations mismatch across ranks in calibration file") + + start_layer_idx = layer_indices[0] + end_layer_idx = layer_indices[-1] + if list(layer_indices) != list(range(start_layer_idx, end_layer_idx + 1)): + raise ValueError("Invalid layer_indices") + + self._replay_db = {} + for record in data["all_ranks_records"][mapping.rank]: + iteration = record["iteration"] + metadata = record["metadata"] + raw_data = torch.frombuffer( + zlib.decompress(base64.b64decode(record["raw_data"])), + dtype=self.SLOTS_DTYPE, + ) + + # Filter metadata for target layers + start_idx = 0 + while metadata[start_idx]["layer_idx"] < start_layer_idx: + start_idx += 1 + end_idx = start_idx + while end_idx < len(metadata) and metadata[end_idx]["layer_idx"] <= end_layer_idx: + end_idx += 1 + + # Calculate data offsets for filtered range + offset = sum( + torch.Size(m["token_selected_slots_shape"]).numel() for m in metadata[:start_idx] + ) + length = sum( + torch.Size(m["token_selected_slots_shape"]).numel() + for m in metadata[start_idx:end_idx] + ) + filtered_slots = raw_data[offset : offset + length] + + self._replay_db[iteration] = { + "metadata": metadata[start_idx:end_idx], + "slots_data_gpu": filtered_slots.to("cuda"), + } + + # Placeholder buffer for CUDA Graphs (actual data copied in pre_step) + self.MAX_TOPK_AND_MIN_SLOTS = 32 + self._slots_buffer_gpu = ( + torch.arange( + self.MAX_SLOTS_BUFFER_SIZE, + dtype=self.SLOTS_DTYPE, + device="cuda", + ) + % self.MAX_TOPK_AND_MIN_SLOTS + ) + self._use_eager_mode = False + + # GPU buffers for deferred metadata verification (must be created before CUDA Graph capture) + # These are used during forward() which may be captured into CUDA Graphs + self._replay_verify_metadata = 
replay_verify_metadata + if self._replay_verify_metadata: + self._actual_metadata_list = [] + self._actual_metadata_idx_gpu = torch.empty((), dtype=torch.long, device="cuda") + self._metadata_idx_range_gpu = torch.arange( + self.MAX_NUM_METADATA, dtype=torch.long, device="cuda" + ) + + def maybe_wrap_model(self, model): + """Wrap model and layer forward methods for data collection/replay. + + Args: + model: The model to wrap. + + Returns: + The wrapped model. + """ + if self.mode == Mode.COLLECT: + self._wrap_model_forward_for_collect(model) + + if self.mode == Mode.REPLAY: + self._wrap_model_forward_for_replay(model) + + # Wrap layer forward methods for all active modes + if self.mode in [Mode.MARK, Mode.COLLECT, Mode.REPLAY]: + self._wrap_layer_forward(model) + + return model + + def _wrap_model_forward_for_collect(self, model): + """Wrap model.forward() for COLLECT mode.""" + + def make_forward(forward_orig): + @functools.wraps(forward_orig) + def forward(*args, **kwargs): + # Initialize per-iteration collection buffer + self._cur_iter_records = [] + + output = forward_orig(*args, **kwargs) + + # Build metadata from collected records + metadata = [ + { + "layer_idx": layer_idx, + "num_slots": num_slots, + "token_selected_slots_shape": list(slots.shape), + } + for layer_idx, num_slots, slots in self._cur_iter_records + ] + + # Store metadata and copy index to GPU (for CUDA Graphs compatibility) + metadata_idx = len(self._metadata_list) + self._metadata_list.append(metadata) + if metadata_idx >= len(self._metadata_idx_range_gpu): + raise ValueError( + f"Metadata index overflow: {metadata_idx} >= {len(self._metadata_idx_range_gpu)}. " + f"Increase MAX_NUM_METADATA if more iterations are needed." + ) + self._metadata_idx_gpu.copy_(self._metadata_idx_range_gpu[metadata_idx]) + + # Copy slots data to fixed buffer or use eager mode + total_size = sum(slots.numel() for _, _, slots in self._cur_iter_records) + + if total_size <= self.MAX_SLOTS_BUFFER_SIZE: + # Small data: copy to fixed GPU buffer (CUDA Graphs compatible) + if self._cur_iter_records: + torch.cat( + [slots.flatten() for _, _, slots in self._cur_iter_records], + out=self._slots_buffer_gpu[:total_size], + ) + else: + # Large data (context phase): use eager collection + # Context phase does not use CUDA Graphs, so this is safe + assert not torch.cuda.is_current_stream_capturing() + self._use_eager_mode = True + self._eager_slots_gpu = [ + slots.flatten() for _, _, slots in self._cur_iter_records + ] + + del self._cur_iter_records + return output + + return forward + + model.forward = make_forward(model.forward) + + def _wrap_model_forward_for_replay(self, model): + """Wrap model.forward() for REPLAY mode.""" + + def make_forward(forward_orig): + @functools.wraps(forward_orig) + def forward(*args, **kwargs): + if torch.cuda.is_current_stream_capturing(): + assert not self._use_eager_mode, ( + "Eager mode is not compatible with CUDA Graphs capturing" + ) + + self._replay_chunk_idx = 0 + self._replay_offset = 0 + + # Build metadata for verification during this forward pass + self._cur_iter_actual_metadata = [] + + output = forward_orig(*args, **kwargs) + + # Store metadata and copy index to GPU (for deferred verification) + if self._replay_verify_metadata: + metadata_idx = len(self._actual_metadata_list) + self._actual_metadata_list.append(self._cur_iter_actual_metadata) + if metadata_idx >= len(self._metadata_idx_range_gpu): + raise ValueError( + f"Metadata index overflow: {metadata_idx} >= {len(self._metadata_idx_range_gpu)}. 
" + f"Increase MAX_NUM_METADATA if more iterations are needed." + ) + with nvtx.annotate("layer_wise_benchmarks ignore"): + self._actual_metadata_idx_gpu.copy_( + self._metadata_idx_range_gpu[metadata_idx] + ) + + # Verify all replay data was consumed + if self._started and self._replay_chunk_idx != len(self._cur_iter_metadata): + raise ValueError( + f"Unused replay data: chunks [{self._replay_chunk_idx}:{len(self._cur_iter_metadata)}] " + f"were not consumed during forward pass" + ) + + if torch.cuda.is_current_stream_capturing(): + if self._replay_offset > len(self._slots_buffer_gpu): + raise ValueError( + f"Slots buffer overflow: required {self._replay_offset} elements, " + f"but buffer size is {len(self._slots_buffer_gpu)}" + ) + + return output + + return forward + + model.forward = make_forward(model.forward) + + def _wrap_layer_forward(self, model): + """Wrap layer forward methods to track layer index and add NVTX markers.""" + + def make_forward(layer_idx, forward_orig): + @functools.wraps(forward_orig) + def forward(*args, **kwargs): + self._current_layer_idx = layer_idx + + if self.mode in [Mode.MARK, Mode.COLLECT]: + with nvtx.annotate(f"layer_wise_benchmarks layer_idx {layer_idx}"): + output = forward_orig(*args, **kwargs) + else: + output = forward_orig(*args, **kwargs) + + del self._current_layer_idx + return output + + return forward + + for idx, layer in enumerate(model.model.layers): + layer.forward = make_forward(idx, layer.forward) + + def maybe_collect_or_replay_slots(self, num_slots, token_selected_slots): + """Collect or replay token_selected_slots data. + + Args: + num_slots: Number of slots. + token_selected_slots: Tensor of selected expert slots. + + Returns: + Original tensor in COLLECT mode, or replayed tensor in REPLAY mode. 
+ """ + if self.mode == Mode.COLLECT: + return self._collect_slots(num_slots, token_selected_slots) + + if self.mode == Mode.REPLAY: + return self._replay_slots(num_slots, token_selected_slots) + + return token_selected_slots + + def _collect_slots(self, num_slots, token_selected_slots): + """Collect slots data for current iteration.""" + # Skip if model was not wrapped (no layer context available) + if not hasattr(self, "_current_layer_idx"): + return token_selected_slots + + if token_selected_slots.dtype != self.SLOTS_DTYPE: + raise ValueError( + f"Unexpected dtype for token_selected_slots: expected {self.SLOTS_DTYPE}, " + f"got {token_selected_slots.dtype}" + ) + + if self._layer_indices is None or self._current_layer_idx in self._layer_indices: + self._cur_iter_records.append( + (self._current_layer_idx, num_slots, token_selected_slots) + ) + + return token_selected_slots + + def _replay_slots(self, num_slots, token_selected_slots): + """Replay slots data from recorded buffer.""" + # Skip if model was not wrapped (no layer context available) + if not hasattr(self, "_current_layer_idx"): + return token_selected_slots + + if token_selected_slots.dtype != self.SLOTS_DTYPE: + raise ValueError( + f"Unexpected dtype for token_selected_slots: expected {self.SLOTS_DTYPE}, " + f"got {token_selected_slots.dtype}" + ) + + # Record actual metadata for deferred verification in stop() + self._cur_iter_actual_metadata.append( + { + "layer_idx": self._current_layer_idx, + "num_slots": num_slots, + "token_selected_slots_shape": list(token_selected_slots.shape), + } + ) + + # Immediate validation (does not depend on GPU data, safe during CUDA Graphs) + if self._started: + chunk_metadata = self._cur_iter_metadata[self._replay_chunk_idx] + expected_layer_idx = chunk_metadata["layer_idx"] + expected_num_slots = chunk_metadata["num_slots"] + expected_shape = chunk_metadata["token_selected_slots_shape"] + + if self._current_layer_idx != expected_layer_idx: + raise ValueError( + f"Layer index mismatch during replay: expected {expected_layer_idx}, " + f"got {self._current_layer_idx}" + ) + if num_slots != expected_num_slots: + raise ValueError( + f"num_slots mismatch during replay: expected {expected_num_slots}, " + f"got {num_slots}" + ) + if list(token_selected_slots.shape) != expected_shape: + raise ValueError( + f"Shape mismatch during replay: expected {expected_shape}, " + f"got {list(token_selected_slots.shape)}" + ) + else: + if ( + num_slots < self.MAX_TOPK_AND_MIN_SLOTS + or token_selected_slots.shape[-1] > self.MAX_TOPK_AND_MIN_SLOTS + ): + raise ValueError( + "Invalid initial replayed_slots, please adjust `MAX_TOPK_AND_MIN_SLOTS`" + ) + + if self._started or torch.cuda.is_current_stream_capturing(): + n = token_selected_slots.numel() + buffer = ( + self._replay_eager_slots_gpu if self._use_eager_mode else self._slots_buffer_gpu + ) + replayed_slots = buffer[self._replay_offset : self._replay_offset + n].view_as( + token_selected_slots + ) + + self._replay_chunk_idx += 1 + self._replay_offset += n + return replayed_slots + + return token_selected_slots + + def start(self): + """Start calibration. 
Call before the profiling loop.""" + assert not self._started + self._started = True + + if self.mode != Mode.NONE: + logger.info(f"Layer-wise benchmarks: Calibrator started in {self.mode.name} mode") + + if self.mode == Mode.COLLECT: + # Per-iteration storage buffers + # CUDA Graphs reuse fixed buffers, so we must copy data out after each step + self._collected_metadata_idx = torch.empty( + self.MAX_COLLECT_ITERATIONS, dtype=torch.long, device="cuda" + ) + self._collected_slots_cpu = torch.empty( + self.MAX_COLLECT_ITERATIONS, + self.MAX_SLOTS_BUFFER_SIZE, + dtype=torch.int32, + device="cpu", + pin_memory=True, + ) + self._collected_records = [] + + # Eager mode flag: True when data exceeds MAX_SLOTS_BUFFER_SIZE + # (context phase, not using CUDA Graphs) + self._use_eager_mode = False + + if self.mode == Mode.REPLAY and self._replay_verify_metadata: + # Per-iteration storage buffers for deferred metadata verification + # Note: _actual_metadata_list, _actual_metadata_idx_gpu, _metadata_idx_range_gpu + # are created in _init_replay_mode() before CUDA Graph capture + self._collected_actual_metadata_idx = torch.empty( + self.MAX_REPLAY_ITERATIONS, dtype=torch.long, device="cuda" + ) + # post_step() runs outside CUDA Graphs, so we can use Python list for iterations + self._collected_iterations = [] + + def pre_step(self, iteration: int): + """Prepare for an iteration. Call before model.forward(). + + Args: + iteration: Current iteration number. + """ + if self.mode == Mode.REPLAY and self._started: + self._cur_iter_metadata = self._replay_db[iteration]["metadata"] + slots_gpu = self._replay_db[iteration]["slots_data_gpu"] + + expected_size = sum( + torch.Size(m["token_selected_slots_shape"]).numel() for m in self._cur_iter_metadata + ) + if len(slots_gpu) != expected_size: + raise ValueError( + f"Slots data size mismatch for iteration {iteration}: " + f"expected {expected_size}, got {len(slots_gpu)}" + ) + + if expected_size <= self.MAX_SLOTS_BUFFER_SIZE: + self._use_eager_mode = False + self._slots_buffer_gpu[:expected_size].copy_(slots_gpu, non_blocking=True) + else: + self._use_eager_mode = True + self._replay_eager_slots_gpu = slots_gpu + + def post_step(self, iteration: int): + """Finalize an iteration. Call after model.forward(). + + Args: + iteration: Current iteration number. + """ + if self.mode == Mode.COLLECT and self._started: + record_idx = len(self._collected_records) + if record_idx >= self.MAX_COLLECT_ITERATIONS: + raise ValueError( + f"Exceeded MAX_COLLECT_ITERATIONS={self.MAX_COLLECT_ITERATIONS}. " + "Increase the limit or reduce the profiling iterations." + ) + self._collected_metadata_idx[record_idx].copy_(self._metadata_idx_gpu) + + if self._use_eager_mode: + # TODO: Avoid synchronization by using async copy + slots_cpu = torch.cat(self._eager_slots_gpu).to("cpu") + else: + slots_cpu = self._collected_slots_cpu[record_idx] + # TODO: Copy only required elements instead of entire buffer + slots_cpu.copy_(self._slots_buffer_gpu, non_blocking=True) + + self._collected_records.append( + { + "iteration": iteration, + "slots_cpu": slots_cpu, + } + ) + + # Reset eager mode for next step + self._use_eager_mode = False + + if self.mode == Mode.REPLAY and self._started and self._replay_verify_metadata: + # Record metadata index on GPU (no sync), verification deferred to stop() + record_idx = len(self._collected_iterations) + if record_idx >= self.MAX_REPLAY_ITERATIONS: + raise ValueError( + f"Exceeded MAX_REPLAY_ITERATIONS={self.MAX_REPLAY_ITERATIONS}. 
" + "Increase the limit or reduce the profiling iterations." + ) + self._collected_actual_metadata_idx[record_idx].copy_( + self._actual_metadata_idx_gpu, non_blocking=True + ) + # post_step() runs outside CUDA Graphs, so we can use Python list + self._collected_iterations.append(iteration) + + def stop(self): + """Stop calibration and save data. Call after the profiling loop.""" + assert self._started + + if self.mode == Mode.COLLECT: + self._save_collected_data() + + if self.mode == Mode.REPLAY and self._replay_verify_metadata: + self._verify_replay_metadata() + + self._started = False + + def _save_collected_data(self): + """Save collected calibration data to file.""" + torch.cuda.synchronize() + + metadata_idx_list = self._collected_metadata_idx.tolist() + output_records = [] + + for record_idx, record in enumerate(self._collected_records): + metadata = self._metadata_list[metadata_idx_list[record_idx]] + slots_size = sum(torch.Size(m["token_selected_slots_shape"]).numel() for m in metadata) + slots_data = record["slots_cpu"][:slots_size] + + output_records.append( + { + "iteration": record["iteration"], + "metadata": metadata, + "raw_data": base64.b64encode( + zlib.compress(slots_data.numpy().tobytes()) + ).decode(), + } + ) + + # Gather from all ranks and save on rank 0 + all_records = self._dist.allgather(output_records) + + if self._dist.rank == 0: + with open(self._file_path, "w") as f: + json.dump( + { + "world_size": self._dist.world_size, + "all_ranks_records": all_records, + }, + f, + ) + + if self.mode != Mode.NONE: + logger.info(f"Layer-wise benchmarks: Calibrator saved data to {self._file_path}") + + def _verify_replay_metadata(self): + """Verify that replayed metadata matches actual execution.""" + torch.cuda.synchronize() + + record_count = len(self._collected_iterations) + chunk_count = 0 + actual_idx_list = self._collected_actual_metadata_idx[:record_count].tolist() + + for record_idx, (actual_idx, iteration) in enumerate( + zip(actual_idx_list, self._collected_iterations) + ): + actual_metadata = self._actual_metadata_list[actual_idx] + expected_metadata = self._replay_db[iteration]["metadata"] + + if len(actual_metadata) != len(expected_metadata): + raise ValueError( + f"Metadata length mismatch at record {record_idx}: " + f"actual {len(actual_metadata)} chunks, expected {len(expected_metadata)} chunks" + ) + + for chunk_idx, (actual_chunk, expected_chunk) in enumerate( + zip(actual_metadata, expected_metadata) + ): + if actual_chunk["layer_idx"] != expected_chunk["layer_idx"]: + raise ValueError( + f"Layer index mismatch at record {record_idx}, chunk {chunk_idx}: " + f"actual layer_idx={actual_chunk['layer_idx']}, " + f"expected layer_idx={expected_chunk['layer_idx']}" + ) + if actual_chunk["num_slots"] != expected_chunk["num_slots"]: + raise ValueError( + f"num_slots mismatch at record {record_idx}, chunk {chunk_idx}: " + f"actual num_slots={actual_chunk['num_slots']}, " + f"expected num_slots={expected_chunk['num_slots']}" + ) + if ( + actual_chunk["token_selected_slots_shape"] + != expected_chunk["token_selected_slots_shape"] + ): + raise ValueError( + f"Shape mismatch at record {record_idx}, chunk {chunk_idx}: " + f"actual shape={actual_chunk['token_selected_slots_shape']}, " + f"expected shape={expected_chunk['token_selected_slots_shape']}" + ) + chunk_count += 1 + + logger.info( + f"Layer-wise benchmarks: Replay metadata verification passed for {record_count} iterations" + f" and {chunk_count} chunks" + ) + + def get_replay_iteration_range(self): + """Get the 
valid iteration range for REPLAY mode. + + Returns a tuple (start_iter, stop_iter) representing the range of iterations + that can be replayed. This method verifies that the iterations form a + contiguous range. Cross-rank iteration consistency is verified in init(). + + Returns: + tuple: (start_iter, stop_iter) where iterations are in [start_iter, stop_iter] + + Raises: + ValueError: If mode is not REPLAY or iterations are not a contiguous range. + """ + if self.mode != Mode.REPLAY: + raise ValueError( + f"get_replay_iteration_range() is only valid in REPLAY mode, " + f"current mode is {self.mode.name}" + ) + + # Verify iterations form a contiguous range + local_iterations = sorted(self._replay_db.keys()) + start_iter = local_iterations[0] + stop_iter = local_iterations[-1] + if local_iterations != list(range(start_iter, stop_iter + 1)): + raise ValueError("Iterations are not a contiguous range") + + return start_iter, stop_iter + + +_calibrator = Calibrator() + + +def get_calibrator(): + """Get the global calibrator instance.""" + return _calibrator diff --git a/tensorrt_llm/tools/layer_wise_benchmarks/runner.py b/tensorrt_llm/tools/layer_wise_benchmarks/runner.py index 637a75ef35..ce6b20c37f 100644 --- a/tensorrt_llm/tools/layer_wise_benchmarks/runner.py +++ b/tensorrt_llm/tools/layer_wise_benchmarks/runner.py @@ -441,8 +441,24 @@ class Runner: checkpoint_dir=pretrained_model_name_or_path, checkpoint_loader=checkpoint_loader ) - self.layers = [model.model.layers[i] for i in layer_indices] + def forward(position_ids, hidden_states, attn_metadata, residual, **kwargs): + # TODO: to be more general, we should call DecoderModel.forward + residual_fusion = hasattr(model.model.layers[layer_indices[0]], "next_layer_layernorm") + for layer_idx in layer_indices: + layer = model.model.layers[layer_idx] + if residual_fusion: + hidden_states, residual = layer( + position_ids, hidden_states, attn_metadata, residual, **kwargs + ) + else: + hidden_states = layer(position_ids, hidden_states, attn_metadata, **kwargs) + return hidden_states, residual + + model.forward = forward + self.model_config = model.model_config + self.model = model + self.layer_indices = layer_indices @staticmethod @contextlib.contextmanager @@ -643,27 +659,20 @@ class Runner: kwargs["mamba_metadata"] = mamba_metadata def run_pack(*, check=False): - output = hidden_states, residual with model_extra_attrs(self.model_config.extra_attrs): get_model_extra_attrs()["attention_metadata"] = weakref.ref(attn_metadata) with torch.inference_mode(): - # TODO: to be more general, we should call DecoderModel.forward - for layer in self.layers: - residual_fusion = hasattr(layer, "next_layer_layernorm") - if residual_fusion: - output = layer( - position_ids, output[0], attn_metadata, output[1], **kwargs - ) - else: - output = layer(position_ids, output[0], attn_metadata, **kwargs), None + hidden_states_out, residual_out = self.model( + position_ids, hidden_states, attn_metadata, residual, **kwargs + ) if check: - if output[0].isnan().any(): + if hidden_states_out.isnan().any(): raise ValueError("Has nan, please fix weights initialization") - if output[0].isinf().any(): + if hidden_states_out.isinf().any(): raise ValueError("Has inf, please fix weights initialization") - if (output[0] == 0).sum() > 0.5 * output[0].numel(): + if (hidden_states_out == 0).sum() > 0.5 * hidden_states_out.numel(): raise ValueError("Too many zeros, please fix weights initialization") - return output + return hidden_states_out, residual_out return run_pack @@ -690,10 
+699,13 @@ class Runner: else 0 ) moe_modules = [] - for layer in self.layers: + for layer_idx in self.layer_indices: + layer = self.model.model.layers[layer_idx] if layer.__class__.__name__ == "NemotronHLayer": if layer.layer_type == "E": moe_modules.append(layer.mixer.experts) + elif layer.__class__.__name__ in ["GatedMLP"]: + pass else: moe_modules.append(layer.mlp.experts) diff --git a/tests/integration/test_lists/test-db/l0_b200.yml b/tests/integration/test_lists/test-db/l0_b200.yml index 55b3bdd982..9c95a4f221 100644 --- a/tests/integration/test_lists/test-db/l0_b200.yml +++ b/tests/integration/test_lists/test-db/l0_b200.yml @@ -87,6 +87,7 @@ l0_b200: - unittest/tools/test_layer_wise_benchmarks.py::test_deepseek_r1_ctx_dep[1] - unittest/tools/test_layer_wise_benchmarks.py::test_nemotron_gen_dep[1] - unittest/tools/test_layer_wise_benchmarks.py::test_qwen3_next_gen_tep[1] + - unittest/tools/test_layer_wise_benchmarks.py::test_performance_alignment[1] - unittest/_torch/modeling/test_modeling_exaone4.py::TestEXAONE4::test_llm_load_1_FP8 - unittest/kv_cache_manager_v2_tests/ - condition: diff --git a/tests/unittest/api_stability/references/llm.yaml b/tests/unittest/api_stability/references/llm.yaml index 861ffdfe7d..fab224c41c 100644 --- a/tests/unittest/api_stability/references/llm.yaml +++ b/tests/unittest/api_stability/references/llm.yaml @@ -231,6 +231,10 @@ methods: annotation: int default: 1000 status: prototype + layer_wise_benchmarks_config: + annotation: tensorrt_llm.llmapi.llm_args.LayerwiseBenchmarksConfig + default: null + status: prototype model_kwargs: annotation: Optional[Dict[str, Any]] default: null diff --git a/tests/unittest/tools/test_layer_wise_benchmarks.py b/tests/unittest/tools/test_layer_wise_benchmarks.py index 23310b6854..38dc18091d 100644 --- a/tests/unittest/tools/test_layer_wise_benchmarks.py +++ b/tests/unittest/tools/test_layer_wise_benchmarks.py @@ -179,3 +179,23 @@ def test_qwen3_next_gen_tep(llm_root, world_size): ["python3", "parse.py", "--profile-dir", profile_dir, f"--world-size={world_size}"], cwd=llm_root / "examples" / "layer_wise_benchmarks", ) + + +@pytest.mark.parametrize("world_size", [1, 4]) +def test_performance_alignment(llm_root, world_size): + if torch.cuda.device_count() < world_size: + pytest.skip(f"needs {world_size:d} GPUs to run this test") + model_root = llm_models_root(check=True) + profile_dir = f"profiles/test_performance_alignment_{world_size}" + check_call( + [ + "./sample_performance_alignment.sh", + ], + cwd=llm_root / "examples" / "layer_wise_benchmarks", + env={ + **os.environ, + "MODEL": model_root / "DeepSeek-V3-Lite" / "nvfp4_moe_only", + "NP": f"{world_size:d}", + "PROFILE_DIR": profile_dir, + }, + )