microgpt.py 是 Andrej Karpathy (@karpathy ) 写的一个教学项目,约 260 行纯 Python(不依赖 PyTorch,甚至不依赖 numpy)实现完整的 GPT 训练和推理。
我对原始代码进行了一些微调——封装了 AdamOptimizer、补了类型标注、给 Value 加了 _op 字段和 print_value_tree() 方便观察计算图。
下面是学习整理的源码级逐段拆解。
准备:自动求导实现 由于不依赖 PyTorch 和 Numpy,首先需要一个 Value 类来实现自动求导,为后续的训练和推理提供基础。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import mathclass Value : __slots__ = ("data" , "grad" , "_children" , "_local_grads" , "_op" ) def __init__ (self, data, children=( ), local_grads=( ), op=None ): self .data = float (data) self .grad = 0.0 self ._children = children self ._local_grads = local_grads self ._op = op def __add__ (self, other ): other = other if isinstance (other, Value) else Value(other) return Value(self .data + other.data, (self , other), (1 , 1 ), op="add" ) def __mul__ (self, other ): other = other if isinstance (other, Value) else Value(other) return Value(self .data * other.data, (self , other), (other.data, self .data), op="mul" ) def __pow__ (self, other ): assert not isinstance (other, Value) other = Value(other) return Value(self .data ** other.data, (self ,), (other.data * self .data ** (other.data - 1 ),), op="pow" ) def log (self ): return Value(math.log(self .data), (self ,), (1 / self .data,), op="log" ) def exp (self ): return Value(math.exp(self .data), (self ,), (math.exp(self .data),), op="exp" ) def relu (self ): return Value(max (0 , self .data), (self ,), (float (self .data > 0 ),), op="relu" ) def __neg__ (self ): return self * (-1 ) def __radd__ (self, other ): return self + other def __sub__ (self, other ): return self + (-other) def __rsub__ (self, other ): return other + (-self ) def __rmul__ (self, other ): return self * other def __truediv__ (self, other ): return self * other ** (-1 ) def __rtruediv__ (self, other ): return other * self ** (-1 ) def backward (self ): topo = [] visited = set () def build_topo (v ): if v not in visited: visited.add(v) for child in v._children: build_topo(child) topo.append(v) build_topo(self ) self .grad = 1 for v in reversed (topo): for child, local_grad in zip (v._children, v._local_grads): child.grad += local_grad * v.grad
对每个运算的重载做两件事:计算前向值,同时记录局部梯度。比如乘法 z = x * y,dz/dx = y,dz/dy = x,所以 local_grads = (other.data, self.data)。
backward() 是核心方法——先用 DFS 后序遍历把计算图线性化为拓扑序(保证子节点在其所有父节点之后),再逆序逐层将 v.grad * local_grad 累加到子节点。用 += 是因为同一节点可能被多条路径共享(比如同一参数在序列不同位置被使用),需要累加所有贡献。
给 Value 添加一个检查计算图结构的方法,方便观察运算节点之间的拓扑关系:
1 2 3 4 5 6 7 8 def print_value_tree (v: Value, indent=0 ): op_display = f"[{v._op} ]" if v._op else "[leaf]" prefix = " " * indent print ( f"{prefix} {op_display} Value(data={v.data:.4 f} , grad={v.grad:.4 f} ) (id={id (v)} )" ) for child in v._children: print_value_tree(child, indent + 1 )
构造一个简单表达式来验证:
1 2 3 4 5 6 a = Value(2.0 ) b = Value(3.0 ) c = a * b + a**2 print_value_tree(c)
输出
1 2 3 4 5 6 [add] Value(data=10.0000, grad=0.0000) (id=2024170557888) [mul] Value(data=6.0000, grad=0.0000) (id=2024170555648) [leaf] Value(data=2.0000, grad=0.0000) (id=2024170558048) [leaf] Value(data=3.0000, grad=0.0000) (id=2024170556528) [pow] Value(data=4.0000, grad=0.0000) (id=2024170557808) [leaf] Value(data=2.0000, grad=0.0000) (id=2024170558048)
运行 backward() 之后,每一个节点就会存储自己的梯度。
1 2 c.backward() print_value_tree(c)
输出
1 2 3 4 5 6 [add] Value(data=10.0000, grad=1.0000) (id=2024170557888) [mul] Value(data=6.0000, grad=1.0000) (id=2024170555648) [leaf] Value(data=2.0000, grad=7.0000) (id=2024170558048) [leaf] Value(data=3.0000, grad=2.0000) (id=2024170556528) [pow] Value(data=4.0000, grad=1.0000) (id=2024170557808) [leaf] Value(data=2.0000, grad=7.0000) (id=2024170558048)
可以用 Value 实现一个简单的线性回归:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import randomrandom.seed(42 ) true_w, true_b = 2.0 , 1.0 x_vals = [random.uniform(-3 , 3 ) for _ in range (50 )] y_vals = [(true_w * x + true_b) + random.gauss(0 , 0.5 ) for x in x_vals] w = Value(random.gauss(0 , 0.1 )) b = Value(0.0 ) for epoch in range (200 ): losses = [] for x, y_true in zip (x_vals, y_vals): y_pred = w * x + b loss = (y_pred - y_true) ** 2 losses.append(loss) total_loss = sum (losses) / len (losses) total_loss.backward() w.data -= 0.01 * w.grad b.data -= 0.01 * b.grad w.grad = 0 ; b.grad = 0 print (f"w={w.data:.4 f} (true={true_w} ), b={b.data:.4 f} (true={true_b} )" )
可以用 matplotlib 展示线性回归的结果
1 2 3 4 5 6 7 import matplotlib.pyplot as pltplt.scatter(x_vals, y_vals) x_line = sorted (x_vals) y_line = [w.data * x + b.data for x in x_line] plt.plot(x_line, y_line, color="red" ) plt.show()
准备数据与词表 我们使用的数据是一份人名列表(input.txt,约 30,000 行,每行如 emma、olivia),任务是用字符级语言模型学会生成”看起来像名字”的字符串。
准备数据并打乱顺序
1 2 3 file_path = "./input.txt" docs = [s for line in open (file_path) if (s := line.strip())] random.shuffle(docs)
生成词表
1 2 3 uchars = sorted (set ("" .join(docs))) BOS = len (uchars) vocab_size = len (uchars) + 1
其中特殊控制符号 BOS (Beginning of Sequence)同时承担起始 和终止 两个角色:训练时首尾各加一个;推理时采样到它就停止。不需要额外的 EOS token。
设置模型超参数 设置一些模型的超参数:控制网络深度、宽度以及注意力结构。
1 2 3 4 5 6 n_layer = 1 n_embd = 16 block_size = 16 n_head = 4 assert n_embd % n_head == 0 , "n_embd 必须能被 n_head 整除,否则无法均匀切分多头维度" head_dim = n_embd // n_head
生成随机矩阵的函数
1 2 3 def gen_random_matrix (nout: int , nin: int , std: float = 0.08 ) -> list [list [Value]]: """生成形状为 (nout, nin) 的参数矩阵,并使用高斯分布进行随机初始化。""" return [[Value(random.gauss(0 , std)) for _ in range (nin)] for _ in range (nout)]
使用字典集中保存全部参数矩阵,命名方式与常见深度学习框架中的 state_dict 类似。
1 2 3 4 5 6 7 8 state_dict: dict [str , list [list [Value]]] = { "wte" : gen_random_matrix(vocab_size, n_embd), "wpe" : gen_random_matrix(block_size, n_embd), "lm_head" : gen_random_matrix(vocab_size, n_embd), }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 for i in range (n_layer): state_dict[f"layer{i} .attn_wq" ] = gen_random_matrix(n_embd, n_embd) state_dict[f"layer{i} .attn_wk" ] = gen_random_matrix(n_embd, n_embd) state_dict[f"layer{i} .attn_wv" ] = gen_random_matrix(n_embd, n_embd) state_dict[f"layer{i} .attn_wo" ] = gen_random_matrix(n_embd, n_embd) state_dict[f"layer{i} .mlp_fc1" ] = gen_random_matrix(4 * n_embd, n_embd) state_dict[f"layer{i} .mlp_fc2" ] = gen_random_matrix(n_embd, 4 * n_embd) params = [p for mat in state_dict.values() for row in mat for p in row] print (f"num params: {len (params)} " )
参数用 Value 而非裸 float 存储,这样才能被 backward() 自动累积梯度。gen_random_matrix 用嵌套 list 表示二维矩阵,不依赖 numpy。MLP 采用 4x 扩展比、Q/K/V 三组独立投影都是标准 GPT-2 配置。注意这里没有 bias,RMSNorm 也没有可学习的仿射参数。
前向传播 需要的一些基本运算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def linear (x: list [Value], w: list [list [Value]] ) -> list [Value]: """ 线性变换,等价于矩阵乘向量 y = W x。 参数约定: - x 的长度为 nin - w 的形状为 (nout, nin) - 返回长度为 nout 的输出向量 """ return [sum (wi * xi for wi, xi in zip (wo, x)) for wo in w] def softmax (logits: list [Value] ) -> list [Value]: """把未归一化打分转换为概率分布,并通过减去最大值提升数值稳定性。""" max_val = max (val.data for val in logits) exps = [(val - max_val).exp() for val in logits] total = sum (exps) return [e / total for e in exps] def rmsnorm (x: list [Value] ) -> list [Value]: """RMSNorm:用均方根对向量进行缩放归一化,不引入可学习仿射参数。""" ms = sum (xi * xi for xi in x) / len (x) scale = (ms + 1e-5 ) ** (-0.5 ) return [xi * scale for xi in x]
核心的 gpt() 函数用于定义模型前向传播:输入当前 token 和历史 KV 缓存,输出下一个 token 的 logits。
结构整体沿用 GPT 风格的因果语言模型,但做了若干简化:1. 归一化层使用 RMSNorm;2. 全部线性层省略偏置;3. 前馈激活函数使用 ReLU。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 def gpt (token_id: int , pos_id: int , keys, values ): tok_emb = state_dict["wte" ][token_id] pos_emb = state_dict["wpe" ][pos_id] x = [t + p for t, p in zip (tok_emb, pos_emb)] x = rmsnorm(x) for li in range (n_layer): x_residual = x x = rmsnorm(x) q = linear(x, state_dict[f"layer{li} .attn_wq" ]) k = linear(x, state_dict[f"layer{li} .attn_wk" ]) v = linear(x, state_dict[f"layer{li} .attn_wv" ]) keys[li].append(k) values[li].append(v) x_attn = [] for h in range (n_head): hs = h * head_dim q_h = q[hs : hs + head_dim] k_h = [ki[hs : hs + head_dim] for ki in keys[li]] v_h = [vi[hs : hs + head_dim] for vi in values[li]] attn_logits = [ sum (q_h[j] * k_h[t][j] for j in range (head_dim)) / head_dim**0.5 for t in range (len (k_h)) ] attn_weights = softmax(attn_logits) head_out = [ sum (attn_weights[t] * v_h[t][j] for t in range (len (v_h))) for j in range (head_dim) ] x_attn.extend(head_out) x = linear(x_attn, state_dict[f"layer{li} .attn_wo" ]) x = [a + b for a, b in zip (x, x_residual)] x_residual = x x = rmsnorm(x) x = linear(x, state_dict[f"layer{li} .mlp_fc1" ]) x = [xi.relu() for xi in x] x = linear(x, state_dict[f"layer{li} .mlp_fc2" ]) x = [a + b for a, b in zip (x, x_residual)] logits = linear(x, state_dict["lm_head" ]) return logits
Adam 优化器 Adam 优化器:维护一阶矩和二阶矩的指数滑动平均,用于自适应调整参数更新步长。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class AdamOptimizer : def __init__ (self, params, lr, beta1=0.85 , beta2=0.99 , eps=1e-8 ): self .params = list (params) self .lr = lr self .beta1 = beta1 self .beta2 = beta2 self .eps = eps self .m = [0.0 ] * len (self .params) self .v = [0.0 ] * len (self .params) self .t = 0 def step (self, lr_t=None ): if lr_t is None : lr_t = self .lr self .t += 1 for i, p in enumerate (self .params): grad_of_p = p.grad if grad_of_p is None : continue self .m[i] = self .beta1 * self .m[i] + (1 - self .beta1) * grad_of_p self .v[i] = self .beta2 * self .v[i] + (1 - self .beta2) * (grad_of_p**2 ) m_hat = self .m[i] / (1 - self .beta1**self .t) v_hat = self .v[i] / (1 - self .beta2**self .t) p.data -= lr_t * m_hat / (v_hat**0.5 + self .eps) p.grad = 0
初始化优化器与超参数
1 2 3 4 5 6 7 8 beta1 = 0.85 beta2 = 0.99 eps_adam = 1e-8 learning_rate = 0.01 optimizer = AdamOptimizer( params, lr=learning_rate, beta1=beta1, beta2=beta2, eps=eps_adam )
训练循环 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 num_steps = 500 for step in range (num_steps): doc = docs[step % len (docs)] tokens = [BOS] + [uchars.index(ch) for ch in doc] + [BOS] n = min (block_size, len (tokens) - 1 ) keys = [[] for _ in range (n_layer)] values = [[] for _ in range (n_layer)] loss_values: list [Value] = [] for pos_id in range (n): token_id = tokens[pos_id] target_id = tokens[pos_id + 1 ] logits = gpt(token_id, pos_id, keys, values) probs = softmax(logits) loss_t = -probs[target_id].log() loss_values.append(loss_t) loss = Value(1 / n) * sum (loss_values) loss.backward() lr_t = learning_rate * (1 - step / num_steps) optimizer.step(lr_t) print (f"step {step+1 :4d} / {num_steps:4d} | loss {loss.data:.4 f} " , end="\r" )
推理采样 推理过程:从 BOS 开始自回归采样,直到生成终止符或达到最大长度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 temperature = 0.5 print ("--- inference (new, hallucinated names) ---" )for sample_idx in range (20 ): keys = [[] for _ in range (n_layer)] values = [[] for _ in range (n_layer)] token_id = BOS sample = [] for pos_id in range (block_size): logits = gpt(token_id, pos_id, keys, values) probs = softmax([ll / temperature for ll in logits]) token_id = random.choices(range (vocab_size), weights=[p.data for p in probs])[0 ] if token_id == BOS: break sample.append(uchars[token_id]) print (f"sample {sample_idx+1 :2d} : {'' .join(sample)} " )