基于DolphinDB的因子计算最佳实践(上)

因子挖掘是量化交易的基础。除传统的基本面因子外,从中高频行情数据中挖掘有价值的因子,并进一步建模和回测以构建交易系统,是一个量化团队的必经之路。金融或者量化金融是一个高度市场化、多方机构高度博弈的领域。因子的有效时间会随着博弈程度的加剧而缩短,如何使用更加高效的工具和流程,更快的找到新的有效的因子,是每一个交易团队必须面对的问题。

近年来,DolphinDB 越来越成为国内乃至国际上大量基金(私募和公募)、资管机构、券商自营团队进行因子挖掘的利器。基于大量客户的反馈,我们特撰写此白皮书,总结使用 DolphinDB 进行因子挖掘的最佳实践。

1. 概述

交易团队用于因子挖掘的常见技术栈有几个大的类别:

  • 使用 python、matlab 等数据分析工具
  • 委托第三方开发有图形界面的因子挖掘工具
  • 使用 java、c++ 等编程语言自行开发挖掘工具
  • 在 DolphinDB 等专业工具上进行二次开发

我们暂且不讨论各个技术栈的优缺点。但不管使用何种技术栈,都必须解决以下几个问题:

  • 能处理不同频率不同规模的数据集
  • 能计算不同风格的因子
  • 能处理因子数量不断增长的问题
  • 能高效的存取原始数据和因子数据
  • 能提升因子挖掘的开发效率
  • 能提升因子计算的运行效率(高吞吐,低延时)
  • 能解决研究的因子用于生产(实盘交易)的问题
  • 能解决多个交易员(研究员)或交易团队一起使用时的各种工程问题,如代码管理、单元测试、权限管理、大规模计算等

DolphinDB 作为分布式计算、实时流计算及分布式存储一体化的高性能时序数据库,非常适合因子的存储、计算、建模、回测和实盘交易。通过部署 DolphinDB 单机或集群环境,用户可以快速地处理 GB 级别甚至 PB 级别的海量数据集,日级、分钟级、快照和逐笔委托数据均能高效响应。

DolphinDB 内置了多范式的编程语言(函数式,命令式,向量式、SQL式),可以帮助研发人员高效开发不同风格的因子。此外,DolphinDB 还提供了丰富且性能高效的函数库(超1400个内置函数),尤其是窗口处理方面经过优化的内置算子,大大缩短了因子计算的延时。

DolphinDB 自带的数据回放和流式增量计算引擎可以方便地解决因子挖掘中研发和生产一体化的问题。DolhpinDB 的分布式存储和计算框架,天生便于解决工程中的可靠性、扩展性等问题。

本文基于国内 A 股市场各个频率的数据来演示 DolphinDB 计算和规划因子库存储的方案。根据批量因子计算、实时因子计算、多因子建模、因子库存储规划、因子计算工程化等各个场景的实操演练,以及针对不同方案的对比分析,本文总结出了在 DolphinDB 中进行因子计算的最佳实践。

2. 测试数据集

本文的因子计算基于三类国内 A 股行情数据集:逐笔数据、快照数据和 K 线数据(分钟 K 线和日 K 线)。快照数据以两种形式存储:(1)各挡数据分别存储为一列;(2)用 array vector 将所有档位的数据存储为一列。

2.1 逐笔成交数据

逐笔成交是交易所公布买卖双方具体成交的每一笔数据,每3秒发布一次,每次包含这3秒内的所有成交记录。每一笔成交撮合,都由买方和卖方的一笔具体委托组成。上述数据样例采用字段 BuyNo 和 SellNo 标注买卖双方的委托单号,其它关键字段分别为: SecurityID(标的物代码),TradeTime(成交时刻),TradePrice(成交价格),TradeQty(本笔成交量)和 TradeAmount(本笔成交金额)。

每个交易日的原始数据量在 8 GB 上下。根据上表的分区机制进行建库建表,点击查看对应脚本:逐笔成交数据建库建表完整代码

2.2 快照数据

股票交易所每3秒发布一次,每次涵盖这3秒结束时的日内累计成交量(TotalVolumeTrade),日内累计成交金额(TotalValueTrade),3秒终了时的盘口买卖双方挂单(买方为 Bid,卖方在有些数据源字段为 Offer,在有些数据源字段为 Ask,其余字段以此类推:BidPrice 为买方各档价格,OfferPrice 为卖方各档价格,OrderQty 为买卖双方各档的委托单总量, Orders 为买卖双方委托单数),3秒终了时的最近一笔成交价格(LastPx),全天开盘价(OpenPx),日内截止当下最高价(HighPx),日内截止当下最低价(LowPx)等各字段。其他和逐笔成交一致的字段不再赘述,涵义一致,详情可参见交易所数据说明字典。

每个交易日的原始数据量约在 10G 左右。

在 DolphinDB 2.0版本的 TSDB 存储引擎中,支持 array vector 的存储机制,即可以允许数据表中一个 cell 存储一个向量。在本白皮书的案例中,后面文章会详细介绍 array vector 存储方案和普通存储方案的区别。快照数据的买10档或卖10档在本例中作为一个 vector 存入单个 cell 中,其他各字段和普通快照数据表都相同。

两种存储模式的建库建表可以参考Snapshot普通及arrayVector形式建库和建表完整代码

快照数据的 array_vector 存储形式:

2.3 分钟K数据

包含每只股票,每分钟的开盘价、最高价、最低价、收盘价,四个价格字段,同时记录本分钟的成交量和成交金额。另外,数据 K 线可以依据基本字段计算衍生字段,比如:k 线均价(vwap 价格)。k 线数据是由逐笔成交数据聚合产生,具体代码可以参考第三章3.3.4基于快照数据的分钟聚合

日 K 数据,存储形式和字段跟分钟 k 线一致,可以由分钟 k 线或高频数据聚合产生,这里不作赘述。

日 k 数据,分钟数据的建库建表可以参考:k 线数据建库建表完整代码

3. 投研阶段的因子计算

在投研阶段,会通过历史数据批量计算生成因子。通常,推荐研究员将每一种因子的计算都封装成自定义函数。根据因子类型和使用者习惯的不同,DolphinDB 提供了面板和 SQL 两种计算方式。

在面板计算中,自定义函数的参数一般为向量,矩阵或表,输出一般为向量,矩阵或表;在 SQL 模式中,自定义函数的参数一般为向量(列),输出一般为向量。因子函数的粒度尽可能小,只包含计算单个因子的业务逻辑,也不用考虑并行计算加速等问题。这样做的优点包括:(1)易于实现流批一体,(2)便于团队的因子代码提交和管理,(3)方便用统一的框架运行因子计算作业。

3.1 面板数据模式

面板数据(panel data)是以时间为索引,标的为列,指标作为内容的一种数据载体,它非常适用于以标的集合为单位的指标计算,将数据以面板作为载体,可以大大简化脚本的复杂度,通常最后的计算表达式可以从原始的数学公式中一对一的翻译过来。除此之外,可以充分利用DolphinDB矩阵计算的高效能。

在因子计算中,面板数据通常可以通过panel函数,或者exec搭配pivot by得到,具体样例如下表:每一行是一个时间点,每一列是一个股票。

                         000001     000002  000003  000004  ...
                        --------- ---------- ------- ------ ---
2020.01.02T09:29:00.000|3066.336 3212.982  257.523 2400.042 ...
2020.01.02T09:30:00.000|3070.247  3217.087 258.696 2402.221 ...
2020.01.02T09:31:00.000|3070.381 3217.170  259.066 2402.029 ...
复制代码

在面板数据上,由于是以时间为索引,标的为列,因子可以方便地在截面上做各类运算。DolphinDB包含row系列函数以及各类滑动窗口函数,在下面两个因子计算例子中,原本复杂的计算逻辑,在面板数据中,可以用一行代码轻松实现。

  • Alpha 1 因子计算中,下例使用了rowRank函数,可以在面板数据中的每一个时间截面对各标的进行排名;iif条件运算,可以在标的向量层面直接筛选及计算;mimax及mstd等滑动窗口函数也是在标的层面垂直计算的。因此,在面板计算中合理应用 DolphinDB 的内置函数,可以从不同维度智慧计算。

    //alpha 1 //Alpha#001公式:rank(Ts_ArgMax(SignedPower((returns<0?stddev(returns,20):close), 2), 5))-0.5

    @state def alpha1TS(close){ return mimax(pow(iif(ratios(close) - 1 < 0, mstd(ratios(close) - 1, 20),close), 2.0), 5) }

    def alpha1Panel(close){ return rowRank(X=alpha1TS(close), percent=true) - 0.5 }

    input = exec close from loadTable("dfs://k_minute","k_minute") where date(tradetime) between 2020.01.01 : 2020.01.31 pivot by tradetime, securityid res = alpha1Panel(input)

  • Alpha 98 因子计算中,同时使用了三个面板数据,分别是vwap, open和vol。不仅各矩阵内部运用了rowRank函数横向截面运算以及m系列垂直滑动窗口计算,矩阵之间也进行了二元运算。用一行代码解决了多维度的复杂的嵌套计算逻辑。

    //alpha 98 //Alpha #98计算公式: (rank(decay_linear(correlation(vwap, sum(adv5, 26.4719), 4.58418), 7.18088)) - rank(decay_linear(Ts_Rank(Ts_ArgMin(correlation(rank(open), rank(adv15), 20.8187), 8.62571), 6.95668), 8.07206)))

    def prepareDataForDDBPanel(raw_data, start_time, end_time){ t = select tradetime,securityid, vwap,vol,open from raw_data where date(tradetime) between start_time : end_time return dict(vwapopen`vol, panel(t.tradetime, t.securityid, [t.vwap, t.open, t.vol])) }

    @state def alpha98Panel(vwap, open, vol){ return rowRank(X = mavg(mcorr(vwap, msum(mavg(vol, 5), 26), 5), 1..7),percent=true) - rowRank(X=mavg(mrank(9 - mimin(mcorr(rowRank(X=open,percent=true), rowRank(X=mavg(vol, 15),percent=true), 21), 9), true, 7), 1..8),percent=true) }

    raw_data = loadTable("dfs://k_minute","k_day") start_time = 2020.01.01 end_time = 2020.12.31 input = prepareDataForDDBPanel(raw_data, start_time, end_time) timer alpha98DDBPanel = alpha98Panel(input.vwap, input.open, input.vol)

基于面板数据的因子计算,耗时主要在面板数据准备和因子计算两个阶段。在很多场景下,面板数据准备的耗时可能超过因子计算本身。为解决这个问题,DolphinDB的TSDB引擎提供了宽表存储,即把面板数据直接存储在数据库表中(面板中每一个列存储为表中的每一个列),这样通过SQL查询可以直接获取面板数据,而不需要通过转置行列来获取,从而大大缩短准备面板数据的时间。在本文的第5章中,我们有详细的宽表和竖表存储性能的对比。

3.2 SQL模式

DolphinDB在存储和计算框架上都是基于列式结构,表中的一个列可以直接作为一个向量化函数的输入参数。因此如果一个因子的计算逻辑只涉及股票自身的时间序列数据,不涉及多个股票横截面上的信息,可以直接在SQL中按股票分组,然后在select中调用因子函数计算每个股票在一段时间内的因子值。如果数据在数据库中本身是按股票分区存储的,那么可以非常高效地实现数据库内并行计算。

def sum_diff(x, y){
    return (x-y)\(x+y)
}

@state
def factorDoubleEMA(price){
    ema_2 = ema(price, 2)
    ema_4 = ema(price, 4)
    sum_diff_1000 = 1000 * sum_diff(ema_2, ema_4)
    return ema(sum_diff_1000, 2) - ema(sum_diff_1000, 3)
}

res = select tradetime, securityid, `doubleEMA as factorname, factorDoubleEMA(close) as val from loadTable("dfs://k_minute","k_minute") where  tradetime between 2020.01.01 : 2020.01.31 context by securityid
复制代码

在上面的例子中,我们定义了一个因子函数 factorDoubleEMA,只需要用到股票的价格序列信息。我们在 SQL 中通过 context by 子句按股票代码分组,然后调用factorDoubleEMA函数,计算每个股票的因子序列。值得注意的是,context by 是 DolphinDB SQL 对 group by 的扩展,是 DolphinDB 特有的 SQL 语句。group by 只适用于聚合计算,也就是说输入长度为n,输出长度是1。context by 适用于向量计算,输入长度是n,输出长度也是n。另外因子函数 factorDOubleEMA 除了可以接受一个向量作为输入,也可以接受一个面板数据作为输入。这也是我们前面强调的,因子函数的粒度尽可能细,这样可以应用于很多场景。

时间序列的因子函数非常普遍,talib 中的所有技术分析指标都属于此类函数,因此都可以使用上述SQL方式或面板数据模式来调用。但是3.1中提到的 alpha1 和 alpha98 等因子,涉及到时间序列和横截面两个维度的计算,我们称之为截面因子,无法将因子逻辑封装在一个自定义函数中,然后在一个 SQL 语句中被调用。通常面对截面因子,我们建议将表作为自定义因子函数的入参,内部用 SQL 进行操作,函数最后返回一个表。

//alpha1
def alpha1SQL(t){
	res = select tradetime, securityid, mimax(pow(iif(ratios(close) - 1 < 0, mstd(ratios(close) - 1, 20), close), 2.0), 5) as val from t by securityid
	return select tradetime, securityid, rank(val, percent=true) - 0.5 as val from res context by tradetime
}
input = select tradetime,securityid, close from loadTable("dfs://k_day_level","k_day") where tradetime between 2010.01.01 : 2010.12.31
alpha1DDBSql = alpha1SQL(input)

//alpha98
def alpha98SQL(mutable t){
	update t set adv5 = mavg(vol, 5), adv15 = mavg(vol, 15) context by securityid
	update t set rank_open = rank(X = open,percent=true), rank_adv15 =rank(X=adv15,percent=true) context by date(tradetime)
	update t set decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decay8 = mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8) context by securityid
	return select tradetime,securityid, `alpha98 as factorname, rank(X =decay7,percent=true)-rank(X =decay8,percent=true) as val from t context by date(tradetime)
}
input = select tradetime,securityid, vwap,vol,open from  loadTable("dfs://k_day_level","k_day") where tradetime between 2010.01.01 : 2010.12.31
alpha98DDBSql = alpha98SQL(input)
复制代码

3.3 不同频率的因子开发举例

不同频率数据的因子,有着不同的特点。本章节将分别举例分钟频、日频、快照、逐笔数据的特点因子,阐述不同频率数据计算因子的最佳实践。

3.3.1 分钟级和日级数据

日级数据的计算,通常是涉及多个截面的复杂计算,在3.1 面板数据模式中已展现。对于稍简单的计算,则与分钟级数据的因子相像。

针对分钟级数据,下面的例子是日内收益率偏度的因子计算,对于这类只涉及表内字段的计算,通常使用 SQL 模式,配合 group by 语句将计算分组:

defg dayReturnSkew(close){
	return skew(ratios(close))	
}

minReturn = select `dayReturnSkew as factorname, dayReturnSkew(close) as val from loadTable("dfs://k_minute_level", "k_minute") where date(tradetime) between 2020.01.02 : 2020.01.31 group by date(tradetime) as tradetime, securityid

#output
tradetime  securityid factorname    val               
---------- ---------- ------------- -------
2020.01.02 000019     dayReturnSkew 11.8328
2020.01.02 000048     dayReturnSkew 11.0544
2020.01.02 000050     dayReturnSkew 10.6186
复制代码

3.3.2 基于快照数据的有状态因子计算

有状态的因子,意为因子的计算需要基于之前的计算结果,如一般的滑动窗口计算,聚合计算等,都是有状态的因子计算。

下例flow这个自定义函数中,参数为四个列字段,运用 mavg 滑动平均函数以及 iif 条件运算函数,可以直接在SQL中得到因子结果:

@state
def flow(buy_vol, sell_vol, askPrice1, bidPrice1){
        buy_vol_ma = round(mavg(buy_vol, 5*60), 5)
        sell_vol_ma = round(mavg(sell_vol, 5*60), 5)
        buy_prop = iif(abs(buy_vol_ma+sell_vol_ma) < 0, 0.5 , buy_vol_ma/ (buy_vol_ma+sell_vol_ma))
        spd = askPrice1 - bidPrice1
        spd = iif(spd < 0, 0, spd)
        spd_ma = round(mavg(spd, 5*60), 5)
        return iif(spd_ma == 0, 0, buy_prop / spd_ma)
}

res_flow = select TradeTime, SecurityID, `flow as factorname, flow(BidOrderQty[1],OfferOrderQty[1], OfferPrice[1], BidPrice[1]) as val from loadTable("dfs://LEVEL2_Snapshot_ArrayVector","Snap") where date(TradeTime) <= 2020.01.30 and date(TradeTime) >= 2020.01.01 context by SecurityID

# output sample
TradeTime               SecurityID factorname val              
----------------------- ---------- ---------- -----------------
2020.01.22T14:46:27.000 110065     flow       3.7587
2020.01.22T14:46:30.000 110065     flow       3.7515
2020.01.22T14:46:33.000 110065     flow       3.7443
...
复制代码

3.3.3 快照数据的多档赋权无状态因子计算

计算Level 2的多档快照数据,传统的方式是将多档量价数据存储成为多个列, 再将多档挂单或者报价用 matrix 转换与权重做计算。 更推荐的做法是,将多档数据存储为 array vector,仍旧可以用原来的自定义函数,但是资源消耗包括效率都有提升。 下面的例子是计算多档报价的权重偏度因子,使用 array vector 后计算时间从4秒缩短到2秒。

def mathWghtCovar(x, y, w){
	v = (x - rowWavg(x, w)) * (y - rowWavg(y, w))
	return rowWavg(v, w)
}

@state
def mathWghtSkew(x, w){
	x_var = mathWghtCovar(x, x, w)
	x_std = sqrt(x_var)
	x_1 = x - rowWavg(x, w)
	x_2 = x_1*x_1
	len = size(w)
	adj = sqrt((len - 1) * len) \ (len - 2)
	skew = rowWsum(x_2, x_1) \ (x_var * x_std) * adj \ len
	return iif(x_std==0, 0, skew)
}

//weights:
w = 10 9 8 7 6 5 4 3 2 1

//权重偏度因子:
resWeight =  select TradeTime, SecurityID, `mathWghtSkew as factorname, mathWghtSkew(BidPrice, w)  as val from loadTable("dfs://LEVEL2_Snapshot_ArrayVector","Snap")  where date(TradeTime) = 2020.01.02 map
resWeight1 =  select TradeTime, SecurityID, `mathWghtSkew as factorname, mathWghtSkew(matrix(BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9), w)  as val from loadTable("dfs://snapshot_SH_L2_TSDB", "snapshot_SH_L2_TSDB")  where date(TradeTime) = 2020.01.02 map

#output
TradeTime               SecurityID factorname val               
----------------------- ---------- ---------- ------
...
2020.01.02T09:30:09.000 113537     array_1    -0.8828 
2020.01.02T09:30:12.000 113537     array_1    0.7371 
2020.01.02T09:30:15.000 113537     array_1    0.6041 
...
复制代码

3.3.4 基于快照数据的分钟聚合

投研中经常需要基于快照数据聚合分钟线的 OHLC ,下例就是这一场景中的通用做法:

//基于快照因子的分钟聚合OHLC,vwap
tick_aggr = select first(LastPx) as open, max(LastPx) as high, min(LastPx) as low, last(LastPx) as close, sum(totalvolumetrade) as vol,sum(lastpx*totalvolumetrade) as val,wavg(lastpx, totalvolumetrade) as vwap from loadTable("dfs://LEVEL2_Snapshot_ArrayVector","Snap") where date(TradeTime) <= 2020.01.30 and date(TradeTime) >= 2020.01.01 group by SecurityID, bar(TradeTime,1m) 
复制代码

3.3.5 逐笔数据

逐笔数据量较大,一般会针对成交量等字段进行计算,下面的例子计算了每天主买成交量占全部成交量的比例,同样使用 SQL 模式,发挥库内并行计算的优势,并使用 csort 语句用来对组内数据按照时间顺序排序:

@state
def buyTradeRatio(buyNo, sellNo, tradeQty){
    return cumsum(iif(buyNo>sellNo, tradeQty, 0))\cumsum(tradeQty)
}

factor = select TradeTime, SecurityID, `buyTradeRatio as factorname, buyTradeRatio(BuyNo, SellNo, TradeQty) as val from loadTable("dfs://tick_SH_L2_TSDB","tick_SH_L2_TSDB") where date(TradeTime)<2020.01.31 and time(TradeTime)>=09:30:00.000 context by SecurityID, date(TradeTime) csort TradeTime

#output
TradeTime           SecurityID factorname val              
------------------- ---------- ---------- ------
2020.01.08T09:30:07 511850     buyTradeRatio    0.0086
2020.01.08T09:30:31 511850     buyTradeRatio    0.0574
2020.01.08T09:30:36 511850     buyTradeRatio    0.0569
...
复制代码

4. 生产环境的流式因子计算

在生产环境中,DolphinDB 提供了实时流计算框架。在流计算框架下,用户在投研阶段封装好的基于批量数据开发的因子函数,可以无缝投入交易和投资方面的生产程序中,这就是通常所说的批流一体。使用流批一体可以加速用户的开发和部署。同时流计算框架还在算法的路径上,做了极致的优化,在具有高效开发的优势的同时,又兼顾了计算的高效性能。在这一章中,将会基于实际的状态因子案例,展示实时流计算的使用方法。

DolphinDB 流计算解决方案的核心部件是流计算引擎和流数据表。流计算引擎用于时间序列处理、横截面处理、窗口处理、表关联、异常检测等操作。流数据表可以看作是一个简化版的消息中间件,或者说是消息中间件中的一个主题(topic),可以往其发布(publish)数据,也可以从其订阅(subscribe)数据。流计算引擎和流数据表均继承于 DolphinDB 的数据表(table),因此都可以通过 append! 函数往其注入数据。流计算引擎的输出也是数据表的形式,因此多个计算引擎可以跟搭积木一样自由组合,形成流式处理的流水线。

4.1 流式增量计算

金融方面的原始数据和计算指标,在时间上通常有延续性的关系。以最简单的五周期移动均线 mavg(close,5) 为例,当新一个周期的数据传入模型时,可以将之前最远的第五周期值从 sum 中减出,再把最新一个周期的值加入 sum ,这样就不必每个周期只更新一个值时都重算一遍 sum 。这种增量计算是流计算的核心,可以大大降低实时计算的延时。DolphinDB内置了大量量化金融中需要用到的基本算子,并为这些算子实现了高效的增量算法。不仅如此,DolphinDB还支持自定义函数的增量实现。在前一章节中,部分自定义的因子函数加了修饰符 @state ,表示该函数支持增量计算。

4.1.1 主买成交量占比因子的流式处理

第三章3.3.5的逐笔数据因子的例子展示了主买成交量占比因子(buyTradeRatio)的批量实现方式。这儿我们演示如何使用响应式状态引擎(reactive state engine)来实现该因子的流式增量计算。

@state
def buyTradeRatio(buyNo, sellNo, tradeQty){
    return cumsum(iif(buyNo>sellNo, tradeQty, 0))\cumsum(tradeQty)
}

tickStream = table(1:0, `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNo`SellNo, [SYMBOL,DATETIME,DOUBLE,INT,DOUBLE,LONG,LONG])
result = table(1:0, `SecurityID`TradeTime`Factor, [SYMBOL,DATETIME,DOUBLE])
factors = <[TradeTime, buyTradeRatio(BuyNo, SellNo, TradeQty)]>
demoEngine = createReactiveStateEngine(name="demo", metrics=factors, dummyTable=tickStream, outputTable=result, keyColumn="SecurityID")
复制代码

上述代码创建了一个名为demo的响应式状态引擎,SecurityID作为分组键,输入的消息格式同内存表tickStrea。需要计算的指标定义在factors中,其中1个是输入表中的原始字段TradeTime,另一个是我们需要计算的因子的函数表示。输出到内存表result,除了在factors中定义的指标外,输出结果还会添加分组键。请注意,自定义的因子函数跟批计算中的完全一致!创建完引擎之后,我们即可往引擎中插入几条数据,并观察计算结果。

insert into demoEngine values(`000155, 2020.01.01T09:30:00, 30.85, 100, 3085, 4951, 0)
insert into demoEngine values(`000155, 2020.01.01T09:30:01, 30.86, 100, 3086, 4951, 1)
insert into demoEngine values(`000155, 2020.01.01T09:30:02, 30.80, 200, 6160, 5501, 5600)

select * from result

SecurityID TradeTime           Factor
---------- ------------------- ------
000155     2020.01.01T09:30:00 1
000155     2020.01.01T09:30:01 1
000155     2020.01.01T09:30:02 0.5
复制代码

从这个例子可以看出,在DolphinDB中实现因子的流式增量计算非常简单。如果在投研阶段,已经用我们推荐的方式自定义了一个因子函数,在生产阶段只要程序性的创建一个流式计算引擎即可实现目标。这也是为什么我们一再强调,自定义的因子函数必须使用规范的接口,而且只包含核心的因子逻辑,不用考虑并行计算等问题。

4.1.2 大小单的流式处理

资金流分析是逐笔委托数据的一个重要应用场景。在实时处理逐笔数据时,大小单的统计是资金流分析的一个具体应用。大小单在一定程度上能反映主力、散户的动向。但在实时场景中,大小单的生成有很多难点:(1) 大小单的计算涉及历史状态,如若不能实现增量计算,当计算下午的数据时,可能需要回溯有关这笔订单上午的数据,效率会非常低下。 (2)计算涉及至少两个阶段。在第一阶段需要根据订单分组,根据订单的累计成交量判断大小单,在第二阶段要根据股票来分组,统计每个股票的大小单数量及金额。

大小单是一个动态的概念。一个小单在成交量增加后可能变成一个大单。DolphinDB的两个内置函数dynamicGroupCumsumdynamicGroupCumcount用于对动态组的增量计算。完整的代码请参考:章节附件4.1.2 大小单的流式处理

@state
def factorSmallOrderNetAmountRatio(tradeAmount, sellCumAmount, sellOrderFlag, prevSellCumAmount, prevSellOrderFlag, buyCumAmount, buyOrderFlag, prevBuyCumAmount, prevBuyOrderFlag){
	cumsumTradeAmount = cumsum(tradeAmount)
	smallSellCumAmount, bigSellCumAmount = dynamicGroupCumsum(sellCumAmount, prevSellCumAmount, sellOrderFlag, prevSellOrderFlag, 2)
	smallBuyCumAmount, bigBuyCumAmount = dynamicGroupCumsum(buyCumAmount, prevBuyCumAmount, buyOrderFlag, prevBuyOrderFlag, 2) 
	f = (smallBuyCumAmount - smallSellCumAmount) \ cumsumTradeAmount
	return smallBuyCumAmount, smallSellCumAmount, cumsumTradeAmount, f
}

def createStreamEngine(result){
	tradeSchema = createTradeSchema()
	result1Schema = createResult1Schema()
	result2Schema = createResult2Schema()
	engineNames = ["rse1", "rse2", "res3"]
	cleanStreamEngines(engineNames)
	
	metrics3 = <[TradeTime, factorSmallOrderNetAmountRatio(tradeAmount, sellCumAmount, sellOrderFlag, prevSellCumAmount, prevSellOrderFlag, buyCumAmount, buyOrderFlag, prevBuyCumAmount, prevBuyOrderFlag)]>
	rse3 = createReactiveStateEngine(name=engineNames[2], metrics=metrics3, dummyTable=result2Schema, outputTable=result, keyColumn="SecurityID")
	
	metrics2 = <[BuyNo, SecurityID, TradeTime, TradeAmount, BuyCumAmount, PrevBuyCumAmount, BuyOrderFlag, PrevBuyOrderFlag, factorOrderCumAmount(TradeAmount)]>
	rse2 = createReactiveStateEngine(name=engineNames[1], metrics=metrics2, dummyTable=result1Schema, outputTable=rse3, keyColumn="SellNo")
	
	metrics1 = <[SecurityID, SellNo, TradeTime, TradeAmount, factorOrderCumAmount(TradeAmount)]>
	return createReactiveStateEngine(name=engineNames[0], metrics=metrics1, dummyTable=tradeSchema, outputTable=rse2, keyColumn="BuyNo")
}
复制代码

自定义函数factorSmallOrderNetAmountRatio是一个状态因子函数,用于计算小单的净流入资金占总的交易资金的比例。createStreamEngine创建流式计算引擎。我们一共创建了3个级联的响应式状态引擎,后一个作为前一个的输出,因此从最后一个引擎开始创建。前两个计算引擎rse1和rse2分别以买方订单号(BuyNo)和卖方订单号(SellNo)作为分组键,计算每个订单的累计交易量,并以此区分是大单或小单。第三个引擎rse3把股票代码(SecurityID)作为分组键,统计每个股票的小单净流入资金占总交易资金的比例。下面我们输入一些样本数据来观察流计算引擎的运行。

result = createResultTable()
rse = createStreamEngine(result)
insert into rse values(`000155, 1000, 1001, 2020.01.01T09:30:00, 20000)
insert into rse values(`000155, 1000, 1002, 2020.01.01T09:30:01, 40000)
insert into rse values(`000155, 1000, 1003, 2020.01.01T09:30:02, 60000)
insert into rse values(`000155, 1004, 1003, 2020.01.01T09:30:03, 30000)

select * from result

SecurityID TradeTime smallBuyOrderAmount smallSellOrderAmount totalOrderAmount factor
---------- ------------------- ------------------- -------------------- ---------------- ------
000155     2020.01.01T09:30:00 20000               20000                20000            0
000155     2020.01.01T09:30:01 60000               60000                60000            0
000155     2020.01.01T09:30:02 0                   120000               120000           -1
000155     2020.01.01T09:30:03 30000               150000               150000           -0.8
复制代码

4.1.3 复杂因子Alpha #1流式计算的快捷实现

从前一个大小单的例子可以看到,有些因子的流式实现比较复杂,需要创建多个引擎进行流水线处理来完成。完全用手工的方式来创建多个引擎其实是一件耗时的工作。如果输入的指标计算只涉及一个分组键,DolphinDB提供了一个解析引擎streamEngineParser来解决此问题。下面我们以第三章3.1面板数据模式的alpha #1因子为例,展示streamEngineParser的使用方法。完整代码参考Alpha #1流式计算。以下为核心代码。

@state
def alpha1TS(close){
	return mimax(pow(iif(ratios(close) - 1 < 0, mstd(ratios(close) - 1, 20),close), 2.0), 5)
}

def alpha1Panel(close){
	return rowRank(X=alpha1TS(close), percent=true) - 0.5
}

inputSchema = table(1:0, ["SecurityID","TradeTime","close"], [SYMBOL,TIMESTAMP,DOUBLE])
result = table(10000:0, ["TradeTime","SecurityID", "factor"], [TIMESTAMP,SYMBOL,DOUBLE])
metrics = <[SecurityID, alpha1Panel(close)]>
streamEngine = streamEngineParser(name="alpha1Parser", metrics=metrics, dummyTable=inputSchema, outputTable=result, keyColumn="SecurityID", timeColumn=`tradetime, triggeringPattern='keyCount', triggeringInterval=4000)
复制代码

因子alpha1实际上包含了时间序列处理和横截面处理,需要响应式状态引擎和横截面引擎串联来处理才能完成。但这儿我们仅仅使用了streamEngineParser就创建了全部引擎,大大简化了创建过程。

前面三个例子展示了DolphinDB如何通过流计算引擎实现因子在生产环境中的增量计算。值得注意的是,流式计算时直接使用了投研阶段生成的核心因子代码,这很好的解决了传统金融分析面临的批流一体问题。在传统的研究框架下,用户往往需要对同一个因子计算逻辑写两套代码,一套用于在历史数据上建模、回测,另外一套专门处理盘中传入的实时数据。这是因为数据传入程序的形状(机制)不统一,又甚至是编程语言也无法统一。比如研究分析使用了 python 或者 R,在 python 或 R 的研究程序确定模型和参数后,生产交易的程序必须用 C++ 再实现这套模型,才能保证交易时的执行效率。在两套代码完成后,还要再校验它们计算出来的结果是否一致。这样的业务流程毫无疑问加重了研究员和程序员们的负担,也让基金经理们没法更快地让新交易思路迭代上线。在DolphinDB的流式计算中,实时行情订阅、行情数据收录、交易实时计算、盘后研究建模,全都用同一套代码完成,保证在历史回放和生产交易当中数据完全一致。

除了三个例子中用到的响应式状态引擎(reactive state engine)和横截面引擎(cross sectional engine),DolphinDB 还提供了多种流数据处理引擎包括做流表连接的 asof join engine,equal join engine,lookup join engine,window join engine ,时间序列聚合引擎(time series engine),异常检测引擎(anomaly detection engine),会话窗口引擎(session window engine)等。

4.2 数据回放

前一节我们介绍了因子计算的批流一体实现方案,简单地说,就是一套代码(自定义的因子函数),两种引擎(批计算引擎和流计算引擎)。事实上,DolphinDB提供一种更为简洁的批流一体实现方案,那就是在历史数据建模时,通过数据回放,也用流引擎来实现计算。

在第三章中介绍了用SQL语句方式批处理计算factorDoubleEMA因子的例子,这里介绍如何使用流计算的方式回放数据,计算 factorDoubleEMA 的因子值。全部代码参考章节附件4.2 流计算factorDoubleEMA因子

//创建流引擎,并传入因子算法factorDoubleEMA
factors = <[TradeTime, factorDoubleEMA(close)]>
demoEngine = createReactiveStateEngine(name=engineName, metrics=factors, dummyTable=inputDummyTable, outputTable=resultTable, keyColumn="SecurityID")
	
//demo_engine订阅snapshotStreamTable流表
subscribeTable(tableName=snapshotSharedTableName, actionName=actionName, handler=append!{demoEngine}, msgAsTable=true)

//创建播放数据源供replay函数历史回放;盘中的时候,改为行情数据直接写入snapshotStreamTable流表
inputDS = replayDS(<select SecurityID, TradeTime, LastPx from tableHandle where date(TradeTime)<2020.07.01>, `TradeTime, `TradeTime)
复制代码

4.3 对接交易系统

DolphinDB 本身具有多种常用编程语言的API,包括C++, java, javascript, c#, python, go等。使用这些语言的程序,都可以调用该语言的 DolphinDB 接口,订阅到 DolphinDB 服务器的流数据。本例提供一个简单的python接口订阅流数据样例。

DolphinDB-Python API订阅流数据例子:

current_ddb_session.subscribe(host=DDB_datanode_host,tableName=stream_table_shared_name,actionName=action_name,offset=0,resub=False,filter=None,port=DDB_server_port,
handler=python_callback_handler,#此处传入python端要接收消息的回调函数
)
复制代码

在金融生产环境中,更常见的情况,是流数据实时的灌注到消息队列中,供下游的其他模块消费。DolphinDB 也支持将实时计算结果推送到消息中间件,与交易程序对接。示例中提供的样例,使用 DolphinDB 的开源 ZMQ 插件,将实时计算的结果推送到 ZMQ 消息队列,供下游ZMQ协议的订阅程序消费(交易或展示)。除ZMQ之外,其他支持的工具都在 DolphinDB 插件库中提供。所有已有的 DolphinDB 插件都是开源的,插件的编写组件也是开源的,用户也可按自己的需要编写。

DolphinDB向ZMQ消息队列推送流数据代码样例:

1.首先启动下游的ZMQ数据消费程序,作为监听端(ZeroMQ消息队列的服务端),完整代码见章节附件4.3.2 向ZMQ推送流数据

zmq_context = Context()
zmq_bingding_socket = zmq_context.socket(SUB)#见完整版代码设定socket选项
zmq_bingding_socket.bind("tcp://*:55556")		
async def loop_runner():    
    while True:
        msg=await zmq_bingding_socket.recv()#阻塞循环until收到流数据
        print(msg)#在此编写下游消息处理代码	
asyncio.run(loop_runner())
复制代码

2.启动因子数据的流处理计算和发布

在外部消费ZMQ消息的程序启动后,DolphinDB端要启动流计算,并开始对外发布计算结果。以下是DolphinDB端的代码。输出结果表之前的所有代码部分,和4.2中流处理计算doubleEma因子例子的一致,故下例代码中不再赘述。

resultSchema=table(1:0,["SecurityID","TradeTime","factor"], [SYMBOL,TIMESTAMP,DOUBLE])//输出到消息队列的表结构

def zmqPusherTable(zmqSubscriberAddress,schemaTable){
	SignalSender=def (x) {return x}
	pushingSocket = zmq::socket("ZMQ_PUB", SignalSender)
	zmq::connect(pushingSocket, zmqSubscriberAddress)
	pusher = zmq::createPusher(pushingSocket, schemaTable)
	return pusher
}

zmqSubscriberAddress="tcp://192.168.1.195:55556"//引擎demoEngine向zmq队列推送,使用时根据不同的zmq地址修改此字符串

pusherTable=zmqPusherTable(zmqSubscriberAddress,resultSchema)//生成一个逻辑表向上述地址发送zmq包,字段结构参照resultSchema

demoEngine = createReactiveStateEngine(name="reactiveDemo", metrics=<[TradeTime,doubleEma(LastPx)]>, dummyTable=snapshotSchema, outputTable=pusherTable, keyColumn="SecurityID",keepOrder=true)//创建流引擎,output指定输出到pusher表
复制代码

5. 因子的存储和查询

无论是批量计算还是实时计算,将DolphinDB中计算生成的因子保存下来提供给投研做后续的分析都是很有意义的。本章主要是根据存储、查询,使用方式等方面,来分析如何基于使用场景来选择更高效的存储模型。

在实际考虑数据存储方案,我们需要从以下三个方面考虑:

  • 选择OLAP引擎还是TSDB引擎。OLAP最适合全量跑批计算,TSDB则在序列查询上优势突出,性能和功能上比较全面。
  • 因子的存储方式是单值纵表方式还是多值宽表方式。 单值方式的最大优点是灵活性强,增加因子和股票时,不用修改表结构,缺点是数据冗余度高。多值宽表的数据冗余度很低,配合TSDB引擎的array vector,存储效率很高,但是新因子或新股票的出现,需要重新生成因子表。
  • 分区方式选择。可用于分区的列包括时间列,股票代码列和因子列。OLAP引擎推荐的分区大小为原始数据100MB左右。TSDB引擎推荐的分区设置为原始数据100MB~1GB范围会性能最佳。

结合以上考虑因素,我们以4000只股票,1000个因子,存储分钟级因子库为例,我们有如下三种选择:

  • 以纵表存储,使用OLAP引擎,每行按时间存储一只股票一个因子数据,分区方案 VALUE(天)+ HASH(因子名,125)。
  • 以纵表存储,使用TSDB引擎,每行按时间存储一只股票一个因子数据,分区方案 VALUE(月)+ HASH(因子名,50), 按股票代码+时间排序。
  • 以宽表存储,使用TSDB引擎,每行按时间存储全部股票一个因子,或者一支股票全部因子数据,分区方案VALUE(月)+ HASH(因子名,20),按因子名+时间排序。

OLAP引擎是纯列式存储,不适合表过宽,在列数超过80以后,写入性能会逐渐下降,故不做考虑。

纵表结构:

宽表结构:

5.1 因子存储

我们以存储5个因子一年的分钟级数据来进行测试,比对这三种存储模式在数据大小、实际使用的存储空间、写入速度等方面的优劣。

我们以存储5个因子一年的分钟级数据来进行测试,比对这三种存储模式在数据大小、实际使用的存储空间、写入速度等方面的优劣。

从比对结果来看,宽表 TSDB 模式的写入速度是纵表 OLAP 的4倍,纵表 TSDB 的5倍,存储空间上宽表 TSDB 和 OLAP 纵表相近,均约为 TSDB 纵表的三分之二,压缩比上纵表 OLAP 最优,纵表 TSDB 次之,宽表 TSDB 最差。这是因为首先实际产生的数据字节上,纵表模式是宽表模式的三倍,这决定了宽表 TSDB 的的写入速度最优,磁盘使用空间最优,也导致了宽表 TSDB 模式的压缩比会相对差一些,另外模拟数据随机性很多大,也影响了 TSDB 引擎宽表得数据压缩;其次 TSDB 引擎会进行数据排序,生成索引,所以同样是纵表,TSDB 引擎在存储空间、存储速度、压缩比方面都要略逊于 OLAP 引擎。

具体脚本参考因子数据存储模拟脚本(从此处下载 factor_data_simulation_script.zip 文件)。

5.2 因子查询

下面我们模拟大数据量来进行查询测试,模拟4000支股票,200个因子,一年的分钟级数据,详细数据信息及分区信息见下面表格:

下面我们通过多个角度的查询测试来比对这三种存储方式的查询性能。因子查询测试脚本

  • 查询1个因子1只股票指定时间点数据

在点查询上TSDB引擎优势明显,而宽表TSDB因为数据行数少,速度上还要快于纵表TSDB模式。

  • 查询1个因子1只股票一年分钟级数据

查询单因子单股票一年的分钟级数据宽表TSDB引擎速度最快,这是因为TSDB引擎分区较大,读取的文件少,且数据有排序,而OLAP引擎本身数据分区较小,需要扫描的行数又同样不少,所以速度最慢。

  • 查询1个因子全市场股票一年分钟级数据

宽表TSDB读取速度最快,读取的总数据量比较大时,这几种模式都会读取很多完整分区,而宽表TSDB模式因为实际数据比较小,所以速度上是纵表OLAP的一半,是纵表TSDB的三分之一略多。

  • 查询3个因子全市场股票一年分钟级数据

更大数据量的数据读取,查询耗时线性增长,同样原因,宽表TSDB读取速度仍然最快。

  • 查询一只股票全部因子一年的分钟级数据

宽表在进行该查询时,查询SQL应只选择需要股票代码列,SQL如下:

//纵表查询sql, 查询全部字段,使用通配符*
tsdb_symbol_all=select  * from tsdb_min_factor where symbol=`sz000056 

//宽表查询sql,只检索部分字段,详细列出
select mtime,factorname,sz000001 from tsdb_wide_min_factor
复制代码

以上结果可以看到,宽表 TSDB 引擎和纵表 TSDB 都可以很快的查出数据,而纵表模式 OLAP 则需要百倍以上的时间才能查询出数据。这是因为纵表模式 OLAP 的分区字段是时间和因子,这种情况下查询某只股票所有的因子需要扫描全部分区的全部列才能取出所需的数据;而宽表TSDB引擎只需要取三列数据,所以可以很快查出数据;纵表TSDB引擎可以按股票代码进行索引检索所以速度也比较快。

综上所述,因子的存储需根据不同的查询习惯去做规划。本节中的这些查询,推荐使用宽表TSDB的方式存储因子。

5.3 在线获取面板数据

针对不同的存储模型,在使用时若需要面板数据,DolphinDB 也有在线转换的方式。

  • 生成1个因子全市场股票一年分钟级面板数据

    //纵表模式取面板数据sql olap_factor_year_pivot_1=select val from olap_min_factor where factorcode=f0002 pivot by tradetime,symbol //宽表模式取面板数据sql wide_tsdb_factor_year=select * from tsdb_wide_min_factor where factorname =f0001

宽表 TSDB 模式查询面板数据时的速度是纵表 OLAP 和纵表 TSDB 的十倍以上,这是因为宽表 TSDB 的数据本身就以类似面板数据的方式存储,不需要再转换为面板数据,而纵表模式无论 OLAP 引擎还是 TSDB 引擎查询出数据后还要使用 pivot by 进行列转行操作,这个命令要进行数据比对去重、排序等操作,所以会耗费一些时间,在数据量大时,耗时明显,所以速度会大幅幅度落后于宽表 TSDB 模式。

  • 生成3个因子全市场股票一年分钟级面板数据

    //纵表模式取面板数据sql olap_factor_year_pivot=select val from olap_min_factor where factorcode in ('f0001','f0002','f0003') pivot by tradetime,symbol ,factorcode //宽表模式取面板数据sql wide_tsdb_factor_year=select * from tsdb_wide_min_factor where factorname in ('f0001','f0002','f0003')

宽表 TSDB 引擎具有最佳的查询性能,随着数据量上升,纵表数据列转行操作要额外增加 piovt by 的列,从而增加更多的去重、排序操作,导致生成面板数据的耗时进一步增加。

使用宽表 TSDB 模式存储在以下方面均有明显优势:

  • (1) 存储空间:虽然宽表 TSDB 在压缩比上相对逊色,但是由于宽表模式本书数据字节只有纵表模式的三分之一,所以在空间开销上宽表 TSDB 模式使用最小;
  • (2) 存储速度:宽表 TSDB 模式的在写入相同有效数据的情况下写入速度是纵表 OLAP 的4倍,纵表 TSDB 的5倍;
  • (3) 直接检索数据: 宽表TSDB模式在不同场景的查询速度至少是纵表OLAP和纵表TSDB的1.5倍,甚至可能达到100倍以上;
  • (4) 以面板模式检索数据:宽表 TSDB 模式的查询速度是纵表 OLAP 和纵表 TSDB 的至少10倍以上;
  • (5) 在以非分区维度检索数据:例如,按因子分区的按股票检索数据,此场景宽表TSDB模式查询速度是纵表 OLAP 和纵表 TSDB 的300倍和500倍。

综上,如果一定时期内股票和因子数量固定,因子存储的最佳选择方式为TSDB宽表的模式进行存储,用户可以按实际的查询习惯,来选择生成以股票名或因子名做为列的宽表。

猜你喜欢

转载自juejin.im/post/7101124145001283614