您的位置:首页 > 理论基础 > 计算机网络

循环神经网络(RNN)练习:比特币市场的分析与预测

2017-07-14 15:50 871 查看

题目:

⽐特币(Bitcoin)是⼀种⽤区块链作为⽀付系统的加密货币。由中本聪在2009 年基于⽆国界的对等⽹络,⽤共识主动性开源软件发明创⽴,通过加密数字签名,不需通过任何第 三⽅信⽤机构,解决了电⼦货币的⼀币多付和交易安全问题,从⽽演化成为⼀个超主权货币体系(以上内容根据维基百科)。2017 年,⽐特币的价格持续上涨,⽐特币的价格从今年年初的约 1000 美元,涨到了约 3000 美元,⽬前回落⾄ 2500 美元附近波动。下图展⽰了最近两个⽉⽐特币的价格波动情况。



我们⾮常希望能准确把握其价格的波动情况,习题为利⽤ LSTM 来预测:

(1) 当⽇⽐特币价格;

(2) ⼀周内的波动率(Volatility),它反映了进⾏⽐特币投资时可能遇到的风险。

注记:

(1) 之所以选择价格和波动率这两个量来进⾏预测,是因为当我们进⾏投资时,我们不只需要知道预期的收益,也需要知道相应的风险,只有同时了解了这两个⽅⾯的信息,我们才能更理性地进⾏投资。

(2) 在参考例中,原作者的TensorFlow 程序直接预测的是股票的“价格”,然⽽这并不是⾦融领域的标准做法。在⾦融理论中,有效市场假说(Efficient Market Hypothesis)可以导出下⾯的结论:如果股市具有弱形式有效性(Weak form efficiency),那么股票价格的变化服从随机行走。股票价格的“随机行走”并⾮是指股票价格完全随机,⽽是指股票价格的波动服从随机⾏⾛。⽤⼀个例⼦来简要说明数据的预处理⽅法,假设四天内股票的收盘价格 y 为 [100, 101, 103, 97],那么我们通常将价格数据预处理为
x 为 [0.01, 0.0198, -0.0583] ,即:



通常所说的⾦融市场上的“随机⾏⾛”指的是价格波动情况 x 的随机行走。而波动率(Volatility)通常被引⼊来衡量标的资产价格或投资回报率波动的剧烈程度。简单来说,波动率 Vol 被定义为⼀段时间内 x 的标准差 Vol = std(x)。我们在⽤ RNN 进⾏价格预测时,建议先对数据进⾏上述的预处理,以便于以后进⾏更为系统的计量经济学或⾦融物理学研究。

数据处理:

(1) 选最高价格High,用20天的数据预测未来一天的价格;

      【all dataset:1512,train dataset:1300,test dataset:212】

(2) 将High价格提取出来作预处理,计算上面说的收益率x,一周波动率Vol,选x为特征,预测对象为Vol。

      网站上下载的原始数据是这样(左),提取后是这样(右):

      【all dataset:1526,train dataset:1300,test dataset:226】

      

          


LSTM代码和预测结果:

两个目标的代码基本一致,只有输入数据size有些不同

(1) 用一段时间(20天)的最高价格数据预测未来一天(第21天)的价格

#coding=utf-8

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

time_step=50      #时间步
rnn_unit=30       #hidden layer units
batch_size=60     #每一批次训练多少个样例
input_size=1      #输入层维度
output_size=1     #输出层维度
lr=0.0006         #学习率
FLAG='train'

#——————————————————导入数据——————————————————————
f=open('.\dataset\Bitcoin_rev_high.csv')
df=pd.read_csv(f)
data=df.iloc[:,0].values
normalize_data=(data-np.mean(data))/np.std(data)  #标准化
normalize_data=normalize_data[:,np.newaxis]       #增加维度

data_x,data_y=[],[]
for i in range(len(normalize_data)-time_step-1):
x=normalize_data[i:i+time_step]
y=normalize_data[i+time_step]    #用前20天预测未来1天,短期预测
data_x.append(x.tolist())
data_y.append(y.tolist())
data_y=np.reshape(data_y,(-1,1,1))
#分训练集和测试集
train_num=1300
train_x=data_x[0:train_num]
train_y=data_y[0:train_num]
test_x=data_x[train_num:]
test_y=data_y[train_num:]

#—————————————————定义神经网络变量————————————————
X=tf.placeholder(tf.float32, [None,time_step,input_size])
Y=tf.placeholder(tf.float32, [None,1,output_size])
#输入层、输出层权重、偏置
weights={
'in':tf.Variable(tf.random_normal([input_size,rnn_unit])),
'out':tf.Variable(tf.random_normal([rnn_unit,1]))
}
biases={
'in':tf.Variable(tf.constant(1.0,shape=[rnn_unit,])),
'out':tf.Variable(tf.constant(1.0,shape=[1,]))
}

#—————————————————定义神经网络—————————————————
def lstm(batch):
w_in=weights['in']
b_in=biases['in']
X_in=tf.reshape(X,[-1,input_size])    #将X转换成2维,为了输入层'in'的输入
input_rnn=tf.matmul(X_in,w_in)+b_in
input_rnn=tf.reshape(input_rnn,[-1,time_step,rnn_unit])   #将tensor转换回3维,作为lstm cell的输入
cell=tf.nn.rnn_cell.BasicLSTMCell(rnn_unit)
init_state=cell.zero_state(batch,dtype=tf.float32)
output_rnn, final_states=tf.nn.dynamic_rnn(cell, input_rnn,initial_state=init_state, dtype=tf.float32)  #output_rnn是记录lstm每个输出节点的结果,final_states是最后一个cell的结果
outputs=tf.unstack(tf.transpose(output_rnn, [1,0,2]))   #作为输出层'out'的输入
w_out=weights['out']
b_out=biases['out']
pred=tf.matmul(outputs[-1],w_out)+b_out  #time_step只取最后一项
return pred,final_states

#——————————————————训练模型——————————————————
def train_lstm():
global batch_size
pred,_=lstm(batch_size)
print('train pred', pred.get_shape())
#损失函数
loss=tf.reduce_mean(tf.square(tf.reshape(pred,[-1,1])-tf.reshape(Y,[-1,1])))
train_op=tf.train.AdamOptimizer(lr).minimize(loss)

saver=tf.train.Saver()
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
start=0
end=start+batch_size
for i in range(1000):
_,loss_=sess.run([train_op,loss],feed_dict={X:train_x[start:end],Y:train_y[start:end]})
loss_list.append(loss_)
start+=batch_size
end=start+batch_size
print(i,loss_)
if i%30==0:
print("保存模型:",saver.save(sess,'./savemodel_1/bitcoin.ckpt'))
if end<len(train_x):
start=0
end=start+batch_size

#————————————————预测模型————————————————————
def prediction():
global test_y
pred,_=lstm(len(test_x)) #预测时只输入[test_batch,time_step,input_size]的测试数据
print('test pred', pred.get_shape())
saver=tf.train.Saver()
with tf.Ses
4000
sion() as sess:
#参数恢复
module_file = './savemodel_1/bitcoin.ckpt'
saver.restore(sess, module_file)
#取测试样本,shape=[test_batch,time_step,input_size]
test_pred=sess.run(pred, feed_dict={X:test_x})
test_pred=np.reshape(test_pred, (-1))
test_y=np.reshape(test_y, (-1))
#以折线图表示结果
plt.figure()
plt.plot(range(len(test_y)), test_y, 'r-', label='real')
plt.plot(range(len(test_pred)), (test_pred), 'b-', label='pred')
plt.legend(loc=0)
plt.title('prediction')
plt.show()

if __name__ == '__main__':
if FLAG=='train':
train_lstm()
elif FLAG=='test':
prediction()
训练的损失曲线(左),预测的价格曲线(右):





(2) 用第t天的收益率x,预测第t天的未来一周波动率Vol

#coding=utf-8

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

#定义常量
rnn_unit=10
input_size=1
output_size=1
lr=0.0006
FLAG='train'

#——————————————————导入数据——————————————————————
f=open('.\dataset\Bitcoin_proc_rev_x.csv')
df=pd.read_csv(f)
data=df.iloc[:,0:2].values  #取第3-10列

#获取训练集
def get_train_data(batch_size=40,time_step=15,train_begin=0,train_end=1300):
batch_index=[]
data_train=data[train_begin:train_end,]
data_train=(data_train-np.mean(data_train,axis=0))/np.std(data_train,axis=0) #标准化
train_x,train_y=[],[]
for i in range(len(data_train)-time_step):
if i % batch_size==0:
batch_index.append(i)
x=data_train[i:i+time_step,0:1]
y=data_train[i:i+time_step,1]
train_x.append(x.tolist())
train_y.append(y.tolist())
batch_index.append((len(data_train)-time_step))
return batch_index,train_x,train_y

#获取测试集
def get_test_data(time_step=15,test_begin=1300):
data_test=data[test_begin:]
mean=np.mean(data_test,axis=0)
std=np.std(data_test,axis=0)
data_test=(data_test-mean)/std   #标准化
size=(len(data_test)+time_step-1)//time_step
test_x,test_y=[],[]
for i in range(size-1):
x=data_test[i*time_step:(i+1)*time_step,0:1]
y=data_test[i*time_step:(i+1)*time_step,1]
test_x.append(x.tolist())
test_y.extend(y)
test_x.append((data_test[(i+1)*time_step:,0:1]).tolist())
test_y.extend((data_test[(i+1)*time_step:,1]).tolist())
return mean,std,test_x,test_y

#—————————————————定义网络变量————————————————
#输入层、输出层权重、偏置
weights={
'in':tf.Variable(tf.random_normal([input_size,rnn_unit])),
'out':tf.Variable(tf.random_normal([rnn_unit,1]))
}
biases={
'in':tf.Variable(tf.constant(0.1,shape=[rnn_unit,])),
'out':tf.Variable(tf.constant(0.1,shape=[1,]))
}

#—————————————————定义神经网络————————————————
def lstm(X):
batch_size=tf.shape(X)[0]
time_step=tf.shape(X)[1]
w_in=weights['in']
b_in=biases['in']
input=tf.reshape(X,[-1,input_size])
input_rnn=tf.matmul(input,w_in)+b_in
input_rnn=tf.reshape(input_rnn,[-1,time_step,rnn_unit])  #将tensor转成3维,作为lstm cell的输入
cell=tf.nn.rnn_cell.BasicLSTMCell(rnn_unit)
init_state=cell.zero_state(batch_size,dtype=tf.float32)
output_rnn,final_states=tf.nn.dynamic_rnn(cell, input_rnn,initial_state=init_state, dtype=tf.float32)  #output_rnn是记录lstm每个输出节点的结果,final_states是最后一个cell的结果
output=tf.reshape(output_rnn,[-1,rnn_unit]) #作为输出层的输入
w_out=weights['out']
b_out=biases['out']
pred=tf.matmul(output,w_out)+b_out
return pred,final_states

#——————————————————训练模型——————————————————
def train_lstm(batch_size=40,time_step=15,train_begin=0,train_end=1300):   #共1526
X=tf.placeholder(tf.float32, shape=[None,time_step,input_size])
Y=tf.placeholder(tf.float32, shape=[None,time_step,output_size])
batch_index,train_x,train_y=get_train_data(batch_size,time_step,train_begin,train_end)
train_x = np.reshape(train_x,(-1,time_step,input_size))
train_y = np.reshape(train_y,(-1,time_step,output_size))
pred,_=lstm(X)
#损失函数
loss=tf.reduce_mean(tf.square(tf.reshape(pred,[-1])-tf.reshape(Y, [-1])))
train_op=tf.train.AdamOptimizer(lr).minimize(loss)
#train_op=tf.train.GradientDescentOptimizer(lr).minimize(loss)

saver=tf.train.Saver()
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for i in range(1500):
for step in range(len(batch_index)-1):
_,loss_=sess.run([train_op,loss],feed_dict={X:train_x[batch_index[step]:batch_index[step+1]],Y:train_y[batch_index[step]:batch_index[step+1]]})
if i % 50==0:
print("保存模型:",saver.save(sess,'./savemodel/bitcoin.ckpt'))

#——————————————————预测模型——————————————————
def prediction(time_step=15):
X=tf.placeholder(tf.float32, shape=[None,time_step,input_size])
#Y=tf.placeholder(tf.float32, shape=[None,time_step,output_size])
mean,std,test_x,test_y=get_test_data(time_step)
pred,_=lstm(X)
saver=tf.train.Saver()
with tf.Session() as sess:
#参数恢复
module_file = './savemodel/bitcoin.ckpt'
saver.restore(sess, module_file)
test_predict=[]
for step in range(len(test_x)-1):
prob=sess.run(pred,feed_dict={X:[test_x[step]]})
predict=prob.reshape((-1))
test_predict.extend(predict)
test_y=np.array(test_y)*std[0]+mean[0]
test_predict=np.array(test_predict)*std[0]+mean[0]
#以折线图表示结果
plt.figure()
plt.plot(list(range(len(test_predict))), test_predict, color='b')
plt.plot(list(range(len(test_y))), test_y,  color='r')
plt.title('prediction')
plt.show()

if __name__ == '__main__':
if FLAG=='train':
train_lstm()
elif FLAG=='test':
prediction()
训练的损失曲线(左),预测的波动率曲线(右):





分析:价格单因素预测比较准,说明LSTM的短期预测有效,长期的目前没试过;而波动率预测差异较大,但因为对象是一周波动的量化,这个波动率值本身就小,所以差距也 是在可以理解的范围内。

参考资料:

[1] 比特币的历史价格数据:https://coinmarketcap.com/

[2] Tensorflow实例:利用LSTM预测股票每日最高价(一)

[3] Tensorflow实例:利用LSTM预测股票每日最高价(二)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息