Skip to content

大模型预训练概述

预训练是大模型经历的第一个训练阶段,通过在大量语料上进行训练获得基础的世界知识(背书)

​主要从预训练定义、数据、流程、继续预训练与预训练评估几部分讲解大模型的预训练​

Pre-training 定义

预训练目标:大模型通过在大规模数据集上学习,使模型能捕捉数据的通用特征和模式,从而提升 大模型在各种任务上的性能和泛化能力,让模型掌握通用能力。

比如在预训练期间,模型会接触大量未标记的文本数据,例如书籍、文章和网站,目标是捕获文本语料库中存在的底层模式、结构和语义知识

  • 数据稀缺性:收集并标注专业领域的数据是耗时费力的任务。预训练先从大量无标注数据学习通用特征,减少对标注数据的依赖。
  • 先验只是:模型从随机初始化的参数进行学习。对很多任务来说,具有一定的先验知识,可以加快大模型在新任务的训练。

大模型采用的预训练方法是大量无标签语料进行自监督学习(区别于聚类等无监督学习,NTP利用下一个token作为标签进行自监督学习)

训练目标为Next Token Prediction loss:

\[L = -\sum_{n=1}^N\log p(x_n|x_1,x_2,...,x_{x-1};\theta)\]

基本思想:模型根据上下文预测下一个最可能的单词,Next Token Prediction loss通过计算每个预测的对数似然估计,帮助模型进行更准确的下一个token预测

为什么计算损失使用对数?

  • 数学简化:将概率连乘转换为对数相加,避免数值下溢。
  • 凸函数性质:对数函数将非凸优化问题转换为凸优化问题,便于求解。

凸函数定义:

函数\(f(x)\)是凸函数,当且仅当对于任意\(x_1,x_2\)\(\lambda \in [0,1]\),有:

\[f(\lambda x_1 +(1-\lambda)x_2)\geq \lambda f(x_1)+(1-\lambda)f(x_2)\]

直观理解:函数图像上任意两点的连线始终在函数曲线的上方。

非凸函数存在多个局部最优解,优化算法容易陷入局部最优,无法找到全局最优解。

  • 概率解释:对数似然可以解释为编码长度,最小化损失等价于最小化编码真实标签所需的比特数。

香农在信息论中提出:最优编码长度与事件发生的概率成反比,概率越低的事件需要越长的编码来表示。而对数似然损失的最小化过程,本质上等价于寻找一种使真实标签编码长度最短的概率模型。

香农提出,对于一个概率为 \(p(x)\) 的事件 x,其最优二进制编码长度(单位:比特)

可定义为:\(\text{编码长度} = -\log_2 p(x)\)

原理:高概率事件(如 “明天太阳升起”)用短编码(如 0);低概率事件(如 “明天地震”)用长编码(如 1010)。

示例:

若事件 A 的概率 \(p(A) = 1/2\),则编码长度为 \(-\log_2(1/2) = 1\) 比特(如用 0 表示);

若事件 B 的概率 \(p(B) = 1/8\),则编码长度为 \(-\log_2(1/8) = 3\) 比特(如用 110 表示)。

预训练阶段

数据与tokenization

来自网络 (CommonCrawl、OpenWebText)、书籍 (BookCorpus) 、维基百科等,可能数百 GB 乃至 TB 级;

  • 过滤:去重、去低质量、遵守道德标准
  • tokenization: BPE/WordPiece/SentencePiece 等生成 30k到100k 规模词汇表

训练目标(损失函数)

自回归语言模型

\[L = -\sum_{n=1}^N\log p(x_n|x_1,x_2,...,x_{x-1};\theta)\]

我们证明上面的 loss function 是给定一个token序列的交叉熵

给出预训练的sequence \(w_1,w_2,...,w_n\)。那么生成这个句子的概率为\(P(w_1,w_2,...,w_n)\),可以通过条件概率公式分解为:

\(P(w_1,w_2,...,w_n)=p(w_1)p(w_2|w_1)p(w_3|w_1,w_2)...p(w_n|w_1,w_2,...,w_{n-1})=\prod_{i=1}^np(w_i|w_1,w_2,...,w_{i-1})\)

这里条件概率\(p(w_i|w_1,w_2,...,w_{i-1})\)既可以用N-gram语言模型,也可以用Transformer建模

交叉熵损失

\[H(LM)=E_{p(b_N)}[-\log_2 q(w_N|b_{N-1})]=-\sum_{b_N}p(b_N)\log_2 q(w_N|b_{N-1})=-\sum_{b_N}p(w_N|b_{N-1})p(b_{N-1}\log_2 q(w_N|b_{N-1}))\approx -\sum_{b_N}1\cdot \frac{1}{m}\log_2 q(w_N|b_{N-1})=-\frac{1}{m}\sum_{b_N}\log_2 q(w_N|b_{N-1})=-\frac{1}{m}\sum_{i=1}^m\log_2 q(w_i|w_1,w_2,...,w_{i-1})\]

这里\(\approx\)是重要一步

  • 对于\(p(w_N|b_{N-1})\),假设理想情况下,给定上文\(b_{N-1}\)就能以置信度为1确定下一个词\(w_N\),所以\(p(w_N|b_{N-1})=1\)

  • 对于 \(p(b_{N - 1})\) ,假设滑动窗口操作后一共产生 m 个不同的上文 \(b_N\) ,每个 \(b_{N - 1}\) 出现的概率相同,即 \(p(b_{N - 1})=\frac{1}{m}\) 

困惑度损失

\[Perplexity(LM)=(\prod_{i=1}^m\frac{1}{q(w_i|w_1,w_2,...,w_{i-1})})^{\frac{1}{m}}\]

当语言模型生成每一个token越不确定,那么困惑度越大,模型性能越差

接着交叉熵损失推导:

\[-\frac{1}{m}\log_2 q(w_i|w_1,w_2,...,w_{i-1})=\log_2[(\prod_{i=1}^m\frac{1}{q(w_i|w_1,w_2,...,w_{i-1})})^{\frac{1}{m}}]=\log_2(Perplexity(LM))\]

Perplexity求一个log就是交叉熵(推导精彩)

实际计算中如何计算loss

  1. Pre-training阶段和SFT阶段的loss计算差异

Pre-training阶段还是SFT阶段,loss函数都是一样的,只是计算的方式存在差异.

PreTraining阶段计算的是整段输入文本的loss,而SFT阶段计算的是response部分的loss。

Pre-training概念(2)

实际计算中如何计算loss

LLM的pretrain是自监督学习,所以语料文本之外无需标注,但在训练的时候需要label,.

处理pretrain阶段的数据

  • 数据组成形式:
    • 输入:\(X_1\) \(X_2\) \(X_3\)
    • labels:\(X_1\) \(X_2\) \(X_3\)
  • 典型的Decoder架构的数据训练方式;

即根据[]预测\(X_1\),根据[,X_1]预测\(X_2\),...

掩码语言模型(MLM/BERT-style)

\[L_{MLM}(\theta)=E_x[-\sum_{i\in M}\log P_\theta(x_i|\tilde{x})]\]

其中\(M\)是被mask的索引集合,\(\tilde{x}\)表示在\(M\)处替换为[MASK]

详情可见这篇博客Encoder-only 预训练任务

去噪自编码(Seq2Seq)

如 BART/T5 采用加噪(随机删除/打乱)的输入序列,由编码器读入,解码器输出去噪效果

我会在LLM/LLM_Architecture里详细介绍

算法与并行策略

两阶段优化

  1. 预训练:\(\theta^* \leftarrow \arg \min_{\theta}L_{pre}(\theta,D_{pretrain})\)

  2. 微调: \(\phi^* = \arg \min_\phi \sum_{(x,y)\in D_{down}}G(x,y;\phi)\) with initial \(\phi \approx \theta^*\)

prompt 工程

无需改动参数,通过上下文,让模型在生成时自适应:

\[\hat{y}= \arg \max_y P_{\theta^*}(y|x,prompt)\]

下面的代码片段来自LLaMA-Efficient-Tuning代码库,其中preprocess.py文件里面定义了不同阶段数据处理的方式。

(1). 根据不同训练阶段对数据集进行预处理
def preprocess_dataset(

    # dataset:输入数据集,支持常规或流式数据集
    dataset: Union["Dataset", "IterableDataset"],

    # tokenizer:分词器,用于将文本分成token
    tokenizer: "PreTrainedTokenizer",

    # args 是 arguments(参数)的缩写,通常表示一组配置参数
    # DataArguments:配置数据加载和预处理参数
    data_args: "DataArguments",

    # Seq2SeqTrainingArguments:序列到序列模型(如 T5、BART)的训练参数配置类
    training_args: "Seq2SeqTrainingArguments",  # 使用引号,Python 会在运行时解析类型,允许你提前使用未导入的类型,避免循环依赖问题

    # stage:训练阶段,可选值为 "pt"(预训练)、"sft"(监督微调)、"rm"(奖励模型)、"ppo"(策略优化)
    stage: Literal["pt", "sft", "rm", "ppo"]
) -> Union["Dataset", "IterableDataset"]:
(2). 数据结构准备
column_names = list(next(iter(dataset)).keys())
template = get_template_and_fix_tokenizer(data_args.template, tokenizer)
  • column_names = list(next(iter(dataset)).keys())

功能

获取数据集的列名(即字段名称),通常用于了解数据集的结构。

执行步骤

iter(dataset):将数据集转换为迭代器,以便逐行访问数据。

next(iter(dataset)):从迭代器中取出第一个样本(即数据集的第一行)。

next(...).keys():获取该样本的所有键(key),即列名。

list(...):将键转换为列表格式,便于后续操作。

示例

假设数据集的第一行为 {'text': 'Hello', 'label': 1},则 column_names 为 ['text', 'label']。

  • template = get_template_and_fix_tokenizer(data_args.template, tokenizer)

功能

获取模板并修复分词器,确保输入文本能正确被模型处理

参数解析

data_args.template:配置文件中定义的模板字符串,用于格式化输入数据

例如:"问题:{text}\n答案:{label}"

get_template_and_fix_tokenizer操作:

解析模板:提取模板中的特殊标记(如 {text}、{label}),用于后续数据填充。 修复分词器: 为模板中的特殊符号(如 \n、自定义分隔符)添加特殊 token。确保分词器能正确处理模板中的占位符(如 {text}) 返回处理后的模板:可能是一个函数或对象,用于后续数据格式化

示例

# 假设dataset是一个包含文本和标签的数据集
dataset = [
    {'text': '今天天气如何', 'label': '晴天'},
    {'text': '明天有雨吗', 'label': '有雨'}
]

# 步骤1:获取列名
column_names = list(next(iter(dataset)).keys())
# 输出:['text', 'label']

# 步骤2:获取模板并修复分词器
# 假设template配置为 "问题:{text}\n答案:{label}"
template = get_template_and_fix_tokenizer(data_args.template, tokenizer)
# 后续使用template格式化数据
formatted_text = template(dataset[0])
# 输出:"问题:今天天气如何\n答案:晴天"
(3).数据生成器
1
2
3
4
5
6
7
8
9
def construct_example(examples: Dict[str, List[Any]]) -> Generator[Any, None, None]:
    for i in range(len(examples["prompt"])):
        query = examples["prompt"][i]
        if "query" in examples and examples["query"][i]:
            query = query + "\n" + examples["query"][i]
        response = examples["response"][i] if "response" in examples else None
        history = examples["history"][i] if "history" in examples else None
        system = examples["system"][i] if "system" in examples else None
        yield query, response, history, system

示例

示例输入数据

假设输入的 examples 是一个包含4个字段的字典,对应2个样本:

1
2
3
4
5
6
7
examples = {
    "prompt": ["你是谁", "今天星期几"],  # 必选字段,每个样本的基础提示
    "query": ["请用简洁语言回答", ""],     # 可选字段,附加查询(第二个样本为空)
    "response": ["我是AI助手豆包", "今天星期五"],  # 响应(标签)
    "history": [None, [("昨天星期几", "昨天星期四")]],  # 对话历史(第二个样本有历史记录)
    "system": ["通用模式", None]  # 系统提示(第二个样本无系统提示)
}

逐行代码执行过程与输出
** 函数定义与参数**

def construct_example(examples: Dict[str, List[Any]]) -> Generator[Any, None, None]:
- 输入examples 是上述字典,包含2个样本的各字段列表。
- 输出:生成器,逐个返回处理后的样本元组。

** 循环遍历样本(i=0 时处理第一个样本)

for i in range(len(examples["prompt"])):  # len(examples["prompt"]) = 2,循环i=0和i=1
- **当前 i=0:处理第一个样本(prompt 为“你是谁”)。

** 提取并处理 query**

1
2
3
query = examples["prompt"][i]  # examples["prompt"][0] = "你是谁"
if "query" in examples and examples["query"][i]:  # examples["query"][0] = "请用简洁语言回答"(非空)
    query = query + "\n" + examples["query"][i]  # 拼接后:"你是谁\n请用简洁语言回答"
- 处理后 query"你是谁\n请用简洁语言回答"

** 提取可选字段(response, history, system

1
2
3
response = examples["response"][i] if "response" in examples else None  # examples["response"][0] = "我是AI助手豆包"
history = examples["history"][i] if "history" in examples else None  # examples["history"][0] = None
system = examples["system"][i] if "system" in examples else None  # examples["system"][0] = "通用模式"
- **response"我是AI助手豆包"
- historyNone
- system"通用模式"

生成第一个样本

yield query, response, history, system
- 输出元组
("你是谁\n请用简洁语言回答", "我是AI助手豆包", None, "通用模式")

循环再次执行(i=1 时处理第二个样本)

1
2
3
4
5
6
7
8
# i=1,处理第二个样本(prompt为"今天星期几")
query = examples["prompt"][1]  # "今天星期几"
if "query" in examples and examples["query"][1]:  # examples["query"][1] = ""(空字符串,条件不成立)
    # 不执行拼接
response = examples["response"][1]  # "今天星期五"
history = examples["history"][1]  # [("昨天星期几", "昨天星期四")]
system = examples["system"][1]  # None
yield query, response, history, system
  • 处理后 query:保持原值 "今天星期几"
  • 输出元组
    ("今天星期几", "今天星期五", [("昨天星期几", "昨天星期四")], None)

完整输出结果

通过生成器迭代输出的两个样本为:

  1. ("你是谁\n请用简洁语言回答", "我是AI助手豆包", None, "通用模式")

  2. ("今天星期几", "今天星期五", [("昨天星期几", "昨天星期四")], None)

代码作用总结

代码行 作用描述
len(examples["prompt"]) 确定样本总数(通过必选字段 prompt 的长度)。
query = examples["prompt"][i] 提取每个样本的基础提示文本。
拼接 query 若存在附加查询字段 query 且非空,将其与基础提示用换行符拼接,丰富输入。
条件提取可选字段 处理 responsehistorysystem 字段,不存在时设为 None
yield 生成元组 返回处理后的样本,包含输入查询、响应、历史对话和系统提示。
(4). 预训练数据处理(PT)
1
2
3
4
5
6
7
8
9
def preprocess_pretrain_dataset(examples: Dict[str, List[Any]]) -> Dict[str, Any]:
    tokenized_examples = tokenizer(examples["prompt"], add_special_tokens=False)
    concatenated_examples = {k: list(chain(*tokenized_examples[k])) for k in tokenized_examples.keys()}
    total_length = (len(concatenated_examples[list(concatenated_examples.keys())[0]]) // block_size) * block_size
    result = {
        k: [t[i: i + block_size] for i in range(0, total_length, block_size)]
        for k, t in concatenated_examples.items()
    }
    return result

示例输入数据
假设输入的examples包含2个样本,tokenizer将文本转换为token ID,block_size=4

1
2
3
4
5
6
7
examples = {
    "prompt": ["Hello world", "How are you today"]
}

# 假设tokenizer的分词结果(简化示例):
# "Hello world" → [101, 102]
# "How are you today" → [103, 104, 105, 106]

逐行代码执行过程与输出

分词处理

tokenized_examples = tokenizer(examples["prompt"], add_special_tokens=False)
- 分词结果(假设):
1
2
3
4
tokenized_examples = {
    "input_ids": [[101, 102], [103, 104, 105, 106]],
    "attention_mask": [[1, 1], [1, 1, 1, 1]]
}

拼接所有样本的tokens

concatenated_examples = {k: list(chain(*tokenized_examples[k])) for k in tokenized_examples.keys()}
- 拼接后
1
2
3
4
concatenated_examples = {
    "input_ids": [101, 102, 103, 104, 105, 106],
    "attention_mask": [1, 1, 1, 1, 1, 1]
}

计算总长度并截断为block_size的整数倍

total_length = (len(concatenated_examples["input_ids"]) // block_size) * block_size
# len=6,block_size=4 → total_length = (6//4)*4 = 4
- 结果total_length = 4(丢弃剩余的2个token)。

按block_size切分tokens

1
2
3
4
result = {
    k: [t[i: i + block_size] for i in range(0, total_length, block_size)]
    for k, t in concatenated_examples.items()
}
- 切分后
1
2
3
4
result = {
    "input_ids": [[101, 102, 103, 104]],  # 丢弃[105, 106]
    "attention_mask": [[1, 1, 1, 1]]       # 丢弃[1, 1]
}

完整输出结果

1
2
3
4
{
    "input_ids": [[101, 102, 103, 104]],
    "attention_mask": [[1, 1, 1, 1]]
}

代码作用总结

代码行 作用描述
tokenizer(...) 将所有样本的文本转换为token ID(不添加特殊token)。
chain(*tokenized_examples[k]) 将所有样本的tokens拼接成一个长列表(用于连续训练)。
total_length计算 截断tokens总长度为block_size的整数倍,确保能均匀切分。
按块切分 将长tokens列表切分为固定长度的块(每个块长度=block_size),丢弃不足的部分。
(5)监督微调数据处理(SFT)
def preprocess_supervised_dataset(examples: Dict[str, List[Any]]) -> Dict[str, Any]:
    model_inputs = {"input_ids": [], "attention_mask": [], "labels": []}
    max_length = data_args.max_source_length + data_args.max_target_length

    for query, response, history, system in construct_example(examples):
        input_ids, labels = [], []
        for source_ids, target_ids in template.encode_multiturn(...):
            if len(input_ids) + len(source_ids) + len(target_ids) > max_length:
                break
            input_ids += source_ids + target_ids
            labels += [IGNORE_INDEX] * len(source_ids) + target_ids

        model_inputs["input_ids"].append(input_ids)
        model_inputs["attention_mask"].append([1] * len(input_ids))
        model_inputs["labels"].append(labels)
    return model_inputs

示例输入与配置

假设输入的examples包含1个对话样本,配置如下:

examples = {
    "prompt": ["今天天气如何?"],
    "response": ["晴天,适合户外活动。"]
}

# 配置参数
data_args.max_source_length = 10  # 源文本最大长度
data_args.max_target_length = 10  # 目标文本最大长度
max_length = 20                  # 总最大长度

# 模板编码结果(简化示例)
# 假设template.encode_multiturn(...)返回:
# [
#   (source_ids=[101, 102], target_ids=[201, 202, 203]),  # 表示"今天天气如何?" → "晴天,适合户外活动。"
# ]

逐行代码执行过程与输出

初始化输出结构

model_inputs = {"input_ids": [], "attention_mask": [], "labels": []}
- 初始状态
1
2
3
4
5
model_inputs = {
    "input_ids": [],
    "attention_mask": [],
    "labels": []
}

遍历construct_example生成的样本

1
2
3
4
for query, response, history, system in construct_example(examples):
    # query = "今天天气如何?"
    # response = "晴天,适合户外活动。"
    input_ids, labels = [], []
- 初始化样本的input_idslabels
input_ids = []
labels = []

处理多轮对话(本例简化为单轮)

1
2
3
4
5
6
7
8
9
for source_ids, target_ids in template.encode_multiturn(...):
    # source_ids = [101, 102]  # 对应"今天天气如何?"
    # target_ids = [201, 202, 203]  # 对应"晴天,适合户外活动。"

    if len(input_ids) + len(source_ids) + len(target_ids) > max_length:
        break  # 长度未超过20,不执行

    input_ids += source_ids + target_ids  # [101, 102, 201, 202, 203]
    labels += [IGNORE_INDEX] * len(source_ids) + target_ids  # [-100, -100, 201, 202, 203]
- 处理后
input_ids = [101, 102, 201, 202, 203]
labels = [-100, -100, 201, 202, 203]  # -100表示忽略,只预测target部分

更新model_inputs

1
2
3
model_inputs["input_ids"].append(input_ids)
model_inputs["attention_mask"].append([1] * len(input_ids))
model_inputs["labels"].append(labels)
- 最终输出
1
2
3
4
5
model_inputs = {
    "input_ids": [[101, 102, 201, 202, 203]],
    "attention_mask": [[1, 1, 1, 1, 1]],
    "labels": [[-100, -100, 201, 202, 203]]
}

完整输出结果

1
2
3
4
5
{
    "input_ids": [[101, 102, 201, 202, 203]],
    "attention_mask": [[1, 1, 1, 1, 1]],
    "labels": [[-100, -100, 201, 202, 203]]
}

代码作用总结

代码行 作用描述
model_inputs初始化 创建用于存储输入、掩码和标签的字典。
construct_example 生成(query, response, history, system)元组,处理对话结构。
template.encode_multiturn 将对话转换为token ID,区分source(用户输入)和target(模型输出)。
input_ids拼接 将source和target的token ID按顺序拼接。
labels设置 source部分设为IGNORE_INDEX(如-100),仅预测target部分。
attention_mask生成 全1掩码,表示所有token都参与计算。

此函数适用于监督微调(SFT)场景,将对话数据转换为模型可接受的格式,通过labels指定需要预测的部分。

(6).奖励模型数据处理(RM)
1
2
3
4
5
6
7
8
9
def preprocess_pairwise_dataset(examples):
    model_inputs = {"prompt_ids": [], "chosen_ids": [], "rejected_ids": []}
    for query, response, history, system in construct_example(examples):
        prompt_ids, chosen_ids = template.encode_oneturn(query, response[0], ...)
        _, rejected_ids = template.encode_oneturn(query, response[1], ...)
        model_inputs["prompt_ids"].append(prompt_ids)
        model_inputs["chosen_ids"].append(chosen_ids)
        model_inputs["rejected_ids"].append(rejected_ids)
    return model_inputs

示例输入与配置

假设输入的examples包含1个样本,每个样本有1个查询2个候选回复(分别为更优和更差的回复):

examples = {
    "prompt": ["推荐一部科幻电影"],
    "response": [
        ["《星际穿越》,科学与情感的完美结合", "《蜘蛛侠》,适合全家观看"]  # response[0]=更优回复,response[1]=更差回复
    ]
}

# 假设template.encode_oneturn的编码结果(简化示例):
# template.encode_oneturn("推荐一部科幻电影", "《星际穿越》...") → (prompt_ids=[101,102], chosen_ids=[201,202])
# template.encode_oneturn("推荐一部科幻电影", "《蜘蛛侠》...") → (prompt_ids=[101,102], rejected_ids=[301,302])

逐行代码执行过程与输出

1. 初始化输出结构

model_inputs = {"prompt_ids": [], "chosen_ids": [], "rejected_ids": []}
- 初始状态
1
2
3
4
5
model_inputs = {
    "prompt_ids": [],
    "chosen_ids": [],
    "rejected_ids": []
}

遍历construct_example生成的样本

1
2
3
for query, response, history, system in construct_example(examples):
    # query = "推荐一部科幻电影"
    # response = ["《星际穿越》...", "《蜘蛛侠》..."]  # 包含两个回复的列表

编码更优回复

1
2
3
4
prompt_ids, chosen_ids = template.encode_oneturn(query, response[0], ...)
# 假设编码结果:
# prompt_ids = [101, 102]  # 对应"推荐一部科幻电影"
# chosen_ids = [201, 202]  # 对应"《星际穿越》,科学与情感的完美结合"

编码更差回复

1
2
3
_, rejected_ids = template.encode_oneturn(query, response[1], ...)
# 假设编码结果:
# rejected_ids = [301, 302]  # 对应"《蜘蛛侠》,适合全家观看"

更新model_inputs

1
2
3
model_inputs["prompt_ids"].append(prompt_ids)
model_inputs["chosen_ids"].append(chosen_ids)
model_inputs["rejected_ids"].append(rejected_ids)
- 最终输出
1
2
3
4
5
model_inputs = {
    "prompt_ids": [[101, 102]],        # 查询的token ID
    "chosen_ids": [[201, 202]],        # 更优回复的token ID
    "rejected_ids": [[301, 302]]       # 更差回复的token ID
}

完整输出结果

1
2
3
4
5
{
    "prompt_ids": [[101, 102]],
    "chosen_ids": [[201, 202]],
    "rejected_ids": [[301, 302]]
}

代码作用总结

代码行 作用描述
model_inputs初始化 创建用于存储查询、更优回复、更差回复的字典。
construct_example 生成(query, response, history, system)元组,处理对话结构。
template.encode_oneturn 将查询和回复编码为token ID,分别处理更优和更差的回复。
结果收集 将编码后的查询、更优回复、更差回复分别存入对应的列表。

此函数适用于基于人类反馈的强化学习(RLHF)中的偏好训练,通过对比同一查询的两个回复,让模型学习人类偏好。

(7)策略优化数据处理(PPO)
1
2
3
4
5
6
7
8
def preprocess_unsupervised_dataset(examples: Dict[str, List[Any]]) -> Dict[str, Any]:
    model_inputs = {"input_ids": [], "attention_mask": [], "labels": []}
    for query, response, history, system in construct_example(examples):
        source_ids, target_ids = template.encode_oneturn(...)
        model_inputs["input_ids"].append(source_ids)
        model_inputs["attention_mask"].append([1] * len(source_ids))
        model_inputs["labels"].append(target_ids)
    return model_inputs

示例输入与配置

假设输入的examples包含1个样本,配置如下:

1
2
3
4
5
6
7
8
examples = {
    "prompt": ["介绍Python"],
    "response": ["Python是一种高级编程语言"]
}

# 假设template.encode_oneturn的编码结果(简化示例):
# template.encode_oneturn("介绍Python", "Python是一种高级编程语言") → 
# (source_ids=[101, 102], target_ids=[201, 202, 203])

逐行代码执行过程与输出

1. 初始化输出结构

model_inputs = {"input_ids": [], "attention_mask": [], "labels": []}
- 初始状态
1
2
3
4
5
model_inputs = {
    "input_ids": [],
    "attention_mask": [],
    "labels": []
}

遍历construct_example生成的样本

1
2
3
for query, response, history, system in construct_example(examples):
    # query = "介绍Python"
    # response = "Python是一种高级编程语言"

编码样本

1
2
3
4
source_ids, target_ids = template.encode_oneturn(...)
# 假设编码结果:
# source_ids = [101, 102]  # 对应"介绍Python"
# target_ids = [201, 202, 203]  # 对应"Python是一种高级编程语言"

更新model_inputs

1
2
3
model_inputs["input_ids"].append(source_ids)
model_inputs["attention_mask"].append([1] * len(source_ids))
model_inputs["labels"].append(target_ids)
- 最终输出
1
2
3
4
5
model_inputs = {
    "input_ids": [[101, 102]],        # 输入的token ID
    "attention_mask": [[1, 1]],       # 注意力掩码(全1表示不忽略任何token)
    "labels": [[201, 202, 203]]       # 目标输出的token ID
}

完整输出结果

1
2
3
4
5
{
    "input_ids": [[101, 102]],
    "attention_mask": [[1, 1]],
    "labels": [[201, 202, 203]]
}

代码作用总结

代码行 作用描述
model_inputs初始化 创建用于存储输入、掩码和标签的字典。
construct_example 生成(query, response, history, system)元组,处理对话结构。
template.encode_oneturn 将查询和回复编码为token ID,分别得到source(输入)和target(目标)。
结果收集 将编码后的输入、注意力掩码(全1)和目标输出分别存入对应的列表。

此函数适用于无监督学习或自监督学习场景,例如语言模型的生成任务,模型需要根据input_ids生成labels中的内容。

(8). 主处理逻辑
if stage == "pt":
    dataset = dataset.filter(lambda example: example["prompt"])
    preprocess_function = preprocess_pretrain_dataset
elif stage == "sft" and not training_args.predict_with_generate:
    dataset = dataset.filter(lambda example: example["prompt"] and example["response"])
    preprocess_function = preprocess_supervised_dataset
elif stage == "rm":
    dataset = dataset.filter(lambda example: example["prompt"] and len(example["response"]) > 1)
    preprocess_function = preprocess_pairwise_dataset
else:
    dataset = dataset.filter(lambda example: example["prompt"])
    preprocess_function = preprocess_unsupervised_dataset

with training_args.main_process_first(...):
    dataset = dataset.map(
        preprocess_function,
        batched=True,
        remove_columns=column_names,
        **kwargs
    )
    print_function(next(iter(dataset)))

示例输入数据
假设初始数据集包含3个样本,每个样本有promptresponse字段:

1
2
3
4
5
dataset = [
    {"prompt": "介绍Python", "response": ["Python是高级语言"]},  # 样本1
    {"prompt": "推荐电影", "response": ["《星际穿越》", "《蜘蛛侠》"]},  # 样本2(两个回复,用于RM)
    {"prompt": "", "response": ["空查询"]}  # 样本3(无效)
]

场景1:预训练阶段(stage="pt")

代码执行过程

# 1. 过滤数据:保留prompt不为空的样本
dataset = dataset.filter(lambda example: example["prompt"])
# 过滤后:[样本1, 样本2](丢弃样本3)

# 2. 指定预处理函数
preprocess_function = preprocess_pretrain_dataset  # 拼接所有文本

# 3. 批量处理数据
dataset = dataset.map(
    preprocess_function,
    batched=True,
    remove_columns=["prompt", "response"]  # 删除原始字段
)

预处理结果

# 假设preprocess_pretrain_dataset的输出(简化):
dataset = [
    {
        "input_ids": [101, 102, 201, 202],  # "介绍Python Python是高级语言"
        "attention_mask": [1, 1, 1, 1],
        "labels": [101, 102, 201, 202]  # 自监督任务,标签与输入相同
    },
    {
        "input_ids": [103, 104, 203, 204, 205, 206],  # "推荐电影 《星际穿越》 《蜘蛛侠》"
        "attention_mask": [1, 1, 1, 1, 1, 1],
        "labels": [103, 104, 203, 204, 205, 206]
    }
]

场景2:监督微调阶段(stage="sft")

代码执行过程

1
2
3
4
5
6
7
8
9
# 1. 过滤数据:保留prompt和response都不为空的样本
dataset = dataset.filter(lambda example: example["prompt"] and example["response"])
# 过滤后:[样本1, 样本2](丢弃样本3)

# 2. 指定预处理函数
preprocess_function = preprocess_supervised_dataset  # 区分输入和目标

# 3. 批量处理数据
dataset = dataset.map(...)

预处理结果

# 假设preprocess_supervised_dataset的输出(简化):
dataset = [
    {
        "input_ids": [101, 102],  # "介绍Python"
        "attention_mask": [1, 1],
        "labels": [201, 202]  # "Python是高级语言"
    },
    {
        "input_ids": [103, 104],  # "推荐电影"
        "attention_mask": [1, 1],
        "labels": [203, 204]  # "《星际穿越》"(只取第一个回复)
    }
]

场景3:奖励模型训练阶段(stage="rm")

代码执行过程

1
2
3
4
5
6
7
8
9
# 1. 过滤数据:保留prompt不为空且有多个response的样本
dataset = dataset.filter(lambda example: example["prompt"] and len(example["response"]) > 1)
# 过滤后:[样本2](丢弃样本1和3)

# 2. 指定预处理函数
preprocess_function = preprocess_pairwise_dataset  # 处理偏好对

# 3. 批量处理数据
dataset = dataset.map(...)

预处理结果

1
2
3
4
5
6
7
8
# 假设preprocess_pairwise_dataset的输出(简化):
dataset = [
    {
        "prompt_ids": [103, 104],  # "推荐电影"
        "chosen_ids": [203, 204],  # "《星际穿越》"(更优回复)
        "rejected_ids": [205, 206]  # "《蜘蛛侠》"(更差回复)
    }
]

场景4:默认/无监督阶段

代码执行过程

1
2
3
4
5
6
7
8
9
# 1. 过滤数据:保留prompt不为空的样本
dataset = dataset.filter(lambda example: example["prompt"])
# 过滤后:[样本1, 样本2](丢弃样本3)

# 2. 指定预处理函数
preprocess_function = preprocess_unsupervised_dataset  # 无监督格式

# 3. 批量处理数据
dataset = dataset.map(...)

预处理结果

# 假设preprocess_unsupervised_dataset的输出(简化):
dataset = [
    {
        "input_ids": [101, 102],  # "介绍Python"
        "attention_mask": [1, 1],
        "labels": [201, 202]  # "Python是高级语言"
    },
    {
        "input_ids": [103, 104],  # "推荐电影"
        "attention_mask": [1, 1],
        "labels": [203, 204]  # "《星际穿越》"(只取第一个回复)
    }
]

代码作用总结

代码行 作用描述
dataset.filter() 根据不同阶段要求过滤无效样本(如prompt为空或回复不足)。
preprocess_function选择 根据阶段选择对应预处理函数(拼接文本/监督学习/偏好对/无监督)。
dataset.map() 批量应用预处理函数,将原始字段转换为模型所需的输入格式(如input_ids、labels)。
remove_columns 删除原始字段,减少内存占用。
main_process_first 确保主进程先处理数据,避免多进程冲突。

此流程展示了不同训练阶段(预训练、监督微调、奖励模型训练)的数据处理差异,通过过滤和格式转换,将原始数据转换为模型可接受的输入格式。

总结

1、preprocess_pretrain_dataset处理PreTraining阶段的数据

数据组成形式: 输入input: X1 X2 X3 标签labels:X1 X2 X3 典型的Decoder架构的数据训练方式; 2、preprocess_supervised_dataset处理SFT阶段的数据

数据组成形式: 输入input: prompt response 标签labels: -100 ... -100 response 这里面labels的重点在于prompt部分的被-100所填充,主要作用在下面会介绍到。

lm_logits = self.lm_head(hidden_states)
loss = None
if labels is not None:
    labels = labels.to(lm_logits.device)
    shift_logits = lm_logits[..., :-1, :].contiguous()
    shift_labels = labels[..., 1:].contiguous()
    loss_fct = CrossEntropyLoss()
    loss = loss_fct(
        shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1)
    )

### pass

乍一看上面这个loss貌似只是预训练Pre-training阶段的loss,SFT阶段的loss难道也是这么算的吗?答案是肯定的。

SFT阶段数据的输入输出格式是的形式,实际训练过程中应该是给定prompt,计算response部分的loss,但是这个是怎么实现的呢?

关键的要点来了,还记得在数据预处理阶段labels的加工方式吗?对于prompt部分的labels被-100所填充,导致在计算loss的时候模型只计算response部分的loss,-100的部分被忽略了。而这个机制得益于torch的CrossEntropyLossignore_index参数,ignore_index参数定义为如果labels中包含了指定了需要忽略的类别号(默认是-100),那么在计算loss的时候就不会计算该部分的loss也就对梯度的更新不起作用,详情可以查看这部分的定义。

最后,可以看出不管是Pre-training阶段还是SFT阶段,loss函数都是一样的,只是计算的方式存在差异,Pre-training阶段计算的是整段输入文本的loss,而SFT阶段计算的是response部分的loss。