Reinforcement Learning Tutorial (Part 1): Q-Learning Principles and Applications


The Basic Idea of Q-Learning

Reinforcement learning is a large family within machine learning and is already supported by a rich body of algorithms, for example:

  • Value-based algorithms that choose actions from estimated values: Q-Learning, Sarsa, DQN

  • Policy-based algorithms that choose actions directly: PG (Policy Gradients)

  • ... ...

Reinforcement learning does not rely on labeled targets the way supervised learning does. In Q-Learning, imagine an agent that, without knowing any "correct" answer, takes an action in its current state and receives a (possibly negative) reward. After receiving the reward, it updates the corresponding Q-value in the Q-table. Iterating over many episodes until convergence fills in the entire Q-table, so that from any state the agent can read off the trajectory of actions that yields the maximum reward.


Understanding How Q-Learning Works

  • The Q-value update revolves entirely around the following formula:

    Q(s, a) ← (1 − α) · Q(s, a) + α · [ r + γ · max_a' Q(s', a') ]

    Here α is the learning rate, γ is the discount factor, r is the immediate reward, and s' is the state reached after taking action a in state s. This is exactly the update implemented in the run_game function of Case Study 1 below.
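As a quick numeric illustration with made-up values, take α = 0.1, γ = 0.95, Q(s, a) = 2, r = −1, and max_a' Q(s', a') = 3:

    Q(s, a) ← 0.9 · 2 + 0.1 · (−1 + 0.95 · 3) = 1.8 + 0.185 = 1.985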

  • The Q-Learning algorithm flow: initialize the Q-table, then for each episode repeatedly choose an action (typically ε-greedy), execute it, observe the reward and next state, and apply the update formula above until the episode terminates. A minimal code sketch is given below.
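Here is a minimal sketch of that tabular Q-Learning loop. It uses Gym's FrozenLake-v0 purely as a stand-in discrete environment and assumes the same pre-0.26 Gym API used elsewhere in this article; the hyperparameter values are illustrative, not taken from the original:

import gym
import numpy as np

env = gym.make("FrozenLake-v0")   # stand-in discrete environment (assumption)

ALPHA = 0.1       # learning rate (illustrative)
GAMMA = 0.95      # discount factor (illustrative)
EPSILON = 0.1     # exploration probability (illustrative)
EPISODES = 1000

# Q-table: one row per state, one column per action, initialized to zeros
q_table = np.zeros((env.observation_space.n, env.action_space.n))

for episode in range(EPISODES):
    state = env.reset()
    done = False
    while not done:
        # epsilon-greedy: explore with probability EPSILON, otherwise exploit
        if np.random.random() < EPSILON:
            action = env.action_space.sample()
        else:
            action = np.argmax(q_table[state])

        next_state, reward, done, _ = env.step(action)

        # Q-Learning update: move Q(s, a) toward r + gamma * max_a' Q(s', a')
        q_table[state, action] += ALPHA * (
            reward + GAMMA * np.max(q_table[next_state]) - q_table[state, action]
        )
        state = next_state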

Case Study 1: Mountain Car

This section walks through the classic mountain-car (hill-climbing) example. It relies on packages such as tf-agents and OpenAI Gym; since Gym is currently not very friendly on Windows, Colab is used to run and display the game.

  • Install the required packages

                
!sudo apt-get install -y xvfb ffmpeg x11-utils
!pip install -q 'gym==0.10.11'
!pip install -q 'imageio==2.4.0'
!pip install -q PILLOW
!pip install -q 'pyglet==1.3.2'
!pip install -q pyvirtualdisplay
!pip install -q tf-agents

            
  • Visualize the Mountain Car environment

                
import tf_agents
from tf_agents.environments import suite_gym
import PIL.Image
import pyvirtualdisplay

display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()

env_name = 'MountainCar-v0'
env = suite_gym.load(env_name)
env.reset()
PIL.Image.fromarray(env.render())
            

                
import gym
from gym.wrappers import Monitor
import glob
import io
import base64
from IPython.display import HTML
from pyvirtualdisplay import Display
from IPython import display as ipythondisplay

display = Display(visible=0, size=(1400, 900))
display.start()

"""
Utility functions to enable video recording of a gym environment
and displaying it.
To enable video, just do: env = wrap_env(env)
"""

def show_video():
  mp4list = glob.glob('video/*.mp4')
  if len(mp4list) > 0:
    mp4 = mp4list[0]
    video = io.open(mp4, 'r+b').read()
    encoded = base64.b64encode(video)
    ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
  else:
    print("Could not find video")


def wrap_env(env):
  env = Monitor(env, './video', force=True)
  return env
            


The Mountain Car environment provides the following actions:

  • 0 - push left

  • 1 - no force

  • 2 - push right

The environment's state consists of the following continuous values:

  • state[0] - position

  • state[1] - velocity

  • Let's look at how the car's position and velocity change (render draws the environment):


                
import gym
env = wrap_env(gym.make("MountainCar-v0"))
env.reset()
done = False

i = 0
while not done:
    i += 1
    state, reward, done, _ = env.step(2)
    env.render()
    print(f"Step {i}: State={state}, Reward={reward}")

env.close()

            
  • A sample of the position, velocity, and reward over several steps:

                
Step 167: State=[-0.47331782  0.01239426], Reward=-1.0
Step 168: State=[-0.46029923  0.01301859], Reward=-1.0
Step 169: State=[-0.44675254  0.01354669], Reward=-1.0
Step 170: State=[-0.43277711  0.01397543], Reward=-1.0
Step 171: State=[-0.41847444  0.01430267], Reward=-1.0
Step 172: State=[-0.4039472   0.01452724], Reward=-1.0
Step 173: State=[-0.3892982  0.014649 ], Reward=-1.0
Step 174: State=[-0.37462943  0.01466878], Reward=-1.0
Step 175: State=[-0.3600411   0.01458833], Reward=-1.0
Step 176: State=[-0.34563082  0.01441028], Reward=-1.0
Step 177: State=[-0.33149278  0.01413803], Reward=-1.0
Step 178: State=[-0.3177171   0.01377568], Reward=-1.0
Step 179: State=[-0.30438921  0.01332789], Reward=-1.0
Step 180: State=[-0.29158942  0.01279979], Reward=-1.0
Step 181: State=[-0.27939257  0.01219685], Reward=-1.0
Step 182: State=[-0.26786777  0.0115248 ], Reward=-1.0
Step 183: State=[-0.25707826  0.01078951], Reward=-1.0
Step 184: State=[-0.24708137  0.00999688], Reward=-1.0
Step 185: State=[-0.23792856  0.00915281], Reward=-1.0
Step 186: State=[-0.22966547  0.00826309], Reward=-1.0
Step 187: State=[-0.2223321   0.00733338], Reward=-1.0
Step 188: State=[-0.21596293  0.00636917], Reward=-1.0
Step 189: State=[-0.21058716  0.00537577], Reward=-1.0
Step 190: State=[-0.20622886  0.0043583 ], Reward=-1.0
Step 191: State=[-0.20290716  0.0033217 ], Reward=-1.0
Step 192: State=[-0.20063641  0.00227075], Reward=-1.0
Step 193: State=[-0.19942631  0.00121011], Reward=-1.0
Step 194: State=[-1.99281965e-01  1.44341652e-04], Reward=-1.0
Step 195: State=[-0.200204   -0.00092203], Reward=-1.0
Step 196: State=[-0.20218851 -0.00198451], Reward=-1.0
Step 197: State=[-0.20522704 -0.00303853], Reward=-1.0
Step 198: State=[-0.20930653 -0.00407949], Reward=-1.0
Step 199: State=[-0.21440914 -0.00510261], Reward=-1.0
Step 200: State=[-0.22051217 -0.00610302], Reward=-1.0

            
  • Animate the car's motion

                
env = wrap_env(gym.make("MountainCar-v0"))
observation = env.reset()
while True:
    env.render()
    # your agent goes here
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
    if done:
        break
env.close()
show_video()

            
  • The following walks through the full training process. Because tabular Q-Learning cannot handle continuous values, the floating-point state must be converted into discrete states, i.e. the range of each state dimension is divided into buckets (binning).

                
import gym
import numpy as np

# This function converts the floating point state values into
# discrete values. This is often called binning.  We divide
# the range that the state values might occupy and assign
# each region to a bucket.
def calc_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/buckets
    return tuple(discrete_state.astype(int))

            
  • Create a function that runs one game (episode). It takes the q_table, whether to render the animation, and whether the q_table should be updated.

                
def run_game(q_table, render, should_update):
    done = False
    discrete_state = calc_discrete_state(env.reset())
    success = False

    while not done:
        # Exploit or explore
        if np.random.random() > epsilon:
            # Exploit - use q-table to take current best action
            # (and probably refine)
            action = np.argmax(q_table[discrete_state])
        else:
            # Explore - take a random action
            action = np.random.randint(0, env.action_space.n)

        # Run simulation step
        new_state, reward, done, _ = env.step(action)

        # Convert continuous state to discrete
        new_state_disc = calc_discrete_state(new_state)

        # Have we reached the goal position (have we won?)?
        if new_state[0] >= env.unwrapped.goal_position:
            success = True

        # Update q-table
        if should_update:
            max_future_q = np.max(q_table[new_state_disc])
            current_q = q_table[discrete_state + (action,)]
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * \
                (reward + DISCOUNT * max_future_q)
            q_table[discrete_state + (action,)] = new_q

        discrete_state = new_state_disc

        if render:
            env.render()

    return success
            

                
LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 50000
SHOW_EVERY = 1000

DISCRETE_GRID_SIZE = [10, 10]
START_EPSILON_DECAYING = 0.5
END_EPSILON_DECAYING = EPISODES//10

env = wrap_env(gym.make("MountainCar-v0"))

epsilon = 1
epsilon_change = epsilon/(END_EPSILON_DECAYING - START_EPSILON_DECAYING)
buckets = (env.observation_space.high - env.observation_space.low) \
    /DISCRETE_GRID_SIZE
q_table = np.random.uniform(low=-3, high=0, size=(DISCRETE_GRID_SIZE \
    + [env.action_space.n]))
success = False
            

                
episode = 0
success_count = 0

# Loop through the required number of episodes
while episode < EPISODES:
    episode += 1
    done = False

    # Run the game.  If we are local, display the render animation at
    # SHOW_EVERY intervals.
    if episode % SHOW_EVERY == 0:
        print(f"Current episode: {episode}, success: {success_count}"
              f" ({float(success_count)/SHOW_EVERY})")
        success = run_game(q_table, True, False)
        success_count = 0
    else:
        success = run_game(q_table, False, True)

    # Count successes
    if success:
        success_count += 1

    # Move epsilon towards its ending value, if it still needs to move
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon = max(0, epsilon - epsilon_change)

print(success)
            

                
run_game(q_table, True, False)
show_video()

            
  • Sample output from training the car

                
Current episode: 1000, success: 0 (0.0)
Current episode: 2000, success: 0 (0.0)
Current episode: 3000, success: 0 (0.0)
Current episode: 4000, success: 29 (0.029)
Current episode: 5000, success: 345 (0.345)
Current episode: 6000, success: 834 (0.834)
Current episode: 7000, success: 797 (0.797)
Current episode: 8000, success: 679 (0.679)
Current episode: 9000, success: 600 (0.6)
Current episode: 10000, success: 728 (0.728)
Current episode: 11000, success: 205 (0.205)
Current episode: 12000, success: 612 (0.612)
Current episode: 13000, success: 733 (0.733)
Current episode: 14000, success: 1000 (1.0)
Current episode: 15000, success: 998 (0.998)
... ...
Current episode: 37000, success: 301 (0.301)
Current episode: 38000, success: 322 (0.322)
Current episode: 39000, success: 292 (0.292)
Current episode: 40000, success: 299 (0.299)
Current episode: 41000, success: 281 (0.281)
Current episode: 42000, success: 233 (0.233)
Current episode: 43000, success: 380 (0.38)
Current episode: 44000, success: 598 (0.598)
Current episode: 45000, success: 933 (0.933)
Current episode: 46000, success: 986 (0.986)
Current episode: 47000, success: 1000 (1.0)
Current episode: 48000, success: 1000 (1.0)
Current episode: 49000, success: 1000 (1.0)
Current episode: 50000, success: 1000 (1.0)
True

            
  • Inspect how the q_table has changed

                
import pandas as pd

# For each (position, velocity) bucket, show the greedy (argmax) action
df = pd.DataFrame(q_table.argmax(axis=2))

df.columns = [f'v-{x}' for x in range(DISCRETE_GRID_SIZE[0])]
df.index = [f'p-{x}' for x in range(DISCRETE_GRID_SIZE[1])]
df
            


Case Study 2: Escaping a Maze

  • This maze example shows more intuitively how rewards are assigned and how the Q-values in the Q-table are updated after each state transition.

  • First, there are rooms 0-4; room 5 is where the agent ultimately needs to go, i.e. the outside world. States 0-5 can all be regarded as possible current states of the agent.


  • Based on this description, the problem can be modeled as a simplified graph, where an arrow between two states means the agent can move between them, i.e. the rooms are connected. State 5 transitions to itself: once the agent reaches it, the goal is achieved and the maximum reward has been obtained, so no further transition is needed and the episode terminates.


  • Based on this simplified model, before placing the agent in an initial state we need to specify the reward for every possible transition. Reaching state 5 is the final goal, so that transition is given the maximum reward of 100; transitions among rooms 0-4 are possible but do not reach the goal, so they receive a reward of 0; pairs of rooms that are not connected receive a reward of -1. This yields the reward matrix R; a sketch of a possible reconstruction is given right below.

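The original reward-matrix figure is not reproduced here, so below is a sketch of R as a NumPy array, reconstructed from the description above and from the classic room-and-door example this case study follows; the exact connectivity (which rooms share doors) is an assumption and may differ from the original figure:

import numpy as np

# Reward matrix R: rows = current state, columns = action (the room moved into).
#  -1 : the two rooms are not connected (transition not allowed)
#   0 : connected rooms that do not reach the goal
# 100 : transitions into state 5 (the outside world / goal)
# The connectivity below (doors 0-4, 1-3, 1-5, 2-3, 3-4, 4-5, 5-5) is assumed
# from the classic version of this example.
R = np.array([
    [-1, -1, -1, -1,  0,  -1],   # from room 0
    [-1, -1, -1,  0, -1, 100],   # from room 1
    [-1, -1, -1,  0, -1,  -1],   # from room 2
    [-1,  0,  0, -1,  0,  -1],   # from room 3
    [ 0, -1, -1,  0, -1, 100],   # from room 4
    [-1,  0, -1, -1,  0, 100],   # from state 5 (the goal loops to itself)
])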

  • We also initialize the Q-table as a zero matrix (one row and one column per state, i.e. 6 × 6).


  • Now suppose the agent starts in state 1, i.e. in room 1. In the second row of the R matrix, only the transitions to state 3 and state 5 carry a non-negative reward. Suppose it randomly transitions to state 5 (equivalently, takes action 5). For this transition from state 1 to state 5, the updated Q(1, 5) is computed as follows:

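A hedged reconstruction of that calculation, assuming the discount factor γ = 0.8 commonly used in this classic example (the actual value appears only in the original figure) and the simplified update with learning rate α = 1:

    Q(1, 5) = R(1, 5) + γ · max[ Q(5, 1), Q(5, 4), Q(5, 5) ]
            = 100 + 0.8 · max[ 0, 0, 0 ]
            = 100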

  • The agent's Q-table is then updated accordingly: the entry Q(1, 5) becomes non-zero, while all other entries remain 0.


  • At this point we say one episode is complete. In the next episode, the initial state s is re-initialized and the Q-table continues to be updated.
  • After enough episodes, the Q-table converges; the code sketch at the end of this example prints the converged values.


  • The non-zero entries of the converged Q-table are then normalized by dividing each by the table's largest entry (the percent sign is omitted).


  • At this point, given any state s, the Q-table yields the action trajectory with the maximum reward. For example, when the agent is in state 2, its trajectory is 2 - 3 - 1 - 5 (the greedy policy may also randomly choose 4 instead of 1). A minimal code sketch of this maze example follows.
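To make the episode-by-episode updates concrete, here is a minimal sketch of this maze example. It assumes the R matrix reconstructed earlier and the same assumed γ = 0.8, and uses the simplified update Q(s, a) = R(s, a) + γ · max Q(s', ·); it then reads off the greedy trajectory from state 2:

import numpy as np

GAMMA = 0.8        # assumed discount factor (see the note above)
EPISODES = 1000
GOAL_STATE = 5

# Q-table initialized to zeros: rows = states, columns = actions (target rooms)
Q = np.zeros((6, 6))

for _ in range(EPISODES):
    state = np.random.randint(0, 6)       # re-initialize the starting state
    while state != GOAL_STATE:
        # valid actions are the transitions with a non-negative reward in R
        valid_actions = np.where(R[state] >= 0)[0]
        action = np.random.choice(valid_actions)
        next_state = action               # taking action a moves into room a
        # simplified Q-Learning update (learning rate of 1)
        Q[state, action] = R[state, action] + GAMMA * np.max(Q[next_state])
        state = next_state

# Normalize by the largest entry, as in the article (percent sign omitted)
print(np.round(Q / Q.max() * 100))

# Read off the greedy trajectory starting from state 2
state, path = 2, [2]
while state != GOAL_STATE:
    state = int(np.argmax(Q[state]))
    path.append(state)
print(path)   # [2, 3, 1, 5]; from state 3, action 4 is equally good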

