• NCCL Source Code Analysis, Part 7: Connecting Channels Across Machines



    Author | KIDGINBROOK

    The previous installment finished the channel search inside a single machine. Sticking with ringGraph as the example, each machine has by now found a set of rings within itself; the next step is to connect the rings of the individual machines together.

    To make this easier to follow, assume a two-machine, sixteen-GPU setup in which one ring on the first machine is:

    graph->intra: GPU/0 GPU/7 GPU/6 GPU/3 GPU/2 GPU/5 GPU/4 GPU/1
    graph->inter: NET/0 NET/0

    The corresponding ring on the second machine is:

    graph->intra: GPU/10 GPU/9 GPU/8 GPU/13 GPU/12 GPU/15 GPU/14 GPU/11
    graph->inter: NET/0 NET/0

    allGather3Data is used to aggregate channel information across ranks; ncclGraphInfo records information about each ring, such as its speed and type:

    struct ncclGraphInfo {
      int sameChannels;
      float speedIntra;
      float speedInter;
      int typeIntra;
    };

    struct {
      int cudaCompCap;
      int fullCudaCompCap;
      int nChannels;
      struct ncclGraphInfo tree;
      struct ncclGraphInfo ring;
      struct ncclGraphInfo collNet;
      struct ncclTopoRanks topoRanks;
    } *allGather3Data;

    NCCLCHECK(ncclCalloc(&allGather3Data, nranks));
    allGather3Data[rank].cudaCompCap = ncclCudaCompCap();
    allGather3Data[rank].nChannels = comm->nChannels = treeGraph.nChannels = ringGraph.nChannels =
      std::min(treeGraph.nChannels, ringGraph.nChannels);
    ...
    allGather3Data[rank].ring.sameChannels = ringGraph.sameChannels;
    allGather3Data[rank].ring.speedIntra = ringGraph.speedIntra;
    allGather3Data[rank].ring.speedInter = ringGraph.speedInter;
    allGather3Data[rank].ring.typeIntra = ringGraph.typeIntra;
    ...
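
    Each rank fills in its own slot and then exchanges the struct with all other ranks. A minimal sketch of that exchange, using the bootstrap allgather seen in earlier installments (treat the exact call as an approximation if your NCCL version differs):

    // Every rank contributes one element; afterwards allGather3Data[i]
    // holds the information published by rank i.
    NCCLCHECK(bootstrapAllGather(comm->bootstrap, allGather3Data, sizeof(*allGather3Data)));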

    Next ncclTopoRanks is filled in: for the current rank we record its prev and next within the intra-machine ring (the first rank's prev and the last rank's next stay -1; for example, rank 6's prev is 7 and its next is 3), as well as the ring's ringRecv and ringSend, i.e. the first and last node of the ring. Finally the searched channels are duplicated once; the explanation given in an official issue is that this creates additional parallelism and makes fuller use of the bandwidth.

    struct ncclTopoRanks {
      int ringRecv[MAXCHANNELS];
      int ringSend[MAXCHANNELS];
      int ringPrev[MAXCHANNELS];
      int ringNext[MAXCHANNELS];
      int treeUpRecv[MAXCHANNELS];
      int treeUpSend[MAXCHANNELS];
      int treeDnRecv[MAXCHANNELS];
      int treeDnSend[MAXCHANNELS];
    };

    ncclResult_t ncclTopoPreset(struct ncclComm* comm,
        struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoGraph* collNetGraph,
        struct ncclTopoRanks* topoRanks) {
      int rank = comm->rank;
      int localRanks = comm->localRanks;
      int nChannels = comm->nChannels;

      for (int c=0; c<nChannels; c++) {
        struct ncclChannel* channel = comm->channels+c;
        channel->ring.prev = channel->ring.next = -1;
        ...
        int* ringIntra = ringGraph->intra+c*localRanks;
        int* treeIntra = treeGraph->intra+c*localRanks;
        int* collNetIntra = collNetGraph->intra+c*localRanks;

        for (int i=0; i<localRanks; i++) {
          if (ringIntra[i] == rank) {
            topoRanks->ringRecv[c] = ringIntra[0];
            topoRanks->ringSend[c] = ringIntra[localRanks-1];
            channel->ring.prev = (i == 0) ? -1 : ringIntra[i-1];
            channel->ring.next = (i == localRanks-1) ? -1 : ringIntra[i+1];
          }
          ...
        }
        topoRanks->ringPrev[c] = channel->ring.prev;
        topoRanks->ringNext[c] = channel->ring.next;
      }
      // Duplicate channels rings/trees
      struct ncclChannel* channel0 = comm->channels;
      struct ncclChannel* channel1 = channel0+nChannels;
      memcpy(channel1, channel0, nChannels*sizeof(struct ncclChannel));
      return ncclSuccess;
    }
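
    For the example above, a small standalone sketch of what Preset computes for rank 6 on the first machine (values hardcoded from the ring listed earlier; an illustration, not NCCL code):

    #include <stdio.h>

    // Intra-machine ring of the first machine in the running example.
    static const int ringIntra[8] = {0, 7, 6, 3, 2, 5, 4, 1};

    int main(void) {
      int rank = 6, localRanks = 8;
      int ringRecv = ringIntra[0];              // first rank of the intra ring
      int ringSend = ringIntra[localRanks - 1]; // last rank of the intra ring
      int prev = -1, next = -1;
      for (int i = 0; i < localRanks; i++) {
        if (ringIntra[i] == rank) {             // rank 6 sits at index 2
          prev = (i == 0) ? -1 : ringIntra[i - 1];
          next = (i == localRanks - 1) ? -1 : ringIntra[i + 1];
        }
      }
      // Prints: ringRecv=0 ringSend=1 prev=7 next=3
      printf("ringRecv=%d ringSend=%d prev=%d next=%d\n", ringRecv, ringSend, prev, next);
      return 0;
    }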

    Then bootstrapAllGather collects allGather3Data from all ranks; from it we compute which node the current rank belongs to (stored in comm->node) and the first rank of every node (stored in nodesFirstRank); a sketch of this computation follows the list below. In our example:

    nodesFirstRank[0]: 0
    nodesFirstRank[1]: 10
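
    A standalone sketch of how this grouping can be derived: ranks that report the same ringRecv[0] (the first rank of their channel-0 intra ring) sit on the same machine. The values below are hardcoded from the example; the real logic lives in initTransportsRank and operates on the gathered allGather3Data:

    #include <stdio.h>

    int main(void) {
      // Per-rank "first rank of my channel-0 intra ring", as gathered via
      // allGather3Data[i].topoRanks.ringRecv[0] in the two-machine example.
      int firstRankOf[16] = {0,0,0,0,0,0,0,0, 10,10,10,10,10,10,10,10};
      int nranks = 16, myRank = 6;

      int nodesFirstRank[16];
      int nNodes = 0, myNode = -1;
      for (int i = 0; i < nranks; i++) {
        int node = -1;
        for (int n = 0; n < nNodes; n++)
          if (nodesFirstRank[n] == firstRankOf[i]) node = n;
        if (node == -1) {                   // first rank seen from a new machine
          node = nNodes++;
          nodesFirstRank[node] = firstRankOf[i];
        }
        if (i == myRank) myNode = node;
      }
      // Prints: nNodes=2 nodesFirstRank={0,10} node(rank 6)=0
      printf("nNodes=%d nodesFirstRank={%d,%d} node(rank %d)=%d\n",
             nNodes, nodesFirstRank[0], nodesFirstRank[1], myRank, myNode);
      return 0;
    }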

    Next, each machine's rings are stitched together head-to-tail into one big ring.

    ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, struct ncclTopoRanks** allTopoRanks, int* rings) {
      // Gather data from all ranks
      int *ringRecv, *ringSend, *ringPrev, *ringNext, *treeUpRecv, *treeUpSend, *treeDnRecv, *treeDnSend;
      int nranks = comm->nRanks;
      int nChannels = comm->nChannels;
      NCCLCHECK(ncclCalloc(&ringRecv, nranks*MAXCHANNELS));
      NCCLCHECK(ncclCalloc(&ringSend, nranks*MAXCHANNELS));
      NCCLCHECK(ncclCalloc(&ringPrev, nranks*MAXCHANNELS));
      NCCLCHECK(ncclCalloc(&ringNext, nranks*MAXCHANNELS));
      NCCLCHECK(ncclCalloc(&treeUpRecv, nranks*MAXCHANNELS));
      NCCLCHECK(ncclCalloc(&treeUpSend, nranks*MAXCHANNELS));
      NCCLCHECK(ncclCalloc(&treeDnRecv, nranks*MAXCHANNELS));
      NCCLCHECK(ncclCalloc(&treeDnSend, nranks*MAXCHANNELS));
      for (int i=0; i<nranks; i++) {
        for (int c=0; c<nChannels;c++) {
          ringRecv[c*nranks+i] = allTopoRanks[i]->ringRecv[c];
          ringSend[c*nranks+i] = allTopoRanks[i]->ringSend[c];
          ringPrev[c*nranks+i] = allTopoRanks[i]->ringPrev[c];
          ringNext[c*nranks+i] = allTopoRanks[i]->ringNext[c];
          treeUpRecv[c*nranks+i] = allTopoRanks[i]->treeUpRecv[c];
          treeUpSend[c*nranks+i] = allTopoRanks[i]->treeUpSend[c];
          treeDnRecv[c*nranks+i] = allTopoRanks[i]->treeDnRecv[c];
          treeDnSend[c*nranks+i] = allTopoRanks[i]->treeDnSend[c];
        }
      }
      // Connect rings and trees. This should also duplicate the channels.
      NCCLCHECK(connectRings(comm, ringRecv, ringSend, ringPrev, ringNext, firstRanks));
      NCCLCHECK(connectTrees(comm, treeUpRecv, treeUpSend, treeDnRecv, treeDnSend, firstRanks));
      // Duplicate ringPrev/ringNext for ncclBuildRing
      memcpy(ringPrev+nChannels*nranks, ringPrev, nChannels*nranks*sizeof(int));
      memcpy(ringNext+nChannels*nranks, ringNext, nChannels*nranks*sizeof(int));
      // Duplication should be complete now
      nChannels = comm->nChannels = std::min(MAXCHANNELS,nChannels*2);
      // Honor NCCL_MIN_NRINGS/NCCL_MAX_NRINGS.
      // We permit combining max, then min, to only use the first channels, then duplicate them.
      nChannels = comm->nChannels = std::min((int)ncclMaxNchannels(), nChannels);
      int c;
      for (c=nChannels; c<ncclMinNchannels(); c++) {
        memcpy(ringPrev+c*nranks, ringPrev+(c-nChannels)*nranks, nranks*sizeof(int));
        memcpy(ringNext+c*nranks, ringNext+(c-nChannels)*nranks, nranks*sizeof(int));
        memcpy(comm->channels+c, comm->channels+c-nChannels, sizeof(struct ncclChannel));
      }
      nChannels = comm->nChannels = c;
      // Create rings array and check all is fine
      NCCLCHECK(ncclBuildRings(nChannels, rings, comm->rank, comm->nRanks, ringPrev, ringNext));
      free(ringRecv);
      free(ringSend);
      free(ringPrev);
      free(ringNext);
      free(treeUpRecv);
      free(treeUpSend);
      free(treeDnRecv);
      free(treeDnSend);
      return ncclSuccess;
    }

    Here the prev, next, send, and recv information of every channel is flattened into arrays indexed by channel and rank; for example, ringRecv[c*nranks + i] is the first rank (the recv end) of the intra-machine ring that rank i belongs to in channel c. connectRings then computes the prev of each machine's first rank and the next of each machine's last rank.

    static ncclResult_t connectRings(struct ncclComm* comm, int* ringRecv, int* ringSend, int* ringPrev, int* ringNext, int* firstRanks) {
      int nChannels = comm->nChannels;
      int nNodes = comm->nNodes;
      for (int c=0; c<nChannels; c++) {
        int* recv = ringRecv+c*comm->nRanks;
        int* send = ringSend+c*comm->nRanks;
        int* prev = ringPrev+c*comm->nRanks;
        int* next = ringNext+c*comm->nRanks;
        struct ncclChannel* channel0 = comm->channels+c;
        struct ncclChannel* channel1 = channel0+nChannels;
        for (int n=0; n<nNodes; n++) {
          int recvRank = recv[firstRanks[n]];
          int prevSendRank = send[firstRanks[(n-1+nNodes)%nNodes]];
          prev[recvRank] = prevSendRank;
          if (comm->rank == recvRank) {
            channel0->ring.prev = prevSendRank;
            channel1->ring.prev = prevSendRank;
          }
          int sendRank = send[firstRanks[n]];
          int nextRecvRank = recv[firstRanks[(n+1)%nNodes]];
          next[sendRank] = nextRecvRank;
          if (comm->rank == sendRank) {
            channel0->ring.next = nextRecvRank;
            channel1->ring.next = nextRecvRank;
          }
        }
        TRACE(NCCL_GRAPH, "Ring %d : %d -> %d -> %d", c, channel0->ring.prev, comm->rank, channel0->ring.next);
        TRACE(NCCL_GRAPH, "Ring %d : %d -> %d -> %d", c+nChannels, channel1->ring.prev, comm->rank, channel1->ring.next);
      }
      return ncclSuccess;
    }
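
    For the two-machine example, this loop produces the following boundary links; a small standalone sketch with the example's values hardcoded (illustration only, not NCCL code):

    #include <stdio.h>

    int main(void) {
      // recvOf[n]/sendOf[n] stand for recv[firstRanks[n]] and send[firstRanks[n]]:
      // the first and last GPU of each machine's intra ring in the example.
      int recvOf[2] = {0, 10};
      int sendOf[2] = {1, 11};
      int nNodes = 2;

      for (int n = 0; n < nNodes; n++) {
        int recvRank = recvOf[n];
        int prevSendRank = sendOf[(n - 1 + nNodes) % nNodes];
        int sendRank = sendOf[n];
        int nextRecvRank = recvOf[(n + 1) % nNodes];
        // Prints: prev[0]=11 next[1]=10, then prev[10]=1 next[11]=0
        printf("prev[%d]=%d next[%d]=%d\n", recvRank, prevSendRank, sendRank, nextRecvRank);
      }
      return 0;
    }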

    As shown above, the prev of a machine's recv rank is the send rank of the previous machine, and the next of a machine's send rank is the recv rank of the next machine. ncclBuildRings then walks the big ring and records the ranks, in ring order, into rings.

    ncclResult_t ncclBuildRings(int nrings, int* rings, int rank, int nranks, int* prev, int* next) {
      for (int r=0; r<nrings; r++) {
        char prefix[30];
        int current = rank;
        for (int i=0; i<nranks; i++) {
          rings[r*nranks+i] = current;
          current = next[r*nranks+current];
        }
        ...
        // Check that all ranks are there
        for (int i=0; i<nranks; i++) {
          int found = 0;
          for (int j=0; j<nranks; j++) {
            if (rings[r*nranks+j] == i) {
              found = 1;
              break;
            }
          }
          if (found == 0) {
            WARN("Error : ring %d does not contain rank %d", r, i);
            return ncclInternalError;
          }
        }
      }
      return ncclSuccess;
    }

    Sticking with the example, the first big ring that rank 6 records in rings is:

    GPU/6 GPU/3 GPU/2 GPU/5 GPU/4 GPU/1 GPU/10 GPU/9 GPU/8 GPU/13 GPU/12 GPU/15 GPU/14 GPU/11 GPU/0 GPU/7
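
    A standalone sketch that reproduces this walk; the next array below combines each machine's intra-ring links from Preset with the two cross-machine links computed by connectRings, hardcoded for the example (illustration only):

    #include <stdio.h>

    int main(void) {
      // next[r]: successor of rank r in channel 0 after connectRings.
      // Intra links follow each machine's ring order; next[1]=10 and
      // next[11]=0 are the cross-machine links computed above.
      int next[16] = {
        /*0*/7,  /*1*/10, /*2*/5,  /*3*/2,  /*4*/1,  /*5*/4,  /*6*/3,  /*7*/6,
        /*8*/13, /*9*/8,  /*10*/9, /*11*/0, /*12*/15,/*13*/12,/*14*/11,/*15*/14
      };
      int nranks = 16, current = 6;   // start the walk at rank 6
      for (int i = 0; i < nranks; i++) {
        printf("GPU/%d ", current);   // prints the big ring starting at GPU/6
        current = next[current];
      }
      printf("\n");
      return 0;
    }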

    At this point the inter-machine big ring is established: every rank knows its predecessor and successor, so the actual communication links can now be set up.

    Next, every rank allocates some memory for communication. For performance, the CPU affinity is set before the buffers are allocated so that the memory lands, as far as possible, on the NUMA node local to the current GPU.

    cpu_set_t affinitySave;
    sched_getaffinity(0, sizeof(cpu_set_t), &affinitySave);
    NCCLCHECK(ncclTopoSetAffinity(comm->topo, comm->rank));

    ncclResult_t ncclTopoSetAffinity(struct ncclTopoSystem* system, int rank) {
      struct ncclTopoNode* cpu = NULL, *gpu = NULL;
      for (int g=0; g<system->nodes[GPU].count; g++) {
        if (system->nodes[GPU].nodes[g].gpu.rank == rank) {
          gpu = system->nodes[GPU].nodes+g;
          // Find closer CPU
          int cpuIndex = -1, minHops = 0;
          for (int c=0; c<system->nodes[CPU].count; c++) {
            int nHops = system->nodes[GPU].nodes[g].paths[CPU][c].count;
            if (cpuIndex == -1 || nHops < minHops) {
              cpuIndex = c;
              minHops = nHops;
            }
          }
          cpu = system->nodes[CPU].nodes+cpuIndex;
        }
      }
      if (cpu == NULL) {
        WARN("Set CPU affinity : unable to find GPU/CPU for rank %d", rank);
        return ncclInternalError;
      }
      // Query the CPU affinity set we were provided
      cpu_set_t mask;
      SYSCHECK(sched_getaffinity(0, sizeof(cpu_set_t), &mask), "sched_getaffinity");
      // Get the affinity of the CPU close to our GPU.
      cpu_set_t cpuMask = cpu->cpu.affinity;
      cpu_set_t finalMask;
      if (ncclParamIgnoreCpuAffinity())
        // Ignore the CPU affinity set and use the GPU one instead
        finalMask = cpuMask;
      else
        // Use a subset of the GPU affinity set
        CPU_AND(&finalMask, &mask, &cpuMask);
      // If there is a non empty set, use it to set affinity
      if (CPU_COUNT(&finalMask)) {
        char affinityStr[sizeof(cpu_set_t)*2];
        NCCLCHECK(ncclCpusetToStr(&finalMask, affinityStr));
        INFO(NCCL_INIT, "Setting affinity for GPU %d to %s", gpu->gpu.dev, affinityStr);
        SYSCHECK(sched_setaffinity(0, sizeof(cpu_set_t), &finalMask), "sched_setaffinity");
      }
      return ncclSuccess;
    }

    First the current thread's CPU affinity is saved into affinitySave; once the buffers have been allocated, affinitySave is used to restore the original affinity.

    Then ncclTopoSetAffinity sets the CPU affinity: after finding the CPU node closest to the current rank's GPU, it obtains that CPU's cores (cpuMask) and the thread's current affinity (mask). By default, finalMask is the intersection of cpuMask and mask, and if that intersection is non-empty it is applied to the current thread.
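
    A minimal standalone sketch of the same save / intersect / restore pattern using the standard glibc CPU_* macros (the NUMA-local mask here is made up for illustration):

    #define _GNU_SOURCE
    #include <sched.h>
    #include <stdio.h>

    int main(void) {
      cpu_set_t save, mask, cpuMask, finalMask;
      sched_getaffinity(0, sizeof(cpu_set_t), &save);   // remember the current affinity
      sched_getaffinity(0, sizeof(cpu_set_t), &mask);

      CPU_ZERO(&cpuMask);                               // pretend cores 0-7 are NUMA-local to the GPU
      for (int i = 0; i < 8; i++) CPU_SET(i, &cpuMask);

      CPU_AND(&finalMask, &mask, &cpuMask);             // keep only cores allowed by both masks
      if (CPU_COUNT(&finalMask))
        sched_setaffinity(0, sizeof(cpu_set_t), &finalMask);

      // ... allocate communication buffers here: they tend to land on the local NUMA node ...

      sched_setaffinity(0, sizeof(cpu_set_t), &save);   // restore the original affinity
      printf("allowed cores after restore: %d\n", CPU_COUNT(&save));
      return 0;
    }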

    struct ncclConnect {
      char data[CONNECT_SIZE];
    };

    struct ncclConnect *connect;
    NCCLCHECKGOTO(ncclCalloc(&connect, 2), ret, affinity_restore);
    for (int c=0; c<comm->nChannels; c++) {
      struct ncclChannel* channel = comm->channels+c;
      NCCLCHECKGOTO(setupChannel(comm, c, rank, nranks, rings+c*nranks), ret, affinity_restore);
      if (comm->nRanks == 1) continue;
      NCCLCHECKGOTO(ncclTransportP2pSetup(comm, &ringGraph, channel, 1, &channel->ring.prev, 1, &channel->ring.next), ret, affinity_restore);
      ...
    }

    Now a quick look at the ncclChannel data structure. collectives stores the communication operations the user has submitted to NCCL; each ncclSend, ncclRecv, etc. appends an entry to collectives, and ncclColl holds the parameters of those operations. collectives is a ring queue: collStart points to the start position and collCount is the number of operations currently queued. collFifoHead and collFifoTail coordinate the kernel producing data with the NET transport sending it, i.e. a producer-consumer pair. ncclPeer stores connection-related information, which will be covered in detail later.

    struct ncclRing {
      // Shortcuts for userRanks[1] and userRanks[n-1]
      int prev;           // previous rank of the current rank in the ring
      int next;           // next rank of the current rank in the ring
      // Maps an internal nccl index to user-specified rank order. This is necessary
      // since we need to know how the user expects data to be ordered across
      // devices. Ordered from current device.
      int* userRanks;     // the whole ring, starting from the current rank
      int* devUserRanks;  // device-side copy of userRanks
    };

    struct ncclChannel {
      union {
        struct {
          struct ncclRing ring;
          struct ncclTree treeUp;
          struct ncclTree treeDn;
          struct ncclTree collTreeUp;
          struct ncclTree collTreeDn;
          int id;
          // Communication structures
          struct ncclPeer* peers;
          struct ncclPeer* devPeers;
          // Operation list for aggregation
          struct ncclColl* collectives;
          int collStart;
          int collCount;
          int collFifoHead; // Only used by GPU
          int collFifoTail; // Only used by CPU
        };
        int data[0x80];
      };
    };
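
    To make the producer-consumer idea behind collFifoHead/collFifoTail concrete, here is a generic head/tail ring-buffer sketch over NCCL_MAX_OPS-style slots. It only illustrates the pattern; it is not NCCL's actual kernel or proxy code:

    #include <stdio.h>

    #define MAX_OPS 2048                     // stand-in for NCCL_MAX_OPS

    struct fifo { int ops[MAX_OPS]; int head, tail; };

    // Producer: enqueue one operation at head (returns -1 when the queue is full).
    static int push(struct fifo* f, int op) {
      int nextHead = (f->head + 1) % MAX_OPS;
      if (nextHead == f->tail) return -1;
      f->ops[f->head] = op;
      f->head = nextHead;
      return 0;
    }

    // Consumer: dequeue one operation from tail (returns -1 when the queue is empty).
    static int pop(struct fifo* f, int* op) {
      if (f->tail == f->head) return -1;
      *op = f->ops[f->tail];
      f->tail = (f->tail + 1) % MAX_OPS;
      return 0;
    }

    int main(void) {
      struct fifo f = { .head = 0, .tail = 0 };
      push(&f, 1); push(&f, 2);              // producer side
      int op;
      while (pop(&f, &op) == 0) printf("consumed op %d\n", op);  // consumer side
      return 0;
    }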

    Then the channels are initialized. initChannel mostly allocates buffers: it allocates userRanks and devUserRanks, sets up ncclPeer, and allocates collectives. Because both the host and the device access collectives, page-locked host memory is allocated with cudaHostAlloc and mapped into the CUDA address space via the cudaHostAllocMapped flag. Note that on a UVA system, cudaMallocHost, cudaHostAlloc + cudaHostAllocDefault, and cudaHostAlloc + cudaHostAllocMapped behave essentially the same: the memory is accessible from both host and device.

    ncclResult_t initChannel(struct ncclComm* comm, int channelid) {
      struct ncclChannel* channel = comm->channels+channelid;
      if (channel->id != -1) return ncclSuccess;
      channel->id = channelid;

      // Ring index to user rank table.
      NCCLCHECK(ncclCudaCalloc(&channel->ring.devUserRanks, comm->nRanks));
      NCCLCHECK(ncclCalloc(&channel->ring.userRanks, comm->nRanks));

      // Communication structures with peers.
      NCCLCHECK(ncclCudaCalloc(&channel->devPeers, comm->nRanks+1)); // The extra one rank is for collnet root (i.e. network)
      NCCLCHECK(ncclCalloc(&channel->peers, comm->nRanks+1));
      for (size_t i=0; i<comm->nRanks+1; ++i) {
        channel->peers[i].send.comm = comm;
        channel->peers[i].recv.comm = comm;
      }

      // Per-channel operation list.
      NCCLCHECK(ncclCudaHostCalloc(&channel->collectives, NCCL_MAX_OPS));
      return ncclSuccess;
    }

    template <typename T>
    static ncclResult_t ncclCudaHostCalloc(T** ptr, size_t nelem) {
      CUDACHECK(cudaHostAlloc(ptr, nelem*sizeof(T), cudaHostAllocMapped));
      memset(*ptr, 0, nelem*sizeof(T));
      return ncclSuccess;
    }
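
    A small hedged sketch of what mapped pinned memory gives you: on a UVA system the host pointer can be used on the device directly, while on older non-UVA setups the device alias is obtained via cudaHostGetDevicePointer (standalone illustration, not NCCL code):

    #include <cuda_runtime.h>
    #include <stdio.h>

    __global__ void bump(int* p) { p[0] += 1; }            // device writes the mapped host buffer

    int main(void) {
      int* hostPtr = NULL;
      // Page-locked host memory, mapped into the device address space.
      cudaHostAlloc((void**)&hostPtr, 16 * sizeof(int), cudaHostAllocMapped);
      hostPtr[0] = 41;

      int* devPtr = NULL;
      cudaHostGetDevicePointer((void**)&devPtr, hostPtr, 0);  // same address as hostPtr under UVA
      bump<<<1, 1>>>(devPtr);
      cudaDeviceSynchronize();

      printf("%d\n", hostPtr[0]);                          // prints 42: both sides see the buffer
      cudaFreeHost(hostPtr);
      return 0;
    }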

    Then, starting from the current rank, the ring is written into userRanks.

    static ncclResult_t setupChannel(struct ncclComm* comm, int channelId, int rank, int nranks, int* ringRanks) {
      TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);
      NCCLCHECK(initChannel(comm, channelId));

      struct ncclRing* ring = &comm->channels[channelId].ring;
      // Reorganize ranks to start with rank.
      int shift;
      for (shift = 0; shift<nranks; shift++) {
        if (ringRanks[shift] == rank) {
          break;
        }
      }
      for (int i=0; i<nranks; i++) {
        ring->userRanks[i] = ringRanks[(i+shift)%nranks];
      }
      return ncclSuccess;
    }
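
    A small sketch of the rotation on a toy ring (the first machine's intra order, with rank 6 as the current rank; illustration only). Since rings as produced by ncclBuildRings already starts at the current rank, the shift there is simply 0; the sketch shows the general case:

    #include <stdio.h>

    int main(void) {
      int ringRanks[8] = {0, 7, 6, 3, 2, 5, 4, 1};
      int nranks = 8, rank = 6, shift;

      for (shift = 0; shift < nranks; shift++)
        if (ringRanks[shift] == rank) break;     // rank 6 sits at index 2

      // Prints: 6 3 2 5 4 1 0 7  -- the same ring, rotated to start at rank 6.
      for (int i = 0; i < nranks; i++)
        printf("%d ", ringRanks[(i + shift) % nranks]);
      printf("\n");
      return 0;
    }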

    Then ncclTransportP2pSetup is executed to establish the communication links between the current rank and its prev and next.

    This completes the connection of channels across machines. The next installment will look at how the communication links themselves are established.

    (Published by OneFlow with the author's permission. Original article: https://blog.csdn.net/KIDGIN7439/article/details/128144057)

