量化投研配置本地数据库是量化投资领域中非常重要的一环。通过搭建本地数据库,可以方便地存储和管理股票市场的日频数据,为后续的因子计算和策略研究提供支持。
首先我们抛出一个问题:真的需要数据库吗?
前段时间跑了一个回测代码(这里用的是mysql):用海龟策略跑2011年到2022年12年的分钟线数据,源数据是csv文件,我们将其存储到数据库过程就非常耗时,从数据库读出来也耗费了很长时间,大概有20多分钟的样子,但是如果我们将csv文件转化为pickle二进制文件,从硬盘读取到内存只用2分钟,而且内存占用量也下降了2倍多。如果说用金融时序数据库的话速度可能会快一些。
缺点:IO操作非常耗时,占用硬盘空间大。
优点:方便数据管理添加维护、框架集成,用户操作方便。
为了简化数据库搭建的过程,我们采用Veighna框架自带的数据管理模块来作为本地数据库管理工具。
PostgreSQL是特性更为丰富的开源关系型数据库,只推荐熟手使用。相比于MySQL,其特点如下:
Windows上PostgreSQL安装配置教程:https://blog.csdn.net/my1324/article/details/103226622
我们首先进入到vnpy\trader\database.py这个python文件中
主要是BarOverview类和TickOverview类,这两个类封装了底层接口数据结构,提供数据视图,用来区分存储的数据。
BaseDatebase(ABC):是数据库基类,封装了常用的数据保存加载删除等抽象方法,由子类实现。
定义了8个抽象方法,继承了基类的子类都必须实现这些方法,从命名中我们也可以很清楚的知道每个方法的意思。
返回的数据类型,比如BarData,这种自定义的数据结构是在vnpy\trader\object.py文件中定义的。
def get_database() -> BaseDatabase:
""""""
# Return database object if already inited
global database
if database:
return database
# Read database related global setting
database_name: str = SETTINGS["database.name"]
module_name: str = f"vnpy_{database_name}"
# Try to import database module
try:
module: ModuleType = import_module(module_name)
except ModuleNotFoundError:
print(f"找不到数据库驱动{module_name},使用默认的SQLite数据库")
module: ModuleType = import_module("vnpy_sqlite")
# Create database object from module
database = module.Database()
return database
可以看到,Veighna是从SETTINGS配置文件里面读取设置的database名称,通过import_module()导入该数据库模块,所以我们需要提前下载好对应的vnpy_{}包,否则会默认使用vnpy_sqlite包作为项目数据库服务模块。
本文使用的是postgresql数据库,因此需要运行下面这行代码。
pip install vnpy_PostgreSQL Postgres
首先找到vnpy_postgresql文件夹,一般在你下载的虚拟环境envs\vnpy\Lib\site-packages\vnpy_postgresql下。
可以使用pycharm中File——Settings——Project:xxx——project Structure导入这个模块到你的项目结构下。
我们点击postgresql_database.py这个文件,看到它的结构如下。
其实Veighna就是使用peewee这个python的ORM框架对数据结构进行进一步封装,对数据库的操作映射成对类、对象的操作,避免了我们直接写在SQL语句。看到这边,相信你对Veighna的数据管理已经有了直观的理解,下面我们就进行具体的配置操作。
.vntrader和run.py一定是同级目录
程序加载setting里面的配置信息都是从这个文件夹里面的json文件获取。
def _get_trader_dir(temp_name: str) -> Tuple[Path, Path]:
"""
Get path where trader is running in.
"""
cwd: Path = Path.cwd()
temp_path: Path = cwd.joinpath(temp_name)
# If .vntrader folder exists in current working directory,
# then use it as trader running path.
if temp_path.exists():
return cwd, temp_path
# Otherwise use home path of system.
home_path: Path = Path.home()
temp_path: Path = home_path.joinpath(temp_name)
# Create .vntrader folder under home path if not exist.
if not temp_path.exists():
temp_path.mkdir()
return home_path, temp_path
TRADER_DIR, TEMP_DIR = _get_trader_dir(".vntrader")
sys.path.append(str(TRADER_DIR))
我们可以在vnpy\trader\utility.py里面找到这串代码,意思就是从当前工作目录获取.vntrader文件,添加到系统环境变量里面。如果当前环境没有这个文件,就会到Path.home()里面找,也就是你c盘用户目录C:\Users\Administrator
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy_datamanager import DataManagerApp
def main():
"""Start VeighNa Trader"""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_app(DataManagerApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
if __name__ == "__main__":
main()
第一次运行报错:
找不到数据库驱动vnpy_{},使用默认的SQLite数据库
没关系,这是因为我们在.vntrader里面没有配置要使用的数据库,我们将下面这个文件保存到.vntrader文件夹下面。
vt_setting.json
{
"font.family": "微软雅黑",
"font.size": 8,
"log.active": true,
"log.level": 50,
"log.console": true,
"log.file": true,
"email.server": "smtp.qq.com",
"email.port": 465,
"email.username": "",
"email.password": "",
"email.sender": "",
"email.receiver": "",
"datafeed.name": "",
"datafeed.username": "",
"datafeed.password": "",
"database.timezone": "Asia/Shanghai",
"database.name": "postgresql",
"database.database": "vnpy",
"database.host": "localhost",
"database.port": 5432,
"database.user":"postgres",
"database.password": "123456"
}
在SQL Shell(psql)下创建数据库
重启run.py文件
pip install vnpy_binance
vnpy_binance下面有三个接口,使用时需要注意本接口:
from vnpy_datamanager import DataManagerApp
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
from vnpy_binance import (
BinanceSpotGateway, # 现货交易
BinanceUsdtGateway, # 合约交易
BinanceInverseGateway # 用于对接币安反向合约的交易接口
)
def main():
"""Start VeighNa Trader"""
qapp = create_qapp()
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_app(DataManagerApp)
main_engine.add_gateway(BinanceSpotGateway)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()
qapp.exec()
if __name__ == "__main__":
main()
记住自己的API和Secret key 并且不要泄露出去,否则会有资金安全风险。
首先 pip install requests这个包
运行上面代码后,发现报错如下:
在trader下的constant的Exchange类下添加一个常量
当我们看到这个界面后说明配置完成!
from datetime import datetime, timedelta
from typing import List
from vnpy_binance import BinanceSpotGateway,BinanceUsdtGateway
from vnpy.event import EventEngine, Event
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import get_database, BarOverview
from vnpy.trader.event import EVENT_LOG
from vnpy.trader.object import HistoryRequest
from vnpy.trader.utility import load_json
setting = {
"key": "",
"secret": "",
"服务器": "REAL",
"代理地址": "",
"代理端口":
}
"""
一键下载币安现货的行情数据
"""
# 用于初始化引擎和数据库
class BinanceData:
def __init__(self, EventEngine):
self.binance = BinanceUsdtGateway(EventEngine, gateway_name="BINANCE_USDT")
self.binance.connect(setting=setting)
self.database = get_database()
self.proxies = {'https': '127.0.0.1:22307'}
def is_symbol_existed(self, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:
"""判断下载的数据是否重复出现在数据库中"""
bar_overview: List[BarOverview] = self.database.get_bar_overview()
for bar_view in bar_overview:
if bar_view.symbol != symbol:
continue
if bar_view.symbol == symbol and bar_view.interval != interval:
continue
if bar_view.symbol == symbol and bar_view.interval == interval:
if end < bar_view.start or start > bar_view.end:
continue
else:
return True
return False
def data_to_db(self, symbol: str, interval: Interval, start: datetime, end: datetime):
if self.is_symbol_existed(symbol, interval, start, end):
return
# 获取新数据
print(f"{datetime.now()}正在获取--{symbol}--k线数据")
req = HistoryRequest(
symbol=symbol,
exchange=Exchange.BINANCE,
start=start,
end=end,
interval=interval
)
bars = self.binance.query_history(req)
self.database.save_bar_data(bars)
print(f"{datetime.now()}--{symbol}--k线数据已被存入数据库!")
if __name__ == '__main__':
event = EventEngine()
binance = BinanceData(event)
event.start()
event.register(EVENT_LOG, lambda event: print(event.data))
symbols = ["BLURUSDT"]
for symbol in symbols:
binance.data_to_db(symbol, interval=Interval.HOUR, start=datetime(2015, 10, 5),
end=datetime.now())
event.stop()
from datetime import datetime, timedelta
from typing import List
from vnpy_binance import BinanceSpotGateway
from vnpy.event import EventEngine, Event
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import get_database, BarOverview
from vnpy.trader.event import EVENT_LOG
from vnpy.trader.object import HistoryRequest
from vnpy.trader.utility import load_json
setting = {
"key": "",
"secret": "",
"服务器": "REAL",
"代理地址": "",
"代理端口":
}
"""
一键下载币安现货的行情数据
"""
# 用于初始化引擎和数据库
class BinanceData:
def __init__(self, EventEngine):
self.binance = BinanceSpotGateway(EventEngine, gateway_name="BINANCE_SPOT")
self.binance.connect(setting=setting)
self.database = get_database()
self.proxies = {'https': '127.0.0.1:22307'}
def is_symbol_existed(self, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:
"""判断下载的数据是否重复出现在数据库中"""
bar_overview: List[BarOverview] = self.database.get_bar_overview()
for bar_view in bar_overview:
if bar_view.symbol != symbol:
continue
if bar_view.symbol == symbol and bar_view.interval != interval:
continue
if bar_view.symbol == symbol and bar_view.interval == interval:
if end < bar_view.start or start > bar_view.end:
continue
else:
return True
return False
def data_to_db(self, symbol: str, interval: Interval, start: datetime, end: datetime):
if self.is_symbol_existed(symbol, interval, start, end):
return
# 获取新数据
print(f"{datetime.now()}正在获取--{symbol}--k线数据")
req = HistoryRequest(
symbol=symbol,
exchange=Exchange.BINANCE,
start=start,
end=end,
interval=interval
)
bars = self.binance.query_history(req)
self.database.save_bar_data(bars)
print(f"{datetime.now()}--{symbol}--k线数据已被存入数据库!")
if __name__ == '__main__':
event = EventEngine()
binance = BinanceData(event)
event.start()
event.register(EVENT_LOG, lambda event: print(event.data))
symbols = ["blurusdt"]
for symbol in symbols:
binance.data_to_db(symbol, interval=Interval.MINUTE, start=datetime(2015, 10, 5),
end=datetime.now()-timedelta(days=1))
event.stop()
from datetime import datetime
from typing import List
from vnpy_rqdata.rqdata_datafeed import RqdataDatafeed
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.database import get_database, BarOverview
from vnpy.trader.object import HistoryRequest
import rqdatac
# 初始化
rqdataDatafeed = RqdataDatafeed()
rqdataDatafeed.init()
database = get_database()
def query_traffic():
# 获取流量使用情况
traffic_info = rqdatac.user.get_quota()
print(f"使用流量:{traffic_info['bytes_used'] / (2 ** 20)} Mb \n"
f"剩余流量:{(traffic_info['bytes_limit'] - traffic_info['bytes_used']) / (2 ** 20)} Mb ")
return (traffic_info['bytes_limit'] - traffic_info['bytes_used']) / (2 ** 20)
def is_symbol_existed(database, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:
"""判断下载的数据是否重复出现在数据库中"""
bar_overview: List[BarOverview] = database.get_bar_overview()
for bar_view in bar_overview:
if bar_view.symbol != symbol:
continue
if bar_view.symbol == symbol and bar_view.interval != interval:
continue
if bar_view.symbol == symbol and bar_view.interval == interval:
if end < bar_view.start or start > bar_view.end:
continue
else:
return True
return False
if __name__ == '__main__':
vt_symbols = [
"i99.DCE", "j99.DCE", "jm99.DCE", "rb99.SHFE", "hc99.SHFE"
]
for vt_symbol in vt_symbols:
symbol, exchange = vt_symbol.split(".")
start = datetime(2010, 1, 1)
end = datetime.now()
interval = Interval.DAILY
if is_symbol_existed(database, symbol, interval, start, end):
print(f"{symbol}的{Interval.MINUTE}数据重复出现在数据库中")
print("-" * 40)
continue
if query_traffic() < 30:
print("-" * 40)
break
historyReq = HistoryRequest(
symbol=symbol,
exchange=Exchange(exchange),
start=start,
end=end,
interval=interval
)
bars = rqdataDatafeed.query_bar_history(historyReq)
database.save_bar_data(bars)
print(f"{vt_symbol}下载完成!")
print()
为了后面方便投研分析以及数据备份的需要,我也给出了数据转化成csv文件保存的代码。
import datetime
import os.path
from typing import List
from vnpy.trader.constant import Exchange, Interval
from vnpy_datamanager import ManagerEngine
from vnpy.trader.database import BarOverview
from vnpy.trader.engine import MainEngine, EventEngine
def output_data_to_csv(engine, exchange, interval):
bar_view:List[BarOverview] = engine.get_bar_overview()
for bar_ in bar_view:
bar_dict = bar_.__dict__.get("__data__")
if bar_dict.get("interval") == interval and bar_dict.get(
"exchange") == exchange:
print(
f"--------正在保存{bar_dict.get('symbol')}这个文件------------")
# 保存成csv文件
filepath = rf"D:\market\feature\{interval.value}"
if not os.path.exists(filepath):
os.makedirs(filepath)
filepath_name = os.path.join(filepath,
f"{bar_dict.get('symbol')}.{bar_dict.get('exchange').value}.csv")
flag = manage_engine.output_data_to_csv(file_path=filepath_name,
exchange=exchange,
symbol=bar_dict.get(
"symbol"),
interval=interval,
start=bar_dict.get(
"start"),
end=bar_dict.get("end"))
if flag:
print(f"{datetime.datetime.now()},{filepath_name}文件保存成功!")
else:
print(f"{datetime.datetime.now()},{filepath_name}文件保存失败!")
if __name__ == '__main__':
main_engine = MainEngine()
event_engine = EventEngine()
manage_engine = ManagerEngine(main_engine, event_engine)
output_data_to_csv(manage_engine,
exchange=Exchange.SHFE, interval=Interval.DAILY)
main_engine.close()
文章介绍了在量化投资中配置本地数据库的重要性,并使用Veighna框架和PostgreSQL数据库来搭建和配置本地数据库。首先讨论了是否真的需要数据库以及数据库的优缺点。介绍了Veighna框架和PostgreSQL数据库的特点和安装配置方法。接下来详细介绍了Veighna框架中的数据库架构和相关类的功能。文章最后给出了具体的配置步骤,包括创建.vntrader文件夹和配置vt_setting.json文件,并演示了如何使用run.py运行程序。提供了配置本地数据库的详细指南,帮助量化投资者方便地存储和管理市场数据,为后续的因子计算和策略研究提供支持。