ChatGPT 的自动优化
2023-10-26
| 2024-3-23
Words 5916Read Time 15 min
type
status
date
slug
summary
tags
category
icon
password
URL
作为一个一直对 AI 技术很感兴趣的软件开发工程师,早在深度学习开始火起来的 15、16 年,我也开始了相关技术的学习。当时还组织了公司内部同样有兴趣的同学一起研究,最终的成果汇集成几次社区中的分享以及几篇学习文章(见这里)。
从去年 OpenAI 发布 ChatGPT 以来,AI 的能力再次惊艳了世人。在这样的一个时间节点,重新去学习相关技术显得很有必要。
ChatGPT 的内容很多,我计划采用一个系列,多篇文章来分享学习我自己学习过程中的一些理解。本系列文章,我将站在一个普通开发人员的角度展开,希望对想了解 ChatGPT 技术原理的普通开发者们有帮助。
ChatGPT 本身就具备很丰富的知识,所以 ChatGPT 自身实际上就是一个很好的学习渠道,我也将借助 ChatGPT 来学习 ChatGPT。
这是此系列的第五篇,ChatGPT 的自动优化。
上一篇文章我们深入分析了 ChatGPT 是如何训练及优化的,了解了如何进行监督微调,及如何让模型可以支持更广泛领域的问答。但是,监督微调始终会限于训练集中的问题模板数量,无法支持更为一般的对话。这一步骤就需要引入强化学习的训练方式,让 ChatGPT 可以自动进行优化。

介绍

强化学习模型是最为复杂的部分,参考 ColossalAI 的文档,模型的工作原理如下:
notion image
为了理解上图,需要先了解一下强化学习相关的背景知识。

强化学习

强化学习(Reinforcement Learning,简称 RL)是一种机器学习方法,旨在使智能体(agent)通过与环境的交互来学习适应环境并制定实现特定目标的策略。在强化学习中,智能体通过观察环境的状态,执行动作,并接收环境的奖励或惩罚来不断调整自己的策略,以获得最大化累积奖励的能力。
强化学习的基本要素包括:
  • 智能体(Agent):智能体是进行学习和决策的主体,它通过观察环境的状态、选择合适的动作,并与环境进行交互。
  • 环境(Environment):环境是智能体所处的外部世界,它可以是真实的物理环境,也可以是抽象的模拟环境。环境会根据智能体的动作进行状态转移,并根据智能体的表现给予奖励或惩罚。
  • 状态(State):状态是描述环境的特征或信息,它可以是完全观察的,也可以是部分观察或隐含的。智能体的决策往往基于当前状态。
  • 动作(Action):动作是智能体在某个状态下可以执行的操作或策略。智能体的目标是根据当前状态选择最优的动作。
  • 奖励(Reward):奖励是环境根据智能体的动作和表现给予的反馈信号,用于指示动作的好坏。智能体的目标是通过最大化累积奖励来学习合适的策略。
在 ChatGPT 这个场景中,ChatGPT 模型即智能体,环境是一个对话系统,状态是当前对话的上下文及当前消息,动作是如何选择某一个回复,奖励是人类反馈的回复质量好或差。
强化学习的核心问题是通过智能体与环境的交互来学习一个最优的策略,以使智能体在长期累积奖励的过程中能够获得最大化的回报。强化学习算法通常基于价值函数或策略函数来进行决策和优化,其中价值函数用于评估状态或状态动作对的价值,策略函数用于指导智能体在特定状态下选择动作。
强化学习常常应用于机器人控制、游戏智能、自动驾驶等领域。

强化学习算法

如何学习最优策略呢?常见的学习算法包括 Q-learning、SARSA、Deep Q-Network(DQN)、Policy Gradient、Proximal Policy Optimization(PPO)、Actor-Critic 等。
以下简要介绍和 RLHF 相关的算法:
  • Q-learning: 核心思想是学习一个状态 - 动作价值函数(Q 函数),它衡量在给定状态下采取特定动作的长期累积回报。可根据贝尔曼公式 Q(s,a) = Q(s,a) + α(r + γ * max(Q(s',a')) - Q(s,a)) 更新及优化 Q 函数,其中 α 是学习率,γ 是折扣因子,r 是奖励,s’是新的状态。
  • Policy Gradient(策略梯度)算法:不需要建立值函数模型,而是直接优化策略(动作的选择)。其基本思想是通过采样经验轨迹(trajectory),通过最大化累积奖励来计算策略梯度,并利用梯度信息更新策略参数。
  • Proximal Policy Optimization(PPO):一种基于策略梯度的强化学习算法,旨在通过有效地优化策略函数(通过引入一个重要性采样比率和一个剪切函数来限制策略更新的幅度,以保持策略的相对不变性)来提高强化学习的性能和稳定性。
  • Actor-Critic(演员 - 评论家)算法:结合了值函数和策略函数的强化学习算法。它通过同时学习一个策略函数(演员)和一个值函数(评论家),以提高强化学习的效率和性能。演员根据评论家的评估结果来更新策略,从而改进策略的质量。评论家则通过学习一个值函数来估计每个状态的值或动作值,以提供演员关于策略改进的反馈。
RLHF 算法结合了 PPO 和 Actor-Critic 算法的优势,所以可以高效而又稳定的优化 ChatGPT 的模型。

代码分析

有了前面的了解,下面咱们跟着代码一起来了解一下算法的细节。

使用 PyTorch 实现 Policy Gradient

下面来看 PyTorch 的示例中提供的一个参考的策略梯度算法实现。
先介绍一下 gym 库,这个库提供了一个模拟环境,内置了很多小游戏,可以帮助我们开发强化学习算法。
比如下面这个平衡杆小游戏,我们要想办法控制平衡杆使其一直位于连接点的上方。有两个动作可以用来控制游戏中的连接点,即左和右。控制连接点向左时,可以避免平衡杆往左倾倒。控制连接点向右时,可以避免平衡杆往右倾倒。
notion image
下面来用策略梯度的方法训练一个强化学习算法让机器人自动玩游戏。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
class Policy(nn.Module): # 定义策略函数网络,输出每个动作对应的概率 def __init__(self): super(Policy, self).__init__() # 定义网络结构,包含了两层线性连接层,第二层输出的动作数量为2 self.affine1 = nn.Linear(4, 128) self.dropout = nn.Dropout(p=0.6) self.affine2 = nn.Linear(128, 2) self.saved_log_probs = [] self.rewards = [] def forward(self, x): x = F.relu(self.dropout(self.affine1(x))) action_scores = self.affine2(x) return F.softmax(action_scores, dim=1)class PolicyTrainer: def __init__(self): # 初始化游戏环境 self.env = gym.make("CartPole-v1", render_mode="rgb_array") # 初始化策略函数网络及优化器 self.policy = Policy() self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2) self.eps = np.finfo(np.float32).eps.item() def select_action(self, state): # 根据当前的状态,执行策略函数,并根据函数输出的概率选择一个动作 state = torch.from_numpy(state).float().unsqueeze(0) probs = self.policy(state) m = Categorical(probs) action = m.sample() # 将动作保存起来,在一局游戏结束的时候,用于训练 self.policy.saved_log_probs.append(m.log_prob(action)) return action.item() def finish_episode(self): # 在一局游戏结束的时候,执行策略函数的训练 # 根据执行动作时保存的奖励,来迭代计算每一个动作的奖励 # 每一个动作的奖励 = 当前奖励 + 折扣率 * 整局游戏中将来的奖励 R = 0 returns = deque() for r in self.policy.rewards[::-1]: R = r + args.gamma * R returns.appendleft(R) returns = torch.tensor(returns) # 将奖励归一化 returns = (returns - returns.mean()) / (returns.std() + self.eps) # 采用梯度上升法最大化策略奖励,这里使用最小化策略奖励的负数来实现 policy_loss = [] for log_prob, R in zip(self.policy.saved_log_probs, returns): # 计算每一个动作的策略损失,策略损失 = -动作概率 * 动作奖励 policy_loss.append(-log_prob * R) # 根据整局游戏的结果来计算梯度,并更新参数 self.optimizer.zero_grad() policy_loss = torch.cat(policy_loss).sum() policy_loss.backward() self.optimizer.step() del self.policy.rewards[:] del self.policy.saved_log_probs[:] def train(self): running_reward = 10 for i_episode in count(1): # 新的一局游戏开始,初始化环境 state, _ = self.env.reset() ep_reward = 0 for t in range(1, 10000): # Don't infinite loop while learning # 采用策略函数来生成下一步采用的动作,并执行 action = self.select_action(state) state, reward, done, _, _ = self.env.step(action) # 记录奖励 self.policy.rewards.append(reward) ep_reward += reward # 累计计算当前这一局游戏的奖励 if done: # 如果游戏结束,则退出循环 break # 在一局游戏结束是,计算奖励的移动平均值 # 这里将当前这一局游戏的奖励以5%的百分比给平衡掉,让我们更容易看出游戏当前能得到的奖励 running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward # 触发策略网络更新 self.finish_episode() # 周期性打印日志 if i_episode % args.log_interval == 0: print("Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}".format(i_episode, ep_reward, running_reward)) if running_reward > self.env.spec.reward_threshold: print("Solved! Running reward is now {} and " "the last episode runs to {} time steps!".format(running_reward, t)) break
完整代码见这里。运行以上算法可以看到以下日志:
Episode 10 Last reward: 21.00 Average reward: 16.30
Episode 20 Last reward: 41.00 Average reward: 24.02
Episode 30 Last reward: 33.00 Average reward: 33.05
Episode 40 Last reward: 64.00 Average reward: 50.71
Episode 50 Last reward: 73.00 Average reward: 59.70
Episode 60 Last reward: 41.00 Average reward: 63.28
Episode 70 Last reward: 59.00 Average reward: 63.88
Episode 80 Last reward: 86.00 Average reward: 80.87
Episode 90 Last reward: 125.00 Average reward: 91.54
Episode 100 Last reward: 224.00 Average reward: 136.09
Episode 110 Last reward: 95.00 Average reward: 182.31
Episode 120 Last reward: 200.00 Average reward: 170.03
Episode 130 Last reward: 80.00 Average reward: 149.48
Episode 140 Last reward: 102.00 Average reward: 148.63
Episode 150 Last reward: 644.00 Average reward: 349.26
Solved! Running reward is now 550.1608406568259 and the last episode runs to 2888 time steps!
可以看到,在算法玩了 150 多次游戏的时候,已经可以玩得非常好了。但是我们也能注意到算法有一些波动,特别是在 100 局到 110 局时,曾经达到一个不错的水平,但是后来突然又有一些下降。

使用 PyTorch 实现 Actor-Critic

Actor-Critic 算法与 Policy Gradient 算法是类似的。下面看一下代码:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
SavedAction = namedtuple("SavedAction", ["log_prob", "value"])class Policy(nn.Module): # 定义演员函数和评论家函数网络 def __init__(self): super(Policy, self).__init__() self.affine1 = nn.Linear(4, 128) self.action_head = nn.Linear(128, 2) # 演员函数输出每个动作对应的概率 self.value_head = nn.Linear(128, 1) # 评论家函数输出每个动作对应的奖励 # action & reward buffer self.saved_actions = [] self.rewards = [] def forward(self, x): x = F.relu(self.affine1(x)) action_prob = F.softmax(self.action_head(x), dim=-1) state_values = self.value_head(x) return action_prob, state_valuesclass PolicyTrainer: def __init__(self) -> None: self.env = gym.make("CartPole-v1") self.model = Policy() self.optimizer = optim.Adam(self.model.parameters(), lr=3e-2) self.eps = np.finfo(np.float32).eps.item() def select_action(self, state): state = torch.from_numpy(state).float() probs, state_value = self.model(state) m = Categorical(probs) action = m.sample() self.model.saved_actions.append(SavedAction(m.log_prob(action), state_value)) return action.item() def finish_episode(self): R = 0 saved_actions = self.model.saved_actions # 计算每一个步骤的奖励 returns = [] for r in self.model.rewards[::-1]: R = r + args.gamma * R returns.insert(0, R) returns = torch.tensor(returns) returns = (returns - returns.mean()) / (returns.std() + self.eps) policy_losses = [] value_losses = [] for (log_prob, value), R in zip(saved_actions, returns): # 计算真实奖励与评论家函数估计的奖励之差,并将其用于计算演员函数的损失 advantage = R - value.item() policy_losses.append(-log_prob * advantage) # 评论家函数的损失使用平滑L1损失函数(与均方差损失类似,但更稳定) value_losses.append(F.smooth_l1_loss(value, torch.tensor([R]))) # 计算梯度并更新网络 self.optimizer.zero_grad() loss = torch.stack(policy_losses).sum() + torch.stack(value_losses).sum() loss.backward() self.optimizer.step() # 清理数据 del self.model.rewards[:] del self.model.saved_actions[:] def train(self): # 训练过程与策略梯度方法类似 running_reward = 10 for i_episode in count(1): state, _ = self.env.reset() ep_reward = 0 for t in range(1, 10000): action = self.select_action(state) state, reward, done, _, _ = self.env.step(action) self.model.rewards.append(reward) ep_reward += reward if done: break running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward self.finish_episode() # log results if i_episode % args.log_interval == 0: print("Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}".format(i_episode, ep_reward, running_reward)) # check if we have "solved" the cart pole problem if running_reward > self.env.spec.reward_threshold: print("Solved! Running reward is now {} and " "the last episode runs to {} time steps!".format(running_reward, t)) break
完整代码见这里。运行以上算法可以看到以下日志:
Episode 10 Last reward: 32.00 Average reward: 14.11
Episode 20 Last reward: 89.00 Average reward: 32.64
Episode 30 Last reward: 20.00 Average reward: 45.73
Episode 40 Last reward: 32.00 Average reward: 47.44
Episode 50 Last reward: 332.00 Average reward: 142.78
Episode 60 Last reward: 410.00 Average reward: 428.13
Solved! Running reward is now 476.8880228063767 and the last episode runs to 1334 time steps!
可以看到算法在经历 60 多次的迭代之后就有一个很好的效果了。

ChatGPT 的 RLHF 算法

ColossalAI 中使用的强化学习算法与上述算法基本一致,完整代码的入口在这里, 代码比较长,以下是简化后的代码:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
def main(args): strategy = ColossalAIStrategy(stage=2, placement_policy='cuda') with strategy.model_init_context(): initial_model = LlamaActor(pretrained=args.pretrain) reward_model = LlamaRM(pretrained=args.rm_pretrain) actor = LlamaActor(pretrained=args.pretrain, lora_rank=args.lora_rank) critic = LlamaCritic(pretrained=args.rm_pretrain, lora_rank=args.lora_rank, use_action_mask=True) actor_optim = HybridAdam(actor.parameters(), lr=1e-7) critic_optim = HybridAdam(critic.parameters(), lr=1e-7) tokenizer = LlamaTokenizer.from_pretrained(args.pretrain) tokenizer = prepare_llama_tokenizer_and_embedding(tokenizer, actor) prompt_dataset = PromptDataset(tokenizer=tokenizer, data_path=args.prompt_dataset, max_datasets_size=16384) prompt_dataloader = DataLoader(prompt_dataset, ...) pretrain_dataset = SupervisedDataset(tokenizer=tokenizer, ...) pretrain_dataloader = DataLoader(pretrain_dataset, ...) (actor, actor_optim), (critic, critic_optim) = strategy.prepare((actor, actor_optim), (critic, critic_optim)) # configure trainer trainer = PPOTrainer(strategy, actor, critic, reward_model, initial_model, ...) trainer.fit(prompt_dataloader, pretrain_dataloader, args.num_episodes, ...)class Critic(LoRAModule): def __init__(self, ...) -> None: ... self.convert_to_lora() # 将Critic模型转换为LoRA模型 def forward(self, sequences: torch.LongTensor, action_mask, attention_mask) -> torch.Tensor: outputs = self.model(sequences, attention_mask=attention_mask) # 模型前向传播 # 获取最后一层隐藏状态,并通过value_head线性层得到值函数估计值 last_hidden_states = outputs['last_hidden_state'] values = self.value_head(last_hidden_states).squeeze(-1) if action_mask is not None and self.use_action_mask: num_actions = action_mask.size(1) prompt_mask = attention_mask[:, :-num_actions] values = values[:, :-num_actions] value = masked_mean(values, prompt_mask, dim=1) # 根据动作掩码计算平均值 return value values = values[:, :-1] value = values.mean(dim=1) # 计算平均值作为最终的评论家函数估计值 return valueclass LlamaCritic(Critic): def __init__(self, ...) -> None: model = LlamaModel.from_pretrained(pretrained) # 使用预训练的LlamaModel初始化模型 value_head = nn.Linear(model.config.hidden_size, 1) # 使用线性层作为评论家函数头部class Actor(LoRAModule): def __init__(self, model: nn.Module, ...) -> None: self.convert_to_lora() # 将Actor模型转换为LoRA模型 @torch.no_grad() def generate(self, input_ids: torch.Tensor, return_action_mask: bool = True, **kwargs): sequences = generate(self.model, input_ids, **kwargs) # 生成序列 attention_mask = None attention_mask = sequences.not_equal(pad_token_id) # 生成注意力掩码 # left padding may be applied, only mask action action_mask = (sequences[:, input_len:] == eos_token_id).cumsum(dim=-1) == 0 # 生成动作掩码 action_mask = F.pad(action_mask, (1 + input_len, -1), value=True) # include eos token and input action_mask[:, :input_len] = False action_mask = action_mask[:, 1:] # 返回生成的序列、注意力掩码和动作掩码 return sequences, attention_mask, action_mask[:, -(sequences.size(1) - input_len):] def forward(self, sequences: torch.LongTensor, num_actions: int, attention_mask): output = self.model(sequences, attention_mask=attention_mask) # 模型前向传播 # 从logits计算动作的对数概率 log_probs = log_probs_from_logits(output['logits'][:, :-1, :], sequences[:, 1:]) return log_probs[:, -num_actions:] # 返回动作的对数概率class LlamaActor(Actor): def __init__(self, ...) -> None: model = LlamaForCausalLM.from_pretrained(pretrained) # 使用预训练的LlamaForCausalLM初始化模型class PPOTrainer(Trainer): def __init__(self, ...) -> None: # 初始化PPO训练器的各个组件和参数 self.experience_maker = NaiveExperienceMaker(actor, critic, reward_model, initial_model, kl_coef) self.replay_buffer = NaiveReplayBuffer(train_batch_size, buffer_limit, buffer_cpu_offload) self.actor = actor self.critic = critic self.actor_loss_fn = PolicyLoss(eps_clip) # 演员损失函数 self.critic_loss_fn = ValueLoss(value_clip) # 评论家损失函数 self.vf_coef = vf_coef self.ptx_loss_fn = GPTLMLoss() # 预训练损失函数 self.ptx_coef = ptx_coef self.actor_optim = actor_optim # 演员网络的优化器 self.critic_optim = critic_optim # 评论家网络的优化器 def _learn(self): # 根据是否使用重放缓冲区选择不同的训练方式 if self.sample_replay_buffer: for _ in range(self.max_epochs): experience = self.replay_buffer.sample() # 从重放缓冲区中采样经验 metrics = self.training_step(experience) # 执行训练步骤 else: for epoch in range(self.max_epochs): for experience in dataloader: # 从数据集中获取经验 metrics = self.training_step(experience) # 执行训练步骤 def fit(self, prompt_dataloader, pretrain_dataloader) -> None: time = 0 for episode in range(num_episodes): for timestep in range(max_timesteps): time += 1 prompts = next(iter(self.prompt_dataloader)) # 获取输入提示数据 # 生成经验,这里可以支持在线和人进行对话 experience = self.experience_maker.make_experience(prompts, **self.generate_kwargs) self.replay_buffer.append(experience) # 将经验添加到重放缓冲区 if time % update_timesteps == 0: self._learn() # 执行模型更新 self.replay_buffer.clear() # 清空重放缓冲区 def training_step(self, experience: Experience) -> Dict[str, float]: self.actor.train() # 设置演员网络为训练模式 self.critic.train() # 设置评论家网络为训练模式 # 计算演员网络的动作对数概率 num_actions = experience.action_mask.size(1) action_log_probs = self.actor(experience.sequences, num_actions, attention_mask=experience.attention_mask) # 计算演员损失函数 actor_loss = self.actor_loss_fn( action_log_probs, experience.action_log_probs, experience.advantages, experience.action_mask) # 计算预训练损失函数 if self.ptx_coef != 0: batch = next(iter(self.pretrain_dataloader)) ptx_log_probs = self.actor.get_base_model()(batch['input_ids'], attention_mask=batch['attention_mask'])['logits'] ptx_loss = self.ptx_loss_fn(ptx_log_probs, batch['labels']) actor_loss = ptx_loss * self.ptx_coef + actor_loss * (1 - self.ptx_coef) self.strategy.backward(actor_loss, self.actor, self.actor_optim) # 演员网络的反向传播 self.strategy.optimizer_step(self.actor_optim) # 演员网络的优化器步骤 self.actor_optim.zero_grad() # 清空演员网络的梯度 # 计算评论家损失函数 values = self.critic(experience.sequences, experience.action_mask, experience.attention_mask) critic_loss = self.critic_loss_fn(values, experience.values, experience.reward, experience.action_mask) critic_loss = critic_loss * self.vf_coef self.strategy.backward(critic_loss, self.critic, self.critic_optim) # 评论家网络的反向传播 self.strategy.optimizer_step(self.critic_optim) # 评论家网络的优化器步骤 self.critic_optim.zero_grad() # 清空评论家网络的梯度class GPTLMLoss(nn.Module): def __init__(self): super().__init__() self.loss = nn.CrossEntropyLoss() def forward(self, logits: torch.Tensor, labels: torch.Tensor) -> torch.Tensor: # 将logits向左移动一位,去掉最后一个时间步的预测 shift_logits = logits[..., :-1, :].contiguous() # 将标签向右移动一位,去掉第一个时间步的标签 shift_labels = labels[..., 1:].contiguous() # 计算交叉熵损失 return self.loss(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))class PolicyLoss(nn.Module): def __init__(self, clip_eps: float = 0.2) -> None: super().__init__() self.clip_eps = clip_eps def forward(self, log_probs, old_log_probs, advantages, action_mask) -> torch.Tensor: # 计算当前动作对数概率和旧动作对数概率的比例 ratio = (log_probs - old_log_probs).exp() surr1 = ratio * advantages # 第一项损失计算 surr2 = ratio.clamp(1 - self.clip_eps, 1 + self.clip_eps) * advantages # 第二项损失计算 loss = -torch.min(surr1, surr2) # 选取较小的损失 if action_mask is not None: loss = masked_mean(loss, action_mask) # 根据动作掩码计算平均损失 loss = loss.mean() # 计算平均损失 return lossclass ValueLoss(nn.Module): def __init__(self, clip_eps: float = 0.4) -> None: super().__init__() self.clip_eps = clip_eps def forward(self, values, old_values, reward, action_mask) -> torch.Tensor: # 对奖励进行裁剪 values_clipped = old_values + (values - old_values).clamp(-self.clip_eps, self.clip_eps) surr1 = (values_clipped - reward)**2 # 第一项损失计算 surr2 = (values - reward)**2 # 第二项损失计算 loss = torch.max(surr1, surr2) # 选取较大的损失 loss = loss.mean() # 计算平均损失 return 0.5 * lossclass NaiveExperienceMaker(ExperienceMaker): @torch.no_grad() def make_experience(self, input_ids: torch.Tensor, **generate_kwargs) -> Experience: # 基于演员函数生成回复及掩码 sequences, attention_mask, action_mask = self.actor.generate(input_ids, ...) num_actions = action_mask.size(1) # 获取动作的数量 # 计算动作的对数概率 action_log_probs = self.actor(sequences, num_actions, attention_mask) # 使用初始模型计算动作的对数概率 base_action_log_probs = self.initial_model(sequences, num_actions, attention_mask) # 计算价值 value = self.critic(sequences, action_mask, attention_mask) # 计算基础奖励值 r = self.reward_model(sequences, attention_mask) # 基于动作概率调整奖励值 reward = compute_reward(r, self.kl_coef, action_log_probs, base_action_log_probs, action_mask=action_mask) # 计算优势函数 advantage = reward - value return Experience(...) # 返回经验def compute_reward(r, kl_coef, log_probs, log_probs_base, action_mask) -> torch.Tensor: kl = compute_approx_kl(log_probs, log_probs_base, action_mask=action_mask) # 计算KL散度 reward = r - kl_coef * kl # 计算奖励 return rewarddef compute_approx_kl(log_probs, log_probs_base, action_mask) -> torch.Tensor: log_ratio = log_probs - log_probs_base # 计算对数概率之间的差异 approx_kl = (log_ratio.exp() - 1) - log_ratio # 计算近似KL散度 if action_mask is not None: approx_kl = masked_mean(approx_kl, action_mask, dim=1) # 根据动作掩码计算平均KL散度 return approx_kl approx_kl = approx_kl.mean(dim=1) # 计算平均KL散度 return approx_kl
上述代码中用到了 KL 散度。KL 散度(Kullback-Leibler divergence)是一种用于衡量两个概率分布之间差异的指标。在信息论和统计学中广泛应用。
给定两个离散概率分布 P 和 Q,它们的 KL 散度定义为:KL(P || Q) = Σ P(i) * log(P(i) / Q(i)) 其中,P (i) 和 Q (i) 分别表示 P 和 Q 在第 i 个事件上的概率。
KL 散度不具备对称性,即 KL (P || Q) ≠ KL (Q || P)。它度量的是从 P 到 Q 的信息损失或差异。KL 散度的值为非负数,当且仅当 P 和 Q 相等时,KL 散度等于 0。当 P 和 Q 之间的差异增大时,KL 散度的值也会增大。
在深度学习中,KL 散度常用于衡量生成模型生成的样本分布与真实数据分布之间的差异。通过最小化 KL 散度,可以使生成模型逼近真实数据分布,从而提高生成样本的质量。在上述代码中,KL 散度被用于计算奖励信号。通过比较动作对数概率与基准动作对数概率之间的差异,可以衡量动作选择与基准模型之间的差异程度,进而调整奖励的大小。

RLHF 算法总结

回顾 RLHF 算法的过程,可以看到,由于我们之前训练了一个奖励函数,RLHF 算法在执行过程中,可以没有人类的参与而自动进行。奖励函数代替人给出了对于模型生成的回复的质量的反馈。
到这里,大家可以理解为什么 ChatGPT 可以如此智能的回复大家的任意的自然语言问题了吧?OpenAI 开放 ChatGPT 模型给大家使用,随着大家使用越多,OpenAI 就可以根据 RLHF 的算法让模型接触到更多的对话,从而基于这些对话自动的优化 ChatGPT!

总结

到这里,我们就分析完了所有的 ChatGPT 类模型的训练和微调、RLHF 微调的代码。在分析代码时,我们有意忽略了很多细节及模型并行处理的部分代码,这些对于我们理解模型帮助不大。
到这里大家应该对 ChatGPT 类模型的训练有一个较为深入的认识了。
自 ChatGPT 发布以来,很多人认为这是一个人类走向通用人工智能的突破,也有一些人认为它其实没什么本质的改进。有很多人对自己的职业发展产生了很深的焦虑感,也有很多人感觉触碰到了科幻世界中的未来,还有很多人觉得又是一个可以好好捞一把的机会。
也许每个人都有必要去了解一下机器学习技术的原理,这样才能形成对它的理性的认知。
ChatGPT 的内容很多,我计划采用一个系列,多篇文章来分享学习我自己学习过程中的一些理解。本系列文章,我将站在一个普通开发人员的角度展开,希望对想了解 ChatGPT 技术原理的普通开发者们有帮助。
这是此系列的第五篇,ChatGPT 的自动优化。

参考

大语言模型入门指南ChatGPT 的模型训练
Loading...