Add symmetric registration support

-R 2 will enable symmetric registration
This commit is contained in:
Kaiming Ouyang
2025-03-13 21:14:39 -07:00
committed by Sylvain Jeaugey
parent b4300cc79d
commit 59072b7e3d
+64 -6
View File
@@ -641,7 +641,23 @@ testResult_t threadInit(struct threadArgs* args) {
NCCLCHECK(ncclCommInitRank(args->comms+i, nranks, args->ncclId, rank));
}
NCCLCHECK(ncclGroupEnd());
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
void** sendRegHandles = (local_register) ? (void**)malloc(sizeof(*sendRegHandles) * args->nGpus) : NULL;
void** recvRegHandles = (local_register) ? (void**)malloc(sizeof(*recvRegHandles) * args->nGpus) : NULL;
if (local_register == 2) {
NCCLCHECK(ncclGroupStart());
for (int i = 0; i < args->nGpus; i++) {
NCCLCHECK(ncclCommSymmetricRegister(args->comms[i], args->sendbuffs[i], args->maxbytes, &sendRegHandles[i]));
NCCLCHECK(ncclCommSymmetricRegister(args->comms[i], args->recvbuffs[i], args->maxbytes, &recvRegHandles[i]));
}
NCCLCHECK(ncclGroupEnd());
} else if (local_register == 1) {
for (int i = 0; i < args->nGpus; i++) {
NCCLCHECK(ncclCommRegister(args->comms[i], args->sendbuffs[i], args->maxbytes, &sendRegHandles[i]));
NCCLCHECK(ncclCommRegister(args->comms[i], args->recvbuffs[i], args->maxbytes, &recvRegHandles[i]));
}
}
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
void **sendRegHandles = (local_register) ? (void **)malloc(sizeof(*sendRegHandles)*args->nGpus) : NULL;
void **recvRegHandles = (local_register) ? (void **)malloc(sizeof(*recvRegHandles)*args->nGpus) : NULL;
for (int i=0; i<args->nGpus; i++) {
@@ -652,11 +668,23 @@ testResult_t threadInit(struct threadArgs* args) {
TESTCHECK(threadRunTests(args));
NCCLCHECK(ncclGroupStart());
for (int i=0; i<args->nGpus; i++) {
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
if (local_register == 2) {
NCCLCHECK(ncclCommSymmetricDeregister(args->comms[i], sendRegHandles[i]));
NCCLCHECK(ncclCommSymmetricDeregister(args->comms[i], recvRegHandles[i]));
} else if (local_register == 1) {
NCCLCHECK(ncclCommDeregister(args->comms[i], sendRegHandles[i]));
NCCLCHECK(ncclCommDeregister(args->comms[i], recvRegHandles[i]));
}
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
if (local_register) NCCLCHECK(ncclCommDeregister(args->comms[i], sendRegHandles[i]));
if (local_register) NCCLCHECK(ncclCommDeregister(args->comms[i], recvRegHandles[i]));
#endif
}
NCCLCHECK(ncclGroupEnd());
for (int i=0; i<args->nGpus; i++) {
NCCLCHECK(ncclCommDestroy(args->comms[i]));
}
return testSuccess;
@@ -836,7 +864,9 @@ int main(int argc, char* argv[]) {
average = (int)strtol(optarg, NULL, 0);
break;
case 'R':
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
local_register = (int)strtol(optarg, NULL, 0);
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
if ((int)strtol(optarg, NULL, 0)) {
local_register = 1;
}
@@ -875,7 +905,7 @@ int main(int argc, char* argv[]) {
"[-G,--cudagraph <num graph launches>] \n\t"
"[-C,--report_cputime <0/1>] \n\t"
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
"[-R,--local_register <1/0> enable local buffer registration on send/recv buffers (default: disable)] \n\t"
"[-R,--local_register <2/1/0> enable symmetric <2> or local buffer <1> registration on send/recv buffers (default: disable)] \n\t"
"[-h,--help]\n",
basename(argv[0]));
return 0;
@@ -1056,7 +1086,23 @@ testResult_t run() {
}
NCCLCHECK(ncclGroupEnd());
}
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
sendRegHandles = (local_register) ? (void**)malloc(sizeof(*sendRegHandles) * nThreads * nGpus) : NULL;
recvRegHandles = (local_register) ? (void**)malloc(sizeof(*recvRegHandles) * nThreads * nGpus) : NULL;
if (local_register == 2) {
NCCLCHECK(ncclGroupStart());
for (int i = 0; i < nGpus * nThreads; i++) {
NCCLCHECK(ncclCommSymmetricRegister(comms[i], sendbuffs[i], maxBytes, &sendRegHandles[i]));
NCCLCHECK(ncclCommSymmetricRegister(comms[i], recvbuffs[i], maxBytes, &recvRegHandles[i]));
}
NCCLCHECK(ncclGroupEnd());
} else if (local_register == 1) {
for (int i = 0; i < nGpus * nThreads; i++) {
NCCLCHECK(ncclCommRegister(comms[i], sendbuffs[i], maxBytes, &sendRegHandles[i]));
NCCLCHECK(ncclCommRegister(comms[i], recvbuffs[i], maxBytes, &recvRegHandles[i]));
}
}
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
sendRegHandles = (local_register) ? (void **)malloc(sizeof(*sendRegHandles)*nThreads*nGpus) : NULL;
recvRegHandles = (local_register) ? (void **)malloc(sizeof(*recvRegHandles)*nThreads*nGpus) : NULL;
for (int i=0; i<nGpus*nThreads; i++) {
@@ -1139,11 +1185,23 @@ testResult_t run() {
#endif
if (!parallel_init) {
NCCLCHECK(ncclGroupStart());
for(int i=0; i<nGpus*nThreads; ++i) {
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
#if NCCL_VERSION_CODE >= NCCL_VERSION(2,27,0)
if (local_register == 2) {
NCCLCHECK(ncclCommSymmetricDeregister(comms[i], sendRegHandles[i]));
NCCLCHECK(ncclCommSymmetricDeregister(comms[i], recvRegHandles[i]));
} else if (local_register == 1) {
NCCLCHECK(ncclCommDeregister(comms[i], sendRegHandles[i]));
NCCLCHECK(ncclCommDeregister(comms[i], recvRegHandles[i]));
}
#elif NCCL_VERSION_CODE >= NCCL_VERSION(2,19,0)
if (local_register) NCCLCHECK(ncclCommDeregister(comms[i], sendRegHandles[i]));
if (local_register) NCCLCHECK(ncclCommDeregister(comms[i], recvRegHandles[i]));
#endif
}
NCCLCHECK(ncclGroupEnd());
for (int i = 0; i < nGpus * nThreads; i++) {
NCCLCHECK(ncclCommDestroy(comms[i]));
}
free(comms);