*** 这是一个尚未更新完全的博文***
引入包:
from jqdata import macro
import datetime
总体回测前:
'''
================================================================================
总体回测前
================================================================================
'''
#总体回测前要做的事情
def initialize(context):
set_params() #设置策参数
set_globalvars() #设置全局变量
set_slip_fee(context) #设置滑点和手续费
#设置策参数
def set_params():
set_option('use_real_price', True)#用真实价格交易
log.set_level('order', 'error')
# g.factors=["market_cap","roe"] # 用户选出来的因子
# # 因子等权重里1表示因子值越小越好,-1表示因子值越大越好
# g.weights=[[1],[-1]]
#设置全局变量
def set_globalvars():
g.t=0 #记录回测运行的天数
g.if_trade=False #当天是否交易
g.tc=15 # 调仓频率
g.maxnum =20 # 最大持仓数目
g.stockpool_index = '000002.XSHG' #设置初始股票池
g.buy_list = [] # 将要买入的股票
g.sell_lisy = [] # 将要卖出的股票
g.maxposition = 1.0 # 最大的持仓比例
# 根据不同的时间段设置滑点与手续费
def set_slip_fee(context):
# 将滑点设置为0
set_slippage(FixedSlippage(0))
# 根据不同的时间段设置手续费
dt=context.current_dt
log.info(type(context.current_dt))
if dt>datetime.datetime(2013,1, 1):
set_commission(PerTrade(buy_cost=0.0003, sell_cost=0.0013, min_cost=5))
elif dt>datetime.datetime(2011,1, 1):
set_commission(PerTrade(buy_cost=0.001, sell_cost=0.002, min_cost=5))
elif dt>datetime.datetime(2009,1, 1):
set_commission(PerTrade(buy_cost=0.002, sell_cost=0.003, min_cost=5))
else:
set_commission(PerTrade(buy_cost=0.003, sell_cost=0.004, min_cost=5))
每天开盘前:
'''
================================================================================
每天开盘前
================================================================================
'''
#每天开盘前要做的事情
def before_trading_start(context):
g.stockpool = get_index_stocks(g.stockpool_index) # 根据所选基础指数成分股更新当前基础股票池
g.maxposition = systematicrisk(context) # 获取当日可接受最高仓位
g.sell_list = [] # 清空g.sell_list
g.buy_list = [] # 清空g.buy_list
stoploss(context) # 运行止损函数,将满足止损条件的股票append到g.sell_list中
get_selllist(context) # 运行持仓股基本面分析函数,将满足售出基本面条件的股票append到g.sell_list中
if(context.current_dt.isoweekday() == 1):#判断是否周一
get_buylist(context) # 运行基本面分析函数,将基础股票池中满足买入基本面条件的股票append到g.buy_list中
filter_paused_stock(context)# 剔除停牌股票
def systematicrisk(context):
"""
分析市场系统性风险,并通过不同的风险等级返回当前可以接受最大仓位。
Parameters:
context
Returns:
float-当前可以接受最大仓位
Raises:
KeyError - raises an exception
"""
base_score = 50
#1
#前两日shibor 7天利率变动,降低10个基点以上则此项10分,降低25个基点以上则此项15分;
#上升10个基点以上则此项-10分,上升25个基点则此项-15分
query_object1 = query(macro.MAC_LEND_RATE.day,
macro.MAC_LEND_RATE.interest_rate
).filter(macro.MAC_LEND_RATE.currency_id=='1',
macro.MAC_LEND_RATE.market_id=='5',
macro.MAC_LEND_RATE.term_id=='7',
macro.MAC_LEND_RATE.day > context.current_dt - datetime.timedelta(days=3),
macro.MAC_LEND_RATE.day < context.current_dt
).limit(2)
df_shibor = macro.run_query(query_object1)
diff = df_shibor.iloc[1]['interest_rate'] - df_shibor.iloc[0]['interest_rate']
if(diff<=-0.25):
score1 = -15
elif(diff<=-0.10 && diff>-0.25):
score1 = -10
elif(score>=0.10 && diff<0.25):
score1 = 10
elif(score>=0.25):
score1 = 15
#2(每周只分析一次该项)
#消费者景气指数(月度),表名:MAC_CONSUMER_BOOM_IDX
#最近数据为预警,则此项为-5分,非预警则此项为5分。
if(context.current_dt.isoweekday() == 1):#判断是否周一
else:
score2 = 0
#3
#大盘10日均线死叉-15分,金叉+15分
#4
#市场情绪BRAR-情绪指标
#BR>400,暗示行情过热,-5分;BR<40 ,行情将起死回生,+5分;
#AR>180,能量耗尽,-5分;AR<40 ,能量已累积爆发力,+5分;
score = base_score + score1 + score2 + score3 + score4
if(score > 100):
score = 100
if(score < 10):
score = 10
return score * 0.01
def stoploss(context):
"""
持仓股止损函数,判断持仓股是否满足止损要求,并将满足要求的股票append到sell_list
Parameters:
context
Returns:
list- 需要止损的股票代码列表
Raises:
KeyError - raises an exception
"""
def get_selllist(context):
"""
基本面分析持仓股是否应该卖出,将满足要求的股票append到sell_list
Parameters:
context
Returns:
list- 通过基本面分析得出需要卖出的股票代码列表
Raises:
KeyError - raises an exception
"""
def get_buylist():
"""
基本面分析持仓股是否应该卖出,将满足要求的股票append到sell_list
Parameters:
context
Returns:
list- 通过基本面分析得出需要卖出的股票代码列表
Raises:
KeyError - raises an exception
"""
def filter_paused_stock(context):
"""
过滤掉当日停牌的股票,删除buy_list中和sell_list中停牌的股票
Parameters:
context
Returns:
list- 在buy_list中和sell_list中当日未停牌的股票
Raises:
KeyError - raises an exception
"""
current_data = get_current_data()
g.buy_list = [stock for stock in g.buy_list if not current_data[stock].paused]
g.sell_list = [stock for stock in g.sell_list if not current_data[stock].paused]
return g.buy_list , g.sell_list
每天交易时:
'''
================================================================================
每天交易时
================================================================================
'''
def handle_data(context, data):
if g.if_trade==True:
# 计算现在的总资产,以分配资金,这里是等额权重分配
g.everyStock=context.portfolio.portfolio_value/g.N
# 获得今天日期的字符串
todayStr=str(context.current_dt)[0:10]
# 获得因子排序
a,b=getRankedFactors(g.factors,todayStr)
# 计算每个股票的得分
points=np.dot(a,g.weights)
# 复制股票代码
stock_sort=b[:]
# 对股票的得分进行排名
points,stock_sort=bubble(points,stock_sort)
# 取前N名的股票
toBuy=stock_sort[0:g.N].values
# 对于不需要持仓的股票,全仓卖出
order_stock_sell(context,data,toBuy)
# 对于不需要持仓的股票,按分配到的份额买入
order_stock_buy(context,data,toBuy)
g.if_trade=False
#获得卖出信号,并执行卖出操作
#输入:context, data,toBuy-list
#输出:none
def order_stock_sell(context,data,toBuy):
#如果现有持仓股票不在股票池,清空
list_position=context.portfolio.positions.keys()
for stock in list_position:
if stock not in toBuy:
order_target(stock, 0)
#7
#获得买入信号,并执行买入操作
#输入:context, data,toBuy-list
#输出:none
def order_stock_buy(context,data,toBuy):
# 对于不需要持仓的股票,按分配到的份额买入
for i in range(0,len(g.all_stocks)):
if indexOf(g.all_stocks[i],toBuy)>-1:
order_target_value(g.all_stocks[i], g.everyStock)
#8
#查找一个元素在数组里面的位置,如果不存在,则返回-1
#输入:元素,对应数组
#输出:-1
def indexOf(e,a):
for i in range(0,len(a)):
if e==a[i]:
return i
return -1
#9
#取因子数据
#输入:f-全局通用的查询,d-str
#输出:因子数据,股票的代码-dataframe
def getRankedFactors(f,d):
# 获得股票的基本面数据,这个API里面有,g.q是一个全局通用的查询
df = get_fundamentals(g.q,d)
# 为了防止Python里面的浅复制现象,采用循环来定义二维数组
res = [([0] * len(f)) for i in range(len(df))]
# 把数据填充到刚才定义的数组里面
for i in range(0,len(df)):
for j in range(0,len(f)):
res[i][j]=df[f[j]][i]
# 用均值填充NaN值
fillNan(res)
# 将数据变成排名
getRank(res)
# 返回因子数据和股票的代码(这个是因为沪深300指数成分股一直在变,如果用未来的沪深300指数成分股在之前可能有一些股票还没上市)
return res,df['code']
#10
#把每列原始数据变成排序的数据
#输入:r-list
#输出:r-list
def getRank(r):
# 定义一个临时数组记住一开始的顺序
indexes=list(range(0,len(r)))
# 对每一列进行冒泡排序
for k in range(len(r[0])):
for i in range(len(r)):
for j in range(i):
if r[j][k] < r[i][k]:
# 交换所有的列以及用于记录一开始的顺序的数组
indexes[j], indexes[i] = indexes[i], indexes[j]
for l in range(len(r[0])):
r[j][l], r[i][l] = r[i][l], r[j][l]
# 将排序好的因子顺序变成排名
for i in range(len(r)):
r[i][k]=i+1
# 再进行一次冒泡排序恢复一开始的股票顺序
for i in range(len(r)):
for j in range(i):
if indexes[j] > indexes[i]:
indexes[j], indexes[i] = indexes[i], indexes[j]
for k in range(len(r[0])):
r[j][k], r[i][k] = r[i][k], r[j][k]
# 因为Python是引用传递,所以其实这个可以不用返回值也行,当然如果你想用另外一个变量来存储排序结果的话可以考虑返回值的方法
return r
#11
#用均值填充Nan
#输入:m-list
#输出:m-list
def fillNan(m):
# 计算出因子数据有多少行(行是不同的股票)
rows=len(m)
# 计算出因子数据有多少列(列是不同的因子)
columns=len(m[0])
# 这个循环是对每一列进行操作
for j in range(0,columns):
# 定义一个临时变量,用来存储每列加总的值
sum=0.0
# 定义一个临时变量,用来计算非NaN值的个数
count=0.0
# 计算非NaN值的总和和个数
for i in range(0,rows):
if not(isnan(m[i][j])):
sum+=m[i][j]
count+=1
# 计算平均值,为了防止全是NaN,如果当整列都是NaN的时候认为平均值是0
avg=sum/max(count,1)
for i in range(0,rows):
# 这个for循环是用来把NaN值填充为刚才计算出来的平均值的
if isnan(m[i][j]):
m[i][j]=avg
return m
#12
#定义一个冒泡排序的函数
#输入:numbers是股票的综合得分-list
#输出:indexes是股票列表-list
def bubble(numbers,indexes):
for i in range(len(numbers)):
for j in range(i):
if numbers[j][0] < numbers[i][0]:
# 在进行交换的时候同时交换得分以记录哪些股票得分比较高
numbers[j][0], numbers[i][0] = numbers[i][0], numbers[j][0]
indexes[j], indexes[i] = indexes[i], indexes[j]
return numbers,indexes
'''
================================================================================
每天收盘后
================================================================================
'''
# 每日收盘后要做的事情(本策略中不需要)
def after_trading_end(context):
return