大模型预训练概述
预训练是大模型经历的第一个训练阶段,通过在大量语料上进行训练获得基础的世界知识(背书)
主要从预训练定义、数据、流程、继续预训练与预训练评估几部分讲解大模型的预训练
Pre-training 定义
预训练目标:大模型通过在大规模数据集上学习,使模型能捕捉数据的通用特征和模式,从而提升
大模型在各种任务上的性能和泛化能力,让模型掌握通用能力。
比如在预训练期间,模型会接触大量未标记的文本数据,例如书籍、文章和网站,目标是捕获文本语料库中存在的底层模式、结构和语义知识
- 数据稀缺性:收集并标注专业领域的数据是耗时费力的任务。预训练先从大量无标注数据学习通用特征,减少对标注数据的依赖。
- 先验只是:模型从随机初始化的参数进行学习。对很多任务来说,具有一定的先验知识,可以加快大模型在新任务的训练。
大模型采用的预训练方法是大量无标签语料进行自监督学习(区别于聚类等无监督学习,NTP利用下一个token作为标签进行自监督学习)
训练目标为Next Token Prediction loss:
\[L = -\sum_{n=1}^N\log p(x_n|x_1,x_2,...,x_{x-1};\theta)\]
基本思想:模型根据上下文预测下一个最可能的单词,Next Token Prediction loss通过计算每个预测的对数似然估计,帮助模型进行更准确的下一个token预测
为什么计算损失使用对数?
- 数学简化:将概率连乘转换为对数相加,避免数值下溢。
- 凸函数性质:对数函数将非凸优化问题转换为凸优化问题,便于求解。
凸函数定义:
函数\(f(x)\)是凸函数,当且仅当对于任意\(x_1,x_2\)和\(\lambda \in [0,1]\),有:
\[f(\lambda x_1 +(1-\lambda)x_2)\geq \lambda f(x_1)+(1-\lambda)f(x_2)\]
直观理解:函数图像上任意两点的连线始终在函数曲线的上方。
非凸函数存在多个局部最优解,优化算法容易陷入局部最优,无法找到全局最优解。
- 概率解释:对数似然可以解释为编码长度,最小化损失等价于最小化编码真实标签所需的比特数。
香农在信息论中提出:最优编码长度与事件发生的概率成反比,概率越低的事件需要越长的编码来表示。而对数似然损失的最小化过程,本质上等价于寻找一种使真实标签编码长度最短的概率模型。
香农提出,对于一个概率为 \(p(x)\) 的事件 x,其最优二进制编码长度(单位:比特)
可定义为:\(\text{编码长度} = -\log_2 p(x)\)
原理:高概率事件(如 “明天太阳升起”)用短编码(如 0);低概率事件(如 “明天地震”)用长编码(如 1010)。
示例:
若事件 A 的概率 \(p(A) = 1/2\),则编码长度为 \(-\log_2(1/2) = 1\) 比特(如用 0 表示);
若事件 B 的概率 \(p(B) = 1/8\),则编码长度为 \(-\log_2(1/8) = 3\) 比特(如用 110 表示)。
预训练阶段
数据与tokenization
来自网络 (CommonCrawl、OpenWebText)、书籍 (BookCorpus) 、维基百科等,可能数百 GB 乃至 TB 级;
- 过滤:去重、去低质量、遵守道德标准
- tokenization: BPE/WordPiece/SentencePiece 等生成 30k到100k 规模词汇表
训练目标(损失函数)
自回归语言模型
\[L = -\sum_{n=1}^N\log p(x_n|x_1,x_2,...,x_{x-1};\theta)\]
我们证明上面的 loss function 是给定一个token序列的交叉熵
给出预训练的sequence \(w_1,w_2,...,w_n\)。那么生成这个句子的概率为\(P(w_1,w_2,...,w_n)\),可以通过条件概率公式分解为:
\(P(w_1,w_2,...,w_n)=p(w_1)p(w_2|w_1)p(w_3|w_1,w_2)...p(w_n|w_1,w_2,...,w_{n-1})=\prod_{i=1}^np(w_i|w_1,w_2,...,w_{i-1})\)
这里条件概率\(p(w_i|w_1,w_2,...,w_{i-1})\)既可以用N-gram语言模型,也可以用Transformer建模
交叉熵损失
\[H(LM)=E_{p(b_N)}[-\log_2 q(w_N|b_{N-1})]=-\sum_{b_N}p(b_N)\log_2 q(w_N|b_{N-1})=-\sum_{b_N}p(w_N|b_{N-1})p(b_{N-1}\log_2 q(w_N|b_{N-1}))\approx -\sum_{b_N}1\cdot \frac{1}{m}\log_2 q(w_N|b_{N-1})=-\frac{1}{m}\sum_{b_N}\log_2 q(w_N|b_{N-1})=-\frac{1}{m}\sum_{i=1}^m\log_2 q(w_i|w_1,w_2,...,w_{i-1})\]
这里\(\approx\)是重要一步
困惑度损失
\[Perplexity(LM)=(\prod_{i=1}^m\frac{1}{q(w_i|w_1,w_2,...,w_{i-1})})^{\frac{1}{m}}\]
当语言模型生成每一个token越不确定,那么困惑度越大,模型性能越差
接着交叉熵损失推导:
\[-\frac{1}{m}\log_2 q(w_i|w_1,w_2,...,w_{i-1})=\log_2[(\prod_{i=1}^m\frac{1}{q(w_i|w_1,w_2,...,w_{i-1})})^{\frac{1}{m}}]=\log_2(Perplexity(LM))\]
Perplexity求一个log就是交叉熵(推导精彩)
实际计算中如何计算loss
- Pre-training阶段和SFT阶段的loss计算差异
Pre-training阶段还是SFT阶段,loss函数都是一样的,只是计算的方式存在差异.
PreTraining阶段计算的是整段输入文本的loss,而SFT阶段计算的是response部分的loss。
Pre-training概念(2)
实际计算中如何计算loss
LLM的pretrain是自监督学习,所以语料文本之外无需标注,但在训练的时候需要label,.
处理pretrain阶段的数据
- 数据组成形式:
- 输入:\(X_1\) \(X_2\) \(X_3\)
- labels:\(X_1\) \(X_2\) \(X_3\)
- 典型的Decoder架构的数据训练方式;
即根据[]预测\(X_1\),根据[,X_1]预测\(X_2\),...
掩码语言模型(MLM/BERT-style)
\[L_{MLM}(\theta)=E_x[-\sum_{i\in M}\log P_\theta(x_i|\tilde{x})]\]
其中\(M\)是被mask的索引集合,\(\tilde{x}\)表示在\(M\)处替换为[MASK]
详情可见这篇博客Encoder-only 预训练任务
去噪自编码(Seq2Seq)
如 BART/T5 采用加噪(随机删除/打乱)的输入序列,由编码器读入,解码器输出去噪效果
我会在LLM/LLM_Architecture里详细介绍
算法与并行策略
两阶段优化
-
预训练:\(\theta^* \leftarrow \arg \min_{\theta}L_{pre}(\theta,D_{pretrain})\)
-
微调: \(\phi^* = \arg \min_\phi \sum_{(x,y)\in D_{down}}G(x,y;\phi)\) with initial \(\phi \approx \theta^*\)
prompt 工程
无需改动参数,通过上下文,让模型在生成时自适应:
\[\hat{y}= \arg \max_y P_{\theta^*}(y|x,prompt)\]
下面的代码片段来自LLaMA-Efficient-Tuning代码库,其中preprocess.py文件里面定义了不同阶段数据处理的方式。
(1). 根据不同训练阶段对数据集进行预处理
| def preprocess_dataset(
# dataset:输入数据集,支持常规或流式数据集
dataset: Union["Dataset", "IterableDataset"],
# tokenizer:分词器,用于将文本分成token
tokenizer: "PreTrainedTokenizer",
# args 是 arguments(参数)的缩写,通常表示一组配置参数
# DataArguments:配置数据加载和预处理参数
data_args: "DataArguments",
# Seq2SeqTrainingArguments:序列到序列模型(如 T5、BART)的训练参数配置类
training_args: "Seq2SeqTrainingArguments", # 使用引号,Python 会在运行时解析类型,允许你提前使用未导入的类型,避免循环依赖问题
# stage:训练阶段,可选值为 "pt"(预训练)、"sft"(监督微调)、"rm"(奖励模型)、"ppo"(策略优化)
stage: Literal["pt", "sft", "rm", "ppo"]
) -> Union["Dataset", "IterableDataset"]:
|
(2). 数据结构准备
| column_names = list(next(iter(dataset)).keys())
template = get_template_and_fix_tokenizer(data_args.template, tokenizer)
|
- column_names = list(next(iter(dataset)).keys())
功能
获取数据集的列名(即字段名称),通常用于了解数据集的结构。
执行步骤
iter(dataset):将数据集转换为迭代器,以便逐行访问数据。
next(iter(dataset)):从迭代器中取出第一个样本(即数据集的第一行)。
next(...).keys():获取该样本的所有键(key),即列名。
list(...):将键转换为列表格式,便于后续操作。
示例
假设数据集的第一行为 {'text': 'Hello', 'label': 1},则 column_names 为 ['text', 'label']。
- template = get_template_and_fix_tokenizer(data_args.template, tokenizer)
功能
获取模板并修复分词器,确保输入文本能正确被模型处理
参数解析
data_args.template:配置文件中定义的模板字符串,用于格式化输入数据
例如:"问题:{text}\n答案:{label}"
get_template_and_fix_tokenizer操作:
解析模板:提取模板中的特殊标记(如 {text}、{label}),用于后续数据填充。
修复分词器: 为模板中的特殊符号(如 \n、自定义分隔符)添加特殊 token。确保分词器能正确处理模板中的占位符(如 {text})
返回处理后的模板:可能是一个函数或对象,用于后续数据格式化
示例
| # 假设dataset是一个包含文本和标签的数据集
dataset = [
{'text': '今天天气如何', 'label': '晴天'},
{'text': '明天有雨吗', 'label': '有雨'}
]
# 步骤1:获取列名
column_names = list(next(iter(dataset)).keys())
# 输出:['text', 'label']
# 步骤2:获取模板并修复分词器
# 假设template配置为 "问题:{text}\n答案:{label}"
template = get_template_and_fix_tokenizer(data_args.template, tokenizer)
# 后续使用template格式化数据
formatted_text = template(dataset[0])
# 输出:"问题:今天天气如何\n答案:晴天"
|
(3).数据生成器
| def construct_example(examples: Dict[str, List[Any]]) -> Generator[Any, None, None]:
for i in range(len(examples["prompt"])):
query = examples["prompt"][i]
if "query" in examples and examples["query"][i]:
query = query + "\n" + examples["query"][i]
response = examples["response"][i] if "response" in examples else None
history = examples["history"][i] if "history" in examples else None
system = examples["system"][i] if "system" in examples else None
yield query, response, history, system
|
示例
示例输入数据
假设输入的 examples
是一个包含4个字段的字典,对应2个样本:
| examples = {
"prompt": ["你是谁", "今天星期几"], # 必选字段,每个样本的基础提示
"query": ["请用简洁语言回答", ""], # 可选字段,附加查询(第二个样本为空)
"response": ["我是AI助手豆包", "今天星期五"], # 响应(标签)
"history": [None, [("昨天星期几", "昨天星期四")]], # 对话历史(第二个样本有历史记录)
"system": ["通用模式", None] # 系统提示(第二个样本无系统提示)
}
|
逐行代码执行过程与输出
** 函数定义与参数**
| def construct_example(examples: Dict[str, List[Any]]) -> Generator[Any, None, None]:
|
-
输入:
examples
是上述字典,包含2个样本的各字段列表。
-
输出:生成器,逐个返回处理后的样本元组。
** 循环遍历样本(i=0
时处理第一个样本)
| for i in range(len(examples["prompt"])): # len(examples["prompt"]) = 2,循环i=0和i=1
|
- **当前 i=0
:处理第一个样本(prompt
为“你是谁”)。
** 提取并处理 query
**
| query = examples["prompt"][i] # examples["prompt"][0] = "你是谁"
if "query" in examples and examples["query"][i]: # examples["query"][0] = "请用简洁语言回答"(非空)
query = query + "\n" + examples["query"][i] # 拼接后:"你是谁\n请用简洁语言回答"
|
-
处理后 query
:
"你是谁\n请用简洁语言回答"
。
** 提取可选字段(response
, history
, system
)
| response = examples["response"][i] if "response" in examples else None # examples["response"][0] = "我是AI助手豆包"
history = examples["history"][i] if "history" in examples else None # examples["history"][0] = None
system = examples["system"][i] if "system" in examples else None # examples["system"][0] = "通用模式"
|
- **response
:"我是AI助手豆包"
- history
:None
- system
:"通用模式"
生成第一个样本
| yield query, response, history, system
|
-
输出元组:
("你是谁\n请用简洁语言回答", "我是AI助手豆包", None, "通用模式")
循环再次执行(i=1
时处理第二个样本)
| # i=1,处理第二个样本(prompt为"今天星期几")
query = examples["prompt"][1] # "今天星期几"
if "query" in examples and examples["query"][1]: # examples["query"][1] = ""(空字符串,条件不成立)
# 不执行拼接
response = examples["response"][1] # "今天星期五"
history = examples["history"][1] # [("昨天星期几", "昨天星期四")]
system = examples["system"][1] # None
yield query, response, history, system
|
- 处理后
query
:保持原值 "今天星期几"
。
- 输出元组:
("今天星期几", "今天星期五", [("昨天星期几", "昨天星期四")], None)
完整输出结果
通过生成器迭代输出的两个样本为:
-
("你是谁\n请用简洁语言回答", "我是AI助手豆包", None, "通用模式")
-
("今天星期几", "今天星期五", [("昨天星期几", "昨天星期四")], None)
代码作用总结
代码行 |
作用描述 |
len(examples["prompt"]) |
确定样本总数(通过必选字段 prompt 的长度)。 |
query = examples["prompt"][i] |
提取每个样本的基础提示文本。 |
拼接 query |
若存在附加查询字段 query 且非空,将其与基础提示用换行符拼接,丰富输入。 |
条件提取可选字段 |
处理 response 、history 、system 字段,不存在时设为 None 。 |
yield 生成元组 |
返回处理后的样本,包含输入查询、响应、历史对话和系统提示。 |
(4). 预训练数据处理(PT)
| def preprocess_pretrain_dataset(examples: Dict[str, List[Any]]) -> Dict[str, Any]:
tokenized_examples = tokenizer(examples["prompt"], add_special_tokens=False)
concatenated_examples = {k: list(chain(*tokenized_examples[k])) for k in tokenized_examples.keys()}
total_length = (len(concatenated_examples[list(concatenated_examples.keys())[0]]) // block_size) * block_size
result = {
k: [t[i: i + block_size] for i in range(0, total_length, block_size)]
for k, t in concatenated_examples.items()
}
return result
|
示例输入数据
假设输入的examples
包含2个样本,tokenizer
将文本转换为token ID,block_size=4
:
| examples = {
"prompt": ["Hello world", "How are you today"]
}
# 假设tokenizer的分词结果(简化示例):
# "Hello world" → [101, 102]
# "How are you today" → [103, 104, 105, 106]
|
逐行代码执行过程与输出
分词处理
| tokenized_examples = tokenizer(examples["prompt"], add_special_tokens=False)
|
-
分词结果(假设):
| tokenized_examples = {
"input_ids": [[101, 102], [103, 104, 105, 106]],
"attention_mask": [[1, 1], [1, 1, 1, 1]]
}
|
拼接所有样本的tokens
| concatenated_examples = {k: list(chain(*tokenized_examples[k])) for k in tokenized_examples.keys()}
|
-
拼接后:
| concatenated_examples = {
"input_ids": [101, 102, 103, 104, 105, 106],
"attention_mask": [1, 1, 1, 1, 1, 1]
}
|
计算总长度并截断为block_size的整数倍
| total_length = (len(concatenated_examples["input_ids"]) // block_size) * block_size
# len=6,block_size=4 → total_length = (6//4)*4 = 4
|
-
结果:
total_length = 4
(丢弃剩余的2个token)。
按block_size切分tokens
| result = {
k: [t[i: i + block_size] for i in range(0, total_length, block_size)]
for k, t in concatenated_examples.items()
}
|
-
切分后:
| result = {
"input_ids": [[101, 102, 103, 104]], # 丢弃[105, 106]
"attention_mask": [[1, 1, 1, 1]] # 丢弃[1, 1]
}
|
完整输出结果
| {
"input_ids": [[101, 102, 103, 104]],
"attention_mask": [[1, 1, 1, 1]]
}
|
代码作用总结
代码行 |
作用描述 |
tokenizer(...) |
将所有样本的文本转换为token ID(不添加特殊token)。 |
chain(*tokenized_examples[k]) |
将所有样本的tokens拼接成一个长列表(用于连续训练)。 |
total_length 计算 |
截断tokens总长度为block_size 的整数倍,确保能均匀切分。 |
按块切分 |
将长tokens列表切分为固定长度的块(每个块长度=block_size),丢弃不足的部分。 |
(5)监督微调数据处理(SFT)
| def preprocess_supervised_dataset(examples: Dict[str, List[Any]]) -> Dict[str, Any]:
model_inputs = {"input_ids": [], "attention_mask": [], "labels": []}
max_length = data_args.max_source_length + data_args.max_target_length
for query, response, history, system in construct_example(examples):
input_ids, labels = [], []
for source_ids, target_ids in template.encode_multiturn(...):
if len(input_ids) + len(source_ids) + len(target_ids) > max_length:
break
input_ids += source_ids + target_ids
labels += [IGNORE_INDEX] * len(source_ids) + target_ids
model_inputs["input_ids"].append(input_ids)
model_inputs["attention_mask"].append([1] * len(input_ids))
model_inputs["labels"].append(labels)
return model_inputs
|
示例输入与配置
假设输入的examples
包含1个对话样本,配置如下:
| examples = {
"prompt": ["今天天气如何?"],
"response": ["晴天,适合户外活动。"]
}
# 配置参数
data_args.max_source_length = 10 # 源文本最大长度
data_args.max_target_length = 10 # 目标文本最大长度
max_length = 20 # 总最大长度
# 模板编码结果(简化示例)
# 假设template.encode_multiturn(...)返回:
# [
# (source_ids=[101, 102], target_ids=[201, 202, 203]), # 表示"今天天气如何?" → "晴天,适合户外活动。"
# ]
|
逐行代码执行过程与输出
初始化输出结构
| model_inputs = {"input_ids": [], "attention_mask": [], "labels": []}
|
-
初始状态:
| model_inputs = {
"input_ids": [],
"attention_mask": [],
"labels": []
}
|
遍历construct_example生成的样本
| for query, response, history, system in construct_example(examples):
# query = "今天天气如何?"
# response = "晴天,适合户外活动。"
input_ids, labels = [], []
|
-
初始化样本的input_ids
和labels
:
| input_ids = []
labels = []
|
处理多轮对话(本例简化为单轮)
| for source_ids, target_ids in template.encode_multiturn(...):
# source_ids = [101, 102] # 对应"今天天气如何?"
# target_ids = [201, 202, 203] # 对应"晴天,适合户外活动。"
if len(input_ids) + len(source_ids) + len(target_ids) > max_length:
break # 长度未超过20,不执行
input_ids += source_ids + target_ids # [101, 102, 201, 202, 203]
labels += [IGNORE_INDEX] * len(source_ids) + target_ids # [-100, -100, 201, 202, 203]
|
-
处理后:
| input_ids = [101, 102, 201, 202, 203]
labels = [-100, -100, 201, 202, 203] # -100表示忽略,只预测target部分
|
更新model_inputs
| model_inputs["input_ids"].append(input_ids)
model_inputs["attention_mask"].append([1] * len(input_ids))
model_inputs["labels"].append(labels)
|
-
最终输出:
| model_inputs = {
"input_ids": [[101, 102, 201, 202, 203]],
"attention_mask": [[1, 1, 1, 1, 1]],
"labels": [[-100, -100, 201, 202, 203]]
}
|
完整输出结果
| {
"input_ids": [[101, 102, 201, 202, 203]],
"attention_mask": [[1, 1, 1, 1, 1]],
"labels": [[-100, -100, 201, 202, 203]]
}
|
代码作用总结
代码行 |
作用描述 |
model_inputs 初始化 |
创建用于存储输入、掩码和标签的字典。 |
construct_example |
生成(query, response, history, system)元组,处理对话结构。 |
template.encode_multiturn |
将对话转换为token ID,区分source(用户输入)和target(模型输出)。 |
input_ids 拼接 |
将source和target的token ID按顺序拼接。 |
labels 设置 |
source部分设为IGNORE_INDEX (如-100),仅预测target部分。 |
attention_mask 生成 |
全1掩码,表示所有token都参与计算。 |
此函数适用于监督微调(SFT)场景,将对话数据转换为模型可接受的格式,通过labels
指定需要预测的部分。
(6).奖励模型数据处理(RM)
| def preprocess_pairwise_dataset(examples):
model_inputs = {"prompt_ids": [], "chosen_ids": [], "rejected_ids": []}
for query, response, history, system in construct_example(examples):
prompt_ids, chosen_ids = template.encode_oneturn(query, response[0], ...)
_, rejected_ids = template.encode_oneturn(query, response[1], ...)
model_inputs["prompt_ids"].append(prompt_ids)
model_inputs["chosen_ids"].append(chosen_ids)
model_inputs["rejected_ids"].append(rejected_ids)
return model_inputs
|
示例输入与配置
假设输入的examples
包含1个样本,每个样本有1个查询和2个候选回复(分别为更优和更差的回复):
| examples = {
"prompt": ["推荐一部科幻电影"],
"response": [
["《星际穿越》,科学与情感的完美结合", "《蜘蛛侠》,适合全家观看"] # response[0]=更优回复,response[1]=更差回复
]
}
# 假设template.encode_oneturn的编码结果(简化示例):
# template.encode_oneturn("推荐一部科幻电影", "《星际穿越》...") → (prompt_ids=[101,102], chosen_ids=[201,202])
# template.encode_oneturn("推荐一部科幻电影", "《蜘蛛侠》...") → (prompt_ids=[101,102], rejected_ids=[301,302])
|
逐行代码执行过程与输出
1. 初始化输出结构
| model_inputs = {"prompt_ids": [], "chosen_ids": [], "rejected_ids": []}
|
-
初始状态:
| model_inputs = {
"prompt_ids": [],
"chosen_ids": [],
"rejected_ids": []
}
|
遍历construct_example生成的样本
| for query, response, history, system in construct_example(examples):
# query = "推荐一部科幻电影"
# response = ["《星际穿越》...", "《蜘蛛侠》..."] # 包含两个回复的列表
|
编码更优回复
| prompt_ids, chosen_ids = template.encode_oneturn(query, response[0], ...)
# 假设编码结果:
# prompt_ids = [101, 102] # 对应"推荐一部科幻电影"
# chosen_ids = [201, 202] # 对应"《星际穿越》,科学与情感的完美结合"
|
编码更差回复
| _, rejected_ids = template.encode_oneturn(query, response[1], ...)
# 假设编码结果:
# rejected_ids = [301, 302] # 对应"《蜘蛛侠》,适合全家观看"
|
更新model_inputs
| model_inputs["prompt_ids"].append(prompt_ids)
model_inputs["chosen_ids"].append(chosen_ids)
model_inputs["rejected_ids"].append(rejected_ids)
|
-
最终输出:
| model_inputs = {
"prompt_ids": [[101, 102]], # 查询的token ID
"chosen_ids": [[201, 202]], # 更优回复的token ID
"rejected_ids": [[301, 302]] # 更差回复的token ID
}
|
完整输出结果
| {
"prompt_ids": [[101, 102]],
"chosen_ids": [[201, 202]],
"rejected_ids": [[301, 302]]
}
|
代码作用总结
代码行 |
作用描述 |
model_inputs 初始化 |
创建用于存储查询、更优回复、更差回复的字典。 |
construct_example |
生成(query, response, history, system)元组,处理对话结构。 |
template.encode_oneturn |
将查询和回复编码为token ID,分别处理更优和更差的回复。 |
结果收集 |
将编码后的查询、更优回复、更差回复分别存入对应的列表。 |
此函数适用于基于人类反馈的强化学习(RLHF)中的偏好训练,通过对比同一查询的两个回复,让模型学习人类偏好。
(7)策略优化数据处理(PPO)
| def preprocess_unsupervised_dataset(examples: Dict[str, List[Any]]) -> Dict[str, Any]:
model_inputs = {"input_ids": [], "attention_mask": [], "labels": []}
for query, response, history, system in construct_example(examples):
source_ids, target_ids = template.encode_oneturn(...)
model_inputs["input_ids"].append(source_ids)
model_inputs["attention_mask"].append([1] * len(source_ids))
model_inputs["labels"].append(target_ids)
return model_inputs
|
示例输入与配置
假设输入的examples
包含1个样本,配置如下:
| examples = {
"prompt": ["介绍Python"],
"response": ["Python是一种高级编程语言"]
}
# 假设template.encode_oneturn的编码结果(简化示例):
# template.encode_oneturn("介绍Python", "Python是一种高级编程语言") →
# (source_ids=[101, 102], target_ids=[201, 202, 203])
|
逐行代码执行过程与输出
1. 初始化输出结构
| model_inputs = {"input_ids": [], "attention_mask": [], "labels": []}
|
-
初始状态:
| model_inputs = {
"input_ids": [],
"attention_mask": [],
"labels": []
}
|
遍历construct_example生成的样本
| for query, response, history, system in construct_example(examples):
# query = "介绍Python"
# response = "Python是一种高级编程语言"
|
编码样本
| source_ids, target_ids = template.encode_oneturn(...)
# 假设编码结果:
# source_ids = [101, 102] # 对应"介绍Python"
# target_ids = [201, 202, 203] # 对应"Python是一种高级编程语言"
|
更新model_inputs
| model_inputs["input_ids"].append(source_ids)
model_inputs["attention_mask"].append([1] * len(source_ids))
model_inputs["labels"].append(target_ids)
|
-
最终输出:
| model_inputs = {
"input_ids": [[101, 102]], # 输入的token ID
"attention_mask": [[1, 1]], # 注意力掩码(全1表示不忽略任何token)
"labels": [[201, 202, 203]] # 目标输出的token ID
}
|
完整输出结果
| {
"input_ids": [[101, 102]],
"attention_mask": [[1, 1]],
"labels": [[201, 202, 203]]
}
|
代码作用总结
代码行 |
作用描述 |
model_inputs 初始化 |
创建用于存储输入、掩码和标签的字典。 |
construct_example |
生成(query, response, history, system)元组,处理对话结构。 |
template.encode_oneturn |
将查询和回复编码为token ID,分别得到source(输入)和target(目标)。 |
结果收集 |
将编码后的输入、注意力掩码(全1)和目标输出分别存入对应的列表。 |
此函数适用于无监督学习或自监督学习场景,例如语言模型的生成任务,模型需要根据input_ids
生成labels
中的内容。
(8). 主处理逻辑
| if stage == "pt":
dataset = dataset.filter(lambda example: example["prompt"])
preprocess_function = preprocess_pretrain_dataset
elif stage == "sft" and not training_args.predict_with_generate:
dataset = dataset.filter(lambda example: example["prompt"] and example["response"])
preprocess_function = preprocess_supervised_dataset
elif stage == "rm":
dataset = dataset.filter(lambda example: example["prompt"] and len(example["response"]) > 1)
preprocess_function = preprocess_pairwise_dataset
else:
dataset = dataset.filter(lambda example: example["prompt"])
preprocess_function = preprocess_unsupervised_dataset
with training_args.main_process_first(...):
dataset = dataset.map(
preprocess_function,
batched=True,
remove_columns=column_names,
**kwargs
)
print_function(next(iter(dataset)))
|
示例输入数据
假设初始数据集包含3个样本,每个样本有prompt
和response
字段:
| dataset = [
{"prompt": "介绍Python", "response": ["Python是高级语言"]}, # 样本1
{"prompt": "推荐电影", "response": ["《星际穿越》", "《蜘蛛侠》"]}, # 样本2(两个回复,用于RM)
{"prompt": "", "response": ["空查询"]} # 样本3(无效)
]
|
场景1:预训练阶段(stage="pt")
代码执行过程
| # 1. 过滤数据:保留prompt不为空的样本
dataset = dataset.filter(lambda example: example["prompt"])
# 过滤后:[样本1, 样本2](丢弃样本3)
# 2. 指定预处理函数
preprocess_function = preprocess_pretrain_dataset # 拼接所有文本
# 3. 批量处理数据
dataset = dataset.map(
preprocess_function,
batched=True,
remove_columns=["prompt", "response"] # 删除原始字段
)
|
预处理结果
| # 假设preprocess_pretrain_dataset的输出(简化):
dataset = [
{
"input_ids": [101, 102, 201, 202], # "介绍Python Python是高级语言"
"attention_mask": [1, 1, 1, 1],
"labels": [101, 102, 201, 202] # 自监督任务,标签与输入相同
},
{
"input_ids": [103, 104, 203, 204, 205, 206], # "推荐电影 《星际穿越》 《蜘蛛侠》"
"attention_mask": [1, 1, 1, 1, 1, 1],
"labels": [103, 104, 203, 204, 205, 206]
}
]
|
场景2:监督微调阶段(stage="sft")
代码执行过程
| # 1. 过滤数据:保留prompt和response都不为空的样本
dataset = dataset.filter(lambda example: example["prompt"] and example["response"])
# 过滤后:[样本1, 样本2](丢弃样本3)
# 2. 指定预处理函数
preprocess_function = preprocess_supervised_dataset # 区分输入和目标
# 3. 批量处理数据
dataset = dataset.map(...)
|
预处理结果
| # 假设preprocess_supervised_dataset的输出(简化):
dataset = [
{
"input_ids": [101, 102], # "介绍Python"
"attention_mask": [1, 1],
"labels": [201, 202] # "Python是高级语言"
},
{
"input_ids": [103, 104], # "推荐电影"
"attention_mask": [1, 1],
"labels": [203, 204] # "《星际穿越》"(只取第一个回复)
}
]
|
场景3:奖励模型训练阶段(stage="rm")
代码执行过程
| # 1. 过滤数据:保留prompt不为空且有多个response的样本
dataset = dataset.filter(lambda example: example["prompt"] and len(example["response"]) > 1)
# 过滤后:[样本2](丢弃样本1和3)
# 2. 指定预处理函数
preprocess_function = preprocess_pairwise_dataset # 处理偏好对
# 3. 批量处理数据
dataset = dataset.map(...)
|
预处理结果
| # 假设preprocess_pairwise_dataset的输出(简化):
dataset = [
{
"prompt_ids": [103, 104], # "推荐电影"
"chosen_ids": [203, 204], # "《星际穿越》"(更优回复)
"rejected_ids": [205, 206] # "《蜘蛛侠》"(更差回复)
}
]
|
场景4:默认/无监督阶段
代码执行过程
| # 1. 过滤数据:保留prompt不为空的样本
dataset = dataset.filter(lambda example: example["prompt"])
# 过滤后:[样本1, 样本2](丢弃样本3)
# 2. 指定预处理函数
preprocess_function = preprocess_unsupervised_dataset # 无监督格式
# 3. 批量处理数据
dataset = dataset.map(...)
|
预处理结果
| # 假设preprocess_unsupervised_dataset的输出(简化):
dataset = [
{
"input_ids": [101, 102], # "介绍Python"
"attention_mask": [1, 1],
"labels": [201, 202] # "Python是高级语言"
},
{
"input_ids": [103, 104], # "推荐电影"
"attention_mask": [1, 1],
"labels": [203, 204] # "《星际穿越》"(只取第一个回复)
}
]
|
代码作用总结
代码行 |
作用描述 |
dataset.filter() |
根据不同阶段要求过滤无效样本(如prompt为空或回复不足)。 |
preprocess_function 选择 |
根据阶段选择对应预处理函数(拼接文本/监督学习/偏好对/无监督)。 |
dataset.map() |
批量应用预处理函数,将原始字段转换为模型所需的输入格式(如input_ids、labels)。 |
remove_columns |
删除原始字段,减少内存占用。 |
main_process_first |
确保主进程先处理数据,避免多进程冲突。 |
此流程展示了不同训练阶段(预训练、监督微调、奖励模型训练)的数据处理差异,通过过滤和格式转换,将原始数据转换为模型可接受的输入格式。
总结
1、preprocess_pretrain_dataset处理PreTraining阶段的数据
数据组成形式:
输入input: X1 X2 X3
标签labels:X1 X2 X3
典型的Decoder架构的数据训练方式;
2、preprocess_supervised_dataset处理SFT阶段的数据
数据组成形式:
输入input: prompt response
标签labels: -100 ... -100 response
这里面labels的重点在于prompt部分的被-100所填充,主要作用在下面会介绍到。
| lm_logits = self.lm_head(hidden_states)
loss = None
if labels is not None:
labels = labels.to(lm_logits.device)
shift_logits = lm_logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
loss_fct = CrossEntropyLoss()
loss = loss_fct(
shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1)
)
### pass
|
乍一看上面这个loss貌似只是预训练Pre-training阶段的loss,SFT阶段的loss难道也是这么算的吗?答案是肯定的。
SFT阶段数据的输入输出格式是的形式,实际训练过程中应该是给定prompt,计算response部分的loss,但是这个是怎么实现的呢?
关键的要点来了,还记得在数据预处理阶段labels的加工方式吗?对于prompt部分的labels被-100所填充,导致在计算loss的时候模型只计算response部分的loss,-100的部分被忽略了。而这个机制得益于torch的CrossEntropyLossignore_index参数,ignore_index参数定义为如果labels中包含了指定了需要忽略的类别号(默认是-100),那么在计算loss的时候就不会计算该部分的loss也就对梯度的更新不起作用,详情可以查看这部分的定义。
最后,可以看出不管是Pre-training阶段还是SFT阶段,loss函数都是一样的,只是计算的方式存在差异,Pre-training阶段计算的是整段输入文本的loss,而SFT阶段计算的是response部分的loss。