这是一个简易的、模拟实际交易流程的回测框架,所使用的行情数据是单档的tick成交数据。为了让调用者能够实现自己的交易逻辑,本框架预留了几个方法(check_sell()和check_buy()),调用者继承该类后在子类中重写这些方法,即可生成自己的买入、卖出信号。
因为最近比较忙(忙着实习,放假又忙着骑车——在上海,欢迎骑友联系),所以文档内容写得不是很详细。如果想进一步交流,欢迎私信我或留言评论;如果需要数据来复现本文,请私信。
这是一个简易的模拟实际交易流程的回测框架,所使用的行情数据是单档的tick成交数据
直接上图:

为了模拟实际交易流程,在属性中定义了账户信息、下单记录、成交记录和持仓信息这四个表来控制交易流程,交易流程如下所示:

为了让调用者能够实现自己的交易逻辑,本框架预留了几个方法(check_sell()和check_buy()),调用者继承该类后在子类中重写这些方法,即可生成自己的买入、卖出信号。
以下是Tickbacktest.py文件中的代码:
- # -*- coding=utf-8 -*-
- # --------------------------------
- # @Time : 2023年11月6日19:24:47
- # @Author : Noah Zhan
- # @File : tickbacktest.py
- # @Project : tickbacktest
- # @Function :tickbacktest类
- # --------------------------------
-
- from datetime import datetime
- import datetime as dt
- import logging
- #from typing_extensions import Self
- import numpy as np
- import pandas as pd
- import os
- from tqdm import tqdm
- from empyrical import max_drawdown,sharpe_ratio
-
-
def timeformat(date) -> datetime:
    """
    Convert a date given as a string or a datetime into a datetime value.

    Strings are parsed with one of three layouts, selected by the separator
    they contain: ``%Y/%m/%d %H:%M:%S`` (contains "/"),
    ``%Y-%m-%d %H:%M:%S`` (contains "-") or ``%Y%m%d %H:%M:%S`` (neither).

    Args:
        date: a ``datetime`` instance, or a string in one of the layouts above.

    Returns:
        ``date`` itself when it already is a ``datetime``; otherwise the
        parsed value as a ``pandas.Timestamp``.

    Raises:
        TypeError: if ``date`` is neither a ``datetime`` nor a ``str``
            (previously such input silently produced ``None``).
        ValueError: if the string does not match the selected layout.
    """
    if isinstance(date, datetime):
        return date
    if not isinstance(date, str):
        raise TypeError(
            "timeformat expects a datetime or str, got " + type(date).__name__
        )

    if "/" in date:
        layout = '%Y/%m/%d %H:%M:%S'
    elif "-" in date:
        layout = '%Y-%m-%d %H:%M:%S'
    else:
        layout = '%Y%m%d %H:%M:%S'
    return pd.Timestamp(datetime.strptime(date, layout))
-
-
class tickbacktest(object):
    """
    Tick-level backtest engine built around four bookkeeping tables.

    The instance keeps an ``account`` table (cash and asset totals), an
    ``order`` table (submitted orders), a ``deal`` table (fills) and a
    ``portfolio`` table (holdings), all as pandas DataFrames.  ``pipline``
    drives the simulation one second at a time; strategies subclass this
    class and override ``check_sell`` / ``check_buy`` (and optionally
    ``before_close`` / ``dosomethin_in_morning``).

    ``tick_data`` is a dict with the key ``'tick_all'`` (a DataFrame that the
    code reads through its ``time_stamp``, ``security_id``, ``value`` and
    ``volume`` columns) and ``'tick_now'`` (a dict stk_id -> DataFrame that is
    rebuilt on every tick).
    """

    _ACCOUNT_COLUMNS = ['name', 'initial_cash', 'total_assets_lastday', 'total_assets',
                        'cash_useable', 'profit', 'total_mkt_cap']
    _ORDER_COLUMNS = ['order_id', 'stk_id', 'order_price', 'order_num', 'state',
                      'order_dt', 'direction', 'order_method', 'num_left']
    _DEAL_COLUMNS = ['deal_id', 'stk_id', 'deal_price', 'deal_num', 'deal_dt',
                     'commission', 'direction', 'deal_amount']
    _PORTFOLIO_COLUMNS = ['stk_id', 'port_num', 'intraday_buy_num', 'intraday_sell_num',
                          'port_amount', 'commission', 'hold_cost', 'price_now',
                          'dynamic_equity', 'hold_profit', 'realized_profit', 'state']

    def __init__(self, name, start_dt, end_dt, now, total_assets=1000*10000, commissions=2,
                 slip=0.2, tick_data=None, context=None) -> None:
        '''
        Initialise the backtest.

        params:
            - name: account name; also the folder where logs and CSV output go;
            - start_dt / end_dt: backtest window (str or datetime, see timeformat);
            - now: initial simulation clock;
            - total_assets: initial cash;
            - commissions: commission charged per traded unit;
            - slip: slippage in percent, stored in context['slip'] as a fraction;
            - tick_data: dict holding 'tick_all' (and 'tick_now'); the 'tick_all'
              frame is restricted in place to [start_dt, end_dt];
            - context: dict for user-defined global state.
        '''
        self.start_dt = timeformat(start_dt)
        self.end_dt = timeformat(end_dt)
        self.now = timeformat(now)
        self.commissions = commissions

        # None defaults instead of dict(): a literal dict default is created
        # once and would be shared (and mutated) across every instance.
        self.tick_data = {} if tick_data is None else tick_data
        if self.tick_data:
            ticks = self.tick_data['tick_all']
            in_window = (ticks['time_stamp'] >= self.start_dt) & (ticks['time_stamp'] <= self.end_dt)
            self.tick_data['tick_all'] = ticks[in_window]

        # account table
        self.account = pd.DataFrame(index=[name], columns=self._ACCOUNT_COLUMNS)
        self.account.loc[name, 'name'] = name
        self.account.loc[name, 'total_assets_lastday'] = total_assets
        self.account.loc[name, 'total_assets'] = total_assets
        self.account.loc[name, 'cash_useable'] = total_assets
        self.account.loc[name, 'initial_cash'] = total_assets
        self.account.loc[name, 'profit'] = 0
        self.account.loc[name, 'total_mkt_cap'] = 0
        # order / deal / portfolio tables
        self.order = pd.DataFrame(columns=self._ORDER_COLUMNS)
        self.deal = pd.DataFrame(columns=self._DEAL_COLUMNS)
        self.portfolio = pd.DataFrame(columns=self._PORTFOLIO_COLUMNS)
        # context stores global parameters the strategy may need
        self.context = {} if context is None else context
        self.context['name'] = name
        self.context['slip'] = 0.01*slip

        # output folder and log file
        self.mkdir(name)
        logging.basicConfig(filename=name+'/log.txt',
                            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s-%(funcName)s',
                            level=logging.INFO)
        self.firstday = True

    @staticmethod
    def mkdir(path) -> None:
        '''
        Create the folder that stores backtest data and logs, or report that it
        already exists.
        '''
        if os.path.exists(path):
            print("--- There is this folder! ---")
        else:
            # exist_ok guards against the folder appearing between the check
            # above and this call.
            os.makedirs(path, exist_ok=True)

    @staticmethod
    def weighted_price(value, volume) -> float:
        '''
        Return the volume-weighted average of the given prices.
        '''
        return np.average(value, weights=volume)

    def make_order(self, stk_id, order_price, order_num, direction, order_method) -> None:
        '''
        function: submit an order by appending it to the order table.
        params:
            - stk_id: str, id of the instrument to trade;
            - order_price: float, limit price;
            - order_num: int, quantity;
            - direction: str, 'buy' or 'sell';
            - order_method: order type; only '限价' (limit) is supported.
        return:
            -
        '''
        index = len(self.order) + 1
        order_toadd = pd.DataFrame(index=[index], columns=self._ORDER_COLUMNS)
        order_toadd.loc[index, 'order_id'] = index
        order_toadd.loc[index, 'stk_id'] = stk_id
        order_toadd.loc[index, 'order_price'] = order_price
        order_toadd.loc[index, 'order_num'] = order_num
        order_toadd.loc[index, 'state'] = '未成'
        order_toadd.loc[index, 'order_dt'] = self.now
        order_toadd.loc[index, 'direction'] = direction
        order_toadd.loc[index, 'order_method'] = order_method
        order_toadd.loc[index, 'num_left'] = order_num
        self.order = pd.concat([self.order, order_toadd], axis=0)

    def clear_order(self, order_id) -> None:
        '''
        function: cancel the order with the given id by setting its state to '已撤'.
        params:
            - order_id: id of the order to cancel.
        return:
            -
        '''
        if order_id not in self.order.index:
            # .loc assignment on a missing label would silently add a junk row.
            logging.warning('warning,撤单失败-未找到order-id-'+str(order_id))
            return
        self.order.loc[order_id, 'state'] = '已撤'
        logging.info("info,撤单成功-id-"+str(order_id)
                     + '-stkid-'+str(self.order.loc[order_id, 'stk_id'])
                     + "-time-"+str(self.order.loc[order_id, 'order_dt'])
                     + "-price-"+str(self.order.loc[order_id, 'order_price']))

    def clear_allorder(self) -> None:
        '''
        function: cancel every order whose state is still '未成'.
        params:
            -
        return:
            -
        '''
        order_ids = list(self.order[self.order['state'] == '未成']['order_id'])
        for order_id in order_ids:
            self.clear_order(order_id)

    def _refresh_account(self) -> None:
        '''Recompute market value, total assets and profit from cash and holdings.'''
        acct = self.context['name']
        self.account.loc[acct, 'total_mkt_cap'] = sum(self.portfolio['dynamic_equity'])
        self.account.loc[acct, 'total_assets'] = self.account.loc[acct, 'cash_useable'] + self.account.loc[acct, 'total_mkt_cap']
        self.account.loc[acct, 'profit'] = self.account.loc[acct, 'total_assets'] - self.account.loc[acct, 'initial_cash']

    def _held_quantity(self, stk_id):
        '''Return port_num of the first portfolio row for stk_id, or 0 if none.'''
        held = self.portfolio[self.portfolio['stk_id'] == stk_id]
        if held.empty:
            return 0
        return held['port_num'].iloc[0]

    def _record_deal(self, order, deal_price, deal_num):
        '''Append one fill to the deal table and return it as a Series.'''
        index = len(self.deal) + 1
        deal_toadd = pd.DataFrame(index=[index], columns=self._DEAL_COLUMNS)
        deal_toadd.loc[index, 'deal_id'] = index
        deal_toadd.loc[index, 'stk_id'] = order['stk_id']
        deal_toadd.loc[index, 'deal_price'] = deal_price
        deal_toadd.loc[index, 'deal_num'] = deal_num
        # Stamp the fill with the simulation clock: a resting order can be
        # filled on a later tick than the one it was submitted on.
        deal_toadd.loc[index, 'deal_dt'] = self.now
        deal_toadd.loc[index, 'commission'] = deal_num * self.commissions
        deal_toadd.loc[index, 'direction'] = order['direction']
        deal_toadd.loc[index, 'deal_amount'] = deal_num * deal_price
        self.deal = pd.concat([self.deal, deal_toadd], axis=0)
        return deal_toadd.loc[index]

    def _apply_buy(self, stk_id, deal, price_now) -> None:
        '''Book a buy fill into the portfolio table and debit cash.'''
        held = self.portfolio[self.portfolio['stk_id'] == stk_id].index
        if len(held) > 0:
            row = held[0]
            pf = self.portfolio
            pf.loc[row, 'intraday_buy_num'] += deal['deal_num']
            pf.loc[row, 'port_num'] = pf.loc[row, 'intraday_buy_num'] - pf.loc[row, 'intraday_sell_num']
            pf.loc[row, 'port_amount'] += deal['deal_amount']
            pf.loc[row, 'commission'] += deal['commission']
            pf.loc[row, 'hold_cost'] = pf.loc[row, 'port_amount'] + pf.loc[row, 'commission']
            pf.loc[row, 'price_now'] = price_now
            pf.loc[row, 'dynamic_equity'] = pf.loc[row, 'port_num'] * price_now
            pf.loc[row, 'hold_profit'] = pf.loc[row, 'dynamic_equity'] - pf.loc[row, 'hold_cost']
            pf.loc[row, 'state'] = '持仓'
        else:
            # max label + 1 rather than len + 1: sold rows are pruned every
            # morning, so len + 1 could collide with a surviving label.
            row = int(self.portfolio.index.max()) + 1 if len(self.portfolio) else 1
            new_row = pd.DataFrame(index=[row], columns=self._PORTFOLIO_COLUMNS)
            new_row.loc[row, 'stk_id'] = stk_id
            new_row.loc[row, 'intraday_buy_num'] = deal['deal_num']
            new_row.loc[row, 'intraday_sell_num'] = 0
            new_row.loc[row, 'port_num'] = deal['deal_num']
            new_row.loc[row, 'port_amount'] = deal['deal_amount']
            new_row.loc[row, 'commission'] = deal['commission']
            new_row.loc[row, 'hold_cost'] = deal['deal_amount'] + deal['commission']
            new_row.loc[row, 'price_now'] = price_now
            new_row.loc[row, 'dynamic_equity'] = deal['deal_num'] * price_now
            new_row.loc[row, 'hold_profit'] = new_row.loc[row, 'dynamic_equity'] - new_row.loc[row, 'hold_cost']
            new_row.loc[row, 'realized_profit'] = 0
            new_row.loc[row, 'state'] = '持仓'
            self.portfolio = pd.concat([self.portfolio, new_row], axis=0)
        self.account.loc[self.context['name'], 'cash_useable'] -= (deal['deal_amount'] + deal['commission'])

    def _apply_sell(self, stk_id, deal, price_now) -> None:
        '''Book a sell fill into the portfolio table and credit cash.'''
        pf = self.portfolio
        row = pf[pf['stk_id'] == stk_id].index[0]
        pf.loc[row, 'intraday_sell_num'] += deal['deal_num']
        pf.loc[row, 'port_num'] = pf.loc[row, 'intraday_buy_num'] - pf.loc[row, 'intraday_sell_num']
        amount_before = float(pf.loc[row, 'port_amount'])
        # Quantity held before this fill; positive because the caller caps the
        # fill at the held quantity.
        num_before = deal['deal_num'] + pf.loc[row, 'port_num']
        pf.loc[row, 'port_amount'] *= (pf.loc[row, 'port_num'] / num_before)
        pf.loc[row, 'commission'] -= deal['commission']
        pf.loc[row, 'hold_cost'] = pf.loc[row, 'port_amount'] + pf.loc[row, 'commission']
        pf.loc[row, 'price_now'] = price_now
        pf.loc[row, 'dynamic_equity'] = pf.loc[row, 'port_num'] * price_now
        pf.loc[row, 'hold_profit'] = pf.loc[row, 'dynamic_equity'] - pf.loc[row, 'hold_cost']
        # Realized profit = net proceeds of this fill (same amount credited to
        # cash below) minus the proportional share of the purchase amount.
        pf.loc[row, 'realized_profit'] += (deal['deal_amount'] - deal['commission']) - (deal['deal_num'] / num_before) * amount_before
        if pf.loc[row, 'port_num'] == 0:
            pf.loc[row, 'state'] = '已卖'
        self.account.loc[self.context['name'], 'cash_useable'] += (deal['deal_amount'] - deal['commission'])

    def excecute_deal(self, direction) -> None:
        '''
        function: match the unfilled orders of the given direction against the
                  current tick (self.tick_data['tick_now']); when price and
                  volume allow a fill, update the account, portfolio, order
                  and deal tables.
        params:
            - direction: 'buy' or 'sell'; which side of the order table to process.
        return:
            -
        '''
        if direction not in ('buy', 'sell'):
            logging.warning('warning,未知的委托方向-'+str(direction))
            return
        acct = self.context['name']
        order_todo = self.order[(self.order['state'] == '未成') & (self.order['direction'] == direction)]
        for ind, data in order_todo.iterrows():
            stk_id = data['stk_id']
            ticks = self.tick_data['tick_now'][stk_id]
            # A buy fills at or below its limit, a sell at or above it.
            if direction == 'buy':
                dealable = ticks[ticks['value'] <= data['order_price']]
            else:
                dealable = ticks[ticks['value'] >= data['order_price']]
            if dealable.empty:
                continue
            price_now = self.weighted_price(list(ticks['value']), list(ticks['volume']))

            for _, tick in dealable.iterrows():
                # Always read the live remaining quantity; `data` is a snapshot
                # taken before any fill and would allow overfilling.
                num_left = self.order.loc[ind, 'num_left']
                if num_left <= 0 or self.order.loc[ind, 'state'] != '未成':
                    break

                if direction == 'buy':
                    # Shrink the order when cash cannot cover price + commission.
                    cash = self.account.loc[acct, 'cash_useable']
                    unit_cost = tick['value'] + self.commissions
                    if num_left * unit_cost > cash:
                        num_left = int(cash / unit_cost)
                        self.order.loc[ind, 'num_left'] = num_left
                        logging.warning('warning,剩余现金不足,将order-'+str(ind)+"下单量自动调整为"+str(num_left))
                        if num_left <= 0:
                            self.clear_order(order_id=data['order_id'])
                            break
                    deal_num = min(tick['volume'], num_left)
                else:
                    # Selling needs an existing position; check before anything
                    # is recorded, and never sell more than is held.
                    port_num = self._held_quantity(stk_id)
                    if port_num <= 0:
                        self.clear_order(order_id=data['order_id'])
                        break
                    deal_num = min(tick['volume'], num_left, port_num)
                if deal_num <= 0:
                    continue

                deal = self._record_deal(data, tick['value'], deal_num)

                # order table
                self.order.loc[ind, 'num_left'] = num_left - deal_num
                if self.order.loc[ind, 'num_left'] <= 0:
                    self.order.loc[ind, 'state'] = '已成'

                # portfolio and account tables
                if direction == 'buy':
                    self._apply_buy(stk_id, deal, price_now)
                else:
                    self._apply_sell(stk_id, deal, price_now)
                self._refresh_account()
                logging.info('info,'+str(stk_id)+"-"+str(deal['deal_num'])+"-"+str(data['direction'])+"-deal id:"+str(deal['deal_id']))
        return

    def _place_orders(self, id_list, num_list, direction, price_factor) -> None:
        '''Submit one limit order per (stk_id, quantity) pair at average price * price_factor.'''
        for stk_id, num in zip(id_list, num_list):
            ticks = self.tick_data['tick_now'][stk_id]
            if ticks.empty:
                logging.warning('warning,无可用tick,跳过下单-'+str(stk_id))
                continue
            price = self.weighted_price(ticks['value'], ticks['volume']) * price_factor
            self.make_order(stk_id=stk_id, order_price=price, order_num=num,
                            direction=direction, order_method='限价')

    def every_tick(self) -> None:
        '''
        function: work done on every new tick: 1) advance self.now by one second
                  and rebuild self.tick_data['tick_now']; 2) refresh the
                  portfolio table; 3) refresh the account table; 4) ask
                  check_sell() what to sell, submit and execute; 5) ask
                  check_buy() what to buy, submit and execute; 6) once per
                  minute append the account row to account_tick.csv.
        params:
            -
        return:
            -
        '''
        one_second = dt.timedelta(seconds=1)
        # 1) advance the clock and collect the current tick of each security
        self.now = self.now + one_second
        all_ticks = self.tick_data['tick_all']
        self.tick_data['tick_now'] = dict()
        for stk_id in set(all_ticks['security_id']):
            stk_ticks = all_ticks[all_ticks['security_id'] == stk_id]
            earliest = stk_ticks['time_stamp'].min()
            lookup = self.now
            current = stk_ticks[stk_ticks['time_stamp'] == lookup]
            # No tick this second: step back one second at a time to the most
            # recent one.  Bounded by the first tick of the security so the
            # loop ends (with an empty frame) when nothing earlier exists.
            while current.empty and lookup > earliest:
                lookup = lookup - one_second
                current = stk_ticks[stk_ticks['time_stamp'] == lookup]
            self.tick_data['tick_now'][stk_id] = current

        # 2) mark holdings to market
        for ind, row in self.portfolio.iterrows():
            ticks = self.tick_data['tick_now'][row['stk_id']]
            if not ticks.empty:
                self.portfolio.loc[ind, 'price_now'] = self.weighted_price(list(ticks['value']), list(ticks['volume']))
                self.portfolio.loc[ind, 'dynamic_equity'] = self.portfolio.loc[ind, 'price_now'] * self.portfolio.loc[ind, 'port_num']
                self.portfolio.loc[ind, 'hold_profit'] = self.portfolio.loc[ind, 'dynamic_equity'] - self.portfolio.loc[ind, 'hold_cost']

        # 3) account totals
        self._refresh_account()

        # NOTE(review): the source listing lost its indentation; the calls to
        # excecute_deal below are kept inside the "there are new orders"
        # branch — confirm against the original file.
        # 4) sells: limit price is the average price less slippage
        sell_id_list, sell_num_list = self.check_sell()
        if len(sell_id_list) > 0 and len(sell_num_list) > 0:
            self._place_orders(sell_id_list, sell_num_list, 'sell', 1 - self.context['slip'])
            self.excecute_deal(direction='sell')

        # 5) buys: limit price is the average price plus slippage
        buy_id_list, buy_num_list = self.check_buy()
        if len(buy_id_list) > 0 and len(buy_num_list) > 0:
            self._place_orders(buy_id_list, buy_num_list, 'buy', 1 + self.context['slip'])
            self.excecute_deal(direction='buy')

        # 6) once a minute, append the account row to the per-tick CSV
        if self.now.second == 0:
            account_toadd = self.account.copy()
            account_toadd['datetime'] = self.now
            path = self.context['name'] + '/account_tick.csv'
            if os.path.exists(path):
                account_toadd.to_csv(path, mode='a', header=False)
            else:
                account_toadd.to_csv(path)
        return

    def check_sell(self) -> tuple:
        '''
        function: produce the stocks and quantities to sell on this tick.
                  Meant to be overridden with the caller's own strategy;
                  the default sells nothing.
        params:
            -
        return:
            - sell_id_list: ids of the stocks to sell;
            - sell_num_list: matching quantities.
        '''
        return [], []

    def check_buy(self) -> tuple:
        '''
        function: produce the stocks and quantities to buy on this tick.
                  Meant to be overridden with the caller's own strategy;
                  the default buys nothing.
        params:
            -
        return:
            - buy_id_list: ids of the stocks to buy;
            - buy_num_list: matching quantities.
        '''
        return [], []

    def every_morning(self) -> None:
        '''
        function: work done at the start of each new trading day: 1) move
                  self.now to 08:59:59 of the next day that has tick data;
                  2) append yesterday's holdings to portfolio.csv and drop
                  sold rows; 3) append yesterday's account row to account.csv
                  and roll total_assets_lastday; 4) run the user hook
                  dosomethin_in_morning().
        params:
            -
        return:
            -
        '''
        # 1) advance the clock
        self.context['last_dt'] = datetime(self.now.year, self.now.month, self.now.day, 14, 30, 0)
        trading_days = set(pd.to_datetime(self.tick_data['tick_all']['time_stamp']).dt.date)
        one_day = dt.timedelta(days=1)
        open_time = dt.time(8, 59, 59)
        # timedelta arithmetic rolls over month and year ends, which
        # datetime(year, month, day + 1) does not.
        next_day = self.now.date() + one_day
        self.now = datetime.combine(next_day, open_time)
        # Skip non-trading days, but stop once past end_dt so the search ends
        # after the last day with data (pipline then leaves its loop).
        while self.now.date() not in trading_days and self.now <= self.end_dt:
            next_day = next_day + one_day
            self.now = datetime.combine(next_day, open_time)

        # 2) persist yesterday's holdings, then drop positions already sold
        if not self.portfolio.empty:
            portfolio_toadd = self.portfolio.copy()
            portfolio_toadd['date'] = self.context['last_dt']
            path = self.context['name'] + '/portfolio.csv'
            if os.path.exists(path):
                portfolio_toadd.to_csv(path, mode='a', header=False)
            else:
                portfolio_toadd.to_csv(path)
            self.portfolio = self.portfolio[~(self.portfolio['state'] == '已卖')]

        # 3) persist yesterday's account row with its daily return
        if not self.account.empty:
            account_toadd = self.account.copy()
            account_toadd['date'] = self.context['last_dt']
            account_toadd['return'] = account_toadd['total_assets'] / account_toadd['total_assets_lastday'] - 1
            path = self.context['name'] + '/account.csv'
            if os.path.exists(path):
                account_toadd.to_csv(path, mode='a', header=False)
            else:
                account_toadd.to_csv(path)
            self.account['total_assets_lastday'] = account_toadd['total_assets']

        # 4) user hook
        self.dosomethin_in_morning()
        return

    def before_close(self) -> None:
        '''
        function: hook called by pipline when the clock reaches 14:29:00.
                  Override it to, for example, flatten positions that must not
                  be held overnight. The default cancels all unfilled orders.
        params:
            -
        return:
            -
        '''
        self.clear_allorder()

    def dosomethin_in_morning(self) -> None:
        '''
        function: hook called at the end of every_morning(); override it to run
                  extra work just before the open. The default does nothing.
        params:
            -
        return:
            -
        '''
        pass

    def pipline(self) -> None:
        '''
        function: control loop of the backtest; call this to run it.
        params:
            -
        return:
            -
        '''
        logging.info('info,'+str(self.context['name'])+'回测开始')
        # total= must be passed by keyword: a positional int is taken as the
        # iterable. It counts distinct tick timestamps, while the bar advances
        # once per simulated second, so it is only an approximation.
        total_ticks = len(set(self.tick_data['tick_all']['time_stamp']))
        with tqdm(total=total_ticks) as pbar:
            while self.now <= self.end_dt:
                if self.firstday:
                    self.firstday = False
                else:
                    self.every_morning()
                if self.now > self.end_dt:
                    break
                # tick until 14:29:00, when before_close() takes over
                while not ((self.now.hour == 14) and (self.now.minute == 29) and (self.now.second == 0)):
                    self.every_tick()
                    pbar.update(1)
                    if self.now > self.end_dt:
                        break
                if self.now > self.end_dt:
                    break
                self.before_close()
        logging.info('info,'+str(self.context['name'])+'回测结束')
        return

    def summary(self) -> None:
        '''
        function: print a table with the total return, P&L, annualized
                  volatility, Sharpe ratio and maximum drawdown, computed from
                  the daily rows in <name>/account.csv.
        params:
            -
        return:
            -
        '''
        try:
            data = pd.read_csv(self.context['name'] + '/account.csv')
        except FileNotFoundError:
            print('请先进行回测。')
            return
        if data.empty:
            print('请先进行回测。')
            return

        data = data.reset_index()
        data = data.sort_values('date', ascending=True)
        last_row = data.iloc[-1, :]
        # total return and P&L over the whole run
        return_ = last_row['total_assets'] / last_row['initial_cash'] - 1
        PandL = last_row['total_assets'] - last_row['initial_cash']
        # The column is labelled "annualized", so scale the daily standard
        # deviation by sqrt(252), the usual trading-days-per-year convention.
        volatility = data['return'].std() * np.sqrt(252)
        Sharpe = sharpe_ratio(data['return'], risk_free=0, period='daily')
        max_drawdown_ = max_drawdown(data['return'])

        from prettytable import PrettyTable

        x = PrettyTable()
        x.padding_width = 2
        x.add_column("回测", [self.context['name']])
        x.add_column("回测收益", [str(round(float(return_ * 100), 2)) + "%"])
        x.add_column("P&L", [str(round(float(PandL), 2))])
        x.add_column("年化波动率", [str(round(float(volatility), 2))])
        x.add_column("夏普", [str(round(Sharpe, 2))])
        x.add_column("最大回撤", [str(round(max_drawdown_*100, 2)) + "%"])

        print(x)
-
-
-
-
下面对上述代码进行实际应用:继承tickbacktest类并重写相关方法(check_sell、check_buy、before_close),以实现调用者自己的交易逻辑:
这里使用了简单的均线策略进行测试。
- import pandas as pd
- import numpy as np
- from tickbacktest import *
- import datetime as dt
# Load the raw tick file and merge trades that share the same timestamp,
# contract and price into one row carrying their summed volume.
data = pd.read_csv('Sample Tick Data.csv')
volume_groups = data.groupby(['time_stamp','security_id','value'])['volume']
data = volume_groups.apply(lambda x:sum(x)).reset_index()
# Parse the timestamp column so it can be compared with datetime values.
data['time_stamp'] = pd.to_datetime(data['time_stamp'])
-
-
-
- #继承backtest方法并重写相关的方法,实现调用者自己的逻辑(check_sell,check_buy,before_close)
class my_tickbacktest(tickbacktest):
    """
    Example strategy on top of tickbacktest: a two-line moving-average
    crossover on a single contract.

    On every tick the volume-weighted average price over the last
    ``average_short_min`` and ``average_long_min`` minutes is recorded.  A
    crossover starts a trading window of ``trade_period_restriction`` ticks
    during which the position is built (or unwound) in equal slices.
    """

    _SIGNAL_COLUMNS = ['time_stamp', 'contract', 'moving_average_1', 'moving_average_2', 'signal', 'return']

    def __init__(self, stk_id, average_short_min, average_long_min, trade_period_restriction,
                 name, start_dt, end_dt, now, total_assets=1000*10000, commissions=2, slip=0.2,
                 tick_data=None, context=None):
        '''
        params:
            - stk_id: id of the single contract the strategy trades;
            - average_short_min / average_long_min: window lengths in minutes;
            - trade_period_restriction: number of ticks over which a signal is executed;
            - remaining parameters are passed through to tickbacktest.__init__.
        '''
        # Fresh dicts per instance: the base initialiser writes into `context`,
        # so a shared literal default would leak state between instances.
        tick_data = {} if tick_data is None else tick_data
        context = {} if context is None else context
        super().__init__(name, start_dt, end_dt, now, total_assets, commissions, slip, tick_data, context)
        self.stk_id = stk_id
        self.context['signal_record'] = pd.DataFrame(columns=self._SIGNAL_COLUMNS)
        self.average_short_min = average_short_min
        self.average_long_min = average_long_min
        self.average_short_line = []
        self.average_long_line = []
        self.signal = 0
        self.trade_time = 0
        self.trade_period_restriction = trade_period_restriction
        self.istrading_buy = 0
        self.istrading_sell = 0

    def _window_price(self, stk_id, minutes):
        '''Volume-weighted price of stk_id over the last `minutes` minutes, NaN if no trades.'''
        ticks = self.tick_data['tick_all']
        window_start = self.now + dt.timedelta(seconds=-1*minutes*60)
        panel = ticks[(ticks['time_stamp'] >= window_start) & (ticks['time_stamp'] <= self.now)]
        panel = panel[panel['security_id'] == stk_id]
        if panel.empty or panel['volume'].sum() == 0:
            # A weighted average over nothing is undefined; NaN makes every
            # crossover comparison false, so no signal fires.
            return float('nan')
        return self.weighted_price(panel['value'], panel['volume'])

    def moving_average(self, stk_id):
        '''
        Return (short_price, long_price): the volume-weighted average prices of
        stk_id over the short and the long look-back window ending at self.now.
        '''
        short_price = self._window_price(stk_id, self.average_short_min)
        long_price = self._window_price(stk_id, self.average_long_min)
        return short_price, long_price

    def _record_signal(self, signal) -> None:
        '''Append one row to context['signal_record'] and to signal_record.csv.'''
        record = pd.DataFrame(index=[self.now], columns=self._SIGNAL_COLUMNS)
        record.loc[self.now, 'time_stamp'] = self.now
        record.loc[self.now, 'contract'] = self.stk_id
        record.loc[self.now, 'moving_average_1'] = self.average_short_line[-1]
        record.loc[self.now, 'moving_average_2'] = self.average_long_line[-1]
        record.loc[self.now, 'signal'] = signal
        record.loc[self.now, 'return'] = self.account.loc[self.context['name'], 'profit']/self.account.loc[self.context['name'], 'initial_cash']
        self.context['signal_record'] = pd.concat([self.context['signal_record'], record], axis=0)
        path = self.context['name'] + '/signal_record.csv'
        if os.path.exists(path):
            record.to_csv(path, mode='a', header=False)
        else:
            record.to_csv(path)

    def _latest_ticks(self):
        '''Current ticks of self.stk_id, falling back to the most recent earlier second.'''
        ticks = self.tick_data['tick_now'].get(self.stk_id)
        if ticks is not None and not ticks.empty:
            return ticks
        all_ticks = self.tick_data['tick_all']
        stk_ticks = all_ticks[all_ticks['security_id'] == self.stk_id]
        if stk_ticks.empty:
            return stk_ticks
        one_second = dt.timedelta(seconds=1)
        earliest = stk_ticks['time_stamp'].min()
        lookup = self.now - one_second
        found = stk_ticks[stk_ticks['time_stamp'] == lookup]
        # Bounded by the first tick so the search cannot loop forever.
        while found.empty and lookup > earliest:
            lookup = lookup - one_second
            found = stk_ticks[stk_ticks['time_stamp'] == lookup]
        return found

    def check_sell(self):
        '''
        function: override of check_sell; decides what to sell on this tick.
                  Also appends the current moving averages to the two lines.
        params:
            -
        return:
            - sell_id_list: ids of the stocks to sell;
            - sell_num_list: matching quantities.
        '''
        short_price, long_price = self.moving_average(self.stk_id)
        self.average_short_line.append(short_price)
        self.average_long_line.append(long_price)

        if self.stk_id not in list(self.portfolio['stk_id']):
            return [], []
        sell_id_list, sell_num_list = [], []

        # generate the signal
        if (len(self.average_long_line) >= 2 and len(self.average_short_line) >= 2) and (not self.istrading_sell):
            # NOTE(review): the second comparison was garbled in the source
            # listing; "<" is reconstructed from the surviving fragments.
            if (self.average_long_line[-2] > self.average_short_line[-2]
                    and self.average_long_line[-1] < self.average_short_line[-1]):
                logging.info('info,卖出信号-'+str(self.stk_id)+'-short Average:'+str(self.average_short_line[-1])+'-long Average:'+str(self.average_long_line[-1]))
                self._record_signal(-1)
                # a new signal supersedes resting orders and restarts the window
                self.clear_allorder()
                self.signal = -1
                self.trade_time = 0
                self.istrading_buy = 0
                self.istrading_sell = 1
        elif self.istrading_sell and (self.trade_time < self.trade_period_restriction):
            self.signal = -1
        else:
            self.signal = 0

        # turn the signal into an order: spread the position over the ticks
        # that remain in the trading window
        if self.signal == -1 and self.istrading_sell == 1:
            held = self.portfolio[self.portfolio['stk_id'] == self.stk_id]['port_num']
            sell_num = int(held.iloc[0] / (self.trade_period_restriction - self.trade_time))
            if sell_num > 0:
                sell_id_list.append(self.stk_id)
                sell_num_list.append(sell_num)
            self.trade_time = self.trade_time + 1
        return sell_id_list, sell_num_list

    def check_buy(self):
        '''
        function: override of check_buy; decides what to buy on this tick.
        params:
            -
        return:
            - buy_id_list: ids of the stocks to buy;
            - buy_num_list: matching quantities.
        '''
        buy_id_list, buy_num_list = [], []
        # generate the signal
        if len(self.average_long_line) >= 2 and len(self.average_short_line) >= 2 and (not self.istrading_buy):
            # NOTE(review): the first comparison was garbled in the source
            # listing; "<" is reconstructed from the surviving fragments.
            if (self.average_long_line[-2] < self.average_short_line[-2]
                    and self.average_long_line[-1] > self.average_short_line[-1]):
                logging.info('info,买入信号-'+str(self.stk_id)+'-short Average:'+str(self.average_short_line[-1])+'-long Average:'+str(self.average_long_line[-1]))
                self._record_signal(1)
                # a new signal supersedes resting orders and restarts the window
                self.clear_allorder()
                self.signal = 1
                self.trade_time = 0
                self.istrading_buy = 1
                self.istrading_sell = 0
        elif self.istrading_buy and (self.trade_time < self.trade_period_restriction):
            self.signal = 1
        else:
            self.signal = 0

        # turn the signal into an order: spend the available cash in equal
        # slices over the ticks that remain in the trading window
        if self.signal == 1 and self.istrading_buy == 1:
            ticks = self._latest_ticks()
            if ticks.empty:
                logging.warning('warning,无可用tick,跳过买入-'+str(self.stk_id))
                return buy_id_list, buy_num_list
            cash = self.account.loc[self.context['name'], 'cash_useable']
            price = self.weighted_price(ticks['value'], ticks['volume'])
            buy_num = int(cash / price / (self.trade_period_restriction - self.trade_time))
            if not buy_num == 0:
                buy_id_list.append(self.stk_id)
                buy_num_list.append(buy_num)
            self.trade_time = self.trade_time + 1
        return buy_id_list, buy_num_list

    def before_close(self) -> None:
        '''
        function: work done at 14:29:00: 1) cancel all unfilled orders;
                  2) close every open position at the volume-weighted average
                  price of the following minute (assumed fully filled).
        params:
            -
        return:
            -
        '''
        # 1) cancel all unfilled orders
        self.clear_allorder()
        # 2) flatten all holdings; nothing to do without holdings
        if len(self.portfolio) < 1:
            return
        acct = self.context['name']
        all_ticks = self.tick_data['tick_all']
        for ind, row in self.portfolio.iterrows():
            port_num_temp = self.portfolio.loc[ind, 'port_num']
            if port_num_temp == 0:
                # already flat (sold earlier today); nothing left to close
                continue
            # average price over the next minute, for this row's own contract
            temp_tick = all_ticks[(all_ticks['time_stamp'] >= self.now) & (all_ticks['time_stamp'] < (self.now+dt.timedelta(minutes=1)))]
            temp_tick = temp_tick[temp_tick['security_id'] == row['stk_id']]
            if temp_tick.empty or temp_tick['volume'].sum() == 0:
                # no trade in the final minute: fall back to the latest tick
                temp_tick = self.tick_data['tick_now'].get(row['stk_id'], temp_tick)
            if temp_tick.empty or temp_tick['volume'].sum() == 0:
                logging.warning('warning,无可用tick,无法清仓-'+str(row['stk_id']))
                continue
            exit_price = self.weighted_price(temp_tick['value'], temp_tick['volume'])
            proceeds = port_num_temp * (exit_price - self.commissions)
            hold_cost_temp = float(self.portfolio.loc[ind, 'hold_cost'])

            # update the portfolio row
            self.portfolio.loc[ind, 'intraday_sell_num'] += port_num_temp
            self.portfolio.loc[ind, 'port_num'] = 0
            self.portfolio.loc[ind, 'port_amount'] = 0
            self.portfolio.loc[ind, 'commission'] = 0
            self.portfolio.loc[ind, 'hold_cost'] = 0
            self.portfolio.loc[ind, 'price_now'] = '-'
            self.portfolio.loc[ind, 'dynamic_equity'] = 0
            self.portfolio.loc[ind, 'hold_profit'] = 0
            self.portfolio.loc[ind, 'realized_profit'] += (proceeds - hold_cost_temp)
            self.portfolio.loc[ind, 'state'] = '已卖'
            # update the account
            self.account.loc[acct, 'cash_useable'] += proceeds
            self.account.loc[acct, 'total_mkt_cap'] = sum(self.portfolio['dynamic_equity'])
            self.account.loc[acct, 'total_assets'] = self.account.loc[acct, 'cash_useable'] + self.account.loc[acct, 'total_mkt_cap']
            self.account.loc[acct, 'profit'] = self.account.loc[acct, 'total_assets'] - self.account.loc[acct, 'initial_cash']
-
-
# Run the backtest: 'tick_all' carries the full tick table, 'tick_now' is
# filled by the engine on every tick.
tick_data = dict(tick_all=data, tick_now=dict())
test = my_tickbacktest(
    stk_id='CLZ4',
    average_short_min=3,
    average_long_min=6,
    trade_period_restriction=60,
    name='CLZ4-3-6',
    start_dt='2014-11-03 9:00:00',
    end_dt='2014-11-03 11:00:00',
    now='2014-11-03 08:59:59',
    total_assets=10*10000,
    commissions=2,
    slip=0.2,
    tick_data=tick_data,
    context=dict(),
)
test.pipline()
买入卖出信号预览:
# One column per moving-average line (long first, then short), one row per tick.
average_lines = pd.DataFrame([test.average_long_line, test.average_short_line]).T
average_lines.plot()


回测结果:

回测结果保存到本地:
