# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from functools import partial from typing import Callable, Optional, Union import cuda.bindings.driver as cuda_driver import cuda.bindings.runtime as cuda_runtime import cutlass.base_dsl.jit_executor from cupti import cupti from cutlass.cute.testing import JitArguments class CuptiProfiler: """A class for managing CUPTI profiling measurements with start, stop, and duration methods. This class provides a clean interface for measuring CUDA kernel execution times using CUPTI (CUDA Profiling Tools Interface). It encapsulates the complexity of buffer management, callback registration, and activity tracking. Example usage: profiler = CuptiProfiler() profiler.start() # ... run your CUDA kernels ... profiler.stop() duration = profiler.get_duration() # Returns total duration in milliseconds """ def __init__(self, buffer_size: int = 8 * 1024 * 1024): """Initialize the CUPTI profiler. Args: buffer_size: Size of the CUPTI buffer in bytes (default: 8MB) """ self.buffer_size = buffer_size self.timings = [] self._is_active = False self._buffer_requested_callback = None self._buffer_completed_callback = None def _buffer_requested(self): """Internal callback for CUPTI buffer requests.""" max_num_records = 0 return self.buffer_size, max_num_records def _buffer_completed(self, activities: list): """Internal callback for processing completed CUPTI activities.""" for activity in activities: start = activity.start if hasattr(activity, "start") else "nil" end = activity.end if hasattr(activity, "end") else "nil" duration = end - start if start != "nil" and end != "nil" else "nil" name = activity.name[:100] if hasattr(activity, "name") else "unknown" # Convert to milliseconds if duration != "nil": self.timings.append((name, duration / 1e6)) # print(f"Activity: {name}, Duration: {duration / 1e6} ms") def start(self): """Start CUPTI profiling. Enables CUPTI activity tracking for concurrent kernels and registers the necessary callbacks for buffer management. Raises: ValueError: If CUPTI activity cannot be enabled """ if self._is_active: raise RuntimeError("CUPTI profiler is already active") # Clear previous timings self.timings = [] try: cupti.activity_enable(cupti.ActivityKind.CONCURRENT_KERNEL) except cupti.cuptiError as e: raise ValueError( f"Error while enabling Activity Kind {cupti.ActivityKind.CONCURRENT_KERNEL.name}: {e}" ) # Register callbacks self._buffer_requested_callback = self._buffer_requested self._buffer_completed_callback = partial(self._buffer_completed) cupti.activity_register_callbacks( self._buffer_requested_callback, self._buffer_completed_callback ) self._is_active = True def stop(self): """Stop CUPTI profiling. Flushes all activities, disables CUPTI tracking, and finalizes the profiler. This method should be called after the kernels you want to measure have completed. """ if not self._is_active: raise RuntimeError("CUPTI profiler is not active") # Flush all activities and cleanup cupti.activity_flush_all(0) cupti.activity_disable(cupti.ActivityKind.CONCURRENT_KERNEL) cupti.finalize() self._is_active = False def get_duration(self) -> float: """Get the total duration of all measured activities in milliseconds. Returns: Total duration in milliseconds. Returns 0.0 if no activities were recorded. """ return sum(timing[1] for timing in self.timings) def _cuda_success(err: Union[tuple, cuda_runtime.cudaError_t, cuda_driver.CUresult], message: str): """Helper function to check CUDA API errors.""" if isinstance(err, tuple): _cuda_success(err[0], message) elif isinstance(err, cuda_runtime.cudaError_t): error_message = cuda_runtime.cudaGetErrorString(err)[1].decode("utf-8") if err != cuda_runtime.cudaError_t.cudaSuccess: raise RuntimeError(f"{message} : {error_message}") elif isinstance(err, cuda_driver.CUresult): if err != cuda_driver.CUresult.CUDA_SUCCESS: error_message = cuda_driver.cuGetErrorString(err)[1].decode("utf-8") raise RuntimeError(f"{message} : {error_message}") else: raise TypeError(f"{err} is an unexpected type : it should be a cudaError_t or CUresult") def _does_kernel_use_stream(kernel: Callable, stream: cuda_driver.CUstream, *args, **kwargs): """Check if the kernel uses the provided non-default stream. It does this by capturing the stream and then checking if any kernels were launched. :param kernel: The kernel to check :type kernel: Callable :param stream: The stream to check :type stream: cuda_driver.CUstream :return: True if the kernel uses the stream, False otherwise :rtype: bool """ assert int(stream) != int(cuda_driver.CUstream_flags.CU_STREAM_DEFAULT), ( "Stream must be a non-default stream" ) err = cuda_runtime.cudaStreamBeginCapture( stream, cuda_runtime.cudaStreamCaptureMode.cudaStreamCaptureModeThreadLocal ) _cuda_success(err, "Error on stream capture") kernel(*args, **kwargs) err, graph = cuda_runtime.cudaStreamEndCapture(stream) _cuda_success(err, "Error on stream capture") # Get number of nodes in warmup graph to check it matches what is expected err, _, num_nodes = cuda_runtime.cudaGraphGetNodes(graph) _cuda_success(err, "Error on querying graph") return num_nodes > 0 def benchmark( callable: Callable, *, warmup_iterations: int = 10, iterations: int = 100, stream: Optional[cuda_driver.CUstream] = None, kernel_arguments: Optional[JitArguments] = None, workspace_generator: Optional[Callable[[], JitArguments]] = None, workspace_count: int = 1, use_cuda_graphs: bool = False, use_cupti: bool = False, ) -> float: """Benchmarks a callable function with the specified parameters. For example, .. code-block:: python from cutlass.cute.testing import benchmark @cute.jit def user_function(a: cute.Tensor, b: cute.Tensor, c: cute.Tensor, stream: cuda_driver.CUstream): # contents of the function pass time_us = benchmark(user_function, kernel_arguments=JitArguments(a, b, c, stream) warmup_iterations=10, iterations=100 stream=stream) To prevent skewing results by repeately accessing the L2 cache, use the workspace_count and workspace_generator parameters to cycle through a number of different workspaces. .. code-block:: python from cutlass.cute.testing import benchmark @cute.jit def user_function(a: cute.Tensor, b: cute.Tensor, c: cute.Tensor): # contents of the function pass def workspace_generator(): # create a, b, and c return JitArguments(a, b, c) time_us = benchmark( user_function, workspace_generator=workspace_generator, workspace_count=10, warmup_iterations=10000, iterations=1000, ) To benchmark you may always configure the function being profiled (callable), the warmup iterations, and the number of profiling iterations. Whenever the kernel being benchmarked runs in a non-default stream, the stream must be provided through the stream parameter. To use CUDA graphs, the callable must be a compiled @cute.jit annotated function. When using CUDA graphs, the kernel must be launched in a non-default stream. :param callable: The function to benchmark :type callable: Callable :param warmup_iterations: Number of warmup iterations, defaults to 10 :type warmup_iterations: int, optional :param iterations: Number of benchmark iterations, defaults to 100 :type iterations: int, optional :param stream: Stream kernel is launched in, defaults to CUDA stream default :type stream: CUstream, None :param kernel_arguments: Kernel arguments to launch callable with, defaults to None :type kernel_arguments: JitArguments, None :param workspace_generator: Function that returns kernel arguments, defaults to None :type workspace_generator: Callable :param workspace_count: Number of workspaces (arguments) to loop through, looping through enough workspaces will keep the L2 cache cold :type workspace_count: int, optional :param use_cuda_graphs: Whether to use cuda graphs, defaults to False :type use_cuda_graphs: bool, optional :return: The benchmark time in microseconds :rtype: float """ if stream is None: stream = cuda_driver.CUstream(cuda_driver.CUstream_flags.CU_STREAM_DEFAULT) if workspace_count < 1: raise ValueError("workspace_count must be at least 1") float("nan") if workspace_generator is None: # If no workspace generator is provided, we need a single workspace if workspace_count != 1: raise ValueError("Need a single workspace if not providing a generator") # If no workspace generator is provided, we need a kernel_argument if kernel_arguments is None: raise ValueError("Please pass a kernel argument if not providing a generator") def workspace_generator(): return kernel_arguments workspaces = [workspace_generator() for _ in range(workspace_count)] for workspace in workspaces: if not isinstance(workspace, JitArguments): raise TypeError( "workspace_generator and/or kernel_arguments should use JitArguments type" ) def _loop_and_call_kernel(iterations: int, workspace_index: int = 0): for _ in range(iterations): current_workspace = workspaces[workspace_index] callable(*current_workspace.args, **current_workspace.kwargs) workspace_index = (workspace_index + 1) % workspace_count return workspace_index # Create CUDA events for timing err, start_event = cuda_driver.cuEventCreate(cuda_driver.CUevent_flags.CU_EVENT_DEFAULT) _cuda_success(err, "Error on creating event") err, end_event = cuda_driver.cuEventCreate(cuda_driver.CUevent_flags.CU_EVENT_DEFAULT) _cuda_success(err, "Error on creating event") elapsed_time = float("nan") if use_cuda_graphs: # Check if the callable is a JitCompiledFunction or JitExecutor # These are functions that can be called to launch kernels compiled_types = ( cutlass.base_dsl.jit_executor.JitCompiledFunction, cutlass.base_dsl.jit_executor.JitExecutor, ) if not isinstance(callable, compiled_types): raise TypeError("Function must be precompiled to be used with CUDA Graphs") # Check if the stream is a non-default stream if int(stream) == int(cuda_driver.CUstream_flags.CU_STREAM_DEFAULT): raise ValueError( "Measuring with CUDA Graphs requires executing in a non-default stream" ) workspace_index = 0 # Capture warmup graph err = cuda_runtime.cudaStreamBeginCapture( stream, cuda_runtime.cudaStreamCaptureMode.cudaStreamCaptureModeThreadLocal ) _cuda_success(err, "Error on stream capture") workspace_index = _loop_and_call_kernel(warmup_iterations) err, gwarm = cuda_runtime.cudaStreamEndCapture(stream) _cuda_success(err, "Error on stream capture") # Get number of nodes in warmup graph to check it matches what is expected err, _, num_nodes = cuda_runtime.cudaGraphGetNodes(gwarm) _cuda_success(err, "Error on querying graph") # Assertion is >= since we may launch multiple kernels in one host function if num_nodes < warmup_iterations: raise ValueError( "CUDA stream passed to benchmark does not match the stream the kernel was launched in" ) # Capture profiling graph err = cuda_runtime.cudaStreamBeginCapture( stream, cuda_runtime.cudaStreamCaptureMode.cudaStreamCaptureModeThreadLocal ) _cuda_success(err, "Error on stream capture") _loop_and_call_kernel(iterations, workspace_index) err, gprofile = cuda_runtime.cudaStreamEndCapture(stream) _cuda_success(err, "Error on stream capture") # Instantiate graphs err, gwarm = cuda_runtime.cudaGraphInstantiate(gwarm, 0) _cuda_success(err, "Error on graph instantiation") err, gprofile = cuda_runtime.cudaGraphInstantiate(gprofile, 0) _cuda_success(err, "Error on graph instantiation") # Launch warmup graph err = cuda_runtime.cudaGraphLaunch(gwarm, stream) _cuda_success(err, "Error on graph launch") # Record start time err = cuda_driver.cuEventRecord(start_event, stream) _cuda_success(err, "Error on recording event") # Launch profiling graph err = cuda_runtime.cudaGraphLaunch(gprofile, stream) _cuda_success(err, "Error on graph launch") # Record end time err = cuda_driver.cuEventRecord(end_event, stream) _cuda_success(err, "Error on recording event") err = cuda_driver.cuEventSynchronize(end_event) _cuda_success(err, "Error on synchronizing event") # Get elapsed time err, elapsed_time = cuda_driver.cuEventElapsedTime(start_event, end_event) _cuda_success(err, "Error on querying event") # Destroy graphs err = cuda_runtime.cudaGraphExecDestroy(gwarm) _cuda_success(err, "Error on destroying graph") err = cuda_runtime.cudaGraphExecDestroy(gprofile) _cuda_success(err, "Error on destroying graph") elif use_cupti: # Use the new CuptiProfiler class profiler = CuptiProfiler() # Warmup workspace_index = _loop_and_call_kernel(warmup_iterations) profiler.start() _loop_and_call_kernel(iterations, workspace_index) # Synchronize device err = cuda_runtime.cudaDeviceSynchronize() _cuda_success(err, "Error on synchronizing device") profiler.stop() elapsed_time = profiler.get_duration() else: if int(stream) != int( cuda_driver.CUstream_flags.CU_STREAM_DEFAULT ) and not _does_kernel_use_stream( callable, stream, *workspaces[0].args, **workspaces[0].kwargs ): raise ValueError( "CUDA stream passed to benchmark does not match the stream the kernel was launched in" ) # Not using graphs # Warmup workspace_index = _loop_and_call_kernel(warmup_iterations) # Record start event err = cuda_driver.cuEventRecord(start_event, stream) _cuda_success(err, "Error on recording event") _loop_and_call_kernel(iterations, workspace_index) # Record end event err = cuda_driver.cuEventRecord(end_event, stream) _cuda_success(err, "Error on recording event") # Synchronize end event err = cuda_driver.cuEventSynchronize(end_event) _cuda_success(err, "Error on synchronizing event") err, elapsed_time = cuda_driver.cuEventElapsedTime(start_event, end_event) _cuda_success(err, "Error on querying event") # Destroy events err = cuda_driver.cuEventDestroy(start_event) _cuda_success(err, "Error on destroying event") err = cuda_driver.cuEventDestroy(end_event) _cuda_success(err, "Error on destroying event") return elapsed_time / iterations * 1e3