
Transformer | Attention Mechanism Code Implementation

  • Self-Attention (自注意力机制)

    Self-attention is a key component of the Transformer. It captures the relevance between tokens by computing Query (Q), Key (K), and Value (V). The Q, K, and V matrices are obtained by linearly transforming the input embeddings (or the previous layer's output) with weight matrices.

    The input to self-attention has shape (batch_size, seq_len, d_model), where batch_size is the batch size, seq_len is the sequence length, and d_model is the embedding dimension.

    Computing Q, K, V: Q = X × W_Q, K = X × W_K, V = X × W_V.

    Computing the self-attention output: output = softmax(Q × K^T / √d_model) × V.
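
    These two formulas can be traced directly with plain tensors. Below is a minimal sketch, assuming batch_size = 2, seq_len = 4, d_model = 8, and random weight matrices; the module version with nn.Linear layers follows in the next section.

    import torch

    batch_size, seq_len, d_model = 2, 4, 8
    X = torch.randn(batch_size, seq_len, d_model)

    # Random projection matrices W_Q, W_K, W_V (learned parameters in practice)
    W_Q, W_K, W_V = (torch.randn(d_model, d_model) for _ in range(3))

    Q, K, V = X @ W_Q, X @ W_K, X @ W_V                # each: (batch_size, seq_len, d_model)
    scores = Q @ K.transpose(-2, -1) / d_model ** 0.5  # (batch_size, seq_len, seq_len)
    output = torch.softmax(scores, dim=-1) @ V         # (batch_size, seq_len, d_model)
    print(output.shape)                                # torch.Size([2, 4, 8])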

  • Self-attention PyTorch implementation

    import torch
    import torch.nn.functional as F
    from torch import nn


    class SingleHeadAttention(nn.Module):
        def __init__(self, embed_size):
            super(SingleHeadAttention, self).__init__()

            # Dimension of the input embeddings
            self.embed_size = embed_size

            # Linear projections for query, key and value
            self.query_fc = nn.Linear(embed_size, embed_size)
            self.key_fc = nn.Linear(embed_size, embed_size)
            self.value_fc = nn.Linear(embed_size, embed_size)

            # Output linear projection
            self.out_fc = nn.Linear(embed_size, embed_size)

        def forward(self, X, mask=None):
            print("X.shape: ", X.shape)
            # Step 1: project the input into query, key and value vectors
            Q = self.query_fc(X)  # (batch_size, seq_len, embed_size)
            print("Q.shape: ", Q.shape)
            K = self.key_fc(X)  # (batch_size, seq_len, embed_size)
            print("K.shape: ", K.shape)
            V = self.value_fc(X)  # (batch_size, seq_len, embed_size)
            print("V.shape: ", V.shape)

            # Step 2: compute scaled attention scores
            attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / (self.embed_size ** 0.5)
            print("attention_scores.shape: ", attention_scores.shape)

            # Apply the mask, if one is provided
            if mask is not None:
                attention_scores = attention_scores.masked_fill(mask == 0, float('-inf'))

            # Step 3: normalize the scores into attention weights (softmax)
            attention_weights = F.softmax(attention_scores, dim=-1)  # (batch_size, seq_len, seq_len)

            # Step 4: weighted sum of the values
            output = torch.matmul(attention_weights, V)  # (batch_size, seq_len, embed_size)

            # Step 5: final output projection
            output = self.out_fc(output)

            return output


    if __name__ == "__main__":
        batch_size = 2
        seq_len = 4
        embed_size = 8

        # Random input data
        X = torch.randn(batch_size, seq_len, embed_size)

        # Create the self-attention module
        attention_layer = SingleHeadAttention(embed_size)

        # Forward pass
        output = attention_layer(X)

        print(f"Output: {output.shape}")  # (batch_size, seq_len, embed_size)

  • Multi-Head Attention (多头注意力机制)

    Multi-head attention is a core component of the Transformer. It computes several attention heads in parallel to capture different kinds of information in the input sequence; each head has its own Q, K, and V and attends to a different subspace of the input, which strengthens the model's ability to represent different features.

    The multi-head attention computation proceeds in four steps (a shape sketch follows the list):

    1. Linear projections: the input vectors are passed through different linear transformations (weight matrices) to produce multiple query (Q), key (K), and value (V) vectors.
    2. Attention per head: each head computes attention weights from its query, key, and value, and produces an output via a weighted sum.
    3. Concatenation: the outputs of all heads are concatenated.
    4. Output projection: the concatenated result is passed through a final linear transformation to produce the output.
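
    A shape-only sketch of these four steps in a batch-first layout (sizes are assumptions; the full implementation below uses a seq-first layout instead):

    import torch
    from torch import nn

    batch_size, seq_len, embed_size, num_heads = 2, 10, 64, 8
    head_dim = embed_size // num_heads  # 8

    X = torch.rand(batch_size, seq_len, embed_size)
    W_q, W_k, W_v = (nn.Linear(embed_size, embed_size) for _ in range(3))
    W_o = nn.Linear(embed_size, embed_size)              # Step 4: output projection

    def split_heads(t):
        # (batch, seq, embed) -> (batch, heads, seq, head_dim)
        return t.view(batch_size, seq_len, num_heads, head_dim).transpose(1, 2)

    # Step 1: project, then split each of Q, K, V into num_heads heads
    Q, K, V = split_heads(W_q(X)), split_heads(W_k(X)), split_heads(W_v(X))

    # Step 2: scaled dot-product attention within each head
    scores = Q @ K.transpose(-2, -1) / head_dim ** 0.5   # (batch, heads, seq, seq)
    out = torch.softmax(scores, dim=-1) @ V              # (batch, heads, seq, head_dim)

    # Step 3: concatenate the heads back into one vector per position
    out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, embed_size)

    # Step 4: final linear projection
    print(W_o(out).shape)                                # torch.Size([2, 10, 64])
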
  • Multi-head attention PyTorch implementation

    import torch
    import torch.nn as nn
    import torch.nn.functional as F


    class MultiHeadAttention(nn.Module):
        def __init__(self, embed_size, num_heads):
            super(MultiHeadAttention, self).__init__()
            self.embed_size = embed_size
            self.num_heads = num_heads
            self.head_dim = embed_size // num_heads

            assert self.head_dim * num_heads == embed_size, "Embedding size must be divisible by num_heads"

            # Linear projections for query, key and value
            self.query_fc = nn.Linear(embed_size, embed_size)
            self.key_fc = nn.Linear(embed_size, embed_size)
            self.value_fc = nn.Linear(embed_size, embed_size)

            # Output linear projection
            self.fc_out = nn.Linear(embed_size, embed_size)

        def forward(self, X):
            # X: (seq_len, batch_size, embed_size)
            seq_len, batch_size = X.shape[0], X.shape[1]

            # Project the input into Q, K, V
            Q = self.query_fc(X)  # (seq_len, batch_size, embed_size)
            print("Q.shape: ", Q.shape)
            K = self.key_fc(X)
            print("K.shape: ", K.shape)
            V = self.value_fc(X)
            print("V.shape: ", V.shape)

            # Split Q, K, V into multiple heads:
            # (seq_len, batch_size, embed_size) -> (batch_size, num_heads, seq_len, head_dim)
            Q = Q.view(seq_len, batch_size, self.num_heads, self.head_dim).permute(1, 2, 0, 3)
            print("Q_multi_head.shape: ", Q.shape)
            K = K.view(seq_len, batch_size, self.num_heads, self.head_dim).permute(1, 2, 0, 3)
            print("K_multi_head.shape: ", K.shape)
            V = V.view(seq_len, batch_size, self.num_heads, self.head_dim).permute(1, 2, 0, 3)
            print("V_multi_head.shape: ", V.shape)

            # Compute scaled attention scores over the sequence dimension
            energy = torch.matmul(Q, K.transpose(-2, -1))  # (batch_size, num_heads, seq_len, seq_len)
            attention = torch.softmax(energy / (self.head_dim ** 0.5), dim=-1)  # attention weights
            print("Q*K.shape", attention.shape)

            # Weighted sum of the values
            out = torch.matmul(attention, V)  # (batch_size, num_heads, seq_len, head_dim)

            # Merge the heads back together:
            # (batch_size, num_heads, seq_len, head_dim) -> (seq_len, batch_size, embed_size)
            out = out.permute(2, 0, 1, 3).contiguous().view(seq_len, batch_size, self.num_heads * self.head_dim)

            # Final output projection
            out = self.fc_out(out)

            return out


    # Test
    embed_size = 64
    num_heads = 8
    seq_len = 10
    batch_size = 32

    multihead_attention = MultiHeadAttention(embed_size, num_heads)

    # Input tensor, shape: (seq_len, batch_size, embed_size)
    X = torch.rand(seq_len, batch_size, embed_size)

    out = multihead_attention(X)
    print(out.shape)  # (seq_len, batch_size, embed_size)
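
    For comparison, PyTorch's built-in nn.MultiheadAttention uses the same (seq_len, batch_size, embed_size) layout by default (pass batch_first=True for batch-first inputs). Its weights are initialized independently, so only the output shape, not the values, will match the class above:

    # Shape check against the built-in module (independent weights, so values differ)
    mha = nn.MultiheadAttention(embed_dim=64, num_heads=8)   # seq-first layout by default
    X = torch.rand(10, 32, 64)                               # (seq_len, batch_size, embed_size)
    out, attn_weights = mha(X, X, X)                         # self-attention: query = key = value = X
    print(out.shape)                                         # torch.Size([10, 32, 64])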