Resync with NCCL 2.13

* Added "verifiable", a suite of kernels for generating and verifying reduction
  input and output arrays in a bit-precise way.
* Data corruption errors now reported in number of wrong elements instead of max
  deviation.
* Use ncclGetLastError.
* Don't run hypercube on non-powers of 2 ranks.
* Fix to hypercube data verification.
* Use "thread local" as the default CUDA capture mode.
* Replaced pthread_yield -> sched_yield()
* Bugfix to the cpu-side barrier/allreduce implementations.
This commit is contained in:
John Bachan 2022-08-19 15:15:10 -05:00
parent 8274cb47b6
commit 51af5572bf
18 changed files with 1706 additions and 515 deletions

View File

@ -83,12 +83,16 @@ build: ${BIN_FILES}
clean:
rm -rf ${DST_DIR}
${DST_DIR}/%.o: %.cu common.h
TEST_VERIFIABLE_SRCDIR := ../verifiable
TEST_VERIFIABLE_BUILDDIR := $(BUILDDIR)/verifiable
include ../verifiable/verifiable.mk
${DST_DIR}/%.o: %.cu common.h $(TEST_VERIFIABLE_HDRS)
@printf "Compiling %-35s > %s\n" $< $@
@mkdir -p ${DST_DIR}
$(NVCC) -o $@ $(NVCUFLAGS) -c $<
${DST_DIR}/%_perf:${DST_DIR}/%.o ${DST_DIR}/common.o
${DST_DIR}/%_perf:${DST_DIR}/%.o ${DST_DIR}/common.o $(TEST_VERIFIABLE_OBJS)
@printf "Linking %-35s > %s\n" $< $@
@mkdir -p ${DST_DIR}
$(NVCC) -o $@ $(NVCUFLAGS) $^ ${NVLDFLAGS}

View File

@ -7,18 +7,6 @@
#include "cuda_runtime.h"
#include "common.h"
void print_header() {
PRINT("# %10s %12s %8s out-of-place in-place \n", "", "", "");
PRINT("# %10s %12s %8s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}
void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s", size, count, typeName);
}
void AllGatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = count/nranks;
*recvcount = (count/nranks)*nranks;
@ -38,9 +26,9 @@ testResult_t AllGatherInitData(struct threadArgs* args, ncclDataType_t type, ncc
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes*j, sendcount, type, rep, j));
TESTCHECK(InitData((char*)args->expected[i] + args->sendBytes*j, sendcount, 0, type, ncclSum, 33*rep + j, 1, 0));
}
CUDACHECK(cudaDeviceSynchronize());
}

View File

@ -7,18 +7,6 @@
#include "cuda_runtime.h"
#include "common.h"
void print_header() {
PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", "");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "redop",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}
void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s %6s", size, count, typeName, opName);
}
void AllReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = count;
*recvcount = count;
@ -38,7 +26,7 @@ testResult_t AllReduceInitData(struct threadArgs* args, ncclDataType_t type, ncc
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks));
CUDACHECK(cudaDeviceSynchronize());
}

View File

@ -7,18 +7,6 @@
#include "cuda_runtime.h"
#include "common.h"
void print_header() {
PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", "");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "redop",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}
void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s %6s", size, count, typeName, opName);
}
void AlltoAllGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = (count/nranks)*nranks;
*recvcount = (count/nranks)*nranks;
@ -39,9 +27,10 @@ testResult_t AlltoAllInitData(struct threadArgs* args, ncclDataType_t type, nccl
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes/nranks*j, sendcount/nranks, type, rep+rank*sendcount/nranks, j));
size_t partcount = sendcount/nranks;
TESTCHECK(InitData((char*)args->expected[i] + j*partcount*wordSize(type), partcount, rank*partcount, type, ncclSum, 33*rep + j, 1, 0));
}
CUDACHECK(cudaDeviceSynchronize());
}

View File

@ -7,18 +7,6 @@
#include "cuda_runtime.h"
#include "common.h"
void print_header() {
PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", "");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "root",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}
void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s %6i", size, count, typeName, root);
}
void BroadcastGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = count;
*recvcount = count;
@ -37,8 +25,8 @@ testResult_t BroadcastInitData(struct threadArgs* args, ncclDataType_t type, ncc
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(args->expected[i], recvcount, type, rep, root));
if (rank == root) TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, rep, 1, 0));
TESTCHECK(InitData(args->expected[i], recvcount, 0, type, ncclSum, rep, 1, 0));
CUDACHECK(cudaDeviceSynchronize());
}
return testSuccess;

View File

@ -7,10 +7,13 @@
#include "common.h"
#include <pthread.h>
#include <cstdio>
#include <type_traits>
#include <getopt.h>
#include <libgen.h>
#include "cuda.h"
#include "../verifiable/verifiable.h"
int test_ncclVersion = 0; // init'd with ncclGetVersion()
#if NCCL_MAJOR >= 2
@ -107,362 +110,154 @@ static double parsesize(const char *value) {
return size * units;
}
double DeltaMaxValue(ncclDataType_t type) {
switch(type) {
case ncclHalf: return 1e-2;
#if defined(__CUDA_BF16_TYPES_EXIST__)
case ncclBfloat16: return 1e-2;
#endif
case ncclFloat: return 1e-5;
case ncclDouble: return 1e-12;
case ncclInt:
#if NCCL_MAJOR >= 2
case ncclUint8:
//case ncclInt32:
case ncclUint32:
#endif
case ncclInt64:
case ncclUint64: return 1e-200;
}
return 1e-200;
}
template<typename T> __device__
double absDiff(T a, T b) {
return fabs((double)(b - a));
}
template<> __device__
double absDiff<half>(half a, half b) {
float x = __half2float(a);
float y = __half2float(b);
return fabs((double)(y-x));
}
template<typename T> __device__
float toFloat(T a) {
return (float)a;
}
template<> __device__
float toFloat(half a) {
return __half2float(a);
}
#if defined(__CUDA_BF16_TYPES_EXIST__)
template<> __device__
float toFloat(__nv_bfloat16 a) {
return __bfloat162float(a);
}
#endif
template<typename T, int BSIZE> __global__
void deltaKern(void* A_, void* B_, size_t count, double* max) {
const T* A = (const T*)A_;
const T* B = (const T*)B_;
__shared__ double temp[BSIZE];
int tid = blockIdx.x*blockDim.x + threadIdx.x;
double locmax = 0.0;
for(size_t i=tid; i<count; i+=blockDim.x*gridDim.x) {
double delta = absDiff(A[i], B[i]);
if( delta > locmax ) {
locmax = delta;
#ifdef DEBUG_PRINT
if (delta > .1) printf("Error at %ld/%ld(%p) : %f != %f\n", i, count, B+i, toFloat(A[i]), toFloat(B[i]));
#endif
}
}
tid = threadIdx.x;
temp[tid] = locmax;
for(int stride = BSIZE/2; stride > 1; stride>>=1) {
__syncthreads();
if( tid < stride )
temp[tid] = temp[tid] > temp[tid+stride] ? temp[tid] : temp[tid+stride];
}
__syncthreads();
if( threadIdx.x == 0)
max[blockIdx.x] = temp[0] > temp[1] ? temp[0] : temp[1];
}
testResult_t CheckDelta(void* results, void* expected, size_t count, ncclDataType_t type, double* devmax) {
switch (type) {
#if defined(__CUDA_BF16_TYPES_EXIST__)
case ncclBfloat16:
deltaKern<__nv_bfloat16, 512><<<NUM_BLOCKS, 512>>>(results, expected, count, devmax); break;
#endif
case ncclHalf:
deltaKern<half, 512><<<NUM_BLOCKS, 512>>>(results, expected, count, devmax); break;
case ncclFloat:
deltaKern<float, 512><<<NUM_BLOCKS, 512>>>(results, expected, count, devmax); break;
case ncclDouble:
deltaKern<double, 512><<<NUM_BLOCKS, 512>>>(results, expected, count, devmax); break;
case ncclChar:
#if NCCL_MAJOR >= 2
case ncclUint8:
#endif
deltaKern<uint8_t, 512><<<NUM_BLOCKS, 512>>>(results, expected, count, devmax); break;
case ncclInt:
#if NCCL_MAJOR >= 2
case ncclUint32:
#endif
deltaKern<uint32_t, 512><<<NUM_BLOCKS, 512>>>(results, expected, count, devmax); break;
case ncclInt64:
case ncclUint64:
deltaKern<uint64_t, 512><<<NUM_BLOCKS, 512>>>(results, expected, count, devmax); break;
}
testResult_t CheckDelta(void* results, void* expected, size_t count, size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks, int64_t *wrongEltN) {
ncclVerifiableVerify(results, expected, count, (int)type, (int)op, nranks, seed, offset, wrongEltN, cudaStreamDefault);
CUDACHECK(cudaDeviceSynchronize());
for (int i=1; i<NUM_BLOCKS; i++) devmax[0] = std::max(devmax[0], devmax[i]);
return testSuccess;
}
// For integer values, we use values between 0 and 255
template<typename T>
__device__ T testValue(const size_t offset, const int rep, const int rank) {
uint8_t v = (rep+rank+offset) % 256;
return (T)v;
testResult_t InitDataReduce(void* data, const size_t count, const size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks) {
ncclVerifiablePrepareExpected(data, count, (int)type, (int)op, nranks, seed, offset, cudaStreamDefault);
return testSuccess;
}
// For floating point datatype, we use values between 0 and 1 otherwise the
// Product operation will produce NaNs.
template<>
__device__ double testValue<double>(const size_t offset, const int rep, const int rank) {
return 1.0/(1.0+(double)testValue<int>(offset, rep, rank));
}
template<>
__device__ float testValue<float>(const size_t offset, const int rep, const int rank) {
return 1.0/(1.0+(float)testValue<int>(offset, rep, rank));
}
template<>
__device__ half testValue<half>(const size_t offset, const int rep, const int rank) {
return __float2half(testValue<float>(offset, rep, rank));
}
#if defined(__CUDA_BF16_TYPES_EXIST__)
template<>
__device__ __nv_bfloat16 testValue<__nv_bfloat16>(const size_t offset, const int rep, const int rank) {
return __float2bfloat16(testValue<float>(offset, rep, rank));
}
#endif
// Operations
template<typename T>
__device__ T ncclOpSum(T a, T b) { return a+b; }
template<typename T>
__device__ T ncclOpProd(T a, T b) { return a*b; }
template<typename T>
__device__ T ncclOpMax(T a, T b) { return a>b ? a : b; }
template<typename T>
__device__ T ncclOpMin(T a, T b) { return a<b ? a : b; }
// Definitions for half
template<>
__device__ half ncclOpSum(half a, half b) { return __float2half(__half2float(a)+__half2float(b)); }
template<>
__device__ half ncclOpProd(half a, half b) { return __float2half(__half2float(a)*__half2float(b)); }
template<>
__device__ half ncclOpMax(half a, half b) { return __half2float(a)>__half2float(b) ? a : b; }
template<>
__device__ half ncclOpMin(half a, half b) { return __half2float(a)<__half2float(b) ? a : b; }
template<typename T>
__device__ T ncclPPOpIdent(T x, int arg) { return x; }
template<typename T>
__device__ T ncclPPOpMul(T x, int arg) { return x*T(arg); }
template<typename T>
__device__ T ncclPPOpDiv(T x, int arg) { return x/T(arg); }
template<>
__device__ half ncclPPOpMul(half x, int arg) {
return __float2half(__half2float(x)*float(arg));
}
template<>
__device__ half ncclPPOpDiv(half x, int n) {
return __float2half(__half2float(x)/n);
}
#if defined(__CUDA_BF16_TYPES_EXIST__)
template<>
__device__ __nv_bfloat16 ncclPPOpMul(__nv_bfloat16 x, int arg) {
return __float2bfloat16(__bfloat162float(x)*float(arg));
}
template<>
__device__ __nv_bfloat16 ncclPPOpDiv(__nv_bfloat16 x, int n) {
return __float2bfloat16(__bfloat162float(x)/n);
}
#endif
__host__ __device__ int preMulScalar(int rank) {
return 1 + rank%2;
testResult_t InitData(void* data, const size_t count, size_t offset, ncclDataType_t type, ncclRedOp_t op, uint64_t seed, int nranks, int rank) {
ncclVerifiablePrepareInput(data, count, (int)type, (int)op, nranks, rank, seed, offset, cudaStreamDefault);
return testSuccess;
}
template<typename T, T (*Op)(T, T), T(*PreOp)(T,int), T(*PostOp)(T,int)>
__global__ void InitDataReduceKernel(T* data, const size_t N, const size_t offset, const int rep, const int nranks) {
for (size_t o=blockIdx.x*blockDim.x+threadIdx.x; o<N; o+=gridDim.x*blockDim.x) {
T val = testValue<T>(o+offset, rep, 0);
val = PreOp(val, preMulScalar(0));
for (int i=1; i<nranks; i++) {
T val1 = testValue<T>(o+offset, rep, i);
val1 = PreOp(val1, preMulScalar(i));
val = Op(val, val1);
}
data[o] = PostOp(val, nranks);
void Barrier(struct threadArgs *args) {
thread_local int epoch = 0;
static pthread_mutex_t lock[2] = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER};
static pthread_cond_t cond[2] = {PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER};
static int counter[2] = {0, 0};
pthread_mutex_lock(&lock[epoch]);
if(++counter[epoch] == args->nThreads)
pthread_cond_broadcast(&cond[epoch]);
if(args->thread+1 == args->nThreads) {
while(counter[epoch] != args->nThreads)
pthread_cond_wait(&cond[epoch], &lock[epoch]);
#ifdef MPI_SUPPORT
MPI_Barrier(MPI_COMM_WORLD);
#endif
counter[epoch] = 0;
pthread_cond_broadcast(&cond[epoch]);
}
else {
while(counter[epoch] != 0)
pthread_cond_wait(&cond[epoch], &lock[epoch]);
}
pthread_mutex_unlock(&lock[epoch]);
epoch ^= 1;
}
#define KERN(type, op, preop, postop) (void*)InitDataReduceKernel<type, op<type>, preop<type>, postop<type> >
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,11,0)
#define OPS(type) \
KERN(type, ncclOpSum, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpProd, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpMax, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpMin, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpSum/*Avg*/, ncclPPOpIdent, ncclPPOpDiv), \
KERN(type, ncclOpSum/*PreMulSum*/, ncclPPOpMul, ncclPPOpIdent)
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0)
#define OPS(type) \
KERN(type, ncclOpSum, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpProd, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpMax, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpMin, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpSum/*Avg*/, ncclPPOpIdent, ncclPPOpDiv)
#else
#define OPS(type) \
KERN(type, ncclOpSum, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpProd, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpMax, ncclPPOpIdent, ncclPPOpIdent), \
KERN(type, ncclOpMin, ncclPPOpIdent, ncclPPOpIdent)
#endif
static void* const redInitDataKerns[test_opNumMax*ncclNumTypes] = {
OPS(int8_t), OPS(uint8_t), OPS(int32_t), OPS(uint32_t), OPS(int64_t), OPS(uint64_t), OPS(half), OPS(float), OPS(double),
#if defined(__CUDA_BF16_TYPES_EXIST__) && NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0)
OPS(__nv_bfloat16)
#endif
};
testResult_t InitDataReduce(void* data, const size_t count, const size_t offset, ncclDataType_t type, ncclRedOp_t op, const int rep, const int nranks) {
dim3 grid = { 32, 1, 1 };
dim3 block = { 256, 1, 1 };
void* args[5] = { (void*)&data, (void*)&count, (void*)&offset, (void*)&rep, (void*)&nranks };
CUDACHECK(cudaLaunchKernel(redInitDataKerns[type*test_opNumMax+op], grid, block, args, 0, cudaStreamDefault));
return testSuccess;
}
// Inter-thread/process barrier+allreduce. The quality of the return value
// for average=0 (which means broadcast from rank=0) is dubious. The returned
// value will actually be the result of process-local broadcast from the local thread=0.
template<typename T>
__global__ void InitDataKernel(T* data, const size_t N, const int rep, const int rank) {
for (size_t o=blockIdx.x*blockDim.x+threadIdx.x; o<N; o+=gridDim.x*blockDim.x)
data[o] = testValue<T>(o, rep, rank);
}
void Allreduce(struct threadArgs* args, T* value, int average) {
thread_local int epoch = 0;
static pthread_mutex_t lock[2] = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER};
static pthread_cond_t cond[2] = {PTHREAD_COND_INITIALIZER, PTHREAD_COND_INITIALIZER};
static T accumulator[2];
static int counter[2] = {0, 0};
static void* const initDataKerns[ncclNumTypes] = {
(void*)InitDataKernel< int8_t>,
(void*)InitDataKernel< uint8_t>,
(void*)InitDataKernel< int32_t>,
(void*)InitDataKernel<uint32_t>,
(void*)InitDataKernel< int64_t>,
(void*)InitDataKernel<uint64_t>,
(void*)InitDataKernel< half>,
(void*)InitDataKernel< float>,
(void*)InitDataKernel< double>,
#if defined(__CUDA_BF16_TYPES_EXIST__) && NCCL_VERSION_CODE >= NCCL_VERSION(2,10,0)
(void*)InitDataKernel<__nv_bfloat16>
#endif
};
template<typename T>
testResult_t InitDataType(void* dest, const size_t N, const int rep, const int rank) {
T* ptr = (T*)dest;
InitDataKernel<<<16, 512>>>(ptr, N, rep, rank);
return testSuccess;
}
testResult_t InitData(void* data, const size_t count, ncclDataType_t type, const int rep, const int rank) {
dim3 grid = { 32, 1, 1 };
dim3 block = { 256, 1, 1 };
void* args[4] = { (void*)&data, (void*)&count, (void*)&rep, (void*)&rank };
CUDACHECK(cudaLaunchKernel(initDataKerns[type], grid, block, args, 0, cudaStreamDefault));
return testSuccess;
}
void Barrier(struct threadArgs* args) {
while (args->barrier[args->barrier_idx] != args->thread) pthread_yield();
args->barrier[args->barrier_idx] = args->thread + 1;
if (args->thread+1 == args->nThreads) {
#ifdef MPI_SUPPORT
MPI_Barrier(MPI_COMM_WORLD);
#endif
args->barrier[args->barrier_idx] = 0;
pthread_mutex_lock(&lock[epoch]);
if(counter[epoch] == 0) {
if(average != 0 || args->thread == 0) accumulator[epoch] = *value;
} else {
while (args->barrier[args->barrier_idx]) pthread_yield();
}
args->barrier_idx=!args->barrier_idx;
}
// Inter-thread/process barrier+allreduce
void Allreduce(struct threadArgs* args, double* value, int average) {
while (args->barrier[args->barrier_idx] != args->thread) pthread_yield();
double val = *value;
if (args->thread > 0) {
double val2 = args->reduce[args->barrier_idx];
if (average == 1) val += val2;
if (average == 2) val = std::min(val, val2);
if (average == 3) val = std::max(val, val2);
}
if (average || args->thread == 0) args->reduce[args->barrier_idx] = val;
args->barrier[args->barrier_idx] = args->thread + 1;
if (args->thread+1 == args->nThreads) {
#ifdef MPI_SUPPORT
if (average != 0) {
MPI_Op op = average == 1 ? MPI_SUM : average == 2 ? MPI_MIN : MPI_MAX;
MPI_Allreduce(MPI_IN_PLACE, (void*)&args->reduce[args->barrier_idx], 1, MPI_DOUBLE, op, MPI_COMM_WORLD);
switch(average) {
case /*r0*/ 0: if(args->thread == 0) accumulator[epoch] = *value; break;
case /*avg*/1: accumulator[epoch] += *value; break;
case /*min*/2: accumulator[epoch] = std::min<T>(accumulator[epoch], *value); break;
case /*max*/3: accumulator[epoch] = std::max<T>(accumulator[epoch], *value); break;
case /*sum*/4: accumulator[epoch] += *value; break;
}
#endif
if (average == 1) args->reduce[args->barrier_idx] /= args->nProcs*args->nThreads;
args->reduce[1-args->barrier_idx] = 0;
args->barrier[args->barrier_idx] = 0;
} else {
while (args->barrier[args->barrier_idx]) pthread_yield();
}
*value = args->reduce[args->barrier_idx];
args->barrier_idx=!args->barrier_idx;
if(++counter[epoch] == args->nThreads)
pthread_cond_broadcast(&cond[epoch]);
if(args->thread+1 == args->nThreads) {
while(counter[epoch] != args->nThreads)
pthread_cond_wait(&cond[epoch], &lock[epoch]);
#ifdef MPI_SUPPORT
if(average != 0) {
static_assert(std::is_same<T, long long>::value || std::is_same<T, double>::value, "Allreduce<T> only for T in {long long, double}");
MPI_Datatype ty = std::is_same<T, long long>::value ? MPI_LONG_LONG :
std::is_same<T, double>::value ? MPI_DOUBLE :
MPI_Datatype();
MPI_Op op = average == 1 ? MPI_SUM :
average == 2 ? MPI_MIN :
average == 3 ? MPI_MAX :
average == 4 ? MPI_SUM : MPI_Op();
MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator[epoch], 1, ty, op, MPI_COMM_WORLD);
}
#endif
if(average == 1) accumulator[epoch] /= args->nProcs*args->nThreads;
counter[epoch] = 0;
pthread_cond_broadcast(&cond[epoch]);
}
else {
while(counter[epoch] != 0)
pthread_cond_wait(&cond[epoch], &lock[epoch]);
}
pthread_mutex_unlock(&lock[epoch]);
*value = accumulator[epoch];
epoch ^= 1;
}
testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, double *delta) {
testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int in_place, int64_t *wrongElts) {
int nranks = args->nProcs*args->nGpus*args->nThreads;
size_t count = args->expectedBytes/wordSize(type);
double maxDelta = 0.0;
int64_t *wrongPerGpu = nullptr;
CUDACHECK(cudaHostAlloc((void**)&wrongPerGpu, args->nGpus*sizeof(int64_t), cudaHostAllocMapped));
for (int i=0; i<args->nGpus; i++) {
int device;
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
NCCLCHECK(ncclCommCuDevice(args->comms[i], &device));
CUDACHECK(cudaSetDevice(device));
void *data = in_place ? ((void *)((uintptr_t)args->recvbuffs[i] + args->recvInplaceOffset*rank)) : args->recvbuffs[i];
TESTCHECK(CheckDelta(data , args->expected[i], count, type, args->deltaHost));
maxDelta = std::max(*(args->deltaHost), maxDelta);
#ifdef DEBUG_PRINT
if (rank == 0) {
int *expectedHost = (int *)malloc(args->expectedBytes);
int *dataHost = (int *)malloc(args->expectedBytes);
TESTCHECK(CheckDelta(data, args->expected[i], count, 0, type, op, 0, nranks, wrongPerGpu+i));
cudaMemcpy(expectedHost, args->expected[0], args->expectedBytes, cudaMemcpyDeviceToHost);
printf("\n Expected: ");
for(int j=0; j<args->expectedBytes/sizeof(int); j++) {
printf("%d:%d ", j, expectedHost[j]);
}
printf("\n");
#if 1 && DEBUG_PRINT
if (args->reportErrors && wrongPerGpu[i] != 0) {
printf("rank=%d #wrong=%d\n", rank, (int)wrongPerGpu[i]);
char *expectedHost = (char*)malloc(args->expectedBytes);
char *dataHost = (char*)malloc(args->expectedBytes);
int eltsz = wordSize(type);
cudaMemcpy(expectedHost, args->expected[i], args->expectedBytes, cudaMemcpyDeviceToHost);
cudaMemcpy(dataHost, data, args->expectedBytes, cudaMemcpyDeviceToHost);
cudaMemcpy(dataHost, data, args->expectedBytes, cudaMemcpyDeviceToHost);
printf("\n Actual: ");
for (int j=0; j<args->expectedBytes/sizeof(int); j++) {
printf("%d:%d ", j, dataHost[j]);
}
printf("\n");
free(expectedHost);
free(dataHost);
for(int j=0; j<args->expectedBytes/eltsz; j++) {
unsigned long long want, got;
want = 0;
memcpy(&want, expectedHost + j*eltsz, eltsz);
got = 0;
memcpy(&got, dataHost + j*eltsz, eltsz);
if(want != got) {
printf(" rank=%d elt[%d]: want=0x%llx got=0x%llx\n", rank, j, want, got);
}
}
free(expectedHost);
free(dataHost);
}
#endif
}
double nranks = args->nProcs*args->nThreads*args->nGpus;
if (args->reportErrors && maxDelta > DeltaMaxValue(type)*(nranks - 1)) args->errors[0]++;
*delta = maxDelta;
*wrongElts = 0;
for (int i=0; i < args->nGpus; i++) *wrongElts += wrongPerGpu[i];
cudaFree(wrongPerGpu);
if (args->reportErrors && *wrongElts) args->errors[0]++;
return testSuccess;
}
@ -503,7 +298,7 @@ testResult_t testStreamSynchronize(int ngpus, cudaStream_t* streams, ncclComm_t*
}
// We might want to let other threads (including NCCL threads) use the CPU.
if (idle) pthread_yield();
if (idle) sched_yield();
}
free(done);
return testSuccess;
@ -541,19 +336,18 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
__nv_bfloat16 bf16;
#endif
};
int scalar = preMulScalar(rank);
switch(type) {
case ncclInt8: i8 = int8_t(scalar); break;
case ncclUint8: u8 = uint8_t(scalar); break;
case ncclInt32: i32 = int32_t(scalar); break;
case ncclUint32: u32 = uint32_t(scalar); break;
case ncclInt64: i64 = int32_t(scalar); break;
case ncclUint64: u64 = uint32_t(scalar); break;
case ncclFloat16: f16 = __float2half(float(scalar)); break;
case ncclFloat32: f32 = float(scalar); break;
case ncclFloat64: f64 = double(scalar); break;
case ncclInt8: i8 = ncclVerifiablePremulScalar<int8_t>(rank); break;
case ncclUint8: u8 = ncclVerifiablePremulScalar<uint8_t>(rank); break;
case ncclInt32: i32 = ncclVerifiablePremulScalar<int32_t>(rank); break;
case ncclUint32: u32 = ncclVerifiablePremulScalar<uint32_t>(rank); break;
case ncclInt64: i64 = ncclVerifiablePremulScalar<int64_t>(rank); break;
case ncclUint64: u64 = ncclVerifiablePremulScalar<uint64_t>(rank); break;
case ncclFloat16: f16 = ncclVerifiablePremulScalar<half>(rank); break;
case ncclFloat32: f32 = ncclVerifiablePremulScalar<float>(rank); break;
case ncclFloat64: f64 = ncclVerifiablePremulScalar<double>(rank); break;
#if defined(__CUDA_BF16_TYPES_EXIST__)
case ncclBfloat16: bf16 = __float2bfloat16(float(scalar)); break;
case ncclBfloat16: bf16 = ncclVerifiablePremulScalar<__nv_bfloat16>(rank); break;
#endif
}
NCCLCHECK(ncclRedOpCreatePreMulSum(&op, &u64, type, ncclScalarHostImmediate, args->comms[i]));
@ -607,9 +401,10 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
if (cudaGraphLaunches >= 1) {
// Begin cuda graph capture
for (int i=0; i<args->nGpus; i++) {
// Thread local mode is needed for:
// - Multi-thread mode
// - P2P pre-connect
// Thread local mode is needed for:
// - Multi-thread mode: where graph capture and instantiation can happen concurrently across threads
// - P2P pre-connect: when there is no warm-up, P2P pre-connect is done during graph capture.
// Since pre-connect calls cudaMalloc, we cannot use global capture mode
CUDACHECK(cudaStreamBeginCapture(args->streams[i], cudaStreamCaptureModeThreadLocal));
}
}
@ -669,7 +464,7 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
Barrier(args);
double maxDelta = 0;
int64_t wrongElts = 0;
static __thread int rep = 0;
rep++;
if (datacheck) {
@ -717,10 +512,12 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
}
#endif
TESTCHECK(CheckData(args, type, op, root, in_place, &maxDelta));
TESTCHECK(CheckData(args, type, op, root, in_place, &wrongElts));
//aggregate delta from all threads and procs
Allreduce(args, &maxDelta, 3);
long long wrongElts1 = wrongElts;
Allreduce(args, &wrongElts1, /*sum*/4);
wrongElts = wrongElts1;
}
double timeUsec = deltaSec*1.0E6;
@ -733,9 +530,9 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
sprintf(timeStr, "%7.2f", timeUsec);
}
if (datacheck) {
PRINT(" %7s %6.2f %6.2f %5.0le", timeStr, algBw, busBw, maxDelta);
PRINT(" %7s %6.2f %6.2f %5g", timeStr, algBw, busBw, (double)wrongElts);
} else {
PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
PRINT(" %7s %6.2f %6.2f %5s", timeStr, algBw, busBw, "N/A");
}
args->bw[0] += busBw;
@ -775,7 +572,9 @@ testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char*
// Benchmark
for (size_t size = args->minbytes; size<=args->maxbytes; size = ((args->stepfactor > 1) ? size*args->stepfactor : size+args->stepbytes)) {
setupArgs(size, type, args);
print_line_header(max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, root);
char rootName[100];
sprintf(rootName, "%6i", root);
PRINT("%12li %12li %8s %6s %6s", max(args->sendBytes, args->expectedBytes), args->nbytes / wordSize(type), typeName, opName, rootName);
TESTCHECK(BenchTime(args, type, op, root, 0));
TESTCHECK(BenchTime(args, type, op, root, 1));
PRINT("\n");
@ -828,7 +627,7 @@ testResult_t threadLaunch(struct testThread* thread) {
return testSuccess;
}
testResult_t AllocateBuffs(void **sendbuff, size_t sendBytes, void **recvbuff, size_t recvBytes, void **expected, size_t nbytes, int nranks) {
testResult_t AllocateBuffs(void **sendbuff, size_t sendBytes, void **recvbuff, size_t recvBytes, void **expected, size_t nbytes) {
CUDACHECK(cudaMalloc(sendbuff, nbytes));
CUDACHECK(cudaMalloc(recvbuff, nbytes));
if (datacheck) CUDACHECK(cudaMalloc(expected, recvBytes));
@ -1027,8 +826,10 @@ testResult_t run() {
#endif
is_main_thread = (proc == 0) ? 1 : 0;
PRINT("# nThread %d nGpus %d minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d validation: %d \n", nThreads, nGpus, minBytes, maxBytes,
(stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes", warmup_iters, iters, datacheck);
PRINT("# nThread %d nGpus %d minBytes %ld maxBytes %ld step: %ld(%s) warmup iters: %d iters: %d agg iters: %d validation: %d graph: %d\n",
nThreads, nGpus, minBytes, maxBytes,
(stepFactor > 1)?stepFactor:stepBytes, (stepFactor > 1)?"factor":"bytes",
warmup_iters, iters, agg_iters, datacheck, cudaGraphLaunches);
if (blocking_coll) PRINT("# Blocking Enabled: wait for completion and barrier after each collective \n");
if (parallel_init) PRINT("# Parallel Init Enabled: threads call into NcclInitRank concurrently \n");
PRINT("#\n");
@ -1087,7 +888,7 @@ testResult_t run() {
for (int i=0; i<nGpus*nThreads; i++) {
CUDACHECK(cudaSetDevice(localRank*nThreads*nGpus+i));
TESTCHECK(AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes, nProcs*nThreads*nGpus));
TESTCHECK(AllocateBuffs(sendbuffs+i, sendBytes, recvbuffs+i, recvBytes, expected+i, (size_t)maxBytes));
CUDACHECK(cudaStreamCreateWithFlags(streams+i, cudaStreamNonBlocking));
}
@ -1119,11 +920,11 @@ testResult_t run() {
}
PRINT("#\n");
print_header();
int* sync = (int*)calloc(2, sizeof(int));
int* barrier = (int*)calloc(2, sizeof(int));
double* reduce = (double*)calloc(2, sizeof(double));
PRINT("# %10s %12s %8s %6s %6s out-of-place in-place \n", "", "", "", "", "");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %6s %7s %6s %6s %6s\n", "size", "count", "type", "redop", "root",
"time", "algbw", "busbw", "#wrong", "time", "algbw", "busbw", "#wrong");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
struct testThread threads[nThreads];
memset(threads, 0, sizeof(struct testThread)*nThreads);
@ -1147,12 +948,6 @@ testResult_t run() {
threads[t].args.comms=comms+t*nGpus;
threads[t].args.streams=streams+t*nGpus;
threads[t].args.barrier = (volatile int*)barrier;
threads[t].args.barrier_idx = 0;
threads[t].args.reduce = (volatile double*)reduce;
threads[t].args.sync = (volatile int*)sync;
threads[t].args.sync_idx = 0;
threads[t].args.deltaHost = (delta + t*NUM_BLOCKS);
threads[t].args.errors=errors+t;
threads[t].args.bw=bw+t;
threads[t].args.bw_count=bw_count+t;

View File

@ -28,6 +28,21 @@
} \
} while(0)
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,12,10)
#define NCCLCHECK(cmd) do { \
ncclResult_t res = cmd; \
if (res != ncclSuccess) { \
char hostname[1024]; \
getHostName(hostname, 1024); \
printf("%s: Test NCCL failure %s:%d " \
"'%s / %s'\n", \
hostname,__FILE__,__LINE__, \
ncclGetErrorString(res), \
ncclGetLastError(NULL)); \
return testNcclError; \
} \
} while(0)
#else
#define NCCLCHECK(cmd) do { \
ncclResult_t res = cmd; \
if (res != ncclSuccess) { \
@ -39,6 +54,7 @@
return testNcclError; \
} \
} while(0)
#endif
typedef enum {
testSuccess = 0,
@ -111,14 +127,6 @@ struct threadArgs {
void** expected;
size_t expectedBytes;
volatile int* sync;
int sync_idx;
volatile int* barrier;
int barrier_idx;
volatile double* reduce;
int syncRank;
int syncNranks;
double* deltaHost;
int* errors;
double* bw;
int* bw_count;
@ -141,8 +149,8 @@ struct testThread {
// Provided by common.cu
extern void Barrier(struct threadArgs* args);
extern testResult_t TimeTest(struct threadArgs* args, ncclDataType_t type, const char* typeName, ncclRedOp_t op, const char* opName, int root);
extern testResult_t InitDataReduce(void* data, const size_t count, const size_t offset, ncclDataType_t type, ncclRedOp_t op, const int rep, const int nranks);
extern testResult_t InitData(void* data, const size_t count, ncclDataType_t type, const int rep, const int rank);
extern testResult_t InitDataReduce(void* data, const size_t count, const size_t offset, ncclDataType_t type, ncclRedOp_t op, const uint64_t seed, const int nranks);
extern testResult_t InitData(void* data, const size_t count, size_t offset, ncclDataType_t type, ncclRedOp_t op, const uint64_t seed, const int nranks, const int rank);
extern void AllocateBuffs(void **sendbuff, void **recvbuff, void **expected, void **expectedHost, size_t nbytes, int nranks);
// Provided by each coll
@ -228,7 +236,7 @@ static size_t wordSize(ncclDataType_t type) {
case ncclInt64:
case ncclUint64:
case ncclDouble:
//case ncclFloat64:
//case ncclFloat64:
return 8;
default: return 0;
}

View File

@ -7,18 +7,6 @@
#include "cuda_runtime.h"
#include "common.h"
void print_header() {
PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", "");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "root",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}
void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s %6i", size, count, typeName, root);
}
void GatherGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = count/nranks;
*recvcount = (count/nranks)*nranks;
@ -38,12 +26,10 @@ testResult_t GatherInitData(struct threadArgs* args, ncclDataType_t type, ncclRe
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(data, sendcount, rank*sendcount, type, ncclSum, rep, 1, 0));
CUDACHECK(cudaMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, cudaMemcpyDefault));
if (rank == root) {
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes*j, sendcount, type, rep, j));
}
TESTCHECK(InitData(args->expected[i], nranks*sendcount, 0, type, ncclSum, rep, 1, 0));
}
CUDACHECK(cudaDeviceSynchronize());
}

View File

@ -9,18 +9,6 @@
#define ALIGN 4
void print_header() {
PRINT("# %10s %12s %8s out-of-place in-place \n", "", "", "");
PRINT("# %10s %12s %8s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}
void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s", size, count, typeName);
}
void HyperCubeGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
size_t base = (count/(ALIGN*nranks))*ALIGN;
*sendcount = base;
@ -41,9 +29,9 @@ testResult_t HyperCubeInitData(struct threadArgs* args, ncclDataType_t type, ncc
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? ((char*)args->recvbuffs[i])+rank*args->sendBytes : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0));
for (int j=0; j<nranks; j++) {
TESTCHECK(InitData(((char*)args->expected[i])+args->sendBytes*j, sendcount, type, rep, j));
TESTCHECK(InitData((char*)args->expected[i] + args->sendBytes*j, sendcount, 0, type, ncclSum, 33*rep + j, 1, 0));
}
CUDACHECK(cudaDeviceSynchronize());
}
@ -110,9 +98,16 @@ testResult_t HyperCubeRunTest(struct threadArgs* args, int root, ncclDataType_t
run_typenames = test_typenames;
}
for (int i=0; i<type_count; i++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "", -1));
// Check if this is a power of 2
int nRanks = args->nProcs*args->nThreads*args->nGpus;
if (nRanks && !(nRanks & (nRanks - 1))) {
for (int i=0; i<type_count; i++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "", -1));
}
} else {
printf("nRanks %d is not a power of 2, skipping\n", nRanks);
}
return testSuccess;
}

View File

@ -7,18 +7,6 @@
#include "cuda_runtime.h"
#include "common.h"
void print_header() {
PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", "");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "redop", "root",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %6s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}
void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s %6s %6i", size, count, typeName, opName, root);
}
void ReduceGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = count;
*recvcount = count;
@ -38,7 +26,7 @@ testResult_t ReduceInitData(struct threadArgs* args, ncclDataType_t type, ncclRe
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
CUDACHECK(cudaMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, cudaMemcpyDefault));
if (rank == root) TESTCHECK(InitDataReduce(args->expected[i], recvcount, 0, type, op, rep, nranks));
CUDACHECK(cudaDeviceSynchronize());

View File

@ -7,18 +7,6 @@
#include "cuda_runtime.h"
#include "common.h"
void print_header() {
PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", "");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "redop",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}
void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s %6s", size, count, typeName, opName);
}
void ReduceScatterGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = (count/nranks)*nranks;
*recvcount = count/nranks;
@ -38,7 +26,7 @@ testResult_t ReduceScatterInitData(struct threadArgs* args, ncclDataType_t type,
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(data, sendcount, 0, type, op, rep, nranks, rank));
CUDACHECK(cudaMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, cudaMemcpyDefault));
TESTCHECK(InitDataReduce(args->expected[i], recvcount, rank*recvcount, type, op, rep, nranks));
CUDACHECK(cudaDeviceSynchronize());

View File

@ -7,18 +7,6 @@
#include "cuda_runtime.h"
#include "common.h"
void print_header() {
PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", "");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "root",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}
void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s %6i", size, count, typeName, root);
}
void ScatterGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = (count/nranks)*nranks;
*recvcount = count/nranks;
@ -37,8 +25,8 @@ testResult_t ScatterInitData(struct threadArgs* args, ncclDataType_t type, ncclR
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
if (rank == root) TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(args->expected[i], recvcount, type, rep+rank*recvcount, root));
if (rank == root) TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, rep, 1, 0));
TESTCHECK(InitData(args->expected[i], recvcount, rank*recvcount, type, ncclSum, rep, 1, 0));
CUDACHECK(cudaDeviceSynchronize());
}
return testSuccess;

View File

@ -7,18 +7,6 @@
#include "cuda_runtime.h"
#include "common.h"
void print_header() {
PRINT("# %10s %12s %8s out-of-place in-place \n", "", "", "");
PRINT("# %10s %12s %8s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}
void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s", size, count, typeName);
}
void SendRecvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = count;
*recvcount = count;
@ -38,9 +26,9 @@ testResult_t SendRecvInitData(struct threadArgs* args, ncclDataType_t type, nccl
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i);
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
TESTCHECK(InitData(data, sendcount, rank*sendcount, type, ncclSum, rep, 1, 0));
int peer = (rank-1+nranks)%nranks;
TESTCHECK(InitData(args->expected[i], recvcount, type, rep, peer));
TESTCHECK(InitData(args->expected[i], recvcount, peer*recvcount, type, ncclSum, rep, 1, 0));
CUDACHECK(cudaDeviceSynchronize());
}
// We don't support in-place sendrecv

24
verifiable/Makefile Normal file
View File

@ -0,0 +1,24 @@
# Standalone build for the "verifiable" reduction test support library and
# its self-test binary.
include ../../makefiles/common.mk
.PHONY: all clean
# All outputs go under the repository's top-level build tree.
BUILDDIR := $(abspath ../../build)
NCCLDIR := $(BUILDDIR)
NVCUFLAGS += -I$(NCCLDIR)/include/ -I../include
DST_DIR := $(BUILDDIR)/test/verifiable
all: $(DST_DIR)/self_test $(DST_DIR)/verifiable.o
clean:
rm -rf $(DST_DIR)
# verifiable.mk supplies the rule for verifiable.o; it requires these two
# variables to be set before inclusion.
TEST_VERIFIABLE_SRCDIR := .
TEST_VERIFIABLE_BUILDDIR := $(DST_DIR)
include verifiable.mk
# Convenience alias so `make self_test` works from this directory.
# NOTE(review): `self_test` is not listed in .PHONY above — confirm intended.
self_test: $(DST_DIR)/self_test
# The self test compiles verifiable.cu directly with -DSELF_TEST=1.
$(DST_DIR)/self_test: verifiable.cu verifiable.h
@printf "Linking %s\n" $@
@mkdir -p $(DST_DIR)
$(NVCC) -o $@ $(NVCUFLAGS) -DSELF_TEST=1 verifiable.cu $(NVLDFLAGS)

View File

@ -0,0 +1,177 @@
/* Generate parameters for our error bound model of floating point average
* (sum of scaled values) by sampling sums of random sequences for each
* floating point type.
*
* The model has parameters "coef" and "power", where for two floats a & b,
* they are close enough if and only if:
* abs(intBits(a) - intBits(b)) <= 1 + coef*pow(rank_n, power);
*
* Where intBits(x) is the reinterpretation of the float bitpattern as an integer.
*
 * Compile with:
 *   nvcc -gencode=arch=compute_80,code=sm_80
 * (sm_80 or newer is required: the kernel uses __reduce_max_sync)
*/
#include <algorithm>
#include <cmath>
#include <cstdio>
#include <cstdint>
#include <cuda_bf16.h>
#include <cuda_fp16.h>
using std::uint64_t;
using std::uint32_t;
using bfloat16 = __nv_bfloat16;
// Per-type traits for each floating point format being sampled.
// Each specialization supplies:
//   mantissa_bits / exponent_bits: bit layout of the format
//   uint_t: unsigned integer type of the same width, used by compare() below
//           to measure bit-pattern distance between two values
//   make / todouble / add / mul: conversion and arithmetic helpers so the
//           sampling kernel can be written generically over F
template<typename T>
struct float_traits;
// IEEE binary32.
template<>
struct float_traits<float> {
static constexpr int mantissa_bits = 23;
static constexpr int exponent_bits = 8;
using uint_t = uint32_t;
__device__ static float make(double x) { return (float)x; }
__device__ static float make(uint64_t x) { return (float)x; }
__device__ static double todouble(float x) { return x; }
__device__ static float add(float a, float b) { return a+b; }
__device__ static float mul(float a, float b) { return a*b; }
};
// IEEE binary64. Serves as its own "exact" reference in the kernel since
// todouble() is the identity.
template<>
struct float_traits<double> {
static constexpr int mantissa_bits = 52;
static constexpr int exponent_bits = 11;
using uint_t = uint64_t;
__device__ static double make(double x) { return x; }
__device__ static double make(uint64_t x) { return (double)x; }
__device__ static double todouble(double x) { return x; }
__device__ static double add(double a, double b) { return a+b; }
__device__ static double mul(double a, double b) { return a*b; }
};
// IEEE binary16 (half).
// NOTE(review): make(uint64_t) narrows through __int2half_rn's int argument;
// callers presumably only pass small values — confirm.
template<>
struct float_traits<half> {
static constexpr int mantissa_bits = 10;
static constexpr int exponent_bits = 5;
using uint_t = uint16_t;
__device__ static half make(double x) { return __double2half(x); }
__device__ static half make(uint64_t x) { return __int2half_rn(x); }
__device__ static double todouble(half x) { return __half2float(x); }
__device__ static half add(half a, half b) { return __hadd(a, b); }
__device__ static half mul(half a, half b) { return __hmul(a, b); }
};
// bfloat16: float32's exponent range with a 7-bit mantissa.
// NOTE(review): same int-narrowing caveat as half's make(uint64_t).
template<>
struct float_traits<bfloat16> {
static constexpr int mantissa_bits = 7;
static constexpr int exponent_bits = 8;
using uint_t = uint16_t;
__device__ static bfloat16 make(double x) { return __double2bfloat16(x); }
__device__ static bfloat16 make(uint64_t x) { return __int2bfloat16_rn(x); }
__device__ static double todouble(bfloat16 x) { return __bfloat162float(x); }
__device__ static bfloat16 add(bfloat16 a, bfloat16 b) { return __hadd(a, b); }
__device__ static bfloat16 mul(bfloat16 a, bfloat16 b) { return __hmul(a, b); }
};
// Bit-pattern distance between two floats of type F: reinterpret both as the
// format's same-width unsigned integer (intBits() in the file header comment)
// and return the absolute difference. This is the error metric the model
// `1 + coef*pow(rank_n, power)` is fitted against.
template<typename F>
__device__ int compare(F a, F b) {
// Anonymous unions pun the float bits as integers; zeroed first, then the
// float member is written so the integer member reads back the raw bits.
union { typename float_traits<F>::uint_t ua; F fa; };
union { typename float_traits<F>::uint_t ub; F fb; };
ua=0; ub=0;
fa=a; fb=b;
//std::printf("bits(%1.10f)=%x bits(%1.10f)=%x\n", fa, ua, fb, ub);
// NOTE(review): the unsigned difference is truncated to int on return; for
// double (64-bit uint_t) a huge gap would overflow — presumably fine for
// the small errors this sampler measures, but worth confirming.
return ua < ub ? ub-ua : ua-ub;
}
// xoshiro256** pseudo-random generator (Blackman/Vigna), device-side.
// Deterministic for a given seed, so each thread gets an independent,
// reproducible stream (seeded by threadIdx.x in the kernel below).
struct xoshiro256ss {
uint64_t s[4];  // 256-bit generator state
__device__ xoshiro256ss(int seed) {
// Fixed base state perturbed per seed; the odd multiplier spreads
// consecutive seeds far apart in state space.
constexpr uint64_t src[4] = {0xbb99e851d1f545cc, 0xbfc4022389ca40cb, 0xe84aff5cb1914af5, 0x845999858284de77};
for(int i=0; i < 4; i++)
s[i] = src[i] + (seed + i)*0xb45de8a52fdb65d3;
}
// Returns the next 64 random bits. The exact update order below is the
// xoshiro256** reference algorithm — do not reorder.
__device__ uint64_t operator()() {
auto rol64 = [](uint64_t x, int k) {
return (x << k) | (x >> (64 - k));
};
uint64_t const result = rol64(s[1] * 5, 7) * 9;
uint64_t const t = s[1] << 17;
s[2] ^= s[0];
s[3] ^= s[1];
s[1] ^= s[2];
s[0] ^= s[3];
s[2] ^= t;
s[3] = rol64(s[3], 45);
return result;
}
};
// Fits the error-model parameters (coef, power) for type F by simulating a
// scaled sum ("average") of up to max_ranks contributions and measuring the
// bit-pattern error of the F-precision accumulator against a double-precision
// reference. Two passes: pass 0 estimates the average exponent (expo_avg),
// pass 1 re-runs with that exponent fixed to fit coef, then prints both.
// Launched as <<<1,32>>> (single warp) from main below; requires sm_80+
// because of __reduce_max_sync.
template<typename F>
__global__ void kernel() {
using traits = float_traits<F>;
constexpr int samps = 4<<10;
// Per-sample accumulators: accf in the format under test, accd the exact
// double reference. Each thread owns the indices i = threadIdx.x (mod
// blockDim.x), so there are no cross-thread races on these arrays.
__shared__ F accf[samps];
__shared__ double accd[samps];
xoshiro256ss rng(threadIdx.x);
float expo_avg = 1;  // placeholder exponent for pass 0; refined after it
for(int pass=0; pass < 2; pass++) {
// Per-thread scalar, mimicking the 1/rank_n-style scaling of an average.
F scalar = traits::make(1.0/(3.14159 + .5*threadIdx.x));
int err_max = 0;
float coef = 0;
double expo_sum = 0;
int expo_n = 0;
// For narrow types, cap ranks so summed integers stay exactly
// representable in F's mantissa; float uses a fixed 16K-rank cap.
int max_ranks = std::is_same<F,float>::value ? 16<<10 : 1<<traits::mantissa_bits;
for(int round=0; round < 1 + (16<<10)/max_ranks; round++) {
//for(int round=0; round < 2; round++) {
for(int i=threadIdx.x; i < samps; i += blockDim.x) {
accf[i] = 0;
accd[i] = 0;
}
__syncthreads();
// r plays the role of rank count: add one contribution per iteration
// and track the worst bit-pattern error so far.
for(int r=0; r < max_ranks; r++) {
int err = 0;
for(int i=threadIdx.x; i < samps; i+=blockDim.x) {
constexpr uint64_t m = (1ll<<traits::mantissa_bits)-1;
// float gets random mantissa-sized integers; narrower types add
// the constant 1.0 (their rank cap already bounds the sum).
double d = std::is_same<F,float>::value ? double(rng() & m) : 1.0;
F f = traits::make(d);
accf[i] = traits::add(accf[i], traits::mul(scalar, f));
accd[i] += traits::todouble(f);
//if(threadIdx.x==0 && std::is_same<F,half>::value) std::printf("  r=%d f=%f\n", r, traits::todouble(accf[i]));
// Error = bit distance between F-precision result and the exact
// sum rounded once through F at the end.
int e = compare(accf[i], traits::mul(scalar, traits::make(accd[i])));
err = err > e ? err : e;
}
// Warp-wide max over all threads' errors (full-warp mask; sm_80+).
err = __reduce_max_sync(-1u, err);
err_max = err_max > err ? err_max : err;
if (r >= 2) {
// err = 1 + coef*pow(r,expo)
float c = float(err-1)/powf(float(r), expo_avg);
coef = coef > c ? coef : c;
}
if (r >= 2) {
// Solve err_max = 1 + r^expo for expo and accumulate its mean.
double expo = log2f(1+err_max)/log2f(r);
expo_sum += expo;
expo_n++;
//if(threadIdx.x==0 && std::is_same<F,half>::value) std::printf("  r=%d err=%d errmax=%d expo=%f sum=%f n=%d\n", r, err, err_max, expo, expo_sum, expo_n);
}
}
}
if(pass==0)
expo_avg = expo_sum/expo_n;
else if(threadIdx.x == 0)
std::printf("  coef=%1.10f expo=%1.10f\n", coef, expo_avg);
}
}
// Abort with a diagnostic if a CUDA call failed. Kernel launches report
// config errors only via cudaGetLastError(), and in-kernel faults only at the
// next synchronizing call, so both must be checked explicitly.
static void checkCuda(cudaError_t res, const char *what) {
  if (res != cudaSuccess) {
    std::fprintf(stderr, "CUDA error (%s): %s\n", what, cudaGetErrorString(res));
    std::exit(1);
  }
}

// Runs the parameter-fitting kernel once per floating point type and prints
// the fitted (coef, expo) for each. Each launch is a single warp (<<<1,32>>>);
// the kernel requires sm_80+ (__reduce_max_sync), so a launch on an older GPU
// now fails loudly instead of silently printing nothing.
// NOTE(review): double has float_traits but is not sampled here — presumably
// intentional since double is the reference type; confirm.
int main() {
  std::printf("type=float:\n");
  kernel<float><<<1,32>>>();
  checkCuda(cudaGetLastError(), "kernel<float> launch");
  checkCuda(cudaDeviceSynchronize(), "kernel<float> execution");

  std::printf("\ntype=half:\n");
  kernel<half><<<1,32>>>();
  checkCuda(cudaGetLastError(), "kernel<half> launch");
  checkCuda(cudaDeviceSynchronize(), "kernel<half> execution");

  std::printf("\ntype=bfloat16:\n");
  kernel<bfloat16><<<1,32>>>();
  checkCuda(cudaGetLastError(), "kernel<bfloat16> launch");
  checkCuda(cudaDeviceSynchronize(), "kernel<bfloat16> execution");
  return 0;
}

1227
verifiable/verifiable.cu Normal file

File diff suppressed because it is too large Load Diff

59
verifiable/verifiable.h Normal file
View File

@ -0,0 +1,59 @@
#ifndef _d41d8cd98f00b204e9800998ecf8427e
#define _d41d8cd98f00b204e9800998ecf8427e
#include <cuda_runtime.h>
#include <stdint.h>
/* Routines for launching kernels that verify reduction results. A significant
* feature of these routines is they carefully craft floating point input
* to produce exactly predictable output.
*
* int elt_ty: actually just a ncclDataType_t
*
* int red_op: mostly just a ncclRedOp_t. Since PreMulSum ops are dynamically
* created, these are encoded as the value ncclNumOps and their scalar is
* assumed to be `ncclVerifiablePremulScalar(rank_me)`
*
* uint64_t seed: arbitrary 64-bits to use in seeding the random values
*
* intptr_t elt_ix0: index of first element pointed to by elts when generating
* random values. This makes it possible to generate subsequences independently
* as well as in aggregate.
*
* int rank_n: Number of contributions into the reduction. Non-reduction
* collectives like broadcast, gather, etc will always set this to one.
*
* int rank_me: Index of this contribution
*/
// Use this as the local scalar for PreMulSum ops
// Local scalar for PreMulSum ops: even ranks contribute a factor of 1,
// odd ranks a factor of 2. Kept exactly representable in every supported
// type so reduction results stay bit-predictable.
template<typename T>
__host__ __device__ T ncclVerifiablePremulScalar(int rank_me) {
  if (rank_me % 2 == 0) return T(1.0f);
  return T(2.0f);
}
// Enqueue kernel to generate data which is to be reduced.
void ncclVerifiablePrepareInput(
void *elts, intptr_t elt_n, int elt_ty, int red_op, int rank_n, int rank_me,
uint64_t seed, intptr_t elt_ix0, cudaStream_t stream
);
// Enqueue kernel to generate expected results of reduction.
void ncclVerifiablePrepareExpected(
void *elts, intptr_t elt_n, int elt_ty, int red_op, int rank_n,
uint64_t seed, intptr_t elt_ix0, cudaStream_t stream
);
// Enqueue kernel to verify reduced data matches expectation. The number of
// failed elements is written to bad_elt_n which must be in cudaHost memory.
// If `expected == nullptr` then the expected results are generated on-the-fly
// which can be costly. Thus if you plan to run the same reduction multiple
// times it is advantageous to precompute the expected values with
// ncclVerifiablePrepareExpected and pass them as `expected` here.
void ncclVerifiableVerify(
void const *results, void const *expected, intptr_t elt_n, int elt_ty,
int red_op, int rank_n, uint64_t seed, intptr_t elt_ix0,
int64_t *bad_elt_n, cudaStream_t stream
);
#endif

11
verifiable/verifiable.mk Normal file
View File

@ -0,0 +1,11 @@
# We require both of the following paths to be set upon including this makefile
# TEST_VERIFIABLE_SRCDIR = <points to this directory>
# TEST_VERIFIABLE_BUILDDIR = <points to destination of .o file>

TEST_VERIFIABLE_HDRS = $(TEST_VERIFIABLE_SRCDIR)/verifiable.h
TEST_VERIFIABLE_OBJS = $(TEST_VERIFIABLE_BUILDDIR)/verifiable.o

# Depend on $(TEST_VERIFIABLE_HDRS) — the previous $(TEST_VERIFY_REDUCE_HDRS)
# was never defined anywhere, so it expanded empty and edits to verifiable.h
# did not trigger a rebuild of verifiable.o.
$(TEST_VERIFIABLE_BUILDDIR)/verifiable.o: $(TEST_VERIFIABLE_SRCDIR)/verifiable.cu $(TEST_VERIFIABLE_HDRS)
	@printf "Compiling %s\n" $@
	@mkdir -p $(TEST_VERIFIABLE_BUILDDIR)
	$(NVCC) -o $@ $(NVCUFLAGS) -c $(TEST_VERIFIABLE_SRCDIR)/verifiable.cu