该模型是一个典型的时间序列预测模型:输入为带时序信息的多个参数,输出为多个预测结果。
输入维度为 (batch_size, 历史数据长度, 输入参数数量),这里输入参数取 15 个:年、月、日,以及上证指数、深证指数和目标股票三者各自的开盘、最高、最低、收盘涨跌幅(3 + 4×3 = 15)。
输出维度为 (batch_size, 预测数据长度, 输出参数数量),这里输出参数取 12 个:上证指数、深证指数和目标股票三者各自的开盘、最高、最低、收盘涨跌幅(4×3 = 12)。
训练流程:将数据整理成"历史数据 + 目标数据"两部分,分批输入网络;历史数据经过编码器后,由解码器逐日循环预测并累计 loss(训练时解码器下一步的输入使用真实数据),最后反向传播更新网络权重,以达到训练的目的。
预测流程:先将历史数据输入编码器提取特征,再传递给解码器。解码器每次预测一天的数据,然后把该预测结果连同上一步的状态再次输入解码器,如此循环预测后续几天的数据,最终得到下面的预测结果。
预测效果(前段是已知数据,后段是预测走向):
模型使用涨跌幅进行预测。下面是数据加载辅助类,按区间读取数据,用于训练与预测:
import os
import pandas
import datetime
import random
import numpy as np
import matplotlib.pyplot as plt
import mpl_finance as mpf
plt.rcParams['font.sans-serif']=['SimHei'] # use the SimHei font so Chinese labels display correctly
plt.rcParams['axes.unicode_minus']=False # display the minus sign correctly with this font
# Register pandas' converters with matplotlib (e.g. for plotting pandas datetime values)
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
class GuPiaoLoader():
    """Load stock CSV files and cut them into model-ready windows.

    Prices are turned into day-over-day change ratios. The Shanghai index (SH),
    the Shenzhen index (SZ) and the target stock are aligned on the '日期'
    (date) column. Each window row has 15 features:
    year, month, day + (open, high, low, close) ratio * 3.
    """

    # Raw price columns read from each CSV file.
    PRICE_COLUMNS = ['开盘', '最高', '最低', '收盘']
    # Change-ratio columns derived from PRICE_COLUMNS by load_one().
    RATIO_COLUMNS = ['开盘%', '最高%', '最低%', '收盘%']
    # Ratios are multiplied by this factor before being fed to the model.
    RATIO_SCALE = 10
    # Format of the strings in the '日期' column.
    DATE_FORMAT = '%Y/%m/%d'

    def __init__(self):
        pass

    def _to_dates(self, values):
        """Parse '日期' strings (a scalar or a Series) using DATE_FORMAT."""
        # Comparing parsed dates instead of raw strings keeps the order correct
        # even when month/day are not zero-padded ('2019/9/30' vs '2019/10/8').
        return pandas.to_datetime(values, format=self.DATE_FORMAT)

    def _sync_with_target(self, df_index, df_target):
        """Return the '日期' + ratio columns of df_index dated on or after
        the date in df_target's first row."""
        # NOTE(review): assumes df_target is sorted by ascending date, so its
        # first row is its earliest trading day -- confirm against the files.
        start_date = self._to_dates(df_target['日期'].iloc[0])
        keep = self._to_dates(df_index['日期']) >= start_date
        return df_index.loc[keep, ['日期'] + self.RATIO_COLUMNS]

    def load_one(self, file_name):
        """Load one CSV file and append the price-change ratio columns.

        For row i > 0 each ratio is ``price[i] / 收盘[i-1] - 1``. Row 0 has no
        previous close, so its own opening price is used as the base.

        file_name: path or file-like object accepted by ``pandas.read_csv``;
            it must contain the columns 日期, 开盘, 最高, 最低, 收盘.
        Returns the loaded DataFrame with the four '%' columns appended
        (all NaN values in the frame are replaced by 0.0 first).
        Raises ValueError if the file has no data rows.
        """
        print('加载文件', file_name)
        df = pandas.read_csv(file_name)
        if df.empty:
            raise ValueError('no data rows in {}'.format(file_name))
        # Append the ratio columns (NaN at first), then fill every NaN with 0.
        df = df.reindex(columns=df.columns.tolist() + self.RATIO_COLUMNS)
        df = df.fillna(value=0.0)
        # Base price per row: the previous close, or the opening price for row 0.
        base = df['收盘'].shift(1)
        base.iloc[0] = df['开盘'].iloc[0]
        for price_col, ratio_col in zip(self.PRICE_COLUMNS, self.RATIO_COLUMNS):
            df[ratio_col] = df[price_col] / base - 1
        return df

    def get_random_data(self, df_sh, df_sz, df_target, history_size, target_size, start_index=None):
        """Cut one window of up to history_size + target_size trading days.

        df_sh, df_sz, df_target: frames returned by load_one().
        history_size: number of known (input) days in the window.
        target_size: number of days to predict after the known part.
        start_index: position, within the SH rows dated on/after df_target's
            first date, where the predicted part starts. If None, a random
            valid position is chosen.
        Returns a DataFrame with columns 日期, 年, 月, 日, the SH/SZ ratio
        columns (suffixed _sh/_sz) and the target ratio columns. Ratios are
        scaled by RATIO_SCALE. Year is (year - 2000) / 20, month is
        month / 12, and day is day / 31. Days missing from SZ or the target
        are dropped, so the window can be shorter than requested.
        Raises ValueError if start_index is None and there are too few rows.
        """
        tmp_df_sh = self._sync_with_target(df_sh, df_target)
        tmp_df_sz = self._sync_with_target(df_sz, df_target)
        tmp_df_target = df_target.loc[:, ['日期'] + self.RATIO_COLUMNS]
        if start_index is None:
            last_start = len(tmp_df_sh) - target_size
            if last_start < history_size:
                raise ValueError('not enough rows: need %d, got %d'
                                 % (history_size + target_size, len(tmp_df_sh)))
            start_index = random.randint(history_size, last_start)
        window = tmp_df_sh.iloc[start_index - history_size:start_index + target_size]
        # Align SZ and the target stock on the SH trading days of the window.
        merged = pandas.merge(window, tmp_df_sz, how='left', on='日期', sort=False,
                              suffixes=('_sh', '_sz'))
        merged = pandas.merge(merged, tmp_df_target, how='left', on='日期', sort=False)
        # .copy() gives an independent frame, so the column assignments below
        # do not trigger pandas' chained-assignment (SettingWithCopy) warning.
        merged = merged.dropna().copy()
        # Scale every ratio column in one vectorized step.
        ratio_columns = merged.columns.drop('日期')
        merged[ratio_columns] = merged[ratio_columns] * self.RATIO_SCALE
        # Date features go right after the date column.
        dates = self._to_dates(merged['日期'])
        merged.insert(1, '年', (dates.dt.year - 2000) / 20)
        merged.insert(2, '月', dates.dt.month / 12)
        merged.insert(3, '日', dates.dt.day / 31)
        return merged

    def get_data_to_train(self, df_sh, df_sz, df_target, batch_size, history_size, target_size, start_index=None):
        """
        Build one training batch.

        batch_size: number of windows in the batch.
        history_size: known days per window (model input).
        target_size: days to predict per window (labels).
        start_index: fixed window position (see get_random_data); None picks
            a random window for each sample.
        Returns (x, y) float arrays of shape (batch_size, history_size, 15) and
        (batch_size, target_size, 15). The date column is excluded.
        """
        x = []
        y = []
        for _ in range(batch_size):
            tmp_df = self.get_random_data(df_sh, df_sz, df_target, history_size, target_size, start_index)
            # Drop the date string column; everything else is numeric.
            tmp_values = tmp_df.iloc[:, 1:].to_numpy(dtype=np.float64)
            x.append(tmp_values[:history_size])
            y.append(tmp_values[history_size:history_size + target_size])
        return np.array(x), np.array(y)

    def get_data_to_predict(self, df_sh, df_sz, df_target, history_size, target_size, start_index=None):
        """
        Build the model input for a forecast.

        history_size: known days fed to the model.
        target_size: number of future days to create date features for.
        start_index: end position of the history window within the SH rows
            dated on/after df_target's first date. None means "the latest
            available day".
        Returns (x, time_step): x is a float64 array (1, history_size, 15) and
        time_step is (1, target_size, 3) with the normalized future dates.
        """
        if start_index is None:
            # start_index indexes the date-synchronized SH rows, so "latest"
            # is that frame's length (not len(df_target), which differs
            # whenever the target stock skipped trading days).
            start_index = len(self._sync_with_target(df_sh, df_target))
        tmp_df = self.get_random_data(df_sh, df_sz, df_target, history_size, 0, start_index)
        history = tmp_df.iloc[:history_size]
        # Exclude the date column and make sure the array is numeric.
        x = np.expand_dims(history.iloc[:, 1:].to_numpy(dtype=np.float64), axis=0)
        # The forecast starts after the last day actually present in x.
        last_date = history['日期'].iloc[-1]
        time_step = np.expand_dims(self.create_time(last_date, target_size), axis=0)
        return x, time_step

    def data_generator(self, df_sh, df_sz, df_target, batch_size, history_size, target_size):
        """Yield random training batches (x, y) forever."""
        while True:
            x, y = self.get_data_to_train(df_sh, df_sz, df_target, batch_size, history_size, target_size)
            yield x, y

    def create_time(self, start_time, target_size):
        '''
        Create normalized date features for the days after start_time.

        start_time: last known date, a string in DATE_FORMAT.
        target_size: number of following weekdays to generate.
        Returns an array of shape (target_size, 3) with rows
        [(year - 2000) / 20, month / 12, day / 31]. Saturdays and Sundays are
        skipped; public holidays are not.
        '''
        current = datetime.datetime.strptime(start_time, self.DATE_FORMAT)
        result = []
        for _ in range(target_size):
            current += datetime.timedelta(days=1)
            # weekday(): Monday == 0 ... Saturday == 5, Sunday == 6.
            while current.weekday() >= 5:
                current += datetime.timedelta(days=1)
            result.append([(current.year - 2000) / 20, current.month / 12, current.day / 31])
        return np.array(result)

    def show_image(self, history_data, target_data=None):
        '''
        Plot a candlestick (K-line) chart of the target stock.

        history_data: (None, 15) rows; the last four columns are the target
            stock's scaled open/high/low/close ratios.
        target_data: optional (None, 15) predicted rows, drawn in other colors.
        '''
        all_data = history_data
        if target_data is not None:
            all_data = np.append(history_data, target_data, axis=0)
        ratios = np.asarray(all_data[:, -4:], dtype=np.float64) / self.RATIO_SCALE
        # Rebuild a synthetic price series starting at 50. Each day's prices
        # are relative to the previous day's close.
        closes = 50 * np.cumprod(1 + ratios[:, 3])
        prev_closes = np.concatenate(([50.0], closes))[:-1]
        show_history_data = pandas.DataFrame({'data': np.arange(len(ratios)),
                                              'open': prev_closes * (1 + ratios[:, 0]),
                                              'high': prev_closes * (1 + ratios[:, 1]),
                                              'low': prev_closes * (1 + ratios[:, 2]),
                                              'close': closes})
        # Create one subplot
        fig, ax = plt.subplots(facecolor=(0.5, 0.5, 0.5))
        fig.subplots_adjust(bottom=0.2)
        plt.title("股票K线图")
        plt.xlabel("时间")
        plt.ylabel("股价变化(%)")
        all_values = show_history_data.values
        if target_data is not None:
            split = len(history_data)
            mpf.candlestick_ohlc(ax, all_values[:split], width=0.5, colorup='r', colordown='g')
            mpf.candlestick_ohlc(ax, all_values[split:], width=0.5, colorup='y', colordown='b')
        else:
            mpf.candlestick_ohlc(ax, all_values, width=0.5, colorup='r', colordown='g')
        plt.show()
数据下载可参考:
CSV 文件的数据结构如下:
下面为预测模型代码实现:
1.引入依赖模块
import tensorflow as tf
import os
import sys
import numpy as np
import time
# Report the TensorFlow version and whether eager execution is enabled
print("TensorFlow version: {}".format(tf.version.VERSION))
print("Eager execution: {}".format(tf.executing_eagerly()))
# Root directory: the current working directory
ROOT_DIR = os.path.abspath("./")
# Put the root directory on sys.path so the DataLoader.gupiao_loader import can be resolved
sys.path.append(ROOT_DIR)
from DataLoader.gupiao_loader import GuPiaoLoader
2.编码器,对历史记录进行编码,用于提取历史记录特征:
class Encoder(tf.keras.Model):
    '''Two stacked GRUs that encode the known history window.'''
    def __init__(self):
        super().__init__()
        relu = tf.keras.activations.relu
        # Attribute names (gru1/gru2) and layer names are kept as-is:
        # tf.train.Checkpoint tracks the weights through these attributes.
        self.gru1 = tf.keras.layers.GRU(128, return_sequences=True, return_state=True, activation=relu, name='feature_gru1')
        self.gru2 = tf.keras.layers.GRU(128, return_state=True, activation=relu, name='feature_gru2')

    def call(self, input_data):
        '''
        input_data: batch of known data, (None, history_size, 15).
        Returns (output of gru2 at the last step, final state of gru1,
        final state of gru2), each (None, 128).
        '''
        sequence, first_state = self.gru1(input_data)
        last_output, second_state = self.gru2(sequence)
        return last_output, first_state, second_state
3.注意力模块,用于长序列预测的记忆保留:
class BahdanauAttention(tf.keras.Model):
    '''Additive (Bahdanau-style) attention of a decoder state over the encoder output.'''
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units, name='feature_denseW1')
        self.W2 = tf.keras.layers.Dense(units, name='feature_denseW2')
        self.V = tf.keras.layers.Dense(1, name='feature_denseV')

    def call(self, query, values):
        '''
        query: decoder state, (batch_size, hidden_size).
        values: encoder output (the "memory"), either
            (batch_size, time, hidden_size) or (batch_size, hidden_size).
            The rank-2 form is treated as a single time step.
        Returns (context_vector (batch_size, hidden_size),
                 attention_weights (batch_size, time, 1)).
        '''
        # A rank-2 memory needs an explicit time axis. Without it,
        # W1(values) (batch, units) broadcasts against W2(query) (batch, 1, units)
        # into a (batch, batch, units) score, and the softmax over axis 1 then
        # mixes the memories of *different samples* in the batch.
        if values.shape.rank == 2:
            values = tf.expand_dims(values, 1)
        # NOTE(review): with a single time step the softmax weight is always 1,
        # so the context equals the memory. Feeding the encoder's full output
        # sequence would make the attention non-trivial.
        # (batch_size, 1, hidden_size): query broadcast over the time axis.
        hidden_with_time_axis = tf.expand_dims(query, 1)
        # Score for each time step, (batch_size, time, 1).
        score = self.V(tf.nn.tanh(
            self.W1(values) + self.W2(hidden_with_time_axis)))
        # Normalize the scores over the time axis into weights in [0, 1].
        attention_weights = tf.nn.softmax(score, axis=1)
        # Weighted sum of the memory over time -> (batch_size, hidden_size).
        context_vector = tf.reduce_sum(attention_weights * values, axis=1)
        return context_vector, attention_weights
4.解码器,循环预测序列数据,使用两层GRU:
class Decoder(tf.keras.Model):
    '''Attention GRU decoder that predicts one step per call.'''
    def __init__(self, class_num):
        super().__init__()
        relu = tf.keras.activations.relu
        # One attention module per decoder GRU state.
        self.attention1 = BahdanauAttention(128)
        self.attention2 = BahdanauAttention(128)
        # Attribute/layer names are kept as-is: they are checkpoint keys.
        self.gru1 = tf.keras.layers.GRU(128, return_sequences=True, return_state=True, activation=relu, name='feature_gru1')
        self.gru2 = tf.keras.layers.GRU(128, return_state=True, activation=relu, name='feature_gru2')
        # Projects the GRU output to the class_num predicted values.
        self.dense1 = tf.keras.layers.Dense(class_num, name='feature_dense1')

    def call(self, input_data, gru_state1, gru_state2, encoder_output):
        '''
        input_data: data of the previous step, (None, 15). It must be rank 2,
            because it is concatenated with the rank-2 context vectors on the
            last axis.
        gru_state1: previous state of gru1, (None, 128).
        gru_state2: previous state of gru2, (None, 128).
        encoder_output: features extracted from the known data by the encoder.
        Returns (prediction (None, class_num), new gru_state1, new gru_state2).
        '''
        context1, _ = self.attention1(gru_state1, encoder_output)
        context2, _ = self.attention2(gru_state2, encoder_output)
        # Build a single-time-step sequence: (None, 1, features).
        step = tf.expand_dims(tf.concat([context1, context2, input_data], axis=-1), 1)
        sequence, new_state1 = self.gru1(step, initial_state=gru_state1)
        output, new_state2 = self.gru2(sequence, initial_state=gru_state2)
        return self.dense1(output), new_state1, new_state2
5.股票预测模型:训练使用 JIT(tf.function 图执行)模式,预测可使用 JIT 或 Eager 模式。JIT 模式适合大规模预测,Eager 模式单次预测的加载速度快。
模型代码如下:
class GuPiaoModel():
    '''Stock prediction model: GRU encoder plus attention GRU decoder.

    Training runs as a tf.function (graph, "JIT") step. Prediction can run as a
    tf.function (predict_jit) or eagerly (predict_eager). Weights and optimizer
    state are saved and restored with tf.train.Checkpoint.
    '''
    def __init__(self, output_num, model_path='./data/gupiao_model'):
        '''
        output_num: number of values the decoder predicts per step.
        model_path: checkpoint directory; it is created if missing. The latest
            checkpoint found there (if any) is restored immediately.
        '''
        self.output_num = output_num
        os.makedirs(model_path, exist_ok=True)
        self.model_path = model_path
        self.build_model()
        self.load_model()

    def build_model(self):
        '''Create the encoder, decoder, optimizer, loss and checkpoint manager.'''
        self.encoder_model = Encoder()
        self.decoder_model = Decoder(self.output_num)
        # `learning_rate` is the supported keyword. `lr` is deprecated, and
        # newer Keras optimizers ignore or reject it.
        self.optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001, clipvalue=1.0)
        self.loss_object = tf.keras.losses.MeanAbsoluteError()
        # The keywords optimizer/encoder/decoder are checkpoint keys; renaming
        # them prevents restoring checkpoints written by earlier runs.
        self.checkpoint = tf.train.Checkpoint(optimizer=self.optimizer,
                                              encoder=self.encoder_model,
                                              decoder=self.decoder_model)
        self.checkpoint_manager = tf.train.CheckpointManager(self.checkpoint, self.model_path, max_to_keep=3)

    @tf.function(input_signature=(
        tf.TensorSpec(shape=(None, None, 15), dtype=tf.float32),
        tf.TensorSpec(shape=(None, None, 15), dtype=tf.float32),
    ))
    def train_step(self, input_data, target_data):
        '''
        Run one optimization step with teacher forcing.

        input_data: (batch_size, history_size, 15) known data.
        target_data: (batch_size, target_size, 15) true future data. Columns 3:
            (everything except the date features) are the labels.
        Returns (loss summed over all predicted steps, loss averaged per step).
        '''
        # These prints run only while tracing, not on every step.
        print('Tracing with train_step', type(input_data), type(target_data))
        print('Tracing with train_step', input_data.shape, target_data.shape)
        target_size = tf.shape(target_data)[1]
        loss = 0.0
        with tf.GradientTape() as tape:
            # Encode the history window.
            encoder_output, encoder_state1, encoder_state2 = self.encoder_model(input_data)
            decoder_state1 = encoder_state1
            decoder_state2 = encoder_state2
            # The first decoder input is the last known day.
            decoder_input = input_data[:,-1,:]
            for target_index in tf.range(target_size):
                # Labels exclude the year/month/day features.
                true_target = target_data[:,target_index,3:]
                decoder_output, decoder_state1, decoder_state2 = self.decoder_model(
                    decoder_input, decoder_state1, decoder_state2, encoder_output)
                batch_loss = self.loss_object(y_true=true_target, y_pred=decoder_output)
                loss += batch_loss
                # Teacher forcing: the next input is the true day, not the prediction.
                decoder_input = target_data[:,target_index,:]
        # Explicit cast instead of float() on a (possibly symbolic) tensor.
        total_loss = loss / tf.cast(target_size, tf.float32)
        trainable_variables = self.encoder_model.trainable_variables + self.decoder_model.trainable_variables
        gradients = tape.gradient(loss, trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, trainable_variables))
        return loss, total_loss

    def fit_generator(self, generator, steps_per_epoch, epochs, initial_epoch=1, auto_save=False):
        '''
        Train for epochs initial_epoch..epochs (inclusive, 1-based).

        generator: iterator yielding (x, y) batches, e.g. GuPiaoLoader.data_generator.
        steps_per_epoch: batches per epoch; must be >= 1.
        auto_save: if True, save a checkpoint after every epoch.
        Raises ValueError if steps_per_epoch < 1. Without this check the epoch
        summary would reference unbound loop variables.
        '''
        if steps_per_epoch < 1:
            raise ValueError('steps_per_epoch must be >= 1, got {}'.format(steps_per_epoch))
        for epoch in range(initial_epoch, epochs+1):
            # perf_counter measures elapsed wall-clock time. process_time would
            # report the CPU time of all the process's threads instead.
            start = time.perf_counter()
            epoch_loss = 0
            for steps in range(1, steps_per_epoch+1):
                x, y = next(generator)
                loss, total_loss = self.train_step(x, y)
                epoch_loss += total_loss
                print('\rsteps:%d/%d, epochs:%d/%d, loss:%0.4f, total_loss:%0.4f'
                      % (steps, steps_per_epoch, epoch, epochs, loss, total_loss), end='')
            end = time.perf_counter()
            print('\rsteps:%d/%d, epochs:%d/%d, %0.4f S, loss:%0.4f, total_loss:%0.4f, epoch_loss:%0.4f'
                  % (steps, steps_per_epoch, epoch, epochs, (end - start), loss, total_loss, epoch_loss))
            if auto_save:
                self.save_model()

    @tf.function(input_signature=(
        tf.TensorSpec(shape=(None, None, 15), dtype=tf.float32),
        tf.TensorSpec(shape=(None, None, 3), dtype=tf.float32),
        tf.TensorSpec(shape=None, dtype=tf.int32),
    ))
    def predict_jit(self, input_data, time_step, output_size):
        '''
        Predict in graph (tf.function) mode.

        input_data: (1, history_size, 15) known data.
        time_step: (1, target_size, 3) normalized dates of the predicted days.
        output_size: number of days to predict.
        Returns (1, output_size, 15): each day's date features followed by
        the predicted values.
        '''
        predict_data = tf.TensorArray(dtype=tf.float32, size=output_size, dynamic_size=True)
        encoder_output, encoder_state1, encoder_state2 = self.encoder_model(input_data)
        decoder_state1 = encoder_state1
        decoder_state2 = encoder_state2
        decoder_input = input_data[:,-1,:]
        for i in tf.range(output_size):
            decoder_output, decoder_state1, decoder_state2 = self.decoder_model(
                decoder_input, decoder_state1, decoder_state2, encoder_output)
            # Feed the prediction (prefixed with that day's date) back in.
            decoder_input = tf.concat([time_step[:,i,:], decoder_output], axis=1)
            predict_data = predict_data.write(i, decoder_input)
        # (output_size, 1, 15) -> (1, output_size, 15)
        predict_data = predict_data.stack()
        predict_data = tf.transpose(predict_data, perm=[1, 0, 2])
        return predict_data

    def predict_eager(self, input_data, time_step, output_size):
        '''
        Predict in eager mode.

        input_data: (1, history_size, 15) known data.
        time_step: (1, target_size, 3) normalized dates of the predicted days.
        output_size: number of days to predict.
        Returns a numpy array (1, output_size, 15).
        '''
        predict_data = []
        input_data = tf.constant(input_data, dtype=tf.float32)
        # create_time() returns float64; cast so it can be concatenated with
        # the float32 decoder output.
        time_step = tf.cast(time_step, tf.float32)
        encoder_output, encoder_state1, encoder_state2 = self.encoder_model(input_data)
        decoder_state1 = encoder_state1
        decoder_state2 = encoder_state2
        decoder_input = input_data[:,-1,:]
        for i in range(output_size):
            decoder_output, decoder_state1, decoder_state2 = self.decoder_model(
                decoder_input, decoder_state1, decoder_state2, encoder_output)
            decoder_input = tf.concat([time_step[:,i,:], decoder_output], axis=1)
            predict_data.append(decoder_input.numpy())
        # (output_size, 1, 15) -> (1, output_size, 15)
        predict_data = np.array(predict_data)
        predict_data = predict_data.swapaxes(0,1)
        return predict_data

    def save_model(self):
        '''Save a checkpoint (the manager keeps the 3 most recent).'''
        save_path = self.checkpoint_manager.save()
        print('保存模型 {}'.format(save_path))

    def load_model(self):
        '''Restore the latest checkpoint in model_path, if there is one.'''
        self.checkpoint.restore(self.checkpoint_manager.latest_checkpoint)
        if self.checkpoint_manager.latest_checkpoint:
            print('加载模型 {}'.format(self.checkpoint_manager.latest_checkpoint))
6.训练及预测代码:
def main():
    '''Train GuPiaoModel on the example CSV data and plot a forecast before and after training.'''
    # Each input day has 15 features: year, month, day + (open, high, low, close) * 3.
    # The decoder predicts the 12 non-date features.
    output_num = 12
    batch_size = 20
    history_size = 30   # known days fed to the encoder
    target_size = 5     # days to forecast

    print('创建模型')
    gupiao_model = GuPiaoModel(output_num)

    print('加载数据')
    gupiao_loader = GuPiaoLoader()
    df_sh, df_sz, df_target = [
        gupiao_loader.load_one(path)
        for path in ('./data/gupiao_data/999999.SH.csv',
                     './data/gupiao_data/399001.SZ.csv',
                     './data/gupiao_data/XXXXXX.SH.csv')]

    print('训练前预测')
    x, time_step = gupiao_loader.get_data_to_predict(
        df_sh, df_sz, df_target, history_size, target_size)
    print('x', x.shape, 'time_step', time_step.shape)
    prediction = gupiao_model.predict_jit(x, time_step, target_size)
    print('y', prediction.shape)
    gupiao_loader.show_image(x[0, :, :], prediction[0, :, :])

    print('开始训练')
    batches = gupiao_loader.data_generator(
        df_sh, df_sz, df_target, batch_size, history_size, target_size)
    gupiao_model.fit_generator(batches,
                               steps_per_epoch=len(df_target) // 2,
                               epochs=20, auto_save=True)

    print('预测')
    prediction = gupiao_model.predict_jit(x, time_step, target_size)
    gupiao_loader.show_image(x[0, :, :], prediction[0, :, :])


if __name__ == '__main__':
    main()
转载请标明出处,谢谢!!!