# TensorRT-LLMs/examples/layer_wise_benchmarks/parse_e2e.py

import argparse
import bisect
import json
import re
import sqlite3
from pathlib import Path
from typing import NamedTuple

import numpy as np
import pandas as pd

from parser_utils import (
    kernel_short_name,
    lazy_convert_sqlite,
    shortest_common_supersequence,
    warned_names,
)


class IterInfo(NamedTuple):
    """Represents an iteration's timing information."""
    start: int
    end: int
    iter_id: int


class LayerInfo(NamedTuple):
    """Represents a layer's timing information within an iteration."""
    start: int
    end: int
    layer_idx: int


class KernelQueryResult(NamedTuple):
    """Represents a kernel query result for e2e parsing.

    Sorted by runtime_start for consistent ordering.
    """
    runtime_start: int
    graph_node_id: int | None
    kernel_start: int
    kernel_end: int
    demangled_name: int  # String ID reference, resolved via the StringIds table


def comma_separated_ints(s: str) -> list[int]:
    """Parse a comma-separated list of ints, e.g. "3,4,5" -> [3, 4, 5]."""
    return [int(x) for x in s.split(",")]


parser = argparse.ArgumentParser()
parser.add_argument("--eager-trace", type=str, required=True)
parser.add_argument("--graph-trace", type=str)
parser.add_argument("--target-ctx-reqs", type=int, default=0)
parser.add_argument("--target-gen-reqs", type=int)
parser.add_argument("--layer-indices", type=comma_separated_ints, required=True)
parser.add_argument("--warmup-times", type=int, default=5)
group = parser.add_mutually_exclusive_group()
group.add_argument("--error-on-unknown-kernel", action="store_true", dest="error_on_unknown_kernel")
group.add_argument(
    "--no-error-on-unknown-kernel", action="store_false", dest="error_on_unknown_kernel"
)
parser.set_defaults(error_on_unknown_kernel=False)
parser.add_argument("--output", "-o", type=str)
args = parser.parse_args()
if not args.eager_trace.endswith(".nsys-rep"):
    parser.error("Please provide a .nsys-rep file for the --eager-trace option.")
if args.graph_trace is not None and not args.graph_trace.endswith(".nsys-rep"):
    parser.error("Please provide a .nsys-rep file for the --graph-trace option.")
print(args)
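
# Example invocation (trace file names are hypothetical):
#   python parse_e2e.py --eager-trace eager.nsys-rep --graph-trace graph.nsys-rep \
#       --layer-indices 3,4 --output timeline.json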


def is_gemm(name: str) -> bool:
    return "nvjet" in name or "gemm" in name.lower()


eager_nsys_rep_file_path = Path(args.eager_trace)
# For CTX phase which does not use CUDA Graphs, analyze the eager trace instead.
# Here we do not change the identifier name "graph_*" for convenience.
graph_nsys_rep_file_path = Path(args.graph_trace or args.eager_trace)
eager_sqlite_file_path = eager_nsys_rep_file_path.parent / (
    eager_nsys_rep_file_path.name[: -len(".nsys-rep")] + ".sqlite"
)
graph_sqlite_file_path = graph_nsys_rep_file_path.parent / (
    graph_nsys_rep_file_path.name[: -len(".nsys-rep")] + ".sqlite"
)
lazy_convert_sqlite(eager_nsys_rep_file_path, eager_sqlite_file_path)
lazy_convert_sqlite(graph_nsys_rep_file_path, graph_sqlite_file_path)
eager_conn = sqlite3.connect(f"file:{eager_sqlite_file_path}?mode=ro", uri=True)
graph_conn = sqlite3.connect(f"file:{graph_sqlite_file_path}?mode=ro", uri=True)
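
# Map the "NvtxPushPopRange" event type name to its numeric id in each trace.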
query = "SELECT * FROM ENUM_NSYS_EVENT_TYPE"
df = pd.read_sql_query(query, eager_conn)
eager_event_id_NvtxPushPopRange = df[df["name"] == "NvtxPushPopRange"].iloc[0]["id"].tolist()
df = pd.read_sql_query(query, graph_conn)
graph_event_id_NvtxPushPopRange = df[df["name"] == "NvtxPushPopRange"].iloc[0]["id"].tolist()
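
# Fetch all NVTX push/pop ranges together with their annotation text.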
query = """SELECT T1.start, T1.end, T2.value AS text
FROM NVTX_EVENTS AS T1
JOIN StringIds AS T2 ON T1.textId = T2.id
WHERE eventType = ?"""
df = pd.read_sql_query(query, eager_conn, params=(eager_event_id_NvtxPushPopRange,))
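
# Select iterations by request mix. If --target-gen-reqs is not given, infer it:
# for a generation-phase run (--target-ctx-reqs 0) take the gen count of the
# first matching iteration; otherwise assume a context-only run (0 gen reqs).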
target_ctx_reqs = args.target_ctx_reqs
target_gen_reqs = args.target_gen_reqs
if target_gen_reqs is None:
    if target_ctx_reqs == 0:
        for _, _, text in df.itertuples(index=False):
            if m := re.match(
                r"^\[Executor\] _forward_step (\d+): (\d+) ctx reqs, (\d+) gen reqs", text
            ):
                ctx_reqs = int(m.group(2))
                gen_reqs = int(m.group(3))
                if ctx_reqs == target_ctx_reqs:
                    target_gen_reqs = gen_reqs
                    break
        else:
            raise ValueError("Cannot determine target_gen_reqs")
    else:
        target_gen_reqs = 0
print(f"{target_ctx_reqs=} {target_gen_reqs=}")
eager_iters: list[IterInfo] = []
for start, end, text in df.itertuples(index=False):
    if m := re.match(r"^\[Executor\] _forward_step (\d+): (\d+) ctx reqs, (\d+) gen reqs", text):
        iter_id = int(m.group(1))
        ctx_reqs = int(m.group(2))
        gen_reqs = int(m.group(3))
        if ctx_reqs == target_ctx_reqs and gen_reqs == target_gen_reqs:
            eager_iters.append(IterInfo(start, end, iter_id))
eager_iters = sorted(eager_iters)[args.warmup_times :]
iter_id_list = [it.iter_id for it in eager_iters]
print("Iters (eager)", *iter_id_list)
per_iter_eager_layers: list[list[LayerInfo]] = [[] for _ in iter_id_list]
for start, end, text in df.itertuples(index=False):
    if m := re.match(r"^layer_wise_benchmarks layer_idx (\d+)$", text):
        layer_idx = int(m.group(1))
        # NamedTuples compare element-wise, so bisecting with the 1-tuple
        # (start,) finds the last iteration whose start is <= this range's start.
        iter_idx = bisect.bisect(eager_iters, (start,)) - 1
        if iter_idx < 0 or end > eager_iters[iter_idx].end:
            continue  # the range does not belong to any selected iteration
        per_iter_eager_layers[iter_idx].append(LayerInfo(start, end, layer_idx))
layer_idx_list = [layer.layer_idx for layer in per_iter_eager_layers[0]]
print("Layers (eager)", *layer_idx_list)
for eager_layers in per_iter_eager_layers:
    assert [layer.layer_idx for layer in eager_layers] == layer_idx_list, "inconsistent layer idx"
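
# Repeat the iteration selection on the graph trace and require that both
# traces profiled the same iteration IDs.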
df = pd.read_sql_query(query, graph_conn, params=(graph_event_id_NvtxPushPopRange,))
graph_iters: list[IterInfo] = []
for start, end, text in df.itertuples(index=False):
    if m := re.match(r"^\[Executor\] _forward_step (\d+): (\d+) ctx reqs, (\d+) gen reqs", text):
        iter_id = int(m.group(1))
        ctx_reqs = int(m.group(2))
        gen_reqs = int(m.group(3))
        if ctx_reqs == target_ctx_reqs and gen_reqs == target_gen_reqs:
            graph_iters.append(IterInfo(start, end, iter_id))
graph_iters = sorted(graph_iters)[args.warmup_times :]
graph_iter_id_list = [it.iter_id for it in graph_iters]
print("Iters (graph)", *graph_iter_id_list)
if iter_id_list != graph_iter_id_list:
    raise ValueError("Iteration IDs of the eager and graph traces do not match")


def query_kernels(conn: sqlite3.Connection, iters: list[IterInfo]) -> list[list[KernelQueryResult]]:
    """Group kernel, memcpy, and memset GPU activities by launching iteration."""
    query = """SELECT name FROM sqlite_master WHERE type = ?"""
    df = pd.read_sql_query(query, conn, params=("table",))
    tables = df["name"].tolist()
    unified_subquery = """SELECT T1.start, T1.end, T1.demangledName, T1.correlationId, T1.graphNodeId
        FROM CUPTI_ACTIVITY_KIND_KERNEL AS T1"""
    if "CUPTI_ACTIVITY_KIND_MEMCPY" in tables:
        unified_subquery += """ UNION ALL
        SELECT T2.start, T2.end, -2 AS demangledName, T2.correlationId, T2.graphNodeId
        FROM CUPTI_ACTIVITY_KIND_MEMCPY AS T2"""
    if "CUPTI_ACTIVITY_KIND_MEMSET" in tables:
        unified_subquery += """ UNION ALL
        SELECT T3.start, T3.end, -3 AS demangledName, T3.correlationId, T3.graphNodeId
        FROM CUPTI_ACTIVITY_KIND_MEMSET AS T3"""
    query = f"""SELECT unified.start, unified.end, unified.graphNodeId, unified.demangledName,
        R.start AS runtime_start, R.end AS runtime_end
        FROM ({unified_subquery}) AS unified
        JOIN CUPTI_ACTIVITY_KIND_RUNTIME AS R ON unified.correlationId = R.correlationId"""
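    # Join device-side activities to the CUDA runtime launch record that
    # produced them; kernels replayed from a CUDA graph all share the
    # correlation id of the launching cudaGraphLaunch call.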
    df = pd.read_sql_query(query, conn)
    per_iter_kernels: list[list[KernelQueryResult]] = [[] for _ in iters]
    for (
        kernel_start,
        kernel_end,
        graph_node_id,
        demangled_name,
        runtime_start,
        runtime_end,
    ) in df.itertuples(index=False):
        iter_idx = bisect.bisect(iters, (runtime_start,)) - 1
        if iter_idx < 0 or runtime_end > iters[iter_idx].end:
            continue
        per_iter_kernels[iter_idx].append(
            KernelQueryResult(
                runtime_start, graph_node_id, kernel_start, kernel_end, demangled_name
            )
        )
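    # Order by host launch time, breaking ties among kernels of the same graph
    # launch by graph node id, so the sequence is reproducible across iterations.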
    for kernels in per_iter_kernels:
        kernels.sort(key=lambda k: (k.runtime_start, k.graph_node_id))
    return per_iter_kernels


eager_per_iter_kernels = query_kernels(eager_conn, eager_iters)
graph_per_iter_kernels = query_kernels(graph_conn, graph_iters)
print("#Kernels (eager)", *[len(kernels) for kernels in eager_per_iter_kernels])
print("#Kernels (graph)", *[len(kernels) for kernels in graph_per_iter_kernels])
for eager_kernels, graph_kernels in zip(eager_per_iter_kernels, graph_per_iter_kernels):
    assert all(
        kernel.demangled_name == eager_per_iter_kernels[0][i].demangled_name
        for i, kernel in enumerate(eager_kernels)
    ), "eager kernels change across iterations"
    assert all(
        kernel.demangled_name == graph_per_iter_kernels[0][i].demangled_name
        for i, kernel in enumerate(graph_kernels)
    ), "graph kernels change across iterations"
query = "SELECT * FROM StringIds"
df = pd.read_sql_query(query, eager_conn)
eager_string_ids = dict(zip(df["id"], df["value"]))
eager_string_ids.update({-2: "Memcpy", -3: "Memset"})
df = pd.read_sql_query(query, graph_conn)
graph_string_ids = dict(zip(df["id"], df["value"]))
graph_string_ids.update({-2: "Memcpy", -3: "Memset"})
eager_conn.close()
graph_conn.close()
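
# Kernel name sequences of iteration 0 (identical across iterations, see above).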
eager_kernel_names = [
    eager_string_ids[kernel.demangled_name] for kernel in eager_per_iter_kernels[0]
]
graph_kernel_names = [
    graph_string_ids[kernel.demangled_name] for kernel in graph_per_iter_kernels[0]
]
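
# The two traces may each contain kernels the other lacks (e.g. extra copies in
# eager mode), so align them on their shortest common supersequence.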
super_kernel_names = shortest_common_supersequence(eager_kernel_names, graph_kernel_names)
print(f"#Kernels (supersequence) {len(super_kernel_names)}")
eager_per_layer_kernels: list[list[int]] = [[] for _ in layer_idx_list]
for i, eager_kernel in enumerate(eager_per_iter_kernels[0]):
    eager_layer_idx = bisect.bisect(per_iter_eager_layers[0], (eager_kernel.runtime_start,)) - 1
    if (
        eager_layer_idx < 0
        or eager_kernel.runtime_start > per_iter_eager_layers[0][eager_layer_idx].end
    ):
        continue
    eager_per_layer_kernels[eager_layer_idx].append(i)
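
# Greedily map each eager kernel to its position in the supersequence.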
eager2super: list[int] = []
j = 0
for eager_kernel_name in eager_kernel_names:
    while eager_kernel_name != super_kernel_names[j]:
        j += 1
    eager2super.append(j)
    j += 1
super_per_layer_starts = [eager2super[indices[0]] for indices in eager_per_layer_kernels]
super_per_layer_ends = [eager2super[indices[-1]] for indices in eager_per_layer_kernels]
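
# Project each layer's first/last eager kernel onto the supersequence, then
# collect the graph kernels whose supersequence positions fall inside that span.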
graph_per_layer_kernels: list[list[int]] = [[] for _ in layer_idx_list]
j = 0
for i, graph_kernel_name in enumerate(graph_kernel_names):
    while graph_kernel_name != super_kernel_names[j]:
        j += 1
    layer_idx = bisect.bisect(super_per_layer_starts, j) - 1
    if layer_idx >= 0 and j <= super_per_layer_ends[layer_idx]:
        graph_per_layer_kernels[layer_idx].append(i)
    j += 1
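
# For each kernel of the selected layers, average across iterations its duration
# and its end offset relative to the layers' first kernel (first_kernel_idx).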
timeline: list[dict] = []
first_kernel_idx = min(graph_per_layer_kernels[layer_idx][0] for layer_idx in args.layer_indices)
for layer_idx in args.layer_indices:
    for kernel_idx in graph_per_layer_kernels[layer_idx]:
        duration_list: list[int] = []
        end_list: list[int] = []
        for iter_idx in range(len(graph_per_iter_kernels)):
            layer_start_time = graph_per_iter_kernels[iter_idx][first_kernel_idx].kernel_start
            kernel_start_time = graph_per_iter_kernels[iter_idx][kernel_idx].kernel_start
            kernel_end_time = graph_per_iter_kernels[iter_idx][kernel_idx].kernel_end
            duration_list.append(kernel_end_time - kernel_start_time)
            end_list.append(kernel_end_time - layer_start_time)
        timeline.append(
            {
                "name": graph_kernel_names[kernel_idx],
                "duration": np.mean(duration_list).tolist(),
                "end": np.mean(end_list).tolist(),
            }
        )
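
# Nsight Systems timestamps are in nanoseconds; the table prints microseconds.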
print(f"{'Kernel':40s} {'Duration':>8s} {'End':>8s}")
print("-" * (40 + 1 + 8 + 1 + 8))
for entry in timeline:
    print(
        f"{kernel_short_name(entry['name'])[:40]:40s} "
        f"{entry['duration'] / 1000.0:-8.1f} "
        f"{entry['end'] / 1000.0:-8.1f}"
    )

if args.error_on_unknown_kernel and warned_names:
    raise ValueError("Unknown kernel names encountered")

if args.output:
    if not args.output.endswith(".json"):
        raise ValueError("Output file name must be *.json")
    with open(args.output, "w") as f:
        json.dump([{"name": "parse_e2e", "timeline": timeline}], f)