• 【VeighNa】开始量化交易——第二章:PostgreSQL数据库配置


    量化投研配置本地数据库是量化投资领域中非常重要的一环。通过搭建本地数据库,可以方便地存储和管理股票市场的日频数据,为后续的因子计算和策略研究提供支持。

    1. 数据库搭建

    首先我们抛出一个问题:真的需要数据库吗?

    前段时间跑了一个回测代码(这里用的是mysql):用海龟策略跑2011年到2022年12年的分钟线数据,源数据是csv文件,我们将其存储到数据库过程就非常耗时,从数据库读出来也耗费了很长时间,大概有20多分钟的样子,但是如果我们将csv文件转化为pickle二进制文件,从硬盘读取到内存只用2分钟,而且内存占用量也下降了2倍多。如果说用金融时序数据库的话速度可能会快一些。

    缺点:IO操作非常耗时,占用硬盘空间大。
    优点:方便数据管理添加维护、框架集成,用户操作方便。

    为了简化数据库搭建的过程,我们采用Veighna框架自带的数据管理模块来作为本地数据库管理工具。


    2. PostgreSQL

    PostgreSQL是特性更为丰富的开源关系型数据库,只推荐熟手使用。相比于MySQL,其特点如下:

    • 采用多进程结构;
    • 支持通过扩展插件来新增功能。

    Windows上PostgreSQL安装配置教程:https://blog.csdn.net/my1324/article/details/103226622

    3. Veighna数据库架构介绍

    我们首先进入到vnpy\trader\database.py这个python文件中
    在这里插入图片描述

    3.1 数据结构类

    主要是BarOverview类和TickOverview类,这两个类封装了底层接口数据结构,提供数据视图,用来区分存储的数据。
    在这里插入图片描述

    3.2 BaseDatabase(ABC)数据库基类

    BaseDatebase(ABC):是数据库基类,封装了常用的数据保存加载删除等抽象方法,由子类实现。

    定义了8个抽象方法,继承了基类的子类都必须实现这些方法,从命名中我们也可以很清楚的知道每个方法的意思。
    在这里插入图片描述

    返回的数据类型,比如BarData,这种自定义的数据结构是在vnpy\trader\object.py文件中定义的。
    在这里插入图片描述

    3.3 get_database()获取数据库服务模块
    def get_database() -> BaseDatabase:
        """"""
        # Return database object if already inited
        global database
        if database:
            return database
    
        # Read database related global setting
        database_name: str = SETTINGS["database.name"]
        module_name: str = f"vnpy_{database_name}"
    
        # Try to import database module
        try:
            module: ModuleType = import_module(module_name)
        except ModuleNotFoundError:
            print(f"找不到数据库驱动{module_name},使用默认的SQLite数据库")
            module: ModuleType = import_module("vnpy_sqlite")
    
        # Create database object from module
        database = module.Database()
        return database
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    可以看到,Veighna是从SETTINGS配置文件里面读取设置的database名称,通过import_module()导入该数据库模块,所以我们需要提前下载好对应的vnpy_{}包,否则会默认使用vnpy_sqlite包作为项目数据库服务模块。

    本文使用的是postgresql数据库,因此需要运行下面这行代码。

    pip install vnpy_PostgreSQL Postgres
    
    • 1
    3.4 vnpy_postgresql数据库实例模块

    首先找到vnpy_postgresql文件夹,一般在你下载的虚拟环境envs\vnpy\Lib\site-packages\vnpy_postgresql下。
    可以使用pycharm中File——Settings——Project:xxx——project Structure导入这个模块到你的项目结构下。
    在这里插入图片描述
    我们点击postgresql_database.py这个文件,看到它的结构如下。
    在这里插入图片描述
    其实Veighna就是使用peewee这个python的ORM框架对数据结构进行进一步封装,对数据库的操作映射成对类、对象的操作,避免了我们直接写在SQL语句。看到这边,相信你对Veighna的数据管理已经有了直观的理解,下面我们就进行具体的配置操作。

    4.具体配置

    4.1 创建.vntrader文件

    .vntrader和run.py一定是同级目录
    在这里插入图片描述
    程序加载setting里面的配置信息都是从这个文件夹里面的json文件获取。

    def _get_trader_dir(temp_name: str) -> Tuple[Path, Path]:
        """
        Get path where trader is running in.
        """
        cwd: Path = Path.cwd()
        temp_path: Path = cwd.joinpath(temp_name)
    
        # If .vntrader folder exists in current working directory,
        # then use it as trader running path.
        if temp_path.exists():
            return cwd, temp_path
    
        # Otherwise use home path of system.
        home_path: Path = Path.home()
        temp_path: Path = home_path.joinpath(temp_name)
    
        # Create .vntrader folder under home path if not exist.
        if not temp_path.exists():
            temp_path.mkdir()
    
        return home_path, temp_path
    
    
    TRADER_DIR, TEMP_DIR = _get_trader_dir(".vntrader")
    sys.path.append(str(TRADER_DIR))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    我们可以在vnpy\trader\utility.py里面找到这串代码,意思就是从当前工作目录获取.vntrader文件,添加到系统环境变量里面。如果当前环境没有这个文件,就会到Path.home()里面找,也就是你c盘用户目录C:\Users\Administrator

    4.2 运行下面代码run.py
    from vnpy.event import EventEngine
    from vnpy.trader.engine import MainEngine
    from vnpy.trader.ui import MainWindow, create_qapp
    from vnpy_datamanager import DataManagerApp
    
    
    def main():
        """Start VeighNa Trader"""
        qapp = create_qapp()
    
        event_engine = EventEngine()
        main_engine = MainEngine(event_engine)
        main_engine.add_app(DataManagerApp)
    
        main_window = MainWindow(main_engine, event_engine)
        main_window.showMaximized()
    
        qapp.exec()
    
    
    if __name__ == "__main__":
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    第一次运行报错:

    找不到数据库驱动vnpy_{},使用默认的SQLite数据库
    没关系,这是因为我们在.vntrader里面没有配置要使用的数据库,我们将下面这个文件保存到.vntrader文件夹下面。

    vt_setting.json

    {
        "font.family": "微软雅黑",
        "font.size": 8,
        "log.active": true,
        "log.level": 50,
        "log.console": true,
        "log.file": true,
        "email.server": "smtp.qq.com",
        "email.port": 465,
        "email.username": "",
        "email.password": "",
        "email.sender": "",
        "email.receiver": "",
        "datafeed.name": "",
        "datafeed.username": "",
        "datafeed.password": "",
        "database.timezone": "Asia/Shanghai",
        "database.name": "postgresql",
        "database.database": "vnpy",
        "database.host": "localhost",
        "database.port": 5432,
        "database.user":"postgres",
        "database.password": "123456"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在SQL Shell(psql)下创建数据库

    在这里插入图片描述
    重启run.py文件

    5.数据下载

    5.1 环境配置
    5.1.1 安装vnpy_binance
    pip install vnpy_binance
    
    • 1

    vnpy_binance下面有三个接口,使用时需要注意本接口:

    • 只支持全仓保证金模式
    • 只支持单向持仓模式
    from vnpy_datamanager import DataManagerApp
    
    from vnpy.event import EventEngine
    from vnpy.trader.engine import MainEngine
    from vnpy.trader.ui import MainWindow, create_qapp
    from vnpy_binance import (
        BinanceSpotGateway, # 现货交易
        BinanceUsdtGateway,	# 合约交易
        BinanceInverseGateway # 用于对接币安反向合约的交易接口
    )
    
    
    def main():
        """Start VeighNa Trader"""
        qapp = create_qapp()
    
        event_engine = EventEngine()
        main_engine = MainEngine(event_engine)
        main_engine.add_app(DataManagerApp)
        main_engine.add_gateway(BinanceSpotGateway)
        main_window = MainWindow(main_engine, event_engine)
        main_window.showMaximized()
    
        qapp.exec()
    
    
    if __name__ == "__main__":
        main()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    5.1.1 API配置

    在这里插入图片描述
    记住自己的API和Secret key 并且不要泄露出去,否则会有资金安全风险。

    5.1.3 启动项目

    首先 pip install requests这个包

    运行上面代码后,发现报错如下:在这里插入图片描述
    在trader下的constant的Exchange类下添加一个常量
    在这里插入图片描述

    当我们看到这个界面后说明配置完成!
    在这里插入图片描述

    5.2 下载币安合约数据
    from datetime import datetime, timedelta
    from typing import List
    
    from vnpy_binance import BinanceSpotGateway,BinanceUsdtGateway
    from vnpy.event import EventEngine, Event
    from vnpy.trader.constant import Exchange, Interval
    from vnpy.trader.database import get_database, BarOverview
    from vnpy.trader.event import EVENT_LOG
    from vnpy.trader.object import HistoryRequest
    from vnpy.trader.utility import load_json
    
    setting = {
        "key": "",
        "secret": "",
        "服务器": "REAL",
        "代理地址": "",
        "代理端口": 
    }
    """
    一键下载币安现货的行情数据
    """
    
    
    # 用于初始化引擎和数据库
    class BinanceData:
        def __init__(self, EventEngine):
            self.binance = BinanceUsdtGateway(EventEngine, gateway_name="BINANCE_USDT")
            self.binance.connect(setting=setting)
            self.database = get_database()
            self.proxies = {'https': '127.0.0.1:22307'}
    
        def is_symbol_existed(self, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:
            """判断下载的数据是否重复出现在数据库中"""
            bar_overview: List[BarOverview] = self.database.get_bar_overview()
            for bar_view in bar_overview:
                if bar_view.symbol != symbol:
                    continue
                if bar_view.symbol == symbol and bar_view.interval != interval:
                    continue
                if bar_view.symbol == symbol and bar_view.interval == interval:
                    if end < bar_view.start or start > bar_view.end:
                        continue
                    else:
                        return True
            return False
    
        def data_to_db(self, symbol: str, interval: Interval, start: datetime, end: datetime):
            if self.is_symbol_existed(symbol, interval, start, end):
                return
            # 获取新数据
            print(f"{datetime.now()}正在获取--{symbol}--k线数据")
            req = HistoryRequest(
                symbol=symbol,
                exchange=Exchange.BINANCE,
                start=start,
                end=end,
                interval=interval
            )
    
            bars = self.binance.query_history(req)
    
            self.database.save_bar_data(bars)
            print(f"{datetime.now()}--{symbol}--k线数据已被存入数据库!")
    
    
    if __name__ == '__main__':
        event = EventEngine()
        binance = BinanceData(event)
        event.start()
        event.register(EVENT_LOG, lambda event: print(event.data))
        symbols = ["BLURUSDT"]
        for symbol in symbols:
            binance.data_to_db(symbol, interval=Interval.HOUR, start=datetime(2015, 10, 5),
                               end=datetime.now())
        event.stop()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    5.3 下载币安现货数据
    from datetime import datetime, timedelta
    from typing import List
    
    from vnpy_binance import BinanceSpotGateway
    from vnpy.event import EventEngine, Event
    from vnpy.trader.constant import Exchange, Interval
    from vnpy.trader.database import get_database, BarOverview
    from vnpy.trader.event import EVENT_LOG
    from vnpy.trader.object import HistoryRequest
    from vnpy.trader.utility import load_json
    
    setting = {
        "key": "",
        "secret": "",
        "服务器": "REAL",
        "代理地址": "",
        "代理端口": 
    }
    """
    一键下载币安现货的行情数据
    """
    
    
    # 用于初始化引擎和数据库
    class BinanceData:
        def __init__(self, EventEngine):
            self.binance = BinanceSpotGateway(EventEngine, gateway_name="BINANCE_SPOT")
            self.binance.connect(setting=setting)
            self.database = get_database()
            self.proxies = {'https': '127.0.0.1:22307'}
    
        def is_symbol_existed(self, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:
            """判断下载的数据是否重复出现在数据库中"""
            bar_overview: List[BarOverview] = self.database.get_bar_overview()
            for bar_view in bar_overview:
                if bar_view.symbol != symbol:
                    continue
                if bar_view.symbol == symbol and bar_view.interval != interval:
                    continue
                if bar_view.symbol == symbol and bar_view.interval == interval:
                    if end < bar_view.start or start > bar_view.end:
                        continue
                    else:
                        return True
            return False
    
        def data_to_db(self, symbol: str, interval: Interval, start: datetime, end: datetime):
            if self.is_symbol_existed(symbol, interval, start, end):
                return
            # 获取新数据
            print(f"{datetime.now()}正在获取--{symbol}--k线数据")
            req = HistoryRequest(
                symbol=symbol,
                exchange=Exchange.BINANCE,
                start=start,
                end=end,
                interval=interval
            )
    
            bars = self.binance.query_history(req)
    
            self.database.save_bar_data(bars)
            print(f"{datetime.now()}--{symbol}--k线数据已被存入数据库!")
    
    
    if __name__ == '__main__':
        event = EventEngine()
        binance = BinanceData(event)
        event.start()
        event.register(EVENT_LOG, lambda event: print(event.data))
        symbols = ["blurusdt"]
        for symbol in symbols:
            binance.data_to_db(symbol, interval=Interval.MINUTE, start=datetime(2015, 10, 5),
                               end=datetime.now()-timedelta(days=1))
        event.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    5.4 下载期货行情数据
    from datetime import datetime
    from typing import List
    
    from vnpy_rqdata.rqdata_datafeed import RqdataDatafeed
    from vnpy.trader.constant import Exchange, Interval
    from vnpy.trader.database import get_database, BarOverview
    from vnpy.trader.object import HistoryRequest
    import rqdatac
    
    # 初始化
    rqdataDatafeed = RqdataDatafeed()
    rqdataDatafeed.init()
    database = get_database()
    
    
    def query_traffic():
        # 获取流量使用情况
        traffic_info = rqdatac.user.get_quota()
    
        print(f"使用流量:{traffic_info['bytes_used'] / (2 ** 20)} Mb \n"
              f"剩余流量:{(traffic_info['bytes_limit'] - traffic_info['bytes_used']) / (2 ** 20)} Mb ")
        return (traffic_info['bytes_limit'] - traffic_info['bytes_used']) / (2 ** 20)
    
    
    def is_symbol_existed(database, symbol: str, interval: Interval, start: datetime, end: datetime) -> bool:
        """判断下载的数据是否重复出现在数据库中"""
        bar_overview: List[BarOverview] = database.get_bar_overview()
        for bar_view in bar_overview:
            if bar_view.symbol != symbol:
                continue
            if bar_view.symbol == symbol and bar_view.interval != interval:
                continue
            if bar_view.symbol == symbol and bar_view.interval == interval:
                if end < bar_view.start or start > bar_view.end:
                    continue
                else:
                    return True
        return False
    
    
    if __name__ == '__main__':
        vt_symbols = [
            "i99.DCE", "j99.DCE", "jm99.DCE", "rb99.SHFE", "hc99.SHFE"
        ]
        for vt_symbol in vt_symbols:
            symbol, exchange = vt_symbol.split(".")
            start = datetime(2010, 1, 1)
            end = datetime.now()
            interval = Interval.DAILY
    
            if is_symbol_existed(database, symbol, interval, start, end):
                print(f"{symbol}{Interval.MINUTE}数据重复出现在数据库中")
                print("-" * 40)
                continue
    
            if query_traffic() < 30:
                print("-" * 40)
                break
    
            historyReq = HistoryRequest(
                symbol=symbol,
                exchange=Exchange(exchange),
                start=start,
                end=end,
                interval=interval
            )
    
            bars = rqdataDatafeed.query_bar_history(historyReq)
            database.save_bar_data(bars)
            print(f"{vt_symbol}下载完成!")
            print()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    6.保存成本地csv文件

    为了后面方便投研分析以及数据备份的需要,我也给出了数据转化成csv文件保存的代码。

    import datetime
    import os.path
    from typing import List
    
    from vnpy.trader.constant import Exchange, Interval
    from vnpy_datamanager import ManagerEngine
    
    from vnpy.trader.database import BarOverview
    from vnpy.trader.engine import MainEngine, EventEngine
    
    
    def output_data_to_csv(engine, exchange, interval):
        bar_view:List[BarOverview] = engine.get_bar_overview()
        for bar_ in bar_view:
            bar_dict = bar_.__dict__.get("__data__")
            if bar_dict.get("interval") == interval and bar_dict.get(
                    "exchange") == exchange:
                print(
                    f"--------正在保存{bar_dict.get('symbol')}这个文件------------")
                # 保存成csv文件
                filepath = rf"D:\market\feature\{interval.value}"
    
                if not os.path.exists(filepath):
                    os.makedirs(filepath)
    
                filepath_name = os.path.join(filepath,
                                             f"{bar_dict.get('symbol')}.{bar_dict.get('exchange').value}.csv")
                flag = manage_engine.output_data_to_csv(file_path=filepath_name,
                                                        exchange=exchange,
                                                        symbol=bar_dict.get(
                                                            "symbol"),
                                                        interval=interval,
                                                        start=bar_dict.get(
                                                            "start"),
                                                        end=bar_dict.get("end"))
                if flag:
                    print(f"{datetime.datetime.now()},{filepath_name}文件保存成功!")
                else:
                    print(f"{datetime.datetime.now()},{filepath_name}文件保存失败!")
    
    
    if __name__ == '__main__':
        main_engine = MainEngine()
        event_engine = EventEngine()
        manage_engine = ManagerEngine(main_engine, event_engine)
        output_data_to_csv(manage_engine,
                           exchange=Exchange.SHFE, interval=Interval.DAILY)
        main_engine.close()
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    在这里插入图片描述

    在这里插入图片描述

    7. 总结

    文章介绍了在量化投资中配置本地数据库的重要性,并使用Veighna框架和PostgreSQL数据库来搭建和配置本地数据库。首先讨论了是否真的需要数据库以及数据库的优缺点。介绍了Veighna框架和PostgreSQL数据库的特点和安装配置方法。接下来详细介绍了Veighna框架中的数据库架构和相关类的功能。文章最后给出了具体的配置步骤,包括创建.vntrader文件夹和配置vt_setting.json文件,并演示了如何使用run.py运行程序。提供了配置本地数据库的详细指南,帮助量化投资者方便地存储和管理市场数据,为后续的因子计算和策略研究提供支持。

  • 相关阅读:
    $.ajax异步请求总结
    k8s管理工具kubectl详解(一)
    R语言的极值统计学、分位数回归、机器学习方法
    【iOS】——分类、扩展和关联对象
    [Gym 102423]-Elven Efficiency | 思维
    Linux常用命令:文件及磁盘
    el-table中的toggleRowSelection 选中目标没有选中效果
    第五章 树和二叉树(下)【哈夫曼树、并查集】
    设计模式(1)-设计模式前置基础知识
    VS中展开和折叠代码
  • 原文地址:https://blog.csdn.net/m0_58598240/article/details/127700332