https://huggingface.co/blog/AviSoori1x/makemoe2
https://github.com/AviSoori1x/makeMoE
在预训练稀疏混合专家语言模型或任何大型语言模型时,该过程通常跨越多个GPU,甚至涉及多台机器。跨这些硬件资源并行训练的方式对于平衡计算负载至关重要。然而,如果某些专家或一组专家被过度偏爱,不仅可能导致模型性能问题,还可能导致集群中的计算负载不平衡。
Switch Transformer 使用专家容量来规避这一问题。专家容量确定了在训练或推断过程中每个专家负责处理多少标记,并设置了每个专家处理的标记数量上限。它根据批次中的标记数量和可用专家数量进行定义,通常通过容量因子进行调整。该因子允许在分配中灵活调整,提供缓冲以应对数据分布的变化,并确保没有单个专家因过载而成为瓶颈。在训练这些大型模型时,硬件故障是常见的,持续时间可能长达数周甚至数月,因此这一点非常重要。
通常计算专家容量的方法如下: 专家容量 = (每批batch 内 token数 / 专家数量) × 容量因子 其中:
每批batch 内 token数 是 一个batch中需要处理的标记总数。专家数量是MoE层中可用的专家总数,用于处理数据。容量因子是用于调整基础容量(每批标记数除以专家数量)的乘数。大于1的容量因子允许每个专家处理超出均匀分配份额的缓冲,以适应标记分配的不平衡。该值的一般范围为1-1.25。
以下代码块进行了轻微调整,以实现专家能力的简单版本
class SparseMoE(nn.Module):
def \_\_init\_\_(self, n\_embed, num\_experts, top\_k, capacity\_factor=1.0):
super(SparseMoE, self).__init__()
self.router = NoisyTopkRouter(n_embed, num_experts, top_k)
self.experts = nn.ModuleList([Expert(n_embed) for _ in range(num_experts)])
self.top_k = top_k
self.capacity_factor = capacity_factor
self.num_experts = num_experts
def forward(self, x):
# Assuming x has shape [batch\_size, seq\_len, n\_embd]
batch_size, seq_len, _ = x.shape
gating_output, indices = self.router(x)
final_output = torch.zeros_like(x)
# Flatten the batch and sequence dimensions to treat each token independently
flat_x = x.view(-1, x.size(-1)) # Now shape [batch\_size * seq\_len, n\_embd]
flat_gating_output = gating_output.view(-1, gating_output.size(-1))
tokens_per_batch = batch_size * seq_len * self.top_k
expert_capacity = int((tokens_per_batch / self.num_experts) * self.capacity_factor)
updates = torch.zeros_like(flat_x)
for i, expert in enumerate(self.experts):
expert_mask = (indices == i).any(dim=-1)
flat_mask = expert_mask.view(-1)
selected_indices = torch.nonzero(flat_mask).squeeze(-1)
limited_indices = selected_indices[:expert_capacity] if selected_indices.numel() > expert_capacity else selected_indices
if limited_indices.numel() > 0:
expert_input = flat_x[limited_indices]
expert_output = expert(expert_input)
gating_scores = flat_gating_output[limited_indices, i].unsqueeze(1)
weighted_output = expert_output * gating_scores
updates.index_add_(0, limited_indices, weighted_output)
# Reshape updates to match the original dimensions of x
final_output += updates.view(batch_size, seq_len, -1)
return final_output
首先来看一下专家容量计算。
expert_capacity = int((tokens_per_batch / self.num_experts) * self.capacity_factor)
这很简单。将其包含在正向传递中的原因是要考虑使用动态批量大小的情况。
接下来重要的一行是:
limited_indices = selected_indices[:expert_capacity] if selected_indices.numel() > expert_capacity else selected_indices
if limited_indices.numel() > 0:
#remaining logic to process and accumulate weighted expert outputs for selected tokens.
张量标识了专家处理的第i个标记。如果分配给该专家的总标记数超过其容量,那么张量将被截断以匹配专家的最大处理能力。
否则,它将被直接用于改专家下的计算。这些计算涉及通过专家确定每个标记的输出,然后应用相应的门控值来得出加权输出。这个加权输出被逐步地与最终输出张量结合。
这种管理专家能力的方法相对基础。在文献中探讨了更高级的策略,比如谷歌论文中讨论的Switch Transformer架构,可在https://arxiv.org/abs/2101.03961
上找到。