Compare commits

...

29 Commits

Author SHA1 Message Date
David Addison
81463c58d0 NCCL_TESTS_VERSION 2.17.8 2026-01-06 15:00:17 -08:00
David Addison
7278698c1b Clarified use of Mebibytes and Gibibytes for sizes 2026-01-06 14:59:17 -08:00
Katie Gioioso
2656c58421 NCCL_TESTS_VERSION 2.17.7 2025-12-30 20:18:25 +00:00
Katie Gioioso
070d17528c refactor comm init 2025-12-30 20:18:25 +00:00
Katie Gioioso
332e61896f device api 2.28 is not compatible with 2.29. Check versions and print error if there is a mismatch 2025-12-30 20:18:25 +00:00
Katie Gioioso
24874bdaa8 Compatibility with 2.29 device API: use NCCL_DEV_COMM_REQUIREMENTS_INITIALIZER, query properties to check for device API support 2025-12-30 20:18:24 +00:00
David Addison
7106245178 Add include of <limits> due to compilation error 2025-12-30 20:13:13 +00:00
David Addison
760c467f12 Add memory usage report option
Use -M 1 to dump library memory usage information
2025-12-30 20:12:58 +00:00
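As a sketch of how this might be invoked (sizes and GPU count borrowed from the README quick example below; `-M 1` as described in the commit message):
```shell
# dump library memory usage information alongside the usual perf output
$ ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 8 -M 1
```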
David Addison
4bc314aa27 Add README.md text for -J option 2025-11-21 11:31:48 -08:00
David Addison
51f2e7ed7c Remove trailing WS when timestamp option not used 2025-11-03 11:23:52 -08:00
David Addison
da0b547b1b NCCL_TESTS_VERSION 2.17.6 2025-10-28 10:22:08 -07:00
David Addison
e2af90af76 Add new report_timestamps option to README.md 2025-10-28 10:21:58 -07:00
David Addison
a62c975681 Add option to suffix a timestamp to each perf line
Based on code from yakovdyadkin & Scott Moe in MR 349

Adds -S 1 option to suffix each performance report line with
a timestamp. Format is "%Y-%m-%d %H:%M:%S"

This is especially useful when using the -N 0 option and looking
for hangs or failure events.
2025-10-28 10:11:23 -07:00
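A sketch of the combination the commit message suggests (size and GPU flags borrowed from the README quick example):
```shell
# suffix each perf line with a "%Y-%m-%d %H:%M:%S" timestamp and cycle indefinitely (-N 0)
$ ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 8 -S 1 -N 0
```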
David Addison
0bb567cc02 NCCL_TESTS_VERSION 2.17.5 2025-10-28 09:34:56 -07:00
Shane Snyder
013c49e930 add necessary ifdef guards for device API tests 2025-10-28 09:34:26 -07:00
Shane Snyder
f66d20e360 add runtime guards for ncclAlltoAll() 2025-10-28 09:32:17 -07:00
David Addison
3744121a2d NCCL_TESTS_VERSION 2.17.4 2025-10-24 17:11:08 -07:00
David Addison
9641693e9b Add PRINT of nccl-tests, NCCL header and library versions 2025-10-24 17:10:57 -07:00
Shane Snyder
9829ea42b5 add GIN-based device API kernels to alltoall
- add GIN-only A2A kernel implementation
- add hybrid LSA+GIN A2A kernel implementation
- update perf test cases to expose a function for setting
  devCommRequirements for each device implementation and
  simplify devCommCreate code path to use this directly instead
  of complex fallback logic
- add missing call to devCommDestroy
2025-10-24 17:10:34 -07:00
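A sketch of selecting one of the new alltoall device implementations (numbering taken from the comments in the alltoall diff further down: 1/2 = LSA kernels, 3 = GIN, 4 = hybrid LSA+GIN; a GIN-capable network and NCCL build are assumed, and the binary name is assumed to follow the usual <collective>_perf pattern):
```shell
# run the GIN-only alltoall kernel with 16 CTAs
$ ./build/alltoall_perf -b 8 -e 128M -f 2 -g 8 -D 3 -V 16
```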
David Addison
00f52811b8 Add support for JSON output to perf test framework
This adds support for writing structured information about the run to a JSON file.

Enable with -J <filename>.json

If the target JSON filename already exists then an incrementing numeric suffix will be
added to create <filename>.json.<n>
2025-10-17 12:01:25 -07:00
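For example (a sketch; the output path is arbitrary):
```shell
# write structured results to results.json; a rerun with the same name gets an
# incrementing numeric suffix (results.json.<n>) instead of overwriting the file
$ ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 8 -J results.json
```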
Stephen Sachs
abc46770a9 Check if sufficient GPUs are available
The CUDA error message "Test CUDA failure util.cu:706 'invalid device ordinal'"
is not very helpful. Check for this explicitly and guide the user.
2025-10-02 15:48:13 -07:00
Sylvain Jeaugey
9a5c15461a Fix compilation for old NCCL versions
Fix compilation failure on ctaPolicy with NCCL <= 2.26.
Fix compilation failure on local_register with NCCL <= 2.18.
Fix ctaPolicy behavior if the tests are compiled with NCCL <= 2.26
but run with NCCL >= 2.27.
2025-09-05 09:15:06 -07:00
David Addison
e12dbb0a14 Update to align with the NCCL 2.28 release
Added Device API infrastructure and example kernels
Two new command line arguments:

  -D <num> device kernel implementation to use <0/1/2/3/4>
  -V <num> number of CTAs to launch device kernels with

Added new CTA Policy command line option:

  -x <policy> set the CTA Policy <0/1/2>
2025-09-04 17:23:22 -07:00
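A sketch combining the three new options (whether a given device implementation is usable depends on the NCCL version and hardware, per the kernel comments later in this diff):
```shell
# device implementation 1 (basic LSA kernel), 16 CTAs, CTA policy 1
$ ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 8 -D 1 -V 16 -x 1
```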
David Addison
c2cb96faac Update NVCUFLAGS and CXXFLAGS to use -std=c++14 2025-08-29 14:55:31 -07:00
David Addison
f2015cbe82 Modified warmup to run for more message sizes
Loops between minBytes and maxBytes doubling size each time

Reduced default warmup iteration count to 1 (was 5)
2025-08-25 13:57:51 -07:00
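The previous warm-up iteration count can still be requested explicitly (a sketch):
```shell
# override the new default of 1 warm-up iteration per size
$ ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 8 -w 5
```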
David Addison
fae7cb4727
Merge pull request #316 from martin-belanger/print-program-name
Print the name of the program being executed before and after test output
2025-07-24 14:58:54 -07:00
David Addison
6edafa0a9c Add extra reserved space during maxBytes calculation
Also, don't allow minBytes > maxBytes
2025-07-23 16:19:37 -07:00
David Addison
def2d3689c Minor fix to Makefile
Move comments to separate lines
2025-07-23 16:04:30 -07:00
Martin Belanger
dafb70408d Print the name of the program being executed
One thing missing from the stdout of each performance test is
the name of the test that is actually being run.

This patch adds 2 new messages to the stdout. At the beginning
of the execution of a test (e.g. sendrecv_perf) we will now
see this message:

  Collective test starting: sendrecv_perf

And at the end, we will now see this:

  Collective test concluded: sendrecv_perf

This is needed when running several tests consecutively and we're
trying to parse the stdout to collect the results.

For example, using a Python script to parse the stdout, one could
retrieve the results for each test and plot them on a graph. This
patch makes it easier to implement such a script.

Signed-off-by: Martin Belanger <martin.belanger@dell.com>
2025-06-03 11:43:02 -04:00
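A sketch of the kind of post-processing this enables, using only the two marker lines introduced by this patch:
```shell
# run two tests back to back and keep just the per-test start/end markers
$ for t in sendrecv_perf all_reduce_perf; do ./build/$t -b 8 -e 128M -f 2 -g 8; done \
    | grep -E 'Collective test (starting|concluded):'
```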
19 changed files with 2444 additions and 281 deletions

View File

@ -32,13 +32,14 @@ NCCL tests can run on multiple processes, multiple threads, and multiple CUDA de
### Quick examples
Run on single node with 8 GPUs (`-g 8`), scanning from 8 Bytes to 128MBytes :
Run on single node with 8 GPUs (`-g 8`), scanning from 8 Bytes to 128MiB (Mebibytes), doubling between each test (`-f 2`) :
```shell
$ ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 8
```
Run 64 MPI processes on nodes with 8 GPUs each, for a total of 64 GPUs spread across 8 nodes :
Run 64 MPI processes on nodes with 8 GPUs each, for a total of 64 GPUs spread across 8 nodes.
Scanning from 8 Bytes to 32GiB (Gibibytes), doubling between each test (`-f 2`).
(NB: The nccl-tests binaries must be compiled with `MPI=1` for this case)
```shell
@ -57,10 +58,10 @@ All tests support the same set of arguments :
* `-t,--nthreads <num threads>` number of threads per process. Default : 1.
* `-g,--ngpus <GPUs per thread>` number of gpus per thread. Default : 1.
* Sizes to scan
* `-b,--minbytes <min size in bytes>` minimum size to start with. Default : 32M.
* `-e,--maxbytes <max size in bytes>` maximum size to end at. Default : 32M.
* Increments can be either fixed or a multiplication factor. Only one of those should be used
* `-i,--stepbytes <increment size>` fixed increment between sizes. Default : 1M.
* `-b,--minbytes <min size in bytes>` minimum size to start with. Default : 32M (Mebibytes).
* `-e,--maxbytes <max size in bytes>` maximum size to end at. Default : 32M (Mebibytes).
* Increments can be either fixed or a multiplication factor. Only one of those should be used.
* `-i,--stepbytes <increment size>` fixed increment between sizes. Default : 1M (Mebibytes).
* `-f,--stepfactor <increment factor>` multiplication factor between sizes. Default : disabled.
* NCCL operations arguments
* `-o,--op <sum/prod/min/max/avg/all>` Specify which reduction operation to perform. Only relevant for reduction operations like Allreduce, Reduce or ReduceScatter. Default : Sum.
@ -68,7 +69,7 @@ All tests support the same set of arguments :
* `-r,--root <root/all>` Specify which root to use. Only for operations with a root like broadcast or reduce. Default : 0.
* Performance
* `-n,--iters <iteration count>` number of iterations. Default : 20.
* `-w,--warmup_iters <warmup iteration count>` number of warmup iterations (not timed). Default : 5.
* `-w,--warmup_iters <warmup iteration count>` number of warmup iterations (not timed). Default : 1.
* `-m,--agg_iters <aggregation count>` number of operations to aggregate together in each iteration. Default : 1.
* `-N,--run_cycles <cycle count>` run & print each cycle. Default : 1; 0=infinite.
* `-a,--average <0/1/2/3>` Report performance as an average across all ranks (MPI=1 only). <0=Rank0,1=Avg,2=Min,3=Max>. Default : 1.
@ -79,6 +80,8 @@ All tests support the same set of arguments :
* `-G,--cudagraph <num graph launches>` Capture iterations as a CUDA graph and then replay specified number of times. Default : 0.
* `-C,--report_cputime <0/1>` Report CPU time instead of latency. Default : 0.
* `-R,--local_register <0/1/2>` enable local (1) or symmetric (2) buffer registration on send/recv buffers. Default : 0.
* `-S,--report_timestamps <0/1>` Add timestamp (`"%Y-%m-%d %H:%M:%S"`) to each performance report line. Default : 0.
* `-J,--output_file <file>` Write [JSON] output to filepath. Infer type from suffix (only `json` supported presently).
* `-T,--timeout <time in seconds>` timeout each test after specified number of seconds. Default : disabled.
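For example, to combine several of the reporting options above in one run (a sketch; adjust sizes, GPU count and timeout to your system):
```shell
$ ./build/all_reduce_perf -b 8 -e 128M -f 2 -g 8 -S 1 -J results.json -T 600
```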
### Running multiple operations in parallel

View File

@ -5,9 +5,12 @@
#
include common.mk
MPI ?= 0 # Set to 1 to enable MPI support (multi-process/multi-node)
NAME_SUFFIX ?= # e.g. _mpi when using MPI=1
DSO ?= 0 # Set to 1 to create and use libverifiable.so to reduce binary size
# Set to 1 to enable MPI support (multi-process/multi-node)
MPI ?= 0
# e.g. Set to _mpi when using MPI=1
NAME_SUFFIX ?=
# Set to 1 to create and use libverifiable.so to reduce binary size
DSO ?= 0
.PHONY: build clean
@ -45,12 +48,12 @@ include ../verifiable/verifiable.mk
.PRECIOUS: ${DST_DIR}/%.o
${DST_DIR}/%.o: %.cu common.h $(TEST_VERIFIABLE_HDRS)
${DST_DIR}/%.o: %.cu common.h util.h $(TEST_VERIFIABLE_HDRS)
@printf "Compiling %-35s > %s\n" $< $@
@mkdir -p ${DST_DIR}
$(NVCC) -o $@ $(NVCUFLAGS) -c $<
${DST_DIR}/%$(NAME_SUFFIX).o: %.cu common.h $(TEST_VERIFIABLE_HDRS)
${DST_DIR}/%$(NAME_SUFFIX).o: %.cu common.h util.h $(TEST_VERIFIABLE_HDRS)
@printf "Compiling %-35s > %s\n" $< $@
@mkdir -p ${DST_DIR}
$(NVCC) -o $@ $(NVCUFLAGS) -c $<
@ -61,12 +64,12 @@ ${DST_DIR}/timer.o: timer.cc timer.h
$(CXX) $(CXXFLAGS) -o $@ -c $<
ifeq ($(DSO), 1)
${DST_DIR}/%_perf$(NAME_SUFFIX): ${DST_DIR}/%.o ${DST_DIR}/common$(NAME_SUFFIX).o ${DST_DIR}/timer.o $(TEST_VERIFIABLE_LIBS)
${DST_DIR}/%_perf$(NAME_SUFFIX): ${DST_DIR}/%.o ${DST_DIR}/common$(NAME_SUFFIX).o ${DST_DIR}/util$(NAME_SUFFIX).o ${DST_DIR}/timer.o $(TEST_VERIFIABLE_LIBS)
@printf "Linking %-35s > %s\n" $< $@
@mkdir -p ${DST_DIR}
$(NVCC) -o $@ $(NVCUFLAGS) $^ -L$(TEST_VERIFIABLE_BUILDDIR) -lverifiable ${NVLDFLAGS} -Xlinker "--enable-new-dtags" -Xlinker "-rpath,\$$ORIGIN:\$$ORIGIN/verifiable"
else
${DST_DIR}/%_perf$(NAME_SUFFIX):${DST_DIR}/%.o ${DST_DIR}/common$(NAME_SUFFIX).o ${DST_DIR}/timer.o $(TEST_VERIFIABLE_OBJS)
${DST_DIR}/%_perf$(NAME_SUFFIX):${DST_DIR}/%.o ${DST_DIR}/common$(NAME_SUFFIX).o ${DST_DIR}/util$(NAME_SUFFIX).o ${DST_DIR}/timer.o $(TEST_VERIFIABLE_OBJS)
@printf "Linking %-35s > %s\n" $< $@
@mkdir -p ${DST_DIR}
$(NVCC) -o $@ $(NVCUFLAGS) $^ ${NVLDFLAGS}
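A sketch of building with the options documented at the top of this Makefile (an MPI installation discoverable by the build is assumed for the first invocation):
```shell
# MPI-enabled binaries with an _mpi suffix
$ make MPI=1 NAME_SUFFIX=_mpi
# smaller binaries that link against libverifiable.so
$ make DSO=1
```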

View File

@ -43,8 +43,14 @@ void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, doubl
*busBw = baseBw * factor;
}
testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
NCCLCHECK(ncclAllGather(sendbuff, recvbuff, count, type, comm, stream));
testResult_t AllGatherRunColl(void* sendbuff, size_t sendoffset,void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
if (deviceImpl == 0) {
char* sptr = (char*)sendbuff + sendoffset;
char* rptr = (char*)recvbuff + recvoffset;
NCCLCHECK(ncclAllGather(sptr, rptr, count, type, comm, stream));
} else {
return testNotImplemented;
}
return testSuccess;
}
@ -84,8 +90,8 @@ testResult_t AllGatherRunTest(struct threadArgs* args, int root, ncclDataType_t
}
struct testEngine allGatherEngine = {
AllGatherGetBuffSize,
AllGatherRunTest
.getBuffSize = AllGatherGetBuffSize,
.runTest = AllGatherRunTest
};
#pragma weak ncclTestEngine=allGatherEngine

View File

@ -4,8 +4,32 @@
* See LICENSE.txt for license information
************************************************************************/
/*
* AllReduce Performance Test Implementation
*
* This file implements multiple AllReduce kernel variants optimized for different
* use cases within CUDA P2P connectivity.
* These kernels are designed to highlight the device API functionality, as well as how to optimize for best performance.
*
* IMPORTANT: All custom kernels require CUDA P2P connectivity since they require Load-Store Accessible (LSA) memory.
*
* Kernel Selection Strategy:
* - deviceImpl = 0: NCCL's built-in AllReduce implementation (fallback)
* - deviceImpl = 1: allReduceLsaKernel - Basic LSA implementation for demonstration and small message sizes.
* - deviceImpl = 2: allReduceLsaVectorizedKernel - Vectorized LSA implementation, demonstrating how to reach good performance for large message sizes.
* - deviceImpl = 3: allReduceMultimemKernel - Multi-memory implementation using hardware acceleration. Requires Multimem-capable hardware but can offer better performance.
* - deviceImpl = 4: allReduceMultimemVectorizedKernel - Vectorized multi-memory implementation for the best performance. Requires Multimem-capable hardware.
*/
#include "cuda_runtime.h"
#include "common.h"
#include <algorithm>
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
#include "nccl_device.h"
#include "vector_types.h"
#include "multimem_ops.h"
constexpr int WARP_SIZE = 32;
#endif
void AllReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, size_t eltSize, int nranks) {
*sendcount = count;
@ -40,9 +64,456 @@ void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, doubl
*busBw = baseBw * factor;
}
testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, count, type, op, comm, stream));
return testSuccess;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,29,0)
// set devComm reqs for allreduce device kernels
testResult_t AllReduceGetDevCommRequirements(int deviceImpl, ncclDevCommRequirements* reqs, ncclCommProperties_t* commProperties) {
if (!reqs || !commProperties) return testInternalError;
switch(deviceImpl) {
case 1: // allReduceLsaKernel
case 2: // allReduceLsaVectorizedKernel
reqs->lsaBarrierCount = deviceCtaCount;
return testSuccess;
case 3: // allReduceMultimemKernel
case 4: // allReduceMultimemVectorizedKernel
if (!commProperties->multimemSupport) {
fprintf(stderr, "This test requires multimem support, but multimem support is not enabled for this communicator.\n");
return testInternalError;
}
reqs->lsaMultimem = true;
reqs->lsaBarrierCount = deviceCtaCount;
return testSuccess;
default:
return testNotImplemented;
}
}
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
bool AllReduceGetDevCommRequirements(int deviceImpl, ncclDevCommRequirements* reqs) {
if (!reqs) return false;
memset(reqs, 0, sizeof(*reqs));
switch(deviceImpl) {
case 1: // allReduceLsaKernel
case 2: // allReduceLsaVectorizedKernel
reqs->lsaBarrierCount = deviceCtaCount;
return true;
case 3: // allReduceMultimemKernel
case 4: // allReduceMultimemVectorizedKernel
reqs->lsaMultimem = true;
reqs->lsaBarrierCount = deviceCtaCount;
return true;
default:
return false;
}
}
#endif
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
/*
* Kernel 1: allReduceLsaKernel - Basic LSA-based AllReduce
*
* Purpose: Provides a simple, deterministic AllReduce implementation for small to
* medium message sizes within CUDA P2P connectivity.
*
* Solution: Implements AllReduce using direct peer-to-peer memory access through
* LSA windows. Each rank reads from all other ranks, performs reduction, and
* writes the result back to all ranks using cooperative thread arrays.
*
* Key Optimizations:
* - LSA barriers for faster synchronization than global barriers
* - Global grid stride loop to distribute work across all ranks
* - Direct peer access within CUDA P2P connectivity for optimal bandwidth
*
* CUDA P2P Connectivity Requirement: CRITICAL - This kernel requires all participating
* ranks to be within the same CUDA P2P connectivity.
*
* Use Case: Small to medium messages (< 1MB) where simplicity and determinism
* are more important than maximum bandwidth.
*/
template <typename T>
__global__ void allReduceLsaKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) {
ncclLsaBarrierSession<ncclCoopCta> bar { ncclCoopCta(), devComm, ncclTeamLsa(devComm), devComm.lsaBarrier, blockIdx.x };
bar.sync(ncclCoopCta(), cuda::memory_order_relaxed);
const int rank = devComm.rank, nRanks = devComm.nRanks;
const int globalTid = threadIdx.x + blockDim.x * (rank + blockIdx.x * nRanks);
const int globalNthreads = blockDim.x * gridDim.x * nRanks;
for (size_t offset = globalTid; offset < count; offset += globalNthreads) {
T v = T{0};
for (int peer=0; peer<nRanks; peer++) {
T* sendPtr = (T*)ncclGetLsaPointer(sendwin, sendoffset, peer);
v += sendPtr[offset];
}
for (int peer=0; peer<nRanks; peer++) {
T* recvPtr = (T*)ncclGetLsaPointer(recvwin, recvoffset, peer);
recvPtr[offset] = v;
}
}
bar.sync(ncclCoopCta(), cuda::memory_order_release);
}
/*
* Kernel 2: allReduceLsaVectorizedKernel - Vectorized LSA-based AllReduce
*
* Purpose: Enhanced AllReduce implementation using vectorized memory operations
* and loop unrolling to maximize memory bandwidth utilization for large messages
* within CUDA P2P connectivity.
*
* Solution: Builds upon the basic LSA approach but adds vectorized loads/stores
* and aggressive loop unrolling to achieve higher memory bandwidth. Handles
* misaligned data gracefully while maximizing vectorized throughput. Not necessarily optimal for small message sizes.
*
* Key Optimizations:
* - Vectorized loads/stores for improved memory bandwidth (128-bit operations)
* - Loop unrolling to reduce loop overhead and improve instruction-level parallelism
* - Warp-coalesced memory access patterns for optimal memory controller utilization
* - Graceful handling of misaligned data with a scalar fallback, which comes at the cost of higher latency when it is not needed.
*
* CUDA P2P Connectivity Requirement: CRITICAL - Same as basic LSA kernel. Requires
* CUDA P2P connectivity due to LSA memory access patterns.
*
* Use Case: Large messages where maximum memory bandwidth is
* critical and data alignment can be optimized.
*/
template <typename T>
__global__ void allReduceLsaVectorizedKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) {
ncclLsaBarrierSession<ncclCoopCta> bar { ncclCoopCta(), devComm, ncclTeamLsa(devComm), devComm.lsaBarrier, blockIdx.x };
bar.sync(ncclCoopCta(), cuda::memory_order_relaxed);
// Compile time vector type and vector size mapping
using TN = typename VectorTypeMapping<T>::Type;
constexpr int VECTOR_FACTOR = sizeof(TN)/sizeof(T);
constexpr int UNROLL_FACTOR = 128/sizeof(TN); // Same as before 128 Bytes per thread
const int rank = devComm.rank, nRanks = devComm.nRanks;
const int globalTid = threadIdx.x + blockDim.x * (rank + blockIdx.x * nRanks);
const int globalNthreads = blockDim.x * gridDim.x * nRanks;
// Since we use vector types, the non-vector allocated memory is not necessarily aligned.
const size_t alignment_offset = (sendoffset % sizeof(TN)) / sizeof(T);
const size_t aligned_count = count - alignment_offset;
const size_t vector_count = aligned_count / VECTOR_FACTOR;
const size_t remainder = aligned_count % VECTOR_FACTOR;
// As before
const int elements_per_block = globalNthreads * UNROLL_FACTOR;
const int num_blocks = vector_count / elements_per_block;
const int warp_id = globalTid / WARP_SIZE;
const int lane_id = globalTid % WARP_SIZE;
const int warp_offset = warp_id * WARP_SIZE * UNROLL_FACTOR;
const int lane_offset = lane_id;
const int warp_lane_offset = warp_offset + lane_offset;
// Handle misaligned elements first using scalar operations. Grid stride loop with scalar handling
if (alignment_offset > 0) {
for (size_t offset = globalTid; offset < alignment_offset; offset += globalNthreads) {
T v_scalar = T{0};
for (int peer=0; peer<nRanks; peer++) {
T* remotePtr = (T*)ncclGetLsaPointer(sendwin, sendoffset, peer);
v_scalar += remotePtr[offset];
}
for (int peer=0; peer<nRanks; peer++) {
T* remotePtr = (T*)ncclGetLsaPointer(recvwin, recvoffset, peer);
remotePtr[offset] = v_scalar;
}
}
}
// Handle vectorized memory that can be handled in whole chunks (no if)
for (int block = 0; block < num_blocks; block += 1) {
TN v[UNROLL_FACTOR] = {TN{0}};
const size_t block_offset = block * globalNthreads * UNROLL_FACTOR;
for (int peer=0; peer<nRanks; peer++) {
#pragma unroll
for (int i=0; i < UNROLL_FACTOR; i++) {
const int stride_offset = i * WARP_SIZE;
const size_t offset = warp_lane_offset + block_offset + stride_offset;
// Uses TN* as pointer type for vectorized pointer arithmetic
// The pointer is also adjusted for misalignment
TN* remotePtr = (TN*)ncclGetLsaPointer(sendwin, sendoffset + alignment_offset * sizeof(T), peer);
v[i] = vectorAdd(v[i], remotePtr[offset]);
}
}
for (int peer=0; peer<nRanks; ++peer) {
#pragma unroll
for (int i=0; i < UNROLL_FACTOR; i++) {
const int stride_offset = i * WARP_SIZE;
const size_t offset = warp_lane_offset + block_offset + stride_offset;
TN* remotePtr = (TN*)ncclGetLsaPointer(recvwin, recvoffset + alignment_offset * sizeof(T), peer);
remotePtr[offset] = v[i];
}
}
}
// Handle the last partial vectorized block, but with if conditions
const int block = num_blocks;
TN v[UNROLL_FACTOR] = {TN{0}};
const size_t block_offset = block * globalNthreads * UNROLL_FACTOR;
for (int peer=0; peer<nRanks; peer++) {
#pragma unroll
for (int i=0; i < UNROLL_FACTOR; i++) {
const int stride_offset = i * WARP_SIZE;
const size_t offset = warp_lane_offset + block_offset + stride_offset;
if (offset < vector_count) {
TN* remotePtr = (TN*)ncclGetLsaPointer(sendwin, sendoffset + alignment_offset * sizeof(T), peer);
v[i] = vectorAdd(v[i], remotePtr[offset]);
}
}
}
for (int peer=0; peer<nRanks; ++peer) {
#pragma unroll
for(int i=0; i < UNROLL_FACTOR; i++){
const int stride_offset = i * WARP_SIZE;
const size_t offset = warp_lane_offset + block_offset + stride_offset;
if (offset < vector_count) {
TN* remotePtr = (TN*)ncclGetLsaPointer(recvwin, recvoffset + alignment_offset * sizeof(T), peer);
remotePtr[offset] = v[i];
}
}
}
// Since the data doesn't have to be perfectly aligned with the vector size, we need to handle remaining elements.
if (remainder > 0) {
const size_t remainder_start = alignment_offset + vector_count * VECTOR_FACTOR;
const int globalTid_remainder = globalTid;
const int globalNthreads_remainder = globalNthreads;
for (size_t offset = globalTid_remainder; offset < remainder; offset += globalNthreads_remainder) {
T v_scalar = 0;
const size_t actual_offset = remainder_start + offset;
for (int peer=0; peer<nRanks; peer++) {
T* remotePtr = (T*)ncclGetLsaPointer(sendwin, sendoffset, peer);
v_scalar += remotePtr[actual_offset];
}
for (int peer=0; peer<nRanks; peer++) {
T* remotePtr = (T*)ncclGetLsaPointer(recvwin, recvoffset, peer);
remotePtr[actual_offset] = v_scalar;
}
}
}
// Sync
bar.sync(ncclCoopCta(), cuda::memory_order_release);
}
/*
* Kernel 3: allReduceMultimemKernel - Multi-memory Hardware-Accelerated AllReduce
*
* Purpose: High-performance AllReduce implementation using multi-memory primitives
* that leverage hardware acceleration for memory operations, significantly reducing
* SM utilization while maintaining high bandwidth within CUDA P2P connectivity.
*
* Solution: Replaces the O(Nrank) peer loop approach with hardware-accelerated
* multi-memory operations. The kernel initiates CUDA P2P reductions directly through
* hardware, eliminating the need for explicit peer-to-peer communication loops.
*
* Key Optimizations:
* - Multi-memory primitives for hardware-accelerated operations
* - Eliminates O(Nrank) scaling by using hardware reduction capabilities
* - Hardware-assisted memory synchronization and reduction
*
* CUDA P2P Connectivity Requirement: CRITICAL - Requires CUDA P2P connectivity and
* multi-memory support. Hardware acceleration is only available within the
* same CUDA P2P connectivity where multi-memory operations can be performed.
*
* Use Case: Large CUDA P2P connectivity where scaling to more ranks is desired.
*
* Hardware Requirements: Hopper+ architecture with multi-memory support enabled.
*/
template <typename T>
__global__ void allReduceMultimemKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) {
ncclLsaBarrierSession<ncclCoopCta> bar { ncclCoopCta(), devComm, ncclTeamTagLsa(), blockIdx.x, true };
bar.sync(ncclCoopCta(), cuda::memory_order_relaxed);
const int rank = devComm.rank, nRanks = devComm.nRanks;
const int globalTid = threadIdx.x + blockDim.x * (rank + blockIdx.x * nRanks);
const int globalNthreads = blockDim.x * gridDim.x * nRanks;
T* send_ptr = reinterpret_cast<T*>(ncclGetLsaMultimemPointer(sendwin, sendoffset, devComm));
T* recv_ptr = reinterpret_cast<T*>(ncclGetLsaMultimemPointer(recvwin, recvoffset, devComm));
for (size_t offset=globalTid; offset < count; offset += globalNthreads) {
if (offset < count) {
T v = multimemLoadSum<T,T>(send_ptr + offset);
multimemStore<T,T>(recv_ptr + offset, v);
}
}
bar.sync(ncclCoopCta(), cuda::memory_order_release);
}
/*
* Kernel 4: allReduceMultimemVectorizedKernel - Vectorized Multi-memory AllReduce
*
* Purpose: Ultimate performance AllReduce implementation combining multi-memory
* hardware acceleration with vectorized operations and loop unrolling for maximum
* bandwidth utilization within CUDA P2P connectivity.
*
* Solution: Combines the hardware acceleration benefits of multi-memory operations
* with the bandwidth optimization techniques from vectorized kernels. This kernel
* represents the highest performance option for large, aligned data sets.
*
* Key Optimizations:
* - Multi-memory primitives for hardware-accelerated operations
* - Vectorized loads/stores for maximum memory bandwidth (128-bit operations)
* - Aggressive loop unrolling for improved instruction-level parallelism
* - Warp-coalesced memory access patterns for optimal memory controller utilization
* - Hardware-assisted memory synchronization and reduction
* - Graceful handling of misaligned data with scalar fallback
*
* CUDA P2P Connectivity Requirement: CRITICAL - Requires CUDA P2P connectivity and
* multi-memory support. This kernel leverages both P2P locality and hardware
* acceleration for optimal performance.
*
* Hardware Requirements: Hopper+ architecture with multi-memory support enabled.
*
* Performance Note: This kernel provides the best performance for large, aligned
* data sets but requires careful data alignment for optimal vectorization benefits.
*/
template <typename T>
__global__ void allReduceMultimemVectorizedKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) {
ncclLsaBarrierSession<ncclCoopCta> bar { ncclCoopCta(), devComm, ncclTeamTagLsa(), blockIdx.x, true };
bar.sync(ncclCoopCta(), cuda::memory_order_relaxed);
using TN = typename VectorTypeMapping<T>::Type;
constexpr int VECTOR_FACTOR = sizeof(TN)/sizeof(T);
constexpr int UNROLL_FACTOR = 128/sizeof(TN);
const int rank = devComm.rank, nRanks = devComm.nRanks;
const int globalTid = threadIdx.x + blockDim.x * (rank + blockIdx.x * nRanks);
const int globalNthreads = blockDim.x * gridDim.x * nRanks;
// Calculate alignment offset to handle misaligned elements first
const size_t alignment_offset = (sendoffset % sizeof(TN)) / sizeof(T);
const size_t aligned_count = count - alignment_offset;
const size_t vector_count = aligned_count / VECTOR_FACTOR;
const size_t remainder = aligned_count % VECTOR_FACTOR;
const int elements_per_block = globalNthreads * UNROLL_FACTOR;
const int num_blocks = vector_count / elements_per_block;
const int warp_id = globalTid / WARP_SIZE;
const int lane_id = globalTid % WARP_SIZE;
const int warp_offset = warp_id * WARP_SIZE * UNROLL_FACTOR;
const int lane_offset = lane_id;
const int warp_lane_offset = warp_offset + lane_offset;
// Multimem pointers that handle scalar access for misaligned and remainder elements
T* send_ptr = reinterpret_cast<T*>(ncclGetLsaMultimemPointer(sendwin, sendoffset, devComm));
T* recv_ptr = reinterpret_cast<T*>(ncclGetLsaMultimemPointer(recvwin, recvoffset, devComm));
// Handle misaligned elements first using scalar operations
if (alignment_offset > 0) {
for (size_t offset = globalTid; offset < max(alignment_offset,count); offset += globalNthreads) {
T v_scalar = multimemLoadSum<T,T>(send_ptr + offset);
multimemStore<T,T>(recv_ptr+offset, v_scalar);
}
}
// separate TN* for 2 reasons. a) alignment offset, b) pointer arithmetic with the vectorized type
TN* send_ptrN = reinterpret_cast<TN*>(ncclGetLsaMultimemPointer(sendwin, sendoffset+alignment_offset*sizeof(T), devComm));
TN* recv_ptrN = reinterpret_cast<TN*>(ncclGetLsaMultimemPointer(recvwin, recvoffset+alignment_offset*sizeof(T), devComm));
// Handle vectorized memory that can be handled in whole chunks (no if)
for (int block = 0; block < num_blocks; block += 1) {
TN v[UNROLL_FACTOR] = {TN{0}};
const size_t block_offset = block * globalNthreads * UNROLL_FACTOR;
#pragma unroll
for (int i=0; i < UNROLL_FACTOR; i++) {
const int stride_offset = i * WARP_SIZE;
const size_t offset = warp_lane_offset + block_offset + stride_offset;
v[i] = multimemLoadSum<T,TN>(reinterpret_cast<T*>(send_ptrN + offset));
}
#pragma unroll
for (int i=0; i < UNROLL_FACTOR; i++) {
const int stride_offset = i * WARP_SIZE;
const size_t offset = warp_lane_offset + block_offset + stride_offset;
multimemStore<T,TN>(reinterpret_cast<T*>(recv_ptrN+offset), v[i]);
}
}
// Handle the last partial vectorized block, but with if conditions
const int block = num_blocks;
TN v[UNROLL_FACTOR] = {TN{0}};
const size_t block_offset = block * globalNthreads * UNROLL_FACTOR;
#pragma unroll
for (int i=0; i < UNROLL_FACTOR; i++) {
const int stride_offset = i * WARP_SIZE;
const size_t offset = warp_lane_offset + block_offset + stride_offset;
if (offset < vector_count) {
v[i] = multimemLoadSum<T,TN>(reinterpret_cast<T*>(send_ptrN+offset));
}
}
#pragma unroll
for (int i=0; i < UNROLL_FACTOR; i++) {
const int stride_offset = i * WARP_SIZE;
const size_t offset = warp_lane_offset + block_offset + stride_offset;
if (offset < vector_count) {
multimemStore<T,TN>(reinterpret_cast<T*>(recv_ptrN+offset), v[i]);
}
}
// Handle remainder elements using scalar operations
if (remainder > 0) {
const size_t remainder_start = alignment_offset + vector_count * VECTOR_FACTOR;
const int globalTid_remainder = globalTid;
const int globalNthreads_remainder = globalNthreads;
for (size_t offset = globalTid_remainder; offset < remainder; offset += globalNthreads_remainder) {
const size_t actual_offset = remainder_start + offset;
T v_scalar = multimemLoadSum<T,T>(send_ptr+actual_offset);
multimemStore<T,T>(recv_ptr+actual_offset, v_scalar);
}
}
// Sync
bar.sync(ncclCoopCta(), cuda::memory_order_release);
}
#endif
testResult_t AllReduceRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
char* sptr = (char*)sendbuff + sendoffset;
char* rptr = (char*)recvbuff + recvoffset;
switch (deviceImpl) {
case 0:
NCCLCHECK(ncclAllReduce(sptr, rptr, count, type, op, comm, stream));
return testSuccess;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
case 1:
TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(allReduceLsaKernel, type, op),
sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream));
return testSuccess;
case 2:
TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(allReduceLsaVectorizedKernel, type, op),
sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream));
return testSuccess;
case 3:
TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(allReduceMultimemKernel, type, op),
sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream));
return testSuccess;
case 4:
TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(allReduceMultimemVectorizedKernel, type, op),
sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream));
return testSuccess;
#endif
}
return testNotImplemented;
}
struct testColl allReduceTest = {
@ -94,8 +565,11 @@ testResult_t AllReduceRunTest(struct threadArgs* args, int root, ncclDataType_t
}
struct testEngine allReduceEngine = {
AllReduceGetBuffSize,
AllReduceRunTest
.getBuffSize = AllReduceGetBuffSize,
.runTest = AllReduceRunTest,
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
.getDevCommRequirements = AllReduceGetDevCommRequirements
#endif
};
#pragma weak ncclTestEngine=allReduceEngine

View File

@ -6,6 +6,12 @@
#include "cuda_runtime.h"
#include "common.h"
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
#include "nccl_device.h"
#include "vector_types.h"
#endif
#pragma weak ncclAlltoAll
void AlltoAllGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, size_t eltSize, int nranks) {
*paramcount = (count/nranks) & -(16/eltSize);
@ -45,23 +51,293 @@ void AlltoAllGetBw(size_t count, int typesize, double sec, double* algBw, double
*busBw = baseBw * factor;
}
testResult_t AlltoAllRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
size_t rankOffset = count * wordSize(type);
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,29,0)
// set devComm reqs for alltoall device kernels
testResult_t AlltoAllGetDevCommRequirements(int deviceImpl, ncclDevCommRequirements* reqs, ncclCommProperties_t* commProperties) {
if (!reqs || !commProperties) return testInternalError;
#if NCCL_MAJOR < 2 || NCCL_MINOR < 7
printf("NCCL 2.7 or later is needed for alltoall. This test was compiled with %d.%d.\n", NCCL_MAJOR, NCCL_MINOR);
return testNcclError;
#else
NCCLCHECK(ncclGroupStart());
for (int r=0; r<nRanks; r++) {
NCCLCHECK(ncclSend(((char*)sendbuff)+r*rankOffset, count, type, r, comm, stream));
NCCLCHECK(ncclRecv(((char*)recvbuff)+r*rankOffset, count, type, r, comm, stream));
switch(deviceImpl) {
case 1: // NvlAlltoAllKernel
case 2: // NvlAlltoAllKernelOptimized
reqs->lsaBarrierCount = deviceCtaCount;
return testSuccess;
case 3: // GinAlltoAllKernel
case 4: // HybridAlltoAllKernel (LSA+GIN)
if (commProperties->ginType == NCCL_GIN_TYPE_NONE) {
fprintf(stderr, "This test requires GIN support, but GIN support is not enabled for this communicator.\n");
return testInternalError;
}
reqs->barrierCount = deviceCtaCount;
reqs->ginSignalCount = deviceCtaCount;
return testSuccess;
default:
return testNotImplemented;
}
NCCLCHECK(ncclGroupEnd());
return testSuccess;
}
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
// set devComm reqs for alltoall device kernels
bool AlltoAllGetDevCommRequirements(int deviceImpl, ncclDevCommRequirements* reqs) {
if (!reqs) return false;
memset(reqs, 0, sizeof(*reqs));
switch(deviceImpl) {
case 1: // NvlAlltoAllKernel
case 2: // NvlAlltoAllKernelOptimized
reqs->lsaBarrierCount = deviceCtaCount;
return true;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,7)
case 3: // GinAlltoAllKernel
case 4: // HybridAlltoAllKernel (LSA+GIN)
reqs->barrierCount = deviceCtaCount;
reqs->ginSignalCount = deviceCtaCount;
return true;
#endif
default:
return false;
}
}
#endif
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
// shared scalar AlltoAll implementation used by both kernels
template <typename T>
__device__ void AlltoAllScalarImpl(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int rank, int nRanks, int tid, int nthreads) {
T* sendPtr = (T*)ncclGetLsaPointer(sendwin, sendoffset, rank);
for (size_t offset = tid; offset < count; offset += nthreads) {
for (int peer = 0; peer < nRanks; peer++) {
T value = sendPtr[peer * count + offset];
T* recvPtr = (T*)ncclGetLsaPointer(recvwin, recvoffset, peer);
recvPtr[rank * count + offset] = value;
}
}
}
// Device implementation #1 - simple NVL kernel
template <typename T>
__global__ void NvlAlltoAllKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) {
ncclLsaBarrierSession<ncclCoopCta> bar { ncclCoopCta(), devComm, ncclTeamLsa(devComm), devComm.lsaBarrier, blockIdx.x };
bar.sync(ncclCoopCta(), cuda::memory_order_relaxed);
int rank = devComm.rank, nRanks = devComm.nRanks;
int tid = threadIdx.x + blockDim.x * blockIdx.x;
int nthreads = blockDim.x * gridDim.x;
AlltoAllScalarImpl<T>(sendwin, sendoffset, recvwin, recvoffset, count, rank, nRanks, tid, nthreads);
bar.sync(ncclCoopCta(), cuda::memory_order_release);
}
// Device implementation #2 - optimized NVL kernel using vectorization and unrolling
template <typename T>
__global__ void NvlAlltoAllKernelOptimized(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) {
ncclLsaBarrierSession<ncclCoopCta> bar { ncclCoopCta(), devComm, ncclTeamLsa(devComm), devComm.lsaBarrier, blockIdx.x };
bar.sync(ncclCoopCta(), cuda::memory_order_relaxed);
using TN = typename VectorTypeMapping<T>::Type;
constexpr int VECTOR_FACTOR = sizeof(TN) / sizeof(T);
constexpr int UNROLL_FACTOR = 128/sizeof(TN);
constexpr int PEER_UNROLL = 2;
int rank = devComm.rank, nRanks = devComm.nRanks;
int tid = threadIdx.x + blockDim.x * blockIdx.x;
int nthreads = blockDim.x * gridDim.x;
T* sendPtr = (T*)ncclGetLsaPointer(sendwin, sendoffset, rank);
// alignment check: can we use vectorized operations?
bool canVectorize = (sizeof(TN) > sizeof(T)) && // Only if vectorization helps
(reinterpret_cast<uintptr_t>(sendPtr) % sizeof(TN) == 0) && // Base aligned
((count * sizeof(T)) % sizeof(TN) == 0); // Stride compatible
if (canVectorize) {
size_t vector_count = count / VECTOR_FACTOR;
int elements_per_iteration = nthreads * UNROLL_FACTOR;
// process aligned vectorized elements without bounds checks
size_t aligned_vector_count = (vector_count / elements_per_iteration) * elements_per_iteration;
for (size_t base_offset = tid; base_offset < aligned_vector_count; base_offset += elements_per_iteration) {
// unroll a limited number of peers at a time
for (int peerBase = 0; peerBase < nRanks; peerBase += PEER_UNROLL) {
int peersInGroup = min(PEER_UNROLL, nRanks - peerBase);
#pragma unroll
for (int p = 0; p < peersInGroup; p++) {
int peer = peerBase + p;
TN* sendVecPtr = (TN*)(sendPtr + peer * count);
TN* recvVecPtr = (TN*)((T*)ncclGetLsaPointer(recvwin, recvoffset, peer) + rank * count);
TN values[UNROLL_FACTOR];
// split load/store into separate loops for better overlap and ILP
#pragma unroll
for (int i = 0; i < UNROLL_FACTOR; i++) {
size_t offset = base_offset + i * nthreads;
values[i] = sendVecPtr[offset];
}
#pragma unroll
for (int i = 0; i < UNROLL_FACTOR; i++) {
size_t offset = base_offset + i * nthreads;
recvVecPtr[offset] = values[i];
}
}
}
}
// handle remaining vectorized elements that didn't fit in aligned chunks
for (size_t base_offset = aligned_vector_count + tid; base_offset < vector_count; base_offset += nthreads) {
for (int peer = 0; peer < nRanks; peer++) {
TN* sendVecPtr = (TN*)(sendPtr + peer * count);
TN* recvVecPtr = (TN*)((T*)ncclGetLsaPointer(recvwin, recvoffset, peer) + rank * count);
recvVecPtr[base_offset] = sendVecPtr[base_offset];
}
}
// handle any remaining elements not divisible by vectorization factor
size_t scalar_start = vector_count * VECTOR_FACTOR;
for (size_t offset = scalar_start + tid; offset < count; offset += nthreads) {
for (int peer = 0; peer < nRanks; peer++) {
T value = sendPtr[peer * count + offset];
T* recvPtr = (T*)ncclGetLsaPointer(recvwin, recvoffset, peer);
recvPtr[rank * count + offset] = value;
}
}
} else {
// simple scalar fallback for unaligned data (identical to simple kernel)
AlltoAllScalarImpl<T>(sendwin, sendoffset, recvwin, recvoffset, count, rank, nRanks, tid, nthreads);
}
bar.sync(ncclCoopCta(), cuda::memory_order_release);
}
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,7)
template <typename T>
__global__ void GinAlltoAllKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) {
int ginContext = 0;
unsigned int signalIndex = 0;
ncclGin gin { devComm, ginContext };
uint64_t signalValue = gin.readSignal(signalIndex);
ncclBarrierSession<ncclCoopCta> bar { ncclCoopCta(), ncclTeamTagWorld(), gin, blockIdx.x };
bar.sync(ncclCoopCta(), cuda::memory_order_relaxed, ncclGinFenceLevel::Relaxed);
int tid = threadIdx.x + blockIdx.x * blockDim.x;
int nthreads = blockDim.x * gridDim.x;
/* send to all peers via GIN */
const size_t size = count * sizeof(T);
for (int r=tid; r<devComm.nRanks; r+=nthreads) {
gin.put(ncclTeamWorld(devComm), r,
recvwin, recvoffset + devComm.rank * size,
sendwin, sendoffset + r * size,
size, ncclGin_SignalInc{signalIndex});
}
gin.waitSignal(ncclCoopCta(), signalIndex, signalValue + devComm.nRanks);
gin.flush(ncclCoopCta());
bar.sync(ncclCoopCta(), cuda::memory_order_release, ncclGinFenceLevel::Relaxed);
}
template <typename T>
__global__ void HybridAlltoAllKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) {
int ginContext = 0;
unsigned int signalIndex = 0;
ncclGin gin { devComm, ginContext };
uint64_t signalValue = gin.readSignal(signalIndex);
ncclBarrierSession<ncclCoopCta> bar { ncclCoopCta(), ncclTeamTagWorld(), gin, blockIdx.x };
bar.sync(ncclCoopCta(), cuda::memory_order_relaxed, ncclGinFenceLevel::Relaxed);
int tid = threadIdx.x + blockIdx.x*blockDim.x;
int nthreads = blockDim.x * gridDim.x;
ncclTeam world = ncclTeamWorld(devComm);
ncclTeam lsa = ncclTeamLsa(devComm);
const int startLsa = world.rank - lsa.rank;
const int lsaSize = lsa.nRanks;
/* handle remote peers (i.e., non-LSA) using GIN */
const size_t size = count * sizeof(T);
for (int r = tid; r < startLsa; r += nthreads) {
gin.put(world, r,
recvwin, recvoffset + world.rank * size,
sendwin, sendoffset + r * size,
size, ncclGin_SignalInc{signalIndex});
}
for (int r = startLsa + lsaSize + tid; r < world.nRanks; r += nthreads) {
gin.put(world, r,
recvwin, recvoffset + world.rank * size,
sendwin, sendoffset + r * size,
size, ncclGin_SignalInc{signalIndex});
}
/* handle local peers with LSA */
T* sendLocal = (T*)ncclGetLocalPointer(sendwin, sendoffset);
for (size_t offset = tid; offset < count; offset += nthreads) {
for (int lp = 0; lp < lsa.nRanks; lp++) {
int wr = startLsa + lp;
T* recvPtr = (T*)ncclGetLsaPointer(recvwin, recvoffset, lp);
recvPtr[world.rank * count + offset] = sendLocal[wr * count + offset];
}
}
int numRemotePeers = world.nRanks - lsa.nRanks;
gin.waitSignal(ncclCoopCta(), signalIndex, signalValue + numRemotePeers);
gin.flush(ncclCoopCta());
bar.sync(ncclCoopCta(), cuda::memory_order_release, ncclGinFenceLevel::Relaxed);
}
#endif
#endif
testResult_t AlltoAllRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
if (deviceImpl == 0) {
char* sptr = (char*)sendbuff + sendoffset;
char* rptr = (char*)recvbuff + recvoffset;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
if (test_ncclVersion >= NCCL_VERSION(2,28,0)) {
NCCLCHECK(ncclAlltoAll(sptr, rptr, count, type, comm, stream));
return testSuccess;
}
// fall-through to send/recv implementation if ncclAlltoAll is not available
#endif
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,7,0)
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
size_t rankOffset = count * wordSize(type);
NCCLCHECK(ncclGroupStart());
for (int r=0; r<nRanks; r++) {
NCCLCHECK(ncclSend(sptr+r*rankOffset, count, type, r, comm, stream));
NCCLCHECK(ncclRecv(rptr+r*rankOffset, count, type, r, comm, stream));
}
NCCLCHECK(ncclGroupEnd());
#else
printf("NCCL 2.7 or later is needed for alltoall. This test was compiled with %d.%d.\n", NCCL_MAJOR, NCCL_MINOR);
return testNcclError;
#endif
} else {
switch(deviceImpl) {
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
case 1:
TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(NvlAlltoAllKernel, type, op), sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream));
return testSuccess;
case 2:
TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(NvlAlltoAllKernelOptimized, type, op), sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream));
return testSuccess;
#endif
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,7)
case 3:
TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(GinAlltoAllKernel, type, op), sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream));
return testSuccess;
case 4:
TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(HybridAlltoAllKernel, type, op), sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream));
return testSuccess;
#endif
default:
return testNotImplemented;
}
}
return testSuccess;
}
struct testColl alltoAllTest = {
@ -100,8 +376,11 @@ testResult_t AlltoAllRunTest(struct threadArgs* args, int root, ncclDataType_t t
}
struct testEngine alltoAllEngine = {
AlltoAllGetBuffSize,
AlltoAllRunTest
.getBuffSize = AlltoAllGetBuffSize,
.runTest = AlltoAllRunTest,
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
.getDevCommRequirements = AlltoAllGetDevCommRequirements
#endif
};
#pragma weak ncclTestEngine=alltoAllEngine

View File

@ -39,18 +39,25 @@ void BroadcastGetBw(size_t count, int typesize, double sec, double* algBw, doubl
*busBw = baseBw * factor;
}
testResult_t BroadcastRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
testResult_t BroadcastRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
if (deviceImpl == 0) {
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
char* sptr = (char*)sendbuff + sendoffset;
char* rptr = (char*)recvbuff + recvoffset;
#if NCCL_MAJOR >= 2 && NCCL_MINOR >= 2
NCCLCHECK(ncclBroadcast(sendbuff, recvbuff, count, type, root, comm, stream));
NCCLCHECK(ncclBroadcast(sptr, rptr, count, type, root, comm, stream));
#else
if (rank == root) {
NCCLCHECK(ncclBcast(sendbuff, count, type, root, comm, stream));
} else {
NCCLCHECK(ncclBcast(recvbuff, count, type, root, comm, stream));
}
if (rank == root) {
NCCLCHECK(ncclBcast(sptr, count, type, root, comm, stream));
} else {
NCCLCHECK(ncclBcast(rptr, count, type, root, comm, stream));
}
#endif
} else {
return testNotImplemented;
}
return testSuccess;
}
@ -100,8 +107,8 @@ testResult_t BroadcastRunTest(struct threadArgs* args, int root, ncclDataType_t
}
struct testEngine broadcastEngine = {
BroadcastGetBuffSize,
BroadcastRunTest
.getBuffSize = BroadcastGetBuffSize,
.runTest = BroadcastRunTest
};
#pragma weak ncclTestEngine=broadcastEngine

View File

@ -8,14 +8,26 @@
#include <pthread.h>
#include <cstdio>
#include <type_traits>
#include <limits>
#include <getopt.h>
#include <libgen.h>
#include <string.h>
#include <ctype.h>
#include "cuda.h"
#include <errno.h> /* program_invocation_short_name */
#include "util.h"
#include "../verifiable/verifiable.h"
#pragma weak ncclCommWindowRegister
#pragma weak ncclCommWindowDeregister
#pragma weak ncclDevCommCreate
#pragma weak ncclDevCommDestroy
#pragma weak ncclCommQueryProperties
#define DIVUP(x, y) \
(((x)+(y)-1)/(y))
int test_ncclVersion = 0; // init'd with ncclGetVersion()
#if NCCL_MAJOR >= 2
@ -67,37 +79,138 @@ int is_main_proc = 0;
thread_local int is_main_thread = 0;
// Command line parameter defaults
static int nThreads = 1;
static int nGpus = 1;
static size_t minBytes = 32*1024*1024;
static size_t maxBytes = 32*1024*1024;
static size_t stepBytes = 1*1024*1024;
static size_t stepFactor = 1;
static int datacheck = 1;
static int warmup_iters = 5;
static int iters = 20;
static int agg_iters = 1;
int nThreads = 1;
int nGpus = 1;
size_t minBytes = 32*1024*1024;
size_t maxBytes = 32*1024*1024;
size_t stepBytes = 1*1024*1024;
size_t stepFactor = 1;
int datacheck = 1;
int warmup_iters = 1;
int iters = 20;
int agg_iters = 1;
static int run_cycles = 1;
static int ncclop = ncclSum;
static int nccltype = ncclFloat;
static int ncclroot = 0;
static int parallel_init = 0;
static int blocking_coll = 0;
int parallel_init = 0;
int blocking_coll = 0;
static int streamnull = 0;
static int timeout = 0;
static int cudaGraphLaunches = 0;
int cudaGraphLaunches = 0;
static int report_cputime = 0;
static int report_timestamps = 0;
static int deviceImpl = 0;
int memory_report = 0;
int deviceCtaCount = 16; // Default number of CTAs for device implementation
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
static int average = 1;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
#define LOCAL_REGISTER 1
#define SYMMETRIC_REGISTER 2
static int local_register = 0;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
static int ctaPolicy = -1;
#endif
static int minCudaArch = 1<<30;
#define NUM_BLOCKS 32
enum output_file_type_t {
JSON_FILE_OUTPUT,
UNSPECIFIED_FILE_OUTPUT
};
// Return pointer to extension in `path` if one is found. An extension
// is the last `.` in the `path`, if there is no `/` following the `.`
// and there are characters after `.`.
//
// Therefore: returns 0 if no meaningful extension was found, or returns offset
// into string where extension begins
static const char *getExtension(const char *path) {
if (path == nullptr) return nullptr;
int last_dot = -1;
int last_slash = -1;
int pos;
for (pos = 0; path[pos] != '\0'; ++pos) {
switch (path[pos]) {
case '.':
last_dot = pos;
break;
case '/':
last_slash = pos;
break;
default:
break;
}
}
if (last_dot > last_slash && last_dot + 1 != pos) {
return path + last_dot + 1;
}
return nullptr;
}
static output_file_type_t classifyOutputFile(const char *filename) {
const char *extension = getExtension(filename);
if (extension != nullptr && strcasecmp(extension, "json") == 0) {
return JSON_FILE_OUTPUT;
}
return UNSPECIFIED_FILE_OUTPUT;
}
static void outputFileInit(output_file_type_t output_file_type,
const char *output_file, char argc, char **argv, char **envp) {
switch (output_file_type) {
case JSON_FILE_OUTPUT:
jsonOutputInit(output_file, argc, argv, envp);
break;
case UNSPECIFIED_FILE_OUTPUT:
default:
break;
}
}
static void outputFileFinalize(output_file_type_t output_file_type) {
switch (output_file_type) {
case JSON_FILE_OUTPUT:
jsonOutputFinalize();
break;
case UNSPECIFIED_FILE_OUTPUT:
default:
break;
}
}
testResult_t initComms(ncclComm_t* comms, int nComms, int firstRank, int nRanks, int* cudaDevs, ncclUniqueId& ncclId) {
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,14,0)
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
if (ctaPolicy >= 0)
config.CTAPolicy = ctaPolicy;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
config.nvlinkCentricSched = 1;
#endif
#endif
#endif
NCCLCHECK(ncclGroupStart());
for (int i=0; i<nComms; i++) {
int rank = firstRank + i;
CUDACHECK(cudaSetDevice(cudaDevs[i]));
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,14,0)
NCCLCHECK(ncclCommInitRankConfig(comms+i, nRanks, ncclId, rank, &config));
#else
NCCLCHECK(ncclCommInitRank(comms+i, nRanks, ncclId, rank));
#endif
}
NCCLCHECK(ncclGroupEnd());
return testSuccess;
}
// NOTE: We use the binary system, so M=Mebibytes and G=Gibibytes
static double parsesize(const char *value) {
long long int units;
double size;
@ -397,10 +510,22 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
}
#endif
TESTCHECK(args->collTest->runColl(
(void*)(in_place ? recvBuff + args->sendInplaceOffset*rank : sendBuff),
(void*)(in_place ? recvBuff + args->recvInplaceOffset*rank : recvBuff),
count, type, op, root, args->comms[i], args->streams[i]));
if (deviceImpl == 0) {
TESTCHECK(args->collTest->runColl(
(void*)(in_place ? recvBuff : sendBuff), in_place ? args->sendInplaceOffset*rank : 0,
(void*)recvBuff, in_place ? args->recvInplaceOffset*rank : 0,
count, type, op, root, args->comms[i], args->streams[i], 0));
} else {
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
void* sendwin = args->sendRegHandles[i];
void* recvwin = args->recvRegHandles[i];
CUDACHECK(cudaSetDevice(args->gpus[i]));
TESTCHECK(args->collTest->runColl(
(void*)(in_place ? recvwin : sendwin), shift + (in_place ? args->sendInplaceOffset*rank : 0),
(void*)recvwin, shift + (in_place ? args->recvInplaceOffset*rank : 0),
count, type, op, root, (ncclComm_t)(args->devComms+i), args->streams[i], deviceImpl));
#endif
}
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0)
if(opIndex >= ncclNumOps) {
@ -566,19 +691,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
}
double timeUsec = (report_cputime ? cputimeSec : deltaSec)*1.0E6;
char timeStr[100];
if (timeUsec >= 10000.0) {
sprintf(timeStr, "%7.0f", timeUsec);
} else if (timeUsec >= 100.0) {
sprintf(timeStr, "%7.1f", timeUsec);
} else {
sprintf(timeStr, "%7.2f", timeUsec);
}
if (args->reportErrors) {
PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts);
} else {
PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
}
writeBenchmarkLineBody(timeUsec, algBw, busBw, args->reportErrors, wrongElts, report_cputime, report_timestamps, in_place==0);
args->bw[0] += busBw;
args->bw_count[0]++;
@ -603,97 +716,162 @@ testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char*
// Sync to avoid first-call timeout
Barrier(args);
// Warm-up for large size
setupArgs(args->maxbytes, type, args);
for (int iter = 0; iter < warmup_iters; iter++) {
TESTCHECK(startColl(args, type, op, root, 0, iter));
// Warm-up for all sizes (using a stepfactor of 2)
for (size_t size = args->minbytes; size <= args->maxbytes; size = size * 2) {
setupArgs(size, type, args);
for (int iter = 0; iter < warmup_iters; iter++) {
TESTCHECK(startColl(args, type, op, root, 0, iter));
}
TESTCHECK(completeColl(args));
}
TESTCHECK(completeColl(args));
// Warm-up for small size
setupArgs(args->minbytes, type, args);
for (int iter = 0; iter < warmup_iters; iter++) {
TESTCHECK(startColl(args, type, op, root, 0, iter));
}
TESTCHECK(completeColl(args));
// Benchmark
long repeat = run_cycles;
do {
for (size_t size = args->minbytes; size<=args->maxbytes; size = ((args->stepfactor > 1) ? size*args->stepfactor : size+args->stepbytes)) {
setupArgs(size, type, args);
char rootName[100];
sprintf(rootName, "%6i", root);
PRINT("%12li %12li %8s %6s %6s", max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, rootName);
writeBenchmarkLinePreamble(max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, root);
TESTCHECK(BenchTime(args, type, op, root, 0));
TESTCHECK(BenchTime(args, type, op, root, 1));
PRINT("\n");
writeBenchmarkLineTerminator(iters, "");
}
} while (--repeat);
return testSuccess;
}
static void getGPUMemoryInfo(int64_t* ptotalGpuMem, int64_t* pfreeGpuMem) {
size_t freeGpuMem, totalGpuMem = 0;
cudaMemGetInfo(&freeGpuMem, &totalGpuMem);
if (ptotalGpuMem != nullptr) *ptotalGpuMem = totalGpuMem;
if (pfreeGpuMem != nullptr) *pfreeGpuMem = freeGpuMem;
}
testResult_t threadRunTests(struct threadArgs* args) {
// capture the free memory before
int64_t* totalGpuFreeMem = (int64_t*)calloc(args->nGpus*2, sizeof(int64_t));
for (int g = 0; g < args->nGpus; ++g) {
CUDACHECK(cudaSetDevice(args->gpus[g]));
getGPUMemoryInfo(nullptr, &totalGpuFreeMem[g]);
}
// Set device to the first of our GPUs. If we don't do that, some operations
// will be done on the current GPU (by default : 0) and if the GPUs are in
// exclusive mode those operations will fail.
CUDACHECK(cudaSetDevice(args->gpus[0]));
TESTCHECK(ncclTestEngine.runTest(args, ncclroot, (ncclDataType_t)nccltype, test_typenames[nccltype], (ncclRedOp_t)ncclop, test_opnames[ncclop]));
// Capture the memory used by the GPUs
for (int g = 0; g < args->nGpus; ++g) {
CUDACHECK(cudaSetDevice(args->gpus[g]));
getGPUMemoryInfo(nullptr, &totalGpuFreeMem[g + args->nGpus]);
*args->devMemUsed = std::max(*args->devMemUsed, totalGpuFreeMem[g] - totalGpuFreeMem[g + args->nGpus]);
}
free(totalGpuFreeMem);
return testSuccess;
}
testResult_t threadInit(struct threadArgs* args) {
char hostname[1024];
getHostName(hostname, 1024);
int nranks = args->nProcs*args->nThreads*args->nGpus;
//set main thread again
is_main_thread = (is_main_proc && args->thread == 0) ? 1 : 0;
NCCLCHECK(ncclGroupStart());
for (int i=0; i<args->nGpus; i++) {
int rank = args->proc*args->nThreads*args->nGpus + args->thread*args->nGpus + i;
CUDACHECK(cudaSetDevice(args->gpus[i]));
NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank));
jsonIdentifyWriter(is_main_thread);
// Capture GPU memory before initializing the NCCL communicators
int64_t* initFreeGpuMem = (int64_t*)calloc(args->nGpus*3, sizeof(int64_t));
for (int g = 0; g < args->nGpus; ++g) {
CUDACHECK(cudaSetDevice(args->gpus[g]));
getGPUMemoryInfo(nullptr, &initFreeGpuMem[g]);
}
NCCLCHECK(ncclGroupEnd());
int firstRank = args->proc*args->nThreads*args->nGpus + args->thread*args->nGpus;
TESTCHECK(initComms(args->comms, args->nGpus, firstRank, nranks, args->gpus, args->ncclId));
// Capture the memory used by the GPUs after initializing the NCCL communicators
for (int g = 0; g < args->nGpus; ++g) {
CUDACHECK(cudaSetDevice(args->gpus[g]));
getGPUMemoryInfo(nullptr, &initFreeGpuMem[g + args->nGpus]);
*args->initGpuMem = std::max(*args->initGpuMem, initFreeGpuMem[g] - initFreeGpuMem[g + args->nGpus]);
}
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
NCCLCHECK(ncclGroupStart());
void **sendRegHandles = (local_register) ? (void **)malloc(sizeof(*sendRegHandles)*args->nGpus) : NULL;
void **recvRegHandles = (local_register) ? (void **)malloc(sizeof(*recvRegHandles)*args->nGpus) : NULL;
for (int i=0; i<args->nGpus; i++) {
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
if (test_ncclVersion >= NCCL_VERSION(2,27,0) && (local_register == SYMMETRIC_REGISTER)) {
NCCLCHECK(ncclCommWindowRegister(args->comms[i], args->sendbuffs[i], args->maxbytes, (ncclWindow_t*)&sendRegHandles[i], NCCL_WIN_COLL_SYMMETRIC));
NCCLCHECK(ncclCommWindowRegister(args->comms[i], args->recvbuffs[i], args->maxbytes, (ncclWindow_t*)&recvRegHandles[i], NCCL_WIN_COLL_SYMMETRIC));
NCCLCHECK(ncclCommWindowRegister(args->comms[i], args->sendbuffs[i], args->maxbytes, (ncclWindow_t*)&args->sendRegHandles[i], NCCL_WIN_COLL_SYMMETRIC));
NCCLCHECK(ncclCommWindowRegister(args->comms[i], args->recvbuffs[i], args->maxbytes, (ncclWindow_t*)&args->recvRegHandles[i], NCCL_WIN_COLL_SYMMETRIC));
} else
#endif
{
if (local_register) NCCLCHECK(ncclCommRegister(args->comms[i], args->sendbuffs[i], args->maxbytes, &sendRegHandles[i]));
if (local_register) NCCLCHECK(ncclCommRegister(args->comms[i], args->recvbuffs[i], args->maxbytes, &recvRegHandles[i]));
if (local_register) NCCLCHECK(ncclCommRegister(args->comms[i], args->sendbuffs[i], args->maxbytes, &args->sendRegHandles[i]));
if (local_register) NCCLCHECK(ncclCommRegister(args->comms[i], args->recvbuffs[i], args->maxbytes, &args->recvRegHandles[i]));
}
}
NCCLCHECK(ncclGroupEnd());
#endif
// Capture memory used by test buffers
for (int g = 0; g < args->nGpus; ++g) {
CUDACHECK(cudaSetDevice(args->gpus[g]));
getGPUMemoryInfo(nullptr, &initFreeGpuMem[g + args->nGpus*2]);
args->bufferMemory[args->thread] = std::max(args->bufferMemory[args->thread], initFreeGpuMem[g + args->nGpus] - initFreeGpuMem[g + args->nGpus*2]);
}
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
/* Create device communicators based on test-specific requirements */
if (deviceImpl) {
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,29,0)
if (test_ncclVersion < NCCL_VERSION(2,29,0)) {
fprintf(stderr,
"Incompatible NCCL versions. nccl-tests was compiled with NCCL %d, but is running with NCCL %d. "
"The %d Device API is not compatible with versions before 2.29.\n",
NCCL_VERSION_CODE, test_ncclVersion, NCCL_VERSION_CODE);
return testInvalidUsage;
}
ncclDevCommRequirements reqs = NCCL_DEV_COMM_REQUIREMENTS_INITIALIZER;
if (!ncclTestEngine.getDevCommRequirements) {
fprintf(stderr, "Device implementation %d is not supported by this test\n", deviceImpl);
return testNotImplemented;
}
ncclCommProperties commProperties = NCCL_COMM_PROPERTIES_INITIALIZER;
NCCLCHECK(ncclCommQueryProperties(args->comms[0], &commProperties));
TESTCHECK(ncclTestEngine.getDevCommRequirements(deviceImpl, &reqs, &commProperties));
#else
if (test_ncclVersion >= NCCL_VERSION(2,29,0)) {
fprintf(stderr, "Incompatible NCCL versions. nccl-tests was compiled with NCCL 2.28, but is running with NCCL %d. "
"The 2.28 Device API is not compatible with later.\n",
test_ncclVersion);
return testInvalidUsage;
}
ncclDevCommRequirements reqs = {};
if (!ncclTestEngine.getDevCommRequirements ||
!ncclTestEngine.getDevCommRequirements(deviceImpl, &reqs)) {
fprintf(stderr, "Device implementation %d is not supported by this test\n", deviceImpl);
return testNotImplemented;
}
#endif
NCCLCHECK(ncclGroupStart());
for (int i = 0; i < args->nGpus; i++) {
NCCLCHECK(ncclDevCommCreate(args->comms[i], &reqs, args->devComms+i));
}
NCCLCHECK(ncclGroupEnd());
}
// Capture memory used by test buffers
int64_t deviceCommMaxMem = 0;
for (int g = 0; g < args->nGpus; ++g) {
CUDACHECK(cudaSetDevice(args->gpus[g]));
int64_t freeGpuMem;
getGPUMemoryInfo(nullptr, &freeGpuMem);
deviceCommMaxMem = std::max(deviceCommMaxMem, initFreeGpuMem[g + args->nGpus*2] - freeGpuMem);
}
*args->initGpuMem += deviceCommMaxMem;
#endif
free(initFreeGpuMem);
TESTCHECK(threadRunTests(args));
for (int i=0; i<args->nGpus; i++) {
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
if (test_ncclVersion >= NCCL_VERSION(2,27,0) && (local_register == SYMMETRIC_REGISTER)) {
NCCLCHECK(ncclCommWindowDeregister(args->comms[i], (ncclWindow_t)sendRegHandles[i]));
NCCLCHECK(ncclCommWindowDeregister(args->comms[i], (ncclWindow_t)recvRegHandles[i]));
} else
#endif
{
if (local_register) NCCLCHECK(ncclCommDeregister(args->comms[i], sendRegHandles[i]));
if (local_register) NCCLCHECK(ncclCommDeregister(args->comms[i], recvRegHandles[i]));
}
#endif
NCCLCHECK(ncclCommDestroy(args->comms[i]));
}
return testSuccess;
}
@ -722,7 +900,7 @@ testResult_t AllocateBuffs(void **sendbuff, size_t sendBytes, void **recvbuff, s
testResult_t run(); // Main function
int main(int argc, char* argv[]) {
int main(int argc, char* argv[], char **envp) {
// Make sure every line is flushed so that we see the progress of the test
setlinebuf(stdout);
@ -731,7 +909,7 @@ int main(int argc, char* argv[]) {
#else
test_ncclVersion = NCCL_VERSION_CODE;
#endif
//printf("# NCCL_VERSION_CODE=%d ncclGetVersion=%d\n", NCCL_VERSION_CODE, test_ncclVersion);
//printf("# nccl-tests version %s NCCL_VERSION_CODE=%d ncclGetVersion=%d\n", NCCL_TESTS_VERSION, NCCL_VERSION_CODE, test_ncclVersion);
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,0,0)
test_opnum = 4;
test_typenum = 9;
@ -756,6 +934,8 @@ int main(int argc, char* argv[]) {
// Parse args
double parsed;
int longindex;
char *output_file = nullptr;
static struct option longopts[] = {
{"nthreads", required_argument, 0, 't'},
{"ngpus", required_argument, 0, 'g'},
@ -777,15 +957,22 @@ int main(int argc, char* argv[]) {
{"timeout", required_argument, 0, 'T'},
{"cudagraph", required_argument, 0, 'G'},
{"report_cputime", required_argument, 0, 'C'},
{"report_timestamps", required_argument, 0, 'S'},
{"output_file", required_argument, 0, 'J'},
{"average", required_argument, 0, 'a'},
{"local_register", required_argument, 0, 'R'},
{"cta_policy", required_argument, 0, 'x'},
{"device_implementation", required_argument, 0, 'D'},
{"device_cta_count", required_argument, 0, 'V'},
{"memory", required_argument, 0, 'M'},
{"help", no_argument, 0, 'h'},
{}
};
while(1) {
int c;
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:N:p:c:o:d:r:z:y:T:hG:C:a:R:", longopts, &longindex);
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:N:p:c:o:d:r:z:y:T:hG:C:a:R:x:D:V:J:S:M:", longopts, &longindex);
if (c == -1)
break;
@ -874,6 +1061,12 @@ int main(int argc, char* argv[]) {
case 'C':
report_cputime = strtol(optarg, NULL, 0);
break;
case 'J':
output_file = strdup(optarg);
break;
case 'S':
report_timestamps = strtol(optarg, NULL, 0);
break;
case 'a':
average = (int)strtol(optarg, NULL, 0);
break;
@ -888,6 +1081,41 @@ int main(int argc, char* argv[]) {
printf("Option -R (register) is not supported before NCCL 2.19. Ignoring\n");
#endif
break;
case 'M':
memory_report = (int)strtol(optarg, NULL, 0);
break;
case 'x':
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
ctaPolicy = (int)strtol(optarg, NULL, 0);
if (ctaPolicy > 1 && test_ncclVersion < NCCL_VERSION(2,28,0)) {
printf("Option -x (cta_policy) %d is not supported before NCCL 2.28. Ignoring\n", ctaPolicy);
ctaPolicy = -1;
}
#else
printf("Option -x (cta_policy) is not supported before NCCL 2.27. Ignoring\n");
#endif
break;
case 'D':
if (test_ncclVersion >= NCCL_VERSION(2,28,0)) {
deviceImpl = (int)strtol(optarg, NULL, 0);
} else {
fprintf(stderr, "Option -D (device implementation) requires NCCL >= 2.28.0\n");
return -1;
}
break;
case 'V':
if (test_ncclVersion >= NCCL_VERSION(2,28,0)) {
deviceCtaCount = (int)strtol(optarg, NULL, 0);
if (deviceCtaCount <= 0 || deviceCtaCount > 128) {
fprintf(stderr, "device_cta_count (-V) must be positive and less than 128, got %d. "
"Using default value 16.\n", deviceCtaCount);
deviceCtaCount = 16;
}
} else {
fprintf(stderr, "Option -V (device CTA count) requires NCCL >= 2.28.0\n");
return -1;
}
break;
case 'h':
default:
if (c != 'h') printf("invalid option '%c'\n", c);
@ -918,8 +1146,14 @@ int main(int argc, char* argv[]) {
"[-T,--timeout <time in seconds>] \n\t"
"[-G,--cudagraph <num graph launches>] \n\t"
"[-C,--report_cputime <0/1>] \n\t"
"[-S,--report_timestamps <0/1> report timestamps (default 0)] \n\t"
"[-J,--output_file <file> write output to filepath, if accessible. Infer type from suffix (only json supported presently.)] \n\t"
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
"[-R,--local_register <0/1/2> enable local (1) or symmetric (2) buffer registration on send/recv buffers (default: disable (0))] \n\t"
"[-x,--cta_policy <0/1/2> set CTA policy (NCCL_CTA_POLICY_DEFAULT (0), NCCL_CTA_POLICY_EFFICIENCY (1), NCCL_CTA_POLICY_ZERO (2)) (default: do not set)] \n\t"
"[-D,--device_implementation <implementation number> enable device implementation (default: 0, use NCCL implementation; requires -R 2 if > 0)] \n\t"
"[-V,--device_cta_count <number> set number of CTAs for device implementation (default: 16)] \n\t"
"[-M,--memory_report <0/1> enable memory usage report (default: 0)] \n\t"
"[-h,--help]\n",
basename(argv[0]));
return 0;
@ -931,10 +1165,29 @@ int main(int argc, char* argv[]) {
(unsigned long long)maxBytes);
return -1;
}
if (deviceImpl > 0 && (local_register != SYMMETRIC_REGISTER)) {
fprintf(stderr, "device implementation (-D > 0) requires enabling symmetric memory registration (-R 2)\n");
return -1;
}
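/* Illustrative only: a hypothetical invocation combining the new options, using one of
 * the perf binaries such as all_reduce_perf (flag values here are examples, not
 * recommendations):
 *   ./all_reduce_perf -b 8 -e 128M -f 2 -g 8 -R 2 -D 1 -V 16 -J results.json -S 1 -M 1
 * i.e. device implementation 1 with symmetric registration (-R 2), 16 device CTAs,
 * JSON output to results.json, timestamped perf lines, and a memory usage report. */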
#ifdef MPI_SUPPORT
MPI_Init(&argc, &argv);
#endif
TESTCHECK(run());
const output_file_type_t output_file_type = classifyOutputFile(output_file);
outputFileInit(output_file_type, output_file, argc, argv, envp);
if(output_file) {
free(output_file);
output_file = nullptr;
}
testResult_t result = run();
outputFileFinalize(output_file_type);
TESTCHECK(result);
return 0;
}
@ -1008,49 +1261,22 @@ testResult_t run() {
#endif
is_main_thread = is_main_proc = (proc == 0) ? 1 : 0;
PRINT("# nThread %d nGpus %d minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d agg iters: %d validation: %d graph: %d\n",
nThreads, nGpus, minBytes, maxBytes,
(stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes",
warmup_iters, iters, agg_iters, datacheck, cudaGraphLaunches);
if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n");
if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n");
PRINT("#\n");
jsonIdentifyWriter(is_main_thread);
PRINT("# Using devices\n");
#define MAX_LINE 2048
char line[MAX_LINE];
int len = 0;
size_t maxMem = ~0;
char* envstr = getenv("NCCL_TESTS_DEVICE");
int gpu0 = envstr ? atoi(envstr) : -1;
for (int i=0; i<nThreads*nGpus; i++) {
int cudaDev = (gpu0 != -1 ? gpu0 : localRank*nThreads*nGpus) + i;
int rank = proc*nThreads*nGpus+i;
cudaDeviceProp prop;
CUDACHECK(cudaGetDeviceProperties(&prop, cudaDev));
len += snprintf(line+len, MAX_LINE-len, "# Rank %2d Group %2d Pid %6d on %10s device %2d [%04x:%02x:%02x] %s\n",
rank, color, getpid(), hostname, cudaDev, prop.pciDomainID, prop.pciBusID, prop.pciDeviceID, prop.name);
maxMem = std::min(maxMem, prop.totalGlobalMem);
testResult_t report_result = writeDeviceReport(&maxMem, localRank, proc, totalProcs, color, hostname, program_invocation_short_name);
if(report_result != testSuccess) {
return report_result;
}
#if MPI_SUPPORT
char *lines = (proc == 0) ? (char *)malloc(totalProcs*MAX_LINE) : NULL;
// Gather all output in rank order to root (0)
MPI_Gather(line, MAX_LINE, MPI_BYTE, lines, MAX_LINE, MPI_BYTE, 0, MPI_COMM_WORLD);
if (proc == 0) {
for (int p = 0; p < totalProcs; p++)
PRINT("%s", lines+MAX_LINE*p);
free(lines);
}
MPI_Allreduce(MPI_IN_PLACE, &maxMem, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);
#else
PRINT("%s", line);
#endif
// Reserve 1GiB of memory for each 16GiB installed, but limit to a max of 4GiB
const size_t GB = (1ULL << 30);
size_t reserveMem = std::min(DIVUP(maxMem, 16*GB) * 1*GB, 4*GB);
// We need sendbuff, recvbuff, expected (when datacheck enabled), plus 1G for the rest.
size_t memMaxBytes = (maxMem - (1<<30)) / (datacheck ? 3 : 2);
size_t memMaxBytes = (maxMem - reserveMem - 1*GB) / (datacheck ? 3 : 2);
if (maxBytes > memMaxBytes) {
maxBytes = memMaxBytes;
if (minBytes > maxBytes) minBytes = maxBytes;
if (proc == 0) printf("#\n# Reducing maxBytes to %ld due to memory limitation\n", maxBytes);
}
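// Worked example (illustrative): on a GPU with 80 GiB of memory,
// reserveMem = min(DIVUP(80 GiB, 16 GiB) * 1 GiB, 4 GiB) = min(5 GiB, 4 GiB) = 4 GiB,
// so with datacheck enabled memMaxBytes = (80 - 4 - 1) GiB / 3 = 25 GiB per buffer.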
@ -1071,12 +1297,11 @@ testResult_t run() {
ncclTestEngine.getBuffSize(&sendBytes, &recvBytes, (size_t)maxBytes, (size_t)ncclProcs*nGpus*nThreads);
envstr = getenv("NCCL_TESTS_DEVICE");
gpu0 = envstr ? atoi(envstr) : -1;
char* envstr = getenv("NCCL_TESTS_DEVICE");
int gpu0 = envstr ? atoi(envstr) : -1;
for (int i=0; i<nGpus*nThreads; i++) {
gpus[i] = (gpu0 != -1 ? gpu0 : localRank*nThreads*nGpus) + i;
CUDACHECK(cudaSetDevice(gpus[i]));
TESTCHECK(AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes));
if (streamnull) {
streams[i] = NULL;
}
@ -1111,25 +1336,41 @@ testResult_t run() {
//if parallel init is not selected, use main thread to initialize NCCL
ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus);
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
void **sendRegHandles = NULL;
void **recvRegHandles = NULL;
void* sendRegHandles[nThreads*nGpus];
void* recvRegHandles[nThreads*nGpus];
memset(sendRegHandles, 0, sizeof(sendRegHandles));
memset(recvRegHandles, 0, sizeof(recvRegHandles));
#endif
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
ncclDevComm devComms[nThreads*nGpus];
#endif
int64_t initGpuMem[nThreads] = {0};
int64_t bufferMemory[nThreads] = {0};
if (!parallel_init) {
if (ncclProcs == 1) {
NCCLCHECK(ncclCommInitAll(comms, nGpus*nThreads, gpus));
} else {
NCCLCHECK(ncclGroupStart());
for (int i=0; i<nGpus*nThreads; i++) {
CUDACHECK(cudaSetDevice(gpus[i]));
NCCLCHECK(ncclCommInitRank(comms+i, ncclProcs*nThreads*nGpus, ncclId, ncclProc*nThreads*nGpus+i));
// Capture the memory used by the GPUs before initializing the NCCL communicators
int64_t* initFreeGpuMem = (int64_t*)calloc(nGpus*3, sizeof(int64_t));
for (int g = 0; g < nGpus; ++g) {
CUDACHECK(cudaSetDevice(gpus[g]));
getGPUMemoryInfo(nullptr, &initFreeGpuMem[g]);
}
//if parallel init is not selected, use main thread to initialize NCCL
TESTCHECK(initComms(comms, nGpus*nThreads, ncclProc*nThreads*nGpus, ncclProcs*nThreads*nGpus, gpus, ncclId));
// Capture the memory used by the GPUs after initializing the NCCL communicators
for (int g = 0; g < nGpus; ++g) {
CUDACHECK(cudaSetDevice(gpus[g]));
getGPUMemoryInfo(nullptr, &initFreeGpuMem[g + nGpus]);
}
for ( size_t t = 0; t < nThreads; ++t) {
for (int g = 0; g < nGpus; ++g) {
initGpuMem[t] = std::max(initGpuMem[t], initFreeGpuMem[g] - initFreeGpuMem[g + nGpus]);
}
NCCLCHECK(ncclGroupEnd());
}
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
NCCLCHECK(ncclGroupStart());
sendRegHandles = (local_register) ? (void **)malloc(sizeof(*sendRegHandles)*nThreads*nGpus) : NULL;
recvRegHandles = (local_register) ? (void **)malloc(sizeof(*recvRegHandles)*nThreads*nGpus) : NULL;
for (int i=0; i<nGpus*nThreads; i++) {
CUDACHECK(cudaSetDevice(gpus[i]));
TESTCHECK(AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes));
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
if (test_ncclVersion >= NCCL_VERSION(2,27,0) && (local_register == SYMMETRIC_REGISTER)) {
NCCLCHECK(ncclCommWindowRegister(comms[i], sendbuffs[i], maxBytes, (ncclWindow_t*)&sendRegHandles[i], NCCL_WIN_COLL_SYMMETRIC));
@ -1143,27 +1384,82 @@ testResult_t run() {
}
NCCLCHECK(ncclGroupEnd());
#endif
// Capture memory used after allocating buffers
for (int g = 0; g < nGpus; ++g) {
CUDACHECK(cudaSetDevice(gpus[g]));
getGPUMemoryInfo(nullptr, &initFreeGpuMem[g + nGpus*2]);
}
for ( size_t t = 0; t < nThreads; ++t) {
for (int g = 0; g < nGpus; ++g) {
bufferMemory[t] = std::max(bufferMemory[t], initFreeGpuMem[g + nGpus] - initFreeGpuMem[g + nGpus*2]);
}
}
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
/* Create device communicators based on test-specific requirements */
if (deviceImpl) {
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,29,0)
if (test_ncclVersion < NCCL_VERSION(2,29,0)) {
fprintf(stderr,
"Incompatible NCCL versions. nccl-tests was compiled with NCCL %d, but is running with NCCL %d. "
"The %d Device API is not compatible with versions before 2.29.\n",
NCCL_VERSION_CODE, test_ncclVersion, NCCL_VERSION_CODE);
return testInvalidUsage;
}
ncclDevCommRequirements reqs = NCCL_DEV_COMM_REQUIREMENTS_INITIALIZER;
if (!ncclTestEngine.getDevCommRequirements) {
fprintf(stderr, "Device implementation %d is not supported by this test\n", deviceImpl);
return testNotImplemented;
}
ncclCommProperties commProperties = NCCL_COMM_PROPERTIES_INITIALIZER;
NCCLCHECK(ncclCommQueryProperties(comms[0], &commProperties));
TESTCHECK(ncclTestEngine.getDevCommRequirements(deviceImpl, &reqs, &commProperties));
#else
if (test_ncclVersion >= NCCL_VERSION(2,29,0)) {
fprintf(stderr, "Incompatible NCCL versions. nccl-tests was compiled with NCCL 2.28, but is running with NCCL %d. "
"The 2.28 Device API is not compatible with later versions.\n", test_ncclVersion);
return testInvalidUsage;
}
ncclDevCommRequirements reqs = {};
if (!ncclTestEngine.getDevCommRequirements ||
!ncclTestEngine.getDevCommRequirements(deviceImpl, &reqs)) {
fprintf(stderr, "Device implementation %d is not supported by this test\n", deviceImpl);
return testNotImplemented;
}
#endif
NCCLCHECK(ncclGroupStart());
for (int i = 0; i < nGpus * nThreads; i++) {
NCCLCHECK(ncclDevCommCreate(comms[i], &reqs, devComms+i));
}
NCCLCHECK(ncclGroupEnd());
}
int64_t deviceCommMaxMem = 0;
for (int g = 0; g < nGpus; ++g) {
CUDACHECK(cudaSetDevice(gpus[g]));
int64_t freeGpuMem;
getGPUMemoryInfo(nullptr, &freeGpuMem);
deviceCommMaxMem = std::max(deviceCommMaxMem, initFreeGpuMem[g + nGpus*2] - freeGpuMem);
}
for ( size_t t = 0; t < nThreads; ++t) {
initGpuMem[t] += deviceCommMaxMem;
}
#endif
free(initFreeGpuMem);
}
int errors[nThreads];
double bw[nThreads];
double* delta;
CUDACHECK(cudaHostAlloc(&delta, sizeof(double)*nThreads*NUM_BLOCKS, cudaHostAllocPortable | cudaHostAllocMapped));
int64_t devMemUsed[nThreads];
int bw_count[nThreads];
for (int t=0; t<nThreads; t++) {
bw[t] = 0.0;
errors[t] = bw_count[t] = 0;
devMemUsed[t] = std::numeric_limits<int64_t>::min();
}
fflush(stdout);
const char* timeStr = report_cputime ? "cputime" : "time";
PRINT("#\n");
PRINT("# %10s %12s %8s %6s %6s out-of-place in-place \n", "", "", "", "", "");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %6s %7s %6s %6s %6s\n", "size", "count", "type", "redop", "root",
timeStr, "algbw", "busbw", "#wrong", timeStr, "algbw", "busbw", "#wrong");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
writeResultHeader(report_cputime, report_timestamps);
struct testThread threads[nThreads];
memset(threads, 0, sizeof(struct testThread)*nThreads);
@ -1185,6 +1481,13 @@ testResult_t run() {
threads[t].args.sendbuffs = sendbuffs+t*nGpus;
threads[t].args.recvbuffs = recvbuffs+t*nGpus;
threads[t].args.expected = expected+t*nGpus;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
threads[t].args.devComms = devComms+t*nGpus;
#endif
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
threads[t].args.sendRegHandles = sendRegHandles+t*nGpus;
threads[t].args.recvRegHandles = recvRegHandles+t*nGpus;
#endif
threads[t].args.ncclId = ncclId;
threads[t].args.comms=comms+t*nGpus;
threads[t].args.streams=streams+t*nGpus;
@ -1192,6 +1495,9 @@ testResult_t run() {
threads[t].args.errors=errors+t;
threads[t].args.bw=bw+t;
threads[t].args.bw_count=bw_count+t;
threads[t].args.initGpuMem = initGpuMem + t;
threads[t].args.bufferMemory = bufferMemory + t;
threads[t].args.devMemUsed = devMemUsed + t;
threads[t].args.reportErrors = datacheck;
@ -1210,11 +1516,17 @@ testResult_t run() {
errors[0] += errors[t];
bw[0] += bw[t];
bw_count[0] += bw_count[t];
devMemUsed[0] = std::max(devMemUsed[0], devMemUsed[t]);
initGpuMem[0] = std::max(initGpuMem[0], initGpuMem[t]);
bufferMemory[0] = std::max(bufferMemory[0], bufferMemory[t]);
}
}
#ifdef MPI_SUPPORT
MPI_Allreduce(MPI_IN_PLACE, &errors[0], 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, &devMemUsed[0], 1, MPI_INT64_T, MPI_MAX, MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, &initGpuMem[0], 1, MPI_INT64_T, MPI_MAX, MPI_COMM_WORLD);
MPI_Allreduce(MPI_IN_PLACE, &bufferMemory[0], 1, MPI_INT64_T, MPI_MAX, MPI_COMM_WORLD);
#endif
if (!parallel_init) {
@ -1248,31 +1560,33 @@ testResult_t run() {
if (datacheck) CUDACHECK(cudaFree(expected[i]));
#endif
}
CUDACHECK(cudaFreeHost(delta));
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
free(sendRegHandles);
free(recvRegHandles);
#endif
envstr = getenv("NCCL_TESTS_MIN_BW");
double check_avg_bw = envstr ? atof(envstr) : -1;
const double check_avg_bw = envstr ? atof(envstr) : -1;
bw[0] /= bw_count[0];
PRINT("# Out of bounds values : %d %s\n", errors[0], errors[0] ? "FAILED" : "OK");
PRINT("# Avg bus bandwidth : %g %s\n", bw[0], check_avg_bw == -1 ? "" : (bw[0] < check_avg_bw*(0.9) ? "FAILED" : "OK"));
PRINT("#\n");
writeResultFooter(errors, bw, check_avg_bw, program_invocation_short_name);
if (memory_report) {
memInfo_t memInfos[3];
memInfos[0] = { initGpuMem[0], "Initialization" };
memInfos[1] = { bufferMemory[0], "User-Allocated" };
memInfos[2] = { devMemUsed[0], "Collective" };
writeMemInfo(memInfos, 3);
}
finalizeFooter();
#ifdef MPI_SUPPORT
MPI_Comm_free(&mpi_comm);
MPI_Finalize();
#endif
PRINT("%s\n", ncclGetLastError(NULL));
writeErrors();
// 'cuda-memcheck --leak-check full' requires this
cudaDeviceReset();
if (errors[0] || bw[0] < check_avg_bw*(0.9))
exit(EXIT_FAILURE);
return testNumResults;
else
exit(EXIT_SUCCESS);
return testSuccess;
}

View File

@ -6,7 +6,12 @@
#ifndef __COMMON_H__
#define __COMMON_H__
#define NCCL_TESTS_VERSION "2.17.8"
#include "nccl.h"
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
#include "nccl_device.h"
#endif
#include <stdio.h>
#include <cstdint>
#include <algorithm>
@ -66,7 +71,9 @@ typedef enum {
testCudaError = 2,
testNcclError = 3,
testTimeout = 4,
testNumResults = 5
testNotImplemented = 5,
testInvalidUsage = 6,
testNumResults = 7, // Must be last
} testResult_t;
// Relay errors up and trace
@ -91,8 +98,8 @@ struct testColl {
testResult_t (*initData)(struct threadArgs* args, ncclDataType_t type,
ncclRedOp_t op, int root, int rep, int in_place);
void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks);
testResult_t (*runColl)(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type,
ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream);
testResult_t (*runColl)(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset,
size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int implIndex);
};
extern struct testColl allReduceTest;
extern struct testColl allGatherTest;
@ -105,6 +112,12 @@ struct testEngine {
void (*getBuffSize)(size_t *sendcount, size_t *recvcount, size_t count, int nranks);
testResult_t (*runTest)(struct threadArgs* args, int root, ncclDataType_t type,
const char* typeName, ncclRedOp_t op, const char* opName);
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,29,0)
testResult_t (*getDevCommRequirements)(int deviceImpl, ncclDevCommRequirements* reqs, ncclCommProperties_t* commProperties);
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
bool (*getDevCommRequirements)(int deviceImpl, ncclDevCommRequirements* reqs);
#endif
};
extern struct testEngine ncclTestEngine;
@ -131,6 +144,9 @@ struct threadArgs {
size_t recvInplaceOffset;
ncclUniqueId ncclId;
ncclComm_t* comms;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
ncclDevComm* devComms;
#endif
cudaStream_t* streams;
void** expected;
@ -142,6 +158,15 @@ struct threadArgs {
int reportErrors;
struct testColl* collTest;
int64_t* initGpuMem;
int64_t* bufferMemory;
int64_t* devMemUsed;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
void** sendRegHandles;
void** recvRegHandles;
#endif
};
typedef testResult_t (*threadFunc_t)(struct threadArgs* args);
@ -164,6 +189,9 @@ extern void AllocateBuffs(void **sendbuff, void **recvbuff, void **expected, voi
static void getHostName(char* hostname, int maxlen) {
gethostname(hostname, maxlen);
for (int i=0; i< maxlen; i++) {
if (hostname[i] == '\0') {
return;
}
if (hostname[i] == '.') {
hostname[i] = '\0';
return;
@ -263,6 +291,7 @@ static size_t wordSize(ncclDataType_t type) {
}
extern int test_ncclVersion; // init'd with ncclGetVersion()
extern int deviceCtaCount; // number of CTAs for device implementation
constexpr int test_opNumMax = (int)ncclNumOps + (NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) ? 1 : 0);
extern int test_opnum;
extern int test_typenum;
@ -299,6 +328,38 @@ static int ncclstringtoop (char *str) {
extern int is_main_proc;
extern thread_local int is_main_thread;
#define PRINT if (is_main_thread) printf
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
template <typename F>
testResult_t testLaunchDeviceKernel(F kernel, void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
if (kernel == nullptr) return testNotImplemented;
ncclDevComm* devComm = (ncclDevComm*)comm;
ncclWindow_t sendwin = (ncclWindow_t)sendbuff;
ncclWindow_t recvwin = (ncclWindow_t)recvbuff;
kernel<<<deviceCtaCount, 512, 0, stream>>>(sendwin, sendoffset, recvwin, recvoffset, count, root, *devComm);
return testSuccess;
}
#define SPECIALIZE_KERNEL(kernel, type, op) \
( op != ncclSum ? nullptr : \
type == ncclInt8 ? kernel<int8_t> : \
type == ncclUint8 ? kernel<uint8_t> : \
type == ncclInt32 ? kernel<int32_t> : \
type == ncclUint32 ? kernel<uint32_t> : \
type == ncclInt64 ? kernel<int64_t> : \
type == ncclUint64 ? kernel<uint64_t> : \
type == ncclFloat16 ? kernel<half> : \
type == ncclFloat32 ? kernel<float> : \
type == ncclFloat64 ? kernel<double> : \
nullptr \
)
#else
template <typename F>
testResult_t testLaunchDeviceKernel(F kernel, void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
return testNotImplemented;
}
#define SPECIALIZE_KERNEL(kernel, type, op) nullptr
#endif
#endif
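// Illustrative sketch (not part of the header): a test's runColl might dispatch
// between the host collective and a device kernel roughly as follows. Here
// `myKernel<T>` and `MyRunColl` are hypothetical names, and on the device path
// the comm/buffer arguments are assumed to already carry the ncclDevComm and
// ncclWindow_t handles expected by the casts inside testLaunchDeviceKernel.
//
//   testResult_t MyRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset,
//                          size_t count, ncclDataType_t type, ncclRedOp_t op, int root,
//                          ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
//     if (deviceImpl == 0) {
//       NCCLCHECK(ncclAllReduce((char*)sendbuff + sendoffset, (char*)recvbuff + recvoffset,
//                               count, type, op, comm, stream));
//       return testSuccess;
//     }
//     // SPECIALIZE_KERNEL yields nullptr for unsupported type/op combinations
//     // (anything other than ncclSum here), in which case testLaunchDeviceKernel
//     // returns testNotImplemented; otherwise the kernel is launched on
//     // deviceCtaCount CTAs of 512 threads.
//     return testLaunchDeviceKernel(SPECIALIZE_KERNEL(myKernel, type, op),
//                                   sendbuff, sendoffset, recvbuff, recvoffset,
//                                   count, type, op, root, comm, stream);
//   }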

View File

@ -17,6 +17,13 @@ CUDA_VERSION = $(strip $(shell which $(NVCC) >/dev/null && $(NVCC) --version | g
CUDA_MAJOR = $(shell echo $(CUDA_VERSION) | cut -d "." -f 1)
CUDA_MINOR = $(shell echo $(CUDA_VERSION) | cut -d "." -f 2)
# CUDA 13.0 requires c++17
ifeq ($(shell test "0$(CUDA_MAJOR)" -ge 13; echo $$?),0)
CXXSTD ?= -std=c++17
else
CXXSTD ?= -std=c++14
endif
# Better define NVCC_GENCODE in your environment to the minimal set
# of archs to reduce compile time.
ifeq ($(shell test "0$(CUDA_MAJOR)" -ge 13; echo $$?),0)
@ -59,8 +66,8 @@ NVCC_GENCODE ?= -gencode=arch=compute_35,code=sm_35 \
-gencode=arch=compute_70,code=compute_70
endif
NVCUFLAGS := -ccbin $(CXX) $(NVCC_GENCODE) -std=c++11
CXXFLAGS := -std=c++11
NVCUFLAGS := -ccbin $(CXX) $(NVCC_GENCODE) $(CXXSTD)
CXXFLAGS := $(CXXSTD)
LDFLAGS := -L${CUDA_LIB} -lcudart -lrt
NVLDFLAGS := -L${CUDA_LIB} -l${CUDARTLIB} -lrt

View File

@ -43,23 +43,35 @@ void GatherGetBw(size_t count, int typesize, double sec, double* algBw, double*
*busBw = baseBw * factor;
}
testResult_t GatherRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
size_t rankOffset = count * wordSize(type);
if (count == 0) return testSuccess;
testResult_t GatherRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
if (deviceImpl == 0) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
size_t rankOffset = count * wordSize(type);
if (count == 0) return testSuccess;
NCCLCHECK(ncclGroupStart());
NCCLCHECK(ncclSend(sendbuff, count, type, root, comm, stream));
if (rank == root) {
for (int r=0; r<nRanks; r++) {
NCCLCHECK(ncclRecv(((char*)recvbuff)+r*rankOffset, count, type, r, comm, stream));
char* sptr = (char*)sendbuff + sendoffset;
char* rptr = (char*)recvbuff + recvoffset;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
NCCLCHECK(ncclGather(sptr, rptr, count, type, root, comm, stream));
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,7,0)
NCCLCHECK(ncclGroupStart());
NCCLCHECK(ncclSend(sptr, count, type, root, comm, stream));
if (rank == root) {
for (int r=0; r<nRanks; r++) {
NCCLCHECK(ncclRecv(rptr + r * rankOffset, count, type, r, comm, stream));
}
}
NCCLCHECK(ncclGroupEnd());
#else
printf("NCCL 2.7 or later is needed for gather. This test was compiled with %d.%d.\n", NCCL_MAJOR, NCCL_MINOR);
return testNcclError;
#endif
} else {
return testNotImplemented;
}
NCCLCHECK(ncclGroupEnd());
return testSuccess;
}
@ -109,8 +121,8 @@ testResult_t GatherRunTest(struct threadArgs* args, int root, ncclDataType_t typ
}
struct testEngine gatherEngine = {
GatherGetBuffSize,
GatherRunTest
.getBuffSize = GatherGetBuffSize,
.runTest = GatherRunTest
};
#pragma weak ncclTestEngine=gatherEngine

View File

@ -45,25 +45,29 @@ void HyperCubeGetBw(size_t count, int typesize, double sec, double* algBw, doubl
*busBw = baseBw * factor;
}
testResult_t HyperCubeRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
char* sbuff = (char*)sendbuff;
char* rbuff = (char*)recvbuff;
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
size_t rankSize = count * wordSize(type);
testResult_t HyperCubeRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
if (deviceImpl == 0) {
char* sbuff = ((char*)sendbuff) + sendoffset;
char* rbuff = ((char*)recvbuff) + recvoffset;
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
size_t rankSize = count * wordSize(type);
if (rbuff+rank*rankSize != sbuff) CUDACHECK(cudaMemcpyAsync(rbuff+rank*rankSize, sbuff, rankSize, cudaMemcpyDeviceToDevice, stream));
if (rbuff+rank*rankSize != sbuff) CUDACHECK(cudaMemcpyAsync(rbuff+rank*rankSize, sbuff, rankSize, cudaMemcpyDeviceToDevice, stream));
// Hypercube AllGather
for (int mask=1; mask<nRanks; mask<<=1) {
NCCLCHECK(ncclGroupStart());
int s = rank & ~(mask-1);
int r = s ^ mask;
NCCLCHECK(ncclSend(rbuff+s*rankSize, count*mask, type, rank^mask, comm, stream));
NCCLCHECK(ncclRecv(rbuff+r*rankSize, count*mask, type, rank^mask, comm, stream));
NCCLCHECK(ncclGroupEnd());
// Hypercube AllGather
for (int mask=1; mask<nRanks; mask<<=1) {
NCCLCHECK(ncclGroupStart());
int s = rank & ~(mask-1);
int r = s ^ mask;
NCCLCHECK(ncclSend(rbuff+s*rankSize, count*mask, type, rank^mask, comm, stream));
NCCLCHECK(ncclRecv(rbuff+r*rankSize, count*mask, type, rank^mask, comm, stream));
NCCLCHECK(ncclGroupEnd());
}
} else {
return testNotImplemented;
}
return testSuccess;
}
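// Illustrative trace of the loop above (4 ranks, seen from rank 2):
//   mask=1: s=2, r=3 -> exchange single blocks with rank 3 (send block 2, recv block 3)
//   mask=2: s=2, r=0 -> exchange the two-block half starting at block 2 with rank 0's
//           half starting at block 0 (count*2 elements each way)
// so after log2(nRanks) steps every rank holds all nRanks blocks.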
@ -111,8 +115,8 @@ testResult_t HyperCubeRunTest(struct threadArgs* args, int root, ncclDataType_t
}
struct testEngine hyperCubeEngine = {
HyperCubeGetBuffSize,
HyperCubeRunTest
.getBuffSize = HyperCubeGetBuffSize,
.runTest = HyperCubeRunTest
};
#pragma weak ncclTestEngine=hyperCubeEngine

src/multimem_ops.h Normal file (new file, 105 lines)
View File

@ -0,0 +1,105 @@
/*************************************************************************
* Copyright (c) 2016-2025, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef _MULTIMEM_OPS_H_
#define _MULTIMEM_OPS_H_
#include <cuda_runtime.h>
#include <cassert>
// Multimem operations. Since multimem is currently only available in PTX, these are C++ wrappers around it.
// The first template argument is the data type; the second is the vectorized data type.
// In the future, the second template argument will also dictate reduction accuracy.
template<typename ptrT, typename valT>
__device__ __forceinline__ valT multimemLoadSum(const ptrT* addr) {
assert(false);
// static_assert(std::is_same<ptrT, void>::value, "multimemLoadSum can only be instantiated with implemented types");
// static_assert(std::is_same<valT, void>::value, "multimemLoadSum can only be instantiated with implemented types");
return valT{0};
}
#if __CUDA_ARCH__ >= 900 // Hopper and later
template<>
__device__ __forceinline__ double multimemLoadSum<double, double>(const double* addr) {
const uintptr_t multimem_addr = reinterpret_cast<uintptr_t>(addr);
double result;
asm volatile("multimem.ld_reduce.global.add.f64 %0, [%1];" : "=d"(result) : "l"(multimem_addr) : "memory");
return result;
}
#endif
#if __CUDA_ARCH__ >= 900 // Hopper and later
template<>
__device__ __forceinline__ float multimemLoadSum<float, float>(const float* addr) {
const uintptr_t multimem_addr = reinterpret_cast<uintptr_t>(addr);
float result;
asm volatile("multimem.ld_reduce.global.add.f32 %0, [%1];" : "=f"(result) : "l"(multimem_addr) : "memory");
return result;
}
#endif
#if __CUDA_ARCH__ >= 900 // Hopper and later
template<>
__device__ __forceinline__ float2 multimemLoadSum<float, float2>(const float* addr) {
const uintptr_t multimem_addr = reinterpret_cast<uintptr_t>(addr);
float2 result;
asm volatile("multimem.ld_reduce.global.add.v2.f32 {%0, %1}, [%2];" : "=f"(result.x), "=f"(result.y) : "l"(multimem_addr) : "memory");
return result;
}
#endif
#if __CUDA_ARCH__ >= 900 // Hopper and later
template<>
__device__ __forceinline__ float4 multimemLoadSum<float, float4>(const float* addr) {
const uintptr_t multimem_addr = reinterpret_cast<uintptr_t>(addr);
float4 result;
asm volatile("multimem.ld_reduce.global.add.v4.f32 {%0, %1, %2, %3}, [%4];" : "=f"(result.x), "=f"(result.y), "=f"(result.z), "=f"(result.w) : "l"(multimem_addr) : "memory");
return result;
}
#endif
template<typename ptrT, typename valT>
__device__ __forceinline__ void multimemStore(ptrT* addr, const valT val) {
assert(false);
// static_assert(std::is_same<ptrT, void>::value, "multimemStore can only be instantiated with implemented types");
// static_assert(std::is_same<valT, void>::value, "multimemStore can only be instantiated with implemented types");
}
#if __CUDA_ARCH__ >= 900 // Hopper and later
template<>
__device__ __forceinline__ void multimemStore<double, double>(double* addr, const double val) {
const uintptr_t multimem_addr = reinterpret_cast<uintptr_t>(addr);
asm volatile("multimem.st.global.f64 [%0], %1;" : : "l"(multimem_addr), "d"(val) : "memory");
}
#endif
#if __CUDA_ARCH__ >= 900 // Hopper and later
template<>
__device__ __forceinline__ void multimemStore<float, float>(float* addr, const float val) {
const uintptr_t multimem_addr = reinterpret_cast<uintptr_t>(addr);
asm volatile("multimem.st.global.f32 [%0], %1;" : : "l"(multimem_addr), "f"(val) : "memory");
}
#endif
#if __CUDA_ARCH__ >= 900 // Hopper and later
template<>
__device__ __forceinline__ void multimemStore<float, float2>(float* addr, const float2 val) {
const uintptr_t multimem_addr = reinterpret_cast<uintptr_t>(addr);
asm volatile("multimem.st.global.v2.f32 [%0], {%1, %2};" : : "l"(multimem_addr), "f"(val.x), "f"(val.y) : "memory");
}
#endif
#if __CUDA_ARCH__ >= 900 // Hopper and later
template<>
__device__ __forceinline__ void multimemStore<float, float4>(float* addr, const float4 val) {
const uintptr_t multimem_addr = reinterpret_cast<uintptr_t>(addr);
asm volatile("multimem.st.global.v4.f32 [%0], {%1, %2, %3, %4};" : : "l"(multimem_addr), "f"(val.x), "f"(val.y), "f"(val.z), "f"(val.w) : "memory");
}
#endif
#endif // _MULTIMEM_OPS_H_
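// Illustrative usage sketch (not part of the header). Assumptions: `mcPtr` is a float
// pointer into a multicast/multimem mapping of a symmetric buffer (e.g. obtained via an
// NCCL symmetric window), count is a multiple of 4 (tail handling omitted), and the code
// is compiled for sm_90 or later since the generic fallbacks only assert(false).
//
//   __global__ void multimemAllReduceSketch(float* mcPtr, size_t count) {
//     // Each lane performs a reduce-load of a float4 across all replicas and stores
//     // the sum back through the same multicast address.
//     for (size_t i = 4 * (blockIdx.x * blockDim.x + threadIdx.x); i < count;
//          i += 4 * (size_t)gridDim.x * blockDim.x) {
//       float4 v = multimemLoadSum<float, float4>(mcPtr + i);
//       multimemStore<float, float4>(mcPtr + i, v);
//     }
//   }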

View File

@ -39,8 +39,14 @@ void ReduceGetBw(size_t count, int typesize, double sec, double* algBw, double*
*busBw = baseBw;
}
testResult_t ReduceRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
NCCLCHECK(ncclReduce(sendbuff, recvbuff, count, type, op, root, comm, stream));
testResult_t ReduceRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
if (deviceImpl == 0) {
char* sptr = (char*)sendbuff + sendoffset;
char* rptr = (char*)recvbuff + recvoffset;
NCCLCHECK(ncclReduce(sptr, rptr, count, type, op, root, comm, stream));
} else {
return testNotImplemented;
}
return testSuccess;
}
@ -103,8 +109,8 @@ testResult_t ReduceRunTest(struct threadArgs* args, int root, ncclDataType_t typ
}
struct testEngine reduceEngine = {
ReduceGetBuffSize,
ReduceRunTest
.getBuffSize = ReduceGetBuffSize,
.runTest = ReduceRunTest
};
#pragma weak ncclTestEngine=reduceEngine

View File

@ -42,8 +42,14 @@ void ReduceScatterGetBw(size_t count, int typesize, double sec, double* algBw, d
*busBw = baseBw * factor;
}
testResult_t ReduceScatterRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
NCCLCHECK(ncclReduceScatter(sendbuff, recvbuff, count, type, op, comm, stream));
testResult_t ReduceScatterRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
if (deviceImpl == 0) {
char* sptr = (char*)sendbuff + sendoffset;
char* rptr = (char*)recvbuff + recvoffset;
NCCLCHECK(ncclReduceScatter(sptr, rptr, count, type, op, comm, stream));
} else {
return testNotImplemented;
}
return testSuccess;
}
@ -96,8 +102,8 @@ testResult_t ReduceScatterRunTest(struct threadArgs* args, int root, ncclDataTyp
}
struct testEngine reduceScatterEngine = {
ReduceScatterGetBuffSize,
ReduceScatterRunTest
.getBuffSize = ReduceScatterGetBuffSize,
.runTest = ReduceScatterRunTest
};
#pragma weak ncclTestEngine=reduceScatterEngine

View File

@ -39,23 +39,35 @@ void ScatterGetBw(size_t count, int typesize, double sec, double* algBw, double*
*busBw = baseBw * factor;
}
testResult_t ScatterRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
size_t rankOffset = count * wordSize(type);
if (count == 0) return testSuccess;
testResult_t ScatterRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
if (deviceImpl == 0) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
size_t rankOffset = count * wordSize(type);
if (count == 0) return testSuccess;
NCCLCHECK(ncclGroupStart());
if (rank == root) {
for (int r=0; r<nRanks; r++) {
NCCLCHECK(ncclSend(((char*)sendbuff)+r*rankOffset, count, type, r, comm, stream));
char* sptr = (char*)sendbuff + sendoffset;
char* rptr = (char*)recvbuff + recvoffset;
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0)
NCCLCHECK(ncclScatter(sptr, rptr, count, type, root, comm, stream));
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,7,0)
NCCLCHECK(ncclGroupStart());
if (rank == root) {
for (int r=0; r<nRanks; r++) {
NCCLCHECK(ncclSend(sptr + r * rankOffset, count, type, r, comm, stream));
}
}
NCCLCHECK(ncclRecv(rptr, count, type, root, comm, stream));
NCCLCHECK(ncclGroupEnd());
#else
printf("NCCL 2.7 or later is needed for scatter. This test was compiled with %d.%d.\n", NCCL_MAJOR, NCCL_MINOR);
return testNcclError;
#endif
} else {
return testNotImplemented;
}
NCCLCHECK(ncclRecv(recvbuff, count, type, root, comm, stream));
NCCLCHECK(ncclGroupEnd());
return testSuccess;
}
@ -105,8 +117,8 @@ testResult_t ScatterRunTest(struct threadArgs* args, int root, ncclDataType_t ty
}
struct testEngine scatterEngine = {
ScatterGetBuffSize,
ScatterRunTest
.getBuffSize = ScatterGetBuffSize,
.runTest = ScatterRunTest
};
#pragma weak ncclTestEngine=scatterEngine

View File

@ -43,18 +43,24 @@ void SendRecvGetBw(size_t count, int typesize, double sec, double* algBw, double
*busBw = baseBw * factor;
}
testResult_t SendRecvRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
int recvPeer = (rank-1+nRanks) % nRanks;
int sendPeer = (rank+1) % nRanks;
testResult_t SendRecvRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) {
if (deviceImpl == 0) {
int nRanks;
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
int recvPeer = (rank-1+nRanks) % nRanks;
int sendPeer = (rank+1) % nRanks;
NCCLCHECK(ncclGroupStart());
NCCLCHECK(ncclSend(sendbuff, count, type, sendPeer, comm, stream));
NCCLCHECK(ncclRecv(recvbuff, count, type, recvPeer, comm, stream));
NCCLCHECK(ncclGroupEnd());
char* sptr = (char*)sendbuff + sendoffset;
char* rptr = (char*)recvbuff + recvoffset;
NCCLCHECK(ncclGroupStart());
NCCLCHECK(ncclSend(sptr, count, type, sendPeer, comm, stream));
NCCLCHECK(ncclRecv(rptr, count, type, recvPeer, comm, stream));
NCCLCHECK(ncclGroupEnd());
} else {
return testNotImplemented;
}
return testSuccess;
}
@ -107,8 +113,8 @@ testResult_t SendRecvRunTest(struct threadArgs* args, int root, ncclDataType_t t
}
struct testEngine sendRecvEngine = {
SendRecvGetBuffSize,
SendRecvRunTest
.getBuffSize = SendRecvGetBuffSize,
.runTest = SendRecvRunTest
};
#pragma weak ncclTestEngine=sendRecvEngine

src/util.cu Normal file (new file, 725 lines)
View File

@ -0,0 +1,725 @@
/*************************************************************************
* Copyright (c) 2016-2025, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
// This contains utilities to handle output both to stdout and to
// json files.
//
// An ad-hoc, libc-based approach to writing json has been adopted to
// keep things simple and to avoid injecting a dependency on an
// external JSON library.
//
// However, this means that the code is brittle to changes and care
// should be taken when adding/removing things. We also essentially
// give up when passed non-ASCII strings and non-printable characters
// except some of the usual ones.
#include "nccl.h"
#include "util.h"
#include <assert.h>
#include <errno.h>
#include <string>
#include <iomanip>
#define PRINT if (is_main_thread) printf
extern int nThreads;
extern int nGpus;
extern size_t minBytes;
extern size_t maxBytes;
extern size_t stepBytes;
extern size_t stepFactor;
extern int datacheck;
extern int warmup_iters;
extern int iters;
extern int agg_iters;
extern int parallel_init;
extern int blocking_coll;
extern int cudaGraphLaunches;
static FILE *json_report_fp;
static thread_local bool write_json;
#define JSON_FILE_VERSION 1
#define TIME_STRING_FORMAT "%Y-%m-%d %H:%M:%S"
typedef enum {
JSON_NONE, // A pseudo-state meaning that the document is empty
JSON_KEY,
JSON_OBJECT_EMPTY,
JSON_OBJECT_SOME,
JSON_LIST_EMPTY,
JSON_LIST_SOME,
} json_state_t;
// We use these statics to maintain a stack of states tracking where we are writing.
// jsonOutputInit sets this up, and it is jsonOutputFinalize's job to clean it up.
json_state_t *states = nullptr;
size_t state_cap = 0; // Allocated stack capacity
size_t state_n = 0; // # of items in the stack.
// This tries to sanitize/quote a string from 'in' into 'out',
// assuming 'out' has length 'lim'. We mainly escape ", \, /, \t, \n, and
// bail if we encounter non-printable stuff or non-ASCII stuff.
// 'in' should be null-terminated, of course.
//
// We return false if we were not able to copy all of 'in', either for
// length reasons or for unhandled characters.
static bool sanitizeJson(char out[], int lim, const char *in) {
int c = 0;
while(*in) {
if(c+1 >= lim) {
out[c] = 0;
return false;
}
switch(*in) {
case '"':
case '\\':
case '/':
case '\t':
case '\n':
if(c + 2 > lim) {
out[c] = 0;
return false;
}
out[c++] = '\\';
if(*in == '\n') {
out[c++] = 'n';
}
else if( *in == '\t') {
out[c++] = 't';
}
else {
out[c++] = *in;
}
break;
default:
if (*in >= 0x7F || *in <= 0x1F) {
out[c] = 0;
return false;
}
out[c++] = *in;
break;
}
++in;
}
out[c] = 0;
return true;
}
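// Example (illustrative): for the input bytes  a " b <TAB> c  the function writes
// a \ " b \ t c  into 'out' (the quote gains a backslash and the tab becomes the two
// characters '\' and 't') and returns true since everything fit within 'lim'.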
// Push state onto the state stack. Reallocate for extra storage if needed.
// Because JSON_NONE is a pseudo-state, don't allow it to be pushed.
static void jsonPushState(json_state_t state) {
assert(state != JSON_NONE);
if(state_cap <= (state_n+1)) {
state_cap = max((size_t)16, state_cap*2);
states = (json_state_t *)realloc(states, sizeof(json_state_t)*state_cap);
assert(states);
}
states[state_n++] = state;
}
// Return the current state at the top of the stack
static json_state_t jsonCurrState() {
if(state_n == 0) {
return JSON_NONE;
}
return states[state_n-1];
}
// Replace the top of the stack with state (equivalent to a pop & push; requires a non-empty stack)
static void jsonReplaceState(json_state_t state) {
assert(state != JSON_NONE);
assert(state_n != 0);
states[state_n-1] = state;
}
// Pop the top state off the stack, or return that the state is empty
static json_state_t jsonPopState() {
if(state_n == 0) {
return JSON_NONE;
}
return states[--state_n];
}
// Emit a key and separator. Sanitize the key.
// This is only acceptable if the top state is an object.
// Emit a ',' separator if we aren't the first item.
static void jsonKey(const char *name) {
switch(jsonCurrState()) {
case JSON_OBJECT_EMPTY:
jsonReplaceState(JSON_OBJECT_SOME);
break;
case JSON_OBJECT_SOME:
fprintf(json_report_fp, ",");
break;
default:
assert(0);
break;
}
char tmp[2048];
sanitizeJson(tmp, sizeof(tmp), name);
fprintf(json_report_fp, "\"%s\":", tmp);
jsonPushState(JSON_KEY);
}
// Helper function for inserting values.
// Only acceptable after keys, top-level, or in lists.
// Emit a preceding ',' if in a list and not the first item.
static void jsonValHelper() {
switch(jsonCurrState()) {
case JSON_LIST_EMPTY:
jsonReplaceState(JSON_LIST_SOME);
break;
case JSON_LIST_SOME:
fprintf(json_report_fp, ",");
break;
case JSON_KEY:
jsonPopState();
break;
case JSON_NONE:
break;
default:
assert(0);
}
}
// Start an object
static void jsonStartObject() {
jsonValHelper();
fprintf(json_report_fp, "{");
jsonPushState(JSON_OBJECT_EMPTY);
}
// Close an object
static void jsonFinishObject() {
switch(jsonPopState()) {
case JSON_OBJECT_EMPTY:
case JSON_OBJECT_SOME:
break;
default:
assert(0);
}
fprintf(json_report_fp, "}");
}
// Start a list
static void jsonStartList() {
jsonValHelper();
fprintf(json_report_fp, "[");
jsonPushState(JSON_LIST_EMPTY);
}
// Close a list
static void jsonFinishList() {
switch(jsonPopState()) {
case JSON_LIST_EMPTY:
case JSON_LIST_SOME:
break;
default:
assert(0);
}
fprintf(json_report_fp, "]");
}
// Write a null value
static void jsonNull() {
jsonValHelper();
fprintf(json_report_fp, "null");
}
// Write a (sanitized) string
static void jsonStr(const char *str) {
if(str == nullptr) {
jsonNull();
return;
}
jsonValHelper();
char tmp[2048];
sanitizeJson(tmp, sizeof(tmp), str);
fprintf(json_report_fp, "\"%s\"", tmp);
}
// Write a bool as "true" or "false" strings.
static void jsonBool(bool val) {
jsonStr(val ? "true" : "false");
}
// Write an integer value
static void jsonInt(const int val) {
jsonValHelper();
fprintf(json_report_fp, "%d", val);
}
// Write a size_t value
static void jsonSize_t(const size_t val) {
jsonValHelper();
fprintf(json_report_fp, "%zu", val);
}
// Write a double value
static void jsonDouble(const double val) {
jsonValHelper();
if(val != val) {
fprintf(json_report_fp, "\"nan\"");
}
else {
fprintf(json_report_fp, "%lf", val);
}
}
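// Example (illustrative): the helpers compose as a small state machine, so
//   jsonStartObject(); jsonKey("n"); jsonInt(3);
//   jsonKey("xs"); jsonStartList(); jsonDouble(1.5); jsonDouble(2.5); jsonFinishList();
//   jsonFinishObject();
// emits {"n":3,"xs":[1.500000,2.500000]}, with the commas inserted automatically by
// jsonKey()/jsonValHelper().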
// Fill buff with a formatted time string corresponding to 'now'.
// Write len or fewer bytes.
void formatNow(char *buff, int len) {
time_t now;
time(&now);
struct tm *timeinfo = localtime(&now);
strftime(buff, len, TIME_STRING_FORMAT, timeinfo);
}
// We print a status line to stdout.
// The JSON stream is left with a trailing comma and the top-level
// object open for the next set of top-level items (config and
// results).
// This uses unguarded 'printf' rather than the PRINT() macro because
// is_main_thread is not set up at this point.
void jsonOutputInit(const char *in_path,
int argc, char **argv,
char **envp) {
if(in_path == nullptr) {
return;
}
#ifdef MPI_SUPPORT
int proc;
MPI_Comm_rank(MPI_COMM_WORLD, &proc);
if(proc != 0) {
return;
}
#endif
char *try_path = strdup(in_path);
int try_count = 0;
json_report_fp = fopen(try_path, "wx");
while(json_report_fp == NULL) {
if(errno != EEXIST) {
printf("# skipping json output; %s not accessible\n", try_path);
free(try_path);
return;
}
free(try_path);
if(asprintf(&try_path, "%s.%d", in_path, try_count++) == -1) {
printf("# skipping json output; failed to probe destination\n");
return;
}
json_report_fp = fopen(try_path, "wx");
}
printf("# Writing JSON output to %s\n", try_path);
free(try_path);
write_json = true;
jsonStartObject(); // will be closed by jsonOutputFinalize
jsonKey("version"); jsonInt(JSON_FILE_VERSION);
jsonKey("start_time");
{
char timebuffer[128];
formatNow(timebuffer, sizeof(timebuffer));
jsonStr(timebuffer);
}
jsonKey("args");
jsonStartList();
for(int i = 0; i < argc; i++) {
jsonStr(argv[i]);
}
jsonFinishList();
jsonKey("env");
jsonStartList();
for(char **e = envp; *e; e++) {
jsonStr(*e);
}
jsonFinishList();
jsonKey("nccl_version"); jsonInt(test_ncclVersion);
}
void jsonIdentifyWriter(bool is_writer) {
write_json &= is_writer;
}
// This cleans up the json output, finishing the object and closing the file.
// If we were not writing json output, we don't do anything.
void jsonOutputFinalize() {
if(write_json) {
jsonKey("end_time");
char timebuffer[128];
formatNow(timebuffer, sizeof(timebuffer));
jsonStr(timebuffer);
jsonFinishObject();
assert(jsonCurrState() == JSON_NONE);
free(states);
states = nullptr;
state_n = 0;
state_cap = 0;
fclose(json_report_fp);
json_report_fp = nullptr;
}
}
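// For orientation, the overall document produced by these writers has roughly this
// shape (keys taken from the calls in this file; values abbreviated, later summary
// items elided):
//   { "version": 1, "start_time": "...", "args": [...], "env": [...],
//     "nccl_version": ...,
//     "config": { "nthreads": ..., ..., "parallel_init": "...", "devices": [ {...}, ... ] },
//     "results": [ {...}, ... ], ..., "end_time": "..." }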
struct rankInfo_t {
int rank;
int group;
int pid;
char hostname[1024];
int device;
char device_hex[128];
char devinfo[1024];
};
// Helper function to parse the device info lines passed via MPI to the root rank.
// This fills 'rank' with the parsed contents of 'instring'.
static int parseRankInfo(rankInfo_t *rank, const char *instring) {
int end;
sscanf(instring,
"# Rank %d Group %d Pid %d on %1024s device %d [%128[^]]] %1024[^\n]\n%n",
&rank->rank,
&rank->group,
&rank->pid,
rank->hostname,
&rank->device,
rank->device_hex,
rank->devinfo,
&end);
return end;
}
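// Example (illustrative values; hostname and GPU name are made up): an input line such as
//   "# Rank  3 Group  0 Pid  12345 on     node01 device  3 [0000:1b:00] NVIDIA H100"
// parses into rank=3, group=0, pid=12345, hostname="node01", device=3,
// device_hex="0000:1b:00", devinfo="NVIDIA H100".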
static void jsonRankInfo(const rankInfo_t *ri) {
jsonStartObject();
jsonKey("rank"); jsonInt(ri->rank);
jsonKey("group"); jsonInt(ri->group);
jsonKey("pid"); jsonInt(ri->pid);
jsonKey("hostname"); jsonStr(ri->hostname);
jsonKey("device"); jsonInt(ri->device);
jsonKey("device_hex"); jsonStr(ri->device_hex);
jsonKey("device_info"); jsonStr(ri->devinfo);
jsonFinishObject();
}
// Write the start of a benchmark output line containing the bytes &
// op type, both to stdout and to json if we are writing there.
void writeBenchmarkLinePreamble(size_t nBytes, size_t nElem, const char typeName[], const char opName[], int root) {
char rootName[100];
sprintf(rootName, "%6i", root);
PRINT("%12li %12li %8s %6s %6s", nBytes, nElem, typeName, opName, rootName);
if(write_json) {
jsonStartObject();
jsonKey("size"); jsonSize_t(nBytes);
jsonKey("count"); jsonSize_t(nElem);
jsonKey("type"); jsonStr(typeName);
jsonKey("redop"); jsonStr(opName);
jsonKey("root"); jsonStr(rootName);
}
}
// Finish a result record we were writing to stdout/json
void writeBenchmarkLineTerminator(int actualIters, const char *name) {
PRINT("\n");
if(write_json) {
jsonKey("actual_iterations"); jsonInt(actualIters);
jsonKey("experiment_name"); jsonStr(name);
jsonFinishObject();
}
}
// Handle the case where we don't write out-of-place results
void writeBenchMarkLineNullBody() {
PRINT(" "); // only do in-place for trace replay
if(write_json) {
jsonKey("out_of_place"); jsonNull();
}
}
void getFloatStr(double value, int width, char* str) {
int power = 0;
for (uint64_t val = 1; value >= val; val *= 10) power++;
if (power < width-2) sprintf(str, "%*.2f", width, value);
else if (power < width-1) sprintf(str, "%*.1f", width, value);
else if (power < width+1) sprintf(str, "%*.0f", width, value);
else if (width >= 7) sprintf(str, "%*.1e", width, value);
else if (width >= 8) sprintf(str, "%*.2e", width, value);
else sprintf(str, "%*.0e", width, value);
}
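// Worked examples with width 7 (as used for the time column):
//   3.14159 -> "   3.14", 12345.6 -> "12345.6", 123456.0 -> " 123456", 12345678.0 -> "1.2e+07"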
// Write the performance-related payload to stdout/json.
// We call this function twice at the top level per test: once for out-of-place, and once for in-place.
// The JSON output assumes out-of-place happens first.
void writeBenchmarkLineBody(double timeUsec, double algBw, double busBw, bool reportErrors, int64_t wrongElts, bool report_cputime, bool report_timestamps, bool out_of_place) {
char timeStr[8];
getFloatStr(timeUsec, 7, timeStr);
char algBwStr[7];
getFloatStr(algBw, 6, algBwStr);
char busBwStr[7];
getFloatStr(busBw, 6, busBwStr);
if (reportErrors) {
PRINT(" %7s %6s %6s %6g", timeStr, algBwStr, busBwStr, (double)wrongElts);
} else {
PRINT(" %7s %6s %6s N/A", timeStr, algBwStr, busBwStr);
}
if (!out_of_place && report_timestamps) {
char timebuffer[128];
formatNow(timebuffer, sizeof(timebuffer));
PRINT("%21s", timebuffer);
}
if(write_json) {
jsonKey(out_of_place ? "out_of_place" : "in_place");
jsonStartObject();
jsonKey(report_cputime ? "cpu_time" : "time"); jsonDouble(timeUsec);
jsonKey("alg_bw"); jsonDouble(algBw);
jsonKey("bus_bw"); jsonDouble(busBw);
jsonKey("nwrong"); (reportErrors ? jsonDouble((double)wrongElts) : jsonNull());
jsonFinishObject();
}
}
// This writes out a report about the run parameters and devices
// involved to stdout and json. For MPI, this will use a collective
// to gather from each rank to the root.
// Root then consumes this output, printing raw lines for stdout and
// parsing them for JSON for proper formatting.
// Perhaps actually sending records around instead of formatted
// strings would be smarter/easier, but I chose to adapt what was
// already in place.
testResult_t writeDeviceReport(size_t *maxMem, int localRank, int proc, int totalProcs, int color, const char hostname[], const char *program_name) {
PRINT("# nccl-tests version %s nccl-headers=%d nccl-library=%d\n", NCCL_TESTS_VERSION, NCCL_VERSION_CODE, test_ncclVersion);
PRINT("# Collective test starting: %s\n", program_name);
PRINT("# nThread %d nGpus %d minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d agg iters: %d validation: %d graph: %d\n",
nThreads, nGpus, minBytes, maxBytes,
(stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes",
warmup_iters, iters, agg_iters, datacheck, cudaGraphLaunches);
if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n");
if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n");
PRINT("#\n");
if(write_json) {
jsonKey("config");
jsonStartObject();
jsonKey("nthreads"); jsonInt(nThreads);
jsonKey("ngpus"); jsonInt(nGpus);
jsonKey("minimum_bytes"); jsonSize_t(minBytes);
jsonKey("maximum_bytes"); jsonSize_t(maxBytes);
if(stepFactor > 1) {
jsonKey("step_factor"); jsonInt(stepFactor);
}
else {
jsonKey("step_bytes"); jsonSize_t(stepBytes);
}
jsonKey("warmup_iters"); jsonInt(warmup_iters);
jsonKey("iterations"); jsonInt(iters);
jsonKey("aggregated_iterations"); jsonInt(agg_iters);
jsonKey("validation"); jsonInt(datacheck);
jsonKey("graph"); jsonInt(cudaGraphLaunches);
jsonKey("blocking_collectives"); jsonBool(blocking_coll);
jsonKey("parallel_init"); jsonBool(parallel_init);
}
PRINT("# Using devices\n");
#define MAX_LINE 2048
char line[MAX_LINE];
int len = 0;
const char* envstr = getenv("NCCL_TESTS_DEVICE");
const int gpu0 = envstr ? atoi(envstr) : -1;
int available_devices;
CUDACHECK(cudaGetDeviceCount(&available_devices));
for (int i=0; i<nThreads*nGpus; i++) {
const int cudaDev = (gpu0 != -1 ? gpu0 : localRank*nThreads*nGpus) + i;
const int rank = proc*nThreads*nGpus+i;
cudaDeviceProp prop;
if (cudaDev >= available_devices) {
fprintf(stderr, "Invalid number of GPUs: %d requested but only %d were found.\n",
(gpu0 != -1 ? gpu0 : localRank*nThreads*nGpus) + nThreads*nGpus, available_devices);
fprintf(stderr, "Please check the number of processes and GPUs per process.\n");
return testNotImplemented;
}
CUDACHECK(cudaGetDeviceProperties(&prop, cudaDev));
if (len < MAX_LINE) {
len += snprintf(line+len, MAX_LINE-len, "# Rank %2d Group %2d Pid %6d on %10s device %2d [%04x:%02x:%02x] %s\n",
rank, color, getpid(), hostname, cudaDev, prop.pciDomainID, prop.pciBusID, prop.pciDeviceID, prop.name);
}
*maxMem = std::min(*maxMem, prop.totalGlobalMem);
}
if (len >= MAX_LINE) {
strcpy(line+MAX_LINE-5, "...\n");
}
#if MPI_SUPPORT
char *lines = (proc == 0) ? (char *)malloc(totalProcs*MAX_LINE) : NULL;
// Gather all output in rank order to root (0)
MPI_Gather(line, MAX_LINE, MPI_BYTE, lines, MAX_LINE, MPI_BYTE, 0, MPI_COMM_WORLD);
if (proc == 0) {
if(write_json) {
jsonKey("devices");
jsonStartList();
}
for (int p = 0; p < totalProcs; p++) {
PRINT("%s", lines+MAX_LINE*p);
if(write_json) {
rankInfo_t rankinfo;
parseRankInfo(&rankinfo, lines + MAX_LINE*p);
jsonRankInfo(&rankinfo);
}
}
if(write_json) {
jsonFinishList();
}
free(lines);
}
MPI_Allreduce(MPI_IN_PLACE, maxMem, 1, MPI_LONG, MPI_MIN, MPI_COMM_WORLD);
#else
PRINT("%s", line);
if(write_json) {
rankInfo_t rankinfo;
parseRankInfo(&rankinfo, line);
jsonKey("devices");
jsonStartList();
jsonRankInfo(&rankinfo);
jsonFinishList();
}
#endif
if(write_json) {
jsonFinishObject();
}
return testSuccess;
}
// Write a result header to stdout/json.
// Json results object and contained table list are left open
void writeResultHeader(bool report_cputime, bool report_timestamps) {
const char* tsLbl = report_timestamps ? "timestamp" : "";
const int tsPad = report_timestamps ? 19 : 0;
const char* tsFmt = report_timestamps ? TIME_STRING_FORMAT : "";
const char* timeStr = report_cputime ? "cputime" : "time";
PRINT("#\n");
PRINT("# %10s %12s %8s %6s %6s out-of-place in-place \n", "", "", "", "", "");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %6s %7s %6s %6s %6s %*s\n", "size", "count", "type", "redop", "root",
timeStr, "algbw", "busbw", "#wrong", timeStr, "algbw", "busbw", "#wrong", tsPad, tsLbl);
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %6s %7s %6s %6s %6s %*s\n", "(B)", "(elements)", "", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "", tsPad, tsFmt);
if(write_json) {
jsonKey("results"); jsonStartList();
}
}
// Write the footer for results to stdout/json.
// We close the table list and write out the summary items.
// Results object is left open for errors.
void writeResultFooter(const int errors[], const double bw[], double check_avg_bw, const char *program_name) {
if(write_json) {
jsonFinishList();
}
PRINT("# %-20s : %d %s\n", "Out of bounds values", errors[0], errors[0] ? "FAILED" : "OK");
PRINT("# %-20s : %g %s\n", "Avg bus bandwidth", bw[0], check_avg_bw == -1 ? "" : (bw[0] < check_avg_bw*(0.9) ? "FAILED" : "OK"));
PRINT("#\n");
PRINT("# Collective test concluded: %s\n", program_name);
if(write_json) {
jsonKey("out_of_bounds");
jsonStartObject();
jsonKey("count"); jsonInt(errors[0]);
jsonKey("okay"); jsonBool(errors[0] == 0);
jsonFinishObject();
jsonKey("average_bus_bandwidith");
jsonStartObject();
jsonKey("bandwidith"); jsonDouble(bw[0]);
jsonKey("okay"); check_avg_bw == -1 ? jsonStr("unchecked") : jsonBool(bw[0] >= check_avg_bw*(0.9));
jsonFinishObject();
}
}
std::string getMemString(double amount) {
std::string postfix = " B";
if (abs(amount) >= 1024.0*1024.0*1024.0) {
postfix = " GB";
amount /= 1024.0 * 1024.0 * 1024.0;
} else if (abs(amount) >= 1024.0*1024.0) {
postfix = " MB";
amount /= 1024.0 * 1024.0;
} else if (abs(amount) >= 1024.0) {
postfix = " KB";
amount /= 1024.0;
}
int precision = 0;
if (abs(amount) < 10.0) {
precision = 2;
} else if (abs(amount) < 100.0) {
precision = 1;
}
std::stringstream ss;
ss << std::fixed << std::setprecision(precision) << amount << postfix;
return ss.str();
}
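// Illustrative outputs (note the 1024-based divisors above, even though the
// labels read B/KB/MB/GB):
//   getMemString(512.0)                 -> "512 B"
//   getMemString(1536.0)                -> "1.50 KB"
//   getMemString(3.5 * 1024*1024*1024)  -> "3.50 GB"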
void writeMemInfo(memInfo_t* memInfos, int numMemInfos) {
std::stringstream ss;
uint64_t totalAmount = 0;
for (int i = 0; i < numMemInfos; i++) {
ss << memInfos[i].name << " "
<< getMemString(memInfos[i].amount)
<< " ";
if (i < numMemInfos - 1) {
ss << "| ";
}
totalAmount += memInfos[i].amount;
}
ss << "| Total " << getMemString(totalAmount);
PRINT("# %-20s : %s\n", "GPU memory usage", ss.str().c_str());
}
// Write out remaining errors to stdout/json.
void writeErrors() {
const char *error = ncclGetLastError(NULL);
if(error && strlen(error) > 0) {
PRINT("# error: %s\n", error);
} else {
PRINT("\n");
}
if(write_json) {
jsonKey("errors");
jsonStartList();
if(error && strlen(error) > 0) {
jsonStr(error);
}
jsonFinishList();
}
}
void finalizeFooter() {
PRINT("#\n");
}
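/* Illustrative top-level reporting flow (a rough sketch of the intended call
 * order as suggested by the surrounding comments, not a prescribed sequence):
 *
 *   writeDeviceReport(&maxMem, localRank, proc, totalProcs, color, hostname, argv[0]);
 *   writeResultHeader(report_cputime, report_timestamps);
 *   // ...one writeBenchmarkLine{Preamble,Body,Terminator} group per message size...
 *   writeResultFooter(errors, bw, check_avg_bw, argv[0]);
 *   writeMemInfo(memInfos, numMemInfos);  // optional memory-usage summary
 *   writeErrors();
 *   finalizeFooter();
 *   jsonOutputFinalize();                 // closes the top-level JSON object
 */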

src/util.h Normal file (44 lines added)
@@ -0,0 +1,44 @@
/*************************************************************************
* Copyright (c) 2016-2025, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef __UTIL_H__
#define __UTIL_H__
#include "common.h"
struct memInfo_t {
int64_t amount;
const char* name;
};
// Try to set up JSON file output. If MPI is used, only rank 0 will proceed.
// This should be called by only a single thread.
// If 'path' is NULL, we stop.
// Otherwise, we borrow 'path' and try to open it as a new file.
// If it already exists, we probe for new files by appending integers
// until we succeed.
// Then we write argv and envp to the JSON output, sanitizing them. We also
// write the NCCL version.
// The top-level object remains open for the rest of the output.
void jsonOutputInit(const char *path, int argc, char **argv, char **envp);
// Should be called after threads are started to identify the writer (main) thread, so that output is not duplicated
void jsonIdentifyWriter(bool is_writer);
// Write end time and close top-level object. Reset json state and close output file.
void jsonOutputFinalize();
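// Illustrative lifecycle sketch (hypothetical call sites, not part of this header):
//   jsonOutputInit(json_path, argc, argv, envp);  // single thread per process; only rank 0 proceeds
//   jsonIdentifyWriter(is_main_thread);           // once worker threads are running
//   /* ... run tests, emitting the write* output declared below ... */
//   jsonOutputFinalize();                         // write end time, close the file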
void writeBenchmarkLinePreamble(size_t nBytes, size_t nElem, const char typeName[], const char opName[], int root);
void writeBenchmarkLineTerminator(int actualIters, const char *name);
void writeBenchMarkLineNullBody();
void writeBenchmarkLineBody(double timeUsec, double algBw, double busBw, bool reportErrors, int64_t wrongElts, bool report_cputime, bool report_timestamps, bool out_of_place);
testResult_t writeDeviceReport(size_t *maxMem, int localRank, int proc, int totalProcs, int color, const char hostname[], const char *program_name);
void writeResultHeader(bool report_cputime, bool report_timestamps);
void writeResultFooter(const int errors[], const double bw[], double check_avg_bw, const char *program_name);
void finalizeFooter();
void writeMemInfo(memInfo_t* memInfos, int numMemInfos);
void writeErrors();
#endif

src/vector_types.h Normal file (89 lines added)
@@ -0,0 +1,89 @@
/*************************************************************************
* Copyright (c) 2016-2025, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef _VECTOR_TYPES_H_
#define _VECTOR_TYPES_H_
#include <cuda_runtime.h>
// Helper functions for using vectorized types.
// VectorTypeMapping maps each data type, at compile time, to its widest available
// vectorized type (as close to 128 bits as the built-in CUDA vector types allow).
template <typename T>
struct VectorTypeMapping{
using Type=T; // Default no vectorization
};
template <>
struct VectorTypeMapping<float>{
using Type=float4;
};
template <>
struct VectorTypeMapping<double>{
using Type=double2;
};
template <>
struct VectorTypeMapping<int8_t>{
using Type=char4; // Largest built-in CUDA type for char (32-bit)
};
template <>
struct VectorTypeMapping<uint8_t>{
using Type=uchar4; // Largest built-in CUDA type for uchar (32-bit)
};
template <>
struct VectorTypeMapping<int32_t>{
using Type=int4;
};
template <>
struct VectorTypeMapping<uint32_t>{
using Type=uint4;
};
// Vector addition helper functions
// They enable clean math with vector types.
template <typename T>
__device__ __forceinline__ T vectorAdd(T a, T b) {
return a + b;
}
template <>
__device__ __forceinline__ float4 vectorAdd(float4 a, float4 b) {
return make_float4(a.x + b.x, a.y + b.y, a.z + b.z, a.w + b.w);
}
template <>
__device__ __forceinline__ double2 vectorAdd(double2 a, double2 b) {
return make_double2(a.x + b.x, a.y + b.y);
}
template <>
__device__ __forceinline__ char4 vectorAdd(char4 a, char4 b) {
return make_char4(a.x + b.x, a.y + b.y, a.z + b.z, a.w + b.w);
}
template <>
__device__ __forceinline__ uchar4 vectorAdd(uchar4 a, uchar4 b) {
return make_uchar4(a.x + b.x, a.y + b.y, a.z + b.z, a.w + b.w);
}
template <>
__device__ __forceinline__ int4 vectorAdd(int4 a, int4 b) {
return make_int4(a.x + b.x, a.y + b.y, a.z + b.z, a.w + b.w);
}
template <>
__device__ __forceinline__ uint4 vectorAdd(uint4 a, uint4 b) {
return make_uint4(a.x + b.x, a.y + b.y, a.z + b.z, a.w + b.w);
}
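/* Illustrative usage sketch (hypothetical kernel, not part of this header):
 * reinterpret the element buffers as the widest mapped vector type and add
 * them component-wise. Callers are assumed to guarantee alignment and to
 * handle any tail elements that do not fill a whole vector.
 *
 *   template <typename T>
 *   __global__ void addVectorsKernel(const T* a, const T* b, T* out, size_t nVec) {
 *     using V = typename VectorTypeMapping<T>::Type;
 *     const size_t i = blockIdx.x * (size_t)blockDim.x + threadIdx.x;
 *     if (i < nVec) {  // nVec = element count / (sizeof(V) / sizeof(T))
 *       const V* va = reinterpret_cast<const V*>(a);
 *       const V* vb = reinterpret_cast<const V*>(b);
 *       reinterpret_cast<V*>(out)[i] = vectorAdd(va[i], vb[i]);
 *     }
 *   }
 */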
#endif // _VECTOR_TYPES_H_