您的位置:首页 > 其它

【强化学习笔记】基于蒙特卡洛的强化学习算法

2018-11-13 21:49 302 查看
[code]import numpy as np
import pandas as pd
import random

class MC_RL(object):

def __init__(self, states_list, action_lists, gamma):
self.states = states_list
self.actions = action_lists
self.gamma = gamma

# 返回按照随机规则生成的 num 组实验的state_sample,action_sample,reward_sample(起始点随机+中间过程中选择动作也完全随机)
def gen_randompi_sample(self, num):  # 采集num次的实验
state_sample = []
action_sample = []
reward_sample = []
for i in range(num):
s_tmp = []
a_tmp = []
r_tmp = []
# 随机选择“初始状态”
s = self.states[int(random.random() * len(self.states))]
t = False  # 标记是否到达终点
while t == False:
a = self.actions[int(random.random() * len(self.actions))]  # 过程中随机选择“动作”
t, state_next, r = self.transform(s, a)
# t:这次行动是否到达终点
# state_next:这次行动后到达了哪一个状态
# r:这次行动获得的奖励
s = state_next
# 接下来更新实验的轨迹
s_tmp.append(s)
a_tmp.append(a)
r_tmp.appned(r)
# 当跳出循环时,s_tmp a_tmp r_tmp  已经是完整过程(起点——>终点)的序列了
state_sample.append(s_tmp)
reward_sample.append(r_tmp)
action_sample.append(a_tmp)
return state_sample, action_sample, reward_sample

# 输入:采样的state_sample,action_sample,reward_sample
# 返回: 更新后的vfunc,里面是每一个状态对应的值函数
def mc_evaluate(self, state_sample, action_sample, reward_sample):
vfunc = dict()
nfunc = dict()
for s in self.states:
vfunc[s] = 0.0
nfunc[s] = 0.0
for iter1 in range(len(state_sample)):  # 遍历num组实验
G = 0.0  # 要针对每组实验,计算每一组实验的初始状态 的 累计回报
for step in range(len(state_sample[iter1]) - 1, -1, -1):
G = G * self.gamma
G = G + reward_sample[iter1][step]
# 对于此条实验的初始状态的累计回报 G ,计算完毕。

# 接下来 正向计算 每条状态的 累计回报G
for step in range(len(state_sample[iter1])):
s = state_sample[iter1][step]
vfunc[s] = vfunc[s] + G
nfunc[s] = nfunc[s] + 1
G = (G - reward_sample[iter1][step]) / self.gamma
#  至此,所有状态处 的 累计回报值 更新完毕
for s in self.states:
if nfunc[s] >= 1:
vfunc[s] = vfunc[s] / nfunc[s]
print("MC_SAMPLE完毕")
return vfunc

# ****************************** 以上两个方法实现了蒙特卡洛评估*********************************************

# ****************************** 接下来用蒙特卡洛实现 策略改进 即 对q(s,a)进行更新**************************

# 输入: 策略改进时候的采样轮数 and epsilon   (策略改进的时候,对动作的选取,采用的策略是epsilon-greedy策略)
# 输出: 蒙特卡洛改进后的 状态-动作 函数
def mc(self, num_iter, epsilon):
qfunc = dict()  # 根据greedy策略生成 策略改进 的采样样本时,需要使用到qfunc,我们在这里提前定义好
nqfunc = dict()
for s in self.states:
for a in self.actions:
qfunc["%s_%d" % (s, a)] = 0.0  # 动作 用字符串表示 , 状态 用整数表示
nqfunc["%s_%d" % (s, a)] = 0.0
# 策略改进过程中,生成采样的策略是epsilon-greedy策略,下面就是epsilon-greedy采样部分
for iter in range(num_iter):
s_sample = []
a_sample = []
r_sample = []
# epsilon-greedy的策略要求:先随机选一个初始点,在接下来过程中的动作选择中,遵循epsilon-greedy策略
s = self.states[int(random.random() * len(self.states))]
t = False  # 标记是否到达终点
while t == False:
a = epsilon_greedy(qfunc, s, epsilon)  # epsilon_greedy()未定义 按需求自己给出
r, s_next, t = env_grid.transform(s, a)  # env_grid 是环境
s_sample.append(s)
a_sample.append(a)
r_sample.append(r)
s = s_next
# 针对刚才生成的一幕完整的实验episode,进行 策略改进(计算出现在episode中的每一个 状态-动作对,若出现多次)
g = 0.0
# 计算采样序列的 起始状态的 回报值 g
for i in range(len(s_sample) - 1, -1, -1):  # 反向遍历采样到的序列
g = g * self.gamma
g = g + r_sample[i]
# 正向遍历采样到的序列 当中的 每一个 状态-动作对
for i in range(len(s_sample)):
key = "%d_%s" % (s_sample[i], a_sample[i])
nqfunc[key] = nqfunc[key] + 1
# 把新的s-a的回报g 和 旧的 qfunc[s,a]的回报 ,一起重新计算,求得  更新后的 qfunc[s,a]
qfunc[key] = (qfunc[key] * (nqfunc[key] - 1) + g) / nqfunc[key]
# 现在的g对应的是i,针对下一个状态i+1,更新下一个状态的回报g
g = (g - r_sample[i]) / self.gamma
# 总体思路是: 进行 num_iter 轮采样,每一轮采样后对策略q(s,a)进行一次更新
# 更新的具体做法是: 遍历采样序列的每一个s-a对,根据本轮新求得的每一个sa的回报,综合以前求得sa的回报,根据经历sa的次数,做平均
return qfunc

直接上代码 注释很清楚了 初学 有误麻烦帮忙指正

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: