动态规划

在强化学习中,动态规划(Dynamic Programming, DP)是一类用于求解决策问题的算法,适用于已知环境模型的情况。它依赖于贝尔曼方程(Bellman Equation)来逐步改进策略,从而找到最优解。动态规划通常用于处理马尔可夫决策过程(MDP),该过程具有状态转移概率奖励函数,这些信息是已知的。

动态规划的关键思想是通过将问题拆解为较小的子问题来求解复杂的决策问题。在强化学习中,常用的动态规划方法包括策略评估(policy evaluation)策略改进(policy improvement),它们通常交替使用,实现策略迭代(policy interaction)

1. 策略评估(policy evaluation)

策略评估的目标是估计一个给定策略 π\pi 的状态价值函数 Vπ(s)V^\pi(s) 或者动作价值函数 Qπ(s,a)Q^\pi(s, a)。策略评估是指计算在一个固定策略 π\pi 下,每个状态 ss 的预期累积回报,即状态价值函数:

Vπ(s)=Eπ[GtSt=s]V^\pi(s) = \mathbb{E}\pi \left[ G_t | S_t = s \right]

其中 GtG_t 是时间 tt 时刻以后的总回报,通常是一个折扣的累积回报:

Gt=Rt+1+γRt+2+γ2Rt+3+=k=0γkRt+k+1G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + \dots = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}

策略评估主要通过贝尔曼方程来迭代更新状态价值:

Vπ(s)=Eπ[Rt+1+γVπ(St+1)St=s]V^\pi(s) = \mathbb{E}_\pi \left[ R_{t+1} + \gamma V^\pi(S_{t+1}) | S_t = s \right]

该方程表示当前状态 ss 的价值是当前收益 Rt+1R_{t+1}​ 加上未来状态 St+1S_{t+1} 的折扣价值。

伪代码如下:

代码实现如下:

# a random policy 
rng = np.random.RandomState(1234)
pi_rand = softmax(rng.rand(env.nS, env.nA)*5, axis=1)
pi_rand #
# pi_rand是一个64 (state) x 4 (action)的矩阵,代表在某个state下执行四个action的概率
def policy_eval(pi, V, env, theta=1e-4, gamma=.99, show_value=False):

    # except v(terminal) = 0
    for s in env.s_termination:
        V[s] = 0

    # loop until convergence
    while True: 
        delta = 0
        for s in env.S:
            if s not in env.s_termination:
                v_old = V[s].copy()
                v_new = 0
                for a in env.A:
                    p = env.p_s_next(s, a)
                    for s_next in env.S:
                        r, done = env.r(s_next)
                        v_new += pi[s, a]*p[s_next]*(r + (1-done)*gamma*V[s_next])
                V[s] = v_new 
                # check convergence
                delta = np.max([delta, np.abs(v_new - v_old)])

        # visualize your results
        if show_value:
            _, ax = plt.subplots(1, 1, figsize=(4, 4))
            clear_output(True)
            env.show_v(ax, V)
            time.sleep(1)
            plt.show()
        
        # check convergence
        if delta < theta: break 
    
    return V
V = np.zeros([env.nS,])
V = policy_eval(pi_rand, V, env, show_value=True)

2. 策略改进(policy improvement)

策略改进是在已知一个当前策略 π\pi 的情况下,尝试找到一个新的、改进后的策略 π\pi',使得新策略在所有状态下都至少与原策略相同或更优。

策略改进定理: 如果对于所有状态 ss,存在:

π(s)=argmaxaQπ(s,a)\pi'(s) = \arg\max_a Q^\pi(s, a)

即,新策略 π(s)\pi'(s) 选择使得动作价值 Qπ(s,a)Q^\pi(s, a) 最大的动作,那么新策略 π\pi' 至少与原策略 π\pi 一样好,甚至更优。这个过程就是策略改进。

策略改进的关键是找到一种策略 π\pi' ,在每个状态下都选择最优的动作,即贪婪地最大化期望回报。在策略改进过程中,利用状态价值函数或者动作价值函数作为参考,对当前策略做出调整。

代码实现如下:

def policy_improve(pi, V, env, theta=1e-4, gamma=.99):
    pi_old = pi.copy()
    for s in env.S: # loop所有的state
        q = np.zeros([env.nA]) # 初始化一个qtable
        for a in env.A: # loop 所有的action
            p = env.p_s_next(s, a) # 注意这里p是一个向量,转移到各个s的概率
            for s_next in env.S: # loop所有可能的下一个state
                r, done = env.r(s_next)
                q[a] += p[s_next]*(r + (1-done)*gamma*V[s_next])
        pi[s] = np.eye(env.nA)[np.argmax(q)]
    
    # check stable
    if (np.abs(pi_old - pi) < theta).all():
        stable = True
    else:
        stable = False 

    return pi, stable  

3. 策略迭代(policy interaction)

策略迭代是一种结合策略评估和策略改进的循环过程,用于找到最优策略。

其流程为:

  1. 初始化一个初始策略 π0\pi_0​。

  2. 策略评估:通过多次迭代计算得到该策略 πk\pi_k​ 下的状态价值函数 VπkV^{\pi_k}​。

  3. 策略改进:根据评估出的状态价值函数,得到新的策略 πk+1\pi_{k+1},使得在每个状态下的期望收益最大。

  4. 重复以上过程,直到策略不再变化,达到收敛。

策略迭代的核心步骤是在当前策略下进行策略评估,然后改进策略,直到找到一个最优策略 π\pi^* 和最优价值函数 V(s)V^*(s)

伪代码如下:

代码实现如下:

def policy_iter(env, seed=1234, show_update=False):

    rng = np.random.RandomState(seed)

    # initialize V(s), arbitrarily except V(terminal)=0
    V = rng.rand(env.nS) * 0.001
    # except v(terminal) = 0
    for s in env.s_termination:
        V[s] = 0
    # initialize π(s), arbitrarily, pi is a 64 x 4 matrix
    pi = softmax(rng.rand(env.nS, env.nA)*5, axis=1)

    while True: 

        V = policy_eval(pi, V, env)
        pi, stable = policy_improve(pi, V, env)

        # visualize 
        if show_update:
            _, axs = plt.subplots(1, 2, figsize=(8, 4))
            clear_output(True)
            ax = axs[0]
            env.show_v(ax, V)
            ax = axs[1]
            env.show_pi(ax, pi)
            time.sleep(.1)
            plt.show()

        # check convergence
        if stable: break 

    return V, pi 
V1, pi1 = policy_iter(env, show_update=True)

在设定的游戏环境中模拟得到如下结果,数值代表当前位置的价值,箭头代表当前位置的最优动作:

最后更新于