【vn.py学习笔记(七)】vn.py rqdata封装、datasbase等数据服务源码阅读
写在前面
笔者刚接触量化投资,对量化投资挺感兴趣,在闲暇时间进行量化投资的学习,只能进行少量资金进行量化实践。目前在进行基于vnpy的A股市场的量化策略学习,主要尝试攻克的技术难点在:A股市场日线数据的免费获取维护、自动下单交易、全市场选股程序、选股策略的回测程序、基于机器学习的股票趋势预测。
现阶段的计划是阅读vn.py的源码,学习vn.py架构机制,在学习的过程中,会以分享的形式记录,以加深对vn.py的理解,有不对的地方欢迎大家批评指正。
欢迎志同道合的朋友加我QQ(1163962054)交流。
分享的github仓库:https://github.com/PanAndy/quant_share。
这次来看一看vn.py的数据服务是怎么写的,与数据服务相关的主要是对rqdata的封装和database模块。参考rqdata的封装,我实现了一个对tushare获取股票日线数据的封装,可以参考之前的文章《基于tushare的A股市场行情维护程序》。下一篇将会来学习vn.py 核心trader的最后一部分代码vnpy/trader/utility.py的内容。
1 RqdataClient架构
对rqdata数据服务的封装位于vnpy/trader/rqdata.py内,主要是通过RqdataClient类来实现的。RqdataClient主要包含四个变量username、password、inited、symbols,四个函数init、to_rq_symbol、query_history、query_tick_history,其中主要以query_history和query_tick_history向外提供服务。query_history和query_tick_history的基本逻辑就是构造查询内容,向rqdata查询,然后将返回的df拼接成list[bardata],再返回给调用者。
1.1 query_history
def query_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
"""
Query history bar data from RQData.
"""
if self.symbols is None:
return None
symbol = req.symbol
exchange = req.exchange
interval = req.interval
start = req.start
end = req.end
rq_symbol = self.to_rq_symbol(symbol, exchange)
if rq_symbol not in self.symbols:
return None
rq_interval = INTERVAL_VT2RQ.get(interval)
if not rq_interval:
return None
# For adjust timestamp from bar close point (RQData) to open point (VN Trader)
adjustment = INTERVAL_ADJUSTMENT_MAP[interval]
# For querying night trading period data
end += timedelta(1)
# Only query open interest for futures contract
fields = ["open", "high", "low", "close", "volume"]
if not symbol.isdigit():
fields.append("open_interest")
df = rqdata_get_price(
rq_symbol,
frequency=rq_interval,
fields=fields,
start_date=start,
end_date=end,
adjust_type="none"
)
data: List[BarData] = []
if df is not None:
for ix, row in df.iterrows():
dt = row.name.to_pydatetime() - adjustment
dt = CHINA_TZ.localize(dt)
bar = BarData(
symbol=symbol,
exchange=exchange,
interval=interval,
datetime=dt,
open_price=row["open"],
high_price=row["high"],
low_price=row["low"],
close_price=row["close"],
volume=row["volume"],
open_interest=row.get("open_interest", 0),
gateway_name="RQ"
)
data.append(bar)
return data
1.2 query_tick_history
def query_tick_history(self, req: HistoryRequest) -> Optional[List[TickData]]:
"""
Query history bar data from RQData.
"""
if self.symbols is None:
return None
symbol = req.symbol
exchange = req.exchange
start = req.start
end = req.end
rq_symbol = self.to_rq_symbol(symbol, exchange)
if rq_symbol not in self.symbols:
return None
# For querying night trading period data
end += timedelta(1)
# Only query open interest for futures contract
fields = [
"open",
"high",
"low",
"last",
"prev_close",
"volume",
"limit_up",
"limit_down",
"b1",
"b2",
"b3",
"b4",
"b5",
"a1",
"a2",
"a3",
"a4",
"a5",
"b1_v",
"b2_v",
"b3_v",
"b4_v",
"b5_v",
"a1_v",
"a2_v",
"a3_v",
"a4_v",
"a5_v",
]
if not symbol.isdigit():
fields.append("open_interest")
df = rqdata_get_price(
rq_symbol,
frequency="tick",
fields=fields,
start_date=start,
end_date=end,
adjust_type="none"
)
data: List[TickData] = []
if df is not None:
for ix, row in df.iterrows():
dt = row.name.to_pydatetime()
dt = CHINA_TZ.localize(dt)
tick = TickData(
symbol=symbol,
exchange=exchange,
datetime=dt,
open_price=row["open"],
high_price=row["high"],
low_price=row["low"],
pre_close=row["prev_close"],
last_price=row["last"],
volume=row["volume"],
open_interest=row.get("open_interest", 0),
limit_up=row["limit_up"],
limit_down=row["limit_down"],
bid_price_1=row["b1"],
bid_price_2=row["b2"],
bid_price_3=row["b3"],
bid_price_4=row["b4"],
bid_price_5=row["b5"],
ask_price_1=row["a1"],
ask_price_2=row["a2"],
ask_price_3=row["a3"],
ask_price_4=row["a4"],
ask_price_5=row["a5"],
bid_volume_1=row["b1_v"],
bid_volume_2=row["b2_v"],
bid_volume_3=row["b3_v"],
bid_volume_4=row["b4_v"],
bid_volume_5=row["b5_v"],
ask_volume_1=row["a1_v"],
ask_volume_2=row["a2_v"],
ask_volume_3=row["a3_v"],
ask_volume_4=row["a4_v"],
ask_volume_5=row["a5_v"],
gateway_name="RQ"
)
data.append(tick)
return data
2 Database架构
vn.py最新版本(2.1.9)对数据库管理端进行了重构(vnpy.database),采用类似gateway(底层接口)和app(上层应用)的设计模式。
- 在vnpy.trader.database中,定义数据库管理端的通用接口,包括:抽象模板类BaseDatabase、数据库时区常量DB_TZ、时区转换函数convert_tz、以及K线数据整体概况BarOverview类,用于大幅提高DataManager组件的数据库概况查询速度。
- 在vnpy.database模块下,继承BaseDatabase实现具体的数据库管理端,包括:
- SQL类:SQLite(sqlite),轻量级单文件数据库,无需安装和配置数据服务程序,vn.py的默认选项,适合入门新手用户;MySQL(mysql):世界最流行的开源关系型数据库,文档资料极为丰富,且可替换其他高NewSQL兼容实现(如TiDB);PostgreSQL(postgresql),特性更为丰富的开源关系型数据库,支持通过扩展插件来新增功能,只推荐熟手使用。
- NoSQL类:MongoDB(mongodb),基于分布式文件储存(bson格式)的非关系型数据库,内置的热数据内存缓存实现更快读写速度;InfluxDB(influxdb),针对时序数据专门设计的非关系型数据库,列式数据储存提供极高的读写效率和外围分析应用。
Database的实现架构如下图所示,具体实现的代码就不往下展开了,一般不需要修改。