• NVIDIA NCCL 源码学习(一)- 初始化及ncclUniqueId的产生


    NCCL是英伟达开源的GPU通信库,支持集合通信和点对点通信

    看下官方给的一个demo

    1. #include
    2. #include "cuda_runtime.h"
    3. #include "nccl.h"
    4. #include "mpi.h"
    5. #include
    6. #include
    7. #define MPICHECK(cmd) do { \
    8. int e = cmd; \
    9. if( e != MPI_SUCCESS ) { \
    10. printf("Failed: MPI error %s:%d '%d'\n", \
    11. __FILE__,__LINE__, e); \
    12. exit(EXIT_FAILURE); \
    13. } \
    14. } while(0)
    15. #define CUDACHECK(cmd) do { \
    16. cudaError_t e = cmd; \
    17. if( e != cudaSuccess ) { \
    18. printf("Failed: Cuda error %s:%d '%s'\n", \
    19. __FILE__,__LINE__,cudaGetErrorString(e)); \
    20. exit(EXIT_FAILURE); \
    21. } \
    22. } while(0)
    23. #define NCCLCHECK(cmd) do { \
    24. ncclResult_t r = cmd; \
    25. if (r!= ncclSuccess) { \
    26. printf("Failed, NCCL error %s:%d '%s'\n", \
    27. __FILE__,__LINE__,ncclGetErrorString(r)); \
    28. exit(EXIT_FAILURE); \
    29. } \
    30. } while(0)
    31. static uint64_t getHostHash(const char* string) {
    32. // Based on DJB2a, result = result * 33 ^ char
    33. uint64_t result = 5381;
    34. for (int c = 0; string[c] != '\0'; c++){
    35. result = ((result << 5) + result) ^ string[c];
    36. }
    37. return result;
    38. }
    39. static void getHostName(char* hostname, int maxlen) {
    40. gethostname(hostname, maxlen);
    41. for (int i=0; i< maxlen; i++) {
    42. if (hostname[i] == '.') {
    43. hostname[i] = '\0';
    44. return;
    45. }
    46. }
    47. }
    48. int main(int argc, char* argv[])
    49. {
    50. int size = 32*1024*1024;
    51. int myRank, nRanks, localRank = 0;
    52. //initializing MPI
    53. MPICHECK(MPI_Init(&argc, &argv));
    54. MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
    55. MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));
    56. //calculating localRank which is used in selecting a GPU
    57. uint64_t hostHashs[nRanks];
    58. char hostname[1024];
    59. getHostName(hostname, 1024);
    60. hostHashs[myRank] = getHostHash(hostname);
    61. MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
    62. for (int p=0; p
    63. if (p == myRank) break;
    64. if (hostHashs[p] == hostHashs[myRank]) localRank++;
    65. }
    66. //each process is using two GPUs
    67. int nDev = 2;
    68. float** sendbuff = (float**)malloc(nDev * sizeof(float*));
    69. float** recvbuff = (float**)malloc(nDev * sizeof(float*));
    70. cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);
    71. //picking GPUs based on localRank
    72. for (int i = 0; i < nDev; ++i) {
    73. CUDACHECK(cudaSetDevice(localRank*nDev + i));
    74. CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float)));
    75. CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float)));
    76. CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));
    77. CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));
    78. CUDACHECK(cudaStreamCreate(s+i));
    79. }
    80. ncclUniqueId id;
    81. ncclComm_t comms[nDev];
    82. //generating NCCL unique ID at one process and broadcasting it to all
    83. if (myRank == 0) ncclGetUniqueId(&id);
    84. MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));
    85. //initializing NCCL, group API is required around ncclCommInitRank as it is
    86. //called across multiple GPUs in each thread/process
    87. NCCLCHECK(ncclGroupStart());
    88. for (int i=0; i
    89. CUDACHECK(cudaSetDevice(localRank*nDev + i));
    90. NCCLCHECK(ncclCommInitRank(comms+i, nRanks*nDev, id, myRank*nDev + i));
    91. }
    92. NCCLCHECK(ncclGroupEnd());
    93. //calling NCCL communication API. Group API is required when using
    94. //multiple devices per thread/process
    95. NCCLCHECK(ncclGroupStart());
    96. for (int i=0; i
    97. NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,
    98. comms[i], s[i]));
    99. NCCLCHECK(ncclGroupEnd());
    100. //synchronizing on CUDA stream to complete NCCL communication
    101. for (int i=0; i
    102. CUDACHECK(cudaStreamSynchronize(s[i]));
    103. //freeing device memory
    104. for (int i=0; i
    105. CUDACHECK(cudaFree(sendbuff[i]));
    106. CUDACHECK(cudaFree(recvbuff[i]));
    107. }
    108. //finalizing NCCL
    109. for (int i=0; i
    110. ncclCommDestroy(comms[i]);
    111. }
    112. //finalizing MPI
    113. MPICHECK(MPI_Finalize());
    114. printf("[MPI Rank %d] Success \n", myRank);
    115. return 0;
    116. }

    rank0会执行ncclGetUniqueId获取Id,然后通过mpi广播给其他rank,接下来看下UniqueId是怎么产生的

    1. ncclResult_t ncclGetUniqueId(ncclUniqueId* out) {
    2. NCCLCHECK(ncclInit());
    3. NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));
    4. return bootstrapGetUniqueId(out);
    5. }

    然后看下ncclInit

    首先执行initEnv,设置环境变量

    然后执行initNet,用来初始化nccl所需要的网络,包括两个:一个是bootstrap网络,另一个是数据通信网络。bootstrap网络主要用于初始化时交换一些简单的信息,比如每个机器的ip和端口,由于数据量很小,而且主要是在初始化阶段执行一次,因此bootstrap使用的是tcp;而通信网络用于实际数据的传输,因此会优先使用rdma(如果支持GDR,即GPUDirect RDMA,则会优先使用GDR)

    1. ncclResult_t initNet() {
    2. // Always initialize bootstrap network
    3. NCCLCHECK(bootstrapNetInit());
    4. NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet));
    5. if (ncclNet != NULL) return ncclSuccess;
    6. if (initNet(&ncclNetIb) == ncclSuccess) {
    7. ncclNet = &ncclNetIb;
    8. } else {
    9. NCCLCHECK(initNet(&ncclNetSocket));
    10. ncclNet = &ncclNetSocket;
    11. }
    12. return ncclSuccess;
    13. }

    bootstrapNetInit就是bootstrap网络的初始化,主要就是通过findInterfaces遍历机器上所有的网卡信息,通过prefixList匹配选择使用哪些网卡,将可用网卡的信息保存下来,将ifa_name保存到全局的bootstrapNetIfNames,ip地址保存到全局bootstrapNetIfAddrs,默认除了docker和lo其他的网卡都可以使用,例如在测试机器上有三张网卡,分别是xgbe0,xgbe1,xgbe2,那么就会把这三个ifaname和对应的ip地址保存下来,另外nccl提供了环境变量NCCL_SOCKET_IFNAME可以用来指定想用的网卡名,例如通过export NCCL_SOCKET_IFNAME=xgbe0来指定使用xgbe0,其实就是通过prefixList来匹配做到的

    1. static int findInterfaces(const char* prefixList, char* names, union socketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) {
    2. struct netIf userIfs[MAX_IFS];
    3. bool searchNot = prefixList && prefixList[0] == '^';
    4. if (searchNot) prefixList++;
    5. bool searchExact = prefixList && prefixList[0] == '=';
    6. if (searchExact) prefixList++;
    7. int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS);
    8. int found = 0;
    9. struct ifaddrs *interfaces, *interface;
    10. getifaddrs(&interfaces);
    11. for (interface = interfaces; interface && found < maxIfs; interface = interface->ifa_next) {
    12. if (interface->ifa_addr == NULL) continue;
    13. int family = interface->ifa_addr->sa_family;
    14. if (family != AF_INET && family != AF_INET6)
    15. continue;
    16. if (sock_family != -1 && family != sock_family)
    17. continue;
    18. if (family == AF_INET6) {
    19. struct sockaddr_in6* sa = (struct sockaddr_in6*)(interface->ifa_addr);
    20. if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue;
    21. }
    22. if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) {
    23. continue;
    24. }
    25. bool duplicate = false;
    26. for (int i = 0; i < found; i++) {
    27. if (strcmp(interface->ifa_name, names+i*maxIfNameSize) == 0) { duplicate = true; break; }
    28. }
    29. if (!duplicate) {
    30. strncpy(names+found*maxIfNameSize, interface->ifa_name, maxIfNameSize);
    31. int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
    32. memcpy(addrs+found, interface->ifa_addr, salen);
    33. found++;
    34. }
    35. }
    36. freeifaddrs(interfaces);
    37. return found;
    38. }

    然后开始初始化通信网络

    ncclNet_t结构体是一系列的函数指针,比如初始化,发送,接收等;socket,IB等通信方式都实现了自己的ncclNet_t,如ncclNetSocket,ncclNetIb,初始化通信网络的过程就是依次看哪个通信模式可用,然后赋值给全局的ncclNet

    1. ncclResult_t initNet() {
    2. // Always initialize bootstrap network
    3. NCCLCHECK(bootstrapNetInit());
    4. NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet));
    5. if (ncclNet != NULL) return ncclSuccess;
    6. if (initNet(&ncclNetIb) == ncclSuccess) {
    7. ncclNet = &ncclNetIb;
    8. } else {
    9. NCCLCHECK(initNet(&ncclNetSocket));
    10. ncclNet = &ncclNetSocket;
    11. }
    12. return ncclSuccess;
    13. }

    首先执行initNetPlugin,查看是否有libnccl-net.so,测试环境没有这个so,所以直接返回

    然后尝试使用IB网络:

    首先执行ncclNetIb的init函数,就是ncclIbInit

    1. ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) {
    2. static int shownIbHcaEnv = 0;
    3. if(wrap_ibv_symbols() != ncclSuccess) { return ncclInternalError; }
    4. if (ncclParamIbDisable()) return ncclInternalError;
    5. if (ncclNIbDevs == -1) {
    6. pthread_mutex_lock(&ncclIbLock);
    7. wrap_ibv_fork_init();
    8. if (ncclNIbDevs == -1) {
    9. ncclNIbDevs = 0;
    10. if (findInterfaces(ncclIbIfName, &ncclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) {
    11. WARN("NET/IB : No IP interface found.");
    12. return ncclInternalError;
    13. }
    14. // Detect IB cards
    15. int nIbDevs;
    16. struct ibv_device** devices;
    17. // Check if user defined which IB device:port to use
    18. char* userIbEnv = getenv("NCCL_IB_HCA");
    19. if (userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(NCCL_NET|NCCL_ENV, "NCCL_IB_HCA set to %s", userIbEnv);
    20. struct netIf userIfs[MAX_IB_DEVS];
    21. bool searchNot = userIbEnv && userIbEnv[0] == '^';
    22. if (searchNot) userIbEnv++;
    23. bool searchExact = userIbEnv && userIbEnv[0] == '=';
    24. if (searchExact) userIbEnv++;
    25. int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS);
    26. if (ncclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return ncclInternalError;
    27. for (int d=0; d
    28. struct ibv_context * context;
    29. if (ncclSuccess != wrap_ibv_open_device(&context, devices[d]) || context == NULL) {
    30. WARN("NET/IB : Unable to open device %s", devices[d]->name);
    31. continue;
    32. }
    33. int nPorts = 0;
    34. struct ibv_device_attr devAttr;
    35. memset(&devAttr, 0, sizeof(devAttr));
    36. if (ncclSuccess != wrap_ibv_query_device(context, &devAttr)) {
    37. WARN("NET/IB : Unable to query device %s", devices[d]->name);
    38. if (ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
    39. continue;
    40. }
    41. for (int port = 1; port <= devAttr.phys_port_cnt; port++) {
    42. struct ibv_port_attr portAttr;
    43. if (ncclSuccess != wrap_ibv_query_port(context, port, &portAttr)) {
    44. WARN("NET/IB : Unable to query port %d", port);
    45. continue;
    46. }
    47. if (portAttr.state != IBV_PORT_ACTIVE) continue;
    48. if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND
    49. && portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) continue;
    50. // check against user specified HCAs/ports
    51. if (! (matchIfList(devices[d]->name, port, userIfs, nUserIfs, searchExact) ^ searchNot)) {
    52. continue;
    53. }
    54. TRACE(NCCL_INIT|NCCL_NET,"NET/IB: [%d] %s:%d/%s ", d, devices[d]->name, port,
    55. portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
    56. ncclIbDevs[ncclNIbDevs].device = d;
    57. ncclIbDevs[ncclNIbDevs].guid = devAttr.sys_image_guid;
    58. ncclIbDevs[ncclNIbDevs].port = port;
    59. ncclIbDevs[ncclNIbDevs].link = portAttr.link_layer;
    60. ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
    61. ncclIbDevs[ncclNIbDevs].context = context;
    62. strncpy(ncclIbDevs[ncclNIbDevs].devName, devices[d]->name, MAXNAMESIZE);
    63. NCCLCHECK(ncclIbGetPciPath(ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort));
    64. ncclIbDevs[ncclNIbDevs].maxQp = devAttr.max_qp;
    65. ncclNIbDevs++;
    66. nPorts++;
    67. pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context);
    68. }
    69. if (nPorts == 0 && ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
    70. }
    71. if (nIbDevs && (ncclSuccess != wrap_ibv_free_device_list(devices))) { return ncclInternalError; };
    72. }
    73. if (ncclNIbDevs == 0) {
    74. INFO(NCCL_INIT|NCCL_NET, "NET/IB : No device found.");
    75. } else {
    76. char line[1024];
    77. line[0] = '\0';
    78. for (int d=0; d
    79. snprintf(line+strlen(line), 1023-strlen(line), " [%d]%s:%d/%s", d, ncclIbDevs[d].devName,
    80. ncclIbDevs[d].port, ncclIbDevs[d].link == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
    81. }
    82. line[1023] = '\0';
    83. char addrline[1024];
    84. INFO(NCCL_INIT|NCCL_NET, "NET/IB : Using%s ; OOB %s:%s", line, ncclIbIfName, socketToString(&ncclIbIfAddr.sa, addrline));
    85. }
    86. pthread_mutex_unlock(&ncclIbLock);
    87. }
    88. return ncclSuccess;
    89. }

    首先第三行通过wrap_ibv_symbols加载动态库libibverbs.so,然后获取动态库的各个函数

    然后通过wrap_ibv_fork_init避免fork引起rdma网卡读写出错

    后面会讲到ib网络也会用到socket进行带外网络的传输,所以这里也通过findInterfaces获取一个可用的网卡保存到ncclIbIfAddr

    然后通过ibv_get_device_list获取所有rdma设备到devices中,遍历devices的每个device。因为每个HCA可能有多个物理port,所以对每个device遍历每一个物理port,获取每个port的信息,然后将相关信息保存到全局的ncclIbDevs中,比如是哪个device的哪个port、使用的是IB还是RoCE、device的pci路径、maxQp、device的name等。注意这里也有类似bootstrap网络NCCL_SOCKET_IFNAME的环境变量,叫NCCL_IB_HCA,可以指定使用哪个IB HCA

    到这里整个初始化的过程就完成了,一句话总结就是获取了当前机器上所有可用的IB网卡和普通以太网卡然后保存下来

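    然后开始生成UniqueId:ncclGetUniqueId中调用的bootstrapGetUniqueId,在没有通过环境变量NCCL_COMM_ID指定root地址时,会调用bootstrapCreateRoot(id, false)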

    1. ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
    2. ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;
    3. void* listenComm;
    4. NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm));
    5. pthread_t thread;
    6. pthread_create(&thread, NULL, bootstrapRoot, listenComm);
    7. return ncclSuccess;
    8. }

    ncclUniqueId和ncclNetHandle_t都是字符数组,所以这里可以直接把id强转成ncclNetHandle_t来使用;然后执行bootstrapNetListen,监听地址最终会写回到这块内存中

    1. static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm) {  // Opens a bootstrap listening socket; the bound address is written back through netHandle.
    2. union socketAddress* connectAddr = (union socketAddress*) netHandle;  // The opaque handle's bytes are used in place as a socket address.
    3. static_assert(sizeof(union socketAddress) < NCCL_NET_HANDLE_MAXSIZE, "union socketAddress size is too large");  // Ensures the address fits inside the handle buffer.
    4. // if dev >= 0, listen based on dev: take the address of the dev-th bootstrap interface
    5. if (dev >= 0) {
    6. NCCLCHECK(bootstrapNetGetSocketAddr(dev, connectAddr));
    7. } else if (dev == findSubnetIf) {  // findSubnetIf is one of the negative special values of dev
    8. ...  // (branch body elided in this excerpt)
    9. } // Otherwise, the handle already stores the local address to bind
    10. struct bootstrapNetComm* comm;
    11. NCCLCHECK(bootstrapNetNewComm(&comm));
    12. NCCLCHECK(createListenSocket(&comm->fd, connectAddr));  // Binds + listens; the actual ip/port is written back into connectAddr.
    13. *listenComm = comm;  // Hand the new listening comm back to the caller.
    14. return ncclSuccess;
    15. }

    然后依次看下这三个函数,首先是通过bootstrapNetGetSocketAddr获取一个可用的ip地址

    1. static ncclResult_t bootstrapNetGetSocketAddr(int dev, union socketAddress* addr) {
    2. if (dev >= bootstrapNetIfs) return ncclInternalError;
    3. memcpy(addr, bootstrapNetIfAddrs+dev, sizeof(*addr));
    4. return ncclSuccess;
    5. }

    由于这里idFromEnv为false,此时dev是0;bootstrapNetIfs是初始化bootstrap网络时一共找到的可用网卡个数,所以这里就是获取了第0个可用网卡的ip地址

    然后是通过bootstrapNetNewComm创建bootstrapNetComm。bootstrapNetComm结构体里只有一个fd成员,bootstrapNetNewComm就是分配了一个新的bootstrapNetComm

    // Bootstrap connection state: nothing but the socket file descriptor.
    struct bootstrapNetComm { int fd; };

    然后通过createListenSocket启动socket server(即创建一个监听socket)

    1. static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) {
    2. /* IPv4/IPv6 support */
    3. int family = localAddr->sa.sa_family;
    4. int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
    5. /* Create socket and bind it to a port */
    6. int sockfd = socket(family, SOCK_STREAM, 0);
    7. if (sockfd == -1) {
    8. WARN("Net : Socket creation failed : %s", strerror(errno));
    9. return ncclSystemError;
    10. }
    11. if (socketToPort(&localAddr->sa)) {
    12. // Port is forced by env. Make sure we get the port.
    13. int opt = 1;
    14. #if defined(SO_REUSEPORT)
    15. SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt");
    16. #else
    17. SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt");
    18. #endif
    19. }
    20. // localAddr port should be 0 (Any port)
    21. SYSCHECK(bind(sockfd, &localAddr->sa, salen), "bind");
    22. /* Get the assigned Port */
    23. socklen_t size = salen;
    24. SYSCHECK(getsockname(sockfd, &localAddr->sa, &size), "getsockname");
    25. #ifdef ENABLE_TRACE
    26. char line[1024];
    27. TRACE(NCCL_INIT|NCCL_NET,"Listening on socket %s", socketToString(&localAddr->sa, line));
    28. #endif
    29. /* Put the socket in listen mode
    30. * NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn
    31. */
    32. SYSCHECK(listen(sockfd, 16384), "listen");
    33. *fd = sockfd;
    34. return ncclSuccess;
    35. }

    创建监听fd,ip由localAddr指定,初始端口为0,bind时由内核随机分配一个可用端口,再通过getsockname(sockfd, &localAddr->sa, &size)将实际的ip和端口写回到localAddr。由于localAddr指向的正是UniqueId的内存(id被强转成了ncclNetHandle_t),所以写回的地址就保存在了UniqueId中

    到这里UniqueId也就产生了,其实就是当前机器的ip和port

  • 相关阅读:
    【springcloud】环境搭建与Rest使快速上手
    [附源码]java毕业设计幼儿园管理系统
    LiveMedia视频中间件如何与第三方系统实现事件录像关联
    linux出现oom分析流程
    【过程记录】通过ssh上传Github仓库
    CSDN编程竞赛第三期 参赛经历分享
    win和linux动静态链接库介绍,以及win下动态库生成和调用方法
    Jenkins安装和配置 (一)
    大数据课程M2——ELK的ELASTICSEARCH概述
    让我看看谁还不会现在火爆的Spring Cloud,赶紧跟着大佬学起来
  • 原文地址:https://blog.csdn.net/KIDGIN7439/article/details/126712106