Transformer详解

发布时间 2023-03-27 15:04:52作者: 剑断青丝ii

1.理论知识讲解

transfromer这个模型在机器翻译方面就是做如下事情由一种语言到另一种语言


下图中六个encoder在结构上是完全相同的但是每个encoder的内部的参数不完全相同,也就是在训练的时候6个encoder都在训练,并不是一个在训练,然后其它五个去拷贝这个encoder,六个decoder的结构也是完全相同的;每个encoder和decoder是不一样的

1.1 Encoder

1.1.1输入部分

该部分分为两个小部分

1⃣️Embedding

下图中有12个字,按字切分,然后每个字定义一个512维的字向量(可以使用word2vec或者随机初始化)

2⃣️位置嵌入

使用transformer为什么需要位置嵌入,因为在encoder中的第二个部分中多头注意力机制中是将输入的词并行输入的,并不是像rnn那样是一个词一个词的输入,但是这样并行输入有缺少了rnn那种词与词之间的先后关系,因此需要引入词位置的编码。
下图rnn是一个词一个词的输入,输入“我”,处理“我”,在输入“爱”,在接收“我”的信息之后再处理“爱”,这样保证词与词之间的先后顺序不会乱,但是一个词一个词的输入导致效率低

位置编码公式:
公式中的pos就是单词或者是字的位置,2i和2i+1分别对应,在偶数位置使用sin,在基数位置使用cos。对于爱这个字所在的位置pos,如果爱这个字对应的词向量为512维,则对于这512位的向量偶数位用cos来计算,基数位置用cos来计算
公式中的d_model它表示模型中每个输入和输出 token 的向量表示的维度大小。也就是说,模型中的每个单词都被表示为一个长度为 d_model 的向量,对于一下这个例子d_model的值为512,在我的代码中d_model的值为128。


由公式计算出位置编码之后,与原来的词向量进行相加得到最终整个transformer的输入。

1.1.2注意力机制

1⃣️基本的注意力机制
下图中,对于“婴儿在干嘛”这句话应该关注图片中的哪个区域,通过公式计算得到这句话应该更关注图片中的哪个位置


注意力机制的公式,其中Q,K,V,三个都是矩阵

2⃣️在transformer中怎么操作
x1与wq矩阵相乘的到q1矩阵,依次类推


计算attention值
Score的值等于,你当前关注的词的q,分别乘以这个句子中所有的词的k,就会得到句子中每个词对当前的词的打分结果

1.1.3残差和LayNorm


1⃣️残差


可以有效缓解梯度消失

1.1.3前馈神经网络

1.2decoder

在多头注意力机制中多了一个mask机制,表示将当前单词和之后的单词做mask


为什么使用mask的原因:

在 Transformer 模型中,为了生成下一个目标单词,解码器需要访问已经生成的单词和源序列的信息。在训练时,我们可以将目标序列的所有单词都输入到解码器中,并在每个时间步生成一个单词。但是,在生成过程中,我们需要逐步地生成目标序列中的单词,而在每个时间步中,我们只能访问当前时刻之前生成的单词。这意味着,在生成第 i 个单词时,我们不能使用第 i+1 个单词的信息。因此为了解决这一问题才引入了mask机制

k v矩阵是由encoder得到,而q矩阵是decoder中得到,encoder中得到的每个k v矩阵将会和decoder中的q矩阵进行交互

2.代码部分讲解

import torch
import numpy as np
from config import ngpu, device


# 计算角度:pos * 1/(10000^(2i/d))
def get_angles(pos, i, d_model):
    # 2*(i//2)保证了2i,这部分计算的是1/10000^(2i/d)
    angle_rates = 1 / np.power(10000, 2 * (i // 2) / np.float32(d_model))  # => [1, 512]
    return pos * angle_rates  # [50,1]*[1,512]=>[50, 512]

# np.arange()函数返回一个有终点和起点的固定步长的排列,如[1,2,3,4,5],起点是1,终点是5,步长为1
# 注意:起点终点是左开右闭区间,即start=1,end=6,才会产生[1,2,3,4,5]
# 只有一个参数时,参数值为终点,起点取默认值0,步长取默认值1。
def positional_encoding(position, d_model):  #d_model是位置编码的长度,相当于position encoding的embedding_dim?
    angle_rads = get_angles(np.arange(position)[:, np.newaxis],  # [50, 1]
                            np.arange(d_model)[np.newaxis, :],  # [1, d_model=512]
                            d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])  #从0开始步长为2,2i
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])  #从1开始步长为2,2i+1

    pos_encoding = angle_rads[np.newaxis, ...]  #[50, 512] => [1,50,512]
    return torch.tensor(pos_encoding, dtype=torch.float32)

pos_encoding = positional_encoding(50, 512)

pad = 1  #重要!
def create_padding_mask(seq):  # seq [b, seq_len]
    # seq = torch.eq(seq, torch.tensor(0)).float() # pad=0的情况
    seq = torch.eq(seq, torch.tensor(pad)).float()  # pad!=0
    return seq[:, np.newaxis, np.newaxis, :]  # =>[b, 1, 1, seq_len]

def create_look_ahead_mask(size):  # seq_len
    mask = torch.triu(torch.ones((size, size)), diagonal=1)
    # mask = mask.device() #
    return mask  # [seq_len, seq_len]

def scaled_dot_product_attention(q, k, v, mask=None):
    """
    #计算注意力权重。
    q, k, v 必须具有匹配的前置维度。 且dq=dk
    k, v 必须有匹配的倒数第二个维度,例如:seq_len_k = seq_len_v。
    #虽然 mask 根据其类型(填充或前瞻)有不同的形状,
    #但是 mask 必须能进行广播转换以便求和。

    #参数:
        q: 请求的形状 == (..., seq_len_q, depth)
        k: 主键的形状 == (..., seq_len_k, depth)
        v: 数值的形状 == (..., seq_len_v, depth_v)  seq_len_k = seq_len_v
        mask: Float 张量,其形状能转换成
              (..., seq_len_q, seq_len_k)。默认为None。

    #返回值:
        #输出,注意力权重
    """
    # matmul(a,b)矩阵乘:a b的最后2个维度要能做乘法,即a的最后一个维度值==b的倒数第2个纬度值,
    # 除此之外,其他维度值必须相等或为1(为1时会广播)
    matmul_qk = torch.matmul(q, k.transpose(-1, -2))  # 矩阵乘 =>[..., seq_len_q, seq_len_k]

    # 缩放matmul_qk
    dk = torch.tensor(k.shape[-1], dtype=torch.float32)  # k的深度dk,或叫做depth_k
    scaled_attention_logits = matmul_qk / torch.sqrt(dk)  # [..., seq_len_q, seq_len_k]

    # 将 mask 加入到缩放的张量上(重要!)
    if mask is not None:  # mask: [b, 1, 1, seq_len]
        # mask=1的位置是pad,乘以-1e9(-1*10^9)成为负无穷,经过softmax后会趋于0
        scaled_attention_logits += (mask * -1e9)

    # softmax 在最后一个轴(seq_len_k)上归一化
    attention_weights = torch.nn.functional.softmax(scaled_attention_logits, dim=-1)  # [..., seq_len_q, seq_len_k]

    output = torch.matmul(attention_weights, v)  # =>[..., seq_len_q, depth_v]
    return output, attention_weights  # [..., seq_len_q, depth_v], [..., seq_len_q, seq_len_k]

class MultiHeadAttention(torch.nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0  #因为输入要被(平均?)split到不同的head

        self.depth = d_model//self.num_heads  #512/8=64,所以在scaled dot-product atten中dq=dk=64,dv也是64
        self.wq = torch.nn.Linear(d_model, d_model)
        self.wk = torch.nn.Linear(d_model, d_model)
        self.wv = torch.nn.Linear(d_model, d_model)
        self.final_linear = torch.nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):  # x [b, seq_len, d_model]
        x = x.view(batch_size, -1, self.num_heads,
                   self.depth)  # [b, seq_len, d_model=512]=>[b, seq_len, num_head=8, depth=64]
        return x.transpose(1, 2)  # [b, seq_len, num_head=8, depth=64]=>[b, num_head=8, seq_len, depth=64]

    def forward(self, q, k, v, mask):  # q=k=v=x [b, seq_len, embedding_dim] embedding_dim其实也=d_model
        batch_size = q.shape[0]

        q = self.wq(q)  # => [b, seq_len, d_model]
        k = self.wk(k)  # => [b, seq_len, d_model]
        v = self.wv(v)  # => [b, seq_len, d_model]

        q = self.split_heads(q, batch_size)  # => [b, num_head=8, seq_len, depth=64]
        k = self.split_heads(k, batch_size)  # => [b, num_head=8, seq_len, depth=64]
        v = self.split_heads(v, batch_size)  # => [b, num_head=8, seq_len, depth=64]

        scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        # => [b, num_head=8, seq_len_q, depth=64], [b, num_head=8, seq_len_q, seq_len_k]

        scaled_attention = scaled_attention.transpose(1, 2)  # =>[b, seq_len_q, num_head=8, depth=64]
        # 转置操作让张量存储结构扭曲,直接使用view方法会失败,可以使用reshape方法
        concat_attention = scaled_attention.reshape(batch_size, -1, self.d_model)  # =>[b, seq_len_q, d_model=512]

        output = self.final_linear(concat_attention)  # =>[b, seq_len_q, d_model=512]
        return output, attention_weights  # [b, seq_len_q, d_model=512], [b, num_head=8, seq_len_q, seq_len_k]

# 点式前馈网络
def point_wise_feed_forward_network(d_model, dff):
    feed_forward_net = torch.nn.Sequential(
        torch.nn.Linear(d_model, dff),  # [b, seq_len, d_model]=>[b, seq_len, dff=2048]
        torch.nn.ReLU(),
        torch.nn.Linear(dff, d_model),  # [b, seq_len, dff=2048]=>[b, seq_len, d_model=512]
    )
    return feed_forward_net

class EncoderLayer(torch.nn.Module):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)  # 多头注意力(padding mask)(self-attention)
        self.ffn = point_wise_feed_forward_network(d_model, dff)#前馈神经网络层

        self.layernorm1 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)
        self.layernorm2 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)

        self.dropout1 = torch.nn.Dropout(rate)
        self.dropout2 = torch.nn.Dropout(rate)

    # x [b, inp_seq_len, embedding_dim] embedding_dim其实也=d_model
    # mask [b,1,1,inp_seq_len]
    def forward(self, x, mask):
        attn_output, _ = self.mha(x, x, x, mask)  # =>[b, seq_len, d_model] self-attention
        attn_output = self.dropout1(attn_output)
        out1 = self.layernorm1(x + attn_output)  # 残差&层归一化 =>[b, seq_len, d_model]  残差网络思路

        ffn_output = self.ffn(out1)  # =>[b, seq_len, d_model]
        ffn_output = self.dropout2(ffn_output)
        out2 = self.layernorm2(out1 + ffn_output)  # 残差&层归一化 =>[b, seq_len, d_model]

        return out2  # [b, seq_len, d_model]

class DecoderLayer(torch.nn.Module):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()

        self.mha1 = MultiHeadAttention(d_model,
                                       num_heads)  # masked的多头注意力(look ahead mask 和 padding mask)(self-attention)
        self.mha2 = MultiHeadAttention(d_model, num_heads)  # 多头注意力(padding mask)(encoder-decoder attention)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)
        self.layernorm2 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)
        self.layernorm3 = torch.nn.LayerNorm(normalized_shape=d_model, eps=1e-6)

        self.dropout1 = torch.nn.Dropout(rate)
        self.dropout2 = torch.nn.Dropout(rate)
        self.dropout3 = torch.nn.Dropout(rate)

    # x [b, targ_seq_len, embedding_dim] embedding_dim其实也=d_model=512
    # look_ahead_mask [b, 1, targ_seq_len, targ_seq_len] 这里传入的look_ahead_mask应该是已经结合了look_ahead_mask和padding mask的mask
    # enc_output [b, inp_seq_len, d_model]
    # padding_mask [b, 1, 1, inp_seq_len]
    def forward(self, x, enc_output, look_ahead_mask, padding_mask):
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)  # =>[b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, targ_seq_len]  self-attention,训练阶段用于编码结果
        attn1 = self.dropout1(attn1)
        out1 = self.layernorm1(x + attn1)  # 残差&层归一化 [b, targ_seq_len, d_model]

        # Q: receives the output from decoder's first attention block,即 masked multi-head attention sublayer
        # K V: V (value) and K (key) receive the encoder output as inputs
        attn2, attn_weights_block2 = self.mha2(out1, enc_output, enc_output,
                                               padding_mask)  # =>[b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, inp_seq_len]
        attn2 = self.dropout2(attn2)
        out2 = self.layernorm2(out1 + attn2)  # 残差&层归一化 [b, targ_seq_len, d_model]

        ffn_output = self.ffn(out2)  # =>[b, targ_seq_len, d_model]
        ffn_output = self.dropout3(ffn_output)
        out3 = self.layernorm3(out2 + ffn_output)  # 残差&层归一化 =>[b, targ_seq_len, d_model]

        return out3, attn_weights_block1, attn_weights_block2
        #[b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, targ_seq_len], [b, num_heads, targ_seq_len, inp_seq_len]

class Encoder(torch.nn.Module):
    def __init__(self,
                 num_layers,  # N个encoder layer
                 d_model,  # 当前网络的维度
                 num_heads,
                 dff,  # 点式前馈网络内层fn的维度
                 input_vocab_size,  # 输入词表大小(源语言(法语))
                 maximun_position_encoding,
                 rate=0.1):
        super(Encoder, self).__init__()

        self.num_layers = num_layers
        self.d_model = d_model

        self.embedding = torch.nn.Embedding(num_embeddings=input_vocab_size, embedding_dim=d_model)#可以对应看transformer的图,这个是编码器的词向量层
        self.pos_encoding = positional_encoding(maximun_position_encoding,
                                                d_model).to(device)  #在这个相当于关系图中编码的词嵌入层 =>[1, max_pos_encoding, d_model=512] # 相当于作者手动实现了pos_emb

        # self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate).cuda() for _ in range(num_layers)] # 不行
        #对num_layers个encoder进行堆叠
        self.enc_layers = torch.nn.ModuleList([EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)])

        self.dropout = torch.nn.Dropout(rate)

    # x [b, inp_seq_len]
    # mask [b, 1, 1, inp_sel_len]
    def forward(self, x, mask):
        inp_seq_len = x.shape[-1]
        # adding embedding and position encoding
        x = self.embedding(x)  # [b, inp_seq_len]=>[b, inp_seq_len, d_model]
        # 缩放 embedding 原始论文的3.4节有提到: In the embedding layers, we multiply those weights by \sqrt{d_model}.
        x *= torch.sqrt(torch.tensor(self.d_model, dtype=torch.float32))
        pos_encoding = self.pos_encoding[:, :inp_seq_len, :]
        #pos_encoding = pos_encoding.cuda()  #调用了显卡资源
        x += pos_encoding  #将得到的位置编码于原来数据的输入进行相加得到最终的输入 [b, inp_seq_len, d_model]
        x = self.dropout(x)
        for i in range(self.num_layers):#将每一层decoder的输出当作下一层decoder的输入
            x = self.enc_layers[i](x, mask)  # [b, inp_seq_len, d_model]=>[b, inp_seq_len, d_model]
        return x  # [b, inp_seq_len, d_model]

class Decoder(torch.nn.Module):
    def __init__(self,
                 num_layers,  # N个encoder layer
                 d_model,
                 num_heads,
                 dff,  # 点式前馈网络内层fn的维度
                 target_vocab_size,  # target词表大小(目标语言(英语))
                 maximun_position_encoding,
                 rate=0.1):
        super(Decoder, self).__init__()

        self.num_layers = num_layers
        self.d_model = d_model

        self.embedding = torch.nn.Embedding(num_embeddings=target_vocab_size, embedding_dim=d_model)
        self.pos_encoding = positional_encoding(maximun_position_encoding,
                                                d_model).to(device)  # =>[1, max_pos_encoding, d_model=512]

        # self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate).cuda() for _ in range(num_layers)] # 不行
        self.dec_layers = torch.nn.ModuleList([DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)])  # decoder比encoder多了一组attention层

        self.dropout = torch.nn.Dropout(rate)

    # x [b, targ_seq_len]
    # look_ahead_mask [b, 1, targ_seq_len, targ_seq_len] 这里传入的look_ahead_mask应该是已经结合了look_ahead_mask和padding mask的mask
    # enc_output [b, inp_seq_len, d_model]
    # padding_mask [b, 1, 1, inp_seq_len]
    def forward(self, x, enc_output, look_ahead_mask, padding_mask):
        targ_seq_len = x.shape[-1]

        attention_weights = {}

        # adding embedding and position encoding
        x = self.embedding(x)  # [b, targ_seq_len]=>[b, targ_seq_len, d_model]
        # 缩放 embedding 原始论文的3.4节有提到: In the embedding layers, we multiply those weights by \sqrt{d_model}.
        x *= torch.sqrt(torch.tensor(self.d_model, dtype=torch.float32))
        # x += self.pos_encoding[:, :targ_seq_len, :]  # [b, targ_seq_len, d_model]
        pos_encoding = self.pos_encoding[:, :targ_seq_len, :]  # [b, targ_seq_len, d_model]
        #pos_encoding = pos_encoding.cuda() #调用显卡资源
        x += pos_encoding  # [b, inp_seq_len, d_model]

        x = self.dropout(x)

        for i in range(self.num_layers):
            x, attn_block1, attn_block2 = self.dec_layers[i](x, enc_output, look_ahead_mask, padding_mask)  # 因为是解码器,需要将编码器的输出结果enc_output传入
            # => [b, targ_seq_len, d_model], [b, num_heads, targ_seq_len, targ_seq_len], [b, num_heads, targ_seq_len, inp_seq_len]

            attention_weights[f'decoder_layer{i + 1}_block1'] = attn_block1
            attention_weights[f'decoder_layer{i + 1}_block2'] = attn_block2
        return x, attention_weights

class Transformer(torch.nn.Module):
    def __init__(self,
                 num_layers,  # N个encoder layer
                 d_model,
                 num_heads,
                 dff,  # 点式前馈网络内层fn的维度
                 input_vocab_size,  # input此表大小(源语言(法语))
                 target_vocab_size,  # target词表大小(目标语言(英语))
                 pe_input,  # input max_pos_encoding
                 pe_target,  # input max_pos_encoding
                 rate=0.1):
        super(Transformer, self).__init__()

        self.encoder = Encoder(num_layers,# Encoder中的EncoderLayer层数
                               d_model,#模型的维度大小,即每个词语的向量维度大小
                               num_heads,#多头注意力机制中head的数量。
                               dff,#前向网络中的隐层大小
                               input_vocab_size,
                               pe_input,
                               rate)
        self.decoder = Decoder(num_layers,
                               d_model,
                               num_heads,
                               dff,
                               target_vocab_size,
                               pe_target,
                               rate)
        self.final_layer = torch.nn.Linear(d_model, target_vocab_size)

    # inp [b, inp_seq_len]
    # targ [b, targ_seq_len]
    # enc_padding_mask [b, 1, 1, inp_seq_len]
    # look_ahead_mask [b, 1, targ_seq_len, targ_seq_len]
    # dec_padding_mask [b, 1, 1, inp_seq_len] # 注意这里的维度是inp_seq_len
    def forward(self, inp, targ, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        enc_output = self.encoder(inp, enc_padding_mask)  # =>[b, inp_seq_len, d_model]
        dec_output, attention_weights = self.decoder(targ, enc_output, look_ahead_mask, dec_padding_mask)
        # => [b, targ_seq_len, d_model],
        # {'..block1': [b, num_heads, targ_seq_len, targ_seq_len],
        #  '..block2': [b, num_heads, targ_seq_len, inp_seq_len], ...}
        final_output = self.final_layer(dec_output)  # =>[b, targ_seq_len, target_vocab_size]
        return final_output, attention_weights