Add multi-channel DP test results

This commit is contained in:
kangzhen02 2026-04-01 09:57:40 +08:00
parent af1dcac92a
commit 56d0a64f8b
2 changed files with 264 additions and 4 deletions

View File

@ -278,7 +278,10 @@ void Barrier(struct threadArgs *args) {
while(counter[epoch] != args->nThreads)
pthread_cond_wait(&cond[epoch], &lock[epoch]);
#ifdef MPI_SUPPORT
MPI_Barrier(MPI_COMM_WORLD);
{
MPI_Comm bc = (args->mpi_comm != MPI_COMM_NULL) ? args->mpi_comm : MPI_COMM_WORLD;
MPI_Barrier(bc);
}
#endif
counter[epoch] = 0;
pthread_cond_broadcast(&cond[epoch]);
@ -332,11 +335,12 @@ void Allreduce(struct threadArgs* args, T* value, int average) {
average == 2 ? MPI_MIN :
average == 3 ? MPI_MAX :
average == 4 ? MPI_SUM : MPI_Op();
MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator[epoch], 1, ty, op, MPI_COMM_WORLD);
MPI_Comm ac = (args->mpi_comm != MPI_COMM_NULL) ? args->mpi_comm : MPI_COMM_WORLD;
MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator[epoch], 1, ty, op, ac);
}
#endif
if(average == 1) accumulator[epoch] /= args->totalProcs*args->nThreads;
if(average == 1) accumulator[epoch] /= args->nProcs*args->nThreads;
counter[epoch] = 0;
pthread_cond_broadcast(&cond[epoch]);
}
@ -618,6 +622,23 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
if (cudaGraphLaunches >= 1) deltaSec = deltaSec/cudaGraphLaunches;
Allreduce(args, &deltaSec, average);
// Save per-pair deltaSec for sv_data before global averaging
double pairDeltaSec = deltaSec;
#ifdef MPI_SUPPORT
// When using split sub-communicator, the Allreduce above only averages within
// the pair. Do a second global Allreduce so the original output reflects all pairs.
if (args->mpi_comm != MPI_COMM_NULL && args->thread+1 == args->nThreads) {
// deltaSec was already averaged within the pair; each pair's nProcs procs
// hold the same pair_avg. MPI_SUM across totalProcs procs gives
// nProcs * sum_of_pair_avgs; dividing by totalProcs yields the global average.
double globalDelta = deltaSec;
MPI_Allreduce(MPI_IN_PLACE, &globalDelta, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
globalDelta /= args->totalProcs;
deltaSec = globalDelta;
}
#endif
#if CUDART_VERSION >= 11030
if (cudaGraphLaunches >= 1) {
//destroy cuda graph
@ -696,6 +717,49 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
args->bw[0] += busBw;
args->bw_count[0]++;
// Record per-size full perf data for split verbose mode (using per-pair values)
if (args->sv_sizes) {
// Compute per-pair bandwidth from pairDeltaSec (not global deltaSec)
double pairAlgBw, pairBusBw;
args->collTest->getBw(count, wordSize(type), pairDeltaSec, &pairAlgBw, &pairBusBw, args->nProcs*args->nThreads*args->nGpus);
double pairTimeUsec = pairDeltaSec * 1.0E6;
size_t msgSize = max(args->sendBytes, args->expectedBytes);
size_t svCount = args->nbytes / wordSize(type);
// Find or create entry for this size
int idx = -1;
for (int i = 0; i < args->sv_nSizes; i++) {
if (args->sv_sizes[i] == msgSize) { idx = i; break; }
}
if (idx < 0) {
// New size entry — grow arrays if needed
if (args->sv_nSizes >= args->sv_maxSizes) {
int newMax = args->sv_maxSizes ? args->sv_maxSizes * 2 : 16;
args->sv_sizes = (size_t*)realloc(args->sv_sizes, newMax * sizeof(size_t));
args->sv_counts = (size_t*)realloc(args->sv_counts, newMax * sizeof(size_t));
args->sv_data = (double*)realloc(args->sv_data, newMax * 6 * sizeof(double));
for (int i = args->sv_maxSizes; i < newMax; i++) {
args->sv_sizes[i] = 0; args->sv_counts[i] = 0;
for (int k = 0; k < 6; k++) args->sv_data[i * 6 + k] = 0.0;
}
args->sv_maxSizes = newMax;
}
idx = args->sv_nSizes++;
args->sv_sizes[idx] = msgSize;
args->sv_counts[idx] = svCount;
for (int k = 0; k < 6; k++) args->sv_data[idx * 6 + k] = 0.0;
}
int base = idx * 6;
if (in_place == 0) {
args->sv_data[base + 0] = pairTimeUsec;
args->sv_data[base + 1] = pairAlgBw;
args->sv_data[base + 2] = pairBusBw;
} else {
args->sv_data[base + 3] = pairTimeUsec;
args->sv_data[base + 4] = pairAlgBw;
args->sv_data[base + 5] = pairBusBw;
}
}
return testSuccess;
}
@ -1256,8 +1320,11 @@ testResult_t run() {
}
char *splitMaskEnv = NULL;
int splitNumColors = 0; // total number of split groups (0 means no split)
if (splitMaskEnv = getenv("NCCL_TESTS_SPLIT_MASK")) {
color = proc & strtoul(splitMaskEnv, NULL, 16);
unsigned long mask = strtoul(splitMaskEnv, NULL, 16);
color = proc & mask;
splitNumColors = (int)(mask + 1);
} else if (splitMaskEnv = getenv("NCCL_TESTS_SPLIT")) {
if (
(strncasecmp(splitMaskEnv, "AND", strlen("AND")) == 0 && parseInt(splitMaskEnv + strlen("AND"), &color)) ||
@ -1503,6 +1570,9 @@ testResult_t run() {
threads[t].args.totalProcs=totalProcs;
threads[t].args.nProcs=ncclProcs;
threads[t].args.proc=ncclProc;
#ifdef MPI_SUPPORT
threads[t].args.mpi_comm=mpi_comm;
#endif
threads[t].args.nThreads=nThreads;
threads[t].args.thread=t;
threads[t].args.nGpus=nGpus;
@ -1524,6 +1594,21 @@ testResult_t run() {
threads[t].args.errors=errors+t;
threads[t].args.bw=bw+t;
threads[t].args.bw_count=bw_count+t;
// Per-size tracking: only allocate when split verbose is enabled
threads[t].args.sv_sizes=NULL;
threads[t].args.sv_counts=NULL;
threads[t].args.sv_data=NULL;
threads[t].args.sv_nSizes=0;
threads[t].args.sv_maxSizes=0;
if (splitNumColors > 0) {
char* vEnv = getenv("NCCL_TESTS_SPLIT_VERBOSE");
if (vEnv && atoi(vEnv) != 0) {
threads[t].args.sv_maxSizes = 16;
threads[t].args.sv_sizes = (size_t*)calloc(16, sizeof(size_t));
threads[t].args.sv_counts = (size_t*)calloc(16, sizeof(size_t));
threads[t].args.sv_data = (double*)calloc(16 * 6, sizeof(double));
}
}
threads[t].args.initGpuMem = initGpuMem + t;
threads[t].args.bufferMemory = bufferMemory + t;
threads[t].args.devMemUsed = devMemUsed + t;
@ -1545,6 +1630,20 @@ testResult_t run() {
errors[0] += errors[t];
bw[0] += bw[t];
bw_count[0] += bw_count[t];
// Merge per-size data from thread t into thread 0
if (threads[t].args.sv_sizes && threads[0].args.sv_sizes) {
for (int s = 0; s < threads[t].args.sv_nSizes; s++) {
// Find matching size in thread 0
int idx = -1;
for (int s0 = 0; s0 < threads[0].args.sv_nSizes; s0++) {
if (threads[0].args.sv_sizes[s0] == threads[t].args.sv_sizes[s]) { idx = s0; break; }
}
if (idx >= 0) {
for (int k = 0; k < 6; k++)
threads[0].args.sv_data[idx * 6 + k] += threads[t].args.sv_data[s * 6 + k];
}
}
}
devMemUsed[0] = std::max(devMemUsed[0], devMemUsed[t]);
initGpuMem[0] = std::max(initGpuMem[0], initGpuMem[t]);
bufferMemory[0] = std::max(bufferMemory[0], bufferMemory[t]);
@ -1594,6 +1693,155 @@ testResult_t run() {
const double check_avg_bw = envstr ? atof(envstr) : -1;
bw[0] /= bw_count[0];
#ifdef MPI_SUPPORT
// Per-split bandwidth report: gather each split group's full perf data to proc 0
// and print individual GPU-pair results in the same format as the original nccl-tests output.
// Activated by setting NCCL_TESTS_SPLIT_VERBOSE=1 alongside NCCL_TESTS_SPLIT_MASK.
{
char* verboseEnv = getenv("NCCL_TESTS_SPLIT_VERBOSE");
if (verboseEnv && atoi(verboseEnv) != 0 && splitNumColors > 0) {
int numColors = splitNumColors;
int nSizes = threads[0].args.sv_nSizes;
// Synchronize nSizes across all procs
MPI_Allreduce(MPI_IN_PLACE, &nSizes, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);
if (nSizes > 0) {
// Prepare local arrays aligned to proc 0's size order
size_t* localSizes = (size_t*)calloc(nSizes, sizeof(size_t));
size_t* localCounts = (size_t*)calloc(nSizes, sizeof(size_t));
double* localData = (double*)calloc(nSizes * 6, sizeof(double));
if (threads[0].args.sv_sizes) {
int myN = threads[0].args.sv_nSizes;
for (int s = 0; s < myN && s < nSizes; s++) {
localSizes[s] = threads[0].args.sv_sizes[s];
localCounts[s] = threads[0].args.sv_counts[s];
for (int k = 0; k < 6; k++)
localData[s * 6 + k] = threads[0].args.sv_data[s * 6 + k];
}
}
// Broadcast sizes and counts from proc 0 so all procs know the order
MPI_Bcast(localSizes, nSizes * sizeof(size_t), MPI_BYTE, 0, MPI_COMM_WORLD);
MPI_Bcast(localCounts, nSizes * sizeof(size_t), MPI_BYTE, 0, MPI_COMM_WORLD);
// Reorder local data to match proc 0's size order
double* orderedData = (double*)calloc(nSizes * 6, sizeof(double));
if (threads[0].args.sv_sizes) {
for (int s = 0; s < nSizes; s++) {
for (int ls = 0; ls < threads[0].args.sv_nSizes; ls++) {
if (threads[0].args.sv_sizes[ls] == localSizes[s]) {
for (int k = 0; k < 6; k++)
orderedData[s * 6 + k] = threads[0].args.sv_data[ls * 6 + k];
break;
}
}
}
}
// Gather all procs' data to proc 0
double* allData = NULL;
if (proc == 0) {
allData = (double*)calloc(totalProcs * nSizes * 6, sizeof(double));
}
MPI_Gather(orderedData, nSizes * 6, MPI_DOUBLE, allData, nSizes * 6, MPI_DOUBLE, 0, MPI_COMM_WORLD);
if (proc == 0) {
// Get type/op/root names for output (same for all sizes in a run)
const char* typeName = test_typenames[nccltype];
const char* opName = test_opnames[ncclop];
// Compute per-size max busBw across all pairs (for SLOW detection)
double* maxBusBwOop = (double*)calloc(nSizes, sizeof(double));
double* maxBusBwIp = (double*)calloc(nSizes, sizeof(double));
for (int s = 0; s < nSizes; s++) {
for (int c = 0; c < numColors; c++) {
int off = c * nSizes * 6 + s * 6;
if (allData[off + 2] > maxBusBwOop[s]) maxBusBwOop[s] = allData[off + 2];
if (allData[off + 5] > maxBusBwIp[s]) maxBusBwIp[s] = allData[off + 5];
}
}
int largestIdx = nSizes - 1;
printf("#\n");
printf("# ============= Per-GPU-pair Bandwidth Breakdown =============\n");
for (int c = 0; c < numColors; c++) {
// Determine SLOW based on largest size busBw ratio
int off_lg = c * nSizes * 6 + largestIdx * 6;
double rLgOop = (maxBusBwOop[largestIdx] > 0) ? allData[off_lg + 2] / maxBusBwOop[largestIdx] * 100.0 : 0.0;
double rLgIp = (maxBusBwIp[largestIdx] > 0) ? allData[off_lg + 5] / maxBusBwIp[largestIdx] * 100.0 : 0.0;
bool pairSlow = (rLgOop < 90.0 || rLgIp < 90.0);
printf("#\n");
printf("# --- GPU Pair %d ---%s\n", c, pairSlow ? " <<< SLOW" : "");
printf("# out-of-place in-place \n");
printf("# size count type redop root time algbw busbw #wrong time algbw busbw #wrong \n");
printf("# (B) (elements) (us) (GB/s) (GB/s) (us) (GB/s) (GB/s) \n");
for (int s = 0; s < nSizes; s++) {
int off = c * nSizes * 6 + s * 6;
double timeOop = allData[off + 0];
double algBwOop = allData[off + 1];
double busBwOop = allData[off + 2];
double timeIp = allData[off + 3];
double algBwIp = allData[off + 4];
double busBwIp = allData[off + 5];
// Format numbers with auto-precision like the original nccl-tests output
// getFloatStr: fits value into fixed width by adjusting decimal places
auto fmtFloat = [](double val, int w, char* buf) {
int power = 0;
for (uint64_t v = 1; val >= v; v *= 10) power++;
if (power < w-2) sprintf(buf, "%*.2f", w, val);
else if (power < w-1) sprintf(buf, "%*.1f", w, val);
else sprintf(buf, "%*.0f", w, val);
};
char tO[8], aO[7], bO[7], tI[8], aI[7], bI[7];
fmtFloat(timeOop, 7, tO); fmtFloat(algBwOop, 6, aO); fmtFloat(busBwOop, 6, bO);
fmtFloat(timeIp, 7, tI); fmtFloat(algBwIp, 6, aI); fmtFloat(busBwIp, 6, bI);
printf("%12zu %12zu %8s %6s %6i %7s %6s %6s N/A %7s %6s %6s N/A\n",
localSizes[s], localCounts[s], typeName, opName, -1,
tO, aO, bO, tI, aI, bI);
}
}
// Summary based on largest message size
printf("#\n# --- Summary (based on largest msg size: %zu bytes) ---\n", localSizes[largestIdx]);
printf("# %8s %11s %7s %11s %7s %s\n",
"GPU_Pair", "OOP(GB/s)", "vs_max", "IP(GB/s)", "vs_max", "Status");
for (int c = 0; c < numColors; c++) {
int off = c * nSizes * 6 + largestIdx * 6;
double busBwOop = allData[off + 2];
double busBwIp = allData[off + 5];
double rOop = (maxBusBwOop[largestIdx] > 0) ? busBwOop / maxBusBwOop[largestIdx] * 100.0 : 0.0;
double rIp = (maxBusBwIp[largestIdx] > 0) ? busBwIp / maxBusBwIp[largestIdx] * 100.0 : 0.0;
bool pairSlow = (rOop < 90.0 || rIp < 90.0);
printf("# %8d %11.2f %6.1f%% %11.2f %6.1f%% %s\n",
c, busBwOop, rOop, busBwIp, rIp,
pairSlow ? "<<< SLOW" : "OK");
}
printf("#\n");
printf("# ==============================================================\n");
printf("#\n");
free(maxBusBwOop); free(maxBusBwIp);
}
free(localSizes); free(localCounts); free(localData);
free(orderedData); free(allData);
}
}
}
// Free per-size tracking arrays
for (int t = 0; t < nThreads; t++) {
free(threads[t].args.sv_sizes);
free(threads[t].args.sv_counts);
free(threads[t].args.sv_data);
}
#endif
writeResultFooter(errors, bw, check_avg_bw, program_invocation_short_name);
if (memory_report) {
memInfo_t memInfos[3];

View File

@ -137,6 +137,9 @@ struct threadArgs {
int nGpus;
int* gpus;
int localRank;
#ifdef MPI_SUPPORT
MPI_Comm mpi_comm; // split sub-communicator; used by Barrier/Allreduce
#endif
void** sendbuffs;
size_t sendBytes;
size_t sendInplaceOffset;
@ -154,6 +157,15 @@ struct threadArgs {
int* errors;
double* bw;
int* bw_count;
// Per-size full performance tracking for split verbose mode
// sv_data layout: [nSizes * 6] — for each size index:
// [0] timeOop [1] algBwOop [2] busBwOop
// [3] timeIp [4] algBwIp [5] busBwIp
size_t* sv_sizes; // message sizes (bytes)
size_t* sv_counts; // element counts
double* sv_data; // packed perf data per size
int sv_nSizes; // number of sizes recorded
int sv_maxSizes; // allocated capacity
int reportErrors;