diff --git a/src/common.cu b/src/common.cu index 9dff7af..73e0279 100644 --- a/src/common.cu +++ b/src/common.cu @@ -278,7 +278,10 @@ void Barrier(struct threadArgs *args) { while(counter[epoch] != args->nThreads) pthread_cond_wait(&cond[epoch], &lock[epoch]); #ifdef MPI_SUPPORT - MPI_Barrier(MPI_COMM_WORLD); + { + MPI_Comm bc = (args->mpi_comm != MPI_COMM_NULL) ? args->mpi_comm : MPI_COMM_WORLD; + MPI_Barrier(bc); + } #endif counter[epoch] = 0; pthread_cond_broadcast(&cond[epoch]); @@ -332,11 +335,12 @@ void Allreduce(struct threadArgs* args, T* value, int average) { average == 2 ? MPI_MIN : average == 3 ? MPI_MAX : average == 4 ? MPI_SUM : MPI_Op(); - MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator[epoch], 1, ty, op, MPI_COMM_WORLD); + MPI_Comm ac = (args->mpi_comm != MPI_COMM_NULL) ? args->mpi_comm : MPI_COMM_WORLD; + MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator[epoch], 1, ty, op, ac); } #endif - if(average == 1) accumulator[epoch] /= args->totalProcs*args->nThreads; + if(average == 1) accumulator[epoch] /= args->nProcs*args->nThreads; counter[epoch] = 0; pthread_cond_broadcast(&cond[epoch]); } @@ -618,6 +622,23 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t if (cudaGraphLaunches >= 1) deltaSec = deltaSec/cudaGraphLaunches; Allreduce(args, &deltaSec, average); + // Save per-pair deltaSec for sv_data before global averaging + double pairDeltaSec = deltaSec; + +#ifdef MPI_SUPPORT + // When using split sub-communicator, the Allreduce above only averages within + // the pair. Do a second global Allreduce so the original output reflects all pairs. + if (args->mpi_comm != MPI_COMM_NULL && args->thread+1 == args->nThreads) { + // deltaSec was already averaged within the pair; each pair's nProcs procs + // hold the same pair_avg. MPI_SUM across totalProcs procs gives + // nProcs * sum_of_pair_avgs; dividing by totalProcs yields the global average. + double globalDelta = deltaSec; + MPI_Allreduce(MPI_IN_PLACE, &globalDelta, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); + globalDelta /= args->totalProcs; + deltaSec = globalDelta; + } +#endif + #if CUDART_VERSION >= 11030 if (cudaGraphLaunches >= 1) { //destroy cuda graph @@ -696,6 +717,49 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t args->bw[0] += busBw; args->bw_count[0]++; + // Record per-size full perf data for split verbose mode (using per-pair values) + if (args->sv_sizes) { + // Compute per-pair bandwidth from pairDeltaSec (not global deltaSec) + double pairAlgBw, pairBusBw; + args->collTest->getBw(count, wordSize(type), pairDeltaSec, &pairAlgBw, &pairBusBw, args->nProcs*args->nThreads*args->nGpus); + double pairTimeUsec = pairDeltaSec * 1.0E6; + + size_t msgSize = max(args->sendBytes, args->expectedBytes); + size_t svCount = args->nbytes / wordSize(type); + // Find or create entry for this size + int idx = -1; + for (int i = 0; i < args->sv_nSizes; i++) { + if (args->sv_sizes[i] == msgSize) { idx = i; break; } + } + if (idx < 0) { + // New size entry — grow arrays if needed + if (args->sv_nSizes >= args->sv_maxSizes) { + int newMax = args->sv_maxSizes ? args->sv_maxSizes * 2 : 16; + args->sv_sizes = (size_t*)realloc(args->sv_sizes, newMax * sizeof(size_t)); + args->sv_counts = (size_t*)realloc(args->sv_counts, newMax * sizeof(size_t)); + args->sv_data = (double*)realloc(args->sv_data, newMax * 6 * sizeof(double)); + for (int i = args->sv_maxSizes; i < newMax; i++) { + args->sv_sizes[i] = 0; args->sv_counts[i] = 0; + for (int k = 0; k < 6; k++) args->sv_data[i * 6 + k] = 0.0; + } + args->sv_maxSizes = newMax; + } + idx = args->sv_nSizes++; + args->sv_sizes[idx] = msgSize; + args->sv_counts[idx] = svCount; + for (int k = 0; k < 6; k++) args->sv_data[idx * 6 + k] = 0.0; + } + int base = idx * 6; + if (in_place == 0) { + args->sv_data[base + 0] = pairTimeUsec; + args->sv_data[base + 1] = pairAlgBw; + args->sv_data[base + 2] = pairBusBw; + } else { + args->sv_data[base + 3] = pairTimeUsec; + args->sv_data[base + 4] = pairAlgBw; + args->sv_data[base + 5] = pairBusBw; + } + } return testSuccess; } @@ -1256,8 +1320,11 @@ testResult_t run() { } char *splitMaskEnv = NULL; + int splitNumColors = 0; // total number of split groups (0 means no split) if (splitMaskEnv = getenv("NCCL_TESTS_SPLIT_MASK")) { - color = proc & strtoul(splitMaskEnv, NULL, 16); + unsigned long mask = strtoul(splitMaskEnv, NULL, 16); + color = proc & mask; + splitNumColors = (int)(mask + 1); } else if (splitMaskEnv = getenv("NCCL_TESTS_SPLIT")) { if ( (strncasecmp(splitMaskEnv, "AND", strlen("AND")) == 0 && parseInt(splitMaskEnv + strlen("AND"), &color)) || @@ -1503,6 +1570,9 @@ testResult_t run() { threads[t].args.totalProcs=totalProcs; threads[t].args.nProcs=ncclProcs; threads[t].args.proc=ncclProc; +#ifdef MPI_SUPPORT + threads[t].args.mpi_comm=mpi_comm; +#endif threads[t].args.nThreads=nThreads; threads[t].args.thread=t; threads[t].args.nGpus=nGpus; @@ -1524,6 +1594,21 @@ testResult_t run() { threads[t].args.errors=errors+t; threads[t].args.bw=bw+t; threads[t].args.bw_count=bw_count+t; + // Per-size tracking: only allocate when split verbose is enabled + threads[t].args.sv_sizes=NULL; + threads[t].args.sv_counts=NULL; + threads[t].args.sv_data=NULL; + threads[t].args.sv_nSizes=0; + threads[t].args.sv_maxSizes=0; + if (splitNumColors > 0) { + char* vEnv = getenv("NCCL_TESTS_SPLIT_VERBOSE"); + if (vEnv && atoi(vEnv) != 0) { + threads[t].args.sv_maxSizes = 16; + threads[t].args.sv_sizes = (size_t*)calloc(16, sizeof(size_t)); + threads[t].args.sv_counts = (size_t*)calloc(16, sizeof(size_t)); + threads[t].args.sv_data = (double*)calloc(16 * 6, sizeof(double)); + } + } threads[t].args.initGpuMem = initGpuMem + t; threads[t].args.bufferMemory = bufferMemory + t; threads[t].args.devMemUsed = devMemUsed + t; @@ -1545,6 +1630,20 @@ testResult_t run() { errors[0] += errors[t]; bw[0] += bw[t]; bw_count[0] += bw_count[t]; + // Merge per-size data from thread t into thread 0 + if (threads[t].args.sv_sizes && threads[0].args.sv_sizes) { + for (int s = 0; s < threads[t].args.sv_nSizes; s++) { + // Find matching size in thread 0 + int idx = -1; + for (int s0 = 0; s0 < threads[0].args.sv_nSizes; s0++) { + if (threads[0].args.sv_sizes[s0] == threads[t].args.sv_sizes[s]) { idx = s0; break; } + } + if (idx >= 0) { + for (int k = 0; k < 6; k++) + threads[0].args.sv_data[idx * 6 + k] += threads[t].args.sv_data[s * 6 + k]; + } + } + } devMemUsed[0] = std::max(devMemUsed[0], devMemUsed[t]); initGpuMem[0] = std::max(initGpuMem[0], initGpuMem[t]); bufferMemory[0] = std::max(bufferMemory[0], bufferMemory[t]); @@ -1594,6 +1693,155 @@ testResult_t run() { const double check_avg_bw = envstr ? atof(envstr) : -1; bw[0] /= bw_count[0]; +#ifdef MPI_SUPPORT + // Per-split bandwidth report: gather each split group's full perf data to proc 0 + // and print individual GPU-pair results in the same format as the original nccl-tests output. + // Activated by setting NCCL_TESTS_SPLIT_VERBOSE=1 alongside NCCL_TESTS_SPLIT_MASK. + { + char* verboseEnv = getenv("NCCL_TESTS_SPLIT_VERBOSE"); + if (verboseEnv && atoi(verboseEnv) != 0 && splitNumColors > 0) { + int numColors = splitNumColors; + int nSizes = threads[0].args.sv_nSizes; + + // Synchronize nSizes across all procs + MPI_Allreduce(MPI_IN_PLACE, &nSizes, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD); + + if (nSizes > 0) { + // Prepare local arrays aligned to proc 0's size order + size_t* localSizes = (size_t*)calloc(nSizes, sizeof(size_t)); + size_t* localCounts = (size_t*)calloc(nSizes, sizeof(size_t)); + double* localData = (double*)calloc(nSizes * 6, sizeof(double)); + if (threads[0].args.sv_sizes) { + int myN = threads[0].args.sv_nSizes; + for (int s = 0; s < myN && s < nSizes; s++) { + localSizes[s] = threads[0].args.sv_sizes[s]; + localCounts[s] = threads[0].args.sv_counts[s]; + for (int k = 0; k < 6; k++) + localData[s * 6 + k] = threads[0].args.sv_data[s * 6 + k]; + } + } + + // Broadcast sizes and counts from proc 0 so all procs know the order + MPI_Bcast(localSizes, nSizes * sizeof(size_t), MPI_BYTE, 0, MPI_COMM_WORLD); + MPI_Bcast(localCounts, nSizes * sizeof(size_t), MPI_BYTE, 0, MPI_COMM_WORLD); + + // Reorder local data to match proc 0's size order + double* orderedData = (double*)calloc(nSizes * 6, sizeof(double)); + if (threads[0].args.sv_sizes) { + for (int s = 0; s < nSizes; s++) { + for (int ls = 0; ls < threads[0].args.sv_nSizes; ls++) { + if (threads[0].args.sv_sizes[ls] == localSizes[s]) { + for (int k = 0; k < 6; k++) + orderedData[s * 6 + k] = threads[0].args.sv_data[ls * 6 + k]; + break; + } + } + } + } + + // Gather all procs' data to proc 0 + double* allData = NULL; + if (proc == 0) { + allData = (double*)calloc(totalProcs * nSizes * 6, sizeof(double)); + } + MPI_Gather(orderedData, nSizes * 6, MPI_DOUBLE, allData, nSizes * 6, MPI_DOUBLE, 0, MPI_COMM_WORLD); + + if (proc == 0) { + // Get type/op/root names for output (same for all sizes in a run) + const char* typeName = test_typenames[nccltype]; + const char* opName = test_opnames[ncclop]; + + // Compute per-size max busBw across all pairs (for SLOW detection) + double* maxBusBwOop = (double*)calloc(nSizes, sizeof(double)); + double* maxBusBwIp = (double*)calloc(nSizes, sizeof(double)); + for (int s = 0; s < nSizes; s++) { + for (int c = 0; c < numColors; c++) { + int off = c * nSizes * 6 + s * 6; + if (allData[off + 2] > maxBusBwOop[s]) maxBusBwOop[s] = allData[off + 2]; + if (allData[off + 5] > maxBusBwIp[s]) maxBusBwIp[s] = allData[off + 5]; + } + } + + int largestIdx = nSizes - 1; + + printf("#\n"); + printf("# ============= Per-GPU-pair Bandwidth Breakdown =============\n"); + + for (int c = 0; c < numColors; c++) { + // Determine SLOW based on largest size busBw ratio + int off_lg = c * nSizes * 6 + largestIdx * 6; + double rLgOop = (maxBusBwOop[largestIdx] > 0) ? allData[off_lg + 2] / maxBusBwOop[largestIdx] * 100.0 : 0.0; + double rLgIp = (maxBusBwIp[largestIdx] > 0) ? allData[off_lg + 5] / maxBusBwIp[largestIdx] * 100.0 : 0.0; + bool pairSlow = (rLgOop < 90.0 || rLgIp < 90.0); + + printf("#\n"); + printf("# --- GPU Pair %d ---%s\n", c, pairSlow ? " <<< SLOW" : ""); + printf("# out-of-place in-place \n"); + printf("# size count type redop root time algbw busbw #wrong time algbw busbw #wrong \n"); + printf("# (B) (elements) (us) (GB/s) (GB/s) (us) (GB/s) (GB/s) \n"); + + for (int s = 0; s < nSizes; s++) { + int off = c * nSizes * 6 + s * 6; + double timeOop = allData[off + 0]; + double algBwOop = allData[off + 1]; + double busBwOop = allData[off + 2]; + double timeIp = allData[off + 3]; + double algBwIp = allData[off + 4]; + double busBwIp = allData[off + 5]; + + // Format numbers with auto-precision like the original nccl-tests output + // getFloatStr: fits value into fixed width by adjusting decimal places + auto fmtFloat = [](double val, int w, char* buf) { + int power = 0; + for (uint64_t v = 1; val >= v; v *= 10) power++; + if (power < w-2) sprintf(buf, "%*.2f", w, val); + else if (power < w-1) sprintf(buf, "%*.1f", w, val); + else sprintf(buf, "%*.0f", w, val); + }; + char tO[8], aO[7], bO[7], tI[8], aI[7], bI[7]; + fmtFloat(timeOop, 7, tO); fmtFloat(algBwOop, 6, aO); fmtFloat(busBwOop, 6, bO); + fmtFloat(timeIp, 7, tI); fmtFloat(algBwIp, 6, aI); fmtFloat(busBwIp, 6, bI); + + printf("%12zu %12zu %8s %6s %6i %7s %6s %6s N/A %7s %6s %6s N/A\n", + localSizes[s], localCounts[s], typeName, opName, -1, + tO, aO, bO, tI, aI, bI); + } + } + + // Summary based on largest message size + printf("#\n# --- Summary (based on largest msg size: %zu bytes) ---\n", localSizes[largestIdx]); + printf("# %8s %11s %7s %11s %7s %s\n", + "GPU_Pair", "OOP(GB/s)", "vs_max", "IP(GB/s)", "vs_max", "Status"); + for (int c = 0; c < numColors; c++) { + int off = c * nSizes * 6 + largestIdx * 6; + double busBwOop = allData[off + 2]; + double busBwIp = allData[off + 5]; + double rOop = (maxBusBwOop[largestIdx] > 0) ? busBwOop / maxBusBwOop[largestIdx] * 100.0 : 0.0; + double rIp = (maxBusBwIp[largestIdx] > 0) ? busBwIp / maxBusBwIp[largestIdx] * 100.0 : 0.0; + bool pairSlow = (rOop < 90.0 || rIp < 90.0); + printf("# %8d %11.2f %6.1f%% %11.2f %6.1f%% %s\n", + c, busBwOop, rOop, busBwIp, rIp, + pairSlow ? "<<< SLOW" : "OK"); + } + printf("#\n"); + printf("# ==============================================================\n"); + printf("#\n"); + + free(maxBusBwOop); free(maxBusBwIp); + } + free(localSizes); free(localCounts); free(localData); + free(orderedData); free(allData); + } + } + } + // Free per-size tracking arrays + for (int t = 0; t < nThreads; t++) { + free(threads[t].args.sv_sizes); + free(threads[t].args.sv_counts); + free(threads[t].args.sv_data); + } +#endif + writeResultFooter(errors, bw, check_avg_bw, program_invocation_short_name); if (memory_report) { memInfo_t memInfos[3]; diff --git a/src/common.h b/src/common.h index a68ecaa..fa8fe38 100644 --- a/src/common.h +++ b/src/common.h @@ -137,6 +137,9 @@ struct threadArgs { int nGpus; int* gpus; int localRank; +#ifdef MPI_SUPPORT + MPI_Comm mpi_comm; // split sub-communicator; used by Barrier/Allreduce +#endif void** sendbuffs; size_t sendBytes; size_t sendInplaceOffset; @@ -154,6 +157,15 @@ struct threadArgs { int* errors; double* bw; int* bw_count; + // Per-size full performance tracking for split verbose mode + // sv_data layout: [nSizes * 6] — for each size index: + // [0] timeOop [1] algBwOop [2] busBwOop + // [3] timeIp [4] algBwIp [5] busBwIp + size_t* sv_sizes; // message sizes (bytes) + size_t* sv_counts; // element counts + double* sv_data; // packed perf data per size + int sv_nSizes; // number of sizes recorded + int sv_maxSizes; // allocated capacity int reportErrors;