动机、参考资料、涉及内容

动机

  • 熟悉 🤗 Transformers 的相关 API 与源码
  • 熟悉 🤗 Tokenizers 的相关 API 与源码
  • 深入理解 T5 的训练与推理步骤,包括每一步的计算过程
  • 适当补充相关知识

参考资料

  • 🤗 Transformers 4.26.1 源代码
  • 🤗 Transformers 官方文档
  • T5原始论文
    • 论文地址:https://arxiv.org/pdf/1910.10683.pdf
    • 标题:Exploring the Limits of Transfer Learning with a Unified Text-to-Text Transformer
    • 机构:Google

注意事项

主要从两个视角来写这篇博客:

  • 原理视角:主要是论文里描述为主,但缺点是某些地方可能会有一定的模糊
  • 实现视角:以 🤗 Transformers 的实际实现为准

Overview: T5

T5 模型尝试将所有的 NLP 任务做了一个统一处理,即:将所有的 NLP 任务都转化为 Text-to-Text 任务。如原论文下图所示:

绿色的框是一个翻译任务(英文翻译为德文),按照以往标准的翻译模型的做法,模型的输入为:That is good.,期望模型的输出为:Das ist gut.,而 T5 的做法是将输入转化为:translate English to German: That is good.,期望的输出依然维持原样。也就是将 NLP 任务的描述也加在了模型输入里。原文中附录 D 中给出了更多的例子。

在模型结构上,T5 模型采用了 Encoder-Decoder 的架构,从大体上说,对于训练过程,伪代码如下:

x, y = "translate English to German: That is good.", "Das ist gut."
x = tokenizer(x)  # [0, 23, 45, 89, 1230, 4, 9], 其中0代表<BOS>, 在实现中<PAD>也是0
y = tokenizer(y)  # [0, 44, 156, 4, 1], 其中1代表<EOS>
x_embedding = encoder.embed_layer(x)  # 将token转换为embedding, x_embedding的形状为(7, 768)
encoded_x = encoder.other_layer(x_embedding)  # 经过encoder后encoded_x的形状为(7, 768)

input_y = y[:-1]  # [0,  44,  156, 4]
# 将token转化为emdedding, input_y_emdedding的形状为(4, 768)
input_y_emdedding = decoder.embed_layer(input_y)  # 在T5的设计中,encoder.embed_layer与decoder.embed_layer共享参数
target_y = y[1:]  # [44, 156, 4,   1]

# decoder_output的形状为(4, 768)
decoder_output = decoder.other_layer(encoded_x, input_y_emdedding)

# logits 的形状为(4, vocab_size=32128)
logits = linear_layer(decoder_output)  # 在T5的设计中,decoder.embed_layer与linear_layer共享参数

# 接下来使用 softmax 与普通的交叉熵计算损失
loss = loss_fn(logits, target_y)

Overview: 🤗 Transformers

对于 🤗 Transformers 的源码阅读而言,本文主要的关注点在于以下部分,首先 🤗 Transformers github 项目的目录结构如下(节选)

examples                         # 一些示例代码, 可供学习, 但不确保能与当前版本兼容
  - flax/language-modeling/t5_tokenizer_model.py  # t5 tokenizer 训练参考
  - flax/language-modeling/run_t5_mlm_flax.py     # t5 mask-LM 预训练参考
  - pytorch/summarization                         # t5 生成式模型训练参考
src/transformers
  - generation/
    - beam_constraints.py        # constraint_beam_search 辅助方法/类: Constraint, ConstraintListState
    - beam_search.py             # beam_search 辅助方法/类: BeamSearchScorer, ConstrainedBeamSearchScorer, BeamHypotheses
    - configuration_utils.py     # 生成式模型的统一配置文件, 用来控制生成算法及各类超参数, 例如生成长度惩罚
    - logit_process.py           # 生成过程时对log-softmax score的后处理:LogitsProcessor, LogitsWarpper
    - stopping_criteria.py       # 生成中止条件:StoppingCriteria
    - streamer.py                # transformers 4.28.0 版本新增, 用于生成字符时流式逐词输出
    - utils.py                   # GenerationMixin 的实现
    ...
  - models/  # 每个模型为一个单独的文件夹, 每个文件夹的文件结构比较固定, 参考t5子文件夹
    - t5/
      - __init__.py
      - convert_t5_original_tf_checkpoint_to_pytorch.py  # 有些模型原始官方仓库的权重需要通过转换得到 🤗 Transformers 中模型定义下模型载入的格式, 这种情况下会维护一个转换脚本
      - modeling_flax_t5.py      # flax版本的模型结构代码, 本文不涉及
      - modeling_tf_t5.py        # tensorflow版本的模型结构代码, 本文不涉及
      - modeling_t5.py           # pytorch版本的模型结构代码
      - configuration_t5.py      # 每个模型都有一个自己的模型结构参数配置文件
      - tokenization_t5.py       # 每个模型的slow/python版本的tokenizer实现, 速度相对较慢
      - tokenization_t5_fast.py  # 每个模型的fast版本的tokenizer实现, 速度较快, 依赖于 🤗 Tokenizers
      - ...
    - ...
  - pipelines/                   # 封装tokenizer与model, 简化使用, 本文不涉及
  - modeling_outputs.py          # 模型输出结果的数据结构
  - modeling_utils.py            # 所有模型的基类: PreTrainedModel
  - tokenization_utils_base.py   # 所有tokenizer的基类: PreTrainedTokenizerBase
  - tokenization_utils.py        # 所有slow版本tokenizer的基类: PreTrainedTokenizer
  - tokenization_utils_fast.py   # 所有fast版本tokenizer的基类: PreTrainedTokenizerFast
  - trainer.py                   # Trainer类, 本文不涉及
  - trainer_callback.py          # Trainer类中使用到的 TrainerCallback/TrainerState/TrainerControl/CallbackHandler
  - integrations.py              # 高级日志记录工具, 例如: TensorBoardCallback
  - ...

原理解析:T5 训练过程的前向计算流程

encoder

首先给出总体的结构图

T5 模型的 Encoder 部分由若干个 Block 构成,每个 Block 都具有相同的结构:一个 Self-Attention Layer 和一个 Feed-Forward Layer。这里也首先给出伪代码:

class Encoder:
    def forward(self, x_token, x_attention_mask):
        # x_token: (B, L=512), long
        # x_attention: (B, L), 0/1 mask
        x_embedding = embedding_layer(x_token)
        hidden = dropout(x_embedding)  # (B, L, C=768)
        
        positional_bias = None
        for block in blocks:
            hidden_1 = block.layernorm_layer(hidden)  # LayerNorm层, hidden_1: (B, L, C)
            # Self-Attention层, attention_hidden: (B, L, C), postional_bias: (1, n_heads, L, L)
            # postional_bias在第一层被产生, 后面每一层都使用它(共享参数)
            attention_hidden, positional_bias = block.attention_layer(hidden_1, x_attention_mask, positional_bias)
            hidden = block.dropout(attention_hidden) + hidden  # 残差连接: hidden: (B, L, C)
            
            hidden = block.ff_layer(hidden)  # Feed-Forward层: hidden (B, L, C)
        
        hidden = layernorm_layer(hidden)  # hidden (B, L, C)
        hidden = dropout(hidden)  # hidden (B, L, C)
        return hidden

备注:在 🤗 Transformers 的实现中,将此处的 block.layernorm_layer, block.attention_layerblock.dropout 的计算逻辑包装在了一起,称为 T5LayerSelfAttention。而此处的 block.ff_layerT5LayerFF

LayerNorm Layer (Encoder)

class LayerNorm(torch.nn.Module):
    def __init__(self, hidden_size, eps=1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.variance_epsilon = eps

    def forward(self, hidden_states):
        # T5用的是简化版的layernorm对最后一维l2归一化后再每一维乘上一个权重, 不带偏置项
        # hidden_states: (B, L, C)
        # return: (B, L, C)
        variance = hidden_states.to(torch.float32).pow(2).mean(-1, keepdim=True)
        hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
        return self.weight * hidden_states

Self-Attention Layer (Encoder)

relative positional embedding

总共的 postional embedding 数目为 (num_bucket, n_head), T5 的 postional embedding 的 index 的取值范围为 [0, num_bucket)

双向 mask 的情况下, $n=num_bucket, m=max_distance$

\[\begin{equation*} index(i, j) = \frac{n}{2} * \mathbb{1}[i-j<0] + \left\{ \begin{aligned} &abs(i - j), &abs(i - j) < \frac{n}{4} \\ &\min(\frac{n}{2}-1, \frac{n}{4}\times(1+\frac{log(4\times abs(i - j)/n)}{log(4\times m/n)})), &abs(i - j) \ge \frac{n}{4} \end{aligned} \right. \end{equation*}\]
def relative_position_bidirectional(i, j, num_buckets=32, max_distance=128):
    position = i - j
    abs_position = abs(position)
    num_buckets = num_buckets // 2
    max_exact = num_buckets // 2
    offset = num_buckets if position < 0 else 0
    if abs_position < max_exact:
        return abs_position + offset
    else:
        ratio = math.log(abs_position/ max_exact) / math.log(max_distance / max_exact)
        return min(int(max_exact*(1+ratio)), num_buckets - 1) + offset

casual mask 的情况下,

\[\begin{equation*} index(i, j) = \left\{ \begin{aligned} &0, &i \ge j \\ &abs(i - j), &i < j\ and\ abs(i - j) < \frac{n}{2} \\ &\min(n-1, \frac{n}{2}\times(1+\frac{log(2\times abs(i - j)/n)}{log(2\times m/n)})), &i < j\ and\ abs(i - j) \ge \frac{n}{2} \end{aligned} \right. \end{equation*}\]
def relative_position_onedirectional(i, j, num_buckets=32, max_distance=128):
    position = i - j
    if position <= 0:
        return 0
    elif position < (num_buckets // 2):
        return position
    else:
        ratio = math.log(2 * position / num_buckets) / math.log(2 * max_distance / num_buckets)
        return min(int(num_buckets // 2 * (1 + ratio)), num_buckets - 1)

在 T5 模型的实验设置中:

num_bucket, max_distance = 32, 128

在 encoder 与 decoder 的第一层加上了 positional bias:

bias = nn.Embedding(num_buckect, n_heads)
positional_idx = ...  # 即上面的公式, (L, L)
scores = q @ k.T  # (B, L, L, n_heads)
positional_bias = bias(positional_idx)  # (L, L, n_heads)
scores += positional_bias
# weights = softmax(scores)

self-attention

class EncoderSelfAttention(torch.nn.Module):
    def __init__(self, d_model=768, d_qkv=64, n_heads=12,
        relative_attention_num_buckets=32, has_relative_bias=False, dropout_rate=0.1):
        """
        relative_attention_num_buckets: 见后面关于positional bias的说明
        has_relative_bias: 第1个EncoderBlock取值为True, 其余均为False
        """
        super().__init__()
        self.inner_dim = d_qkv * n_heads
        self.q, self.k, self.v = [nn.Linear(d_model, self.inner_dim) for i in range(3)]
        self.o = nn.Linear(self.inner_dim, d_model)
        self.dropout_rate = dropout_rate
        if has_relative_bias:
            self.relative_attention_bias = nn.Embedding(self.relative_attention_num_buckets, self.n_heads)

    def compute_bias(self, q_len=512, k_len=512):
        # q_len和k_len都是encoder输入的序列长度
        # 在decoder的self-attention的训练阶段, q_len和k_len都是decoder的输入长度
        
        # positions: (q_len, k_len) long tensor
        # 每个元素的取值范围都是[0, self.relative_attention_num_buckets=32)
        positions = get_relative_idx(q_len, k_len)
        
        bias = self.relative_attention_bias(positions).unsqueeze(0)  # (1, q_len, k_len, n_heads)
        bias = bias.transpose(0, 3, 1, 2)
        # bias: (1, n_heads, q_len, k_len), 其中第0维在计算中被广播, 即(B, n_heads, q_len, k_len)
        return bias
    
    def forward(self, hidden, attention_mask, bias=None):
        """
        Args:
            hidden: (B, L, d_model)
            attention_mask: (B, L) LongTensor, 有token的地方为1, pad处为0
            bias: 第1层输入为None, 后续层将第一层输出的bias作为输入
        """
        # q, k, v: (B, L, self.inner_dim)
        q, k, v = self.q(hidden), self.k(hidden), self.v(hidden)
        q = q.reshape(B, L, n_heads, d_qkv).transpose(1, 2)  # (B, n_heads, L=q_len, d_qkv)
        k = k.reshape(B, L, n_heads, d_qkv).transpose(1, 2)  # (B, n_heads, L=k_len, d_qkv)
        v = v.reshape(B, L, n_heads, d_qkv).transpose(1, 2)  # (B, n_heads, L=k_len, d_qkv)
        
        scores = torch.matmul(q, k.transpose(2, 3))  # (B, n_head, L, L)
        if bias is None:
            bias = self.compute_bias(L, L)  # (1, n_head, L, L)
            extended_mask = torch.where(attention_mask[:, None, None, :]==1, 0, -inf)  # (B, 1, 1, L)
            bias = bias + extended_mask  # (B, n_head, L, L)
        scores += bias
        attn_weights = nn.functional.softmax(scores, dim=-1)
        attn_weights = nn.functional.dropout(attn_weights, self.dropout_rate)
        hidden = torch.matmul(atten_weights, v)  # (B, n_heads, L, d_qkv)
        hidden = hidden.transpose(1, 2).view(B, L, self.inner_dim)  # (B, L, inner_dim)
        hidden = self.o(hidden)  # (B, L, d_model)
        return hidden, bias

Feed-Forward

见下图,含义自明

decoder

首先给出总体的结构图

Self-Attention Layer (Decoder)

Self-Attention Layer(Encoder) 的计算过程一致, 但有如下两个区别:

  • positional bias 使用单向的方式进行获取

  • mask 有些变化:

    bias = self.compute_bias(L, L)  # (1, n_head, L=trg_len, L=trg_len)
    mask = torch.triu(torch.ones((B, 1, L, L)))  # (B, 1, L, L), 下三角含对角线为1, 其余均为0
    extended_mask = torch.where(mask==1, 0, -inf)  # 下三角含对角线为0, 其余均为-inf
    bias = bias + extended_mask  # (B, n_head, L, L)
    

Cross-Attention Layer (Decoder)

class DecoderCrossAttention(torch.nn.Module):
    def __init__(self, d_model=768, d_qkv=64, n_heads=12, dropout_rate=0.1):
        # 没有postion bias的计算
        super().__init__()
        self.inner_dim = d_qkv * n_heads
        self.q, self.k, self.v = [nn.Linear(d_model, self.inner_dim) for i in range(3)]
        self.o = nn.Linear(self.inner_dim, d_model)
        self.dropout_rate = dropout_rate
    
    def forward(self, decoder_hidden, encoder_hidden, encoder_attention_mask):
        """
        Args:
            decoder_hidden: (B, trg_len, d_model)
            encoder_hidden: (B, src_len, d_model)
            encoder_attention_mask: (B, L) LongTensor, 输入序列有token的地方为1, pad处为0
        """
        q, k, v = self.q(decoder_hidden), self.k(encoder_hidden), self.v(encoder_hidden)
        q = q.reshape(B, trg_len, n_heads, d_qkv).transpose(1, 2)  # (B, n_heads, q_len=trg_len, d_qkv)
        k = k.reshape(B, src_len, n_heads, d_qkv).transpose(1, 2)  # (B, n_heads, k_len=src_len, d_qkv)
        v = v.reshape(B, src_len, n_heads, d_qkv).transpose(1, 2)  # (B, n_heads, k_len=src_len, d_qkv)
        
        scores = torch.matmul(q, k.transpose(2, 3))  # (B, n_heads, trg_len, src_len)
        
        bias = torch.zeros(B, n_heads, trg_len, src_len)  # (1, n_heads, trg_len, src_len)
        extended_mask = torch.where(attention_mask[:, None, None, :]==1, 0, -inf)  # (B, 1, 1, src_len)
        bias = bias + extended_mask  # (B, n_heads, trg_len, src_len)
        scores += bias

        attn_weights = nn.functional.softmax(scores, dim=-1)
        attn_weights = nn.functional.dropout(attn_weights, self.dropout_rate)
        hidden = torch.matmul(atten_weights, v)  # (B, n_heads, trg_len, d_qkv) = (B, n_heads, trg_len, src_len) * (B, n_heads, src_len, d_qkv)
        hidden = hidden.transpose(1, 2).view(B, trg_len, self.inner_dim)  # (B, trg_len, inner_dim)
        hidden = self.o(hidden)  # (B, trg_len, d_model)
        return hidden, bias

源码解析:🤗 Transformers 中 T5 训练过程的前向计算流程

如果对T5的计算逻辑基本熟悉的话,这里给出 🤗 Transformers 中的模型层次,可以帮助快速理解源码的实现逻辑:

# 注意: T5Attention 这个类同时实现了三类注意力机制
T5ForConditionalGeneration:
  - nn.Embedding  # A
  - encoder: T5Stack
    - nn.Embedding  # 与 A 是同一个
    - T5Block 
      - T5LayerSelfAttention
        - T5LayerNorm
        - T5Attention  # 全自注意力, 位于第一个T5Block中此模块含有一个nn.Embedding用于学习relative postional bias, 可学习参数形状:(num_bucket=32, num_heads=64)
        - nn.Dropout
      - T5LayerFF
    - T5Block
    ...
    - T5Block
    - T5LayerNorm
    - nn.Dropout
  - decoder: T5Stack
    - nn.Embedding  # 与 A 是同一个
    - T5Block
      - T5LayerSelfAttention
        - T5LayerNorm
        - T5Attention  # 因果自注意力, 位于第一个T5Block中此模块含有一个nn.Embedding用于学习relative postional bias, 可学习参数形状:(num_bucket=32, num_heads=64)
        - nn.Dropout
      - T5LayerCrossAttention
        - T5LayerNorm
        - T5Attention  # 与encoder的输出做注意力, 没有relative postional bias
        - nn.Dropout
      - T5LayerFF
    - T5Block
    ...
    - T5Block
    - T5LayerNorm
    - nn.Dropout
  - nn.Linear  # T5中的设计里与 A 共享参数

备注:在 🤗 Transformers 的源码实现里 T5Attention 比较复杂,它需要承担几项不同的工作:

  • 训练阶段:
    • 在 encoder 中执行全自注意力机制
    • 在 decoder 中的 T5LayerSelfAttention 中执行因果自注意力机制(训练时因为可以并行计算整个decoder序列的各个隐层向量,不需要考虑decoder前序token的key和value的缓存)
    • 在 decoder 中的 T5LayerCrossAttention 中执行对encoder输出的注意力机制(训练时因为可以并行计算整个decoder序列的各个隐层向量,不需要考虑encoder最后一层的key和value的缓存)
  • 推理阶段:
    • 在 encoder 中执行全自注意力机制,与训练时完全一致
    • 在 decoder 中的 T5LayerSelfAttention 中执行因果自注意力机制(推理时是串行解码,因此需要缓存decoder的之前所有token的key和value的缓存,计算当前token的隐层向量时也把当前token的key和value也缓存下来供后续计算)
    • 在 decoder 中的 T5LayerCrossAttention 中执行对encoder输出的注意力机制(推理时是串行解码,因此解码第一个字符时会缓存每一层针对encoder输出向量的key和value,解码后续字符时直接使用这些key和value缓存进行计算)

下面将不再按照 🤗 Transformers 的源码进行梳理,而是直接手写大部分层的实现来讲解,手写实现与 🤗 Transformers 实现的对应也在各小节给出。更为完整的对应关系可以参考:../assets/code/t5

源码解析:🤗 Transformers PretrainedModel【TODO】

repeat yourself 的典型例子:t5 与 mt5 代码完全相同

PretrainedModel 类有 4 个基类:

  • nn.Module: pytorch 模型基类
  • ModuleUtilsMixin: 见下面描述
  • GenerationMixin: 与文本生成相关的方法, 见后文描述
  • PushToHubMixin: 对外方法仅有一个 push_to_hub, 作用是将模型推送至 🤗 Hub 仓库, 此处不赘述

ModuleUtilsMixin 的方法穷举

memory hook

相关方法如下:

  • add_memory_hooks: 为 self.modules() 增加内存增加监控的 hook,使用到了以下的三个方法
  • _hook_rss_memory_pre_forward
  • _hook_rss_memory_post_forward
  • reset_memory_hooks_state

mask 相关【待补充】

  • invert_attention_mask
  • create_extended_attention_mask_for_decoder
  • get_extended_attention_mask
  • get_head_mask
  • _convert_head_mask_to_5d

device、dtype、num_parameters、estimate_tokens、floating_point_ops

  • device: 注意 nn.Module 没有这个属性, 这里用 self 里的 tensor 来获取 device 信息 (假设所有的 tensor 都在同一个 device 上)
  • dtype: 与 device 原理相同
  • num_parameters(only_trainable=False, exclude_embeddings=False): 统计模型的参数量
  • estimate_tokens(input_dict: Dict[str, torch.Tensor]): 辅助函数, 大多数时候用于统计 input_ids 的元素个数
  • floating_point_ops(input_dict, exclude_embeddings=True): 参考论文, 给出模型计算的浮点数运算次数, 估计值为:
    6 * self.estimate_tokens(input_dict) * self.num_parameters(exclude_embeddings=exclude_embeddings)
    

PretrainedModel 的方法与类属性穷举【待补充】

属性

class PretrainedModel:
  config_class = None
  base_model_prefix = ""
  main_input_name = "input_ids"
  _auto_class = None
  _no_split_modules = None
  _keep_in_fp32_modules = None

  # a list of `re` patterns of `state_dict` keys that should be removed from the list of missing
  # keys we find (keys inside the model but not in the checkpoint) and avoid unnecessary warnings.
  _keys_to_ignore_on_load_missing = None
  # a list of `re` patterns of `state_dict` keys that should be removed from the list of
  # unexpected keys we find (keys inside the checkpoint but not the model) and avoid unnecessary
  # warnings.
  _keys_to_ignore_on_load_unexpected = None
  # a list of `state_dict` keys to ignore when saving the model (useful for keys that aren't
  # trained, but which are either deterministic or tied variables)
  _keys_to_ignore_on_save = None

  is_parallelizable = False
  supports_gradient_checkpointing = False

实例化相关

牵涉到如下方法

  • from_pretrained: 最为重要的方法
    • __init__
    • _load_pretrained_model
    • _load_pretrained_model_low_mem
  • post_init
    • init_weights
      • prune_heads: 需要实现self.base_model._prune_heads
      • _init_weights: 由子类重载
      • tie_weights:
        • _tie_encoder_decoder_weights
        • _tie_or_clone_weights
        • 调用 self.modules() 实现的 _tie_weights(如果有实现的话)
    • _backward_compatibility_gradient_checkpointing
  • _from_config: classmethod

resize token

牵涉到的方法如下

  • resize_token_embeddings
    • _resize_token_embeddings
  • _get_resized_embeddings
  • _get_resized_lm_head
  • resize_position_embeddings: 需要子类实现【是否必须】
  • get_position_embeddings: 需要子类实现【是否必须】

save_pretrained

  • save_pretrained

获取信息

  • dummy_inputs:
  • framework:
  • can_generate:
  • base_model: return getattr(self, self.base_model_prefix, self)
    • get_input_embeddings: 默认使用 self.base_model.get_input_embeddings()
    • set_input_embeddings: 默认使用 self.base_model.set_input_embeddings()
  • get_output_embeddings: 子类重载此方法【作用是什么】
  • retrieve_modules_from_names

activation checkpointing

  • gradient_checkpointing_enable
  • gradient_checkpointing_disable
  • is_gradient_checkpointing

auto class

  • register_for_auto_class

其他【待补充】

  • _set_default_torch_dtype
  • get_memory_footprint
  • to
  • half
  • float

原理/源码解析:🤗 Transformers 中的文本生成策略

本节介绍 🤗 Transformers 里各种生成方式的详细算法

关于文本生成,🤗 Transformers 官方有如下几篇博客值得阅读:

使用 🤗 Transformers 生成文本,用法如下:

from transformers import T5Tokenizer, T5ForConditionalGeneration
pretrained_name_or_path = "t5-small"
tokenizer = T5Tokenizer.from_pretrained(pretrained_name_or_path)
model = T5ForConditionalGeneration.from_pretrained(pretrained_name_or_path)
inputs = tokenizer(["I'm a student, ", "Deep learning"])
generated_ids = model.generate(
  input_ids=inputs["input_ids"],
  attention_mask=inputs["attention_mask"], 
  max_length=32, 
  num_beams=5,
  repetition_penalty=2.5, 
  length_penalty=1.0, 
  early_stopping=True
  )

这里的 generate 函数是实现在 T5ForConditionalGeneration 的基类 PreTrainedModel 中的,而 generate 方法会根据传入的参数(例如这个例子里的 max_length, num_beams, repetition_penalty, length_penalty, early_stopping)选择更为具体的生成方式进行生成, 这些生成方式也都是在基类 PreTrainedModel 中进行实现的, 由此可以看出在生成策略上其实对于不同的模型是统一的。这些更为具体的生成方式列举如下,后面再加以详细介绍:

  • greedy_search:贪心策略
  • beam_search:beam search
  • sample:对每次得到的概率分布上进行采样
  • beam_sample:对每次得到的概率分布上进行采样后再进行 beam search
  • group_beam_search:对 beam_size 进行分组, 每组分别使用 beam search, 以提升 beam search 结果的多样性
  • constrained_beam_search:带约束条件的 beam search, 例如生成结果里必须要包含特定的词
  • contrastive_search:这篇论文里提出的一种生成方式,改善了 greedy search/beam search 生成结果经常出现重复句子(文献中成为 model degeneration)的情况, 也改善了 sample/beam sample 方法容易出现语义前后不一致的现象。

阅读 how-to-generate 博客的读者,可能会疑惑于 top k sampling 与 top p sampling (top p)实现在哪里, 这里给出简要的解释:这两个被抽象为对抽样过程概率分布的调整,因此插在概率分布与采样之间。另外,目前为止比较公认的相对优秀生成方式为带有 top p/top k 的 beam sample 方式,并且需要适当调节 length_penalty 参数用以促进/抑制生成的序列长度,调节 repetition_penalty 等相关参数控制重复词组出现的次数。总的来说,调整这些参数对生成质量还是有一些影响的,因此在 decoder 的解码策略上也一直有论文进行研究(例如:contrastive search 是 2022 年的工作),不同的大模型在开发过程中也可能会探索出新的生成策略。

本节后续内容组织如下:

  • 关于 generate 方法的简要介绍,涉及到 PreTrainedModel 的一些继承关系
  • GenerationConfig 的简介:只简单介绍一部分参数,其余的参考官方文档。重点介绍跟生成策略强相关的参数
  • 最简版本的 greedy search 介绍
  • LogitsProcessorLogitsWarpper 简介
  • beam_search
  • sample/beam_sample
  • group_beam_search
  • constrained_beam_search: 这个方法自成体系且实现上有些复杂, 需要额外介绍相关的实现细节
  • contrastive_search

PreTrainedModel, GenerationMixin, generate

本节主要按照 🤗 Transformers 的源码进行介绍,按照 🤗 Transformers 中的实现,T5ForConditionalGeneration 的继承关系如下:

class T5ForConditionalGeneration(T5PreTrainedModel):
  # 用于 load 权重时可以忽略
  _keys_to_ignore_on_load_missing = [
        r"encoder.embed_tokens.weight",
        r"decoder.embed_tokens.weight",
        r"lm_head.weight",
  ]
  _keys_to_ignore_on_load_unexpected = [
      r"decoder.block.0.layer.1.EncDecAttention.relative_attention_bias.weight",
  ]
  # ...

class T5PreTrainedModel(PreTrainedModel):
  config_class = T5Config
  base_model_prefix = "transformer"  # 在 from_pretrained 函数中 load 权重时有用
  def _init_weights(self, module):
    ...
  # 其他一些方法和属性从略

class PreTrainedModel(nn.Module, ModuleUtilsMixin, GenerationMixin, PushToHubMixin):
  pass

而与生成相关的代码主要实现在 GenerationMixin 中,一般而言,通过这个类的 generate 方法进行使用,根据不同的参数设定,会实际上通过调用以下 7 种方法来完成实际的生成:

generate 方法如下:

def generate(
  self,
  inputs: Optional[torch.Tensor] = None,
  generation_config: Optional[GenerationConfig] = None,
  logits_processor: Optional[LogitsProcessorList] = None,
  stopping_criteria: Optional[StoppingCriteriaList] = None,
  prefix_allowed_tokens_fn: Optional[Callable[[int, torch.Tensor], List[int]]] = None,
  synced_gpus: Optional[bool] = False,
  **kwargs):
  # 只摘录核心部分
  if generation_config is None:
    generation_config = self.generation_config

  generation_config = copy.deepcopy(generation_config)
  # 根据kwargs更新
  model_kwargs = generation_config.update(**kwargs)  # All unused kwargs must be model kwargs
  self._validate_model_kwargs(model_kwargs.copy())

  logits_processor = logits_processor if logits_processor is not None else LogitsProcessorList()
  stopping_criteria = stopping_criteria if stopping_criteria is not None else StoppingCriteriaList()
  inputs_tensor, model_input_name, model_kwargs = self._prepare_model_inputs(
    inputs, generation_config.bos_token_id, model_kwargs
  )
  # 再根据generation_config增加一部分logits_processor
  logits_processor = self._get_logits_processor(generation_config, input_ids_seq_length, encoder_input_ids,
    prefix_allowed_tokens_fn, logits_processor)
  # 再根据generation_config增加一部分stopping_criteria
  stopping_criteria = self._get_stopping_criteria(generation_config, stopping_criteria)
  # 根据不同的generation_config设置, 分别调用上述7种方法
  ...

备注:此处的 generation_config 变量的类型为 GenerationConfig, 而 self.generation_configPreTrainedModel 实例化时得到的。

【此处需增加一个隐藏按钮】

class PreTrainedModel(nn.Module, ModuleUtilsMixin, GenerationMixin, PushToHubMixin):
  def __init__(self, config, *inputs, **kwargs):
    # *inputs, **kwargs 在此处未被使用到
    super().__init__()  # nn.Module的__init__函数, 其余继承类均为Mixin, 没有__init__函数
    self.config = config
    self.name_or_path = config.name_or_path
    self.warnings_issued = {}
    self.generation_config = GenerationConfig.from_model_config(config) if self.can_generate() else None
  @classmethod
  def from_pretrained(self, pretrained_model_name_or_path, *model_args, **kwargs):
    # 只摘录重要的部分
    config = kwargs.pop("config", None)
    if not isinstance(config, PretrainedConfig):
      config_path = config if config is not None else pretrained_model_name_or_path
      config, model_kwargs = cls.config_class.from_pretrained(..., **kwargs)
    else:
      model_kwargs = kwargs
    model = cls(config, *model_args, **model_kwargs)  # 调用 __init__
    state_dict = load_state_dict(resolved_archive_file)
    model, ... = cls._load_pretrained_model(model, state_dict, ...)  # load权重至模型
    model.eval()  # 将模型设置为eval模式
    if model.can_generate():  # 如果pretrained_model_name_or_path目录下包含generation_config.json文件, 则按这个文件重新初始化model.generation_config
      model.generation_config = GenerationConfig.from_pretrained(pretrained_model_name_or_path, ..., **kwargs)
  def save_pretrained(self, save_directory, ...):
    # 只摘录重要的部分
    os.makedirs(save_directory, exist_ok=True)
    model_to_save = unwrap_model(self)
    if is_main_process:
      model_to_save.config.save_pretrained(save_directory)
      if self.can_generate():
          model_to_save.generation_config.save_pretrained(save_directory)
    if state_dict is None:
      state_dict = model_to_save.state_dict()
    # 某些参数在保存时可以忽略
    if self._keys_to_ignore_on_save is not None:
        for ignore_key in self._keys_to_ignore_on_save:
            if ignore_key in state_dict.keys():
                del state_dict[ignore_key]
    # 在正常情况下(模型参数不多), shard是一个只有一个键值对的字典, key为"pytorch_model.bin", value即为state_dict
    weights_name = SAFE_WEIGHTS_NAME if safe_serialization else WEIGHTS_NAME
    shards, index = shard_checkpoint(state_dict, max_shard_size=max_shard_size, weights_name=weights_name)
    for shard_file, shard in shards.items():
        if safe_serialization:
            safe_save_file(shard, os.path.join(save_directory, shard_file), metadata={"format": "pt"})
        else:
            save_function(shard, os.path.join(save_directory, shard_file))
    # 保存index
    ...

GenerationConfig 的主要参数【TODO】

图解如下

LogitsProcessorLogitsWarpperStoppingCriteria【TODO】

  • 当 beam_size 为 1 时退化为 greedy_search
logits_processor: 实际上是对当前预测的log-softmax分数进行后处理(例如重复出现的字做些score上的惩罚)
stopping_criteria: 判断是否应该结束, 返回True表示应该结束(最典型的是beam达到最大长度)

input_ids: 形状为(batch_size*beam_size, 1)  # 全部为decoder_satrt_token
beam_scores: 形状为(batch_size, beam_size)  # 其中每一行的第一个元素为0, 其余元素为-inf
beam_scores.view(-1, 1)
beam_hypotheses: batch_size个候选池
is_done: 初始化为batch_size个False

while True:

  截取最后一个input_ids得到input_tensor(batch_size*beam_size, 1)
  通过前向计算得到logits(batch_size*beam_size, vocab_size)后
  进行log_softmax之后得到next_token_scores
  next_token_scores_processed = logits_processor(input_ids, next_token_scores)  # (batch_size*beam_size, vocab_size)
  next_token_scores = beam_scores + next_token_scores_processed  # (batch_size*beam_size, vocab_size)

  next_token_scores.view(batch_size, beam_size*vocab_size)
  # 对于batch中的每一个, 都留下2*beam_size个可选项(注意根据此处的规则这些可选项里至多有beam_size个eos)
  next_token_scores, next_tokens = next_token_scores.topk(2*beam_size)

  # 这个过程的逻辑如下:
  对于每个样本
    如果is_done取值为True, 则为每个beam填充pad_token, continue
    从next_token_scores最大的开始
      如果对应的预测token为eos:
        如果此score是前beam_size大的score, 则将其加入该样本对应的候选集beam_hypotheses
          加入规则如下:
            如果候选池当前不足beam_size个样本, 则直接将其加入, 并计算score计算长度惩罚后,更新池子中的最差分数
            如果当前候选池已有beam_size个样本, 则对此score计算长度惩罚后与池子里的score进行比较,如果优于最差分数,则加入并剔除池子里最差的那个序列, 之后更新池子的最差分数
            备注: 池子中的score均为长度惩罚后的score
        如果此score不是前beam_size大的score, 则直接忽略这个样本
      如果对应的预测token不是eos:
        将其加入到beam_scores, beam_next_tokens中直到达到beam_size个
    判断beam_hypotheses是否完成,由此更新is_done
      判断规则如下:
        (1) 如果beam_hypotheses的模式为early_stop, 那么只要池子里有num_beam个样本, 就认为搜索结束
        (2) 非early_stop模式, 则根据beam_scores中的最大者是否在计算长度惩罚后比候选池中的最差分数大, 如果更大则继续搜索(is_done=False), 如果更小则认为搜索结束(is_done=True)

  beam_scores, beam_next_tokens = beam_scorer.process(input_ids, next_token_scores, next_tokens) # 逻辑见前面一大段的说明
  input_ids = cat(input_ids, beam_next_tokens)

  如果is_done均为True或者stopping_criteria(input_ids, scores)为True, 则跳出while True

最后做收尾:
  对于还没结束的beam, 尝试添加至beam_hypotheses中
  将输出序列使用pad_token补齐

group_beam_search与beam_search的区别在于, 将当前的beam分为若干组, 每组group_size个序列, 每次对这个序列做beam_search, 并留下group_size个序列, 这样总共仍留有beam_size个序列

  • 当 group_size 与 beam_size 相等时, 退化为beam_search

beam_sample/sample

beam_sample与beam_search的区别在于将这几行

next_token_scores_processed = logits_processor(input_ids, next_token_scores)  # (batch_size*beam_size, vocab_size)
next_token_scores = beam_scores + next_token_scores_processed  # (batch_size*beam_size, vocab_size)
next_token_scores.view(batch_size, beam_size*vocab_size)
next_token_scores, next_tokens = next_token_scores.topk(2*beam_size)

替换为

logit_warpper: 通常进行top-k/top-p修改分数/不修改分数, 影响后续的抽样结果

next_token_scores_processed = logits_processor(input_ids, next_token_scores)  # (batch_size*beam_size, vocab_size)
next_token_scores = beam_scores + next_token_scores_processed  # (batch_size*beam_size, vocab_size)
next_token_scores.view(batch_size, beam_size*vocab_size)

next_token_scores = logits_warper(input_ids, next_token_scores)
probs = nn.functional.softmax(next_token_scores, dim=-1)
next_tokens = torch.multinomial(probs, num_samples=2 * num_beams)
next_token_scores根据next_tokens的选择得到

constrained_beam_search【TODO】

<<<第一项改动>>>
对于每个样本,进行正常的beam_size得到:

beam_scores: beam_size个分数
input_ids: (beam_size, cur_len)
next_tokens: (beam_size)

对每个input_id, 根据约束计算下一步可能的token,假设增加了 H 个新的可选项,得到
full_hypo: (beam_size+H, cur_len+1) 所有的假设
full_score: (beam_size+H,) 所有假设的分数
bank: (beam_size+H,) 整数值, 对beam_size+H中每个假设计算一个满足约束的分数: 所有约束条件的最大长度*已完成的约束+进行中的约束已完成的长度

zipped = bank * 100 + full_score  # 100是hf中写死的超参数

按照zipped进行从大到小排序得到indice, 并按照此顺序重排bank,得到sorted_bank,例如:

indice=[5, 4, 1, 2, 0, 3]  # 即zipped中下标为5的元素最大,相应的bank值为
sorted_bank=[2, 2, 1, 0, 2, 3]
由sort_bank计算dup_num = [0, 1, 0, 0, 0, 0],dup_num为重复前一个bank值得次数

假设beam_size为3, 则最终得indice为[5, 1, 2]  # 按dup_num从小到大稳定排序, 因此会跳过indice[1]
<<<第二项改动>>>
添加至hyp时需要检查是否满足条件,满足则添加,不满足则不添加

constrained_beam_search相关

class Constraint(ABC):
    def __init__(self):
        self.test()  # 测试子类的定义是否合法

    def test(self):
        counter = 0
        completed = False
        while not completed:
            if counter == 1:  # 如果需要至少2步才能完成,则可以测试reset是否能正常工作
                # self.reset函数语义(改变状态): 重新初始化内部的状态
                self.reset()
            # self.advance函数语义(不改变状态): 给出一个能满足下一步约束条件的token_id(如果有多个token_id能满足时则返回token_id列表)
            advance = self.advance()  # 得到一个能完成下一个步骤的token_id
            # self.does_advance语义(不改变状态): 判断输入的值是否满足下一步约束的条件
            if not self.does_advance(advance):  # 验证一定能走到下一步
                raise Exception(
                    "Custom Constraint is not defined correctly. self.does_advance(self.advance()) must be true."
                )
            # self.update语义: stepped表示当前输入是否能满足下一步的约束条件, completed为是否完全结束, reset表示是否需要重置
            # 并且根据各种情况改变状态, 例如修改内部的状态或者调用self.reset
            stepped, completed, reset = self.update(advance)
            counter += 1

            if counter > 10000:
                raise Exception("update() does not fulfill the constraint.")
        # self.remaining语义: 剩余步数
        if self.remaining() != 0:  # 在completed的时候, 剩余步数应该为0
            raise Exception("Custom Constraint is not defined correctly.")
    
    # 复制(复制当前状态/不复制当前状态)
    @abstractmethod
    def copy(self, stateful=False):
        pass

🤗 Transformers 4.26.1 版本中目前只实现了两类 Constraint

  • PhrasalConstraint: 生成的结果里必须出现指定的 token_id 序列
    constraint = PhrasalConstraint([1, 2, 5])  # 限制输出序列必须包含连续的token序列[1, 2, 5]
    
  • DisjunctiveConstraint: 生成的结果里必须出现指定的 token_id 序列(满足其中之一即可)
    constraint = PhrasalConstraint([[1, 2, 5], [3, 4]])  # 限制输出序列必须包含连续的token序列[1, 2, 5]或[3, 4]
    
  • ConstraintListState
    constraints = [PhrasalConstraint([1, 2, 5]), PhrasalConstraint([1, 3])]
    ConstraintListState(constraints)  # 限制输出序列必须满足多个限制条件
    

DisjunctiveConstraint 的实现依赖于一个辅助类, 本质上是将一个列表的列表转换为了一个字典(前缀树)

class DisjunctiveTrie:
    def __init__(self, nested_token_ids: List[List[int]], no_subsets=True):
        ...

t = DisjunctiveTrie([[1, 2], [1, 3, 4], [1, 4, 5]])
t.trie
# {
#   1: {
#     2: {},
#     3: {4: {}},
#     4: {5: {}}
#   }
# }
class ConstraintListState:
    def __init__(self, constraints: List[Constraint]):
        self.constraints = constraints

        # max # of steps required to fulfill a given constraint
        self.max_seqlen = max([c.seqlen for c in constraints])
        self.n_constraints = len(constraints)
        self.completed = False

        self.init_state()

    def init_state(self):
        # complete_constraints + inprogress_constraint + pending_constraints = 全部的constraint
        self.complete_constraints = []
        self.inprogress_constraint = None  # 只能为None或者其中一个constraint
        self.pending_constraints = [constraint.copy(stateful=False) for constraint in self.constraints]
    def advance(self) -> List[int]:
        ...
    def reset(self, token_ids: Optional[List[int]]):
        # 在传入token_ids参数时, 会调用self.add函数
        ...
    def add(self, token_id: int):
        # 如果inprogress_constraint为空, 则在pending_constraints找有没有能update的约束, 如果有, 就将它update并作为inprogress_constraint
        # 如果inprogress_constraint不为空, 则update inprogress_constraint直至这个约束达到完成状态
        return complete: bool, stepped: bool

contrastive_search【TODO: 待补充】

Huggingface 官方博客(2022/10 引入):https://huggingface.co/blog/introducing-csearch

算法伪代码如下:

首先初始化decoder的input_ids: (0,)*B
将其输入至decoder计算出 next_logits: (B, vocab_size), cur_hidden: (B, 1, d_model)
L = 1
while True:
  # input_ids: 已确定序列, next_logits: 已确定序列的logits, cur_hidden: 已确定序列的decoder的输出
  对next_logits进行后处理
  取出next_logits中前top_k个可能的token: (B, top_k), 拼接input_ids序列 possible_input_ids: (B*top_k, L)
  将possible序列作为decoder的输入, 得到 hidden: (B*top_k, d_model), new_next_logits: (B*top_k, vocab_size)
  根据contrastive search的评分规则根据 next_logits, cur_hidden, hidden 挑选出每个样本最优的下一个序列 input_ids: (B, L+1)
  根据input_ids挑选得到cur_hidden: (B, L+1, d_model), 挑选得到 new_next_logits: (B, vocab_size)
  next_logits = new_next_logits
  L += 1
  判断是否达到最大长度或其他中止条件是否成立:break
返回input_ids

Streamer

通常我们必须等 generate 方法生成 eos 才能一次性拿到返回结果, 但对于大模型来说, 生成 token 的速度可能比较慢, 因此需要流式输出, 🤗 Transformers 在 4.28.0 版本左右增加了一个 API

# 官方文档示例: https://huggingface.co/docs/transformers/internal/generation_utils#transformers.TextStreamer
from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamer, TextIteratorStreamer

tok = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tok(["An increasing sequence: one,"], return_tensors="pt")
streamer = TextStreamer(tok)

# Despite returning the usual output, the streamer will also print the generated text to stdout.
_ = model.generate(**inputs, streamer=streamer, max_new_tokens=20)


# 官方文档示例: https://huggingface.co/docs/transformers/internal/generation_utils#transformers.TextIteratorStreamer
# 主要用于 Gradio
from threading import Thread
tok = AutoTokenizer.from_pretrained("gpt2")
streamer = TextIteratorStreamer(tok)
# Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
thread = Thread(target=model.generate, kwargs=generation_kwargs)  # 新开一个线程用于产生token, token被put到streamer内部的queue.Queue中
thread.start()
generated_text = ""
for new_text in streamer:  # 主线程从streamer内部的queue.Queue取数据
    generated_text += new_text
generated_text

内部逻辑其实比较简单, Streamer 相关的 API 仅有如下 3 个

注意: 目前只能用于 batch_size 为 1 的情形, 并且不能用于 beam_size > 1

# transformers/generation/streamers.py
class BaseStreamer:
  def put(self, value):
    # value 是一个一维tensor, 或者第0维为1的二维tensor, 代表token_id序列
    raise NotImplementError()
  def end(self):
    raise NotImplementError()
class TextStreamer(BaseStreamer):
  ...
class TextIteratorStreamer(TextStreamer):
  ...

在 🤗 Transformers 的 generate 中, 使用 Streamer 的逻辑是: 首先调用 put 将 prompt 推入 Streamer 中, 每次生成了一个新的 token 之后, 将新生成的 token 推入 Streamer 中(如果是TextStreamer, 则打印最新的完整word), 生成结束后, 调用 end 方法

以下是一个相对独立地使用 TextStreamer 的例子, 供参考

from transformers import AutoTokenizer, TextStreamer
tokenizer = AutoTokenizer.from_pretrained("gpt2")
streamer = TextIteratorStreamer(tokenizer)
text = "A B C D E F"
tokens = tokenizer(text, return_tensors="pt")["input_ids"][0]

import time
import random
cur_idx = 0
n = len(tokens)
while True:
  num = random.randint(1, 3)
  end = cur_idx + num
  streamer.put(token[cur_idx:end])
  cur_idx = num
  time.sleep(0.2)
  if end >= n:
    streamer.end()
    break

AutoClass: AutoModel, AutoTokenizer, AutoConfig

本节内容的出发点在于解释这类代码在执行时究竟发生了什么

from transformers import AutoModelForCausalLM
# 这里是 transformer 4.32.0 版本才有的 AutoGPTQ 集成的模型
model = AutoModelForCausalLM.from_pretrained("TheBloke/Llama-2-7b-Chat-GPTQ", torch_dtype=torch.float16, device_map="auto")

这涉及到 Auto 类的实现, 相应的代码位于 src/transformers/models/auto 文件夹下:

src/transformers/models/
  - auto/
    - __init__.py
    - auto_factory.py
    - configuration_auto.py
    - modeling_auto.py
    - tokenization_auto.py
    # 以下本节不涉及
    - feature_extraction_auto.py
    - image_processing_auto.py
    - processing_auto.py
    - modeling_flax_auto.py
    - modeling_tf_auto.py
  - deprecated/    # 一些弃用的模型代码
    - open_llama/  # 目录结构与其他模型例如 bert 一致
  - bert/
  - bart/

Auto 类如下:

  • AutoTokenizer: 直接继承自 object
  • AutoConfig: 直接继承自 object
  • _BaseAutoModelClass: 直接继承自 object
    • AutoModelForMaskedLM: 父类为 _BaseAutoModelClass
    • AutoModelForCausalLM: 父类为 _BaseAutoModelClass

为此我们先看看 transformers/models/auto 相关代码里硬编码写的一些字典映射, 这种字典的键和值有如下类型:

  • model-type: 可以用中划线或下划线连接, 例如 "deberta-v2"
  • model-display-name: 可以用空格连接, 例如 "Audio Spectrogram Transformer", 对应于文档页面上的模型名称
  • module-name: 这种在字典映射中不直接出现, 大多数情况下使用下划线代替 model-type 中的中划线即为 module-name, 例如 "deberta_v2", 对应于目录结构 src/transformers/models/deberta_v2
  • model-class-name: 例如 "LlamaForCausalLM", 对应于类名 src/transformers/models/llama/modeling_llama.py:LlamaForCausalLM
  • config-class-name: 例如 "LlamaConfig", 对应于类名 src/transformers/models/llama/configuration_llama.py:LlamaConfig
  • slow-tokenizer-class-name: 例如 "LlamaTokenizer", 对应于类名 src/transformers/models/llama/tokenization_llama.py:LlamaTokenizer
  • fast-tokenizer-class-name: 例如 "LlamaTokenizerFast", 对应于类名 src/transformers/models/llama/tokenization_llama_fast.py:LlamaTokenizerFast

相关的字典映射具体如下:

AutoTokenizer 相关

# transformers/models/auto/tokenization_auto.py

# model-type -> (slow-tokenizer-class-name, fast-tokenizer-class-name)
TOKENIZER_MAPPING_NAMES = OrderedDict([
  ("albert", (
    "AlbertTokenizer" if is_sentencepiece_available() else None,
    "AlbertTokenizerFast" if is_tokenizers_available() else None,
    )
  ),
  ("bert", ("BertTokenizer", "BertTokenizerFast" if is_tokenizers_available() else None)),
])

TOKENIZER_MAPPING = _LazyAutoMapping(CONFIG_MAPPING_NAMES, TOKENIZER_MAPPING_NAMES)

# config-class-name -> model-type
# 这个字典在整个代码库没有被使用到, 因此可以忽略, 替代物应该是
# configuration_auto.py 中的 config_class_to_model_type 方法
CONFIG_TO_TYPE = {v: k for k, v in CONFIG_MAPPING_NAMES.items()}

AutoConfig 相关

# transformers/models/auto/configuration_auto.py

# model-type -> config-class-name
CONFIG_MAPPING_NAMES = OrderedDict([
  ("gpt2", "GPT2Config"),  # GPT2Config 代表 models/gpt2/configuration_gpt2.py:GPT2Config
  ("llama", "LlamaConfig"),
])

# 大体上就是只有在取 CONFIG_MAPPING["gpt2"] 时, 才会进行包导入, 得到 GPT2Config 类
CONFIG_MAPPING = _LazyConfigMapping(CONFIG_MAPPING_NAMES)

# model-type -> archive-map
CONFIG_ARCHIVE_MAP_MAPPING_NAMES = OrderedDict([
  ("bart", "BART_PRETRAINED_CONFIG_ARCHIVE_MAP"),
  ("bert", "BERT_PRETRAINED_CONFIG_ARCHIVE_MAP"),
])

# 大体上 ALL_PRETRAINED_CONFIG_ARCHIVE_MAP 其实就是一个完整的字典:
# {
#  "facebook/bart-large": "https://huggingface.co/.../config.json",
#   "bert-base-uncased": "https://huggingface.co/.../config.json",
# }
ALL_PRETRAINED_CONFIG_ARCHIVE_MAP = _LazyLoadAllMappings(CONFIG_ARCHIVE_MAP_MAPPING_NAMES)
# 备注: BART_PRETRAINED_CONFIG_ARCHIVE_MAP 在 models/bart/configuration_bart.py 中被定义
# BART_PRETRAINED_CONFIG_ARCHIVE_MAP = {
#   "facebook/bart-large": "https://huggingface.co/facebook/bart-large/resolve/main/config.json",
# }

# model-type -> model-display-name
MODEL_NAMES_MAPPING = OrderedDict([
    ("albert", "ALBERT"),
    ("audio-spectrogram-transformer", "Audio Spectrogram Transformer"),
])

# model-type
# 这些模型的代码位于 models/deprecated 目录下, 例如: models/deprecated/bort
DEPRECATED_MODELS = ["bort", "mctct", "mmbt",]

# model-type -> module-class-name
SPECIAL_MODEL_TYPE_TO_MODULE_NAME = OrderedDict([
    ("openai-gpt", "openai"),
    ("data2vec-audio", "data2vec"),
])

AutoModel 相关

# transformers/models/auto/modeling_auto.py
# model-type -> model-class-name
MODEL_MAPPING_NAMES = OrderedDict([
  ("albert", "AlbertModel"),
  ("align", "AlignModel"),
])
MODEL_MAPPING = _LazyAutoMapping(CONFIG_MAPPING_NAMES, MODEL_MAPPING_NAMES)

# model-type -> model-class-name
MODEL_FOR_CAUSAL_LM_MAPPING_NAMES = OrderedDict([
  ("gpt2", "GPT2LMHeadModel"),
  ("llama", "LlamaForCausalLM")
])
MODEL_FOR_CAUSAL_LM_MAPPING = _LazyAutoMapping(CONFIG_MAPPING_NAMES, MODEL_FOR_CAUSAL_LM_MAPPING_NAMES)

Auto 类及相关的辅助方法(即上述三个文件里的其余内容)如下:

AutoTokenizer 相关

# transformers/models/auto/tokenization_auto.py

# "AlbertTokenizer" -> AlbertTokenizer
def tokenizer_class_from_name(class_name: str): pass

# "facebook/bart-large" -> 读取相应的 tokenizer_config.json 文件 (此文件不包含词表等信息, 只包含padding 方向等信息)
def get_tokenizer_config(pretrained_model_name_or_path, ...): pass

class AutoTokenizer:
  def from_pretrained(cls, pretrained_model_name_or_path, ...): pass
  
  def register(
    config_class,
    slow_tokenizer_class=None,
    fast_tokenizer_class=None,
    exist_ok=False): pass
  # 主要是调用: TOKENIZER_MAPPING.register 方法

AutoConfig 相关

# transformers/models/auto/configuration_auto.py
# "deberta-v2" -> transformers.models.deberta_v2
def model_type_to_module_name(key): pass

# "BertConfig" -> "bert"
def config_class_to_model_type(config): pass

# 上面的 CONFIG_MAPPING 即为这个类的一个实例
class _LazyConfigMapping(OrderedDict):
  def __init__(self, mapping):
    self._mapping = mapping
    self._extra_content = {}
    self._modules = {}
  # 重写了一些字典的方法, 从略
  # 增加了 register 方法
  def register(self, key, value, exist_ok=False):
    # 主要就是为 self._extra_content 添加内容
    self._extra_content[key] = value

# 上面的 ALL_PRETRAINED_CONFIG_ARCHIVE_MAP 即为这个类的一个实例
class _LazyLoadAllMappings(OrderedDict): pass

def _get_class_name(...): pass
def _list_model_options(...): pass
def replace_list_option_in_docstrings(...): pass

class AutoConfig:
  def for_model(...): pass
  def from_pretrained(...): pass
  def register(model_type, config, exist_ok=False):
    # 基本上就是调用这个
    CONFIG_MAPPING.register(model_type, config, exist_ok=exist_ok)

AutoModel 相关

# transformers/models/auto/modeling_auto.py
class AutoModelForCausalLM(_BaseAutoModelClass):
    _model_mapping = MODEL_FOR_CAUSAL_LM_MAPPING

AutoModelForCausalLM = auto_class_update(AutoModelForCausalLM, head_doc="causal language modeling")

在深入之前, 这里先简单提及一下上面所有 Auto 类的特点:

  • 它们均不能实例化, 一般使用 from_pretrained 方法返回的是一个具体的类的实例, 例如:
    • AutoConfig.from_pretrained(...) 可能返回的是 LlamaConfig 的实例
    • AutoTokenizer.from_pretrained(...) 可能返回的是 LlamaTokenizer 的实例
    • AutoModelForCausalLM.from_pretrained(...) 可能返回的是 LlamaForCausalLM 的实例
  • 每个 Auto 类都绑定了一个“字典”, 例如:
    • AutoConfig: 绑定了 CONFIG_MAPPING
    • AutoTokenizer: 绑定了 TOKENIZER_MAPPING
    • AutoModel: 绑定了 MODEL_MAPPING
    • AutoModelForCausalLM: 绑定了 MODEL_FOR_CAUSAL_LM_MAPPING

下面回到最开始的出发点, 解释各个 Auto 类 from_pretrained 时究竟发生了什么

AutoModel

# transformers/models/auto/modeling_auto.py
MODEL_FOR_CAUSAL_LM_MAPPING = _LazyAutoMapping(CONFIG_MAPPING_NAMES, MODEL_FOR_CAUSAL_LM_MAPPING_NAMES)

class AutoModelForCausalLM(_BaseAutoModelClass):
    _model_mapping = MODEL_FOR_CAUSAL_LM_MAPPING

AutoModelForCausalLM = auto_class_update(AutoModelForCausalLM, head_doc="causal language modeling")

因此接下来需要研究如下几个东西:

  • _LazyAutoMapping: 这个类的实例主要是 TOKENIZER_MAPPING, MODEL_MAPPING, MODEL_FOR_CAUSAL_LM_MAPPING
  • _BaseAutoModelClass
  • auto_class_update

_LazyAutoMapping

本质上 _LazyAutoMapping 是一个字典, 在使用上:

  • 键: config-class, 注意是类不是字符串, 例如 transformers.models.bert.BertConfig
  • 值: model-classtokenizer-class, 注意是类不是字符串

这一过程是动态的, 因为不同的模型/tokenizer可能有不同的依赖包, 动态添加可以只加载与使用的模型相关的模块, 例如在使用 bert 相关的代码时, 只会加载 transformers.models.bert 模块, 而不会加载其他模块

测试代码 1:

import transformers
from transformers import TOKENIZER_MAPPING, MODEL_MAPPING, MODEL_FOR_CAUSAL_LM_MAPPING

TOKENIZER_MAPPING[transformers.models.bert.BertConfig]
# (transformers.models.bert.tokenization_bert.BertTokenizer, transformers.models.bert.tokenization_bert_fast.BertTokenizerFast)

# 注意目前加载的模块只有一个, 其他几个 MAPPING 同理
TOKENIZER_MAPPING._modules
# {'bert': <module 'transformers.models.bert' from `xx/yy.py`}

MODEL_MAPPING[transformers.models.bert.BertConfig]
# transformers.models.bert.modeling_bert.BertModel

MODEL_FOR_CAUSAL_LM_MAPPING[transformers.models.bert.BertConfig]
# transformers.models.bert.modeling_bert.BertLMHeadModel

测试代码 2:

import transformers
from transformers import TOKENIZER_MAPPING, MODEL_MAPPING, MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING

# from transformers.models.bert import BertForSequenceClassification
# model = BertForSequenceClassification.from_pretrained("./chinese-roberta-wwm-ext")

from transformers import AutoModelForSequenceClassification
model = AutoModelForSequenceClassification.from_pretrained("./chinese-roberta-wwm-ext")

# 上面两种情况下, 输出分别是
print(MODEL_MAPPING._modules)    # {}, {}
print(TOKENIZER_MAPPING._modules)  # {}, {}
print(MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING._modules)  # {}, {"albert": transformers.models.albert}

# sys.modules 的内容待探究

以下是 _LazyAutoMapping 类的关键源码

# config-class -> model-class/tokenizer-class
class _LazyAutoMapping(OrderedDict):
    def __init__(self, config_mapping, model_mapping):
        self._config_mapping = config_mapping  # model-type -> config-class-name
        self._reverse_config_mapping = {v: k for k, v in config_mapping.items()} # config-class-name -> model-type
        self._model_mapping = model_mapping  # model-type -> model-class-name / (slow-tokenizer-class-name, fast-tokenizer-class-name)
        self._model_mapping._model_mapping = self
        # 调用 register 方法时添加
        self._extra_content = {}  # config-class -> model-class/tokenizer-class
        # 调用 __getitem__ 方法时动态加载相应的 module
        self._modules = {}  # module-name -> module

    def __len__(self):
        common_keys = set(self._config_mapping.keys()).intersection(self._model_mapping.keys())
        return len(common_keys) + len(self._extra_content)

    def __getitem__(self, key):
        if key in self._extra_content:
            return self._extra_content[key]
        model_type = self._reverse_config_mapping[key.__name__]
        if model_type in self._model_mapping:
            model_name = self._model_mapping[model_type]
            return self._load_attr_from_module(model_type, model_name)

        # Maybe there was several model types associated with this config.
        model_types = [k for k, v in self._config_mapping.items() if v == key.__name__]
        for mtype in model_types:
            if mtype in self._model_mapping:
                model_name = self._model_mapping[mtype]
                return self._load_attr_from_module(mtype, model_name)
        raise KeyError(key)

    def _load_attr_from_module(self, model_type, attr):
        module_name = model_type_to_module_name(model_type)
        if module_name not in self._modules:
            self._modules[module_name] = importlib.import_module(f".{module_name}", "transformers.models")
        return getattribute_from_module(self._modules[module_name], attr)

    def keys(self): pass
    def get(self, key, default): pass
    def __bool__(self): pass
    def values(self): pass
    def items(self): pass
    def __iter__(self): pass
    def __contains__(self, item): pass

    def register(self, key, value, exist_ok=False):
        if hasattr(key, "__name__") and key.__name__ in self._reverse_config_mapping:
            model_type = self._reverse_config_mapping[key.__name__]
            if model_type in self._model_mapping.keys() and not exist_ok:
                raise ValueError(f"'{key}' is already used by a Transformers model.")

        self._extra_content[key] = value

_BaseAutoModelClass

auto_class_update

def auto_class_update(cls, checkpoint_for_example="bert-base-cased", head_doc=""):
    # Create a new class with the right name from the base class
    model_mapping = cls._model_mapping
    name = cls.__name__
    class_docstring = insert_head_doc(CLASS_DOCSTRING, head_doc=head_doc)
    cls.__doc__ = class_docstring.replace("BaseAutoModelClass", name)

    # Now we need to copy and re-register `from_config` and `from_pretrained` as class methods otherwise we can't
    # have a specific docstrings for them.
    from_config = copy_func(_BaseAutoModelClass.from_config)
    from_config_docstring = insert_head_doc(FROM_CONFIG_DOCSTRING, head_doc=head_doc)
    from_config_docstring = from_config_docstring.replace("BaseAutoModelClass", name)
    from_config_docstring = from_config_docstring.replace("checkpoint_placeholder", checkpoint_for_example)
    from_config.__doc__ = from_config_docstring
    from_config = replace_list_option_in_docstrings(model_mapping._model_mapping, use_model_types=False)(from_config)
    cls.from_config = classmethod(from_config)

    if name.startswith("TF"):
        from_pretrained_docstring = FROM_PRETRAINED_TF_DOCSTRING
    elif name.startswith("Flax"):
        from_pretrained_docstring = FROM_PRETRAINED_FLAX_DOCSTRING
    else:
        from_pretrained_docstring = FROM_PRETRAINED_TORCH_DOCSTRING
    from_pretrained = copy_func(_BaseAutoModelClass.from_pretrained)
    from_pretrained_docstring = insert_head_doc(from_pretrained_docstring, head_doc=head_doc)
    from_pretrained_docstring = from_pretrained_docstring.replace("BaseAutoModelClass", name)
    from_pretrained_docstring = from_pretrained_docstring.replace("checkpoint_placeholder", checkpoint_for_example)
    shortcut = checkpoint_for_example.split("/")[-1].split("-")[0]
    from_pretrained_docstring = from_pretrained_docstring.replace("shortcut_placeholder", shortcut)
    from_pretrained.__doc__ = from_pretrained_docstring
    from_pretrained = replace_list_option_in_docstrings(model_mapping._model_mapping)(from_pretrained)
    cls.from_pretrained = classmethod(from_pretrained)
    return cls

Trainer【TODO: 考虑移除】

Pipeline【TODO: 考虑移除】

后记【TODO】

本文原本只是想理清 T5 预训练、微调、推理的细节。因此有必要基于这条主线对全文的行文进行梳理

  • 怎么预训练 T5
    • C4 数据集
    • 怎么训练一个 tokenizer
      • 训练一个 T5Tokenizer/T5TokenizerFast
    • 怎么使用 tokenizer
    • 预训练脚本
      • 🤗 Datasets 或自定义 torch.data.utils.Dataset
      • 🤗 Transformers Trainer 或 raw pytorch 或 raw pytorch with 🤗 Accelerate 或 Lightning 或 deepspeed
  • 微调 T5
    • 微调数据集
    • 微调脚本
  • T5 推理
    • 怎么生成文本
      • 生成策略及参数的含义
      • pipeline

主要参考资料【TODO】