这篇文章共分为四个部分:第一个部分是因子测试结果,第二个部分是因子逻辑,第三个部分是因子代码,第四个部分是整个因子测试用的数据、代码、分析结果的下载地址。
因子测试结果:
测试代码:
import pandas as pd
import numpy as np
import os
import copy
from backtrader.vectors.cs import AlphaCs
# from backtrader.vectors.cal_functions import get_symbol
from itertools import product
from multiprocessing import Pool
# import time
import pickle
import tqdm
# 编写相应的策略
class AlphaCs0004(AlphaCs):
def cal_alpha(self, data):
symbol = self.symbol
look_back_days = self.params['look_back_days']
if look_back_days == 1:
look_back_days = 2
begin_date = self.params["begin_date"]
data = data[data['trading_date'] >= pd.to_datetime(begin_date, utc=True)]
if len(data) > look_back_days:
data['vol_pct'] = data['volume'].pct_change()
data['ret'] = (data['close'] - data['open']) / data['open']
data[symbol] = -1 * data["vol_pct"].rolling(look_back_days).corr(data["ret"])
else:
data[symbol] = np.nan
# print(data)
return data[symbol].dropna()
def run(params, datas, add_datas):
new_params = {"look_back_days": params[0], "hold_days": params[1], "percent": params[2],
"begin_date": params[3]}
add_arr = {
'opens_arr': copy.copy(add_datas['opens_arr']),
'closes_arr': copy.copy(add_datas['closes_arr']),
# 'highs_arr' : copy.copy(content['highs_arr']),
# 'lows_arr' : copy.copy(content['lows_arr']),
}
new_params.update(add_arr)
# print(new_params)
test_alpha = AlphaCs0004(datas, new_params)
return test_alpha.run()
if __name__ == "__main__":
print("等待回测")
# con = False
# while not con:
# with open("d:/result/run_schedule.txt", "r") as f:
# data = f.read()
# print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
# print(data)
# if "正在执行" not in data:
# con = True
# with open("d:/result/run_schedule.txt", "w") as f:
# f.write(f"{__file__}正在执行")
# time.sleep(18)
# 读取写入成功的数据
root_dir = os.path.dirname(os.path.abspath(__file__))
# print(root_dir)
with open(root_dir + "/data/factor_datas.pkl", "rb") as f:
datas = pickle.load(f)
# with open(root_dir+"/data/add_datas.pkl", "rb") as f:
# add_datas = pickle.load(f)
with open(root_dir + "/data/add_arrs.pkl", "rb") as f:
add_datas = pickle.load(f)
# time_a = time.perf_counter()
# # 测试单个参数组合
# param_list = [[1, 1, 0.4], [60, 20, 0.4], [120, 60, 0.4], [180, 100, 0.4]]
# # param_list = [[210, 140, 0.4]]
# for param in param_list:
# a = {"look_back_days": param[0],
# "hold_days": param[1],
# "percent": param[2],
# "begin_date": "2013-01-01",
# 'opens_arr': copy.copy(add_datas['opens_arr']),
# 'closes_arr': copy.copy(add_datas['closes_arr']),
# # 'highs_arr' : copy.copy(content['highs_arr']),
# # 'lows_arr' : copy.copy(content['lows_arr']),
# # "total_value_save_path": root_dir,
# # "commission": 0.0002,
# }
# # a.update(add_datas)
# test_alpha = Alpha222(datas, a)
# test_alpha.run()
# # if param[0] == 1:
# # test_alpha.plot()
# time_b = time.perf_counter()
# print(f"4个参数一共使用了{time_b - time_a}秒")
# assert 0
k01_list = list(range(1, 30))
k02_list = list(range(30, 90, 5))
k03_list = list(range(90, 310, 10))
k11_list = list(range(1, 30))
k12_list = list(range(30, 90, 5))
k13_list = list(range(90, 310, 10))
k2_list = [0.2, 0.3, 0.4]
# k2_list = [0.3]
k3_list = ["2013-01-01", "2013-04-01", "2013-07-01", "2013-10-01"]
param_list = list(product(k01_list, k11_list, k2_list, k3_list)) + list(product(k02_list, k12_list, k2_list, k3_list
)) + list(product(k03_list, k13_list, k2_list, k3_list))
pbar = tqdm.tqdm(total=len(param_list))
with Pool(processes=10) as p:
res = [p.apply_async(run, args=(param, datas, add_datas,), callback=lambda args: pbar.update()) for param in
param_list]
# print('非阻塞')
results = [i.get() for i in res]
result_df = pd.DataFrame(results)
# print(result_df)
result_df.columns = ["look_back_days", "hold_days", "percent", "begin_date",
"sharpe_ratio", "annual_ret", "drawdown"]
result_df.to_csv(root_dir + "/cs_单因子测试结果.csv")
del datas
print(f"{__file__} 回测结束,保存结果成功,并开始运行下一个策略")
代码数据下载地址:
链接: https://pan.baidu.com/s/1O5R-SvtOyIZmaZR2hk29cw?pwd=w2w9 提取码: w2w9 复制这段内容后打开百度网盘手机App,操作更方便哦