策略评估的目标是估计一个给定策略 π 的状态价值函数 Vπ(s) 或者动作价值函数 Qπ(s,a)。策略评估是指计算在一个固定策略 π 下,每个状态 s 的预期累积回报,即状态价值函数:
Vπ(s)=Eπ[Gt∣St=s]
其中 Gt 是时间 t 时刻以后的总回报,通常是一个折扣的累积回报:
Gt=Rt+1+γRt+2+γ2Rt+3+⋯=k=0∑∞γkRt+k+1
策略评估主要通过贝尔曼方程来迭代更新状态价值:
Vπ(s)=Eπ[Rt+1+γVπ(St+1)∣St=s]
该方程表示当前状态 s 的价值是当前收益 Rt+1 加上未来状态 St+1 的折扣价值。
伪代码如下:
代码实现如下:
# a random policy
rng = np.random.RandomState(1234)
pi_rand = softmax(rng.rand(env.nS, env.nA)*5, axis=1)
pi_rand #
# pi_rand是一个64 (state) x 4 (action)的矩阵,代表在某个state下执行四个action的概率
def policy_eval(pi, V, env, theta=1e-4, gamma=.99, show_value=False):
# except v(terminal) = 0
for s in env.s_termination:
V[s] = 0
# loop until convergence
while True:
delta = 0
for s in env.S:
if s not in env.s_termination:
v_old = V[s].copy()
v_new = 0
for a in env.A:
p = env.p_s_next(s, a)
for s_next in env.S:
r, done = env.r(s_next)
v_new += pi[s, a]*p[s_next]*(r + (1-done)*gamma*V[s_next])
V[s] = v_new
# check convergence
delta = np.max([delta, np.abs(v_new - v_old)])
# visualize your results
if show_value:
_, ax = plt.subplots(1, 1, figsize=(4, 4))
clear_output(True)
env.show_v(ax, V)
time.sleep(1)
plt.show()
# check convergence
if delta < theta: break
return V
V = np.zeros([env.nS,])
V = policy_eval(pi_rand, V, env, show_value=True)
2. 策略改进(policy improvement)
代码实现如下:
def policy_improve(pi, V, env, theta=1e-4, gamma=.99):
pi_old = pi.copy()
for s in env.S: # loop所有的state
q = np.zeros([env.nA]) # 初始化一个qtable
for a in env.A: # loop 所有的action
p = env.p_s_next(s, a) # 注意这里p是一个向量,转移到各个s的概率
for s_next in env.S: # loop所有可能的下一个state
r, done = env.r(s_next)
q[a] += p[s_next]*(r + (1-done)*gamma*V[s_next])
pi[s] = np.eye(env.nA)[np.argmax(q)]
# check stable
if (np.abs(pi_old - pi) < theta).all():
stable = True
else:
stable = False
return pi, stable
3. 策略迭代(policy interaction)
策略迭代是一种结合策略评估和策略改进的循环过程,用于找到最优策略。
其流程为:
重复以上过程,直到策略不再变化,达到收敛。
伪代码如下:
代码实现如下:
def policy_iter(env, seed=1234, show_update=False):
rng = np.random.RandomState(seed)
# initialize V(s), arbitrarily except V(terminal)=0
V = rng.rand(env.nS) * 0.001
# except v(terminal) = 0
for s in env.s_termination:
V[s] = 0
# initialize π(s), arbitrarily, pi is a 64 x 4 matrix
pi = softmax(rng.rand(env.nS, env.nA)*5, axis=1)
while True:
V = policy_eval(pi, V, env)
pi, stable = policy_improve(pi, V, env)
# visualize
if show_update:
_, axs = plt.subplots(1, 2, figsize=(8, 4))
clear_output(True)
ax = axs[0]
env.show_v(ax, V)
ax = axs[1]
env.show_pi(ax, pi)
time.sleep(.1)
plt.show()
# check convergence
if stable: break
return V, pi