大家做交易都很清楚一个点:成交量的大幅涨跌必定会带来价格的大幅涨跌,而对于A股来说,游资又是市场上的弄潮儿,怎么赶上游资的脚步喝口汤一直笔者的追求。论坛里面有很多大佬写了很多相关主题的文章,类似于分析龙虎榜等。而我则想通过分析成交量与股价的关系来构建因子,进一步看看能不能窥探到游资的动向。但是目前没有找到可以白嫖的早盘历史数据(早盘数据是指早盘开始竞价的时间9:15到早上10点的数据),当然这个时间段时笔者自己设定的,大家也可以根据代码进行调整获取符合自己要求的数据。
笔者在寻找盘前数据的时间,发现akshare提供了一个接口: stock_zh_a_hist_pre_min_em,可以获取每天从早上9点15分到下午3:00的分钟数据,具体信息大家可以参考akshare官方文档:https://www.akshare.xyz/data/stock/stock.html#id9里面的盘前数据介绍。在知道这个信息之后,笔者就想到能不能每天定时获取这个数据,然后需要用的时候自动拼接起来呢?
第一部分:获取所有个股的早盘数据
import os
import time
import pandas as pd
import akshare as ak
import warnings
import datetime as datetime
warnings.filterwarnings("ignore")
from apscheduler.schedulers.blocking import BlockingScheduler
pd.set_option('max_rows', None) # 显示最多行数
pd.set_option('max_columns', None) # 显示最多列数
pd.set_option('expand_frame_repr', False) # 当列太多时显示不清楚
pd.set_option('display.unicode.east_asian_width', True) # 设置输出右对齐
# 参数设置
start_date = datetime.datetime.now()
#利用东财实时行情数据接口获取所有股票代码接口
df = ak.stock_zh_a_spot_em()
code_list = df[['序号', '代码', '名称']].values
# 创建文件存储路径
def create_path(ak_code):
date_str = str(pd.to_datetime(start_date).date()) # 日期转换成字符串
path = os.path.join(".", "all_stock_candle", "stock", date_str)
# 保存数据
if not os.path.exists(path):
# os.mkdir(path) # 可以建一级文件夹
os.makedirs(path) # 可以建多级文件夹
file_name = ak_code + ".csv"
return os.path.join(path, file_name)
def do_load_data():
# 获取沪深京股票的1分钟数据
global df
for i, ak_code, ak_name in code_list:
print(f'已更新第{i}只股票:{ak_name}')
for i in range(5):
try:
# 利用东方财富接口获取一分钟股票数据
df = ak.stock_zh_a_hist_pre_min_em(symbol=ak_code)
if df.empty:
continue
df['股票代码'] = ak_code
df['股票名称'] = ak_name
df.rename(columns={'时间': '交易日期', '开盘': '开盘价', '最高': '最高价',
'最低': '最低价', '收盘': '收盘价', '成交量': '成交量'}, inplace=True)
df = df[['交易日期', '股票代码', '股票名称', '开盘价', '最高价', '最低价', '收盘价', '成交量', '成交额']]
df.reset_index(drop=True, inplace=True)
break
except Exception as e:
print(e)
path = create_path(ak_code)
df.to_csv(path, index=False, mode='w', encoding='gbk')
time.sleep(2)
if __name__ == "__main__":
scheduler = BlockingScheduler()
scheduler.add_job(do_load_data, 'cron', hour=16, minute=0) # 每天的16时执行任务。
scheduler.start()
第二步 将每个日期命名文件内数据进行拼接
import pandas as pd
import os
import glob
pd.set_option(‘max_rows’, None) # 显示最多行数
pd.set_option(‘max_columns’, None) # 显示最多列数
pd.set_option(‘expand_frame_repr’, False) # 当列太多时显示不清楚
pd.set_option(‘display.unicode.east_asian_width’, True) # 设置输出右对齐
_ = os.path.abspath(‘.’) # 返回本层目录的绝对路径
path_list = glob.glob(_ + ‘/all_stock_candle/stock//.csv’) # 返回所有的csv文档路径
#print(path_list)
filename_list = []
for path in path_list:
filename = os.path.split(path)[1]
filename_list.append(filename)
filename_list = set(filename_list) # 创建一个无序不重复的元素集
for filename in filename_list:
code = filename.split(‘.’)[0]
code_path_list = glob.glob(_ + f’/all_stock_candle/stock//{code}.csv’)
all_data = pd.DataFrame()
for code_path in code_path_list:
# print(code_path)
data = pd.read_csv(code_path,encoding='gbk')
all_data = all_data.append(data, ignore_index=True)
# 排序并重新索引
all_data.sort_values(by='交易日期', inplace=False)
all_data.reset_index(drop=True, inplace=True)
save_path = _ + f'/all_stock_candle/stock/merge_stock_data'
if not os.path.exists(save_path):
os.makedirs(save_path) # 可以建多级文件夹
file_name = code + ".csv"
data_path = os.path.join(save_path, file_name)
all_data.to_csv(data_path)
# print(all_data)