• CogView中的Word embeddings (parallel)


    入门小菜鸟,希望像做笔记记录自己学的东西,也希望能帮助到同样入门的人,更希望大佬们帮忙纠错啦~侵权立删。

    目录

    一、用途

    二、代码解析

    1、__init__

    A、参数意义

    B、对查询表的设定

    C、对模型的一些其他设定(包括对权重,是否填充等等)

    D、分别获取分布式训练中给每个进程开始词和结束词的index列表

    E、获得相应的初始化权重参数矩阵

    2、forward

    A、判断输入的词的index是否在进程index中

    B、构建输入词在该进程中的index列表,同时对是否全在该范围内进行标识

    C、获取嵌入词

    D、标识记录并进行数据汇总


    一、用途

    转化为词向量

    二、代码解析

     这一部分在mpu/layers.py里的VocabParallelEmbedding类里实现

    1、__init__

    1. def __init__(self, num_embeddings, embedding_dim,
    2. init_method=init.xavier_normal_):
    3. super(VocabParallelEmbedding, self).__init__()

    A、参数意义

    num_embeddings查询表(词典)的大小(即有多少个词);

    embedding_dim每个查询向量的维度(即为每个词创建一个多少维的向量来表示);

    init_method初始化权重方法————这里初始化为服从正态分布

    B、对查询表的设定

    1. # Keep the input dimensions.
    2. self.num_embeddings = num_embeddings
    3. self.embedding_dim = embedding_dim

    C、对模型的一些其他设定(包括对权重,是否填充等等)

    1. # Set the detauls for compatibility.
    2. self.padding_idx = None
    3. #padding_idx选择了不填充
    4. self.max_norm = None
    5. #max_norm权重约束————这里选择了最大范数不做约束
    6. self.norm_type = 2.
    7. #norm_type指定利用2范数计算。
    8. self.scale_grad_by_freq = False
    9. #scale_grad_by_freq根据单词在mini-batch(一个batchsize)中出现的频率,对梯度进行放缩.这里不启用。
    10. self.sparse = False
    11. #这里不启用专门处理稀疏张量的模块
    12. self._weight = None

    (1)padding_idx——自然语言中使用批处理时候, 每个句子的长度并不一定是等长的, 这时候就需要对较短的句子进行padding。这里默认选择了不填充

    (2)max_norm权重约束——最大范数:如果嵌入向量的范数超过了这个界限,就要进行再归一化。这里默认不做约束

    (3)norm_type指定利用什么范数计算,并用于对比max_norm,这里选择2范数。

    (4)scale_grad_by_freq根据单词在mini-batch(一个batchsize)中出现的频率,对梯度进行放缩

    (5)sparse:torch.sparse是一个专门处理稀疏张量的模块。(通常,张量会按一定的顺序连续地进行存取。但是,对于一个存在很多空值的稀疏张量来说,顺序存储的效率显得较为低下)只有足够稀疏的张量使用这种方式进行存储才能获得更高的效率。——这里不启用。

    补充说明:sparse中,有意义的值被称为specified elements,而无意义的值(空值,通常为0,但是也可以是其他值)则被称为fill value。

    (6)_weight:这个好像没什么用啊(如果他有什么用处的话可不可以在评论区里吱我一声呀,谢谢啦~)

    D、分别获取分布式训练中给每个进程开始词和结束词的index列表

    1. #分别获取分布式训练中给每个进程开始词和结束词的index列表
    2. self.vocab_start_index, self.vocab_end_index = \
    3. VocabUtility.vocab_range_from_global_vocab_size(
    4. self.num_embeddings, get_model_parallel_rank(),
    5. get_model_parallel_world_size())

    这里调用了mpu/utils.py里的VocabUtility类中的vocab_range_from_global_vocab_size方法,mpu/initialize.py里的get_model_parallel_rank函数和get_model_parallel_world_size函数。

    (1)VocabUtility类中的vocab_range_from_global_vocab_size方法

    1. @staticmethod
    2. #参数设定:global_vocab_size——词典中词的数量;rank——模型并行组的进程标识(模型并行组中每个进程都有唯一标识符);world_size——分布式组中的进程数
    3. def vocab_range_from_global_vocab_size(global_vocab_size, rank, world_size):
    4. per_partition_vocab_size = divide(global_vocab_size, world_size)#得到一个进程里有多少词
    5. return VocabUtility.vocab_range_from_per_partition_vocab_size(
    6. per_partition_vocab_size, rank, world_size)

    ✨参数设定:

    • global_vocab_size——词典中词的数量
    • rank——模型并行组的进程标识(模型并行组中每个进程都有唯一标识符)
    • world_size——分布式组中的进程数

    ✨divide函数(mpu/utils.py里)——就是实现个保证可以整除,这里为了得到一个进程中的词数量

    1. def ensure_divisibility(numerator, denominator):
    2. """Ensure that numerator is divisible by the denominator."""
    3. assert numerator % denominator == 0, '{} is not divisible by {}'.format(
    4. numerator, denominator)
    5. def divide(numerator, denominator):
    6. """Ensure that numerator is divisible by the denominator and return
    7. the division value."""
    8. ensure_divisibility(numerator, denominator)#调用上面的函数:判断是否整除
    9. return numerator // denominator #直接进行一个强制整除返回

    ✨VocabUtility类中的vocab_range_from_per_partition_vocab_size方法——获取进程开始词和结束词的index(这里的world_size应该是没有用的)

    1. #获取进程开始词和结束词的index
    2. #参数说明:per_partition_vocab_size——一个进程中词的数目;rank——每个进程的标识
    3. @staticmethod
    4. def vocab_range_from_per_partition_vocab_size(per_partition_vocab_size,
    5. rank, world_size):
    6. index_f = rank * per_partition_vocab_size#计算每个进程开始词的表示index
    7. index_l = index_f + per_partition_vocab_size#计算每个进程组结束词的表示index
    8. return index_f, index_l

    (2)get_model_parallel_rank()——返回模型并行组的进程标识

    1. #返回模型并行组的进程标识
    2. def get_model_parallel_rank():
    3. """Return my rank for the model parallel group."""
    4. return torch.distributed.get_rank(group=get_model_parallel_group())

    (3)get_model_parallel_world_size()——返回分布式组中的进程数

    1. #返回分布式组中的进程数
    2. def get_model_parallel_world_size():
    3. """Return world size for the model parallel group."""
    4. return torch.distributed.get_world_size(group=get_model_parallel_group())

    (4)get_model_parallel_group()——判断是否有模型并行组,有则返回该模型并行组。

    (get_model_parallel_rank()和get_model_parallel_world_size()都用到了这个函数

    1. def get_model_parallel_group():
    2. """Get the model parallel group the caller rank belongs to."""
    3. assert _MODEL_PARALLEL_GROUP is not None, \
    4. 'model parallel group is not initialized'
    5. return _MODEL_PARALLEL_GROUP

    E、获得相应的初始化权重参数矩阵

    (1)获取每个进程词的数目(即输入的矩阵的行数)的列表

    1. #获取每个进程词的数目(即输入的矩阵的行数)
    2. self.num_embeddings_per_partition = self.vocab_end_index - \
    3. self.vocab_start_index

    (2)调用torch.nn.parameter的Parameter类获得权重矩阵的格式

    1. #获得权重矩阵的格式
    2. self.weight = Parameter(torch.Tensor(self.num_embeddings_per_partition,
    3. self.embedding_dim))

    (3)启用模型并行

            self.weight.model_parallel = True

    (4)初始化权重矩阵

    1. #初始化权重矩阵
    2. _initialize_affine_weight(
    3. self.weight, self.num_embeddings, self.embedding_dim,
    4. self.num_embeddings_per_partition, 0, init_method)

    这里调用了_initialize_affine_weight函数

    1. #参数设定:weight——权重矩阵格式;output_size权重矩阵的行数;input_size权重矩阵的列数;per_partition_size记录每个进程的大小的列表;partition_dim进程深度(分割张量的维度);init_method初始化权重矩阵的方法;stride步幅;return_master_weight是否返回矩阵
    2. def _initialize_affine_weight(weight, output_size, input_size,
    3. per_partition_size, partition_dim, init_method,
    4. stride=1, return_master_weight=False):
    5. """Initialize affine weight for model parallel.
    6. Build the master weight on all processes and scatter
    7. the relevant chunk."""

    ✨参数设定:

    • weight——权重矩阵格式;
    • output_size权重矩阵的行数;
    • input_size权重矩阵的列数;
    • per_partition_size记录每个进程的大小的列表;
    • partition_dim进程深度(分割张量的维度);
    • init_method初始化权重矩阵的方法;
    • stride步幅;
    • return_master_weight是否返回矩阵

    ✨获取进程数(每个进程组里有多少个进程)——默认情况下,只有一个进程组

        world_size = get_model_parallel_world_size()#获取进程数(每个进程组里有多少个进程)

    ✨若只有一个进程,直接按初始化方法初始化权重后结束

    1. if world_size == 1:
    2. init_method(weight)
    3. if return_master_weight:
    4. return weight
    5. return None

    ✨初始化一个output_size*input_size的矩阵

    1. master_weight = torch.empty(output_size, input_size,
    2. dtype=weight.dtype,
    3. requires_grad=False)
    4. init_method(master_weight)#用特定的初始化方法初始化该矩阵

    ✨按进程拆分矩阵

    1. per_partition_per_stride_size = divide(per_partition_size, stride)#得到一个进程每一份的大小(行数)
    2. weight_list = torch.split(master_weight, per_partition_per_stride_size,
    3. dim=partition_dim)#将权重矩阵分为per_partition_per_stride_size大小的几份——这里相当于按进程划分
    4. rank = get_model_parallel_rank()#获取模型并行组的进程标识
    5. my_weight_list = weight_list[rank::world_size]#以rank为起始以进程数为步长的权重列表

    ✨拼接

    1. with torch.no_grad():
    2. torch.cat(my_weight_list, dim=partition_dim, out=weight)#拼接,输出张量为weight
    3. if return_master_weight:
    4. return master_weight
    5. return None

    2、forward

    A、判断输入的词的index是否在进程index中

    1. def forward(self, input_):
    2. # Build the mask.
    3. input_mask = (input_ < self.vocab_start_index) | \
    4. (input_ >= self.vocab_end_index)#输入的index是否全在该进程index中(在为false)

    B、构建输入词在该进程中的index列表,同时对是否全在该范围内进行标识

    1. masked_input = input_.clone() - self.vocab_start_index#得出input是在该进程中的第几个词
    2. masked_input[input_mask] = 0#标识输入的index是否全在该进程index中(第0元素为0说明全都在)

    C、获取嵌入词

    1. output_parallel = F.embedding(masked_input, self.weight,
    2. self.padding_idx, self.max_norm,
    3. self.norm_type, self.scale_grad_by_freq,
    4. self.sparse)

    D、标识记录并进行数据汇总

    1. # Mask the output embedding.
    2. output_parallel[input_mask, :] = 0.0#将标识的那一行全部变为0
    3. # Reduce across all the model parallel GPUs.
    4. output = reduce_from_model_parallel_region(output_parallel)#对所有进程内的数据进行汇总,并且让所有进程都获取最终结果
    5. return output

    这里的reduce_from_model_parallel_region()调用了torch.distributed.all_reduce()函数。

    原理如下:


    欢迎大家在评论区中批评指正,谢谢~

  • 相关阅读:
    CST仿真软件数据后处理--S参数
    29、亲身体验Young GC风暴:模拟教程带你走进GC的神秘世界!
    procast模拟浇注时没有出现金属液体怎么解决
    【EC200U】何为QuecPython以及QPYcom基础操作
    仓库管理无纸化,WMS仓库管理软件+条形码技术
    宇泰(UTEK)5口全电口以太网交换机工业级导轨UT-LLDC-5TDS-24正品
    【kyuubi k8s】kyuubi发布k8s执行spark sql
    显示屏分辨率计算
    C语言之通讯录的实现篇优化版
    移动端适配和响应式页面中的常用单位
  • 原文地址:https://blog.csdn.net/weixin_55073640/article/details/126547357