• NVIDIA NCCL 源码学习(二)- bootstrap网络连接的建立


    上次介绍到rank0的机器生成了ncclUniqueId,并完成了机器的bootstrap网络和通信网络的初始化,这节接着看下所有节点间bootstrap的连接是如何建立的

    rank0节点执行ncclGetUniqueId生成ncclUniqueId,通过mpi将Id广播到所有节点,然后所有节点都会执行ncclCommInitRank,这里其他节点也会进行初始化bootstrap网络和通信网络的操作,然后会执行到ncclCommInitRankSync

    // Per-rank communicator init: select the CUDA device, allocate the comm (commAlloc),
    // set up transports (initTransportsRank, which also runs bootstrapInit) and device-side state.
    // On a failure routed to `cleanup`, aborts the bootstrap if one exists and sets *newcomm to NULL.
    1. ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) {
    2. ncclResult_t res;
    3. CUDACHECK(cudaSetDevice(cudaDev));  // NOTE(review): not a GOTO variant, so presumably returns directly without reaching cleanup — confirm
    4. NCCLCHECKGOTO(commAlloc(newcomm, nranks, myrank), res, cleanup);
    5. NCCLCHECKGOTO(initTransportsRank(*newcomm, &commId), res, cleanup);
    6. NCCLCHECKGOTO(devCommSetup(*newcomm), res, cleanup);
    7. INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %x - Init COMPLETE", *newcomm, myrank, nranks, (*newcomm)->cudaDev, (*newcomm)->busId);
    8. return ncclSuccess;
    9. cleanup:
    10. if ((*newcomm) && (*newcomm)->bootstrap) bootstrapAbort((*newcomm)->bootstrap);
    11. *newcomm = NULL;  // NOTE(review): the comm from commAlloc is not freed on this path — possible leak; confirm
    12. return res;
    13. }

    ncclComm_t是指向ncclComm的指针,ncclComm是一个大杂烩,包含了通信用到的所有上下文信息,里面的字段等用到的时候再介绍。然后通过commAlloc分配newcomm并完成初始化,比如记录当前是哪张卡、对应的PCIe busId是什么,然后执行initTransportsRank

    1. static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* commId) {
    2. // We use 3 AllGathers
    3. // 1. { peerInfo, comm }
    4. // 2. ConnectTransport[nranks], ConnectValue[nranks]
    5. // 3. { nThreads, nrings, compCap, prev[MAXCHANNELS], next[MAXCHANNELS] }
    6. int rank = comm->rank;
    7. int nranks = comm->nRanks;
    8. uint64_t commHash = getHash(commId->internal, NCCL_UNIQUE_ID_BYTES);
    9. TRACE(NCCL_INIT, "comm %p, commHash %lx, rank %d nranks %d - BEGIN", comm, commHash, rank, nranks);
    10. NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap));
    11. // AllGather1 - begin
    12. struct {
    13. struct ncclPeerInfo peerInfo;
    14. struct ncclComm* comm;
    15. } *allGather1Data;
    16. NCCLCHECK(ncclCalloc(&allGather1Data, nranks));
    17. allGather1Data[rank].comm = comm;
    18. struct ncclPeerInfo* myInfo = &allGather1Data[rank].peerInfo;
    19. NCCLCHECK(fillInfo(comm, myInfo, commHash));
    20. NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGather1Data, sizeof(*allGather1Data)));
    21. NCCLCHECK(ncclCalloc(&comm->peerInfo, nranks+1)); // Extra rank to represent CollNet root
    22. for (int i = 0; i < nranks; i++) {
    23. memcpy(comm->peerInfo+i, &allGather1Data[i].peerInfo, sizeof(struct ncclPeerInfo));
    24. if ((i != rank) && (comm->peerInfo[i].hostHash == myInfo->hostHash) && (comm->peerInfo[i].busId == myInfo->busId)) {
    25. WARN("Duplicate GPU detected : rank %d and rank %d both on CUDA device %x", rank, i, myInfo->busId);
    26. return ncclInvalidUsage;
    27. }
    28. }

    看下bootstrapInit

    1. ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {
    2. ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;
    3. bool idFromEnv = getenv("NCCL_COMM_ID") != NULL;
    4. struct extState* state;
    5. NCCLCHECK(ncclCalloc(&state, 1));
    6. state->rank = rank;
    7. state->nranks = nranks;
    8. *commState = state;
    9. TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);
    10. struct extInfo info = { 0 };
    11. info.rank = rank;
    12. info.nranks = nranks;
    13. void *tmpSendComm, *tmpRecvComm;
    14. // Pass the remote address to listen via info
    15. if (idFromEnv) {
    16. memcpy(&info.extHandleListen, netHandle, sizeof(ncclNetHandle_t));
    17. memcpy(&info.extHandleListenRoot, netHandle, sizeof(ncclNetHandle_t));
    18. }
    19. // listen will return the local address via info (specify interface type 'findSubnetIf')
    20. state->dev = idFromEnv ? findSubnetIf : 0;
    21. void* extBstrapListenCommRoot;
    22. NCCLCHECK(bootstrapNetListen(state->dev, &info.extHandleListen, &state->extBstrapListenComm));
    23. NCCLCHECK(bootstrapNetListen(state->dev, &info.extHandleListenRoot, &extBstrapListenCommRoot));
    24. // stagger connection times to avoid an overload of the root at very high rank counts
    25. if (nranks > 128) {
    26. long msec = rank;
    27. struct timespec tv;
    28. tv.tv_sec = msec / 1000;
    29. tv.tv_nsec = 1000000 * (msec % 1000);
    30. TRACE(NCCL_INIT, "rank %d delaying connection to root by %ld msec", rank, msec);
    31. (void) nanosleep(&tv, NULL);
    32. }
    33. // send info on my listening socket to root
    34. NCCLCHECK(bootstrapNetConnect(state->dev, netHandle, &tmpSendComm));
    35. NCCLCHECK(bootstrapNetSend(tmpSendComm, &info, sizeof(info)));
    36. NCCLCHECK(bootstrapNetCloseSend(tmpSendComm));
    37. // get info on my "next" rank in the bootstrap ring from root
    38. }

    首先看下commState,即ncclComm的bootstrap,类型为extState

    // Per-rank bootstrap state; bootstrapInit allocates it and stores it through *commState (ncclComm::bootstrap).
    1. struct extState {
    2. void* extBstrapListenComm;  // listen comm bound to info.extHandleListen; the prev rank's ring connection is accepted on it
    3. void* extBstrapRingRecvComm;  // connection accepted from the prev rank; recv side of bootstrapAllGather
    4. void* extBstrapRingSendComm;  // connection to the next rank (handle received from root); send side of bootstrapAllGather
    5. ncclNetHandle_t* peerBstrapHandles;  // nranks listen handles, one per rank, filled by bootstrapAllGather
    6. struct unexConn* unexpectedConnections;  // not used in the code shown here
    7. int rank;
    8. int nranks;
    9. int dev;  // interface index for bootstrapNetListen/Connect: findSubnetIf when NCCL_COMM_ID is set, else 0
    10. };

    其中extBstrapRingSendComm是当前节点连接next的socket连接,extBstrapRingRecvComm是当前节点和prev节点的socket连接,extBstrapListenComm是当前节点的监听socket,peerBstrapHandles是所有rank的ip port(对应extBstrapListenComm),dev默认为0,表示用第几个ip地址

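    然后通过bootstrapNetListen创建两个监听用的bootstrap comm:state->extBstrapListenComm和extBstrapListenCommRoot,它们的监听地址(ip port)分别写入info.extHandleListen和info.extHandleListenRoot。如前文所述,bootstrap comm其实就是保存了fd。这里创建两个comm的原因是:extHandleListen用于rank之间实际使用的bootstrap连接(即后面建立的环),extHandleListenRoot则用于rank0上的root线程和其他所有rank通信(root通过它把每个rank的next信息发回去)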

    static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm)

    bootstrapNetListen函数上节有介绍过,它会获取当前机器的第dev个ip,然后执行listen得到监听fd,将ip和port写到netHandle中,并把创建的bootstrap comm写到listenComm中

    此时rank、nranks、extHandleListen和extHandleListenRoot都已经写到extInfo里了

    // Message each rank sends to the root: written with bootstrapNetSend(&info, sizeof(info)) and read by
    // bootstrapRoot into its own struct extInfo, so both sides must agree on this exact layout.
    1. struct extInfo {
    2. int rank;  // sender's rank; root uses it as the index into its handle tables
    3. int nranks;  // sender's communicator size; root rejects values that differ from the first rank's
    4. ncclNetHandle_t extHandleListenRoot;  // where the root connects to send this rank its next-rank handle
    5. ncclNetHandle_t extHandleListen;  // ring listen address; root forwards it to this rank's prev rank
    6. };

    netHandle为ncclUniqueId,即rank0的ip port,然后通过bootstrapNetConnect创建bootstrap send comm,类比bootstrapNetListen,bootstrapNetConnect就是建立到netHandle的socket连接,将socket写到sendComm里,这里dev并没有用到

    static ncclResult_t bootstrapNetConnect(int dev, ncclNetHandle_t* netHandle, void** sendComm)

    然后通过bootstrapNetSend将extInfo发送出去,即发给rank0

    1. static ncclResult_t bootstrapNetSend(void* sendComm, void* data, int size) {
    2. struct bootstrapNetComm* comm = (struct bootstrapNetComm*)sendComm;
    3. NCCLCHECK(socketSend(comm->fd, &size, sizeof(int)));
    4. NCCLCHECK(socketSend(comm->fd, data, size));
    5. return ncclSuccess;
    6. }

    其中socketSend就是执行send接口发送数据

    然后通过bootstrapNetCloseSend关闭fd

    rank0收到数据后会做什么工作呢?回顾一下,rank0节点执行ncclGetUniqueId生成ncclUniqueId,其中在执行bootstrapCreateRoot的最后会启动一个线程执行bootstrapRoot

    1. static void *bootstrapRoot(void* listenComm) {
    2. struct extInfo info;
    3. ncclNetHandle_t *rankHandles = NULL;
    4. ncclNetHandle_t *rankHandlesRoot = NULL; // for initial rank <-> root information exchange
    5. ncclNetHandle_t zero = { 0 }; // for sanity checking
    6. void* tmpComm;
    7. ncclResult_t res;
    8. setFilesLimit();
    9. TRACE(NCCL_INIT, "BEGIN");
    10. /* Receive addresses from all ranks */
    11. int nranks = 0, c = 0;
    12. do {
    13. NCCLCHECKGOTO(bootstrapNetAccept(listenComm, &tmpComm), res, out);
    14. NCCLCHECKGOTO(bootstrapNetRecv(tmpComm, &info, sizeof(info)), res, out);
    15. NCCLCHECKGOTO(bootstrapNetCloseRecv(tmpComm), res, out);
    16. if (c == 0) {
    17. nranks = info.nranks;
    18. NCCLCHECKGOTO(ncclCalloc(&rankHandles, nranks), res, out);
    19. NCCLCHECKGOTO(ncclCalloc(&rankHandlesRoot, nranks), res, out);
    20. }
    21. if (nranks != info.nranks) {
    22. WARN("Bootstrap Root : mismatch in rank count from procs %d : %d", nranks, info.nranks);
    23. goto out;
    24. }
    25. if (memcmp(&zero, &rankHandlesRoot[info.rank], sizeof(ncclNetHandle_t)) != 0) {
    26. WARN("Bootstrap Root : rank %d of %d ranks has already checked in", info.rank, nranks);
    27. goto out;
    28. }
    29. // Save the connection handle for that rank
    30. memcpy(rankHandlesRoot+info.rank, info.extHandleListenRoot, sizeof(ncclNetHandle_t));
    31. memcpy(rankHandles+info.rank, info.extHandleListen, sizeof(ncclNetHandle_t));
    32. ++c;
    33. TRACE(NCCL_INIT, "Received connect from rank %d total %d/%d", info.rank, c, nranks);
    34. } while (c < nranks);
    35. TRACE(NCCL_INIT, "COLLECTED ALL %d HANDLES", nranks);
    36. // Send the connect handle for the next rank in the AllGather ring
    37. for (int r=0; r
    38. int next = (r+1) % nranks;
    39. void *tmpSendComm;
    40. NCCLCHECKGOTO(bootstrapNetConnect(0, rankHandlesRoot+r, &tmpSendComm), res, out);
    41. NCCLCHECKGOTO(bootstrapNetSend(tmpSendComm, rankHandles+next, sizeof(ncclNetHandle_t)), res, out);
    42. NCCLCHECKGOTO(bootstrapNetCloseSend(tmpSendComm), res, out);
    43. }
    44. TRACE(NCCL_INIT, "SENT OUT ALL %d HANDLES", nranks);
    45. out:
    46. bootstrapNetCloseListen(listenComm);
    47. if (rankHandles) free(rankHandles);
    48. if (rankHandlesRoot) free(rankHandlesRoot);
    49. TRACE(NCCL_INIT, "DONE");
    50. return NULL;
    51. }

    listenComm是上一个博文中rank0创建的监听fd,bootstrapNetAccept是从listenComm中获取一个新连接,使用新连接的fd创建recvcomm

    static ncclResult_t bootstrapNetAccept(void* listenComm, void** recvComm)

    然后通过bootstrapNetRecv读取tmpComm的数据,即其他rank发送来的extInfo,然后保存其他rank的extHandleListen和extHandleListenRoot,这个时候rank0就获取到其他所有rank的ip和port了。获取完所有rank的info之后开始建环:将节点(r+1) % nranks的extHandleListen发送给节点r,也就是把节点r的next节点的netHandle发送给节点r。这里可以看出,每个节点创建了两个listen comm,其中root(rank0上的bootstrapRoot线程)通过各节点的extHandleListenRoot与它们通信,而节点之间则通过extHandleListen进行通信

    然后再回去接着看bootstrapInit

    1. ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {
    2. // get info on my "next" rank in the bootstrap ring from root
    3. ncclNetHandle_t extHandleNext;
    4. NCCLCHECK(bootstrapNetAccept(extBstrapListenCommRoot, &tmpRecvComm));
    5. NCCLCHECK(bootstrapNetRecv(tmpRecvComm, &extHandleNext, sizeof(extHandleNext)));
    6. NCCLCHECK(bootstrapNetCloseRecv(tmpRecvComm));
    7. NCCLCHECK(bootstrapNetCloseListen(extBstrapListenCommRoot));
    8. NCCLCHECK(bootstrapNetConnect(state->dev, &extHandleNext, &state->extBstrapRingSendComm));
    9. // Accept the connect request from the previous rank in the AllGather ring
    10. NCCLCHECK(bootstrapNetAccept(state->extBstrapListenComm, &state->extBstrapRingRecvComm));
    11. // AllGather all listen handlers
    12. NCCLCHECK(ncclCalloc(&state->peerBstrapHandles, nranks));
    13. memcpy(state->peerBstrapHandles+rank, info.extHandleListen, sizeof(ncclNetHandle_t));
    14. NCCLCHECK(bootstrapAllGather(state, state->peerBstrapHandles, sizeof(ncclNetHandle_t)));
    15. TRACE(NCCL_INIT, "rank %d nranks %d - DONE", rank, nranks);
    16. return ncclSuccess;
    17. }

    接着所有rank都会在extBstrapListenCommRoot(即extHandleListenRoot对应的监听comm)上accept root的连接得到tmpRecvComm,从中接收当前rank的next的ip port;然后连接next,得到state->extBstrapRingSendComm;再accept prev的连接,得到state->extBstrapRingRecvComm。到现在bootstrap网络连接就完全建立起来了,如下图

     最后gather所有rank的ip port,首先将自己的nethandle放到peerBstrapHandles的对应位置,如下所示

    然后执行bootstrapAllGather

    1. ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) {
    2. struct extState* state = (struct extState*)commState;
    3. char* data = (char*)allData;
    4. int rank = state->rank;
    5. int nranks = state->nranks;
    6. TRACE(NCCL_INIT, "rank %d nranks %d size %d", rank, nranks, size);
    7. /* Simple ring based AllGather
    8. * At each step i receive data from (rank-i-1) from left
    9. * and send previous step's data from (rank-i) to right
    10. */
    11. for (int i=0; i-1; i++) {
    12. size_t rslice = (rank - i - 1 + nranks) % nranks;
    13. size_t sslice = (rank - i + nranks) % nranks;
    14. // Send slice to the right
    15. NCCLCHECK(bootstrapNetSend(state->extBstrapRingSendComm, data+sslice*size, size));
    16. // Recv slice from the left
    17. NCCLCHECK(bootstrapNetRecv(state->extBstrapRingRecvComm, data+rslice*size, size));
    18. }
    19. TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
    20. return ncclSuccess;
    21. }

    每一步中,每个rank都把一份数据发给next,同时从prev接收一份数据:第i步发送的是slice (rank-i)(第0步就是自己的数据,之后是上一步从prev收到的数据),接收的是slice (rank-i-1)。一共进行nranks-1步,如下图

    第一步:

    第二步:

     到这里每个rank就都有了全局所有rank的ip port

    最后总结一下,本节主要创建了bootstrap环形网络连接,并保存到ncclComm里。

  • 相关阅读:
    SkeyeVSS果园防盗智能监控系统 保障果农收益好帮手
    Mysql表的约束
    leetCode 416.分割等和子集 + 01背包 + 动态规划 + 记忆化搜索 + 递推 + 空间优化
    【模型推理优化学习笔记】张量并行和流水线并行简介
    2023 年值得关注的软件测试趋势
    LeetCode50天刷题计划第二季(Day 17 — 克隆图 (10.00-11.20)
    Linux权限的认识
    Cesium-移动实体
    Docker consul的容器服务更新与发现
    kubeadm重新拉取集群的方式
  • 原文地址:https://blog.csdn.net/KIDGIN7439/article/details/126938077