
Reference video: www.bilibili.com/video/BV1yv…

Code download: github.com/PaddlePaddl…

You may want to read my reinforcement learning outline article first; this post is an introductory-level walkthrough of reinforcement learning. The code mainly follows the reinforcement learning algorithm framework PARL.

Recommended materials

  • Book: Reinforcement Learning: An Introduction

  • Videos: David Silver's classic RL lectures, UC Berkeley CS285, Stanford CS234

  • Classic papers:

    • DQN: arxiv.org/pdf/1312.56…

    • A3C: www.jmlr.org/proceedings…

    • DDPG: arxiv.org/pdf/1509.02…

    • PPO: arxiv.org/pdf/1707.06…

  • Frontier research directions: model-based RL, hierarchical RL, multi-agent RL, meta learning.

Introductory Reinforcement Learning in Practice

Prologue

Two approaches to agent learning

One is value-based and the other is policy-based.


Value-based: we assign every state a "value"; for example, a state C that is closer to the reward has a higher value than a state A that is farther away. The idea is to always have the agent move toward higher-value states, which yields an optimal policy. Once the value function has been optimized, the same input always produces the same output.

Representative methods: Sarsa, Q-learning, DQN.


Policy-based: we let a policy run all the way to the end of an episode and use the final reward to judge whether the policy is good or bad. Good actions are given a higher probability of being sampled in the future. Because the output is a probability distribution, the same input can produce different outputs, so the behavior is more stochastic.

Representative method: Policy Gradient.

An overview of RL categories


Reinforcement learning is divided into model-free and model-based methods; model-free research is the more active area, and both approaches above are model-free.

Model-free: does not require knowing the transition probabilities between states.

Model-based: requires knowing the transition probabilities between states.

Model-free methods are in turn commonly split into three categories: value-based, policy-based, and actor-critic (which combines the two; DDPG, covered later, is an example).


Algorithm libraries


Here we use the reinforcement learning algorithm framework PARL. It covers many classic algorithms and also reproduces some of the latest popular ones. Learning by reading code is also the fastest way to get hands-on. Among these, multi-agent RL is a hot research direction, considered closer to the way human society interacts.

RL programming practice: Gym

The libraries above are algorithm-related. The other category is environment libraries, and Gym is the one academia likes best. Environments split into discrete-control and continuous-control settings. Discrete control means the output takes only finitely many values, e.g. steering only left or right; it is usually evaluated on the Atari environments. Continuous control means the output is a continuous quantity, e.g. the rotation angle of a robot arm; it is usually evaluated on the MuJoCo environments.
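
To see the difference concretely, here is a minimal sketch (assuming the classic CartPole-v0 and Pendulum-v0 environments that ship with Gym) printing the two kinds of action spaces:

import gym

# Discrete control: only a finite set of actions.
discrete_env = gym.make('CartPole-v0')
print(discrete_env.action_space)      # Discrete(2): push left or push right

# Continuous control: the action is a real-valued vector.
continuous_env = gym.make('Pendulum-v0')
print(continuous_env.action_space)    # Box(1,): a torque in a continuous range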


Environment setup

With a Python environment already installed, open cmd and run the following commands:

pip install paddlepaddle==1.6.3 # if the network times out, add -i https://pypi.tuna.tsinghua.edu.cn/simple; for the GPU build use paddlepaddle-gpu
pip install parl==1.3.1     # if installation fails, append --user
pip install gym

Example program and the advantages of PARL


import gym
from gridworld import CliffWalkingWapper
import turtle
# create the environment
env = gym.make("CliffWalking-v0")
# wrap it with a graphical interface; without this line there is only a text UI
env = CliffWalkingWapper(env)
# reset the environment and start a new episode
env.reset()
# render the interface
env.render()
# interact with the environment for one step; the return values are the next
# observation (state), the reward, whether the episode is over, and an info dict
# step(0) moves up, step(1) moves right, step(2) moves down, step(3) moves left
env.step(0)
env.render()
# keep the window open after the program finishes instead of closing immediately
turtle.exitonclick()

PARL provides a clean abstraction over the framework: almost every algorithm is organized into the three classes Model, Algorithm, and Agent. The Algorithm class already implements the core logic, and the other two parts are easy for developers to customize. The PARL repository downloaded above contains examples for many algorithms.


Pick any example from the repository, make some modifications to the model and agent files, and it can be ported to a new application scenario.

PARL is also strong for industrial use. Adding just two lines of code turns single-machine training into multi-machine training. Its parallel capability is strong as well: unlike ordinary Python threading, it achieves genuinely parallel execution and saves a great deal of time.


After setting up the environment, run the QuickStart program directly to check whether it works. In my case it did not run successfully; the error message was:

C:\Python39\lib\site-packages\parl\remote\communication.py:38: FutureWarning: 'pyarrow.default_serialization_context' is deprecated as of 2.0.0 and will be removed in a future version. Use pickle or the pyarrow IPC functionality instead.
  context = pyarrow.default_serialization_context()
W0718 17:57:30.795331  4240 device_context.cc:404] Please NOTE: device: 0, GPU Compute Capability: 6.1, Driver API Version: 11.1, Runtime API Version: 10.2
[07-18 17:57:30 MainThread @train.py:73] obs_dim 4, act_dim 2
W0718 17:57:30.798295  4240 dynamic_loader.cc:238] Note: [Recommend] copy cudnn into CUDA installation directory. 
 For instance, download cudnn-10.0-windows10-x64-v7.6.5.32.zip from NVIDIA's official website, 
then, unzip it and copy it into C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v10.0
You should do this according to your CUDA installation directory and CUDNN version.

Installing cuDNN did not fix the failing test either; if anyone knows the cause, please message me.

Solving RL with tabular methods

The RL MDP four-tuple


This is the MDP four-tuple of reinforcement learning. We describe it with the state transition probability: the probability that, in state s_t, choosing action a_t transitions to s_{t+1} and yields reward r_t. A decision process of this kind is a Markov Decision Process (MDP), the classic formulation of sequential decision making.

State transitions and sequential decision making


Each time we can only walk one complete path, so this is a loop of observing the state and executing a decision over and over. We describe the environment with two functions. One is the probability function (the P function), which captures the randomness of the environment: if a state always transitions to one particular next state, the transition probability is 100%; if several next states are possible, each has a transition probability between 0 and 100%. Paired with the transition probability is the reward function (the R function), which describes how good the environment is. If the P function and R function are known, the environment is said to be known, i.e. the model-based setting. **If these are known, we can use dynamic programming to find the optimal policy**, but we will not cover dynamic programming here.

Here we focus on methods for the case where the environment is unknown, because in practical problems the state transition probabilities are usually unknown. The case where the P function and R function are unknown is called model-free.

The Q-table


The Q-table guides the action choice at every step, and the objective is the total future reward. Because rewards are often delayed, only the total future reward can represent the value of the current action choice. However, since an action's influence on later states weakens the further into the future we look, we need to introduce a decay (discount) factor.
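
As a small worked example (plain Python, not part of the course code), the discounted total return can be computed by folding future rewards back with the decay factor gamma:

# Hypothetical reward sequence for one episode and a decay factor gamma.
rewards = [0, 0, 1]   # the reward only arrives at the last step
gamma = 0.9

G = 0.0
returns = []
for r in reversed(rewards):      # G = r + gamma * G_next, computed back to front
    G = r + gamma * G
    returns.append(G)
returns.reverse()
print(returns)                   # [0.81, 0.9, 1.0] -- earlier steps are worth less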


**The idea of "reinforcement" is that we can use the value of the next state to update the value of the current state.** So we can update the Q-table at every single step; this is a temporal-difference style of update.

Temporal Difference (TD)

A fun interactive demo: cs.stanford.edu/people/karp…


Only one cell has a reward of 1 and many cells have a reward of -1. As the ball explores, the valuable cell also pulls up the scores of its neighboring cells, forming an overall "scoring system". Simply walking in the direction of increasing score is guaranteed to reach the cell with reward 1.


This is exactly a temporal-difference update.


We want Q(S_t, A_t) to gradually approach its ideal value, so each update only moves it a little; the learning rate α controls this update speed. Through repeated updates, the return G_t of every state comes to satisfy:

G_t = R_{t+1} + \gamma G_{t+1}

This is the Sarsa algorithm, named after the five quantities (S, A, R, S', A') it uses in its update. The cell-value updates in the demo above follow exactly this formula.
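
The update rule itself appears in a figure that is not reproduced here; for reference, the standard tabular Sarsa update it corresponds to is:

Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \left[ R_{t+1} + \gamma Q(S_{t+1}, A_{t+1}) - Q(S_t, A_t) \right]

where α is the learning rate mentioned above.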

Sarsa: code walkthrough

The most important job of the agent is to implement two functions: choosing an action according to the algorithm, and learning (updating) the Q-table. Every step involves both.

We create a separate agent.py file and declare an Agent class, where the sample function chooses the action to output and the learn function updates Q.

import numpy as np

class SarsaAgent(object):
    def __init__(self,obs_n,act_n,learning_rate=0.01,gamma=0.9,e_greed=0.1):
        self.act_n = act_n  # action dimension: how many actions can be chosen
        self.lr = learning_rate  # learning rate
        self.gamma = gamma  # reward decay factor
        self.epsilon = e_greed  # probability of choosing a random action
        self.Q = np.zeros((obs_n, act_n)) # the Q-table as a matrix; its dimensions are the number of states (one per grid cell) and the number of actions
    # given the input observation, return the index of the chosen action, with exploration
    def sample(self, obs):
        if np.random.uniform(0, 1) < (1.0 - self.epsilon):  # choose the action according to the Q-table
            action = self.predict(obs)
        else:
            action = np.random.choice(self.act_n)  # with some probability, explore by picking a random action
        return action
    # given the input observation, output the action with the highest score
    def predict(self, obs):
        Q_list = self.Q[obs, :]
        maxQ = np.max(Q_list)
        # np.where returns a tuple; take the first element to get the index array
        action_list = np.where(Q_list == maxQ)[0]  # maxQ may correspond to several actions
        action = np.random.choice(action_list)
        return action
    # the learning method, i.e. updating the Q-table with the Sarsa rule
    def learn(self, obs, action, reward, next_obs, next_action, done):
        """ on-policy
            obs: observation before the interaction, s_t
            action: the action chosen in this interaction, a_t
            reward: the reward obtained for this action, r
            next_obs: observation after the interaction, s_t+1
            next_action: the action that will be chosen for next_obs according to the current Q-table, a_t+1
            done: whether the episode has ended
        """
        predict_Q = self.Q[obs, action]
        if done:
            target_Q = reward  # there is no next state
        else:
            target_Q = reward + self.gamma * self.Q[next_obs,
                                                    next_action]  # Sarsa
        self.Q[obs, action] += self.lr * (target_Q - predict_Q)  # update Q
    # save the Q-table
    def save(self):
        npy_file = './q_table.npy'
        np.save(npy_file, self.Q)
        print(npy_file + ' saved.')
    # load the Q-table
    def restore(self, npy_file='./q_table.npy'):
        self.Q = np.load(npy_file)
        print(npy_file + ' loaded.')

Then open another file for our main function:

import gym
from gridworld import CliffWalkingWapper, FrozenLakeWapper
from agent import SarsaAgent
import time
def run_episode(env, agent, render=False):
    total_steps = 0  # record how many steps this episode takes
    total_reward = 0
    obs = env.reset()  # reset the environment, i.e. start a new episode
    action = agent.sample(obs)  # choose an action according to the algorithm
    while True:
        next_obs, reward, done, _ = env.step(action)  # execute the action (0~3 are the four directions) and get the next state
        next_action = agent.sample(next_obs)  # choose the next action according to the algorithm
        # train the Sarsa algorithm, i.e. update the Q-table
        agent.learn(obs, action, reward, next_obs, next_action, done)
        action = next_action # the next action becomes the current action
        obs = next_obs  # the next state becomes the current state
        total_reward += reward
        total_steps += 1  # count the steps
        if render:
            env.render()  # render a new frame
        if done:
            break
    return total_reward, total_steps
# test function
def test_episode(env, agent):
    total_reward = 0
    obs = env.reset()
    while True:
        action = agent.predict(obs)  # greedy
        next_obs, reward, done, _ = env.step(action)
        total_reward += reward
        obs = next_obs
        time.sleep(0.5)
        env.render()
        if done:
            print('test reward = %.1f' % (total_reward))
            break
def main():
    # env = gym.make("FrozenLake-v0", is_slippery=False)  # 0 left, 1 down, 2 right, 3 up
    # env = FrozenLakeWapper(env)
    env = gym.make("CliffWalking-v0")  # 0 up, 1 right, 2 down, 3 left
    # create a graphical interface
    env = CliffWalkingWapper(env)
    # instantiate the agent
    agent = SarsaAgent(
        obs_n=env.observation_space.n,
        act_n=env.action_space.n,
        learning_rate=0.1,
        gamma=0.9,
        e_greed=0.1)
    is_render = False
    for episode in range(500):
        ep_reward, ep_steps = run_episode(env, agent, is_render)
        print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps,
                                                          ep_reward))
        # render every 20 episodes to see how it is doing
        if episode % 20 == 0:
            is_render = True
        else:
            is_render = False
    # after training, check how the algorithm performs
    test_episode(env, agent)
if __name__ == "__main__":
    main()

The code for gridworld.py, which renders the graphical environment, is as follows:

import gym
import turtle
import numpy as np
# turtle tutorial : https://docs.python.org/3.3/library/turtle.html
def GridWorld(gridmap=None, is_slippery=False):
    if gridmap is None:
        gridmap = ['SFFF', 'FHFH', 'FFFH', 'HFFG']
    env = gym.make("FrozenLake-v0", desc=gridmap, is_slippery=False)
    env = FrozenLakeWapper(env)
    return env
class FrozenLakeWapper(gym.Wrapper):
    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.max_y = env.desc.shape[0]
        self.max_x = env.desc.shape[1]
        self.t = None
        self.unit = 50
    def draw_box(self, x, y, fillcolor='', line_color='gray'):
        self.t.up()
        self.t.goto(x * self.unit, y * self.unit)
        self.t.color(line_color)
        self.t.fillcolor(fillcolor)
        self.t.setheading(90)
        self.t.down()
        self.t.begin_fill()
        for _ in range(4):
            self.t.forward(self.unit)
            self.t.right(90)
        self.t.end_fill()
    def move_player(self, x, y):
        self.t.up()
        self.t.setheading(90)
        self.t.fillcolor('red')
        self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)
    def render(self):
        if self.t == None:
            self.t = turtle.Turtle()
            self.wn = turtle.Screen()
            self.wn.setup(self.unit * self.max_x + 100,
                          self.unit * self.max_y + 100)
            self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
                                        self.unit * self.max_y)
            self.t.shape('circle')
            self.t.width(2)
            self.t.speed(0)
            self.t.color('gray')
            for i in range(self.desc.shape[0]):
                for j in range(self.desc.shape[1]):
                    x = j
                    y = self.max_y - 1 - i
                    if self.desc[i][j] == b'S':  # Start
                        self.draw_box(x, y, 'white')
                    elif self.desc[i][j] == b'F':  # Frozen ice
                        self.draw_box(x, y, 'white')
                    elif self.desc[i][j] == b'G':  # Goal
                        self.draw_box(x, y, 'yellow')
                    elif self.desc[i][j] == b'H':  # Hole
                        self.draw_box(x, y, 'black')
                    else:
                        self.draw_box(x, y, 'white')
            self.t.shape('turtle')
        x_pos = self.s % self.max_x
        y_pos = self.max_y - 1 - int(self.s / self.max_x)
        self.move_player(x_pos, y_pos)
class CliffWalkingWapper(gym.Wrapper):
    def __init__(self, env):
        gym.Wrapper.__init__(self, env)
        self.t = None
        self.unit = 50
        self.max_x = 12
        self.max_y = 4
    def draw_x_line(self, y, x0, x1, color='gray'):
        assert x1 > x0
        self.t.color(color)
        self.t.setheading(0)
        self.t.up()
        self.t.goto(x0, y)
        self.t.down()
        self.t.forward(x1 - x0)
    def draw_y_line(self, x, y0, y1, color='gray'):
        assert y1 > y0
        self.t.color(color)
        self.t.setheading(90)
        self.t.up()
        self.t.goto(x, y0)
        self.t.down()
        self.t.forward(y1 - y0)
    def draw_box(self, x, y, fillcolor='', line_color='gray'):
        self.t.up()
        self.t.goto(x * self.unit, y * self.unit)
        self.t.color(line_color)
        self.t.fillcolor(fillcolor)
        self.t.setheading(90)
        self.t.down()
        self.t.begin_fill()
        for i in range(4):
            self.t.forward(self.unit)
            self.t.right(90)
        self.t.end_fill()
    def move_player(self, x, y):
        self.t.up()
        self.t.setheading(90)
        self.t.fillcolor('red')
        self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)
    def render(self):
        if self.t == None:
            self.t = turtle.Turtle()
            self.wn = turtle.Screen()
            self.wn.setup(self.unit * self.max_x + 100,
                          self.unit * self.max_y + 100)
            self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
                                        self.unit * self.max_y)
            self.t.shape('circle')
            self.t.width(2)
            self.t.speed(0)
            self.t.color('gray')
            for _ in range(2):
                self.t.forward(self.max_x * self.unit)
                self.t.left(90)
                self.t.forward(self.max_y * self.unit)
                self.t.left(90)
            for i in range(1, self.max_y):
                self.draw_x_line(
                    y=i * self.unit, x0=0, x1=self.max_x * self.unit)
            for i in range(1, self.max_x):
                self.draw_y_line(
                    x=i * self.unit, y0=0, y1=self.max_y * self.unit)
            for i in range(1, self.max_x - 1):
                self.draw_box(i, 0, 'black')
            self.draw_box(self.max_x - 1, 0, 'yellow')
            self.t.shape('turtle')
        x_pos = self.s % self.max_x
        y_pos = self.max_y - 1 - int(self.s / self.max_x)
        self.move_player(x_pos, y_pos)

On-policy and off-policy

On-policy: the policy used to explore the environment and the policy being updated are the same policy (Sarsa).

Off-policy: the policy used to explore the environment and the policy being updated are not the same policy (Q-learning).

The Sarsa algorithm above is on-policy: it optimizes the policy that is actually executed, so there is only one policy. Off-policy methods keep two different policies during learning: the target policy, which is the optimal policy we ultimately want, and the behavior policy, which explores the environment, boldly trying out all possibilities and handing the collected experience to the target policy to learn from. The data handed to the target policy does not need A_{t+1} (the final "a" in Sarsa).


Comparison:


Q-learning


In terms of code, Q-learning is similar to Sarsa, except that the choice of the initial action also moves inside the loop and the learn function no longer needs A_{t+1}.

In the Agent class, only one line of the learn function differs from Sarsa, namely the Q-table update formula.
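
For reference, the standard off-policy update implemented by the code below replaces the sampled next action with a maximum over all actions:

Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \left[ R_{t+1} + \gamma \max_a Q(S_{t+1}, a) - Q(S_t, A_t) \right]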

class QLearningAgent(object):
    --snip--
    # the learning method, i.e. updating the Q-table
    def learn(self, obs, action, reward, next_obs, done):
        """ off-policy
            obs: observation before the interaction, s_t
            action: the action chosen in this interaction, a_t
            reward: the reward obtained for this action, r
            next_obs: observation after the interaction, s_t+1
            done: whether the episode has ended
        """
        predict_Q = self.Q[obs, action]
        if done:
            target_Q = reward  # there is no next state
        else:
            target_Q = reward + self.gamma * np.max(
                self.Q[next_obs, :])  # Q-learning
        self.Q[obs, action] += self.lr * (target_Q - predict_Q)  # update Q
	--snip--

In other words, the exploration policy is unchanged: the agent still moves toward higher-value cells with a 10% chance of random exploration. But when updating a cell's value, we no longer use the step the behavior policy actually takes next; instead we take the maximum value among all reachable neighboring cells. As a result, in Q-learning a low-value cell cannot drag down the values of its neighbors; only high-value cells influence the cells around them.

This differs from Sarsa, where the value of the next cell always feeds back into the previous one, so the cells around a negative-reward cell also become negative and the agent keeps a wide berth. Q-learning has no such concern, so the agent takes the shortest path to the reward, and its total reward ends up higher than Sarsa's.


Summary

Sarsa's policy tends to be conservative: it seeks reward but also avoids harm, and even with incomplete exploration it stays well clear of low-reward areas (to avoid falling in during random moves). Q-learning is more exploratory and finds an optimal path, although during exploration the behavior policy can easily stumble into low-reward spots; this does not prevent the target policy from learning a better policy.

**The setting where every step costs a reward of -1 is the key to exploration.** Unexplored cells have value 0, while explored cells without reward usually end up with negative values, so the agent tends to prefer cells it has not explored yet.

That concludes the explanation and implementation of Sarsa and Q-learning, the two classic tabular algorithms.

Extensions


By now you should understand the concrete implementation of both algorithms, so go run them in more environments and observe what happens! You can define your own grid world and watch how the algorithms find the optimal path.

I also found that both algorithms share a weakness: they cannot handle maps with many large black (hole) regions. The episodes become very long with no feedback along the way, so the agent can hardly tell whether its route is right or wrong. My own take is that in such cases we usually need to add extra hand-crafted rewards to guide it.


Solving RL with neural networks

For background on neural networks, see any deep learning material, or my article on Hung-yi Lee's Machine Learning 2021 course: tianjuewudi.gitee.io/2021/05/07/…

When reinforcement learning (RL) meets deep learning (DL), we arrive at the field of deep reinforcement learning (DRL).

The tabular approach has major drawbacks:

  1. The table takes up a huge amount of memory.
  2. When the table is extremely large, lookups become slow.
  3. It can only represent a finite number of discrete states and actions.

So it is time to switch tools: we can replace the table with a neural network. We can feed S and A into the network and have it output the value Q, or feed in S and output several Q values, one per action. The network only needs to store a limited number of parameters, and our task is to keep adjusting those parameters so the input-output mapping matches our expectations. States also generalize: similar states produce similar outputs, without having to be retrained from scratch the way a table would.
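
As a rough, framework-free sketch (plain numpy, not the PARL code used later), a small network maps a state vector to one Q-value per action, replacing a row of the table:

import numpy as np

# A minimal sketch, assuming a 4-dimensional observation and 2 actions.
obs_dim, act_dim, hidden = 4, 2, 32
rng = np.random.default_rng(0)
W1, b1 = rng.normal(size=(obs_dim, hidden)), np.zeros(hidden)
W2, b2 = rng.normal(size=(hidden, act_dim)), np.zeros(act_dim)

def q_values(obs):
    # forward pass: obs -> hidden (ReLU) -> one Q-value per action
    h = np.maximum(0, obs @ W1 + b1)
    return h @ W2 + b2

obs = np.array([0.02, -0.01, 0.03, 0.0])
print(q_values(obs))                    # two Q-values, one per action
print(int(np.argmax(q_values(obs))))    # the greedy action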


The classic algorithm that solves RL with a neural network is DQN, published in Nature in 2015. The network takes raw pixel images as input and approximately replaces the Q-table; it exceeded human-level performance on 30 of 49 games.

DQN


DQN is roughly a combination of Q-learning and a neural network. The network is trained in a supervised fashion: the training samples take the state as input, with the value of each action as the target output. We keep refreshing these samples via Q-learning updates and feed them to the network for training, hoping to fit a network that behaves much like the original Q-table.

DQN introduced two innovations that make it more efficient and more stable.


The first is experience replay. Experience replay exploits the off-policy property: the transitions collected by the behavior policy are packed into batches of data that can be shuffled randomly, so the neural network can learn from them many times over. This breaks the correlation between samples and improves sample efficiency.


The second is the fixed Q-target, which improves training stability. During exploration the Q estimates change at every moment, making them a hard target for the network to chase, so we freeze the target Q values for a while when training the parameters. The Q network produces the predicted Q values used directly for acting, while a second, identical network (the target Q network) produces the target Q values. We train so that these two get as close as possible; that gap is the loss the network optimizes, and minimizing it updates the Q network's parameters. At the start the network's outputs are random, but under the influence of the rewards the values of different states gradually separate as the updates proceed.


This also mirrors how the PARL framework is organized: it splits an algorithm into the three parts model, algorithm, and agent.


Code implementation

replay_memory.py: collects a batch of transitions and splits them by field.

import random
import collections
import numpy as np

class ReplayMemory(object):
    def __init__(self, max_size):
        # deque is a high-performance double-ended queue
        self.buffer = collections.deque(maxlen=max_size)
    # append one transition to the queue
    def append(self, exp):
        self.buffer.append(exp)
    def sample(self, batch_size):
        # randomly draw batch_size transitions
        mini_batch = random.sample(self.buffer, batch_size)
        obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], []
        # split the fields into separate lists
        for experience in mini_batch:
            s, a, r, s_p, done = experience
            obs_batch.append(s)
            action_batch.append(a)
            reward_batch.append(r)
            next_obs_batch.append(s_p)
            done_batch.append(done)
        # return one batch of data as separate numpy arrays
        return np.array(obs_batch).astype('float32'), \
            np.array(action_batch).astype('float32'), np.array(reward_batch).astype('float32'),\
            np.array(next_obs_batch).astype('float32'), np.array(done_batch).astype('float32')
    def __len__(self):
        return len(self.buffer)
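
A quick usage sketch of the class above (hypothetical random transitions, just to show the interface):

rpm = ReplayMemory(max_size=1000)
# store a few fake (obs, action, reward, next_obs, done) transitions
for i in range(40):
    rpm.append((np.random.randn(4), 0, 1.0, np.random.randn(4), False))

obs_b, act_b, rew_b, next_obs_b, done_b = rpm.sample(batch_size=32)
print(obs_b.shape, act_b.shape)   # (32, 4) (32,)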

First, the model (model.py):

It defines a 3-layer fully connected network: the input is the observation, the two hidden layers have 128 units each, and the output layer has act_dim units.

import parl
from parl.core.fluid import layers  # wraps the paddle.fluid.layers API
class Model(parl.Model):
    # defines the network structure
    def __init__(self, act_dim):
        hid1_size = 128
        hid2_size = 128
        # 3-layer fully connected network
        self.fc1 = layers.fc(size=hid1_size, act='relu')
        self.fc2 = layers.fc(size=hid2_size, act='relu')
        self.fc3 = layers.fc(size=act_dim, act=None)
    # computes the network's output
    def value(self, obs):
        h1 = self.fc1(obs)
        h2 = self.fc2(h1)
        Q = self.fc3(h2)
        return Q

algorithm.py, the heart of the DQN algorithm:

import copy
import paddle.fluid as fluid
import parl
from parl.core.fluid import layers
class DQN(parl.Algorithm):
    def __init__(self, model, act_dim=None, gamma=None, lr=None):
        """ DQN algorithm
        Args:
            model (parl.Model): the forward network that defines the Q function
            act_dim (int): dimension of the action space, i.e. number of actions
            gamma (float): reward decay factor
            lr (float): learning rate
        """
        self.model = model
        # make an identical copy of the network
        self.target_model = copy.deepcopy(model)
        # assert acts as a debugging checkpoint; isinstance checks the argument types and raises otherwise
        assert isinstance(act_dim, int)
        assert isinstance(gamma, float)
        assert isinstance(lr, float)
        self.act_dim = act_dim
        self.gamma = gamma
        self.lr = lr
    # use self.model's value network to get [Q(s,a1),Q(s,a2),...]
    def predict(self, obs):
        return self.model.value(obs)
    # update self.model's value network with the DQN algorithm; the arguments are arrays holding one batch of data; returns the loss
    def learn(self, obs, action, reward, next_obs, terminal):
        # get max Q' from target_model, used to compute target_Q
        next_pred_value = self.target_model.value(next_obs)
        best_v = layers.reduce_max(next_pred_value, dim=1)
        best_v.stop_gradient = True  # block gradient flow; the target model is not trained
        terminal = layers.cast(terminal, dtype='float32')  # convert the bool flag to float
        target = reward + (1.0 - terminal) * self.gamma * best_v  # unifies the terminal and non-terminal cases in one formula
        pred_value = self.model.value(obs)  # get the predicted Q values
        # convert action to a one-hot vector, e.g. 3 => [0,0,0,1,0]
        action_onehot = layers.one_hot(action, self.act_dim)
        action_onehot = layers.cast(action_onehot, dtype='float32')
        # the next line is an element-wise product that extracts Q(s,a) for the chosen action
        # e.g. pred_value = [[2.3, 5.7, 1.2, 3.9, 1.4]], action_onehot = [[0,0,0,1,0]]
        #  ==> pred_action_value = [[3.9]]
        pred_action_value = layers.reduce_sum(
            layers.elementwise_mul(action_onehot, pred_value), dim=1)
        # compute the mean squared error between Q(s,a) and target_Q to get the loss
        cost = layers.square_error_cost(pred_action_value, target)
        cost = layers.reduce_mean(cost)
        optimizer = fluid.optimizer.Adam(learning_rate=self.lr)  # use the Adam optimizer
        optimizer.minimize(cost)                                 # train
        return cost
    # copy the parameters of self.model into self.target_model
    def sync_target(self):
        self.model.sync_weights_to(self.target_model)

Let's look at how the program updates the network, which now plays the role the Q-table used to play. There are two networks: one produces predicted values and one produces target values. First the target Q network computes the target value; since it has not been trained much, this value is inaccurate. Then we get the Q network's predicted value, which is also inaccurate. Even so, we keep training the Q network so that its prediction moves toward the target computed with the target Q network. That is exactly what the learn function does.
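
A condensed numpy sketch of what learn computes (illustrative numbers only; the real code above builds the same thing as a paddle.fluid graph):

import numpy as np

gamma = 0.99
# Hypothetical network outputs for a batch of 3 transitions with 2 actions.
q_pred   = np.array([[1.0, 2.0], [0.5, 0.3], [2.0, 1.0]])   # Q network,   Q(s, .)
q_target = np.array([[1.5, 1.0], [0.2, 0.8], [1.0, 3.0]])   # target net,  Q'(s', .)
action   = np.array([1, 0, 0])
reward   = np.array([0.0, 1.0, 1.0])
terminal = np.array([0.0, 0.0, 1.0])

best_v = q_target.max(axis=1)                         # max_a Q'(s', a), no gradient
target = reward + (1.0 - terminal) * gamma * best_v   # TD target; just the reward if done
pred_action_value = q_pred[np.arange(3), action]      # Q(s, a) for the action actually taken
loss = np.mean((pred_action_value - target) ** 2)     # MSE minimized w.r.t. the Q network only
print(target, loss)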

agent.py:

import numpy as np
import paddle.fluid as fluid
import parl
from parl.core.fluid import layers
class Agent(parl.Agent):
    def __init__(self,algorithm,obs_dim,act_dim,e_greed=0.1,e_greed_decrement=0):
        assert isinstance(obs_dim, int)
        assert isinstance(act_dim, int)
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        super(Agent, self).__init__(algorithm)
        self.global_step = 0
        self.update_target_steps = 200  # copy the model parameters into target_model every 200 training steps
        self.e_greed = e_greed  # probability of choosing a random action (exploration)
        self.e_greed_decrement = e_greed_decrement  # as training converges, gradually reduce exploration
    def build_program(self):
        self.pred_program = fluid.Program()
        self.learn_program = fluid.Program()
        with fluid.program_guard(self.pred_program):  # build the computation graph for predicting actions; define the input/output variables
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            self.value = self.alg.predict(obs)
        with fluid.program_guard(self.learn_program):  # build the computation graph for updating the Q network; define the input/output variables
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            action = layers.data(name='act', shape=[1], dtype='int32')
            reward = layers.data(name='reward', shape=[], dtype='float32')
            next_obs = layers.data(
                name='next_obs', shape=[self.obs_dim], dtype='float32')
            terminal = layers.data(name='terminal', shape=[], dtype='bool')
            self.cost = self.alg.learn(obs, action, reward, next_obs, terminal)
    def sample(self, obs):
        sample = np.random.rand()  # a random number in [0, 1)
        if sample < self.e_greed:
            act = np.random.randint(self.act_dim)  # explore: every action has a chance of being chosen
        else:
            act = self.predict(obs)  # choose the best action
        self.e_greed = max(
            0.01, self.e_greed - self.e_greed_decrement)  # as training converges, gradually reduce exploration
        return act
    def predict(self, obs):  # choose the best action
        obs = np.expand_dims(obs, axis=0)
        pred_Q = self.fluid_executor.run(
            self.pred_program,
            feed={'obs': obs.astype('float32')},
            fetch_list=[self.value])[0]
        pred_Q = np.squeeze(pred_Q, axis=0)
        act = np.argmax(pred_Q)  # index of the largest Q, i.e. the corresponding action
        return act
    def learn(self, obs, act, reward, next_obs, terminal):
        # sync the parameters of model and target_model every 200 training steps
        if self.global_step % self.update_target_steps == 0:
            self.alg.sync_target()
        self.global_step += 1
        act = np.expand_dims(act, -1)
        feed = {
            'obs': obs.astype('float32'),
            'act': act.astype('int32'),
            'reward': reward,
            'next_obs': next_obs.astype('float32'),
            'terminal': terminal
        }
        cost = self.fluid_executor.run(
            self.learn_program, feed=feed, fetch_list=[self.cost])[0]  # one training step of the network
        return cost

train.py: the script that runs everything

import os
import gym
import numpy as np
import parl
from parl.utils import logger  # logging utility
from model import Model
from algorithm import DQN
#from parl.algorithms import DQN
from agent import Agent
from replay_memory import ReplayMemory
LEARN_FREQ = 5  # training frequency: no need to learn at every step; accumulate some new experience before learning, for efficiency
MEMORY_SIZE = 20000  # size of the replay memory; larger uses more RAM
MEMORY_WARMUP_SIZE = 200  # pre-fill the replay memory with some experience before sampling batches for the agent to learn from
BATCH_SIZE = 32  # how many samples the agent learns from each time, drawn at random from the replay memory
LEARNING_RATE = 0.001  # learning rate
GAMMA = 0.99  # reward decay factor, usually between 0.9 and 0.999
# train one episode
def run_episode(env, agent, rpm):
    total_reward = 0
    obs = env.reset()
    step = 0
    while True:
        step += 1
        action = agent.sample(obs)  # sample an action; every action has a chance of being tried
        next_obs, reward, done, _ = env.step(action)
        rpm.append((obs, action, reward, next_obs, done))
        # train model
        if (len(rpm) > MEMORY_WARMUP_SIZE) and (step % LEARN_FREQ == 0):
            (batch_obs, batch_action, batch_reward, batch_next_obs,
             batch_done) = rpm.sample(BATCH_SIZE)
            train_loss = agent.learn(batch_obs, batch_action, batch_reward,
                                     batch_next_obs,
                                     batch_done)  # s,a,r,s',done
        total_reward += reward
        obs = next_obs
        if done:
            break
    return total_reward
# evaluate the agent: run 5 episodes and average the total reward
def evaluate(env, agent, render=False):
    eval_reward = []
    for i in range(5):
        obs = env.reset()
        episode_reward = 0
        while True:
            action = agent.predict(obs)  # predict the action, choosing only the best one
            obs, reward, done, _ = env.step(action)
            episode_reward += reward
            if render:
                env.render()
            if done:
                break
        eval_reward.append(episode_reward)
    return np.mean(eval_reward)
def main():
    env = gym.make(
        'CartPole-v0'
    )  # CartPole-v0: expected reward > 180                MountainCar-v0 : expected reward > -120
    action_dim = env.action_space.n  # CartPole-v0: 2
    obs_shape = env.observation_space.shape  # CartPole-v0: (4,)
    rpm = ReplayMemory(MEMORY_SIZE)  # DQN's experience replay pool
    # build the agent with the parl framework
    model = Model(act_dim=action_dim)
    algorithm = DQN(model, act_dim=action_dim, gamma=GAMMA, lr=LEARNING_RATE)
    agent = Agent(
        algorithm,
        obs_dim=obs_shape[0],
        act_dim=action_dim,
        e_greed=0.1,  # probability of choosing a random action (exploration)
        e_greed_decrement=1e-6)  # as training converges, gradually reduce exploration
    # load a saved model
    # save_path = './dqn_model.ckpt'
    # agent.restore(save_path)
    # pre-fill the replay memory so that early training has enough diverse samples
    while len(rpm) < MEMORY_WARMUP_SIZE:
        run_episode(env, agent, rpm)
    max_episode = 2000
    # start train
    episode = 0
    while episode < max_episode:  # train for max_episode episodes; test episodes are not counted
        # train part
        for i in range(0, 50):
            total_reward = run_episode(env, agent, rpm)
            episode += 1
        # test part
        eval_reward = evaluate(env, agent, render=True)  # render=True to watch the behavior
        logger.info('episode:{}    e_greed:{}   Test reward:{}'.format(
            episode, agent.e_greed, eval_reward))
    # after training, save the model
    save_path = './dqn_model.ckpt'
    agent.save(save_path)
if __name__ == '__main__':
    main()

CartPole

The CartPole environment is the "Hello World" of reinforcement learning: it is the most basic environment, and any RL algorithm can run on it to check whether it converges.
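
A minimal random-agent loop (using the same old Gym API as the code above) to confirm the environment itself works before plugging in a learning algorithm:

import gym

env = gym.make('CartPole-v0')
obs = env.reset()
total_reward, done = 0, False
while not done:
    action = env.action_space.sample()           # random policy, no learning
    obs, reward, done, info = env.step(action)
    total_reward += reward
print('random policy reward:', total_reward)      # usually small, far below the 200 cap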


Common PARL APIs


Solving RL with policy gradients

Everything above was value-based: exploration pushes the Q values toward optimal, and then the best action is chosen according to those values. Now we turn to policy-based methods: the action choice no longer depends on a value function; instead a policy is run to the end of an episode and the final total reward decides how good the algorithm is, with good actions becoming more likely to be sampled in the future.


As you can see, the agent chooses its next action by directly outputting a probability over actions. This policy can be optimized, but after executing an action several next states are possible, which is the randomness of the environment. Repeatedly outputting actions and reaching new states until the game ends is called completing an episode, and this whole chain of interactions is the episode's trajectory.


The expected return is the average of the rewards obtainable over all trajectories under the policy. In practice we cannot enumerate them, nor can we obtain the state transition probabilities, but we can run many episodes and take the average reward as an approximation of the expected return.

Since we have no labels to update the network with, we use this expected return instead: we apply gradient ascent to make the reward larger, which makes the policy better. The derivation gives the following gradient for the update:
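
The formula appears in the original figure, which is not reproduced here; the standard REINFORCE policy-gradient estimator it refers to, averaged over N sampled trajectories, is:

\nabla_\theta J(\theta) \approx \frac{1}{N} \sum_{n=1}^{N} \sum_{t} G_t^{(n)} \, \nabla_\theta \log \pi_\theta\left(a_t^{(n)} \mid s_t^{(n)}\right)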


Monte Carlo (MC) and Temporal Difference (TD)


Monte Carlo methods update once after an episode has finished, where G_t is the total return obtainable from one step onward. Temporal-difference methods update at every step, using a Q function to approximate the total return.


Policy Gradient

Here comes the key part! The update method we use is Policy Gradient. The loss is still obtained with cross entropy, but it is additionally weighted by G_t, the total return we computed. A higher total return means a better decision, so a larger G_t makes the network update with a larger gradient, while a smaller G_t leads to a smaller update. After many such updates, the decisions the network produces move closer and closer to good decisions.
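
A small numpy illustration of that weighting (made-up numbers; the real code below does the same thing with paddle ops):

import numpy as np

# Probabilities the policy assigned to the actions actually taken at 3 steps,
# and the discounted return G_t computed from each step onward.
act_prob_taken = np.array([0.9, 0.2, 0.6])
G = np.array([3.0, 2.0, 1.0])

loss = np.mean(-np.log(act_prob_taken) * G)   # a large G_t gives that step a larger gradient
print(loss)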

For discrete actions, we use the original expectation formula for the value directly and apply gradient ascent; in practice this is done with Monte Carlo sampling:


For continuous actions (this also works for discrete actions), see the example in the code:


Summary:


The value Q of each action is computed after the episode ends, accumulating the discounted rewards from back to front. There is also a variant that approximates the value function with a neural network, which is called Actor-Critic.

The corresponding code:

    def learn(self, obs, action, reward):
        """ 用policy gradient 算法更新policy model
        """
        act_prob = self.model(obs)  # 获取输出动作概率
        # log_prob = layers.cross_entropy(act_prob, action) # 穿插熵
        log_prob = layers.reduce_sum(
            -1.0 * layers.log(act_prob) * layers.one_hot(
                action, act_prob.shape[1]),
            dim=1)
        cost = log_prob * reward
        cost = layers.reduce_mean(cost)
        optimizer = fluid.optimizer.Adam(self.lr)
        optimizer.minimize(cost)
        return cost

Workflow and code:


model.py:

import parl
from parl import layers
class Model(parl.Model):
    def __init__(self, act_dim):
        act_dim = act_dim
        hid1_size = act_dim * 10
        self.fc1 = layers.fc(size=hid1_size, act='tanh')
        self.fc2 = layers.fc(size=act_dim, act='softmax')
    def forward(self, obs):  # can be called directly: model = Model(5); model(obs)
        out = self.fc1(obs)
        out = self.fc2(out)
        return out

algorithm.py:

import paddle.fluid as fluid
import parl
from parl import layers
class PolicyGradient(parl.Algorithm):
    def __init__(self, model, lr=None):
        self.model = model
        assert isinstance(lr, float)
        self.lr = lr
    def predict(self, obs):
        """ 运用policy model猜测输出的动作概率
        """
        return self.model(obs)
    def learn(self, obs, action, reward):
        """ 用policy gradient 算法更新policy model
        """
        act_prob = self.model(obs)  # 获取输出动作概率
        # log_prob = layers.cross_entropy(act_prob, action) # 穿插熵
        log_prob = layers.reduce_sum(
            -1.0 * layers.log(act_prob) * layers.one_hot(
                action, act_prob.shape[1]),
            dim=1)
        cost = log_prob * reward
        cost = layers.reduce_mean(cost)
        optimizer = fluid.optimizer.Adam(self.lr)
        optimizer.minimize(cost)
        return cost

agent.py:

import numpy as np
import paddle.fluid as fluid
import parl
from parl import layers
class Agent(parl.Agent):
    def __init__(self, algorithm, obs_dim, act_dim):
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        super(Agent, self).__init__(algorithm)
    def build_program(self):
        self.pred_program = fluid.Program()
        self.learn_program = fluid.Program()
        with fluid.program_guard(self.pred_program):  # build the computation graph for predicting actions; define the input/output variables
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            self.act_prob = self.alg.predict(obs)
        with fluid.program_guard(
                self.learn_program):  # build the computation graph for updating the policy network; define the input/output variables
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            act = layers.data(name='act', shape=[1], dtype='int64')
            reward = layers.data(name='reward', shape=[], dtype='float32')
            self.cost = self.alg.learn(obs, act, reward)
    def sample(self, obs):
        obs = np.expand_dims(obs, axis=0)  # add a batch dimension
        act_prob = self.fluid_executor.run(
            self.pred_program,
            feed={'obs': obs.astype('float32')},
            fetch_list=[self.act_prob])[0]
        act_prob = np.squeeze(act_prob, axis=0)  # remove the batch dimension
        act = np.random.choice(range(self.act_dim), p=act_prob)  # sample an action according to the probabilities
        return act
    def predict(self, obs):
        obs = np.expand_dims(obs, axis=0)
        act_prob = self.fluid_executor.run(
            self.pred_program,
            feed={'obs': obs.astype('float32')},
            fetch_list=[self.act_prob])[0]
        act_prob = np.squeeze(act_prob, axis=0)
        act = np.argmax(act_prob)  # choose the most probable action
        return act
    def learn(self, obs, act, reward):
        act = np.expand_dims(act, axis=-1)
        feed = {
            'obs': obs.astype('float32'),
            'act': act.astype('int64'),
            'reward': reward.astype('float32')
        }
        cost = self.fluid_executor.run(
            self.learn_program, feed=feed, fetch_list=[self.cost])[0]
        return cost

train.py:

import gym
import numpy as np
import parl
from agent import Agent
from model import Model
from algorithm import PolicyGradient  # from parl.algorithms import PolicyGradient
from parl.utils import logger
LEARNING_RATE = 1e-3
# train one episode
def run_episode(env, agent):
    obs_list, action_list, reward_list = [], [], []
    obs = env.reset()
    while True:
        obs_list.append(obs)
        action = agent.sample(obs)
        action_list.append(action)
        obs, reward, done, info = env.step(action)
        reward_list.append(reward)
        if done:
            break
    return obs_list, action_list, reward_list
# evaluate the agent: run 5 episodes and average the total reward
def evaluate(env, agent, render=False):
    eval_reward = []
    for i in range(5):
        obs = env.reset()
        episode_reward = 0
        while True:
            action = agent.predict(obs)
            obs, reward, isOver, _ = env.step(action)
            episode_reward += reward
            if render:
                env.render()
            if isOver:
                break
        eval_reward.append(episode_reward)
    return np.mean(eval_reward)
def calc_reward_to_go(reward_list, gamma=1.0):
    for i in range(len(reward_list) - 2, -1, -1):
        # G_i = r_i + G_i+1
        reward_list[i] += gamma * reward_list[i + 1]  # Gt
    return np.array(reward_list)
def main():
    env = gym.make('CartPole-v0')
    # env = env.unwrapped # Cancel the minimum score limit
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.n
    logger.info('obs_dim {}, act_dim {}'.format(obs_dim, act_dim))
    # build the agent with the parl framework
    model = Model(act_dim=act_dim)
    alg = PolicyGradient(model, lr=LEARNING_RATE)
    agent = Agent(alg, obs_dim=obs_dim, act_dim=act_dim)
    # load a saved model
    # if os.path.exists('./model.ckpt'):
    #     agent.restore('./model.ckpt')
    #     run_episode(env, agent, train_or_test='test', render=True)
    #     exit()
    for i in range(1000):
        obs_list, action_list, reward_list = run_episode(env, agent)
        if i % 10 == 0:
            logger.info("Episode {}, Reward Sum {}.".format(
                i, sum(reward_list)))
        batch_obs = np.array(obs_list)
        batch_action = np.array(action_list)
        batch_reward = calc_reward_to_go(reward_list)
        agent.learn(batch_obs, batch_action, batch_reward)
        if (i + 1) % 100 == 0:
            total_reward = evaluate(env, agent, render=True)
            logger.info('Test reward: {}'.format(total_reward))
    # save the parameters to ./model.ckpt
    agent.save('./model.ckpt')
if __name__ == '__main__':
    main()

Because of library version issues, this again did not run successfully for me.

Solving RL in continuous action spaces

Discrete and continuous actions


None of the methods covered so far (Sarsa, DQN, Q-learning, REINFORCE) can handle continuous action spaces. Here we need the network's output to represent the magnitude of an action rather than simply which action to take. For example, an output in [-1, 1] can control a car's speed: positive means forward, negative means backward, and a larger magnitude means faster.
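
For example (a sketch, not the DDPG code itself), a tanh output can be rescaled to whatever physical range the actuator needs:

import numpy as np

def to_action(network_output, low=-2.0, high=2.0):
    # squash an unbounded network output into [-1, 1], then rescale to [low, high]
    squashed = np.tanh(network_output)
    return low + (squashed + 1.0) * 0.5 * (high - low)

print(to_action(0.0))    # 0.0  -> middle of the range
print(to_action(10.0))   # ~2.0 -> near the upper bound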


DDPG(Deep Deterministic Policy Gradient)

DDPG borrows DQN's tricks, namely the target network and experience replay. It outputs a deterministic action (i.e. the action's magnitude), and its policy network is updated at every step.

Actor-Critic


We have two neural networks, an Actor and a Critic. The Actor's job is to output actions and to adjust its own parameters according to the Critic's score, so that it produces better outputs and earns higher scores from the Critic.

The Critic's job is to evaluate (predict) the value of the Actor's outputs, continually adjusting its predictions using the rewards so that they get as close to the real reward as possible. In other words, it is a Q network.


In practice, for more stable updates, we give both the Actor and the Critic an additional target network, just as in DQN, so DDPG maintains four networks in total.
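
The target networks track the trained networks with a soft ("Polyak") update controlled by tau; a numpy sketch of the standard DDPG-style rule, which is presumably what sync_weights_to with decay = 1 - tau implements in the sync_target method below:

import numpy as np

def soft_update(target_param, source_param, tau=0.001):
    # standard DDPG soft update: the target slowly tracks the trained network
    # instead of being copied over all at once
    return (1.0 - tau) * target_param + tau * source_param

target_w = np.zeros(3)
source_w = np.array([1.0, 2.0, 3.0])
for _ in range(5):
    target_w = soft_update(target_w, source_w)
print(target_w)   # creeps slowly toward source_w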

Code implementation


replay_memory.py, which organizes the sampled data:

import random
import collections
import numpy as np
class ReplayMemory(object):
    def __init__(self, max_size):
        self.buffer = collections.deque(maxlen=max_size)
    def append(self, exp):
        self.buffer.append(exp)
    def sample(self, batch_size):
        mini_batch = random.sample(self.buffer, batch_size)
        obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], []
        for experience in mini_batch:
            s, a, r, s_p, done = experience
            obs_batch.append(s)
            action_batch.append(a)
            reward_batch.append(r)
            next_obs_batch.append(s_p)
            done_batch.append(done)
        return np.array(obs_batch).astype('float32'), \
            np.array(action_batch).astype('float32'), np.array(reward_batch).astype('float32'),\
            np.array(next_obs_batch).astype('float32'), np.array(done_batch).astype('float32')
    def __len__(self):
        return len(self.buffer)

model.py:

import parl
from parl import layers
# wrap the two models in one class
class Model(parl.Model):
    def __init__(self, act_dim):
        self.actor_model = ActorModel(act_dim)
        self.critic_model = CriticModel()
    def policy(self, obs):
        return self.actor_model.policy(obs)
    def value(self, obs, act):
        return self.critic_model.value(obs, act)
    # get the names of the actor network's parameters; returns a list with the names of all of that model's parameters
    def get_actor_params(self):
        return self.actor_model.parameters()
# the Actor network
class ActorModel(parl.Model):
    def __init__(self, act_dim):
        hid_size = 100
        self.fc1 = layers.fc(size=hid_size, act='relu')
        self.fc2 = layers.fc(size=act_dim, act='tanh')
    def policy(self, obs):
        hid = self.fc1(obs)
        means = self.fc2(hid)
        return means
# the Critic network
class CriticModel(parl.Model):
    def __init__(self):
        hid_size = 100
        self.fc1 = layers.fc(size=hid_size, act='relu')
        self.fc2 = layers.fc(size=1, act=None)
    def value(self, obs, act):
        # concatenate the two arrays
        concat = layers.concat([obs, act], axis=1)
        hid = self.fc1(concat)
        Q = self.fc2(hid)
        Q = layers.squeeze(Q, axes=[1])
        return Q

The most important part, algorithm.py:

Both target models actually exist to update the Critic's parameters; the Actor's update only needs the Critic's output.

import parl
from parl import layers
from copy import deepcopy
from paddle import fluid
class DDPG(parl.Algorithm):
    def __init__(self,model,gamma=None,tau=None,actor_lr=None,critic_lr=None):
        """  DDPG algorithm      
        Args:
            model (parl.Model): actor and critic 的前向网络.
                                model 必须完结 get_actor_params() 办法.
            gamma (float): reward的衰减因子.
            tau (float): self.target_model 跟 self.model 同步参数 的 软更新参数
            actor_lr (float): actor 的学习率
            critic_lr (float): critic 的学习率
        """
        assert isinstance(gamma, float)
        assert isinstance(tau, float)
        assert isinstance(actor_lr, float)
        assert isinstance(critic_lr, float)
        self.gamma = gamma
        self.tau = tau
        self.actor_lr = actor_lr
        self.critic_lr = critic_lr
        self.model = model
        self.target_model = deepcopy(model)
    def predict(self, obs):
        """ 运用 self.model 的 actor model 来猜测动作
        """
        return self.model.policy(obs)
    def learn(self, obs, action, reward, next_obs, terminal):
        """ 用DDPG算法更新 actor 和 critic
        """
        actor_cost = self._actor_learn(obs)
        critic_cost = self._critic_learn(obs, action, reward, next_obs,
                                         terminal)
        return actor_cost, critic_cost
    def _actor_learn(self, obs):
        # compute the Critic's Q, then make this Q as large as possible
        action = self.model.policy(obs)
        Q = self.model.value(obs, action)
        cost = layers.reduce_mean(-1.0 * Q)
        optimizer = fluid.optimizer.AdamOptimizer(self.actor_lr)
        # only update the Actor's parameters: pass a parameter_list to minimize, otherwise the Critic's parameters would be adjusted too
        optimizer.minimize(cost, parameter_list=self.model.get_actor_params())
        return cost
    def _critic_learn(self, obs, action, reward, next_obs, terminal):
        # compute the target Q, i.e. the target value
        next_action = self.target_model.policy(next_obs)
        next_Q = self.target_model.value(next_obs, next_action)
        terminal = layers.cast(terminal, dtype='float32')
        target_Q = reward + (1.0 - terminal) * self.gamma * next_Q
        target_Q.stop_gradient = True
        # get the current Q from the network, then compute the mean squared error used to train the network so that Q approaches target Q
        Q = self.model.value(obs, action)
        cost = layers.square_error_cost(Q, target_Q)
        cost = layers.reduce_mean(cost)
        optimizer = fluid.optimizer.AdamOptimizer(self.critic_lr)
        optimizer.minimize(cost)
        return cost
    def sync_target(self, decay=None, share_vars_parallel_executor=None):
        """ self.target_model从self.model仿制参数过来,若decay不为None,则是软更新
        """
        if decay is None:
            decay = 1.0 - self.tau
        self.model.sync_weights_to(
            self.target_model,
            decay=decay,
            share_vars_parallel_executor=share_vars_parallel_executor)

train.py:

import gym
import numpy as np
import parl
from parl.utils import logger
from agent import Agent
from model import Model
from algorithm import DDPG  # from parl.algorithms import DDPG
from env import ContinuousCartPoleEnv
from replay_memory import ReplayMemory
ACTOR_LR = 1e-3  # learning rate of the Actor network
CRITIC_LR = 1e-3  # learning rate of the Critic network
GAMMA = 0.99  # reward decay factor
TAU = 0.001  # soft-update coefficient
MEMORY_SIZE = int(1e6)  # size of the replay memory
MEMORY_WARMUP_SIZE = MEMORY_SIZE // 20  # pre-fill some experience before training starts
BATCH_SIZE = 128
REWARD_SCALE = 0.1  # reward scaling factor
NOISE = 0.05  # variance of the action noise
TRAIN_EPISODE = 6e3  # total number of training episodes
# train one episode
def run_episode(agent, env, rpm):
    obs = env.reset()
    total_reward = 0
    steps = 0
    while True:
        steps += 1
        batch_obs = np.expand_dims(obs, axis=0)
        action = agent.predict(batch_obs.astype('float32'))
        # add exploration noise and clip the output to the range [-1.0, 1.0]
        action = np.clip(np.random.normal(action, NOISE), -1.0, 1.0)
        next_obs, reward, done, info = env.step(action)
        action = [action]  # makes it easier to store in the replay memory
        rpm.append((obs, action, REWARD_SCALE * reward, next_obs, done))
        # only learn after the warm-up amount of experience is stored, and only every five steps
        if len(rpm) > MEMORY_WARMUP_SIZE and (steps % 5) == 0:
            (batch_obs, batch_action, batch_reward, batch_next_obs,
             batch_done) = rpm.sample(BATCH_SIZE)
            agent.learn(batch_obs, batch_action, batch_reward, batch_next_obs,
                        batch_done)
        obs = next_obs
        total_reward += reward
        if done or steps >= 200:
            break
    return total_reward
# evaluate the agent: run 5 episodes and average the total reward
def evaluate(env, agent, render=False):
    eval_reward = []
    for i in range(5):
        obs = env.reset()
        total_reward = 0
        steps = 0
        while True:
            batch_obs = np.expand_dims(obs, axis=0)
            action = agent.predict(batch_obs.astype('float32'))
            action = np.clip(action, -1.0, 1.0)
            steps += 1
            next_obs, reward, done, info = env.step(action)
            obs = next_obs
            total_reward += reward
            if render:
                env.render()
            if done or steps >= 200:
                break
        eval_reward.append(total_reward)
    return np.mean(eval_reward)
def main():
    env = ContinuousCartPoleEnv()
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]
    # create the agent with the PARL framework
    model = Model(act_dim)
    algorithm = DDPG(
        model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
    agent = Agent(algorithm, obs_dim, act_dim)
    # create the replay memory
    rpm = ReplayMemory(MEMORY_SIZE)
    # pre-fill the replay memory
    while len(rpm) < MEMORY_WARMUP_SIZE:
        run_episode(agent, env, rpm)
    episode = 0
    while episode < TRAIN_EPISODE:
        for i in range(50):
            total_reward = run_episode(agent, env, rpm)
            episode += 1
    	# evaluate every 50 episodes
        eval_reward = evaluate(env, agent, render=False)
        logger.info('episode:{}    Test reward:{}'.format(
            episode, eval_reward))
if __name__ == '__main__':
    main()

The environment code, env.py, is attached as well:

"""
Classic cart-pole system implemented by Rich Sutton et al.
Copied from http://incompleteideas.net/sutton/book/code/pole.c
permalink: https://perma.cc/C9ZM-652R
Continuous version by Ian Danforth
"""
import math
import gym
from gym import spaces, logger
from gym.utils import seeding
import numpy as np
class ContinuousCartPoleEnv(gym.Env):
    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 50
    }
    def __init__(self):
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = (self.masspole + self.masscart)
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = (self.masspole * self.length)
        self.force_mag = 30.0
        self.tau = 0.02  # seconds between state updates
        self.min_action = -1.0
        self.max_action = 1.0
        # Angle at which to fail the episode
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4
        # Angle limit set to 2 * theta_threshold_radians so failing observation
        # is still within bounds
        high = np.array([
            self.x_threshold * 2,
            np.finfo(np.float32).max, self.theta_threshold_radians * 2,
            np.finfo(np.float32).max
        ])
        self.action_space = spaces.Box(
            low=self.min_action, high=self.max_action, shape=(1, ))
        self.observation_space = spaces.Box(-high, high)
        self.seed()
        self.viewer = None
        self.state = None
        self.steps_beyond_done = None
    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]
    def stepPhysics(self, force):
        x, x_dot, theta, theta_dot = self.state
        costheta = math.cos(theta)
        sintheta = math.sin(theta)
        temp = (force + self.polemass_length * theta_dot * theta_dot * sintheta
                ) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / \
            (self.length * (4.0/3.0 - self.masspole * costheta * costheta / self.total_mass))
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass
        x = x + self.tau * x_dot
        x_dot = x_dot + self.tau * xacc
        theta = theta + self.tau * theta_dot
        theta_dot = theta_dot + self.tau * thetaacc
        return (x, x_dot, theta, theta_dot)
    def step(self, action):
        action = np.expand_dims(action, 0)
        assert self.action_space.contains(action), \
            "%r (%s) invalid" % (action, type(action))
        # Cast action to float to strip np trappings
        force = self.force_mag * float(action)
        self.state = self.stepPhysics(force)
        x, x_dot, theta, theta_dot = self.state
        done = x < -self.x_threshold \
            or x > self.x_threshold \
            or theta < -self.theta_threshold_radians \
            or theta > self.theta_threshold_radians
        done = bool(done)
        if not done:
            reward = 1.0
        elif self.steps_beyond_done is None:
            # Pole just fell!
            self.steps_beyond_done = 0
            reward = 1.0
        else:
            if self.steps_beyond_done == 0:
                logger.warn("""
You are calling 'step()' even though this environment has already returned
done = True. You should always call 'reset()' once you receive 'done = True'
Any further steps are undefined behavior.
                """)
            self.steps_beyond_done += 1
            reward = 0.0
        return np.array(self.state), reward, done, {}
    def reset(self):
        self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4, ))
        self.steps_beyond_done = None
        return np.array(self.state)
    def render(self, mode='human'):
        screen_width = 600
        screen_height = 400
        world_width = self.x_threshold * 2
        scale = screen_width / world_width
        carty = 100  # TOP OF CART
        polewidth = 10.0
        polelen = scale * 1.0
        cartwidth = 50.0
        cartheight = 30.0
        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_width, screen_height)
            l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
            axleoffset = cartheight / 4.0
            cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            self.carttrans = rendering.Transform()
            cart.add_attr(self.carttrans)
            self.viewer.add_geom(cart)
            l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
            pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            pole.set_color(.8, .6, .4)
            self.poletrans = rendering.Transform(translation=(0, axleoffset))
            pole.add_attr(self.poletrans)
            pole.add_attr(self.carttrans)
            self.viewer.add_geom(pole)
            self.axle = rendering.make_circle(polewidth / 2)
            self.axle.add_attr(self.poletrans)
            self.axle.add_attr(self.carttrans)
            self.axle.set_color(.5, .5, .8)
            self.viewer.add_geom(self.axle)
            self.track = rendering.Line((0, carty), (screen_width, carty))
            self.track.set_color(0, 0, 0)
            self.viewer.add_geom(self.track)
        if self.state is None:
            return None
        x = self.state
        cartx = x[0] * scale + screen_width / 2.0  # MIDDLE OF CART
        self.carttrans.set_translation(cartx, carty)
        self.poletrans.set_rotation(-x[2])
        return self.viewer.render(return_rgb_array=(mode == 'rgb_array'))
    def close(self):
        if self.viewer:
            self.viewer.close()

Extensions

RLSchool: a collection of reinforcement learning simulation environments


Quadrotor hovering-control task


More environments


  • 1-star: a simple bounce-and-catch game: github.com/shivaverma/…
  • 2-star: Gym Box2D environments (require box2d-py): gym.openai.com/envs/#box2d ; PyGame game environments (including Flappy Bird): github.com/ntasfi/PyGa…
  • 3-star: Gym Robotics environments (require mujoco_py and a trial license): gym.openai.com/envs/#robot… ; a stock-prediction environment: github.com/kh-kim/stoc… ; the RLSchool quadrotor velocity-control task "velocity_control": github.com/PaddlePaddl…
  • 4-star: RLBench task environments (completing a task with a robot arm): github.com/stepjam/RLB…
  • 5-star: traffic-light control: github.com/Ujwal2910/S…