本文系统讲解了从Q学习到PlaNet等18种强化学习算法的原理与代码实现,并提供了Jupyter Notebook环境下的Python代码,方便读者动手实践。
原文标题:18个常用的强化学习算法整理:从基础方法到高级模型的理论技术与代码实现(上)
原文作者:数据派THU
冷月清谈:
怜星夜思:
2、Dyna-Q算法里,智能体通过学习环境模型来进行规划,这个“模型”具体是什么?感觉有点抽象,它和我们常说的监督学习模型有什么区别?
3、文章提到了A3C算法通过异步更新来提高训练效率,那是不是并行度越高越好呢?会不会出现什么问题?
原文内容
来源:Deephub Imba本文共17000字,建议阅读20分钟本文系统讲解从基本强化学习方法到高级技术(如PPO、A3C、PlaNet等)的实现原理与编码过程,旨在通过理论结合代码的方式,构建对强化学习算法的全面理解。
├── 1_simple_rl.ipynb ├── 2_q_learning.ipynb ├── 3_sarsa.ipynb ... ├── 9_a3c.ipynb ├── 10_ddpg.ipynb ├── 11_sac.ipynb ├── 12_trpo.ipynb ... ├── 17_mcts.ipynb └── 18_planet.ipynb
说明:github地址见文章最后,文章很长所以可以根据需求查看感兴趣的强化学习方法介绍和对应notebook。
搭建环境
# 克隆并导航到目录 git clone https://github.com/fareedkhan-dev/all-rl-algorithms.git cd all-rl-algorithms安装所需的依赖项
pip install -r requirements.txt
接下来,导入核心库:— 核心Python库 —
import random
import math
from collections import defaultdict, deque, namedtuple
from typing import List, Tuple, Dict, Optional, Any, DefaultDict # 用于代码中的类型提示— 数值计算 —
import numpy as np
— 机器学习框架(PyTorch - 从REINFORCE开始广泛使用) —
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical, Normal # 用于策略梯度、SAC、PlaNet等— 环境 —
用于加载标准环境,如Pendulum
import gymnasium as gym
注意:SimpleGridWorld类定义需要直接包含在代码中
因为它是博客文章中定义的自定义环境。
— 可视化(由博客中显示的图表暗示) —
import matplotlib.pyplot as plt
import seaborn as sns # 经常用于热力图— 可能用于异步方法(A3C) —
尽管在代码片段中没有明确展示,但A3C实现通常使用这些
import torch.multiprocessing as mp # 或标准的’multiprocessing’/‘threading’
— PyTorch设置(可选但是好习惯) —
device = torch.device(“cuda” if torch.cuda.is_available() else “cpu”)
print(f"Using device: {device}")— 禁用警告(可选) —
import warnings
warnings.filterwarnings(‘ignore’) # 抑制潜在的废弃警告等
强化学习环境设置
自定义网格世界(从头实现) 钟摆问题(使用OpenAI Gymnasium) # ------------------------------------- # 1. 简单自定义网格世界 # -------------------------------------class SimpleGridWorld:
“”" 一个基本的网格世界环境。 “”"
def init(self, size=5):
self.size = size
self.start_state = (0, 0)
self.goal_state = (size - 1, size - 1)
self.state = self.start_state动作: 0:上, 1:下, 2:左, 3:右
self.action_map = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}
self.action_space_size = 4def reset(self) -> Tuple[int, int]:
“”" 重置到初始状态。 “”"
self.state = self.start_state
return self.statedef step(self, action: int) -> Tuple[Tuple[int, int], float, bool]:
“”" 执行一个动作,返回next_state, reward, done。 “”"
if self.state == self.goal_state:
return self.state, 0.0, True # 在目标处停留计算潜在的下一个状态
dr, dc = self.action_map[action]
r, c = self.state
next_r, next_c = r + dr, c + dc应用边界(如果碰到墙壁则原地不动)
if not (0 <= next_r < self.size and 0 <= next_c < self.size):
next_r, next_c = r, c # 保持在当前状态
reward = -1.0 # 墙壁惩罚
else:
reward = -0.1 # 步骤成本更新状态
self.state = (next_r, next_c)
检查是否达到目标
done = (self.state == self.goal_state)
if done:
reward = 10.0 # 目标奖励
return self.state, reward, done
# ------------------------------------- # 2. 加载Gymnasium钟摆 # -------------------------------------pendulum_env = gym.make(‘Pendulum-v1’)
print(“Pendulum-v1 environment loaded.”)重置环境
observation, info = pendulum_env.reset(seed=42)
print(f"Initial Observation: {observation}“)
print(f"Observation Space: {pendulum_env.observation_space}”)
print(f"Action Space: {pendulum_env.action_space}")执行随机步骤
random_action = pendulum_env.action_space.sample()
observation, reward, terminated, truncated, info = pendulum_env.step(random_action)
done = terminated or truncated
print(f"Step with action {random_action}:“)
print(f” Next Obs: {observation}\n Reward: {reward}\n Done: {done}")关闭环境(如果使用了渲染则很重要)
pendulum_env.close()
1、最简单的强化学习算法
def choose_simple_action(state: Tuple[int, int], memory: DefaultDict[Tuple[int, int], DefaultDict[int, List[float]]], epsilon: float, n_actions: int) -> int: """ 基于平均即时奖励使用epsilon-greedy选择动作。 """ if random.random() < epsilon: return random.randrange(n_actions) # 探索 else: # 利用:找到具有最佳平均即时奖励的动作 state_action_memory = memory[state] best_avg_reward = -float('inf') best_actions = []for action_idx in range(n_actions):
rewards = state_action_memory[action_idx]如果从这个状态从未尝试过这个动作,将其平均奖励视为0或非常低
avg_reward = np.mean(rewards) if rewards else 0.0
if avg_reward > best_avg_reward:
best_avg_reward = avg_reward
best_actions = [action_idx]
elif avg_reward == best_avg_reward:
best_actions.append(action_idx)如果没有动作有积极奖励或状态未访问,随机选择
if not best_actions:
return random.randrange(n_actions)随机打破平局
return random.choice(best_actions)
def update_simple_memory(memory: DefaultDict[Tuple[int, int], DefaultDict[int, List[float]]], state: Tuple[int, int], action: int, reward: float) -> None: """ 为状态-动作对添加接收到的奖励到记忆中。 """ memory[state][action].append(reward)
-
奖励趋势:顶部图表显示了每个回合的总奖励。在早期阶段,智能体表现不佳(获得负奖励),但随着时间推移,其性能逐渐改善,这从移动平均线(橙色)的上升趋势可以明确看出。
-
状态访问频率:左下角热力图显示了智能体访问各个状态的频率。靠近起点的区域颜色较亮,表明探索集中在这些区域,而目标状态(G)的访问频率相对较低。
-
最佳动作奖励估计:中间热力图表示智能体对每个状态中最佳动作的即时奖励估计。大部分值较低,除了给予高奖励的目标状态附近区域。
-
动作选择分布:右下角条形图显示了智能体从起始状态(0,0)最常选择的动作。数据表明智能体倾向于向右和向下移动,这与通向目标的最优路径方向一致。
2、Q学习
-
我们计算一个新的值估计:当前奖励 + 折扣因子 * 最佳未来值
-
计算这个新估计与当前Q值之间的差异(称为时序差分误差)
-
按照学习率确定的比例更新当前Q值
# 基于Q值的Epsilon-Greedy动作选择 def choose_q_learning_action(state: Tuple[int, int], q_table: DefaultDict[Tuple[int, int], Dict[int, float]], epsilon: float, n_actions: int) -> int: """ 基于Q表值使用epsilon-greedy选择动作。 """ if random.random() < epsilon: return random.randrange(n_actions) # 探索 else: # 利用:选择该状态下Q值最高的动作 q_values_for_state = q_table[state] if not q_values_for_state: # 如果状态未被访问/更新 return random.randrange(n_actions)max_q = -float(‘inf’)
best_actions =遍历可用动作(0到n_actions-1)
for action_idx in range(n_actions):
q_val = q_values_for_state[action_idx] # defaultdict如果键缺失则返回0
if q_val > max_q:
max_q = q_val
best_actions = [action_idx]
elif q_val == max_q:
best_actions.append(action_idx)
return random.choice(best_actions) # 随机打破平局
# Q学习更新规则 def update_q_value(q_table: DefaultDict[Tuple[int, int], Dict[int, float]], state: Tuple[int, int], action: int, reward: float, next_state: Tuple[int, int], alpha: float, # 学习率 gamma: float, # 折扣因子 n_actions: int, is_done: bool) -> None: """ 执行单个Q学习更新步骤。 """获取当前Q值估计
current_q = q_table[state][action]
找到下一个状态的最大Q值(Q学习特有)
这代表了从下一个状态可达到的最佳可能值。
max_next_q = -float(‘inf’)
if not is_done and q_table[next_state]: # 检查next_state是否有条目
max_next_q = max(q_table[next_state].values())
elif is_done:
max_next_q = 0.0 # 如果回合结束,则无未来奖励
else:
max_next_q = 0.0 # 如果next_state还没有条目计算TD目标:R + gamma * max(Q(s’, a’))
td_target = reward + gamma * max_next_q
计算TD误差:TD目标 - Q(s, a)
td_error = td_target - current_q
更新Q值:Q(s, a) <- Q(s, a) + alpha * TD_Error
q_table[state][action] = current_q + alpha * td_error
-
奖励和回合长度:顶部图表显示了随时间变化的累积奖励和回合长度。初始阶段,回合较长,表明智能体在探索低效路径。随着学习进行,回合长度明显减少,表明学习进程有效。
-
动作Q值分布:四个热力图展示了向上、下、左、右四个动作方向的Q值。黄色区域(高值)表示靠近目标状态的优先动作,而深色区域(低值)表示较不优化的选择。
-
学习策略可视化:最右侧的图表直观展示了最终学习到的策略,箭头指示各状态的最优动作方向,清晰地显示了通向目标的路径。
3、Sarsa
# Epsilon-Greedy动作选择(与Q学习相同的函数) def choose_sarsa_action(state: Tuple[int, int], q_table: DefaultDict[Tuple[int, int], Dict[int, float]], epsilon: float, n_actions: int) -> int: """ 基于Q表值使用epsilon-greedy选择动作。 """ if random.random() < epsilon: return random.randrange(n_actions) # 探索 else: # 利用:选择Q值最高的动作 q_values_for_state = q_table[state] if not q_values_for_state: return random.randrange(n_actions) max_q = -float('inf') best_actions = [] for action_idx in range(n_actions): q_val = q_values_for_state[action_idx] # 默认为0.0 if q_val > max_q: max_q = q_val best_actions = [action_idx] elif q_val == max_q: best_actions.append(action_idx) return random.choice(best_actions) if best_actions else random.randrange(n_actions)
# SARSA更新规则 def update_sarsa_value(q_table: DefaultDict[Tuple[int, int], Dict[int, float]], state: Tuple[int, int], action: int, reward: float, next_state: Tuple[int, int], next_action: int, # 为下一步实际选择的动作 alpha: float, gamma: float, is_done: bool) -> None: """ 执行单个SARSA更新步骤。 """获取当前Q值估计
current_q = q_table[state][action]
获取下一个状态和策略选择的下一个动作的Q值
这是与Q学习的核心区别
q_next_state_action = 0.0
if not is_done:
q_next_state_action = q_table[next_state][next_action] # 使用Q(s’, a’)计算TD目标:R + gamma * Q(s’, a’)
td_target = reward + gamma * q_next_state_action
计算TD误差:TD目标 - Q(s, a)
td_error = td_target - current_q
更新Q值:Q(s, a) <- Q(s, a) + alpha * TD_Error
q_table[state][action] = current_q + alpha * td_error
-
学习进展:回合长度显著减少,表明智能体学会了更高效地到达目标。
-
奖励表现:奖励总体较高但存在波动,反映了一个有效但不完全稳定的策略,这可能是由于持续的探索所致。
-
学习策略:Q值分布和最终策略图表显示智能体成功学习了指向目标状态('T')的动作序列。
4、期望Sarsa
# 期望SARSA更新规则 def update_expected_sarsa_value( q_table: DefaultDict[Tuple[int, int], Dict[str, float]], state: Tuple[int, int], action: int, # 现在使用整数动作索引 reward: float, next_state: Tuple[int, int], alpha: float, gamma: float, epsilon: float, # 计算期望值需要当前epsilon n_actions: int, is_done: bool ) -> None: """ 执行单个期望SARSA更新步骤。 """获取当前Q值估计
current_q = q_table[state][action]
计算下一个状态的期望Q值
expected_q_next = 0.0
if not is_done and q_table[next_state]: # 检查next_state是否存在且有条目
q_values_next_state = q_table[next_state]
if q_values_next_state: # 检查字典是否非空
max_q_next = max(q_values_next_state.values())找到所有最佳动作(处理平局)
best_actions = [a for a, q in q_values_next_state.items() if q == max_q_next]
在epsilon-greedy下计算概率
prob_greedy = (1.0 - epsilon) / len(best_actions) # 将贪婪概率分配给最佳动作
prob_explore = epsilon / n_actions计算期望值 E[Q(s’, A’)] = sum[ pi(a’|s’) * Q(s’, a’) ]
for a_prime in range(n_actions):
prob_a_prime = 0.0
if a_prime in best_actions:
prob_a_prime += prob_greedy # 添加贪婪概率
prob_a_prime += prob_explore # 添加探索概率(适用于所有动作)expected_q_next += prob_a_prime * q_values_next_state.get(a_prime, 0.0) # 如果动作未见则默认为0
TD目标: R + gamma * E[Q(s’, A’)]
td_target = reward + gamma * expected_q_next
TD误差: TD目标 - Q(s, a)
td_error = td_target - current_q
更新Q值
q_table[state][action] = current_q + alpha * td_error
-
学习进展分析:回合长度迅速降低,表明智能体快速掌握了到达目标的高效路径。
-
奖励性能评估:初始学习阶段后,智能体能够频繁且稳定地获得接近10的高奖励值,证明其策略有效性。
-
学习策略分析:Q值矩阵清晰地识别出了首选动作,最终策略网格展示了智能体形成的合理路径,能够可靠地引导至终端状态('T')。
5、Dyna Q
-
直接强化学习(RL):利用真实经验(s, a, r, s')更新Q值Q(s, a),与标准Q学习算法相同。
-
模型学习:更新内部环境模型,记录状态s执行动作a导致奖励r和后继状态s'的转移关系。
-
规划:执行k次额外更新,每次随机选择先前经历过的状态-动作对(s_p, a_p),查询学习到的模型获取预测奖励r_p和预测后继状态s_p',然后使用这些模拟经验(s_p, a_p, r_p, s_p')执行Q学习更新。
-
Q表:维护Q(s,a)估计值。在最基本的表格形式中,这是一个字典结构,存储先前访问过的状态-动作对的观察结果(奖励和后继状态):Model(s,a)→(r,s′)。
# Q表:q_table[(state_tuple)][action_index] -> q_value(与之前相同) q_table_dynaq: DefaultDict[Tuple[int, int], Dict[int, float]] = \ defaultdict(lambda: defaultdict(float))模型:model[(state_tuple, action_index)] -> (reward, next_state_tuple)
model_dynaq: Dict[Tuple[Tuple[int, int], int], Tuple[float, Tuple[int, int]]] = {}
跟踪已观察到的状态-动作对,用于规划期间的采样
observed_state_actions: List[Tuple[Tuple[int, int], int]] =
# 模型更新函数 def update_model(model: Dict[Tuple[Tuple[int, int], int], Tuple[float, Tuple[int, int]]], observed_pairs: List[Tuple[Tuple[int, int], int]], state: Tuple[int, int], action: int, reward: float, next_state: Tuple[int, int]) -> None: """ 更新确定性表格模型和已观察对列表。 """ state_action = (state, action) model[state_action] = (reward, next_state) # 存储结果如果这个对之前未见过,则添加到列表中
if state_action not in observed_pairs:
observed_pairs.append(state_action)
# 规划步骤函数 def planning_steps(k: int, # 规划步骤数 q_table: DefaultDict[Tuple[int, int], Dict[int, float]], model: Dict[Tuple[Tuple[int, int], int], Tuple[float, Tuple[int, int]]], observed_pairs: List[Tuple[Tuple[int, int], int]], alpha: float, gamma: float, n_actions: int) -> None: """ 使用模型执行'k'次模拟Q学习更新。 """ if not observed_pairs: # 没有观察就无法规划 returnfor _ in range(k):
1. 采样随机先前观察到的状态-动作对
state_p, action_p = random.choice(observed_pairs)
2. 查询模型获取模拟结果
reward_p, next_state_p = model[(state_p, action_p)]
3. 使用模拟经验应用Q学习更新
(假设模拟步骤不会结束回合,除非模型表示如此)
这里需要Q学习部分的update_q_value函数。
update_q_value(q_table, state_p, action_p, reward_p, next_state_p,
alpha, gamma, n_actions, is_done=False) # 在模拟中假设未结束(更复杂的模型可以预测’done’)
-
学习进展分析:观察到极为快速的收敛过程。回合长度迅速下降,表明智能体能够非常高效地学习到最优路径。
-
奖励性能分析:总奖励从初始负值陡峭上升至持续稳定的高正值,展示了快速有效的策略学习过程。
-
效率评估(Dyna-Q特性):步数和奖励的快速改善明显体现了Dyna-Q规划机制(k=50)的优势,通过利用学习模型显著加速学习过程。
6、Reinforce
-
基于当前状态s采样一个动作a。
-
存储该动作的对数概率(log π(a|s))。
-
记录获取的奖励r。
-
对于后续获得高回报(G_t)的动作,更新过程增加其对数概率(log π(a|s))。
-
对于后续获得低回报的动作,更新过程降低其对数概率。
# 定义策略网络架构 class PolicyNetwork(nn.Module): def __init__(self, n_observations: int, n_actions: int): super(PolicyNetwork, self).__init__() self.layer1 = nn.Linear(n_observations, 128) self.layer2 = nn.Linear(128, 128) self.layer3 = nn.Linear(128, n_actions) # 输出logitsdef forward(self, x: torch.Tensor) -> Categorical:
“”" 前向传播,返回动作的Categorical分布。 “”"
if not isinstance(x, torch.Tensor):
x = torch.tensor(x, dtype=torch.float32, device=device) # 假设’device’已定义
elif x.dtype != torch.float32:
x = x.to(dtype=torch.float32)
if x.dim() == 1: x = x.unsqueeze(0)x = F.relu(self.layer1(x))
x = F.relu(self.layer2(x))
action_logits = self.layer3(x)直接从logits创建Categorical分布
return Categorical(logits=action_logits)
# 通过采样选择动作 def select_action_reinforce(state: torch.Tensor, policy_net: PolicyNetwork) -> Tuple[int, torch.Tensor]: """ 通过从策略网络的输出分布采样选择动作。 """ action_dist = policy_net(state) action = action_dist.sample() # 采样动作 log_prob = action_dist.log_prob(action) # 获取所选动作的对数概率 return action.item(), log_prob
# 计算折扣回报 def calculate_discounted_returns(rewards: List[float], gamma: float, standardize: bool = True) -> torch.Tensor: """ 计算每一步t的折扣回报G_t,可选择标准化。 """ n_steps = len(rewards) returns = torch.zeros(n_steps, dtype=torch.float32) # 保持在CPU上进行计算 discounted_return = 0.0 # 反向迭代 for t in reversed(range(n_steps)): discounted_return = rewards[t] + gamma * discounted_return returns[t] = discounted_returnif standardize:
mean_return = torch.mean(returns)
std_return = torch.std(returns) + 1e-8 # 添加epsilon以保持稳定
returns = (returns - mean_return) / std_return
return returns.to(device) # 最后移动到适当的设备
# 策略更新步骤 def optimize_policy(log_probs: List[torch.Tensor], returns: torch.Tensor, optimizer: optim.Optimizer) -> float: """ 执行一次REINFORCE策略梯度更新。 """ # 堆叠对数概率并确保回报具有正确的形状 log_probs_tensor = torch.stack(log_probs) returns = returns.detach() # 在此更新中将回报视为固定目标计算损失:- Sum(G_t * log_pi(a_t|s_t))
我们最小化负目标
loss = -torch.sum(returns * log_probs_tensor)
执行优化
optimizer.zero_grad()
loss.backward()
optimizer.step()
return loss.item()
-
学习进展分析:智能体展现有效学习能力,从回合步骤(长度)的急剧减少并在低值稳定可以明确观察到。
-
奖励性能评估:总奖励从负值大幅提升并收敛到稳定的高正值,表明智能体成功学习了高效策略。
-
损失和稳定性分析:损失值在整个训练过程中表现出显著波动,移动平均线也未展示明确的收敛趋势。这突显了基本REINFORCE算法固有的高方差特性。
7、PPO(近端策略优化)
-
当前的新策略(π_new)对批次状态进行评估以获得新的动作对数概率。
-
计算新旧策略之间的概率比率r。
-
PPO应用裁剪目标(L_CLIP),基于裁剪参数(ϵ)限制r对更新的影响程度。
-
裁剪的策略损失(L_CLIP),确保策略更新稳定性。
-
值函数损失(L_VF),优化评论家网络。
-
熵正则项(S),鼓励足够的探索行为。
# PPO更新步骤(简化视图,假设数据已批处理) def update_ppo(actor: PolicyNetwork, critic: ValueNetwork, actor_optimizer: optim.Optimizer, critic_optimizer: optim.Optimizer, states: torch.Tensor, actions: torch.Tensor, log_probs_old: torch.Tensor, # 来自用于rollout的策略的对数概率 advantages: torch.Tensor, # 计算的GAE优势 returns_to_go: torch.Tensor, # 值函数的目标(Adv + V_old) ppo_epochs: int, # 每批次的更新次数 ppo_clip_epsilon: float, # 裁剪参数ε value_loss_coeff: float, # 评论家损失权重 entropy_coeff: float) -> Tuple[float, float, float]: # 平均损失total_policy_loss = 0.0
total_value_loss = 0.0
total_entropy = 0.0数据在进入循环前应该被分离
advantages = advantages.detach()
log_probs_old = log_probs_old.detach()
returns_to_go = returns_to_go.detach()在同一批次上执行多个epochs的更新
for _ in range(ppo_epochs):
— Actor(策略)更新 —
policy_dist = actor(states) # 获取当前策略分布
log_probs_new = policy_dist.log_prob(actions) # 动作在新策略下的对数概率
entropy = policy_dist.entropy().mean() # 平均熵计算比率r(θ) = π_new / π_old
ratio = torch.exp(log_probs_new - log_probs_old)
计算裁剪的替代目标部分
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1.0 - ppo_clip_epsilon, 1.0 + ppo_clip_epsilon) * advantages策略损失:-(min(surr1, surr2)) - entropy_bonus
policy_loss = -torch.min(surr1, surr2).mean() - entropy_coeff * entropy
优化actor
actor_optimizer.zero_grad()
policy_loss.backward()
actor_optimizer.step()— Critic(值)更新 —
values_pred = critic(states).squeeze() # 使用当前critic预测V(s)
value_loss = F.mse_loss(values_pred, returns_to_go) # 与计算的回报比较优化critic
critic_optimizer.zero_grad()
(value_loss_coeff * value_loss).backward() # 在backward前缩放损失
critic_optimizer.step()累积统计
total_policy_loss += policy_loss.item()
total_value_loss += value_loss.item()
total_entropy += entropy.item()返回epochs上的平均损失
return total_policy_loss / ppo_epochs, total_value_loss / ppo_epochs, total_entropy / ppo_epochs
-
学习进展分析:PPO展示极为快速的学习能力;平均回合长度急剧下降并迅速稳定在较低水平。
-
奖励性能评估:平均奖励值呈陡峭上升趋势并稳定在接近最大可能值的水平,表明算法快速收敛到高效策略。
-
训练稳定性分析:策略目标(损失)和熵值均呈稳步下降趋势,表明策略持续改进并逐步减少探索。值函数损失虽有波动但整体保持在可控范围内。
-
学习策略评估:最终策略网格清晰展示了确定性且合理的导航路径,指向网格右下方的目标区域。
8、A2C(同步优势演员-评论家)
-
演员网络:策略函数π(a∣s;θ),输出动作概率分布。
-
评论家网络:值函数V(s;ϕ),估计状态价值。
-
优势函数:计算为Â_t≈R_t−V(s_t),其中R_t通常是n步回报或通过GAE计算。它衡量动作相对于状态平均预期回报的相对优势。
-
同步更新机制:梯度在一批经验数据上计算并同时应用到演员和评论家网络。
# A2C更新步骤 def update_a2c(actor: PolicyNetwork, critic: ValueNetwork, actor_optimizer: optim.Optimizer, critic_optimizer: optim.Optimizer, states: torch.Tensor, actions: torch.Tensor, advantages: torch.Tensor, # 计算的GAE优势 returns_to_go: torch.Tensor, # 值函数的目标(Adv + V_old) value_loss_coeff: float, # 评论家损失权重 entropy_coeff: float # 熵奖励权重 ) -> Tuple[float, float, float]: # 平均损失— 评估当前网络 —
policy_dist = actor(states)
log_probs = policy_dist.log_prob(actions) # 所采取动作的对数概率
entropy = policy_dist.entropy().mean() # 平均熵
values_pred = critic(states).squeeze() # 评论家的值预测— 计算损失 —
策略损失(演员):- E[log_pi * A_detached] - entropy_bonus
policy_loss = -(log_probs * advantages.detach()).mean() - entropy_coeff * entropy
值损失(评论家):MSE(V_pred, Returns_detached)
value_loss = F.mse_loss(values_pred, returns_to_go.detach())
— 优化演员 —
actor_optimizer.zero_grad()
policy_loss.backward() # 计算演员梯度
actor_optimizer.step() # 更新演员权重— 优化评论家 —
critic_optimizer.zero_grad()
评论家损失的反向传播(缩放后)
(value_loss_coeff * value_loss).backward()
critic_optimizer.step() # 更新评论家权重返回用于记录的损失(策略目标部分,值损失,熵)
return policy_loss.item() + entropy_coeff * entropy.item(), value_loss.item(), entropy.item()
-
学习进展与效率分析:智能体展现出强劲的学习能力。平均回合奖励从显著负值迅速上升,在约100-150次迭代后稳定收敛在接近最大可能值水平。同时,平均回合长度从最大步数限制急剧下降至低且稳定的值,表明智能体迅速掌握了高效路径规划能力。
-
值函数学习过程:值损失(MSE)在初始阶段呈上升趋势,这是因为评论家网络需要在快速变化的策略环境中学习准确的状态价值。损失在约75次迭代后达到峰值,随后随着策略稳定而稳步下降,表明评论家网络对状态价值的预测精度显著提高。
-
策略优化与稳定性:策略损失曲线呈现一定噪声,这是策略梯度方法的固有特性,反映了更新过程中的方差。然而,移动平均趋势显示整体损失呈下降趋势(尽管有波动),表明策略持续改进。
-
最终策略评估:策略网格展示了一个连贯且确定性的导航策略。方向箭头一致地指引智能体,形成通向位于网格右下区域目标位置的明确路径,证明算法成功收敛到特定的导航方案。
9、A3C (异步优势演员-评论家算法)
-
异步更新: 工作者之间无需相互等待,减少了计算资源的闲置时间,同时降低了更新之间的数据相关性。
-
全局与本地网络架构: 工作者定期与中央全局网络同步参数,但在本地进行梯度计算。
-
N步回报计算: 更新通常基于短序列(n步)内收集的奖励总和,再加上由评论家网络对n步后状态的价值估计。这种方法平衡了单步时序差分学习的偏差与完整蒙特卡洛回报的方差。
-
优势估计: 优势函数计算采用Â_t = R_t − V(s_t)公式,其中R_t表示n步回报。
# 共享的演员-评论家网络(与A2C结构相同) class ActorCriticNetwork(nn.Module): def __init__(self, n_observations: int, n_actions: int): super(ActorCriticNetwork, self).__init__() self.layer1 = nn.Linear(n_observations, 128) self.layer2 = nn.Linear(128, 128) self.actor_head = nn.Linear(128, n_actions) # 动作对数概率 self.critic_head = nn.Linear(128, 1) # 状态价值def forward(self, x: torch.Tensor) -> Tuple[Categorical, torch.Tensor]:
if not isinstance(x, torch.Tensor):
x = torch.tensor(x, dtype=torch.float32, device=x.device)
elif x.dtype != torch.float32:
x = x.to(dtype=torch.float32)
if x.dim() == 1: x = x.unsqueeze(0)x = F.relu(self.layer1(x))
shared_features = F.relu(self.layer2(x))
action_logits = self.actor_head(shared_features)
state_value = self.critic_head(shared_features)确保在添加批次维度的情况下值被压缩
if x.shape[0] == 1 and state_value.dim() > 0:
state_value = state_value.squeeze(0)
return Categorical(logits=action_logits.to(x.device)), state_value
# 计算N步回报和优势(在每个工作者内使用) def compute_n_step_returns_advantages(rewards: List[float], values: List[torch.Tensor], # 网络预测的V(s_t) bootstrap_value: torch.Tensor, # V(s_{t+n})预测,已分离 dones: List[float], # 完成标志(0.0或1.0) gamma: float ) -> Tuple[torch.Tensor, torch.Tensor]: """ 计算n步回报(评论家目标)和优势(演员指导)。""" n_steps = len(rewards) returns = torch.zeros(n_steps, dtype=torch.float32) # 在CPU上存储结果 advantages = torch.zeros(n_steps, dtype=torch.float32)分离用于优势计算的值(作为基线的一部分)
values_detached = torch.cat([v.detach() for v in values]).squeeze().cpu()
R = bootstrap_value.detach().cpu() # 从自举值开始for t in reversed(range(n_steps)):
R = rewards[t] + gamma * R * (1.0 - dones[t]) # 计算n步回报
returns[t] = R确保values_detached具有正确的形状以进行优势计算
value_t = values_detached if values_detached.dim() == 0 else values_detached[t]
advantages[t] = R - value_t # 优势 = N步回报 - V(s_t)可选:标准化优势
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
return returns, advantages
# --- 工作者损失计算(概念性 - 由每个工作者执行) --- # 假设'log_probs_tensor'、'values_pred_tensor'、'entropies_tensor'包含 # n步展开的网络输出, # 而'returns_tensor'、'advantages_tensor'包含计算出的目标。policy_loss = -(log_probs_tensor * advantages_tensor.detach()).mean()
value_loss = F.mse_loss(values_pred_tensor, returns_tensor.detach())
entropy_loss = -entropies_tensor.mean()total_loss = policy_loss + value_loss_coeff * value_loss + entropy_coeff * entropy_loss
— 梯度应用(概念性) —
global_optimizer.zero_grad()
total_loss.backward() # 计算本地模型上的梯度将梯度从local_model.parameters()传输到global_model.parameters()
global_optimizer.step() # 将梯度应用于全局模型
-
奖励进展: 智能体从初始负分状态显著改善,移动平均线表明在约300回合后收敛至稳定的高正奖励水平。
-
效率提升: 回合长度在250回合左右出现显著下降,从最大步数减少至稳定的较低平均值,表明任务完成效率大幅提高。
-
学习效果: 图表清晰展示了A3C的有效学习过程,从初始探索/低效阶段转变为稳定高效的策略,持续有效地解决任务。
10、DDPG (用于连续动作的演员-评论家算法)
-
确定性演员网络: 与REINFORCE/A2C/PPO等输出动作概率分布的算法不同,DDPG的演员网络直接为给定状态输出一个确定性动作向量。
-
Q值评论家网络: 评论家网络学习状态-动作值函数Q(s, a),类似于Q学习,但针对连续动作空间进行了适配,评估在状态(s)中执行演员选择的连续动作(a)的价值。
-
离策略学习: 采用经验回放缓冲区存储交互经验并随机采样小批量数据进行学习,实现从历史数据中进行稳定学习,类似于DQN的机制。
-
目标网络机制: 为演员和评论家网络均设置独立的目标网络,用于计算评论家的目标值,显著提高学习稳定性。
-
探索策略: 由于算法采用确定性策略,需要手动添加探索机制,通常通过向演员网络输出动作添加噪声(如高斯噪声或奥恩斯坦-乌伦贝克过程噪声)实现。
-
评论家网络更新: 目标Q值(TD目标y)通过结合即时奖励(r)、目标演员和目标评论家网络对下一状态(s')的评估计算得出。主评论家网络计算当前批次状态和动作的Q(s, a)值。系统使用目标(y)与Q(s, a)之间的均方误差(MSE损失)更新主评论家网络参数(ϕ)。
-
演员网络更新: 演员网络为批次状态(s)计算确定性动作。这些动作作为输入传递给主评论家网络以获取对应的Q值评估。演员网络通过最大化这些Q值(即最小化负Q值,L_θ = −Q)进行参数更新。
-
目标网络更新: 目标演员和目标评论家网络(θ', ϕ')通过软更新机制逐步向主网络参数靠近,以平滑学习过程并提高稳定性。
# 简化的DDPG演员网络(输出连续动作) class ActorNetworkDDPG(nn.Module): def __init__(self, state_dim: int, action_dim: int, max_action: float): super(ActorNetworkDDPG, self).__init__() self.layer1 = nn.Linear(state_dim, 256) self.layer2 = nn.Linear(256, 256) self.layer3 = nn.Linear(256, action_dim) self.max_action = max_action # 用于缩放输出def forward(self, state: torch.Tensor) -> torch.Tensor:
x = F.relu(self.layer1(state))
x = F.relu(self.layer2(x))使用tanh输出-1到1之间的值,然后缩放
action = self.max_action * torch.tanh(self.layer3(x))
return action简化的DDPG评论家网络(接收状态和动作)
class CriticNetworkDDPG(nn.Module):
def init(self, state_dim: int, action_dim: int):
super(CriticNetworkDDPG, self).init()分别或一起处理状态和动作
self.layer1 = nn.Linear(state_dim + action_dim, 256)
self.layer2 = nn.Linear(256, 256)
self.layer3 = nn.Linear(256, 1) # 输出单个Q值def forward(self, state: torch.Tensor, action: torch.Tensor) -> torch.Tensor:
连接状态和动作
x = torch.cat([state, action], dim=1)
x = F.relu(self.layer1(x))
x = F.relu(self.layer2(x))
q_value = self.layer3(x)
return q_value
# 回放缓冲区(概念 - 使用deque或list) # 目标网络(概念 - 创建演员/评论家的副本) target_actor = ActorNetworkDDPG(...) target_critic = CriticNetworkDDPG(...) target_actor.load_state_dict(actor.state_dict()) # 初始化 target_critic.load_state_dict(critic.state_dict())
# --- DDPG更新逻辑概述 --- # (假设优化器已定义: actor_optimizer, critic_optimizer) # (假设tau: 软更新率, gamma: 折扣因子已定义)1. 从replay_buffer采样批次: states, actions, rewards, next_states, dones
— 评论家更新 —
从目标演员获取下一动作: next_actions = target_actor(next_states)
从目标评论家获取目标Q值: target_q = target_critic(next_states, next_actions)
计算TD目标: td_target = rewards + gamma * (1 - dones) * target_q
获取当前Q值估计: current_q = critic(states, actions)
计算评论家损失(MSE): critic_loss = F.mse_loss(current_q, td_target.detach())
更新评论家:
critic_optimizer.zero_grad()
critic_loss.backward()
critic_optimizer.step()— 演员更新 —
从主演员获取当前状态的动作: actor_actions = actor(states)
计算演员损失(主评论家的负Q值): actor_loss = -critic(states, actor_actions).mean()
更新演员:
actor_optimizer.zero_grad()
actor_loss.backward()
actor_optimizer.step()— 软更新目标网络 —
soft_update(target_critic, critic, tau)
soft_update(target_actor, actor, tau)
-
奖励进展: 智能体展现出明确的学习过程。每回合总奖励(左图)从极低水平(约-1500)开始稳步提升,移动平均线在约100回合处达到-250左右的水平。这表明在控制钟摆以最小化成本函数方面取得了显著进步。尽管性能曲线存在波动,但整体呈现明显的正向趋势。
-
评论家学习: 平均评论家损失(中图)在整个训练过程中呈现稳步增长趋势。这一现象在直觉上可能令人困惑,但在DDPG框架中,随着演员性能提升并探索到更高价值状态,目标Q值也相应增加。评论家网络不断适应这些演化的更高价值目标,因此损失增加实际上可能与学习成功同步发生,反映了价值函数适应过程中规模/复杂度的增加。
-
演员学习: 平均演员"损失"(右图,标记为Avg -Q Value,实际表示演员实现的平均Q值)呈现明显的上升趋势,与奖励进展高度相关。这表明演员成功学习选择评论家评估为高价值(低成本)的动作,推动整体性能提升。图表末端的平缓趋势暗示学习可能正在趋于收敛。
























