一、核心说明
以下代码完全手写,不依赖任何第三方 Transformer 库,仅用 PyTorch 实现,适配 Ubuntu 系统 + Jupyter Notebook,复制后直接粘贴到 Jupyter 单元格即可运行,无需修改任何内容。
二、完整手写 Transformer 代码(可直接复制)
python
运行
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# 1. Positional encoding (core Transformer component, hand-written)
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of embeddings.

    A table of shape ``(1, max_len, d_model)`` is precomputed once and stored
    as a buffer, so it follows the module across devices and is saved with it
    but is never updated by the optimizer.

    Args:
        d_model: Size of the embedding dimension. Both even and odd values
            are supported.
        max_len: Longest sequence length the table covers.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        # Encoding table, shape: (max_len, d_model)
        pe = torch.zeros(max_len, d_model)
        # Position indices as a column vector, shape: (max_len, 1)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # Per-frequency scaling factors, one per even column index,
        # computed through exp/log: exp(-2i * ln(10000) / d_model).
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float()
            * (-torch.log(torch.tensor(10000.0)) / d_model)
        )
        angles = position * div_term
        # Even columns carry the sine, odd columns the cosine.
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even
        # columns, so drop the surplus frequency before assigning.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Leading batch dimension so the table broadcasts over the batch,
        # shape: (1, max_len, d_model)
        pe = pe.unsqueeze(0)
        # Buffer, not a parameter: excluded from gradient updates.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` with the positional encoding added.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, d_model)``.

        Returns:
            Tensor with the same shape as ``x``.

        Raises:
            ValueError: If ``seq_len`` exceeds the precomputed table length.
        """
        seq_len = x.size(1)
        table_len = self.pe.size(1)
        if seq_len > table_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {table_len} "
                "of the positional encoding"
            )
        return x + self.pe[:, :seq_len, :]
# 2. Multi-head self-attention (hand-written, core module)
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are all projected from the same input tensor.

    Args:
        d_model: Model (embedding) dimension.
        nhead: Number of attention heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: If ``nhead`` is not positive or does not divide
            ``d_model``.
    """

    def __init__(self, d_model, nhead):
        super().__init__()
        # Fail at construction time instead of with an opaque view() shape
        # error on the first forward pass.
        if nhead <= 0 or d_model % nhead != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by nhead ({nhead})"
            )
        self.nhead = nhead  # number of attention heads
        self.d_model = d_model  # model dimension
        self.d_k = d_model // nhead  # dimension of each head
        # Linear projections for Q, K and V
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        # Output projection applied after the heads are concatenated
        self.w_o = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch_size, seq_len, d_model)``.
            mask: Optional tensor broadcastable to
                ``(batch_size, nhead, seq_len, seq_len)``. Entries equal to
                zero/``False`` mark key positions a query may NOT attend to.
                A query row that is masked everywhere yields NaN, because
                softmax has nothing left to normalize over.

        Returns:
            Tensor of shape ``(batch_size, seq_len, d_model)``.
        """
        batch_size, seq_len, _ = x.size()

        def split_heads(t):
            # (batch, seq_len, d_model) -> (batch, nhead, seq_len, d_k)
            return t.view(batch_size, seq_len, self.nhead, self.d_k).transpose(1, 2)

        # 1. Linear projections, then split into heads
        q = split_heads(self.w_q(x))
        k = split_heads(self.w_k(x))
        v = split_heads(self.w_v(x))
        # 2. Attention scores: Q . K^T / sqrt(d_k)
        attn_scores = torch.matmul(q, k.transpose(-2, -1)) / (self.d_k ** 0.5)
        if mask is not None:
            # -inf scores become exactly zero weight after softmax.
            attn_scores = attn_scores.masked_fill(mask == 0, float('-inf'))
        # 3. Normalize scores over the key axis
        attn_weights = F.softmax(attn_scores, dim=-1)
        # 4. Weighted sum of the values
        attn_output = torch.matmul(attn_weights, v)
        # 5. Concatenate the heads back to (batch, seq_len, d_model) and
        #    apply the output projection
        attn_output = attn_output.transpose(1, 2).contiguous().view(
            batch_size, seq_len, self.d_model
        )
        return self.w_o(attn_output)
# 3. Transformer encoder block (hand-written)
class TransformerEncoderBlock(nn.Module):
    """One encoder layer: self-attention, then a position-wise feed-forward net.

    Each sub-layer is wrapped in a residual connection and followed by layer
    normalization (normalization is applied after the residual sum).

    Args:
        d_model: Model (embedding) dimension.
        nhead: Number of attention heads passed to ``MultiHeadAttention``.
    """

    def __init__(self, d_model, nhead):
        super().__init__()
        hidden_dim = d_model * 4  # width of the feed-forward hidden layer
        # Self-attention sub-layer and its layer norm
        self.attn = MultiHeadAttention(d_model, nhead)
        self.norm1 = nn.LayerNorm(d_model)
        # Feed-forward sub-layer (Linear -> ReLU -> Linear) and its layer norm
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, d_model),
        )
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Run the block on ``x`` and return a tensor of the same shape."""
        # Attention sub-layer: residual add, then layer norm
        attended = self.attn(x)
        x = self.norm1(x + attended)
        # Feed-forward sub-layer: residual add, then layer norm
        transformed = self.feed_forward(x)
        return self.norm2(x + transformed)
# 4. Full Transformer encoder model (hand-written, directly trainable)
class MyTransformer(nn.Module):
    """Token-level Transformer encoder with a vocabulary-sized output head.

    Pipeline: token embedding (scaled by sqrt(d_model)) -> positional
    encoding -> ``num_layers`` encoder blocks -> linear projection to
    vocabulary logits.

    Args:
        vocab_size: Number of distinct token indices.
        d_model: Model (embedding) dimension.
        nhead: Number of attention heads in every encoder block.
        num_layers: Number of stacked encoder blocks.
    """

    def __init__(self, vocab_size, d_model=64, nhead=2, num_layers=2):
        super().__init__()
        self.d_model = d_model
        # Token index -> embedding vector
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Positional encoding added on top of the embeddings
        self.pos_encoding = PositionalEncoding(d_model)
        # Stack of encoder blocks, applied in order
        blocks = []
        for _ in range(num_layers):
            blocks.append(TransformerEncoderBlock(d_model, nhead))
        self.encoder_blocks = nn.ModuleList(blocks)
        # Output head: projects each position to vocabulary-sized logits
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Map token indices to per-position vocabulary logits.

        Args:
            x: Tensor of token indices, shape ``(batch_size, seq_len)``.

        Returns:
            Tensor of shape ``(batch_size, seq_len, vocab_size)``.
        """
        # Scale the embeddings so the positional encoding does not drown
        # them out.
        scale = torch.sqrt(torch.tensor(self.d_model, dtype=torch.float))
        hidden = self.pos_encoding(self.embedding(x) * scale)
        # Pass through every encoder block in turn
        for block in self.encoder_blocks:
            hidden = block(hidden)
        # Project to vocabulary logits
        return self.fc_out(hidden)
# 5. Build the model and train it (CPU only, runs as-is)
if __name__ == "__main__":
    # Hyperparameters, kept small so the demo runs on a CPU
    vocab_size = 10000  # vocabulary size (adjustable)
    d_model = 64  # model dimension (smaller runs faster on CPU)
    nhead = 2  # number of attention heads
    num_layers = 2  # number of encoder blocks
    batch_size = 8  # batch size
    seq_len = 20  # tokens per sequence
    epochs = 5  # number of training epochs
    lr = 1e-3  # learning rate

    # Model, loss function and optimizer
    model = MyTransformer(vocab_size, d_model, nhead, num_layers)
    criterion = nn.CrossEntropyLoss()  # classification loss over the vocabulary
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Random token-index sequences standing in for real data
    # (100 batches' worth); replace with your own indexed text.
    train_data = torch.randint(0, vocab_size, (batch_size * 100, seq_len))

    print("开始训练手写 Transformer(纯 CPU)...")
    model.train()
    for epoch in range(epochs):
        total_loss = 0.0
        num_batches = 0
        # Mini-batch training
        for start in range(0, len(train_data), batch_size):
            batch = train_data[start:start + batch_size]
            optimizer.zero_grad()  # clear gradients from the previous step
            output = model(batch)  # forward pass
            # output: (batch, seq_len, vocab_size); labels: (batch, seq_len).
            # NOTE(review): the labels are the input tokens themselves, so
            # this loop trains the model to reproduce its input at each
            # position rather than to predict the next token. Next-token
            # training would need shifted targets and a causal attention mask.
            loss = criterion(output.reshape(-1, vocab_size), batch.reshape(-1))
            loss.backward()  # backward pass
            optimizer.step()  # parameter update
            total_loss += loss.item()
            num_batches += 1
        # Average over the batches actually run, so a dataset whose size is
        # not a multiple of batch_size (or is smaller than one batch) is
        # reported correctly and never divides by zero.
        print(f"Epoch {epoch+1}/{epochs} | 平均损失: {total_loss / max(num_batches, 1):.4f}")
    print("✅ 手写 Transformer 训练完成!可直接修改数据继续训练")
三、运行步骤(适配 Ubuntu+Jupyter)
- 打开 Jupyter Notebook(网页版),新建 Python 3 单元格
- 完整复制上面的代码(不要手动调整缩进),粘贴到单元格中
- 点击「运行」按钮,即可开始训练(纯 CPU 运行,无需显卡)
四、关键注意事项
Python 依靠缩进表示代码结构,复制时务必完整复制代码块并保留原有缩进,不要手动删除空格或换行,否则会报错(如 IndentationError)
适配 Ubuntu 系统,无需安装 NVIDIA 驱动、无需 GPU,纯 CPU 即可正常运行
若提示 “模块缺失”,在 Jupyter 中新建单元格,粘贴以下代码运行:
bash
运行
!pip3 install torch --no-cache-dir
可修改 vocab_size(词典大小)、seq_len(序列长度)、epochs(训练轮次)以适配自己的数据
若想替换为自己的文本数据,只需将 train_data 替换为自己的词索引序列(需保证 shape 为 (样本数, seq_len),且所有索引都在 [0, vocab_size) 范围内)