您的位置:首页 > 编程语言

LSTM实现股票预测--pytorch版本【120+行代码】

2018-12-20 20:45 627 查看
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/a19990412/article/details/85139058

简述

网上看到有人用Tensorflow写了的但是没看到有用pytorch写的。
所以我就写了一份。写的过程中没有参照任何TensorFlow版本的(因为我对TensorFlow目前理解有限),所以写得比较简单,看来来似乎也比较容易实现(欢迎各位大佬改进之后,发家致富,带带小弟hhh)。

效果

先简单的看看效果(会有点夸张hhh):

  • 注意,我没有用全部数据!!而是真的用的训练集合来做的,下面的都是真实的…
  • 不过,也没那么夸张,后面有讲解这幅图。

基于:

我以前写的几篇文章。

项目描述

模型假设

我这里认为每天的沪深300的最高价格,是依赖于当天的前n天的沪深300的最高价。
然后用RNN的LSTM模型来估计(捕捉到时序信息)。
让模型学会用前n天的最高价,来判断当天的最高价。

思路很简单,所以模型也很简单~

导入包

import pandas as pd
import matplotlib.pyplot as plt
import datetime
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader

读取数据的函数

  • 其中
    generate_df_affect_by_n_days
    函数,通过一个序列来生成一个矩阵(用于处理时序的数据)。就是把当天的前n天作为参数,当天的数据作为label。
  • readData中的文件名为:
    sh.csv
    也可以是其他的文件大致类似的。自己可以修改。参数n就是之前模型中所说的n。train_end表示的是后面多少个数据作为测试集。
def generate_df_affect_by_n_days(series, n, index=False):
if len(series) <= n:
raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n))
df = pd.DataFrame()
for i in range(n):
df['c%d' % i] = series.tolist()[i:-(n - i)]
df['y'] = series.tolist()[n:]
if index:
df.index = series.index[n:]
return df

def readData(column='high', n=30, all_too=True, index=False, train_end=-300):
df = pd.read_csv("sh.csv", index_col=0)
d
24000
f.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d"), df.index))
df_column = df[column].copy()
df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:]
df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index)
if all_too:
return df_generate_from_df_column_train, df_column, df.index.tolist()
return df_generate_from_df_column_train

类(RNN和数据读取类)

  • RNN的类很简单,看模型也知道其实就两部分
  • 数据读取类,这个主要为了满足pytorch习惯而设定的(建议大家也要养成这样的习惯)。
class RNN(nn.Module):
def __init__(self, input_size):
super(RNN, self).__init__()
self.rnn = nn.LSTM(
input_size=input_size,
hidden_size=64,
num_layers=1,
batch_first=True
)
self.out = nn.Sequential(
nn.Linear(64, 1)
)

def forward(self, x):
r_out, (h_n, h_c) = self.rnn(x, None)  # None 表示 hidden state 会用全0的 state
out = self.out(r_out)
return out

class TrainSet(Dataset):
def __init__(self, data):
# 定义好 image 的路径
self.data, self.label = data[:, :-1].float(), data[:, -1].float()

def __getitem__(self, index):
return self.data[index], self.label[index]

def __len__(self):
return len(self.data)

超参数

  • n为模型中的n
  • LR是模型的学习率
  • EPOCH是多次循环
  • train_end这个在之前的数据集中有提到。(注意是负数
n = 30
LR = 0.0001
EPOCH = 100
train_end = -500

模型训练

其实前面都是准备工作
这里就正式开始了。

获取数据

  • 训练模型仍然使用minibatch的思路
  • 注意,模型必须要先把数据标准化,不然损失会很难降低下来。
# 数据集建立
df, df_all, df_index = readData('high', n=n, train_end=train_end)

df_all = np.array(df_all.tolist())
plt.plot(df_index, df_all, label='real-data')

df_numpy = np.array(df)

df_numpy_mean = np.mean(df_numpy)
df_numpy_std = np.std(df_numpy)

df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std
df_tensor = torch.Tensor(df_numpy)

trainset = TrainSet(df_tensor)
trainloader = DataLoader(trainset, batch_size=10, shuffle=True)

训练模型部分

  • 这个部分如果是不想要训练的话(比如已经训练好了模型),替换为
    rnn = torch.load('rnn.pkl')
    记得把原来的注释掉
rnn = RNN(n)
optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)  # optimize all cnn parameters
loss_func = nn.MSELoss()

for step in range(EPOCH):
for tx, ty in trainloader:
output = rnn(torch.unsqueeze(tx, dim=0))
loss = loss_func(torch.squeeze(output), ty)
optimizer.zero_grad()  # clear gradients for this training step
loss.backward()  # back propagation, compute gradients
optimizer.step()
print(step, loss)
if step % 10:
torch.save(rnn, 'rnn.pkl')
torch.save(rnn, 'rnn.pkl')

画图

generate_data_train = []
generate_data_test = []

test_index = len(df_all) + train_end

df_all_normal = (df_all - df_numpy_mean) / df_numpy_std
df_all_normal_tensor = torch.Tensor(df_all_normal)
for i in range(n, len(df_all)):
x = df_all_normal_tensor[i - n:i]
x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0)
y = rnn(x)
if i < test_index:
generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
else:
generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
plt.plot(df_index[n:train_end], generate_data_train, label='generate_train')
plt.plot(df_index[train_end:], generate_data_test, label='generate_test')
plt.legend()
plt.show()

画图的结果就是:

  • 这里,我在测试集合上,也是使用真实的当天的前n天的真实数据来作为输入。表面上看起来有点奇怪,但是这是可以理解的。现实生活中是可以做到的。
  • 但是这幅图的话,太密了,下面我会画一张其中的片段的图片。
  • 这是放大之后的:

会发现出现有一定的滞后效应。不过也很正常啊,输入就只有价格…来预测价格… 这个模型的解释力真的不太行…

完整代码

import pandas as pd
import matplotlib.pyplot as plt
import datetime
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader
def generate_df_affect_by_n_days(series, n, index=False):
if len(series) <= n:
raise Exception("The Length of series is %d, while affect by (n=%d)." % (len(series), n))
df = pd.DataFrame()
for i in range(n):
df['c%d' % i] = series.tolist()[i:-(n - i)]
df['y'] = series.tolist()[n:]
if index:
df.index = series.index[n:]
return df

def readData(column='high', n=30, all_too=True, index=False, train_end=-300):
df = pd.read_csv("sh.csv", index_col=0)
df.index = list(map(lambda x: datetime.datetime.strptime(x, "%Y-%m-%d"), df.index))
df_column = df[column].copy()
df_column_train, df_column_test = df_column[:train_end], df_column[train_end - n:]
df_generate_from_df_column_train = generate_df_affect_by_n_days(df_column_train, n, index=index)
if all_too:
return df_generate_from_df_column_train, df_column, df.index.tolist()
return df_generate_from_df_column_train

class RNN(nn.Module):
def __init__(self, input_size):
super(RNN, self).__init__()
self.rnn = nn.LSTM(
input_size=input_size,
hidden_size=64,
num_layers=1,
batch_first=True
)
self.out = nn.Sequential(
nn.Linear(64, 1)
)

def forward(self, x):
r_out, (h_n, h_c) = self.rnn(x, None)  # None 表示 hidden state 会用全0的 state
out = self.out(r_out)
return out

class TrainSet(Dataset):
def __init__(self, data):
# 定义好 image 的路径
self.data, self.label = data[:, :-1].float(), data[:, -1].float()

def __getitem__(self, index):
return self.data[index], self.label[index]

def __len__(self):
return len(self.data)
n = 30
LR = 0.0001
EPOCH = 100
train_end = -500# 数据集建立
df, df_all, df_index = readData('high', n=n, train_end=train_end)

df_all = np.array(df_all.tolist())
plt.plot(df_index, df_all, label='real-data')

df_numpy = np.array(df)

df_numpy_mean = np.mean(df_numpy)
df_numpy_std = np.std(df_numpy)

df_numpy = (df_numpy - df_numpy_mean) / df_numpy_std
df_tensor = torch.Tensor(df_numpy)

trainset = TrainSet(df_tensor)
trainloader = DataLoader(trainset, batch_size=10, shuffle=True)
# rnn = torch.load('rnn.pkl')

rnn = RNN(n)
optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)  # optimize all cnn parameters
loss_func = nn.MSELoss()

for step in range(EPOCH):
for tx, ty in trainloader:
output = rnn(torch.unsqueeze(tx, dim=0))
loss = loss_func(torch.squeeze(output), ty)
optimizer.zero_grad()  # clear gradients for this training step
loss.backward()  # back propagation, compute gradients
optimizer.step()
print(step, loss)
if step % 10:
torch.save(rnn, 'rnn.pkl')
torch.save(rnn, 'rnn.pkl')#
generate_data_train = []
generate_data_test = []

test_index = len(df_all) + train_end

df_all_normal = (df_all - df_numpy_mean) / df_numpy_std
df_all_normal_tensor = torch.Tensor(df_all_normal)
for i in range(n, len(df_all)):
x = df_all_normal_tensor[i - n:i]
x = torch.unsqueeze(torch.unsqueeze(x, dim=0), dim=0)
y = rnn(x)
if i < test_index:
generate_data_train.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
else:
generate_data_test.append(torch.squeeze(y).detach().numpy() * df_numpy_std + df_numpy_mean)
plt.plot(df_index[n:train_end], generate_data_train, label='generate_train')
plt.plot(df_index[train_end:], generate_data_test, label='generate_test')
plt.legend()
plt.show()plt.cla()
plt.plot(df_index[train_end:-400], df_all[train_end:-400], label='real-data')
plt.plot(df_index[train_end:-400], generate_data_test[:-400], label='generate_test')
plt.legend()
plt.show()
阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: