PyTorch (10.2.2) Attention Pooling: Theory and Code Test

Published 2023-10-17 16:04:41  Author: MKT-porter

 

https://zh.d2l.ai/chapter_attention-mechanisms/nadaraya-waston.html

 

from d2l import torch as d2l
import torch
from torch import nn


#@save
def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5),
                  cmap='Reds'):
    """显示矩阵热图"""
    d2l.use_svg_display()
    num_rows, num_cols = matrices.shape[0], matrices.shape[1]
    fig, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
                                 sharex=True, sharey=True, squeeze=False)
    for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):
        for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):
            pcm = ax.imshow(matrix.detach().numpy(), cmap=cmap)
            if i == num_rows - 1:
                ax.set_xlabel(xlabel)
            if j == 0:
                ax.set_ylabel(ylabel)
            if titles:
                ax.set_title(titles[j])
    fig.colorbar(pcm, ax=axes, shrink=0.6)
    d2l.plt.show()


# attention_weights = torch.eye(10).reshape((1, 1, 10, 10))
# show_heatmaps(attention_weights, xlabel='Keys', ylabel='Queries')


#==================== 1. Training data ====================

n_train = 50  # number of training samples
x_source = torch.rand(n_train) * 5  # raw inputs: torch.rand draws 50 values uniformly from [0, 1), so after scaling by 5 they lie in [0, 5)
x_train, _ = torch.sort(x_source)   # sorted training inputs
print('x_train',x_train)

def f(x):
    return 2 * torch.sin(x) + x**0.8

y_train = f(x_train) + torch.normal(0.0, 0.5, (n_train,))  # training outputs: true function plus Gaussian noise
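# In formula form, the synthetic data above follow
#   y_i = 2*sin(x_i) + x_i**0.8 + eps,   eps ~ N(0, 0.5^2)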

#==================== 2. Test data ====================
x_test = torch.arange(0, 5, 0.1)  # test inputs: 0 to 5 in steps of 0.1, 50 points
y_truth = f(x_test)  # ground-truth outputs for the test inputs
# n_test = len(x_test)  # number of test samples
# n_test


# 3. The following function plots all training samples (shown as circles), the true
# data-generating function without noise (labeled "Truth"), and the learned prediction
# function (labeled "Pred").
def plot_kernel_reg(y_hat):
    d2l.plot(x_test, [y_truth, y_hat], 'x', 'y', legend=['Truth', 'Pred'],
             xlim=[0, 5], ylim=[-1, 5])
    d2l.plt.plot(x_train, y_train, 'o', alpha=0.5)
    d2l.plt.show()

#======================= 4-1 Average pooling
# The training step is skipped here: instead of a learned network, the prediction is
# computed directly as the final average-pooling step.
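# In formula form this baseline is simply f(x) = (1/n) * sum_i y_i,
# i.e. every query x receives the same prediction: the mean of the training outputs.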

run=0
if run:
    y_out = y_train.mean()  # final average-pooling layer: a single mean over all training outputs

    y_hat = torch.repeat_interleave(y_out, n_train)
    # repeat the scalar mean n_train times, restoring a length-n_train vector

    #print('y_hat',y_hat)
    # if a multi-dimensional tensor is passed, repeat_interleave flattens it by default
    # >>> y = torch.tensor([[1, 2], [3, 4]])
    # >>> torch.repeat_interleave(y, 2)
    # tensor([1, 1, 2, 2, 3, 3, 4, 4])
    
    # x-axis: x_test; curves: [ground truth y_truth, average-pooling prediction y_hat]
    plot_kernel_reg(y_hat)

#==================== 4-2 Nonparametric attention pooling ==========================

run=0
if run:
    # Weight the outputs y_i according to the input locations:
    # X_repeat has shape (n_test, n_train);
    # every row contains the same test input (i.e. the same query)
    print('x_test',x_test.shape,x_test)
    # torch.Size([50])
    '''
    x_test torch.Size([50]) tensor([0.0000, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000,
            0.9000, 1.0000, 1.1000, 1.2000, 1.3000, 1.4000, 1.5000, 1.6000, 1.7000,
            1.8000, 1.9000, 2.0000, 2.1000, 2.2000, 2.3000, 2.4000, 2.5000, 2.6000,
            2.7000, 2.8000, 2.9000, 3.0000, 3.1000, 3.2000, 3.3000, 3.4000, 3.5000,
            3.6000, 3.7000, 3.8000, 3.9000, 4.0000, 4.1000, 4.2000, 4.3000, 4.4000,
            4.5000, 4.6000, 4.7000, 4.8000, 4.9000])
    '''
    #print('x_test.repeat_interleave(n_train)',x_test.repeat_interleave(n_train).shape,x_test.repeat_interleave(n_train))
    # torch.Size([2500])
    # build the query table
    X_repeat = x_test.repeat_interleave(n_train).reshape((-1, n_train))
    print('X_repeat',X_repeat.shape,X_repeat)
    # torch.Size([50, 50])
    '''
    X_repeat
    torch.Size([50, 50])
    tensor([[0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
            [0.1000, 0.1000, 0.1000,  ..., 0.1000, 0.1000, 0.1000],
            [0.2000, 0.2000, 0.2000,  ..., 0.2000, 0.2000, 0.2000],
            ...,
            [4.7000, 4.7000, 4.7000,  ..., 4.7000, 4.7000, 4.7000],
            [4.8000, 4.8000, 4.8000,  ..., 4.8000, 4.8000, 4.8000],
            [4.9000, 4.9000, 4.9000,  ..., 4.9000, 4.9000, 4.9000]])

    x_train
    torch.Size([50])
    tensor([0.1249, 0.2723, 0.3242, 0.3747, 0.6435, 0.7526, 0.7749, 0.9694, 0.9709,
            1.1660, 1.3965, 1.4592, 1.5059, 1.6240, 1.6567, 1.9198, 1.9289, 1.9650,
            1.9665, 2.0000, 2.0822, 2.1460, 2.2586, 2.2702, 2.3153, 2.4764, 2.6111,
            2.6732, 2.9376, 3.1270, 3.2933, 3.3839, 3.3909, 3.4030, 3.4695, 3.6524,
            3.6915, 3.7456, 3.8196, 3.8434, 3.8556, 3.9236, 4.2003, 4.2841, 4.2882,
            4.5061, 4.5877, 4.6141, 4.7991, 4.8649])

    '''

    # x_train holds the keys. attention_weights has shape (n_test, n_train):
    # each row holds the attention weights to distribute over the values (y_train) for one query.
    # Broadcasting subtracts every key in x_train from each query row of X_repeat.
    print((X_repeat - x_train).shape)  # torch.Size([50, 50])
    # X_repeat: query table; x_train: raw training inputs (keys)
    # position-based weights
    attention_weights = nn.functional.softmax(-(X_repeat - x_train)**2 / 2, dim=1)
    # each element of y_hat is a weighted average of the values,
    # where the weights are the attention weights
    y_hat = torch.matmul(attention_weights, y_train)
    plot_kernel_reg(y_hat)
    print('y_hat',y_hat.shape,y_hat)
    #y_hat torch.Size([50])

    show_heatmaps(attention_weights.unsqueeze(0).unsqueeze(0),
                    xlabel='Sorted training inputs',
                    ylabel='Sorted testing inputs')
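# The block above is the nonparametric Nadaraya-Watson estimator with a Gaussian kernel:
#   f(x) = sum_i softmax_i( -(x - x_i)**2 / 2 ) * y_i
# where the query x is a test input, the keys x_i are the training inputs,
# and the values y_i are the training outputs.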



#==================== 4-3 Parametric attention pooling ==========================

class NWKernelRegression(nn.Module):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.w = nn.Parameter(torch.rand((1,), requires_grad=True))

    def forward(self, queries, keys, values):
        # queries and attention_weights have shape (num_queries, num_key-value_pairs)
        queries = queries.repeat_interleave(keys.shape[1]).reshape((-1, keys.shape[1]))
        self.attention_weights = nn.functional.softmax(
            -((queries - keys) * self.w)**2 / 2, dim=1)
        # values has shape (num_queries, num_key-value_pairs)
        return torch.bmm(self.attention_weights.unsqueeze(1),
                         values.unsqueeze(-1)).reshape(-1)
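# Compared with the nonparametric version, the class above learns a scalar kernel width w:
#   f(x) = sum_i softmax_i( -((x - x_i) * w)**2 / 2 ) * y_i
# torch.bmm multiplies the (num_queries, 1, m) attention weights by the (num_queries, m, 1)
# values, giving one scalar prediction per query.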
    
# 1. Data initialization
# X_tile has shape (n_train, n_train); every row contains the same training inputs
X_tile = x_train.repeat((n_train, 1))
print('X_tile',X_tile.shape,X_tile)
'''
X_tile 
torch.Size([50, 50]) 
tensor([[0.0618, 0.0979, 0.2568,  ..., 4.8935, 4.9268, 4.9565],
        [0.0618, 0.0979, 0.2568,  ..., 4.8935, 4.9268, 4.9565],
        [0.0618, 0.0979, 0.2568,  ..., 4.8935, 4.9268, 4.9565],
        ...,
        [0.0618, 0.0979, 0.2568,  ..., 4.8935, 4.9268, 4.9565],
        [0.0618, 0.0979, 0.2568,  ..., 4.8935, 4.9268, 4.9565],
        [0.0618, 0.0979, 0.2568,  ..., 4.8935, 4.9268, 4.9565]])
'''
# Y_tile has shape (n_train, n_train); every row contains the same training outputs
Y_tile = y_train.repeat((n_train, 1))
'''
torch.eye(3)
tensor([[ 1.,  0.,  0.],
        [ 0.,  1.,  0.],
        [ 0.,  0.,  1.]])
'''
x_f=(1 - torch.eye(n_train)).type(torch.bool)
'''
x_f 
torch.Size([50, 50]) 
tensor([[False,  True,  True,  ...,  True,  True,  True],
        [ True, False,  True,  ...,  True,  True,  True],
        [ True,  True, False,  ...,  True,  True,  True],
        ...,
        [ True,  True,  True,  ..., False,  True,  True],
        [ True,  True,  True,  ...,  True, False,  True],
        [ True,  True,  True,  ...,  True,  True, False]])
'''
print('x_f',x_f.shape,x_f)
# keys has shape (n_train, n_train - 1): torch.eye returns a 2-D tensor with ones on the
# diagonal and zeros elsewhere, so (1 - torch.eye) used as a boolean mask drops each row's own input.
# Every training input is compared against the key-value pairs of all other training samples
# (excluding itself) to produce its prediction.
keys = X_tile[x_f].reshape((n_train, -1))
print('keys',keys.shape,keys)
'''
torch.Size([50, 49])
tensor([[0.0979, 0.2568, 0.2891,  ..., 4.8935, 4.9268, 4.9565],
        [0.0618, 0.2568, 0.2891,  ..., 4.8935, 4.9268, 4.9565],
        [0.0618, 0.0979, 0.2891,  ..., 4.8935, 4.9268, 4.9565],
        ...,
        [0.0618, 0.0979, 0.2568,  ..., 4.7557, 4.9268, 4.9565],
        [0.0618, 0.0979, 0.2568,  ..., 4.7557, 4.8935, 4.9565],
        [0.0618, 0.0979, 0.2568,  ..., 4.7557, 4.8935, 4.9268]])

'''
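# A tiny sketch with n=3 and toy values (not the real training data), showing how the
# (1 - torch.eye(n)) boolean mask drops each row's own entry:
#   x_demo = torch.tensor([1.0, 2.0, 3.0])
#   tile   = x_demo.repeat((3, 1))                 # [[1,2,3],[1,2,3],[1,2,3]]
#   mask   = (1 - torch.eye(3)).type(torch.bool)   # False on the diagonal
#   tile[mask].reshape((3, -1))                    # [[2,3],[1,3],[1,2]]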

# values has shape (n_train, n_train - 1)
values = Y_tile[(1 - torch.eye(n_train)).type(torch.bool)].reshape((n_train, -1))

# 2. Create the model
net = NWKernelRegression()
# 3. Create the loss
loss = nn.MSELoss(reduction='none')
# 4. Create the optimizer
trainer = torch.optim.SGD(net.parameters(), lr=0.5)
# plotting animator
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[1, 5])

for epoch in range(5):
    trainer.zero_grad()
    y_predict=net(x_train, keys, values)
    l = loss(y_predict, y_train)
    l.sum().backward()
    trainer.step()
    print(f'epoch {epoch + 1}, loss {float(l.sum()):.6f}')
    animator.add(epoch + 1, float(l.sum()))
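# Optional check after training: net.w is the model's only parameter (the learned kernel width).
# print('learned w', net.w.item())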

#======================= Test =======================
n_test=n_train
# keys has shape (n_test, n_train); every row contains the same training inputs (i.e. the same keys)
keys = x_train.repeat((n_test, 1))
# values has shape (n_test, n_train)
values = y_train.repeat((n_test, 1))
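# At test time each query attends to all n_train training key-value pairs; the self-exclusion
# used during training is not needed here.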
y_hat = net(x_test, keys, values).unsqueeze(1).detach()
plot_kernel_reg(y_hat)


show_heatmaps(net.attention_weights.unsqueeze(0).unsqueeze(0),
                  xlabel='Sorted training inputs',
                  ylabel='Sorted testing inputs')