# Reference: https://zh.d2l.ai/chapter_attention-mechanisms/nadaraya-waston.html
"""Nadaraya-Watson kernel regression demo.

The script builds a noisy 1-D regression data set and compares three ways of
predicting the test outputs:

* 4-1 average pooling (disabled by default),
* 4-2 non-parametric attention pooling (disabled by default),
* 4-3 parametric attention pooling, trained with SGD.
"""
from d2l import torch as d2l
import torch
from torch import nn


#@save
def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5),
                  cmap='Reds'):
    """Display a grid of matrix heatmaps.

    Args:
        matrices: Tensor indexed as ``matrices[row][col]``, each entry being
            the 2-D matrix drawn in that subplot. The callers in this file
            pass a tensor of shape (1, 1, n_queries, n_keys).
        xlabel: Label put on the x axis of the bottom row of subplots.
        ylabel: Label put on the y axis of the first column of subplots.
        titles: Optional sequence of per-column subplot titles.
        figsize: Figure size forwarded to ``subplots``.
        cmap: Colormap name forwarded to ``imshow``.
    """
    d2l.use_svg_display()
    num_rows, num_cols = matrices.shape[0], matrices.shape[1]
    fig, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
                                 sharex=True, sharey=True, squeeze=False)
    for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):
        for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):
            pcm = ax.imshow(matrix.detach().numpy(), cmap=cmap)
            if i == num_rows - 1:
                ax.set_xlabel(xlabel)
            if j == 0:
                ax.set_ylabel(ylabel)
            if titles:
                ax.set_title(titles[j])
    # One shared colorbar, driven by the last image that was drawn.
    fig.colorbar(pcm, ax=axes, shrink=0.6)
    d2l.plt.show()


# Quick manual check of show_heatmaps:
# attention_weights = torch.eye(10).reshape((1, 1, 10, 10))
# show_heatmaps(attention_weights, xlabel='Keys', ylabel='Queries')

# ==================== 1. Training data ====================
n_train = 50  # number of training samples
# torch.rand draws from the uniform distribution on [0, 1); scaling by 5
# spreads the raw inputs over [0, 5).
x_source = torch.rand(n_train) * 5
x_train, _ = torch.sort(x_source)  # training inputs, sorted ascending
print('x_train',x_train)


def f(x):
    """Noise-free data-generating function: ``2 * sin(x) + x ** 0.8``."""
    return 2 * torch.sin(x) + x**0.8


# Training targets: ground truth plus Gaussian noise (mean 0.0, std 0.5).
y_train = f(x_train) + torch.normal(0.0, 0.5, (n_train,))

# ==================== 2. Test data ====================
x_test = torch.arange(0, 5, 0.1)  # test inputs: 0.0, 0.1, ..., 4.9
y_truth = f(x_test)  # noise-free targets for the test inputs
# Number of test samples (queries). Derived from x_test so that it stays
# correct if the training and test set sizes ever differ.
n_test = len(x_test)


# ==================== 3. Plot helper ====================
def plot_kernel_reg(y_hat):
    """Plot the truth curve, the prediction curve and the training samples.

    Args:
        y_hat: Predictions for ``x_test``; drawn as the 'Pred' curve next to
            the noise-free 'Truth' curve. Training samples are drawn as
            semi-transparent circles.
    """
    d2l.plot(x_test, [y_truth, y_hat], 'x', 'y', legend=['Truth', 'Pred'],
             xlim=[0, 5], ylim=[-1, 5])
    d2l.plt.plot(x_train, y_train, 'o', alpha=0.5)
    d2l.plt.show()


# ==================== 4-1. Average pooling ====================
# No training here: the prediction for every query is simply the mean of the
# training targets. Set run to a truthy value to enable this section.
run = 0
if run:
    y_out = y_train.mean()
    # One copy of the mean per test input, because y_hat is plotted against
    # x_test. repeat_interleave flattens multi-dimensional input by default:
    # >>> y = torch.tensor([[1, 2], [3, 4]])
    # >>> torch.repeat_interleave(y, 2)
    # tensor([1, 1, 2, 2, 3, 3, 4, 4])
    y_hat = torch.repeat_interleave(y_out, n_test)
    plot_kernel_reg(y_hat)

# ==================== 4-2. Non-parametric attention pooling ====================
# Weights every training target y_i by how close its key x_i is to the query.
# Set run to a truthy value to enable this section.
run = 0
if run:
    print('x_test',x_test.shape,x_test)  # torch.Size([50])
    # Example output:
    #   x_test torch.Size([50])
    #   tensor([0.0000, 0.1000, 0.2000, ..., 4.7000, 4.8000, 4.9000])

    # Query table of shape (n_test, n_train): every row repeats one test
    # input (i.e. the same query) n_train times.
    X_repeat = x_test.repeat_interleave(n_train).reshape((-1, n_train))
    print('X_repeat',X_repeat.shape,X_repeat)  # torch.Size([50, 50])
    # Example output:
    #   X_repeat torch.Size([50, 50])
    #   tensor([[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000],
    #           [0.1000, 0.1000, 0.1000, ..., 0.1000, 0.1000, 0.1000],
    #           ...,
    #           [4.9000, 4.9000, 4.9000, ..., 4.9000, 4.9000, 4.9000]])
    #   x_train (50 sorted values), e.g.
    #   tensor([0.1249, 0.2723, 0.3242, ..., 4.6141, 4.7991, 4.8649])

    # x_train holds the keys and broadcasts against every row of X_repeat.
    print((X_repeat - x_train).shape)  # torch.Size([50, 50])
    # attention_weights has shape (n_test, n_train); each row holds the
    # weights distributed over the values (y_train) for one query.
    attention_weights = nn.functional.softmax(
        -(X_repeat - x_train)**2 / 2, dim=1)
    # Every element of y_hat is an attention-weighted average of the values.
    y_hat = torch.matmul(attention_weights, y_train)
    plot_kernel_reg(y_hat)
    print('y_hat',y_hat.shape,y_hat)  # y_hat torch.Size([50])
    show_heatmaps(attention_weights.unsqueeze(0).unsqueeze(0),
                  xlabel='Sorted training inputs',
                  ylabel='Sorted testing inputs')


# ==================== 4-3. Parametric attention pooling ====================
class NWKernelRegression(nn.Module):
    """Nadaraya-Watson kernel regression with one learnable scale ``w``."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Scalar multiplier applied to the query-key distance.
        self.w = nn.Parameter(torch.rand((1,), requires_grad=True))

    def forward(self, queries, keys, values):
        """Return one attention-weighted prediction per query.

        Args:
            queries: 1-D tensor with one entry per query.
            keys: Tensor of shape (number of queries, number of key-value
                pairs).
            values: Tensor with the same shape as ``keys``.

        Returns:
            1-D tensor with one prediction per query. The attention weights
            of the call are kept in ``self.attention_weights``.
        """
        # After this step queries has the same shape as keys (and as the
        # attention weights): (number of queries, number of key-value pairs).
        queries = queries.repeat_interleave(keys.shape[1]).reshape(
            (-1, keys.shape[1]))
        self.attention_weights = nn.functional.softmax(
            -((queries - keys) * self.w)**2 / 2, dim=1)
        # Batched (1 x n) @ (n x 1) product per query, flattened to 1-D.
        return torch.bmm(self.attention_weights.unsqueeze(1),
                         values.unsqueeze(-1)).reshape(-1)


# ---- 1. Data preparation ----
# X_tile has shape (n_train, n_train); every row holds all training inputs.
X_tile = x_train.repeat((n_train, 1))
print('X_tile',X_tile.shape,X_tile)
# Example output:
#   X_tile torch.Size([50, 50])
#   tensor([[0.0618, 0.0979, 0.2568, ..., 4.8935, 4.9268, 4.9565],
#           [0.0618, 0.0979, 0.2568, ..., 4.8935, 4.9268, 4.9565],
#           ...,
#           [0.0618, 0.0979, 0.2568, ..., 4.8935, 4.9268, 4.9565]])

# Y_tile has shape (n_train, n_train); every row holds all training outputs.
Y_tile = y_train.repeat((n_train, 1))

# torch.eye(n) is the n x n identity matrix, e.g. torch.eye(3):
#   tensor([[1., 0., 0.],
#           [0., 1., 0.],
#           [0., 0., 1.]])
# so (1 - eye) cast to bool is True everywhere except on the diagonal.
x_f = (1 - torch.eye(n_train)).type(torch.bool)
print('x_f',x_f.shape,x_f)
# Example output:
#   x_f torch.Size([50, 50])
#   tensor([[False,  True,  True, ...,  True,  True,  True],
#           [ True, False,  True, ...,  True,  True,  True],
#           ...,
#           [ True,  True,  True, ...,  True,  True, False]])

# keys has shape (n_train, n_train - 1): each training input is paired with
# the key-value pairs of every training sample except itself, so a sample
# cannot predict its own output from its own value.
keys = X_tile[x_f].reshape((n_train, -1))
print('keys',keys.shape,keys)
# Example output:
#   keys torch.Size([50, 49])
#   tensor([[0.0979, 0.2568, 0.2891, ..., 4.8935, 4.9268, 4.9565],
#           [0.0618, 0.2568, 0.2891, ..., 4.8935, 4.9268, 4.9565],
#           ...,
#           [0.0618, 0.0979, 0.2568, ..., 4.7557, 4.8935, 4.9268]])

# values has shape (n_train, n_train - 1); same off-diagonal mask as keys.
values = Y_tile[x_f].reshape((n_train, -1))

# ---- 2. Model ----
net = NWKernelRegression()
# ---- 3. Loss (element-wise; summed explicitly below) ----
loss = nn.MSELoss(reduction='none')
# ---- 4. Optimizer ----
trainer = torch.optim.SGD(net.parameters(), lr=0.5)
# Loss curve
num_epochs = 5
animator = d2l.Animator(xlabel='epoch', ylabel='loss', xlim=[1, num_epochs])

for epoch in range(num_epochs):
    trainer.zero_grad()
    y_predict = net(x_train, keys, values)
    l = loss(y_predict, y_train)
    # Sum once and reuse the result for both backprop and reporting.
    total_loss = l.sum()
    total_loss.backward()
    trainer.step()
    print(f'epoch {epoch + 1}, loss {float(total_loss):.6f}')
    animator.add(epoch + 1, float(total_loss))

# ==================== Test ====================
# keys has shape (n_test, n_train): every row holds all training inputs
# (i.e. the same keys for every query).
keys = x_train.repeat((n_test, 1))
# values has shape (n_test, n_train).
values = y_train.repeat((n_test, 1))
y_hat = net(x_test, keys, values).unsqueeze(1).detach()
plot_kernel_reg(y_hat)
show_heatmaps(net.attention_weights.unsqueeze(0).unsqueeze(0),
              xlabel='Sorted training inputs',
              ylabel='Sorted testing inputs')