From e12dbb0a14f915c25d0bdaec631d98587b156d97 Mon Sep 17 00:00:00 2001 From: David Addison Date: Thu, 4 Sep 2025 17:23:22 -0700 Subject: [PATCH] Update to align with the NCCL 2.28 release Added Device API infrastructure and example kernels Two new command line arguments: -D device kernel implementation to use <0/1/2/3/4> -V number of CTAs to launch device kernels with Added new CTA Policy command line option: -x set the CTA Policy <0/1/2> --- src/all_gather.cu | 10 +- src/all_reduce.cu | 434 +++++++++++++++++++++++++++++++++++++++++- src/alltoall.cu | 160 ++++++++++++++-- src/broadcast.cu | 25 ++- src/common.cu | 269 +++++++++++++++++++++----- src/common.h | 66 ++++++- src/common.mk | 11 +- src/gather.cu | 40 ++-- src/hypercube.cu | 38 ++-- src/multimem_ops.h | 105 ++++++++++ src/reduce.cu | 10 +- src/reduce_scatter.cu | 10 +- src/scatter.cu | 40 ++-- src/sendrecv.cu | 28 +-- src/vector_types.h | 89 +++++++++ 15 files changed, 1196 insertions(+), 139 deletions(-) create mode 100644 src/multimem_ops.h create mode 100644 src/vector_types.h diff --git a/src/all_gather.cu b/src/all_gather.cu index 6db67e6..f9aa942 100644 --- a/src/all_gather.cu +++ b/src/all_gather.cu @@ -43,8 +43,14 @@ void AllGatherGetBw(size_t count, int typesize, double sec, double* algBw, doubl *busBw = baseBw * factor; } -testResult_t AllGatherRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { - NCCLCHECK(ncclAllGather(sendbuff, recvbuff, count, type, comm, stream)); +testResult_t AllGatherRunColl(void* sendbuff, size_t sendoffset,void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) { + if (deviceImpl == 0) { + char* sptr = (char*)sendbuff + sendoffset; + char* rptr = (char*)recvbuff + recvoffset; + NCCLCHECK(ncclAllGather(sptr, rptr, count, type, comm, stream)); + } else { + return testNotImplemented; + } return 
testSuccess; } diff --git a/src/all_reduce.cu b/src/all_reduce.cu index 4aa1fee..ce0b519 100644 --- a/src/all_reduce.cu +++ b/src/all_reduce.cu @@ -4,8 +4,32 @@ * See LICENSE.txt for license information ************************************************************************/ +/* + * AllReduce Performance Test Implementation + * + * This file implements multiple AllReduce kernel variants optimized for different + * use cases within CUDA P2P connectivity. + * These kernels are designed to highlight the device API functionality, as well as how to optimize for best performance. + * + * IMPORTANT: All custom kernels require CUDA P2P connectivity since they require Load-Store Accessible (LSA) memory. + * + * Kernel Selection Strategy: + * - deviceImpl = 0: NCCL's built-in AllReduce implementation (fallback) + * - deviceImpl = 1: allReduceLsaKernel - Basic LSA implementation for demonstration and small message sizes. + * - deviceImpl = 2: allReduceLsaVectorizedKernel - Vectorized LSA for demonstration to achieve performance for large message sizes. + * - deviceImpl = 3: allReduceMultimemKernel - Multi-memory for hardware acceleration. Requires Multimem capable hardware but can offer better performance. + * - deviceImpl = 4: allReduceMultimemVectorizedKernel - Vectorized multi-memory for best performance. Requires Multimem capable hardware but can offer better performance. 
+ */ + #include "cuda_runtime.h" #include "common.h" +#include +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) +#include "nccl_device.h" +#include "vector_types.h" +#include "multimem_ops.h" +constexpr int WARP_SIZE = 32; +#endif void AllReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, size_t eltSize, int nranks) { *sendcount = count; @@ -40,9 +64,413 @@ void AllReduceGetBw(size_t count, int typesize, double sec, double* algBw, doubl *busBw = baseBw * factor; } -testResult_t AllReduceRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { - NCCLCHECK(ncclAllReduce(sendbuff, recvbuff, count, type, op, comm, stream)); - return testSuccess; +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) +/* + * Kernel 1: allReduceLsaKernel - Basic LSA-based AllReduce + * + * Purpose: Provides a simple, deterministic AllReduce implementation for small to + * medium message sizes within CUDA P2P connectivity. + * + * Solution: Implements AllReduce using direct peer-to-peer memory access through + * LSA windows. Each rank reads from all other ranks, performs reduction, and + * writes the result back to all ranks using cooperative thread arrays. + * + * Key Optimizations: + * - LSA barriers for faster synchronization than global barriers + * - Global grid stride loop to distribute work across all ranks + * - Direct peer access within CUDA P2P connectivity for optimal bandwidth + * + * CUDA P2P Connectivity Requirement: CRITICAL - This kernel requires all participating + * ranks to be within the same CUDA P2P connectivity. + * + * Use Case: Small to medium messages (< 1MB) where simplicity and determinism + * are more important than maximum bandwidth. 
+ */ +template +__global__ void allReduceLsaKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) { + ncclLsaBarrierSession bar { ncclCoopCta(), devComm, ncclTeamLsa(devComm), devComm.lsaBarrier, blockIdx.x }; + bar.sync(ncclCoopCta(), cuda::memory_order_relaxed); + + const int rank = devComm.rank, nRanks = devComm.nRanks; + + const int globalTid = threadIdx.x + blockDim.x * (rank + blockIdx.x * nRanks); + const int globalNthreads = blockDim.x * gridDim.x * nRanks; + + for (size_t offset = globalTid; offset < count; offset += globalNthreads) { + T v = T{0}; + for (int peer=0; peer +__global__ void allReduceLsaVectorizedKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) { + ncclLsaBarrierSession bar { ncclCoopCta(), devComm, ncclTeamLsa(devComm), devComm.lsaBarrier, blockIdx.x }; + bar.sync(ncclCoopCta(), cuda::memory_order_relaxed); + + // Compile time vector type and vector size mapping + using TN = typename VectorTypeMapping::Type; + constexpr int VECTOR_FACTOR = sizeof(TN)/sizeof(T); + + constexpr int UNROLL_FACTOR = 128/sizeof(TN); // Same as before 128 Bytes per thread + + const int rank = devComm.rank, nRanks = devComm.nRanks; + + const int globalTid = threadIdx.x + blockDim.x * (rank + blockIdx.x * nRanks); + const int globalNthreads = blockDim.x * gridDim.x * nRanks; + + // Since we use vector types, the non-vector allocated memory is not necessarily aligned. 
+ const size_t alignment_offset = (sendoffset % sizeof(TN)) / sizeof(T); + const size_t aligned_count = count - alignment_offset; + const size_t vector_count = aligned_count / VECTOR_FACTOR; + const size_t remainder = aligned_count % VECTOR_FACTOR; + + // As before + const int elements_per_block = globalNthreads * UNROLL_FACTOR; + const int num_blocks = vector_count / elements_per_block; + + const int warp_id = globalTid / WARP_SIZE; + const int lane_id = globalTid % WARP_SIZE; + + const int warp_offset = warp_id * WARP_SIZE * UNROLL_FACTOR; + const int lane_offset = lane_id; + const int warp_lane_offset = warp_offset + lane_offset; + + // Handle misaligned elements first using scalar operations. Grid stride loop with scalar handling + if (alignment_offset > 0) { + for (size_t offset = globalTid; offset < alignment_offset; offset += globalNthreads) { + T v_scalar = T{0}; + + for (int peer=0; peer 0) { + const size_t remainder_start = alignment_offset + vector_count * VECTOR_FACTOR; + const int globalTid_remainder = globalTid; + const int globalNthreads_remainder = globalNthreads; + + for (size_t offset = globalTid_remainder; offset < remainder; offset += globalNthreads_remainder) { + T v_scalar = 0; + const size_t actual_offset = remainder_start + offset; + + for (int peer=0; peer +__global__ void allReduceMultimemKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) { + ncclLsaBarrierSession bar { ncclCoopCta(), devComm, ncclTeamTagLsa(), blockIdx.x, true }; + bar.sync(ncclCoopCta(), cuda::memory_order_relaxed); + + const int rank = devComm.rank, nRanks = devComm.nRanks; + + const int globalTid = threadIdx.x + blockDim.x * (rank + blockIdx.x * nRanks); + const int globalNthreads = blockDim.x * gridDim.x * nRanks; + + T* send_ptr = reinterpret_cast(ncclGetLsaMultimemPointer(sendwin, sendoffset, devComm)); + T* recv_ptr = reinterpret_cast(ncclGetLsaMultimemPointer(recvwin, 
recvoffset, devComm)); + for (size_t offset=globalTid; offset < count; offset += globalNthreads) { + if (offset < count) { + T v = multimemLoadSum(send_ptr + offset); + multimemStore(recv_ptr + offset, v); + } + } + bar.sync(ncclCoopCta(), cuda::memory_order_release); +} + +/* + * Kernel 4: allReduceMultimemVectorizedKernel - Vectorized Multi-memory AllReduce + * + * Purpose: Ultimate performance AllReduce implementation combining multi-memory + * hardware acceleration with vectorized operations and loop unrolling for maximum + * bandwidth utilization within CUDA P2P connectivity. + * + * Solution: Combines the hardware acceleration benefits of multi-memory operations + * with the bandwidth optimization techniques from vectorized kernels. This kernel + * represents the highest performance option for large, aligned data sets. + * + * Key Optimizations: + * - Multi-memory primitives for hardware-accelerated operations + * - Vectorized loads/stores for maximum memory bandwidth (128-bit operations) + * - Aggressive loop unrolling for improved instruction-level parallelism + * - Warp-coalesced memory access patterns for optimal memory controller utilization + * - Hardware-assisted memory synchronization and reduction + * - Graceful handling of misaligned data with scalar fallback + * + * CUDA P2P Connectivity Requirement: CRITICAL - Requires CUDA P2P connectivity and + * multi-memory support. This kernel leverages both P2P locality and hardware + * acceleration for optimal performance. + * + * Hardware Requirements: Hopper+ architecture with multi-memory support enabled. + * + * Performance Note: This kernel provides the best performance for large, aligned + * data sets but requires careful data alignment for optimal vectorization benefits. 
+ */ +template +__global__ void allReduceMultimemVectorizedKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) { + ncclLsaBarrierSession bar { ncclCoopCta(), devComm, ncclTeamTagLsa(), blockIdx.x, true }; + + bar.sync(ncclCoopCta(), cuda::memory_order_relaxed); + + using TN = typename VectorTypeMapping::Type; + constexpr int VECTOR_FACTOR = sizeof(TN)/sizeof(T); + + constexpr int UNROLL_FACTOR = 128/sizeof(TN); + + const int rank = devComm.rank, nRanks = devComm.nRanks; + + const int globalTid = threadIdx.x + blockDim.x * (rank + blockIdx.x * nRanks); + const int globalNthreads = blockDim.x * gridDim.x * nRanks; + + // Calculate alignment offset to handle misaligned elements first + const size_t alignment_offset = (sendoffset % sizeof(TN)) / sizeof(T); + const size_t aligned_count = count - alignment_offset; + const size_t vector_count = aligned_count / VECTOR_FACTOR; + const size_t remainder = aligned_count % VECTOR_FACTOR; + + const int elements_per_block = globalNthreads * UNROLL_FACTOR; + const int num_blocks = vector_count / elements_per_block; + + const int warp_id = globalTid / WARP_SIZE; + const int lane_id = globalTid % WARP_SIZE; + + const int warp_offset = warp_id * WARP_SIZE * UNROLL_FACTOR; + const int lane_offset = lane_id; + const int warp_lane_offset = warp_offset + lane_offset; + + // Multimem pointers that handle scalar access for misaligned and remainder elements + T* send_ptr = reinterpret_cast(ncclGetLsaMultimemPointer(sendwin, sendoffset, devComm)); + T* recv_ptr = reinterpret_cast(ncclGetLsaMultimemPointer(recvwin, recvoffset, devComm)); + + // Handle misaligned elements first using scalar operations + if (alignment_offset > 0) { + for (size_t offset = globalTid; offset < max(alignment_offset,count); offset += globalNthreads) { + T v_scalar = multimemLoadSum(send_ptr + offset); + multimemStore(recv_ptr+offset, v_scalar); + } + } + + // separate TN* for 2 
reasons. a) alignment offset, b) pointer arithmetic with the vectorized type + TN* send_ptrN = reinterpret_cast(ncclGetLsaMultimemPointer(sendwin, sendoffset+alignment_offset*sizeof(T), devComm)); + TN* recv_ptrN = reinterpret_cast(ncclGetLsaMultimemPointer(recvwin, recvoffset+alignment_offset*sizeof(T), devComm)); + + // Handle vectorized memory that can be handled in whole chunks (no if) + for (int block = 0; block < num_blocks; block += 1) { + TN v[UNROLL_FACTOR] = {TN{0}}; + const size_t block_offset = block * globalNthreads * UNROLL_FACTOR; +#pragma unroll + for (int i=0; i < UNROLL_FACTOR; i++) { + const int stride_offset = i * WARP_SIZE; + const size_t offset = warp_lane_offset + block_offset + stride_offset; + v[i] = multimemLoadSum(reinterpret_cast(send_ptrN + offset)); + } + +#pragma unroll + for (int i=0; i < UNROLL_FACTOR; i++) { + const int stride_offset = i * WARP_SIZE; + const size_t offset = warp_lane_offset + block_offset + stride_offset; + multimemStore(reinterpret_cast(recv_ptrN+offset), v[i]); + } + } + + // Handle the last partial vectorized block, but with if conditions + const int block = num_blocks; + TN v[UNROLL_FACTOR] = {TN{0}}; + const size_t block_offset = block * globalNthreads * UNROLL_FACTOR; +#pragma unroll + for (int i=0; i < UNROLL_FACTOR; i++) { + const int stride_offset = i * WARP_SIZE; + const size_t offset = warp_lane_offset + block_offset + stride_offset; + if (offset < vector_count) { + v[i] = multimemLoadSum(reinterpret_cast(send_ptrN+offset)); + } + } +#pragma unroll + for (int i=0; i < UNROLL_FACTOR; i++) { + const int stride_offset = i * WARP_SIZE; + const size_t offset = warp_lane_offset + block_offset + stride_offset; + if (offset < vector_count) { + multimemStore(reinterpret_cast(recv_ptrN+offset), v[i]); + } + } + + // Handle remainder elements using scalar operations + if (remainder > 0) { + const size_t remainder_start = alignment_offset + vector_count * VECTOR_FACTOR; + const int globalTid_remainder = globalTid; + 
const int globalNthreads_remainder = globalNthreads; + + for (size_t offset = globalTid_remainder; offset < remainder; offset += globalNthreads_remainder) { + const size_t actual_offset = remainder_start + offset; + T v_scalar = multimemLoadSum(send_ptr+actual_offset); + multimemStore(recv_ptr+actual_offset, v_scalar); + } + } + + // Sync + bar.sync(ncclCoopCta(), cuda::memory_order_release); +} +#endif + +testResult_t AllReduceRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) { + + char* sptr = (char*)sendbuff + sendoffset; + char* rptr = (char*)recvbuff + recvoffset; + + switch (deviceImpl) { + case 0: + NCCLCHECK(ncclAllReduce(sptr, rptr, count, type, op, comm, stream)); + return testSuccess; +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) + case 1: + TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(allReduceLsaKernel, type, op), + sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream, 0)); + return testSuccess; + case 2: + TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(allReduceLsaVectorizedKernel, type, op), + sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream, 0)); + return testSuccess; + case 3: + TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(allReduceMultimemKernel, type, op), + sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream, 1)); + return testSuccess; + case 4: + TESTCHECK(testLaunchDeviceKernel(SPECIALIZE_KERNEL(allReduceMultimemVectorizedKernel, type, op), + sendbuff, sendoffset, recvbuff, recvoffset, count, type, op, root, comm, stream, 1)); + return testSuccess; +#endif + } + + return testNotImplemented; } struct testColl allReduceTest = { diff --git a/src/alltoall.cu b/src/alltoall.cu index dd085e5..3418174 100644 --- a/src/alltoall.cu +++ b/src/alltoall.cu @@ -6,6 +6,10 @@ #include "cuda_runtime.h" #include 
"common.h" +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) +#include "nccl_device.h" +#include "vector_types.h" +#endif void AlltoAllGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, size_t eltSize, int nranks) { *paramcount = (count/nranks) & -(16/eltSize); @@ -45,23 +49,151 @@ void AlltoAllGetBw(size_t count, int typesize, double sec, double* algBw, double *busBw = baseBw * factor; } -testResult_t AlltoAllRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { - int nRanks; - NCCLCHECK(ncclCommCount(comm, &nRanks)); - size_t rankOffset = count * wordSize(type); +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) +// shared scalar AlltoAll implementation used by both kernels +template +__device__ void AlltoAllScalarImpl(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int rank, int nRanks, int tid, int nthreads) { + T* sendPtr = (T*)ncclGetLsaPointer(sendwin, sendoffset, rank); -#if NCCL_MAJOR < 2 || NCCL_MINOR < 7 - printf("NCCL 2.7 or later is needed for alltoall. 
This test was compiled with %d.%d.\n", NCCL_MAJOR, NCCL_MINOR); - return testNcclError; -#else - NCCLCHECK(ncclGroupStart()); - for (int r=0; r +__global__ void NvlAlltoAllKernel(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) { + ncclLsaBarrierSession bar { ncclCoopCta(), devComm, ncclTeamLsa(devComm), devComm.lsaBarrier, blockIdx.x }; + bar.sync(ncclCoopCta(), cuda::memory_order_relaxed); + + int rank = devComm.rank, nRanks = devComm.nRanks; + int tid = threadIdx.x + blockDim.x * blockIdx.x; + int nthreads = blockDim.x * gridDim.x; + + AlltoAllScalarImpl(sendwin, sendoffset, recvwin, recvoffset, count, rank, nRanks, tid, nthreads); + + bar.sync(ncclCoopCta(), cuda::memory_order_release); +} + +// Device implementation #2 - optimized NVL kernel using vectorization and unrolling +template +__global__ void NvlAlltoAllKernelOptimized(ncclWindow_t sendwin, size_t sendoffset, ncclWindow_t recvwin, size_t recvoffset, size_t count, int root, struct ncclDevComm devComm) { + ncclLsaBarrierSession bar { ncclCoopCta(), devComm, ncclTeamLsa(devComm), devComm.lsaBarrier, blockIdx.x }; + bar.sync(ncclCoopCta(), cuda::memory_order_relaxed); + + using TN = typename VectorTypeMapping::Type; + constexpr int VECTOR_FACTOR = sizeof(TN) / sizeof(T); + constexpr int UNROLL_FACTOR = 128/sizeof(TN); + constexpr int PEER_UNROLL = 2; + + int rank = devComm.rank, nRanks = devComm.nRanks; + int tid = threadIdx.x + blockDim.x * blockIdx.x; + int nthreads = blockDim.x * gridDim.x; + + T* sendPtr = (T*)ncclGetLsaPointer(sendwin, sendoffset, rank); + + // alignment check: can we use vectorized operations? 
+ bool canVectorize = (sizeof(TN) > sizeof(T)) && // Only if vectorization helps + (reinterpret_cast(sendPtr) % sizeof(TN) == 0) && // Base aligned + ((count * sizeof(T)) % sizeof(TN) == 0); // Stride compatible + + if (canVectorize) { + size_t vector_count = count / VECTOR_FACTOR; + int elements_per_iteration = nthreads * UNROLL_FACTOR; + + // process aligned vectorized elements without bounds checks + size_t aligned_vector_count = (vector_count / elements_per_iteration) * elements_per_iteration; + for (size_t base_offset = tid; base_offset < aligned_vector_count; base_offset += elements_per_iteration) { + // unroll a limited number of peers at a time + for (int peerBase = 0; peerBase < nRanks; peerBase += PEER_UNROLL) { + int peersInGroup = min(PEER_UNROLL, nRanks - peerBase); + + #pragma unroll + for (int p = 0; p < peersInGroup; p++) { + int peer = peerBase + p; + TN* sendVecPtr = (TN*)(sendPtr + peer * count); + TN* recvVecPtr = (TN*)((T*)ncclGetLsaPointer(recvwin, recvoffset, peer) + rank * count); + TN values[UNROLL_FACTOR]; + + // split load/store into separate loops for better overlap and ILP + #pragma unroll + for (int i = 0; i < UNROLL_FACTOR; i++) { + size_t offset = base_offset + i * nthreads; + values[i] = sendVecPtr[offset]; + } + #pragma unroll + for (int i = 0; i < UNROLL_FACTOR; i++) { + size_t offset = base_offset + i * nthreads; + recvVecPtr[offset] = values[i]; + } + } + } + } + + // handle remaining vectorized elements that didn't fit in aligned chunks + for (size_t base_offset = aligned_vector_count + tid; base_offset < vector_count; base_offset += nthreads) { + for (int peer = 0; peer < nRanks; peer++) { + TN* sendVecPtr = (TN*)(sendPtr + peer * count); + TN* recvVecPtr = (TN*)((T*)ncclGetLsaPointer(recvwin, recvoffset, peer) + rank * count); + recvVecPtr[base_offset] = sendVecPtr[base_offset]; + } + } + + // handle any remaining elements not divisible by vectorization factor + size_t scalar_start = vector_count * VECTOR_FACTOR; + for 
(size_t offset = scalar_start + tid; offset < count; offset += nthreads) { + for (int peer = 0; peer < nRanks; peer++) { + T value = sendPtr[peer * count + offset]; + T* recvPtr = (T*)ncclGetLsaPointer(recvwin, recvoffset, peer); + recvPtr[rank * count + offset] = value; + } + } + } else { + // simple scalar fallback for unaligned data (identical to simple kernel) + AlltoAllScalarImpl(sendwin, sendoffset, recvwin, recvoffset, count, rank, nRanks, tid, nthreads); + } + + bar.sync(ncclCoopCta(), cuda::memory_order_release); +} #endif + +testResult_t AlltoAllRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) { + if (deviceImpl == 0) { + char* sptr = (char*)sendbuff + sendoffset; + char* rptr = (char*)recvbuff + recvoffset; +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) + NCCLCHECK(ncclAlltoAll(sptr, rptr, count, type, comm, stream)); +#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,7,0) + int nRanks; + NCCLCHECK(ncclCommCount(comm, &nRanks)); + size_t rankOffset = count * wordSize(type); + NCCLCHECK(ncclGroupStart()); + for (int r=0; r= 2 && NCCL_MINOR >= 2 - NCCLCHECK(ncclBroadcast(sendbuff, recvbuff, count, type, root, comm, stream)); + NCCLCHECK(ncclBroadcast(sptr, rptr, count, type, root, comm, stream)); #else - if (rank == root) { - NCCLCHECK(ncclBcast(sendbuff, count, type, root, comm, stream)); - } else { - NCCLCHECK(ncclBcast(recvbuff, count, type, root, comm, stream)); - } + if (rank == root) { + NCCLCHECK(ncclBcast(sptr, count, type, root, comm, stream)); + } else { + NCCLCHECK(ncclBcast(rptr, count, type, root, comm, stream)); + } #endif + } else { + return testNotImplemented; + } return testSuccess; } diff --git a/src/common.cu b/src/common.cu index 42bc3eb..f37a3d8 100644 --- a/src/common.cu +++ b/src/common.cu @@ -17,6 +17,11 @@ #include "../verifiable/verifiable.h" +#pragma weak ncclCommWindowRegister +#pragma weak 
ncclCommWindowDeregister +#pragma weak ncclDevCommCreate +#pragma weak ncclDevCommDestroy + #define DIVUP(x, y) \ (((x)+(y)-1)/(y)) @@ -91,6 +96,11 @@ static int streamnull = 0; static int timeout = 0; static int cudaGraphLaunches = 0; static int report_cputime = 0; +static int deviceImpl = 0; + +int deviceCtaCount = 16; // Default number of CTAs for device implementation +bool deviceMultimemEnabled = false; // Track whether multimem was successfully enabled + // Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX) static int average = 1; #if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0) @@ -98,6 +108,9 @@ static int average = 1; #define SYMMETRIC_REGISTER 2 static int local_register = 0; #endif +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0) +static int ctaPolicy = -1; +#endif static int minCudaArch = 1<<30; #define NUM_BLOCKS 32 @@ -401,10 +414,22 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t } #endif - TESTCHECK(args->collTest->runColl( - (void*)(in_place ? recvBuff + args->sendInplaceOffset*rank : sendBuff), - (void*)(in_place ? recvBuff + args->recvInplaceOffset*rank : recvBuff), - count, type, op, root, args->comms[i], args->streams[i])); + if (deviceImpl == 0) { + TESTCHECK(args->collTest->runColl( + (void*)(in_place ? recvBuff : sendBuff), in_place ? args->sendInplaceOffset*rank : 0, + (void*)recvBuff, in_place ? args->recvInplaceOffset*rank : 0, + count, type, op, root, args->comms[i], args->streams[i], 0)); + } else { +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) + void* sendwin = args->sendRegHandles[i]; + void* recvwin = args->recvRegHandles[i]; + CUDACHECK(cudaSetDevice(args->gpus[i])); + TESTCHECK(args->collTest->runColl( + (void*)(in_place ? recvwin : sendwin), shift + in_place ? args->sendInplaceOffset*rank : 0, + (void*)recvwin, shift + in_place ? 
args->recvInplaceOffset*rank : 0, + count, type, op, root, (ncclComm_t)(args->devComms+i), args->streams[i], deviceImpl)); +#endif + } #if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) if(opIndex >= ncclNumOps) { @@ -650,49 +675,95 @@ testResult_t threadInit(struct threadArgs* args) { //set main thread again is_main_thread = (is_main_proc && args->thread == 0) ? 1 : 0; +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,14,0) + ncclConfig_t config = NCCL_CONFIG_INITIALIZER; +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0) + if (ctaPolicy >= 0) + config.CTAPolicy = ctaPolicy; +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) + config.nvlinkCentricSched = 1; +#endif +#endif +#endif + NCCLCHECK(ncclGroupStart()); for (int i=0; inGpus; i++) { int rank = args->proc*args->nThreads*args->nGpus + args->thread*args->nGpus + i; CUDACHECK(cudaSetDevice(args->gpus[i])); +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,14,0) + NCCLCHECK(ncclCommInitRankConfig(args->comms+i, nranks, args->ncclId, rank, &config)); +#else NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank)); +#endif } NCCLCHECK(ncclGroupEnd()); #if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0) NCCLCHECK(ncclGroupStart()); - void **sendRegHandles = (local_register) ? (void **)malloc(sizeof(*sendRegHandles)*args->nGpus) : NULL; - void **recvRegHandles = (local_register) ? 
(void **)malloc(sizeof(*recvRegHandles)*args->nGpus) : NULL; for (int i=0; inGpus; i++) { #if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0) if (test_ncclVersion >= NCCL_VERSION(2,27,0) && (local_register == SYMMETRIC_REGISTER)) { - NCCLCHECK(ncclCommWindowRegister(args->comms[i], args->sendbuffs[i], args->maxbytes, (ncclWindow_t*)&sendRegHandles[i], NCCL_WIN_COLL_SYMMETRIC)); - NCCLCHECK(ncclCommWindowRegister(args->comms[i], args->recvbuffs[i], args->maxbytes, (ncclWindow_t*)&recvRegHandles[i], NCCL_WIN_COLL_SYMMETRIC)); + NCCLCHECK(ncclCommWindowRegister(args->comms[i], args->sendbuffs[i], args->maxbytes, (ncclWindow_t*)&args->sendRegHandles[i], NCCL_WIN_COLL_SYMMETRIC)); + NCCLCHECK(ncclCommWindowRegister(args->comms[i], args->recvbuffs[i], args->maxbytes, (ncclWindow_t*)&args->recvRegHandles[i], NCCL_WIN_COLL_SYMMETRIC)); } else #endif { - if (local_register) NCCLCHECK(ncclCommRegister(args->comms[i], args->sendbuffs[i], args->maxbytes, &sendRegHandles[i])); - if (local_register) NCCLCHECK(ncclCommRegister(args->comms[i], args->recvbuffs[i], args->maxbytes, &recvRegHandles[i])); + if (local_register) NCCLCHECK(ncclCommRegister(args->comms[i], args->sendbuffs[i], args->maxbytes, &args->sendRegHandles[i])); + if (local_register) NCCLCHECK(ncclCommRegister(args->comms[i], args->recvbuffs[i], args->maxbytes, &args->recvRegHandles[i])); } } NCCLCHECK(ncclGroupEnd()); #endif +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) + /* Create device communicators with multimem fallback */ + if (deviceImpl) { + // Duplicate comms so our checks here do not affect the originals + ncclComm_t tmpComms[args->nGpus]; + memset(tmpComms, 0, sizeof(tmpComms)); + NCCLCHECK(ncclGroupStart()); + for (int i = 0; i < args->nGpus; i++) { + int rank; + NCCLCHECK(ncclCommUserRank(args->comms[i], &rank)); + NCCLCHECK(ncclCommSplit(args->comms[i], 0, rank, &tmpComms[i], NULL)); + } + NCCLCHECK(ncclGroupEnd()); + + // Check multimem support on the duplicated comms + bool checkMultimemFailed = false; + 
ncclResult_t result; + ncclDevComm tmpDevComms[args->nGpus]; + memset(tmpDevComms, 0, sizeof(tmpDevComms)); + NCCLCHECK(ncclGroupStart()); + for (int i = 0; i < args->nGpus; i++) { + ncclDevCommRequirements reqs; + memset(&reqs, 0, sizeof(reqs)); + reqs.lsaBarrierCount = deviceCtaCount; + reqs.lsaMultimem = true; + result = ncclDevCommCreate(tmpComms[i], &reqs, &tmpDevComms[i]); + if (result != ncclInProgress && result != ncclSuccess) { + checkMultimemFailed = true; + } + } + result = ncclGroupEnd(); + if (result != ncclSuccess) checkMultimemFailed = true; + deviceMultimemEnabled = !checkMultimemFailed; + + // Create final dev comms with correct multimem setting and cleanup temps + NCCLCHECK(ncclGroupStart()); + for (int i = 0; i < args->nGpus; i++) { + ncclDevCommRequirements reqs; + memset(&reqs, 0, sizeof(reqs)); + reqs.lsaBarrierCount = deviceCtaCount; + reqs.lsaMultimem = deviceMultimemEnabled; + NCCLCHECK(ncclDevCommCreate(args->comms[i], &reqs, args->devComms+i)); + NCCLCHECK(ncclDevCommDestroy(tmpComms[i], &tmpDevComms[i])); + NCCLCHECK(ncclCommDestroy(tmpComms[i])); + } + NCCLCHECK(ncclGroupEnd()); + } +#endif TESTCHECK(threadRunTests(args)); - for (int i=0; inGpus; i++) { -#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0) -#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0) - if (test_ncclVersion >= NCCL_VERSION(2,27,0) && (local_register == SYMMETRIC_REGISTER)) { - NCCLCHECK(ncclCommWindowDeregister(args->comms[i], (ncclWindow_t)sendRegHandles[i])); - NCCLCHECK(ncclCommWindowDeregister(args->comms[i], (ncclWindow_t)recvRegHandles[i])); - } else -#endif - { - if (local_register) NCCLCHECK(ncclCommDeregister(args->comms[i], sendRegHandles[i])); - if (local_register) NCCLCHECK(ncclCommDeregister(args->comms[i], recvRegHandles[i])); - } -#endif - NCCLCHECK(ncclCommDestroy(args->comms[i])); - } return testSuccess; } @@ -778,13 +849,16 @@ int main(int argc, char* argv[]) { {"report_cputime", required_argument, 0, 'C'}, {"average", required_argument, 0, 'a'}, 
{"local_register", required_argument, 0, 'R'}, + {"cta_policy", required_argument, 0, 'x'}, + {"device_implementation", required_argument, 0, 'D'}, + {"device_cta_count", required_argument, 0, 'V'}, {"help", no_argument, 0, 'h'}, {} }; while(1) { int c; - c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:N:p:c:o:d:r:z:y:T:hG:C:a:R:", longopts, &longindex); + c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:N:p:c:o:d:r:z:y:T:hG:C:a:R:x:D:V:", longopts, &longindex); if (c == -1) break; @@ -887,6 +961,40 @@ int main(int argc, char* argv[]) { printf("Option -R (register) is not supported before NCCL 2.19. Ignoring\n"); #endif break; + case 'x': + if (test_ncclVersion >= NCCL_VERSION(2,27,0)) { + ctaPolicy = (int)strtol(optarg, NULL, 0); + if (ctaPolicy > 1 && test_ncclVersion < NCCL_VERSION(2,28,0)) { + printf("Option -x (cta_policy) %d is not supported before NCCL 2.28. Ignoring\n", ctaPolicy); + ctaPolicy = -1; + } + } + else + printf("Option -x (cta_policy) is not supported before NCCL 2.27. Ignoring\n"); + break; + case 'D': + if (test_ncclVersion >= NCCL_VERSION(2,28,0)) { + deviceImpl = (int)strtol(optarg, NULL, 0); + } + else { + fprintf(stderr, "Option -D (device implementation) requires NCCL >= 2.28.0\n"); + return -1; + } + break; + case 'V': + if (test_ncclVersion >= NCCL_VERSION(2,28,0)) { + deviceCtaCount = (int)strtol(optarg, NULL, 0); + if (deviceCtaCount <= 0 || deviceCtaCount > 128) { + fprintf(stderr, "device_cta_count (-V) must be positive and less than 128, got %d. 
" + "Using default value 16.\n", deviceCtaCount); + deviceCtaCount = 16; + } + } + else { + fprintf(stderr, "Option -V (device CTA count) requires NCCL >= 2.28.0\n"); + return -1; + } + break; case 'h': default: if (c != 'h') printf("invalid option '%c'\n", c); @@ -919,6 +1027,9 @@ int main(int argc, char* argv[]) { "[-C,--report_cputime <0/1>] \n\t" "[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t" "[-R,--local_register <0/1/2> enable local (1) or symmetric (2) buffer registration on send/recv buffers (default: disable (0))] \n\t" + "[-x,--cta_policy <0/1/2> set CTA policy (NCCL_CTA_POLICY_DEFAULT (0), NCCL_CTA_POLICY_EFFICIENCY (1), NCCL_CTA_POLICY_ZERO (2)) (default: do not set)] \n\t" + "[-D,--device_implementation enable device implementation (default: 0, use NCCL implementation; requires -R 2 if > 0)] \n\t" + "[-V,--device_cta_count set number of CTAs for device implementation (default: 16)] \n\t" "[-h,--help]\n", basename(argv[0])); return 0; @@ -930,6 +1041,11 @@ int main(int argc, char* argv[]) { (unsigned long long)maxBytes); return -1; } + if (deviceImpl > 0 && (local_register != SYMMETRIC_REGISTER)) { + fprintf(stderr, "device implementation (-D > 0) requires enabling symmetric memory registration (-R 2)\n"); + return -1; + } + #ifdef MPI_SUPPORT MPI_Init(&argc, &argv); #endif @@ -1115,24 +1231,37 @@ testResult_t run() { //if parallel init is not selected, use main thread to initialize NCCL ncclComm_t* comms = (ncclComm_t*)malloc(sizeof(ncclComm_t)*nThreads*nGpus); #if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0) - void **sendRegHandles = NULL; - void **recvRegHandles = NULL; + void* sendRegHandles[nThreads*nGpus]; + void* recvRegHandles[nThreads*nGpus]; + memset(sendRegHandles, 0, sizeof(sendRegHandles)); + memset(recvRegHandles, 0, sizeof(recvRegHandles)); +#endif +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) + ncclDevComm devComms[nThreads*nGpus]; #endif if (!parallel_init) { - if (ncclProcs == 1) { - 
NCCLCHECK(ncclCommInitAll(comms, nGpus*nThreads, gpus)); - } else { - NCCLCHECK(ncclGroupStart()); - for (int i=0; i= NCCL_VERSION(2,14,0) + ncclConfig_t config = NCCL_CONFIG_INITIALIZER; +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0) + if (ctaPolicy >= 0) + config.CTAPolicy = ctaPolicy; +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) + config.nvlinkCentricSched = 1; +#endif +#endif +#endif + NCCLCHECK(ncclGroupStart()); + for (int i=0; i= NCCL_VERSION(2,14,0) + NCCLCHECK(ncclCommInitRankConfig(comms+i, ncclProcs*nThreads*nGpus, ncclId, ncclProc*nThreads*nGpus+i, &config)); +#else + NCCLCHECK(ncclCommInitRank(comms+i, ncclProcs*nThreads*nGpus, ncclId, ncclProc*nThreads*nGpus+i)); +#endif } + NCCLCHECK(ncclGroupEnd()); #if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0) NCCLCHECK(ncclGroupStart()); - sendRegHandles = (local_register) ? (void **)malloc(sizeof(*sendRegHandles)*nThreads*nGpus) : NULL; - recvRegHandles = (local_register) ? (void **)malloc(sizeof(*recvRegHandles)*nThreads*nGpus) : NULL; for (int i=0; i= NCCL_VERSION(2,27,0) if (test_ncclVersion >= NCCL_VERSION(2,27,0) && (local_register == SYMMETRIC_REGISTER)) { @@ -1146,13 +1275,59 @@ testResult_t run() { } } NCCLCHECK(ncclGroupEnd()); +#endif +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) + /* Create device communicators with multimem fallback */ + if (deviceImpl) { + // Duplicate comms so our checks here do not affect the originals + ncclComm_t tmpComms[nGpus * nThreads]; + memset(tmpComms, 0, sizeof(tmpComms)); + NCCLCHECK(ncclGroupStart()); + for (int i = 0; i < nGpus * nThreads; i++) { + int rank; + NCCLCHECK(ncclCommUserRank(comms[i], &rank)); + NCCLCHECK(ncclCommSplit(comms[i], 0, rank, &tmpComms[i], NULL)); + } + NCCLCHECK(ncclGroupEnd()); + + // Check multimem support on the duplicated comms + bool checkMultimemFailed = false; + ncclResult_t result; + ncclDevComm tmpDevComms[nGpus * nThreads]; + memset(tmpDevComms, 0, sizeof(tmpDevComms)); + NCCLCHECK(ncclGroupStart()); + for (int i = 0; i < nGpus * 
nThreads; i++) { + ncclDevCommRequirements reqs; + memset(&reqs, 0, sizeof(reqs)); + reqs.lsaBarrierCount = deviceCtaCount; + reqs.lsaMultimem = true; + result = ncclDevCommCreate(tmpComms[i], &reqs, &tmpDevComms[i]); + if (result != ncclInProgress && result != ncclSuccess) { + checkMultimemFailed = true; + } + } + result = ncclGroupEnd(); + if (result != ncclSuccess) checkMultimemFailed = true; + deviceMultimemEnabled = !checkMultimemFailed; + + // Create final dev comms with correct multimem setting and cleanup temps + NCCLCHECK(ncclGroupStart()); + for (int i = 0; i < nGpus * nThreads; i++) { + ncclDevCommRequirements reqs; + memset(&reqs, 0, sizeof(reqs)); + reqs.lsaBarrierCount = deviceCtaCount; + reqs.lsaMultimem = deviceMultimemEnabled; + NCCLCHECK(ncclDevCommCreate(comms[i], &reqs, devComms+i)); + NCCLCHECK(ncclDevCommDestroy(tmpComms[i], &tmpDevComms[i])); + NCCLCHECK(ncclCommDestroy(tmpComms[i])); + } + NCCLCHECK(ncclGroupEnd()); + } #endif } int errors[nThreads]; double bw[nThreads]; - double* delta; - CUDACHECK(cudaHostAlloc(&delta, sizeof(double)*nThreads*NUM_BLOCKS, cudaHostAllocPortable | cudaHostAllocMapped)); int bw_count[nThreads]; for (int t=0; t= NCCL_VERSION(2,28,0) + threads[t].args.devComms = devComms+t*nGpus; +#endif +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0) + threads[t].args.sendRegHandles = sendRegHandles+t*nGpus; + threads[t].args.recvRegHandles = recvRegHandles+t*nGpus; +#endif threads[t].args.ncclId = ncclId; threads[t].args.comms=comms+t*nGpus; threads[t].args.streams=streams+t*nGpus; @@ -1252,11 +1434,6 @@ testResult_t run() { if (datacheck) CUDACHECK(cudaFree(expected[i])); #endif } - CUDACHECK(cudaFreeHost(delta)); -#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0) - free(sendRegHandles); - free(recvRegHandles); -#endif envstr = getenv("NCCL_TESTS_MIN_BW"); double check_avg_bw = envstr ? 
atof(envstr) : -1; diff --git a/src/common.h b/src/common.h index ff834f6..6ccee42 100644 --- a/src/common.h +++ b/src/common.h @@ -7,6 +7,9 @@ #define __COMMON_H__ #include "nccl.h" +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) +#include "nccl_device.h" +#endif #include #include #include @@ -66,7 +69,8 @@ typedef enum { testCudaError = 2, testNcclError = 3, testTimeout = 4, - testNumResults = 5 + testNotImplemented = 5, + testNumResults = 6 } testResult_t; // Relay errors up and trace @@ -91,8 +95,8 @@ struct testColl { testResult_t (*initData)(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place); void (*getBw)(size_t count, int typesize, double sec, double* algBw, double* busBw, int nranks); - testResult_t (*runColl)(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, - ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream); + testResult_t (*runColl)(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, + size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int implIndex); }; extern struct testColl allReduceTest; extern struct testColl allGatherTest; @@ -131,6 +135,9 @@ struct threadArgs { size_t recvInplaceOffset; ncclUniqueId ncclId; ncclComm_t* comms; +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) + ncclDevComm* devComms; +#endif cudaStream_t* streams; void** expected; @@ -142,6 +149,11 @@ struct threadArgs { int reportErrors; struct testColl* collTest; + +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0) + void** sendRegHandles; + void** recvRegHandles; +#endif }; typedef testResult_t (*threadFunc_t)(struct threadArgs* args); @@ -263,6 +275,8 @@ static size_t wordSize(ncclDataType_t type) { } extern int test_ncclVersion; // init'd with ncclGetVersion() +extern int deviceCtaCount; // number of CTAs for device implementation +extern bool deviceMultimemEnabled; // whether multimem was successfully enabled constexpr int test_opNumMax = 
(int)ncclNumOps + (NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0) ? 1 : 0); extern int test_opnum; extern int test_typenum; @@ -301,4 +315,50 @@ extern int is_main_proc; extern thread_local int is_main_thread; #define PRINT if (is_main_thread) printf +#if NCCL_VERSION_CODE >= NCCL_VERSION(2,28,0) +template +testResult_t testLaunchDeviceKernel(F kernel, void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int useMultimem) { + if (kernel == nullptr) return testNotImplemented; + ncclDevComm* devComm = (ncclDevComm*)comm; + + // Check if multimem is enabled for this kernel + if (useMultimem && !deviceMultimemEnabled) { + printf("[KERNEL_LAUNCH_ERROR] Device kernel requires multimem but it was not available during " + "DevComm creation. Multimem support may not be available on this hardware.\n"); + return testInternalError; + } + + // Only check mcBasePtr if multimem is active for this kernel + if (useMultimem && devComm->lsaMultimem.mcBasePtr == nullptr) { + printf("[KERNEL_LAUNCH_ERROR] Device kernel requires multimem, which may not be available.\n"); + return testInternalError; + } + + ncclWindow_t sendwin = (ncclWindow_t)sendbuff; + ncclWindow_t recvwin = (ncclWindow_t)recvbuff; + kernel<<>>(sendwin, sendoffset, recvwin, recvoffset, count, root, *devComm); + return testSuccess; +} + +#define SPECIALIZE_KERNEL(kernel, type, op) \ + ( op != ncclSum ? nullptr : \ + type == ncclInt8 ? kernel : \ + type == ncclUint8 ? kernel : \ + type == ncclInt32 ? kernel : \ + type == ncclUint32 ? kernel : \ + type == ncclInt64 ? kernel : \ + type == ncclUint64 ? kernel : \ + type == ncclFloat16 ? kernel : \ + type == ncclFloat32 ? kernel : \ + type == ncclFloat64 ? 
kernel : \ + nullptr \ + ) +#else +template +testResult_t testLaunchDeviceKernel(F kernel, void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int useMultimem) { + return testNotImplemented; +} +#define SPECIALIZE_KERNEL(kernel, type, op) nullptr +#endif + #endif diff --git a/src/common.mk b/src/common.mk index fbbdbb6..0bd630f 100644 --- a/src/common.mk +++ b/src/common.mk @@ -17,6 +17,13 @@ CUDA_VERSION = $(strip $(shell which $(NVCC) >/dev/null && $(NVCC) --version | g CUDA_MAJOR = $(shell echo $(CUDA_VERSION) | cut -d "." -f 1) CUDA_MINOR = $(shell echo $(CUDA_VERSION) | cut -d "." -f 2) +# CUDA 13.0 requires c++17 +ifeq ($(shell test "0$(CUDA_MAJOR)" -ge 13; echo $$?),0) + CXXSTD ?= -std=c++17 +else + CXXSTD ?= -std=c++14 +endif + # Better define NVCC_GENCODE in your environment to the minimal set # of archs to reduce compile time. ifeq ($(shell test "0$(CUDA_MAJOR)" -ge 13; echo $$?),0) @@ -59,8 +66,8 @@ NVCC_GENCODE ?= -gencode=arch=compute_35,code=sm_35 \ -gencode=arch=compute_70,code=compute_70 endif -NVCUFLAGS := -ccbin $(CXX) $(NVCC_GENCODE) -std=c++14 -CXXFLAGS := -std=c++14 +NVCUFLAGS := -ccbin $(CXX) $(NVCC_GENCODE) $(CXXSTD) +CXXFLAGS := $(CXXSTD) LDFLAGS := -L${CUDA_LIB} -lcudart -lrt NVLDFLAGS := -L${CUDA_LIB} -l${CUDARTLIB} -lrt diff --git a/src/gather.cu b/src/gather.cu index a4a7a30..a4c2ded 100644 --- a/src/gather.cu +++ b/src/gather.cu @@ -43,23 +43,35 @@ void GatherGetBw(size_t count, int typesize, double sec, double* algBw, double* *busBw = baseBw * factor; } -testResult_t GatherRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { - int nRanks; - NCCLCHECK(ncclCommCount(comm, &nRanks)); - int rank; - NCCLCHECK(ncclCommUserRank(comm, &rank)); - size_t rankOffset = count * wordSize(type); - if (count == 0) return testSuccess; +testResult_t 
GatherRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) { + if (deviceImpl == 0) { + int nRanks; + NCCLCHECK(ncclCommCount(comm, &nRanks)); + int rank; + NCCLCHECK(ncclCommUserRank(comm, &rank)); + size_t rankOffset = count * wordSize(type); + if (count == 0) return testSuccess; - NCCLCHECK(ncclGroupStart()); - NCCLCHECK(ncclSend(sendbuff, count, type, root, comm, stream)); - if (rank == root) { - for (int r=0; r= NCCL_VERSION(2,28,0) + NCCLCHECK(ncclGather(sptr, rptr, count, type, root, comm, stream)); +#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,7,0) + NCCLCHECK(ncclGroupStart()); + NCCLCHECK(ncclSend(sptr, count, type, root, comm, stream)); + if (rank == root) { + for (int r=0; r +#include + +// Multimem operations. Since Multimem is currently only available in PTX here are C++ wrappers around it. +// First template argument is data type, second template type is vectorized data type. 
+// In the future, the second template type also dictates reduction accuracy + +template +__device__ __forceinline__ valT multimemLoadSum(const ptrT* addr) { + assert(false); + // static_assert(std::is_same::value, "multimemLoadSum can only be instantiated with implemented types"); + // static_assert(std::is_same::value, "multimemLoadSum can only be instantiated with implemented types"); + return valT{0}; +} + +#if __CUDA_ARCH__ >= 900 // Hopper and later +template<> +__device__ __forceinline__ double multimemLoadSum(const double* addr) { + const uintptr_t multimem_addr = reinterpret_cast(addr); + double result; + asm volatile("multimem.ld_reduce.global.add.f64 %0, [%1];" : "=d"(result) : "l"(multimem_addr) : "memory"); + return result; +} +#endif + +#if __CUDA_ARCH__ >= 900 // Hopper and later +template<> +__device__ __forceinline__ float multimemLoadSum(const float* addr) { + const uintptr_t multimem_addr = reinterpret_cast(addr); + float result; + asm volatile("multimem.ld_reduce.global.add.f32 %0, [%1];" : "=f"(result) : "l"(multimem_addr) : "memory"); + return result; +} +#endif + +#if __CUDA_ARCH__ >= 900 // Hopper and later +template<> +__device__ __forceinline__ float2 multimemLoadSum(const float* addr) { + const uintptr_t multimem_addr = reinterpret_cast(addr); + float2 result; + asm volatile("multimem.ld_reduce.global.add.v2.f32 {%0, %1}, [%2];" : "=f"(result.x), "=f"(result.y) : "l"(multimem_addr) : "memory"); + return result; +} +#endif + +#if __CUDA_ARCH__ >= 900 // Hopper and later +template<> +__device__ __forceinline__ float4 multimemLoadSum(const float* addr) { + const uintptr_t multimem_addr = reinterpret_cast(addr); + float4 result; + asm volatile("multimem.ld_reduce.global.add.v4.f32 {%0, %1, %2, %3}, [%4];" : "=f"(result.x), "=f"(result.y), "=f"(result.z), "=f"(result.w) : "l"(multimem_addr) : "memory"); + return result; +} +#endif + +template +__device__ __forceinline__ void multimemStore(ptrT* addr, const valT val) { + assert(false); + // 
static_assert(std::is_same::value, "multimemStore can only be instantiated with implemented types"); + // static_assert(std::is_same::value, "multimemStore can only be instantiated with implemented types"); +} + +#if __CUDA_ARCH__ >= 900 // Hopper and later +template<> +__device__ __forceinline__ void multimemStore(double* addr, const double val) { + const uintptr_t multimem_addr = reinterpret_cast(addr); + asm volatile("multimem.st.global.f64 [%0], %1;" : : "l"(multimem_addr), "d"(val) : "memory"); +} +#endif + +#if __CUDA_ARCH__ >= 900 // Hopper and later +template<> +__device__ __forceinline__ void multimemStore(float* addr, const float val) { + const uintptr_t multimem_addr = reinterpret_cast(addr); + asm volatile("multimem.st.global.f32 [%0], %1;" : : "l"(multimem_addr), "f"(val) : "memory"); +} +#endif + +#if __CUDA_ARCH__ >= 900 // Hopper and later +template<> +__device__ __forceinline__ void multimemStore(float* addr, const float2 val) { + const uintptr_t multimem_addr = reinterpret_cast(addr); + asm volatile("multimem.st.global.v2.f32 [%0], {%1, %2};" : : "l"(multimem_addr), "f"(val.x), "f"(val.y) : "memory"); +} +#endif + +#if __CUDA_ARCH__ >= 900 // Hopper and later +template<> +__device__ __forceinline__ void multimemStore(float* addr, const float4 val) { + const uintptr_t multimem_addr = reinterpret_cast(addr); + asm volatile("multimem.st.global.v4.f32 [%0], {%1, %2, %3, %4};" : : "l"(multimem_addr), "f"(val.x), "f"(val.y), "f"(val.z), "f"(val.w) : "memory"); +} +#endif + + +#endif // _MULTIMEM_OPS_H_ diff --git a/src/reduce.cu b/src/reduce.cu index 731abfa..a171c7c 100644 --- a/src/reduce.cu +++ b/src/reduce.cu @@ -39,8 +39,14 @@ void ReduceGetBw(size_t count, int typesize, double sec, double* algBw, double* *busBw = baseBw; } -testResult_t ReduceRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { - NCCLCHECK(ncclReduce(sendbuff, recvbuff, count, type, op, root, 
comm, stream)); +testResult_t ReduceRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) { + if (deviceImpl == 0) { + char* sptr = (char*)sendbuff + sendoffset; + char* rptr = (char*)recvbuff + recvoffset; + NCCLCHECK(ncclReduce(sptr, rptr, count, type, op, root, comm, stream)); + } else { + return testNotImplemented; + } return testSuccess; } diff --git a/src/reduce_scatter.cu b/src/reduce_scatter.cu index 35cfdd4..f0669e1 100644 --- a/src/reduce_scatter.cu +++ b/src/reduce_scatter.cu @@ -42,8 +42,14 @@ void ReduceScatterGetBw(size_t count, int typesize, double sec, double* algBw, d *busBw = baseBw * factor; } -testResult_t ReduceScatterRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { - NCCLCHECK(ncclReduceScatter(sendbuff, recvbuff, count, type, op, comm, stream)); +testResult_t ReduceScatterRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) { + if (deviceImpl == 0) { + char* sptr = (char*)sendbuff + sendoffset; + char* rptr = (char*)recvbuff + recvoffset; + NCCLCHECK(ncclReduceScatter(sptr, rptr, count, type, op, comm, stream)); + } else { + return testNotImplemented; + } return testSuccess; } diff --git a/src/scatter.cu b/src/scatter.cu index d1eec71..1b23cae 100644 --- a/src/scatter.cu +++ b/src/scatter.cu @@ -39,23 +39,35 @@ void ScatterGetBw(size_t count, int typesize, double sec, double* algBw, double* *busBw = baseBw * factor; } -testResult_t ScatterRunColl(void* sendbuff, void* recvbuff, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream) { - int nRanks; - NCCLCHECK(ncclCommCount(comm, &nRanks)); - int rank; - NCCLCHECK(ncclCommUserRank(comm, 
&rank)); - size_t rankOffset = count * wordSize(type); - if (count == 0) return testSuccess; +testResult_t ScatterRunColl(void* sendbuff, size_t sendoffset, void* recvbuff, size_t recvoffset, size_t count, ncclDataType_t type, ncclRedOp_t op, int root, ncclComm_t comm, cudaStream_t stream, int deviceImpl) { + if (deviceImpl == 0) { + int nRanks; + NCCLCHECK(ncclCommCount(comm, &nRanks)); + int rank; + NCCLCHECK(ncclCommUserRank(comm, &rank)); + size_t rankOffset = count * wordSize(type); + if (count == 0) return testSuccess; - NCCLCHECK(ncclGroupStart()); - if (rank == root) { - for (int r=0; r= NCCL_VERSION(2,28,0) + NCCLCHECK(ncclScatter(sptr, rptr, count, type, root, comm, stream)); +#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,7,0) + NCCLCHECK(ncclGroupStart()); + if (rank == root) { + for (int r=0; r + +// Helper functions to use vectorized types + +// This maps at compile time each data type to its best available vectorized type. +// As close to 128 bits as possible +template +struct VectorTypeMapping{ + using Type=T; // Default no vectorization +}; + +template <> +struct VectorTypeMapping{ + using Type=float4; +}; + +template <> +struct VectorTypeMapping{ + using Type=double2; +}; + +template <> +struct VectorTypeMapping{ + using Type=char4; // Largest built-in CUDA type for char (32-bit) +}; + +template <> +struct VectorTypeMapping{ + using Type=uchar4; // Largest built-in CUDA type for uchar (32-bit) +}; + +template <> +struct VectorTypeMapping{ + using Type=int4; +}; + +template <> +struct VectorTypeMapping{ + using Type=uint4; +}; + + +// Vector addition helper functions +// They enable clean math with vector types. 
+// Element-wise addition helpers.
+//
+// vectorAdd(a, b) returns the sum of a and b. The primary template relies on
+// the operand type's own operator+; the explicit specializations below add
+// component by component for the vector types that VectorTypeMapping can
+// produce (float4, double2, char4, uchar4, int4, uint4).
+template <typename T>
+__device__ __forceinline__ T vectorAdd(T a, T b) {
+  return a + b;
+}
+
+// float4: four independent float additions.
+template <>
+__device__ __forceinline__ float4 vectorAdd(float4 a, float4 b) {
+  return make_float4(a.x + b.x, a.y + b.y, a.z + b.z, a.w + b.w);
+}
+
+// double2: two independent double additions.
+template <>
+__device__ __forceinline__ double2 vectorAdd(double2 a, double2 b) {
+  return make_double2(a.x + b.x, a.y + b.y);
+}
+
+// char4: each component sum is evaluated as int (integer promotion) and is
+// converted back to the 8-bit component type when passed to make_char4.
+template <>
+__device__ __forceinline__ char4 vectorAdd(char4 a, char4 b) {
+  return make_char4(a.x + b.x, a.y + b.y, a.z + b.z, a.w + b.w);
+}
+
+// uchar4: same as char4 but with unsigned 8-bit components, so the conversion
+// performed by make_uchar4 reduces each sum modulo 256.
+template <>
+__device__ __forceinline__ uchar4 vectorAdd(uchar4 a, uchar4 b) {
+  return make_uchar4(a.x + b.x, a.y + b.y, a.z + b.z, a.w + b.w);
+}
+
+// int4: four independent 32-bit signed additions.
+template <>
+__device__ __forceinline__ int4 vectorAdd(int4 a, int4 b) {
+  return make_int4(a.x + b.x, a.y + b.y, a.z + b.z, a.w + b.w);
+}
+
+// uint4: four independent 32-bit unsigned additions.
+template <>
+__device__ __forceinline__ uint4 vectorAdd(uint4 a, uint4 b) {
+  return make_uint4(a.x + b.x, a.y + b.y, a.z + b.z, a.w + b.w);
+}
+
+#endif // _VECTOR_TYPES_H_