面试凉经

估计没有二面了,这次被拷打的狠狠的,估计是无了,第一次视觉面,没有准备相关内容,第二天去看,果然已经流程结束
20240430181423

自我介绍

和前面一样先自我介绍。

项目拷打

这次我投的计算机视觉算法,面试的比较对口,面试官就问的很细,面试首先是问了几个项目的内容,我前面做的任务主要是通用视觉目标检测、姿态估计、分割等这些的项目,这些项目都没什么深度,主要是用,然后问的很细,就很难受,被拷打出来了没做啥有深度的内容,主要是应用,这块就不太行了。前面的面试面试官不是研究视觉的,就只大致问问做项目过程中遇到的难点,困难如何解决的,这次要先把做的任务说清楚,不清楚就会继续不断追问,然后说清楚做了什么,涉及较难的部分还会重点追问。但是这部分我往往都是只应用,并没有结构上的创新,然后具体用的什么模型,如何训练,如何处理,等等都会追问,问的非常细,项目虽然是自己做的,但是很多东西也没怎么想到,就非常难受。这里面除了一个配准的专利面试官可能没做过没问,其他所有项目比赛都非常仔细的抠细节。

现在想想,前面做过的项目也没有系统的整理里面的亮点和创新点等等,而且做的时候也很多时候只是用现有的技术做了,但是没有去想过创新点,含金量就不太够,还是要把做过的东西包装一下。

简单的数据处理脚本

然后面试官说让做题,首先是一个简单的路径字符串列表变成文件树的任务,这个非常简单的,这种类似的见得多了,但是当时太紧张了,然后这个我就没写出来,实在是不应该,其实另外一个写不出来的问题在于,平时写代码都有Github Copilot辅助,基本都是只需要写个大致的思路,辅助编程就能自动补全了,具体某个函数名字和参数什么的真不怎么记住,这个也是一个问题,以后要多写多记住一些基础的东西。

具体题目是:

1
2
3
4
news/car/suv
news/car/mpv
news/sport/basketball
news/sport/football
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def tree(str_list):
n = len(str_list)
category_dict = {}
for i in range(n):
strs = str_list[i].split('/')
category, name = strs[1], strs[2]
if category not in category_dict:
# 这一步我都没想起来,实在是太离谱了
category_dict[category] = [name]
else:
category_dict[category].append(name)
return category_dict

def print_tree(category_dict, depth=1):
print('news')
for key, value in category_dict.items():
print('\t' * depth + key)
if isinstance(value, dict):
print_tree(value, depth + 1)
else:
for v in value:
print('\t' * (depth + 1) + v)
if __name__ == '__main__':
str_list = ['news/car/suv', 'news/car/mpv', 'news/sport/basketball', 'news/sport/football']
category_dict = tree(str_list)
print_tree(category_dict)

logistic回归

让说一下logistic回归,我就说了下公式,然后说前些年用,因为方便求导,简单说了下,具体用来更新模型的参数没说。这相关的算法也是学过的,但是要清晰讲出来还是要准备的。

手撕注意力

然后让写一个attention的公式,然后也没写出来,就简单说了下原理,肯定是不行的,这种都是算法的八股,我应该提前准备下的。

chatgpt答案

注意力机制是一种让模型能够在处理信息时自动地聚焦于最重要信息的技术。在深度学习中,注意力机制通常用于序列模型,特别是在自然语言处理领域。最常见的注意力机制是由 Vaswani 等人在 2017 年提出的“Scaled Dot-Product Attention”。

注意力机制的数学公式

基础的注意力机制可以表示为三个核心组件:查询(Query),键(Key),值(Value)。给定查询 ( Q ),键 ( K ),和值 ( V ),注意力权重计算通常包括以下步骤:

  1. 注意力得分(Score)计算
    [ \text{Score}(Q, K) = QK^T ]

  2. 缩放(Scaling)
    [ \text{Scaled Score} = \frac{\text{Score}}{\sqrt{d_k}} ]
    其中 ( d_k ) 是键向量的维度,这一步骤是为了避免计算中的梯度过小。

  3. Softmax 归一化
    [ \text{Attention Weights} = \text{softmax}(\text{Scaled Score}) ]

  4. 输出计算
    [ \text{Output} = \text{Attention Weights} \cdot V ]

代码实现

在 Python 中,可以使用 PyTorch 这样的深度学习框架来实现注意力机制。下面是一个简单的 Scaled Dot-Product Attention 的实现示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import torch
import torch.nn.functional as F

def scaled_dot_product_attention(Q, K, V, mask=None):
"""
实现 Scaled Dot-Product Attention。
参数:
Q: 查询张量 [batch_size, num_heads, query_len, key_dim]
K: 键张量 [batch_size, num_heads, key_len, key_dim]
V: 值张量 [batch_size, num_heads, value_len, value_dim]
mask: 掩码张量,用于遮蔽不需要的值 [batch_size, 1, 1, key_len]

返回:
output: 注意力机制的结果 [batch_size, num_heads, query_len, value_dim]
attention_weights: 注意力权重 [batch_size, num_heads, query_len, key_len]
"""
d_k = Q.size(-1)

# 计算 Q 和 K 的点积,然后除以 sqrt(d_k)
scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(d_k, dtype=torch.float32))

# 如果有掩码,将掩码加到 scores 上
if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))

# 应用 Softmax 函数获取注意力权重
attention_weights = F.softmax(scores, dim=-1)

# 计算最终的输出
output = torch.matmul(attention_weights, V)

return output, attention_weights

这段代码实现了 Scaled Dot-Product Attention,其中 Q, K, V 是输入的张量,可来自相同的或不同的输入源。mask 参数用于可选的掩码操作,通常用在处理变长序列时忽略填充项。注意力权重通过点积、缩放、Softmax 计算得出,最后使用这些权重对值 V 进行加权求和,得到每个查询的输出。

手撕Attention

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Attention(nn.Module):
def __init__(self, dim, num_heads=8, qkv_bias=False, qk_scale=None, attn_drop=0., proj_drop=0.):
super().__init__()
self.num_heads = num_heads
head_dim = dim // num_heads
# NOTE scale factor was wrong in my original version, can set manually to be compat with prev weights
self.scale = qk_scale or head_dim ** -0.5

self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
self.attn_drop = nn.Dropout(attn_drop)
self.proj = nn.Linear(dim, dim)
self.proj_drop = nn.Dropout(proj_drop)

def forward(self, x):
B, N, C = x.shape
qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
q, k, v = qkv[0], qkv[1], qkv[2] # make torchscript happy (cannot use tensor as tuple)

attn = (q @ k.transpose(-2, -1)) * self.scale
attn = attn.softmax(dim=-1)
attn = self.attn_drop(attn)

x = (attn @ v).transpose(1, 2).reshape(B, N, C)
x = self.proj(x)
x = self.proj_drop(x)
return x

MLP的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Mlp(nn.Module):
def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.):
super().__init__()
out_features = out_features or in_features
hidden_features = hidden_features or in_features
self.fc1 = nn.Linear(in_features, hidden_features)
self.act = act_layer()
self.fc2 = nn.Linear(hidden_features, out_features)
self.drop = nn.Dropout(drop)

def forward(self, x):
x = self.fc1(x)
x = self.act(x)
x = self.drop(x)
x = self.fc2(x)
x = self.drop(x)
return x

扩散模型

然后问扩散模型知道不,然后我说没做过,这里也是改准备下的,毕竟这么著名的模型,我虽然大致了解原理还有DDPM什么的,但是如果认真的讲出来,不成体系,我也就没敢说,这块还是要准备下。

chatgpt回答

扩散模型是一种新兴的深度学习方法,特别是在生成模型领域中表现出色。这种模型基于随机过程理论,特别是扩散过程,通过逐步增加随机性来转换数据,然后再逐步学习如何逆转这一过程,从而生成高质量的数据样本。这一方法在图像和声音生成中尤其流行。

扩散模型的基本原理

扩散模型包括两个主要阶段:正向过程(forward process)和反向过程(reverse process)。

  1. 正向过程(噪声添加过程)
    在正向过程中,模型逐步地将噪声引入到原始数据中。这通常通过多个时间步骤实现,每个时间步骤都会向数据添加更多的高斯噪声,直到数据完全转变为纯噪声。这个过程可以用以下等式表示:
    [
    x_{t} = \sqrt{\alpha_t} x_{t-1} + \sqrt{1-\alpha_t} \epsilon_t
    ]
    其中,( x_t ) 是在时间 ( t ) 的数据,( \alpha_t ) 是时间步骤中噪声的比例,而 ( \epsilon_t ) 是从标准正态分布中采样的噪声。

  2. 反向过程(噪声去除过程)
    反向过程旨在学习如何从纯噪声数据逐步恢复出原始数据。这是通过一个参数化的神经网络模型实现的,该模型试图预测给定噪声数据的前一时间步骤的状态。反向过程的目标是逐渐减少数据中的噪声,最终恢复出接近原始数据的状态。

训练扩散模型

扩散模型的训练通常依赖于变分下界(ELBO,Evidence Lower Bound)的优化。模型的目标是最小化重构数据和原始数据之间的差异,同时也优化生成数据的概率分布。训练过程需要大量的计算资源,因为它涉及到大量的正向和反向模拟步骤。

FM算法

然后问机器学习算法FM算法,然后我说不知道,这个我还真没听说过,查了下,是推荐的算法,因子分解机,真不懂,现学记住吧,chatgpt说:

chatgpt回答

FM(Factorization Machines,因子分解机)是一种广泛用于推荐系统和预测任务的机器学习算法。它被设计用来处理具有极大稀疏性的数据集,特别是在用户-物品交互数据中常见的那种稀疏性。

基本原理: 因子分解机模型主要是为了解决传统线性模型(如线性回归、逻辑回归)在处理大规模稀疏数据集时的不足。在传统的线性模型中,每个特征的权重是独立学习的,这使得模型难以捕捉特征间的交互作用。FM通过引入隐向量来为每个特征学习潜在的交互效应,这样可以有效地捕捉到特征之间的相互作用。
数学表示: 给定一个包含$n$个特征的数据集,FM模型的预测$\hat{y}$由下式给出:
$$
\hat{y}(\mathbf{x}) = w_0 + \sum_{i=1}^{n}w_i x_i + \sum_{i=1}^{n} \sum_{j=i+1}^{n} \langle \mathbf{v}_i, \mathbf{v}_j \rangle x_i x_j
$$
其中:

  • $w_0$ 是模型的全局偏置。
  • $w_i$ 是第$i$个特征的权重。
  • $\mathbf{x}$ 是输入特征向量。
  • $\mathbf{v}_i$ 和 $\mathbf{v}_j$ 是特征$i$和$j$的隐向量,隐向量的维度$k$通常远小于特征数量$n$。
  • $\langle \mathbf{v}_i, \mathbf{v}_j \rangle$ 表示特征$i$和$j$的隐向量的点积,用于模拟这两个特征的交互效果。

优势:

  1. 高效性:虽然FM模型包含二阶特征交互项,但是它可以在$O(kn)$的时间复杂度内完成计算,其中$k$是隐向量的维度,$n$是特征的数量。这种高效性主要来源于隐向量的点积运算方式,使得算法适用于大规模的数据集。

  2. 灵活性:FM模型不仅适用于任何实数特征,还可以很容易地扩展到高阶交互,如三阶或更高阶的交互。

  3. 泛化能力:由于特征间的交互是通过低维隐向量学习的,FM可以很好地泛化到新的、未见过的特征组合,尤其是在数据稀疏的场景下。

应用场景

  • 推荐系统:FM广泛用于推荐系统,能有效处理用户和物品的特征交互。
  • 点击率预测:在线广告和搜索引擎优化中,FM可以用来预测用户点击广告或搜索结果的概率。
  • 排名和分类任务:通过适当的目标函数和优化方法,FM可以适用于各种排名和分类问题。

因子分解机因其在处理稀疏数据和特征交互中的优势而被广泛研究和应用。

手写交叉熵损失

然后让写CrossEntropy,我还是没写出来,然后简单说了下公式的思想,

chatgpt回答

交叉熵损失(Cross-Entropy Loss)是用于衡量模型预测概率分布与真实标签概率分布之间的差异的一个常用损失函数。它在分类任务中特别常见,尤其是在处理二分类或多分类问题时。

交叉熵损失的公式

对于二分类问题,交叉熵损失可以表示为:

[ L = -\frac{1}{N} \sum_{i=1}^N [y_i \log(\hat{y}_i) + (1 - y_i) \log(1 - \hat{y}_i)] ]

其中:

  • ( N ) 是样本的数量。
  • ( y_i ) 是第 ( i ) 个样本的真实标签,通常为 0 或 1。
  • ( \hat{y}_i ) 是模型预测第 ( i ) 个样本为正类的概率。

对于多分类问题,公式扩展为:

[ L = -\frac{1}{N} \sum_{i=1}^N \sum_{c=1}^C y_{i,c} \log(\hat{y}_{i,c}) ]

其中:

  • ( C ) 是类别的总数。
  • ( y_{i,c} ) 是一个指示变量,如果样本 ( i ) 属于类别 ( c ),则为 1,否则为 0。
  • ( \hat{y}_{i,c} ) 是模型预测第 ( i ) 个样本属于类别 ( c ) 的概率。

代码实现

如果不使用库函数,可以手动实现交叉熵损失函数。这里我们将以 Python 为例展示如何实现多分类交叉熵损失函数。这将涉及到计算模型输出的对数概率,然后根据真实标签计算损失。

二分类交叉熵损失

首先,我们从较简单的二分类交叉熵损失开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
import numpy as np

def binary_cross_entropy_loss(y_true, y_pred):
# 防止 log(0) 导致数值问题,所以加上一个非常小的值 epsilon
epsilon = 1e-12
y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
loss = -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
return loss

# 示例
y_true = np.array([0, 1, 1, 0])
y_pred = np.array([0.1, 0.9, 0.8, 0.2])
print("Binary Cross Entropy Loss:", binary_cross_entropy_loss(y_true, y_pred))

多分类交叉熵损失

对于多分类问题,我们通常使用独热编码的方式来表示真实的标签,然后计算每个类别的交叉熵损失:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def categorical_cross_entropy_loss(y_true, y_pred):
# 防止 log(0) 导致数值问题,所以加上一个非常小的值 epsilon
epsilon = 1e-12
y_pred = np.clip(y_pred, epsilon, 1.0 - epsilon)
# 计算交叉熵
loss = -np.sum(y_true * np.log(y_pred)) / y_true.shape[0]
return loss

# 示例
y_true = np.array([[0, 1, 0], [0, 0, 1], [1, 0, 0], [0, 1, 0]])
y_pred = np.array([
[0.05, 0.9, 0.05],
[0.1, 0.1, 0.8],
[0.7, 0.2, 0.1],
[0.1, 0.8, 0.1]
])
print("Categorical Cross Entropy Loss:", categorical_cross_entropy_loss(y_true, y_pred))

在这些示例中,y_pred 中的值必须是有效的概率分布,即所有预测概率的和应为1,且各概率值应在0到1之间。在实际应用中,模型的输出通常通过 softmax 函数来保证这一点。np.clip 用于防止数值问题,如计算对数时输入值为0。这些实现提供了对交叉熵损失计算的直接控制,避免了依赖外部库函数的需求。

U-Net相关

我说研究方向主要是医学影像分割分类,然后面试官说前面做的这些和这个也没什么关系,然后问医学影像出了什么成果,然后我也没,然后问U-Net,我说了下,然后问U-Net的原理,我说了下,然后后上下采样是如何做的,残差连接是如何做的,我说是拼接,这块要说原理我也是说的不太清楚,也没系统准备过。

然后问那个医学影像的竞赛,说用到nnUNet了,然后介绍下,也说不清。

然后问科研医学影像现在做到什么阶段了,我就说是分割分类任务的大致框架,具体仍然说不系统。

反问

然后就是反问,我问了实习那边任务是应用还是偏科研,然后面试官说不会以发论文为目的,是算法在具体任务重应用和优化,然后就结束了。

总结

总之这个面试是最差的一次,属于是项目没啥东西被拷打,后面问八股我也一点没准备,直接凉透,面完直接胳膊下面全是汗,湿透了,估计是凉了,不过也是一次经验,以后要多准备一些基础知识,吃一堑长一智,加油吧。

最后补个学习的内容:算法工程师面试常考手撕题

引用链接https://mp.weixin.qq.com/s/TAFvUlqdyqP-W6C10F1Hzw

  • 算法工程师面试常考手撕题
    • 注意力(Attention)篇
      • 手撕单头注意力机制(ScaledDotProductAttention)函数
      • 手撕多头注意力(MultiHeadAttention)
      • 手撕自注意力机制函数(SelfAttention)
      • GPT2 解码中的KV Cache
      • 手撕 MQA 算法
    • 基础机器学习算法篇
      • 手撕 numpy写线性回归的随机梯度下降(stochastic gradient descent,SGD)
      • 手撕 k-means 算法
    • 手撕 Layer Normalization 算法
    • 手撕 Batch Normalization 算法
    • 解码算法篇
      • 手撕 贪心搜索 (greedy search)
      • 手撕 集束搜索 beamsearch 算法
      • 手撕 温度参数采样(Temperature Sampling)算法
      • 手撕 Top-K Sampling算法
      • 手撕 Top-P (Nucleus) Sampling 算法
    • 神经网络篇
      • 手撕反向传播(backward propagation,BP)法
      • 手撕 卷积神经网络(CNN)法
      • 手撕 循环神经网络(RNN)法
      • 手撕 LSTM法
      • 手撕 二维卷积 算法
    • 位置编码篇
      • 手撕 绝对位置编码 算法
      • 手撕 可学习位置编码 算法
      • 手撕 相对位置编码 算法
      • 手撕 rope 算法
    • 面试题汇总
    • 致谢

注意力(Attention)篇

手撕单头注意力机制(ScaledDotProductAttention)函数

输入是query和 key-value,注意力机制首先计算query与每个key的关联性(compatibility),每个关联性作为每个value的权重(weight),各个权重与value的乘积相加得到输出。
image.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class ScaledDotProductAttention(nn.Module):
""" Scaled Dot-Product Attention """


def __init__(self, scale):
super().__init__()


self.scale = scale
self.softmax = nn.Softmax(dim=2)


def forward(self, q, k, v, mask=None):
u = torch.bmm(q, k.transpose(1, 2)) # 1.Matmul
u = u / self.scale # 2.Scale


if mask is not None:
u = u.masked_fill(mask, -np.inf) # 3.Mask


attn = self.softmax(u) # 4.Softmax
output = torch.bmm(attn, v) # 5.Output


return attn, output




if __name__ == "__main__":
n_q, n_k, n_v = 2, 4, 4
d_q, d_k, d_v = 128, 128, 64


q = torch.randn(batch, n_q, d_q)
k = torch.randn(batch, n_k, d_k)
v = torch.randn(batch, n_v, d_v)
mask = torch.zeros(batch, n_q, n_k).bool()


attention = ScaledDotProductAttention(scale=np.power(d_k, 0.5))
attn, output = attention(q, k, v, mask=mask)


print(attn)
print(output)

手撕多头注意力(MultiHeadAttention)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class MultiHeadAttention(nn.Module):
""" Multi-Head Attention """


def __init__(self, n_head, d_k_, d_v_, d_k, d_v, d_o):
super().__init__()


self.n_head = n_head
self.d_k = d_k
self.d_v = d_v


self.fc_q = nn.Linear(d_k_, n_head * d_k)
self.fc_k = nn.Linear(d_k_, n_head * d_k)
self.fc_v = nn.Linear(d_v_, n_head * d_v)


self.attention = ScaledDotProductAttention(scale=np.power(d_k, 0.5))


self.fc_o = nn.Linear(n_head * d_v, d_o)


def forward(self, q, k, v, mask=None):


n_head, d_q, d_k, d_v = self.n_head, self.d_k, self.d_k, self.d_v


batch, n_q, d_q_ = q.size()
batch, n_k, d_k_ = k.size()
batch, n_v, d_v_ = v.size()


q = self.fc_q(q) # 1.单头变多头
k = self.fc_k(k)
v = self.fc_v(v)
q = q.view(batch, n_q, n_head, d_q).permute(2, 0, 1, 3).contiguous().view(-1, n_q, d_q)
k = k.view(batch, n_k, n_head, d_k).permute(2, 0, 1, 3).contiguous().view(-1, n_k, d_k)
v = v.view(batch, n_v, n_head, d_v).permute(2, 0, 1, 3).contiguous().view(-1, n_v, d_v)


if mask is not None:
mask = mask.repeat(n_head, 1, 1)
attn, output = self.attention(q, k, v, mask=mask) # 2.当成单头注意力求输出


output = output.view(n_head, batch, n_q, d_v).permute(1, 2, 0, 3).contiguous().view(batch, n_q, -1) # 3.Concat
output = self.fc_o(output) # 4.仿射变换得到最终输出


return attn, output




if __name__ == "__main__":
n_q, n_k, n_v = 2, 4, 4
d_q_, d_k_, d_v_ = 128, 128, 64


q = torch.randn(batch, n_q, d_q_)
k = torch.randn(batch, n_k, d_k_)
v = torch.randn(batch, n_v, d_v_)
mask = torch.zeros(batch, n_q, n_k).bool()


mha = MultiHeadAttention(n_head=8, d_k_=128, d_v_=64, d_k=256, d_v=128, d_o=128)
attn, output = mha(q, k, v, mask=mask)


print(attn.size())
print(output.size())

手撕自注意力机制函数(SelfAttention)

Self-Attention。和Attention类似,他们都是一种注意力机制。不同的是Attention是source对target,输入的source和输出的target内容不同。例如英译中,输入英文,输出中文。而Self-Attention是source对source,是source内部元素之间或者target内部元素之间发生的Attention机制,也可以理解为Target=Source这种特殊情况下的注意力机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class SelfAttention(nn.Module):
""" Self-Attention """


def __init__(self, n_head, d_k, d_v, d_x, d_o):
self.wq = nn.Parameter(torch.Tensor(d_x, d_k))
self.wk = nn.Parameter(torch.Tensor(d_x, d_k))
self.wv = nn.Parameter(torch.Tensor(d_x, d_v))


self.mha = MultiHeadAttention(n_head=n_head, d_k_=d_k, d_v_=d_v, d_k=d_k, d_v=d_v, d_o=d_o)


self.init_parameters()


def init_parameters(self):
for param in self.parameters():
stdv = 1. / np.power(param.size(-1), 0.5)
param.data.uniform_(-stdv, stdv)


def forward(self, x, mask=None):
q = torch.matmul(x, self.wq)
k = torch.matmul(x, self.wk)
v = torch.matmul(x, self.wv)


attn, output = self.mha(q, k, v, mask=mask)


return attn, output




if __name__ == "__main__":
n_x = 4
d_x = 80


x = torch.randn(batch, n_x, d_x)
mask = torch.zeros(batch, n_x, n_x).bool()


selfattn = SelfAttention(n_head=8, d_k=128, d_v=64, d_x=80, d_o=80)
attn, output = selfattn(x, mask=mask)


print(attn.size())
print(output.size())

GPT2 解码中的KV Cache

无论是encoder-decoder结构,还是现在我们最接近AGI的decoder-only的LLM,解码生成时都是自回归auto-regressive的方式。
也就是,解码的时候,先根据当前输入 ,生成下一个 ,然后把新生成的 拼接在 后面,获得新的输入 ,再用 生成 ,依此迭代,直到生成结束。
我们可以注意到,下一个step的输入其实包含了上一个step的内容,而且只在最后面多了一点点(一个token)。那么下一个step的计算应该也包含了上一个step的计算。
但是模型在推理的时候可不管这些,无论你是不是只要最后一个字的输出,它都把所有输入计算一遍,给出所有输出结果。
也就是说中间有很多我们用不到的计算,这样就造成了浪费。
而且随着生成的结果越来越多,输入的长度也越来越长,上面这个例子里,输入长度就从step0的10个,每步增长1,直到step5的15个。如果输入的instruction是让模型写作文,那可能就有800个step。这个情况下,step0被算了800次,step1被算了799次…这样浪费的计算资源确实不容忽视。
有没有什么办法可以重复利用上一个step里已经计算过的结果,减少浪费呢?
答案就是KV Cache,利用一个缓存,把需要重复利用的中间计算结果存下来,减少重复计算。
而 k 和 v 就是我要缓存的对象。
想象一下,在上面的例子中,假设我们一开始的输入就是3个字,我们第一次预测就是预测第4个字,那么由于一开始没有任何缓存,所有我们每一层还是要老实地计算一遍。然后把 k 、 v 值缓存起来。
则有
image.jpg
kv cache的下标l表示模型层数。
在进行第二次预测,也就是预测第5个字的时候,在第l层的时候,由于前面我们缓存了每层的ku 值,那本层就只需要算新的 o3,而不用算 o0、o1、o2。
因为第l层的 o0、o1、o2本来会经过FNN层之后进到 l十1 层,再经过新的投影变换,成为 l + 1 层的 k、υ 值,但是l十 1 层的 k、υ值我们已经缓存过了!
然后我们把本次新增算出来的 k、υ 值也存入缓存。
image.jpg
这样就节省了attention和FFN的很多重复计算。
transformers中,生成的时候传入use_cache=True就会开启KV Cache。
也可以简单看下GPT2中的实现,中文注释的部分就是使用缓存结果和更新缓存结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
Class GPT2Attention(nn.Module):
...
...
def forward(
self,
hidden_states: Optional[Tuple[torch.FloatTensor]],
layer_past: Optional[Tuple[torch.Tensor]] = None,
attention_mask: Optional[torch.FloatTensor] = None,
head_mask: Optional[torch.FloatTensor] = None,
encoder_hidden_states: Optional[torch.Tensor] = None,
encoder_attention_mask: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = False,
output_attentions: Optional[bool] = False,
) -> Tuple[Union[torch.Tensor, Tuple[torch.Tensor]], ...]:
if encoder_hidden_states is not None:
if not hasattr(self, "q_attn"):
raise ValueError(
"If class is used as cross attention, the weights `q_attn` have to be defined. "
"Please make sure to instantiate class with `GPT2Attention(..., is_cross_attention=True)`."
)


query = self.q_attn(hidden_states)
key, value = self.c_attn(encoder_hidden_states).split(self.split_size, dim=2)
attention_mask = encoder_attention_mask
else:
query, key, value = self.c_attn(hidden_states).split(self.split_size, dim=2)


query = self._split_heads(query, self.num_heads, self.head_dim)
key = self._split_heads(key, self.num_heads, self.head_dim)
value = self._split_heads(value, self.num_heads, self.head_dim)


# 过去所存的值
if layer_past is not None:
past_key, past_value = layer_past
key = torch.cat((past_key, key), dim=-2) # 把当前新的key加入
value = torch.cat((past_value, value), dim=-2) # 把当前新的value加入


if use_cache is True:
present = (key, value) # 输出用于保存
else:
present = None


if self.reorder_and_upcast_attn:
attn_output, attn_weights = self._upcast_and_reordered_attn(query, key, value, attention_mask, head_mask)
else:
attn_output, attn_weights = self._attn(query, key, value, attention_mask, head_mask)


attn_output = self._merge_heads(attn_output, self.num_heads, self.head_dim)
attn_output = self.c_proj(attn_output)
attn_output = self.resid_dropout(attn_output)


outputs = (attn_output, present)
if output_attentions:
outputs += (attn_weights,)


return outputs # a, present, (attentions)

总的来说,KV Cache是以空间换时间的做法,通过使用快速的缓存存取,减少了重复计算。(注意,只有decoder结构的模型可用,因为有mask attention的存在,使得前面的token可以不用关注后面的token)

手撕 MQA 算法

MQA 让所有的头之间 共享 同一份 Key 和 Value 矩阵,每个头只单独保留了一份 Query 参数,从而大大减少 Key 和 Value 矩阵的参数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
class MultiQueryAttention(nn.Module):
"""Multi-Query self attention.


Using torch or triton attention implemetation enables user to also use
additive bias.
"""


def __init__(
self,
d_model: int,
n_heads: int,
attn_impl: str = 'triton',
clip_qkv: Optional[float] = None,
qk_ln: bool = False,
softmax_scale: Optional[float] = None,
attn_pdrop: float = 0.0,
low_precision_layernorm: bool = False,
verbose: int = 0,
device: Optional[str] = None,
):
super().__init__()


self.attn_impl = attn_impl
self.clip_qkv = clip_qkv
self.qk_ln = qk_ln


self.d_model = d_model
self.n_heads = n_heads
self.head_dim = d_model // n_heads
self.softmax_scale = softmax_scale
if self.softmax_scale is None:
self.softmax_scale = 1 / math.sqrt(self.head_dim)
self.attn_dropout_p = attn_pdrop


self.Wqkv = nn.Linear(
d_model,
d_model + 2 * self.head_dim,
device=device,
)


fuse_splits = (d_model, d_model + self.head_dim)
self.Wqkv._fused = (0, fuse_splits) # type: ignore


self.attn_fn = scaled_multihead_dot_product_attention
self.out_proj = nn.Linear(self.d_model, self.d_model, device=device)
self.out_proj._is_residual = True # type: ignore


def forward(
self,
x,
past_key_value=None,
attn_bias=None,
attention_mask=None,
is_causal=True,
needs_weights=False,
):
qkv = self.Wqkv(x) # (1, 512, 960)


if self.clip_qkv:
qkv.clamp_(min=-self.clip_qkv, max=self.clip_qkv)


query, key, value = qkv.split( # query -> (1, 512, 768)
[self.d_model, self.head_dim, self.head_dim], # key -> (1, 512, 96)
dim=2 # value -> (1, 512, 96)
)


key_padding_mask = attention_mask


if self.qk_ln:
# Applying layernorm to qk
dtype = query.dtype
query = self.q_ln(query).to(dtype)
key = self.k_ln(key).to(dtype)


context, attn_weights, past_key_value = self.attn_fn(
query,
key,
value,
self.n_heads,
past_key_value=past_key_value,
softmax_scale=self.softmax_scale,
attn_bias=attn_bias,
key_padding_mask=key_padding_mask,
is_causal=is_causal,
dropout_p=self.attn_dropout_p,
training=self.training,
needs_weights=needs_weights,
multiquery=True,
)


return self.out_proj(context), attn_weights, past_key_value

基础机器学习算法篇

手撕 numpy写线性回归的随机梯度下降(stochastic gradient descent,SGD)

在每次更新时用1个样本,可以看到多了随机两个字,随机也就是说我们用样本中的一个例子来近似我所有的样本,来调整θ,因而随机梯度下降是会带来一定的问题,因为计算得到的并不是准确的一个梯度,对于最优化问题,凸问题,虽然不是每次迭代得到的损失函数都向着全局最优方向, 但是大的整体的方向是向全局最优解的,最终的结果往往是在全局最优解附近。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# 数据加载
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split


X, Y = fetch_california_housing(return_X_y=True)
X.shape, Y.shape # (20640, 8), (20640, )


# 数据预处理
ones = np.ones(shape=(X.shape[0], 1))
X = np.hstack([X, ones])
validate_size = 0.2
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=validate_size, shuffle=True)


# batch 函数
def get_batch(batchsize: int, X: np.ndarray, Y: np.ndarray):
assert 0 == X.shape[0]%batchsize, f'{X.shape[0]}%{batchsize} != 0'
batchnum = X.shape[0]//batchsize
X_new = X.reshape((batchnum, batchsize, X.shape[1]))
Y_new = Y.reshape((batchnum, batchsize, ))


for i in range(batchnum):
yield X_new[i, :, :], Y_new[i, :]


# 损失函数
def mse(X: np.ndarray, Y: np.ndarray, W: np.ndarray):
return 0.5 * np.mean(np.square(X@W-Y))


def diff_mse(X: np.ndarray, Y: np.ndarray, W: np.ndarray):
return X.T@(X@W-Y) / X.shape[0]


# 模型训练
lr = 0.001 # 学习率
num_epochs = 1000 # 训练周期
batch_size = 64 # |每个batch包含的样本数
validate_every = 4 # 多少个周期进行一次检验
def train(num_epochs: int, batch_size: int, validate_every: int, W0: np.ndarray, X_train: np.ndarray, Y_train: np.ndarray, X_test: np.ndarray, Y_test: np.ndarray):
loop = tqdm(range(num_epochs))
loss_train = []
loss_validate = []
W = W0
# 遍历epoch
for epoch in loop:
loss_train_epoch = 0
# 遍历batch
for x_batch, y_batch in get_batch(64, X_train, Y_train):
loss_batch = mse(X=x_batch, Y=y_batch, W=W)
loss_train_epoch += loss_batch*x_batch.shape[0]/X_train.shape[0]
grad = diff_mse(X=x_batch, Y=y_batch, W=W)
W = W - lr*grad


loss_train.append(loss_train_epoch)
loop.set_description(f'Epoch: {epoch}, loss: {loss_train_epoch}')


if 0 == epoch%validate_every:
loss_validate_epoch = mse(X=X_test, Y=Y_test, W=W)
loss_validate.append(loss_validate_epoch)
print('============Validate=============')
print(f'Epoch: {epoch}, train loss: {loss_train_epoch}, val loss: {loss_validate_epoch}')
print('================================')
plot_loss(np.array(loss_train), np.array(loss_validate), validate_every)


# 程序运行
W0 = np.random.random(size=(X.shape[1], )) # 初始权重
train(num_epochs=num_epochs, batch_size=batch_size, validate_every=validate_every, W0=W0, X_train=X_train, Y_train=Y_train, X_test=X_test, Y_test=Y_test)

image.jpg

手撕 k-means 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import numpy as np
def kmeans(data, k, thresh=1, max_iterations=100):
# 随机初始化k个中心点
centers = data[np.random.choice(data.shape[0], k, replace=False)]


for _ in range(max_iterations):
# 计算每个样本到各个中心点的距离
distances = np.linalg.norm(data[:, None] - centers, axis=2)


# 根据距离最近的中心点将样本分配到对应的簇
labels = np.argmin(distances, axis=1)


# 更新中心点为每个簇的平均值
new_centers = np.array([data[labels == i].mean(axis=0) for i in range(k)])


# 判断中心点是否收敛,多种收敛条件可选
# 条件1:中心点不再改变
if np.all(centers == new_centers):
break
# 条件2:中心点的阈值小于某个阈值
# center_change = np.linalg.norm(new_centers - centers)
# if center_change < thresh:
# break
centers = new_centers


return labels, centers


# 生成一些随机数据作为示例输入
data = np.random.rand(100, 2) # 100个样本,每个样本有两个特征


# 手动实现K均值算法
k = 3 # 聚类数为3
labels, centers = kmeans(data, k)


# 打印簇标签和聚类中心点
print("簇标签:", labels)
print("聚类中心点:", centers)

手撕 Layer Normalization 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import torch
from torch import nn

class LN(nn.Module):
# 初始化
def __init__(self, normalized_shape, # 在哪个维度上做LN
eps:float = 1e-5, # 防止分母为0
elementwise_affine:bool = True): # 是否使用可学习的缩放因子和偏移因子
super(LN, self).__init__()
# 需要对哪个维度的特征做LN, torch.size查看维度
self.normalized_shape = normalized_shape # [c,w*h]
self.eps = eps
self.elementwise_affine = elementwise_affine
# 构造可训练的缩放因子和偏置
if self.elementwise_affine:
self.gain = nn.Parameter(torch.ones(normalized_shape)) # [c,w*h]
self.bias = nn.Parameter(torch.zeros(normalized_shape)) # [c,w*h]

# 前向传播
def forward(self, x: torch.Tensor): # [b,c,w*h]
# 需要做LN的维度和输入特征图对应维度的shape相同
assert self.normalized_shape == x.shape[-len(self.normalized_shape):] # [-2:]
# 需要做LN的维度索引
dims = [-(i+1) for i in range(len(self.normalized_shape))] # [b,c,w*h]维度上取[-1,-2]维度,即[c,w*h]
# 计算特征图对应维度的均值和方差
mean = x.mean(dim=dims, keepdims=True) # [b,1,1]
mean_x2 = (x**2).mean(dim=dims, keepdims=True) # [b,1,1]
var = mean_x2 - mean**2 # [b,c,1,1]
x_norm = (x-mean) / torch.sqrt(var+self.eps) # [b,c,w*h]
# 线性变换
if self.elementwise_affine:
x_norm = self.gain * x_norm + self.bias # [b,c,w*h]
return x_norm

# ------------------------------- #
# 验证
# ------------------------------- #

if __name__ == '__main__':

x = torch.linspace(0, 23, 24, dtype=torch.float32) # 构造输入层
x = x.reshape([2,3,2*2]) # [b,c,w*h]
# 实例化
ln = LN(x.shape[1:])
# 前向传播
x = ln(x)
print(x.shape)

手撕 Batch Normalization 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class MyBN:
def __init__(self, momentum=0.01, eps=1e-5, feat_dim=2):
"""
初始化参数值
:param momentum: 动量,用于计算每个batch均值和方差的滑动均值
:param eps: 防止分母为0
:param feat_dim: 特征维度
"""
# 均值和方差的滑动均值
self._running_mean = np.zeros(shape=(feat_dim, ))
self._running_var = np.ones((shape=(feat_dim, ))
# 更新self._running_xxx时的动量
self._momentum = momentum
# 防止分母计算为0
self._eps = eps
# 对应Batch Norm中需要更新的beta和gamma,采用pytorch文档中的初始化值
self._beta = np.zeros(shape=(feat_dim, ))
self._gamma = np.ones(shape=(feat_dim, ))


def batch_norm(self, x):
"""
BN向传播
:param x: 数据
:return: BN输出
"""
if self.training:
x_mean = x.mean(axis=0)
x_var = x.var(axis=0)
# 对应running_mean的更新公式
self._running_mean = (1-self._momentum)*x_mean + self._momentum*self._running_mean
self._running_var = (1-self._momentum)*x_var + self._momentum*self._running_var
# 对应论文中计算BN的公式
x_hat = (x-x_mean)/np.sqrt(x_var+self._eps)
else:
x_hat = (x-self._running_mean)/np.sqrt(self._running_var+self._eps)
return self._gamma*x_hat + self._beta

解码算法篇

手撕 贪心搜索 (greedy search)

贪心搜索(greedy search)在每个时间步 t 都选取当前概率分布中概率最大的词,即
image.jpg
直到 yt 为或达到预设最大长度时停止生成。
贪心搜索本质上是局部最优策略,但并不能保证最终结果一定是全局最优的。由于贪心搜索在解码的任意时刻只保留一条候选序列,所以在搜索效率上,贪心搜索的复杂度显著低于穷举搜索。

1
2
3
4
5
6
7
8
9
10
11
def greedy_decoding(input_ids, max_tokens=300):
with torch.inference_mode():
for _ in range(max_tokens):
outputs = model(input_ids)
next_token_logits = outputs.logits[:, -1, :]
next_token = torch.argmax(next_token_logits, dim=-1)
if next_token == tokenizer.eos_token_id:
break
input_ids = torch.cat([input_ids, rearrange(next_token, 'c -> 1 c')], dim=-1)
generated_text = tokenizer.decode(input_ids[0])
return generated_text

手撕 集束搜索 beamsearch 算法

在NLP翻译或对话任务中,在句子解码阶段,经常用到一种搜索算法beam search。这个算法有时候在大厂面试中,甚至可能会被要求手写实现。这里就从beam search的原理出发,最后手写实现一个beam search。

  • 思路:beam search在贪心搜索上进一步扩大了搜索范围,贪心搜索每下一步只考虑当前最优的top-1结果,beam search考虑最优的top-k个结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import torch
import torch.nn.functional as F


def beam_search(LM_prob,beam_size = 3):
batch,seqlen,vocab_size = LM_prob.shape
#对LM_prob取对数
log_LM_prob = LM_prob.log()
#先选择第0个位置的最大beam_size个token,log_emb_prob与indices的shape为(batch,beam)
log_beam_prob, indices = log_LM_prob[:,0,:].topk(beam_size,sorted = True)
indices = indices.unsqueeze(-1)
#对每个长度进行beam search
for i in range(1,seqlen):
#log_beam_prob (batch,beam,vocab_size),每个beam的可能产生的概率
log_beam_prob = log_beam_prob.unsqueeze(-1) + log_LM_prob[:,i,:].unsqueeze(1).repeat(1,beam_size,1)
#选择当前步概率最高的token
log_beam_prob, index = log_beam_prob.view(batch,-1).topk(beam_size,sorted = True)
#下面的计算:beam_id选出新beam来源于之前的哪个beam;index代表真实的token id
#beam_id,index (batch,beam)
beam_id = index//vocab_size
index = index%vocab_size
mid = torch.Tensor([])
#对batch内每个样本循环,选出beam的同时拼接上新生成的token id
for j,bid,idx in zip(range(batch),beam_id,index):
x = torch.cat([indices[j][bid],idx.unsqueeze(-1)],-1)
mid = torch.cat([mid,x.unsqueeze(0)],0)
indices = mid
return indices,log_beam_prob


if __name__=='__main__':
# 建立一个语言模型 LM_prob (batch,seqlen,vocab_size)
LM_prob = F.softmax(torch.randn([32,20,1000]),dim = -1)
#最终返回每个候选,以及每个候选的log_prob,shape为(batch,beam_size,seqlen)
indices,log_prob = beam_search(LM_prob,beam_size = 3)
print(indices)

手撕 温度参数采样(Temperature Sampling)算法

温度参数采样(Temperature Sampling)常用于基于概率的生成模型,如语言模型。它通过引入一个称为“温度”(Temperature)的参数来调整模型输出的概率分布,从而控制生成文本的多样性。
在温度参数采样中,模型在每个时间步生成词语时,会计算出词语的条件概率分布。然后模型将这个条件概率分布中的每个词语的概率值除以温度参数,对结果进行归一化处理,获得新的归一化概率分布。较高的温度值会使概率分布更平滑,从而增加生成文本的多样性。低概率的词语也有较高的可能性被选择;而较低的温度值则会使概率分布更集中,更倾向于选择高概率的词语,因此生成的文本更加确定性。最后模型根据这个新的归一化概率分布进行随机采样,选择生成的词语。

1
2
3
4
5
6
7
8
9
import torch
import torch.nn.functional as F


def temperature_sampling(logits, temperature=1.0):
logits = logits / temperature
probabilities = F.softmax(logits, dim=-1)
sampled_token = torch.multinomial(probabilities, 1)
return sampled_token.item()

手撕 Top-K Sampling算法

Top-K 采样(在每个时间步选择条件概率排名前 K 的词语,然后在这 K 个词语中进行随机采样。这种方法既能保持一定的生成质量,又能增加文本的多样性,并且可以通过限制候选词语的数量来控制生成文本的多样性。
这个过程使得生成的文本在保持一定的生成质量的同时,也具有一定的多样性,因为在候选词语中仍然存在一定的竞争性。

1
2
3
4
5
6
7
8
9
10
11
12
def top_k_sampling(input_ids, max_tokens=100, top_k=50, temperature=1.0):
for _ in range(max_tokens):
with torch.inference_mode():
outputs = model(input_ids)
next_token_logits = outputs.logits[:, -1, :]
top_k_logits, top_k_indices = torch.topk(next_token_logits, top_k)
top_k_probs = F.softmax(top_k_logits / temperature, dim=-1)
next_token_index = torch.multinomial(top_k_probs, num_samples=1)
next_token = top_k_indices.gather(-1, next_token_index)
input_ids = torch.cat([input_ids, next_token], dim=-1)
generated_text = tokenizer.decode(input_ids[0])
return generated_text

手撕 Top-P (Nucleus) Sampling 算法

Nucleus Sampling(核采样),也被称为Top-p Sampling旨在在保持生成文本质量的同时增加多样性。这种方法可以视作是Top-K Sampling的一种变体,它在每个时间步根据模型输出的概率分布选择概率累积超过给定阈值p的词语集合,然后在这个词语集合中进行随机采样。这种方法会动态调整候选词语的数量,以保持一定的文本多样性。
在Nucleus Sampling中,模型在每个时间步生成词语时,首先按照概率从高到低对词汇表中的所有词语进行排序,然后模型计算累积概率,并找到累积概率超过给定阈值p的最小词语子集,这个子集就是所谓的“核”(nucleus)。模型在这个核中进行随机采样,根据词语的概率分布来选择最终输出的词语。这样做可以保证所选词语的总概率超过了阈值p,同时也保持了一定的多样性。
参数p是Nucleus Sampling中的重要参数,它决定了所选词语的概率总和。p的值会被设置在(0,1]之间,表示词语总概率的一个下界。
Nucleus Sampling 能够保持一定的生成质量,因为它在一定程度上考虑了概率分布。通过选择概率总和超过给定阈值p的词语子集进行随机采样,Nucleus Sampling 能够增加生成文本的多样性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def top_p_sampling(input_ids, max_tokens=100, top_p=0.95):
with torch.inference_mode():
for _ in range(max_tokens):
outputs = model(input_ids)
next_token_logits = outputs.logits[:, -1, :]
sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True)
sorted_probabilities = F.softmax(sorted_logits, dim=-1)
cumulative_probs = torch.cumsum(sorted_probabilities, dim=-1)
sorted_indices_to_remove = cumulative_probs > top_p
sorted_indices_to_remove[..., 0] = False
indices_to_remove = sorted_indices[sorted_indices_to_remove]
next_token_logits.scatter_(-1, indices_to_remove[None, :], float('-inf'))
probs = F.softmax(next_token_logits, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)
input_ids = torch.cat([input_ids, next_token], dim=-1)
generated_text = tokenizer.decode(input_ids[0])
return generated_text

神经网络篇

手撕反向传播(backward propagation,BP)法

BP算法就是反向传播,要输入的数据经过一个前向传播会得到一个输出,但是由于权重的原因,所以其输出会和你想要的输出有差距,这个时候就需要进行反向传播,利用梯度下降,对所有的权重进行更新,这样的话在进行前向传播就会发现其输出和你想要的输出越来越接近了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# 生成权重以及偏执项layers_dim代表每层的神经元个数,
#比如[2,3,1]代表一个三成的网络,输入为2层,中间为3层输出为1层
def init_parameters(layers_dim):

L = len(layers_dim)
parameters ={}
for i in range(1,L):
parameters["w"+str(i)] = np.random.random([layers_dim[i],layers_dim[i-1]])
parameters["b"+str(i)] = np.zeros((layers_dim[i],1))
return parameters


def sigmoid(z):
return 1.0/(1.0+np.exp(-z))


# sigmoid的导函数
def sigmoid_prime(z):
return sigmoid(z) * (1-sigmoid(z))


# 前向传播,需要用到一个输入x以及所有的权重以及偏执项,都在parameters这个字典里面存储
# 最后返回会返回一个caches里面包含的 是各层的a和z,a[layers]就是最终的输出
def forward(x,parameters):
a = []
z = []
caches = {}
a.append(x)
z.append(x)
layers = len(parameters)//2
# 前面都要用sigmoid
for i in range(1,layers):
z_temp =parameters["w"+str(i)].dot(x) + parameters["b"+str(i)]
z.append(z_temp)
a.append(sigmoid(z_temp))
# 最后一层不用sigmoid
z_temp = parameters["w"+str(layers)].dot(a[layers-1]) + parameters["b"+str(layers)]
z.append(z_temp)
a.append(z_temp)

caches["z"] = z
caches["a"] = a
return caches,a[layers]


# 反向传播,parameters里面存储的是所有的各层的权重以及偏执,caches里面存储各层的a和z
# al是经过反向传播后最后一层的输出,y代表真实值
# 返回的grades代表着误差对所有的w以及b的导数
def backward(parameters,caches,al,y):
layers = len(parameters)//2
grades = {}
m = y.shape[1]
# 假设最后一层不经历激活函数
# 就是按照上面的图片中的公式写的
grades["dz"+str(layers)] = al - y
grades["dw"+str(layers)] = grades["dz"+str(layers)].dot(caches["a"][layers-1].T) /m
grades["db"+str(layers)] = np.sum(grades["dz"+str(layers)],axis = 1,keepdims = True) /m
# 前面全部都是sigmoid激活
for i in reversed(range(1,layers)):
grades["dz"+str(i)] = parameters["w"+str(i+1)].T.dot(grades["dz"+str(i+1)]) * sigmoid_prime(caches["z"][i])
grades["dw"+str(i)] = grades["dz"+str(i)].dot(caches["a"][i-1].T)/m
grades["db"+str(i)] = np.sum(grades["dz"+str(i)],axis = 1,keepdims = True) /m
return grades


# 就是把其所有的权重以及偏执都更新一下
def update_grades(parameters,grades,learning_rate):
layers = len(parameters)//2
for i in range(1,layers+1):
parameters["w"+str(i)] -= learning_rate * grades["dw"+str(i)]
parameters["b"+str(i)] -= learning_rate * grades["db"+str(i)]
return parameters
# 计算误差值
def compute_loss(al,y):
return np.mean(np.square(al-y))


# 加载数据
def load_data():
"""
加载数据集
"""
x = np.arange(0.0,1.0,0.01)
y =20* np.sin(2*np.pi*x)
# 数据可视化
plt.scatter(x,y)
return x,y
#进行测试
x,y = load_data()
x = x.reshape(1,100)
y = y.reshape(1,100)
plt.scatter(x,y)
parameters = init_parameters([1,25,1])
al = 0
for i in range(4000):
caches,al = forward(x, parameters)
grades = backward(parameters, caches, al, y)
parameters = update_grades(parameters, grades, learning_rate= 0.3)
if i %100 ==0:
print(compute_loss(al, y))
plt.scatter(x,al)
plt.show()

手撕 卷积神经网络(CNN)法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import torch
import torch.nn.functional as F #使用functional中的ReLu激活函数


#CNN模型
class CNNNet(torch.nn.Module):
def __init__(self):
super(CNNNet, self).__init__()
#两个卷积层
self.conv1 = torch.nn.Conv2d(1, 10, kernel_size=5) #1为in_channels 10为out_channels
self.conv2 = torch.nn.Conv2d(10, 20, kernel_size=5)
#池化层
self.pooling = torch.nn.MaxPool2d(2) #2为分组大小2*2
#全连接层 320 = 20 * 4 * 4
self.fc = torch.nn.Linear(320, 10)


def forward(self, x):
#先从x数据维度中得到batch_size
batch_size = x.size(0)
#卷积层->池化层->激活函数
x = F.relu(self.pooling(self.conv1(x)))
x = F.relu(self.pooling(self.conv2(x)))
x = x.view(batch_size, -1) #将数据展开,为输入全连接层做准备
x = self.fc(x)
return x
model = CNNNet()

手撕 循环神经网络(RNN)法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# encoding:utf-8
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch import nn


# 定义RNN模型(可以类别下方RNN简单测试代码理解)
class Rnn(nn.Module):
def __init__(self, input_size):
super(Rnn, self).__init__()
# 定义RNN网络
## hidden_size是自己设置的,貌似取值都是32,64,128这样来取值
## num_layers是隐藏层数量,超过2层那就是深度循环神经网络了
self.rnn = nn.RNN(
input_size=input_size,
hidden_size=32,
num_layers=1,
batch_first=True # 输入形状为[批量大小, 数据序列长度, 特征维度]
)
# 定义全连接层
self.out = nn.Linear(32, 1)


# 定义前向传播函数
def forward(self, x, h_0):
r_out, h_n = self.rnn(x, h_0)
# print("数据输出结果;隐藏层数据结果", r_out, h_n)
# print("r_out.size(), h_n.size()", r_out.size(), h_n.size())
outs = []
# r_out.size=[1,10,32]即将一个长度为10的序列的每个元素都映射到隐藏层上
for time in range(r_out.size(1)):
# print("映射", r_out[:, time, :])
# 依次抽取序列中每个单词,将之通过全连接层并输出.r_out[:, 0, :].size()=[1,32] -> [1,1]
outs.append(self.out(r_out[:, time, :]))
# print("outs", outs)
# stack函数在dim=1上叠加:10*[1,1] -> [1,10,1] 同时h_n已经被更新
return torch.stack(outs, dim=1), h_n


TIME_STEP = 10
INPUT_SIZE = 1
LR = 0.02
model = Rnn(INPUT_SIZE)
print(model)

手撕 LSTM法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# -*- coding:UTF-8 -*-
import numpy as np
import torch
from torch import nn


# Define LSTM Neural Networks
class LstmRNN(nn.Module):
"""
Parameters:
- input_size: feature size
- hidden_size: number of hidden units
- output_size: number of output
- num_layers: layers of LSTM to stack
"""
def __init__(self, input_size, hidden_size=1, output_size=1, num_layers=1):
super().__init__()

self.lstm = nn.LSTM(input_size, hidden_size, num_layers) # utilize the LSTM model in torch.nn
self.forwardCalculation = nn.Linear(hidden_size, output_size)

def forward(self, _x):
x, _ = self.lstm(_x) # _x is input, size (seq_len, batch, input_size)
s, b, h = x.shape # x is output, size (seq_len, batch, hidden_size)
x = x.view(s*b, h)
x = self.forwardCalculation(x)
x = x.view(s, b, -1)
return x

手撕 二维卷积 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import numpy as np 
def conv2d(img, in_channels, out_channels ,kernels, bias, stride=1, padding=0):
N, C, H, W = img.shape
kh, kw = kernels.shape
p = padding
assert C == in_channels, "kernels' input channels do not match with img"


if p:
img = np.pad(img, ((0,0),(0,0),(p,p),(p,p)), 'constant') # padding along with all axis


out_h = (H + 2*padding - kh) // stride + 1
out_w = (W + 2*padding - kw) // stride + 1


outputs = np.zeros([N, out_channels, out_h, out_w])
# print(img)
for n in range(N):
for out in range(out_channels):
for i in range(in_channels):
for h in range(out_h):
for w in range(out_w):
for x in range(kh):
for y in range(kw):
outputs[n][out][h][w] += img[n][i][h * stride + x][w * stride + y] * kernels[x][y]
if i == in_channels - 1:
outputs[n][out][:][:] += bias[n][out]
return outputs

位置编码篇

手撕 绝对位置编码 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class SinPositionEncoding(nn.Module):
def __init__(self, max_sequence_length, d_model, base=10000):
super().__init__()
self.max_sequence_length = max_sequence_length
self.d_model = d_model
self.base = base


def forward(self):
pe = torch.zeros(self.max_sequence_length, self.d_model, dtype=torch.float) # size(max_sequence_length, d_model)
exp_1 = torch.arange(self.d_model // 2, dtype=torch.float) # 初始化一半维度,sin位置编码的维度被分为了两部分
exp_value = exp_1 / (self.d_model / 2)


alpha = 1 / (self.base ** exp_value) # size(dmodel/2)
out = torch.arange(self.max_sequence_length, dtype=torch.float)[:, None] @ alpha[None, :] # size(max_sequence_length, d_model/2)
embedding_sin = torch.sin(out)
embedding_cos = torch.cos(out)


pe[:, 0::2] = embedding_sin # 奇数位置设置为sin
pe[:, 1::2] = embedding_cos # 偶数位置设置为cos
return pe


SinPositionEncoding(d_model=4, max_sequence_length=10, base=10000).forward()

手撕 可学习位置编码 算法

1
2
3
4
5
6
7
8
9
10
11
class TrainablePositionEncoding(nn.Module):
def __init__(self, max_sequence_length, d_model):
super().__init__()
self.max_sequence_length = max_sequence_length
self.d_model = d_model


def forward(self):
pe = nn.Embedding(self.max_sequence_length, self.d_model)
nn.init.constant(pe.weight, 0.)
return pe

手撕 相对位置编码 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class RelativePosition(nn.Module):
def __init__(self, num_units, max_relative_position):
super().__init__()
self.num_units = num_units
self.max_relative_position = max_relative_position
self.embeddings_table = nn.Parameter(torch.Tensor(max_relative_position * 2 + 1, num_units))
nn.init.xavier_uniform_(self.embeddings_table)


def forward(self, length_q, length_k):
range_vec_q = torch.arange(length_q)
range_vec_k = torch.arange(length_k)
distance_mat = range_vec_k[None, :] - range_vec_q[:, None]
distance_mat_clipped = torch.clamp(distance_mat, -self.max_relative_position, self.max_relative_position)
final_mat = distance_mat_clipped + self.max_relative_position
final_mat = torch.LongTensor(final_mat).cuda()
embeddings = self.embeddings_table[final_mat].cuda()


return embeddings


class RelativeMultiHeadAttention(nn.Module):
def __init__(self, d_model, n_heads, dropout=0.1, batch_size=6):
"Take in model size and number of heads."
super(RelativeMultiHeadAttention, self).__init__()
self.d_model = d_model
self.n_heads = n_heads
self.batch_size = batch_size


assert d_model % n_heads == 0
self.head_dim = d_model // n_heads


self.linears = _get_clones(nn.Linear(d_model, d_model), 4)
self.dropout = nn.Dropout(p=dropout)
self.relative_position_k = RelativePosition(self.head_dim, max_relative_position=16)
self.relative_position_v = RelativePosition(self.head_dim, max_relative_position=16)


self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).cuda()


def forward(self, query, key, value):
# embedding
# query, key, value = [batch_size, len, hid_dim]
query, key, value = [l(x).view(self.batch_size, -1, self.d_model) for l, x in
zip(self.linears, (query, key, value))]


len_k = query.shape[1]
len_q = query.shape[1]
len_v = value.shape[1]


# Self-Attention
# r_q1, r_k1 = [batch_size, len, n_heads, head_dim]
r_q1 = query.view(self.batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
r_k1 = key.view(self.batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
attn1 = torch.matmul(r_q1, r_k1.permute(0, 1, 3, 2))


r_q2 = query.permute(1, 0, 2).contiguous().view(len_q, self.batch_size * self.n_heads, self.head_dim)
r_k2 = self.relative_position_k(len_q, len_k)
attn2 = torch.matmul(r_q2, r_k2.transpose(1, 2)).transpose(0, 1)
attn2 = attn2.contiguous().view(self.batch_size, self.n_heads, len_q, len_k)
attn = (attn1 + attn2) / self.scale


attn = self.dropout(torch.softmax(attn, dim=-1))
# attn = [batch_size, n_heads, len, len]
r_v1 = value.view(self.batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
weight1 = torch.matmul(attn, r_v1)
r_v2 = self.relative_position_v(len_q, len_v)
weight2 = attn.permute(2, 0, 1, 3).contiguous().view(len_q, self.batch_size * self.n_heads, len_k)
weight2 = torch.matmul(weight2, r_v2)
weight2 = weight2.transpose(0, 1).contiguous().view(self.batch_size, self.n_heads, len_q, self.head_dim)


x = weight1 + weight2
# x = [batch size, n heads, query len, head dim]


x = x.permute(0, 2, 1, 3).contiguous()
# x = [batch size, query len, n heads, head dim]


x = x.view(self.batch_size * len_q, self.d_model)
# x = [batch size * query len, hid dim]


return self.linears[-1](x)

手撕 rope 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


# %%


def sinusoidal_position_embedding(batch_size, nums_head, max_len, output_dim, device):
# (max_len, 1)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(-1)
# (output_dim//2)
ids = torch.arange(0, output_dim // 2, dtype=torch.float) # 即公式里的i, i的范围是 [0,d/2]
theta = torch.pow(10000, -2 * ids / output_dim)


# (max_len, output_dim//2)
embeddings = position * theta # 即公式里的:pos / (10000^(2i/d))


# (max_len, output_dim//2, 2)
embeddings = torch.stack([torch.sin(embeddings), torch.cos(embeddings)], dim=-1)


# (bs, head, max_len, output_dim//2, 2)
embeddings = embeddings.repeat((batch_size, nums_head, *([1] * len(embeddings.shape)))) # 在bs维度重复,其他维度都是1不重复


# (bs, head, max_len, output_dim)
# reshape后就是:偶数sin, 奇数cos了
embeddings = torch.reshape(embeddings, (batch_size, nums_head, max_len, output_dim))
embeddings = embeddings.to(device)
return embeddings


# %%
def RoPE(q, k):
# q,k: (bs, head, max_len, output_dim)
batch_size = q.shape[0]
nums_head = q.shape[1]
max_len = q.shape[2]
output_dim = q.shape[-1]


# (bs, head, max_len, output_dim)
pos_emb = sinusoidal_position_embedding(batch_size, nums_head, max_len, output_dim, q.device)


# cos_pos,sin_pos: (bs, head, max_len, output_dim)
# 看rope公式可知,相邻cos,sin之间是相同的,所以复制一遍。如(1,2,3)变成(1,1,2,2,3,3)
cos_pos = pos_emb[..., 1::2].repeat_interleave(2, dim=-1) # 将奇数列信息抽取出来也就是cos 拿出来并复制
sin_pos = pos_emb[..., ::2].repeat_interleave(2, dim=-1) # 将偶数列信息抽取出来也就是sin 拿出来并复制


# q,k: (bs, head, max_len, output_dim)
q2 = torch.stack([-q[..., 1::2], q[..., ::2]], dim=-1)
q2 = q2.reshape(q.shape) # reshape后就是正负交替了


# 更新qw, *对应位置相乘
q = q * cos_pos + q2 * sin_pos


k2 = torch.stack([-k[..., 1::2], k[..., ::2]], dim=-1)
k2 = k2.reshape(k.shape)
# 更新kw, *对应位置相乘
k = k * cos_pos + k2 * sin_pos


return q, k


# %%
def attention(q, k, v, mask=None, dropout=None, use_RoPE=True):
# q.shape: (bs, head, seq_len, dk)
# k.shape: (bs, head, seq_len, dk)
# v.shape: (bs, head, seq_len, dk)


if use_RoPE:
q, k = RoPE(q, k)


d_k = k.size()[-1]


att_logits = torch.matmul(q, k.transpose(-2, -1)) # (bs, head, seq_len, seq_len)
att_logits /= math.sqrt(d_k)


if mask is not None:
att_logits = att_logits.masked_fill(mask == 0, -1e9) # mask掉为0的部分,设为无穷大


att_scores = F.softmax(att_logits, dim=-1) # (bs, head, seq_len, seq_len)


if dropout is not None:
att_scores = dropout(att_scores)


# (bs, head, seq_len, seq_len) * (bs, head, seq_len, dk) = (bs, head, seq_len, dk)
return torch.matmul(att_scores, v), att_scores


if __name__ == '__main__':
# (bs, head, seq_len, dk)
q = torch.randn((8, 12, 10, 32))
k = torch.randn((8, 12, 10, 32))
v = torch.randn((8, 12, 10, 32))


res, att_scores = attention(q, k, v, mask=None, dropout=None, use_RoPE=True)


# (bs, head, seq_len, dk), (bs, head, seq_len, seq_len)
print(res.shape, att_scores.shape)

面试题汇总

致谢

来自: 算法工程师面试常考手撕题(更新)