
Training an AI to play Flappy Bird with TensorFlow


The script below trains a small Deep Q-Network (DQN) agent to play a simplified, grid-based Flappy Bird. The game, the agent, a Tkinter visualization, and the training and evaluation loops are all contained in the single listing that follows.

# -*- coding: utf-8 -*-
# The code targets Python 2 (Tkinter, print statements) and TensorFlow 1.x
# with eager execution enabled.

import time
import random
import numpy as np
import Tkinter as tk
import matplotlib.pyplot as plt
import tensorflow as tf
tf.enable_eager_execution()
tfe = tf.contrib.eager

class DeepQNetwork:
    '''
    The AI implementation: a small Deep Q-Network with an evaluate network
    and a periodically synced target network.
    (A minimal usage sketch follows right after this class definition.)
    '''

    def __init__(self,
                 feature_dimension,
                 action_dimension,
                 adventure_rate=0.1,
                 learning_rate=0.01,
                 reward_decay=0.9,
                 experience_length=20000,
                 experience_replay_cycle=100,
                 learning_cycle=50,
                 learning_batch_length=32,
                 weights_path='./flappybird_dqn'):
        '''
        feature_dimension: number of features in a state
        action_dimension: number of available actions
        adventure_rate: probability of taking a random action during training
        learning_rate: learning rate
        reward_decay: reward discount factor
        experience_length: size of the experience replay memory
        experience_replay_cycle: learning steps between target-network syncs
        learning_cycle: stored transitions between learning steps
        learning_batch_length: mini-batch size for each learning step
        weights_path: file path used to save and load the model weights
        '''

        self.feature_dimension = feature_dimension
        self.action_dimension = action_dimension
        self.adventure_rate = adventure_rate
        self.learning_rate = learning_rate
        self.reward_decay = reward_decay
        self.experience_length = experience_length
        self.experience_replay_cycle = experience_replay_cycle
        self.learning_cycle = learning_cycle
        self.learning_batch_length = learning_batch_length
        self.model_weights_path = weights_path

        # Evaluate network: updated by gradient descent on every learning step.
        self.evaluate_model = tf.keras.Sequential([
            tf.keras.layers.Dense(16, activation=tf.nn.relu, input_shape=(feature_dimension,)),
            tf.keras.layers.Dense(action_dimension)])

        # Target network: only refreshed from the evaluate network's saved weights.
        self.target_model = tf.keras.Sequential([
            tf.keras.layers.Dense(16, activation=tf.nn.relu, input_shape=(feature_dimension,)),
            tf.keras.layers.Dense(action_dimension)])

        self.optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
        self.global_step = tf.train.get_or_create_global_step()

        # Each row stores one transition: [s, a, r, s_next].
        self.experience_pool = np.zeros((experience_length, feature_dimension + 1 + 1 + feature_dimension))

        self.train_loss_results = []
        self.train_accuracy_results = []

        self.learning_counter = 0
        self.experience_counter = 0

    def _loss(self, learning_batch):
        # Q-learning target: for the action actually taken,
        # q_target(s, a) = r + reward_decay * max_a' Q_target(s', a');
        # all other entries keep the evaluate network's own output.
        q_next = self.target_model(learning_batch[:, -self.feature_dimension:]).numpy()
        q = self.evaluate_model(learning_batch[:, :self.feature_dimension])
        q_target = q.numpy()

        batch_index = np.arange(self.learning_batch_length, dtype=np.int32)
        eval_act_index = learning_batch[:, self.feature_dimension].astype(int)
        reward = learning_batch[:, self.feature_dimension + 1]
        q_target[batch_index, eval_act_index] = reward + self.reward_decay * np.max(q_next, axis=1)
        return tf.reduce_mean(tf.squared_difference(q, q_target))

    def _grad(self, learning_batch):
        with tf.GradientTape() as tape:
            loss_value = self._loss(learning_batch)
        return loss_value, tape.gradient(loss_value, self.evaluate_model.trainable_variables)

    def _train(self, learning_batch):
        loss_value, grads = self._grad(learning_batch)
        self.train_loss_results.append(loss_value.numpy())
        self.optimizer.apply_gradients(zip(grads, self.evaluate_model.variables),
                                       self.global_step)

    def adventure_action(self, s):
        # Next action during training. It is not always the network's choice:
        # while the experience pool is still filling, or with probability
        # adventure_rate, an exploratory random action is taken instead.
        if self.experience_counter < self.experience_length or np.random.uniform() < self.adventure_rate:
            return np.random.randint(0, self.action_dimension)

        return self.get_action(s)

    def get_action(self, s):
        # Next action chosen purely by the network (greedy).
        s = s[np.newaxis, :]
        q = self.evaluate_model(s)
        return np.argmax(q)

    def save_experience(self, s, a, r, s_next):
        # Store one transition; this is the data the AI learns from.
        save = False
        learned = False
        replay = False

        experience = np.hstack((s, [a, r], s_next))
        self.experience_pool[self.experience_counter % self.experience_length, :] = experience
        self.experience_counter += 1
        save = True

        # Once the pool is full, learn from a random mini-batch every learning_cycle transitions.
        if self.experience_counter >= self.experience_length and self.experience_counter % self.learning_cycle == 0:
            sample_index = np.random.choice(self.experience_length, size=self.learning_batch_length)
            learning_batch = self.experience_pool[sample_index, :]
            self._train(learning_batch)

            self.learning_counter += 1
            learned = True

        # Every experience_replay_cycle learning steps, sync the target network
        # with the evaluate network via the saved weights file.
        if self.experience_counter >= self.experience_length and self.learning_counter % self.experience_replay_cycle == 0:
            self.evaluate_model.save_weights(self.model_weights_path)
            self.target_model.load_weights(self.model_weights_path)
            replay = True

        return save, learned, replay

    def save_weights(self):
        # Save the trained network.
        self.evaluate_model.save_weights(self.model_weights_path)

    def load_weights(self):
        # Load a previously trained network into both the evaluate and target models.
        self.evaluate_model.load_weights(self.model_weights_path)
        self.target_model.load_weights(self.model_weights_path)
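
As a reading aid, here is a minimal sketch of how DeepQNetwork is meant to be driven. It mirrors the training loop inside tf_train() further below; the helper name _dqn_usage_sketch and its env argument (any object with the same reset()/step()/feature_len()/action_len() interface as flappy_env) are illustrative only, and the function is never called by the script.

def _dqn_usage_sketch(env, episodes=10):
    # Illustrative only: construct the agent from the environment's dimensions.
    agent = DeepQNetwork(env.feature_len(), env.action_len())
    for episode in range(episodes):
        env.reset()
        observation = env._create_observation()
        done = False
        while not done:
            # Epsilon-greedy action: random while the pool fills, then mostly greedy.
            action = agent.adventure_action(observation)
            observation_next, reward, done = env.step(action)
            # Storing the transition also triggers learning steps and
            # target-network syncs at the cycles configured in __init__.
            agent.save_experience(observation, action, reward, observation_next)
            observation = observation_next
    # Persist whatever has been learned so far.
    agent.save_weights()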

# Playfield size in cells, and pixels per cell for the Tkinter view.
TUBE_WIDTH = 18
TUBE_HEIGHT = 9
RECT_DENSITY = 10

def array_create(width, height):
    # Create a height x width grid filled with zeros.
    array = []
    for i in range(height):
        line = []
        for j in range(width):
            line.append(0)
        array.append(line)
    return array

def block_add(tube, x):
    # Add a pipe of random height in column x, growing from the top or the
    # bottom; the returned height is negative when the pipe grows from the bottom.
    direction = random.randint(0, 1)
    height = random.randint(1, TUBE_HEIGHT - 2)

    start = 0
    end = height
    if direction > 0:
        start = TUBE_HEIGHT - height
        end = TUBE_HEIGHT
        height = 0 - height

    for i in range(start, end):
        tube[i][x] = 1

    return height

class flappy_env:
    '''
    The game implementation: a scrolling grid of pipes with the bird in column 0.
    (A random-rollout sketch follows right after this class definition.)
    '''

    def __init__(self):
        self.binary = []
        for i in range(TUBE_WIDTH):
            self.binary.append(2 ** i)

        self.tube = array_create(TUBE_WIDTH, TUBE_HEIGHT)
        self.bird = TUBE_HEIGHT / 2
        self.tube_len = TUBE_WIDTH

        self.block = [0, 0]
        self.block[0] = block_add(self.tube, TUBE_WIDTH / 2 - 1)
        self.block[1] = block_add(self.tube, TUBE_WIDTH - 1)

    def _create_observation(self):
        return self._create_observation2()

    def _create_observation0(self):
        # Variant 0: one integer per row, encoding that row's pipe cells as a bitmask.
        observation = []
        for i in range(TUBE_HEIGHT):
            observation.append(0)
            for j in range(TUBE_WIDTH):
                observation[i] += self.binary[j] * self.tube[i][j]

        if 0 <= self.bird < TUBE_HEIGHT and self.tube[self.bird][0] == 0:
            observation[self.bird] += 1

        return np.array(observation)

    def _create_observation1(self):
        # Variant 1: the full grid flattened into 0/1 features.
        observation = []
        for i in range(TUBE_HEIGHT):
            for j in range(TUBE_WIDTH):
                observation.append(self.tube[i][j])

        if 0 <= self.bird < TUBE_HEIGHT and self.tube[self.bird][0] == 0:
            observation[self.bird * TUBE_WIDTH] = 1

        return np.array(observation)

    def _create_observation2(self):
        # Variant 2 (the one actually used): bird row, scroll phase, and the
        # signed heights of the two upcoming pipes.
        observation = [self.bird, self.tube_len % TUBE_WIDTH, self.block[0], self.block[1]]
        return np.array(observation)

    def reset(self):
        self.tube = array_create(TUBE_WIDTH, TUBE_HEIGHT)
        self.bird = TUBE_HEIGHT / 2
        self.tube_len = TUBE_WIDTH
        self.block = [0, 0]
        self.block[0] = block_add(self.tube, TUBE_WIDTH / 2 - 1)
        self.block[1] = block_add(self.tube, TUBE_WIDTH - 1)

    def feature_len(self):
        # 4 features for _create_observation2; the commented value matches _create_observation1.
        # return TUBE_HEIGHT * TUBE_WIDTH
        return 4

    def action_len(self):
        return 2

    def step(self, action):
        '''
        Input
            action: the action to take (1 = flap/up, 0 = fall/down)
        Output
            observation: the state after the action
            reward: the reward for this step
            done: whether the game is over
        '''
        done = False
        reward = 0

        action = max(0, min(action, 1))
        if action > 0:
            self.bird -= 1
        else:
            self.bird += 1

        # Scroll the pipes one column to the left.
        for i in range(TUBE_HEIGHT):
            for j in range(TUBE_WIDTH - 1):
                self.tube[i][j] = self.tube[i][j + 1]

        for i in range(TUBE_HEIGHT):
            self.tube[i][TUBE_WIDTH - 1] = 0

        # Every half screen width, spawn a new pipe in the rightmost column.
        self.tube_len += 1
        if self.tube_len % (TUBE_WIDTH / 2) == 0:
            self.block[0] = self.block[1]
            self.block[1] = block_add(self.tube, TUBE_WIDTH - 1)

        if self.bird < 0 or TUBE_HEIGHT <= self.bird or self.tube[self.bird][0] != 0:
            # Hit the ceiling, the floor, or a pipe.
            done = True
            reward = -10
        else:
            done = False
            reward = 1
            # Extra reward for passing through a column that contains a pipe.
            s = 0
            for i in range(TUBE_HEIGHT):
                s += self.tube[i][0]
            if s > 0:
                reward = 10

        observation = self._create_observation()
        return observation, reward, done
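
The environment can be sanity-checked on its own with a random rollout; below is a minimal sketch (again an illustrative helper that the script never calls):

def _env_random_rollout_sketch():
    # Illustrative only: drive flappy_env with random actions and print the
    # final tube_len, which is the score that tst_rl() below also reports.
    env = flappy_env()
    env.reset()
    observation = env._create_observation()
    done = False
    while not done:
        action = random.randint(0, 1)  # 1 = flap (move up), 0 = fall (move down)
        observation, reward, done = env.step(action)
    print 'random rollout score: %d' % env.tube_len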

# Evaluation statistics collected by tf_train() for plotting afterwards.
static_data = []

def tf_play():
    '''
    Watch a trained AI play the game.
    '''
    env = flappy_env()
    env.reset()

    RL = DeepQNetwork(env.feature_len(), env.action_len())
    RL.load_weights()
    # RL.restore_model('/home/kaie/work/web/workspace/HelloPython/src/dqnsample/flappy.ckpt')

    window = tk.Tk()
    canvas = tk.Canvas(window, bg='white', height=RECT_DENSITY * TUBE_HEIGHT, width=RECT_DENSITY * TUBE_WIDTH)
    rect_array = []
    for i in range(TUBE_HEIGHT):
        rect_line = []
        for j in range(TUBE_WIDTH):
            rect_item = canvas.create_rectangle(RECT_DENSITY * j, RECT_DENSITY * i, RECT_DENSITY * j + RECT_DENSITY, RECT_DENSITY * i + RECT_DENSITY, fill='white')
            rect_line.append(rect_item)
        rect_array.append(rect_line)
    canvas.pack()

    def update():
        # Redraw the grid: pipes in black, empty cells in white, the bird in red.
        for i in range(TUBE_HEIGHT):
            for j in range(TUBE_WIDTH):
                rect_item = rect_array[i][j]
                rect_fill = 'white'
                if env.tube[i][j] > 0:
                    rect_fill = 'black'
                canvas.itemconfig(rect_item, fill=rect_fill)

        if 0 <= env.bird < TUBE_HEIGHT:
            rect_item = rect_array[env.bird][0]
            canvas.itemconfig(rect_item, fill='red')

        window.update()

    def show_run():
        # One visualized episode driven entirely by the trained network.
        env.reset()
        observation = env._create_observation()
        done = False
        one_count = 0
        while not done:
            update()
            time.sleep(0.1)
            action = RL.get_action(observation)
            observation_, reward, done = env.step(action)
            observation = observation_
            one_count += 1
            if one_count > 2000:
                done = True

    def run():
        action = 0
        while True:
            try:
                action = int(input('input number (0 to quit, anything else to watch a run): '))
            except:
                pass

            if action == 0:
                break

            show_run()

        window.destroy()

    window.after(100, run)
    window.mainloop()

def tf_train():
    '''
    Train the AI to play the game.
    '''
    global static_data
    static_data = []
    env = flappy_env()
    env.reset()

    RL = DeepQNetwork(env.feature_len(), env.action_len())

    window = tk.Tk()
    canvas = tk.Canvas(window, bg='white', height=RECT_DENSITY * TUBE_HEIGHT, width=RECT_DENSITY * TUBE_WIDTH)
    rect_array = []
    for i in range(TUBE_HEIGHT):
        rect_line = []
        for j in range(TUBE_WIDTH):
            rect_item = canvas.create_rectangle(RECT_DENSITY * j, RECT_DENSITY * i, RECT_DENSITY * j + RECT_DENSITY, RECT_DENSITY * i + RECT_DENSITY, fill='white')
            rect_line.append(rect_item)
        rect_array.append(rect_line)
    canvas.pack()

    def show_run():
        # One visualized episode with the current network, for manual inspection.
        env.reset()
        observation = env._create_observation()
        done = False
        one_count = 0
        while not done:
            update()
            time.sleep(0.2)
            action = RL.get_action(observation)
            observation_, reward, done = env.step(action)
            observation = observation_
            one_count += 1
            if one_count > 2000:
                done = True

    def tst_rl(count):
        # Evaluate the greedy policy over `count` episodes (no exploration, no
        # rendering) and return the min, max and mean score (tube_len).
        score_max = 0
        score_min = 10000000000
        score_sum = 0
        for i in range(count):
            env.reset()
            observation = env._create_observation()
            done = False
            one_count = 0
            while not done:
                action = RL.get_action(observation)
                observation_, reward, done = env.step(action)
                observation = observation_
                one_count += 1
                if one_count > 2000:
                    done = True
            if done:
                score_max = max(env.tube_len, score_max)
                score_min = min(env.tube_len, score_min)
                score_sum += env.tube_len
        return score_min, score_max, score_sum / count

    def update():
        # Redraw the grid: pipes in black, empty cells in white, the bird in red.
        for i in range(TUBE_HEIGHT):
            for j in range(TUBE_WIDTH):
                rect_item = rect_array[i][j]
                rect_fill = 'white'
                if env.tube[i][j] > 0:
                    rect_fill = 'black'
                canvas.itemconfig(rect_item, fill=rect_fill)

        if 0 <= env.bird < TUBE_HEIGHT:
            rect_item = rect_array[env.bird][0]
            canvas.itemconfig(rect_item, fill='red')

        window.update()

    def run():
        global static_data
        step = 0
        score_sum = 0
        score_max = 0
        all_done = False
        for count in range(1000001):  # 1000001
            env.reset()
            observation = env._create_observation()

            done = False

            one_count = 0
            while not done:
                action = RL.adventure_action(observation)
                observation_, reward, done = env.step(action)
                s, l, r = RL.save_experience(observation, action, reward, observation_)

                observation = observation_
                one_count += 1
                if one_count > 2000:
                    done = True

                # Cut the episode after every 100 learning steps so evaluation runs regularly.
                if l and RL.learning_counter % 100 == 0:
                    done = True

                if done:
                    score_max = max(env.tube_len, score_max)
                    score_sum += env.tube_len

                    # Every 50 episodes, evaluate the greedy policy over 20 test runs.
                    if count % 50 == 0:
                        tst_data = tst_rl(20)
                        print count, tst_data[0], tst_data[1], tst_data[2]
                        static_data.append([count, tst_data[0], tst_data[1], tst_data[2]])
                        if tst_data[0] >= 2000:
                            # Even the worst test run reached the step limit: stop training.
                            all_done = True
                            break

            step += 1

            if all_done:
                break

        # A few extra evaluations after training, to extend the statistics plot.
        for i in range(5):
            tst_data = tst_rl(20)
            count += 50
            print count, tst_data[0], tst_data[1], tst_data[2]
            static_data.append([count, tst_data[0], tst_data[1], tst_data[2]])

        # RL.save_model('/home/kaie/work/web/workspace/HelloPython/src/dqnsample/flappy.ckpt')
        RL.save_weights()

        action = 0
        while True:
            try:
                action = int(input('input number (0 to quit, anything else to watch a run): '))
            except:
                pass

            if action == 0:
                break

            show_run()

        window.destroy()

    window.after(100, run)
    window.mainloop()

    # Plot min / max / mean evaluation scores over training, then the training loss.
    static_data = np.array(static_data)
    plt.plot(static_data[:, 0], static_data[:, 1])
    plt.plot(static_data[:, 0], static_data[:, 2])
    plt.plot(static_data[:, 0], static_data[:, 3])
    plt.show()
    plt.plot(RL.train_loss_results)
    plt.show()

def tk_play():
    '''
    Play the game manually (keyboard input per step).
    '''
    env = flappy_env()
    env.reset()

    window = tk.Tk()
    canvas = tk.Canvas(window, bg='white', height=RECT_DENSITY * TUBE_HEIGHT, width=RECT_DENSITY * TUBE_WIDTH)
    rect_array = []
    for i in range(TUBE_HEIGHT):
        rect_line = []
        for j in range(TUBE_WIDTH):
            rect_item = canvas.create_rectangle(RECT_DENSITY * j, RECT_DENSITY * i, RECT_DENSITY * j + RECT_DENSITY, RECT_DENSITY * i + RECT_DENSITY, fill='white')
            rect_line.append(rect_item)
        rect_array.append(rect_line)
    canvas.pack()

    def update():
        # Redraw the grid: pipes in black, empty cells in white, the bird in red.
        for i in range(TUBE_HEIGHT):
            for j in range(TUBE_WIDTH):
                rect_item = rect_array[i][j]
                rect_fill = 'white'
                if env.tube[i][j] > 0:
                    rect_fill = 'black'
                canvas.itemconfig(rect_item, fill=rect_fill)

        if 0 <= env.bird < TUBE_HEIGHT:
            rect_item = rect_array[env.bird][0]
            canvas.itemconfig(rect_item, fill='red')

        window.update()

    def run():
        done = False
        while not done:
            update()

            action = 3
            while not (0 <= action <= 2):
                try:
                    action = int(input('input action (0 = down, 1 = up, 2 = quit): '))
                except:
                    pass

            if action == 2:
                done = True
                break

            observation, reward, done = env.step(action)

        print 'game over: %d' % env.tube_len
        window.destroy()

    window.after(100, run)
    window.mainloop()

if __name__ == '__main__':
    tf_train()
    # tf_play()
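
Once tf_train() has finished and saved its weights to ./flappybird_dqn, the commented-out tf_play() entry point replays the trained agent in the same Tkinter window (and tk_play() lets a human play); switching the main guard is all that is needed, i.e. the block above becomes:

if __name__ == '__main__':
    # tf_train()  # already run once, so the weights file ./flappybird_dqn exists
    tf_play()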