microgpt.py 是 Andrej Karpathy (@karpathy) 写的一个教学项目,约 260 行纯 Python(不依赖 PyTorch,甚至不依赖 numpy)实现完整的 GPT 训练和推理。

我对原始代码进行了一些微调——封装了 AdamOptimizer、补了类型标注、给 Value 加了 _op 字段和 print_value_tree() 方便观察计算图。

下面是学习整理的源码级逐段拆解。

准备:自动求导实现

由于不依赖 PyTorch 和 Numpy,首先需要一个 Value 类来实现自动求导,为后续的训练和推理提供基础。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import math

class Value:
__slots__ = ("data", "grad", "_children", "_local_grads", "_op")

def __init__(self, data, children=(), local_grads=(), op=None):
self.data = float(data) # 标量值(前向结果)
self.grad = 0.0 # 损失对该节点的梯度(反向累积)
self._children = children # 计算图的子节点
self._local_grads = local_grads # 该节点对各子节点的局部偏导
self._op = op # 运算类型标记,便于调试和可视化

def __add__(self, other):
other = other if isinstance(other, Value) else Value(other)
return Value(self.data + other.data, (self, other), (1, 1), op="add")

def __mul__(self, other):
other = other if isinstance(other, Value) else Value(other)
return Value(self.data * other.data, (self, other),
(other.data, self.data), op="mul")

def __pow__(self, other):
assert not isinstance(other, Value)
other = Value(other)
return Value(self.data ** other.data, (self,),
(other.data * self.data ** (other.data - 1),), op="pow")

def log(self):
return Value(math.log(self.data), (self,), (1 / self.data,), op="log")

def exp(self):
return Value(math.exp(self.data), (self,), (math.exp(self.data),), op="exp")

def relu(self):
return Value(max(0, self.data), (self,),
(float(self.data > 0),), op="relu")

def __neg__(self): return self * (-1)
def __radd__(self, other): return self + other
def __sub__(self, other): return self + (-other)
def __rsub__(self, other): return other + (-self)
def __rmul__(self, other): return self * other
def __truediv__(self, other): return self * other ** (-1)
def __rtruediv__(self, other): return other * self ** (-1)

def backward(self):
# DFS 后序遍历构造拓扑序
topo = []
visited = set()

def build_topo(v):
if v not in visited:
visited.add(v)
for child in v._children:
build_topo(child)
topo.append(v)

build_topo(self)
self.grad = 1
# 逆序传播梯度
for v in reversed(topo):
for child, local_grad in zip(v._children, v._local_grads):
child.grad += local_grad * v.grad

对每个运算的重载做两件事:计算前向值,同时记录局部梯度。比如乘法 z = x * ydz/dx = ydz/dy = x,所以 local_grads = (other.data, self.data)

backward() 是核心方法——先用 DFS 后序遍历把计算图线性化为拓扑序(保证子节点在其所有父节点之后),再逆序逐层将 v.grad * local_grad 累加到子节点。用 += 是因为同一节点可能被多条路径共享(比如同一参数在序列不同位置被使用),需要累加所有贡献。

给 Value 添加一个检查计算图结构的方法,方便观察运算节点之间的拓扑关系:

1
2
3
4
5
6
7
8
def print_value_tree(v: Value, indent=0):
op_display = f"[{v._op}]" if v._op else "[leaf]"
prefix = " " * indent
print(
f"{prefix}{op_display} Value(data={v.data:.4f}, grad={v.grad:.4f}) (id={id(v)})"
)
for child in v._children:
print_value_tree(child, indent + 1)

构造一个简单表达式来验证:

1
2
3
4
5
6
a = Value(2.0)
b = Value(3.0)

c = a * b + a**2

print_value_tree(c)

输出

1
2
3
4
5
6
[add] Value(data=10.0000, grad=0.0000) (id=2024170557888)
[mul] Value(data=6.0000, grad=0.0000) (id=2024170555648)
[leaf] Value(data=2.0000, grad=0.0000) (id=2024170558048)
[leaf] Value(data=3.0000, grad=0.0000) (id=2024170556528)
[pow] Value(data=4.0000, grad=0.0000) (id=2024170557808)
[leaf] Value(data=2.0000, grad=0.0000) (id=2024170558048)

运行 backward() 之后,每一个节点就会存储自己的梯度。

1
2
c.backward()
print_value_tree(c)

输出

1
2
3
4
5
6
[add] Value(data=10.0000, grad=1.0000) (id=2024170557888)
[mul] Value(data=6.0000, grad=1.0000) (id=2024170555648)
[leaf] Value(data=2.0000, grad=7.0000) (id=2024170558048)
[leaf] Value(data=3.0000, grad=2.0000) (id=2024170556528)
[pow] Value(data=4.0000, grad=1.0000) (id=2024170557808)
[leaf] Value(data=2.0000, grad=7.0000) (id=2024170558048)

可以用 Value 实现一个简单的线性回归:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import random
random.seed(42)

true_w, true_b = 2.0, 1.0
x_vals = [random.uniform(-3, 3) for _ in range(50)]
y_vals = [(true_w * x + true_b) + random.gauss(0, 0.5) for x in x_vals]

w = Value(random.gauss(0, 0.1))
b = Value(0.0)

for epoch in range(200):
losses = []
for x, y_true in zip(x_vals, y_vals):
y_pred = w * x + b
loss = (y_pred - y_true) ** 2
losses.append(loss)
total_loss = sum(losses) / len(losses)
total_loss.backward()
w.data -= 0.01 * w.grad
b.data -= 0.01 * b.grad
w.grad = 0; b.grad = 0

print(f"w={w.data:.4f} (true={true_w}), b={b.data:.4f} (true={true_b})")

可以用 matplotlib 展示线性回归的结果

1
2
3
4
5
6
7
import matplotlib.pyplot as plt

plt.scatter(x_vals, y_vals)
x_line = sorted(x_vals)
y_line = [w.data * x + b.data for x in x_line]
plt.plot(x_line, y_line, color="red")
plt.show()

准备数据与词表

我们使用的数据是一份人名列表(input.txt,约 30,000 行,每行如 emmaolivia),任务是用字符级语言模型学会生成”看起来像名字”的字符串。

准备数据并打乱顺序

1
2
3
file_path = "./input.txt"
docs = [s for line in open(file_path) if (s := line.strip())]
random.shuffle(docs)

生成词表

1
2
3
uchars = sorted(set("".join(docs)))   # 所有出现过的字符 ['a', 'b', ..., 'z']
BOS = len(uchars) # BOS 编号 = 26
vocab_size = len(uchars) + 1 # 词表大小 = 普通字符数 + 1 个特殊 token = 27

其中特殊控制符号 BOS (Beginning of Sequence)同时承担起始终止两个角色:训练时首尾各加一个;推理时采样到它就停止。不需要额外的 EOS token。

设置模型超参数

设置一些模型的超参数:控制网络深度、宽度以及注意力结构。

1
2
3
4
5
6
n_layer = 1  # Transformer 块的层数
n_embd = 16 # 模型隐藏维度,同时也是 token/position embedding 的维度
block_size = 16 # 最大上下文长度;当前数据集中最长名字长度为 15,因此 16 足够覆盖含 BOS 的建模窗口
n_head = 4 # 多头自注意力中的头数
assert n_embd % n_head == 0, "n_embd 必须能被 n_head 整除,否则无法均匀切分多头维度"
head_dim = n_embd // n_head # 单个注意力头的通道维度

生成随机矩阵的函数

1
2
3
def gen_random_matrix(nout: int, nin: int, std: float = 0.08) -> list[list[Value]]:
"""生成形状为 (nout, nin) 的参数矩阵,并使用高斯分布进行随机初始化。"""
return [[Value(random.gauss(0, std)) for _ in range(nin)] for _ in range(nout)]

使用字典集中保存全部参数矩阵,命名方式与常见深度学习框架中的 state_dict 类似。

1
2
3
4
5
6
7
8
state_dict: dict[str, list[list[Value]]] = {
# token embedding 表:把离散 token id 映射为连续稠密向量表示。
"wte": gen_random_matrix(vocab_size, n_embd),
# position embedding 表:为不同位置注入位置信息,使模型感知顺序。
"wpe": gen_random_matrix(block_size, n_embd),
# 语言模型输出头:把隐藏状态投影到词表维度,得到下一个 token 的 logits。
"lm_head": gen_random_matrix(vocab_size, n_embd),
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
for i in range(n_layer):
# Q/K/V 三组投影矩阵分别生成查询、键和值,用于自注意力的相似度计算与信息聚合。
state_dict[f"layer{i}.attn_wq"] = gen_random_matrix(n_embd, n_embd) # Q
state_dict[f"layer{i}.attn_wk"] = gen_random_matrix(n_embd, n_embd) # K
state_dict[f"layer{i}.attn_wv"] = gen_random_matrix(n_embd, n_embd) # V
# 输出投影矩阵:把多头拼接结果重新映射回模型隐藏维度。
state_dict[f"layer{i}.attn_wo"] = gen_random_matrix(n_embd, n_embd) # 多头输出
# 前馈网络采用典型的 4 倍扩展比:先升维,再经过非线性激活,最后降回原维度。
state_dict[f"layer{i}.mlp_fc1"] = gen_random_matrix(4 * n_embd, n_embd) # FFN ↑
state_dict[f"layer{i}.mlp_fc2"] = gen_random_matrix(n_embd, 4 * n_embd) # FFN ↓

# 把所有矩阵展开为 list(Value)(浅拷贝,用来统一更新权重)
params = [p for mat in state_dict.values() for row in mat for p in row]
print(f"num params: {len(params)}")

参数用 Value 而非裸 float 存储,这样才能被 backward() 自动累积梯度。gen_random_matrix 用嵌套 list 表示二维矩阵,不依赖 numpy。MLP 采用 4x 扩展比、Q/K/V 三组独立投影都是标准 GPT-2 配置。注意这里没有 bias,RMSNorm 也没有可学习的仿射参数。

前向传播

需要的一些基本运算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def linear(x: list[Value], w: list[list[Value]]) -> list[Value]:
"""
线性变换,等价于矩阵乘向量 y = W x。

参数约定:
- x 的长度为 nin
- w 的形状为 (nout, nin)
- 返回长度为 nout 的输出向量
"""
return [sum(wi * xi for wi, xi in zip(wo, x)) for wo in w]


def softmax(logits: list[Value]) -> list[Value]:
"""把未归一化打分转换为概率分布,并通过减去最大值提升数值稳定性。"""
max_val = max(val.data for val in logits)
exps = [(val - max_val).exp() for val in logits]
total = sum(exps)
return [e / total for e in exps]


def rmsnorm(x: list[Value]) -> list[Value]:
"""RMSNorm:用均方根对向量进行缩放归一化,不引入可学习仿射参数。"""
ms = sum(xi * xi for xi in x) / len(x)
scale = (ms + 1e-5) ** (-0.5)
return [xi * scale for xi in x]

核心的 gpt() 函数用于定义模型前向传播:输入当前 token 和历史 KV 缓存,输出下一个 token 的 logits。

结构整体沿用 GPT 风格的因果语言模型,但做了若干简化:1. 归一化层使用 RMSNorm;2. 全部线性层省略偏置;3. 前馈激活函数使用 ReLU。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def gpt(token_id: int, pos_id: int, keys, values):
tok_emb = state_dict["wte"][token_id] # 当前 token 的语义嵌入向量
pos_emb = state_dict["wpe"][pos_id] # 当前位置的位置嵌入向量
x = [t + p for t, p in zip(tok_emb, pos_emb)] # 把语义信息与位置信息相加,形成输入隐藏状态
x = rmsnorm(x)
# 这里的预归一化并不冗余,它会通过后续残差路径参与梯度传播。

for li in range(n_layer):
# 1)多头自注意力子层
x_residual = x

x = rmsnorm(x)
q = linear(x, state_dict[f"layer{li}.attn_wq"])
k = linear(x, state_dict[f"layer{li}.attn_wk"])
v = linear(x, state_dict[f"layer{li}.attn_wv"])

# 将当前时刻的 K/V 追加到缓存中;由于是自回归生成,缓存自然只包含当前位置及其之前的时刻。
keys[li].append(k)
values[li].append(v)

x_attn = []
for h in range(n_head):
hs = h * head_dim
q_h = q[hs : hs + head_dim]
k_h = [ki[hs : hs + head_dim] for ki in keys[li]]
v_h = [vi[hs : hs + head_dim] for vi in values[li]]
# 缩放点积注意力:先计算 q 与历史 k 的相似度,再除以 sqrt(head_dim) 稳定方差。
attn_logits = [
sum(q_h[j] * k_h[t][j] for j in range(head_dim)) / head_dim**0.5
for t in range(len(k_h))
]
attn_weights = softmax(attn_logits)
head_out = [
sum(attn_weights[t] * v_h[t][j] for t in range(len(v_h)))
for j in range(head_dim)
]
x_attn.extend(head_out) # 将各注意力头输出按通道维拼接
x = linear(x_attn, state_dict[f"layer{li}.attn_wo"])
x = [a + b for a, b in zip(x, x_residual)] # 残差连接

# 2)前馈网络子层
x_residual = x
x = rmsnorm(x)
x = linear(x, state_dict[f"layer{li}.mlp_fc1"])
x = [xi.relu() for xi in x]
x = linear(x, state_dict[f"layer{li}.mlp_fc2"])
x = [a + b for a, b in zip(x, x_residual)] # 残差连接

logits = linear(x, state_dict["lm_head"])
return logits

Adam 优化器

Adam 优化器:维护一阶矩和二阶矩的指数滑动平均,用于自适应调整参数更新步长。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class AdamOptimizer:
def __init__(self, params, lr, beta1=0.85, beta2=0.99, eps=1e-8):
self.params = list(params)
self.lr = lr
self.beta1 = beta1
self.beta2 = beta2
self.eps = eps
self.m = [0.0] * len(self.params)
self.v = [0.0] * len(self.params)
self.t = 0

def step(self, lr_t=None):
if lr_t is None:
lr_t = self.lr
self.t += 1
# 逐参数执行 Adam 更新,并在更新后把梯度清零,避免跨 step 累积。
for i, p in enumerate(self.params):
grad_of_p = p.grad
if grad_of_p is None:
continue

self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * grad_of_p
self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (grad_of_p**2)

m_hat = self.m[i] / (1 - self.beta1**self.t)
v_hat = self.v[i] / (1 - self.beta2**self.t)

p.data -= lr_t * m_hat / (v_hat**0.5 + self.eps)
p.grad = 0

初始化优化器与超参数

1
2
3
4
5
6
7
8
beta1 = 0.85
beta2 = 0.99
eps_adam = 1e-8
learning_rate = 0.01

optimizer = AdamOptimizer(
params, lr=learning_rate, beta1=beta1, beta2=beta2, eps=eps_adam
)

训练循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
num_steps = 500  # 训练总步数


for step in range(num_steps):
# 这里采用循环顺序采样;当 step 超过样本数时,通过取模回到数据集开头。
doc = docs[step % len(docs)]

# 进行字符级编码,并在首尾各补一个 BOS 标记,分别表示序列开始与结束。
tokens = [BOS] + [uchars.index(ch) for ch in doc] + [BOS]
n = min(block_size, len(tokens) - 1)

# 为每一层初始化 KV 缓存,用于保存当前样本前缀的历史上下文表示。
keys = [[] for _ in range(n_layer)]
values = [[] for _ in range(n_layer)]

loss_values: list[Value] = []
for pos_id in range(n):
token_id = tokens[pos_id] # 当前时刻输入给模型的 token
target_id = tokens[pos_id + 1] # 监督信号:下一个位置的真实 token
logits = gpt(token_id, pos_id, keys, values)
probs = softmax(logits)
loss_t = -probs[target_id].log() # 单个时间步的负对数似然,即交叉熵损失的逐 token 形式
loss_values.append(loss_t)

loss = Value(1 / n) * sum(loss_values) # 对序列内各时间步损失求平均,得到该样本的标量训练目标

loss.backward() # 在动态计算图上执行反向传播,累计所有参数的梯度
lr_t = learning_rate * (1 - step / num_steps) # 使用线性衰减学习率,训练后期减小更新幅度
optimizer.step(lr_t) # 根据当前梯度更新参数,并清空参数梯度

print(f"step {step+1:4d} / {num_steps:4d} | loss {loss.data:.4f}", end="\r")

推理采样

推理过程:从 BOS 开始自回归采样,直到生成终止符或达到最大长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
temperature = 0.5  # 温度系数越小,概率分布越集中,采样结果越保守

print("--- inference (new, hallucinated names) ---")
for sample_idx in range(20):
keys = [[] for _ in range(n_layer)]
values = [[] for _ in range(n_layer)]
token_id = BOS
sample = []
for pos_id in range(block_size):
logits = gpt(token_id, pos_id, keys, values)
# 通过温度缩放调节 logits 的离散程度,再输入 softmax 得到采样概率。
probs = softmax([ll / temperature for ll in logits])
# 按概率分布进行随机采样,而不是直接取 argmax,以保证生成多样化的结果。
token_id = random.choices(range(vocab_size), weights=[p.data for p in probs])[0]

if token_id == BOS:
break # 采样到终止符后结束当前样本生成

sample.append(uchars[token_id]) # 把预测字符写入输出序列,并继续下一时刻的自回归解码

print(f"sample {sample_idx+1:2d}: {''.join(sample)}")