• fastapi+mongo+qlib:体系化构建AI量化投研平台


    百天计划之第34篇,关于“AI量化投资研究平台”建设。

    从今天开始要开始一条主线——就是开始搭建整个投研平台

    如果说8月开始是知识点的梳理,一些基础技术的准备(以qlib和机器学习为核心),9月开始重点是“以解决真正的投资决策问题”为目标

    切入点是可转债

    当下A股市场持续下行,在市场总体下行的时候,讲真的策略不是最重要的。

    对于新手而言,可转债和优秀指数基金,永远是最好的标的,会让你感受到,投资其实未必有那么大的风险(提示:投资有风险)。

    01 架构

    如下是一个极简的AI投研架构,极简意味着非核心部件都采用最简单的组件。

    定时任务没有使用dagster或者airflow,因为这意味着需要引入一套全新的技术栈,需要人力云独立运维,我们使用fastapi内置的异步task。

    持久化的中间件选mongo为主,并兼容qlib数据库,便于查询与并行计算。

    后端使用fastapi——由于要深度使用mongo,则弃用django,而在flask与fastapi之间,考虑到我们使用api的场景偏多,考虑对api更加友好的fastapi框架,性能也会更好。

    02 价值点

    重点之核心在于“为谁解决什么问题”。

    “半个成品远胜于一个半成品”。

    目标远大很好,但饭要一口口吃,月亮虽好,六便士也很重要。

    投研平台价值点就是辅助用户投资决策效率,提升胜率

    面对可转债400多支,如何快速纵览数据历史,有什么规律,方便我们去分析。内置量化好的指标,对全市场进行透视,挖掘,聚类,排序。在此基础上定制一些策略,回测并归因等等。

    总之,面对实用编程。

    03 后端基础框架fastapi

    fastapi是一个现代,异步python框架,官网如下:

    https://fastapi.tiangolo.com/zh/

    安装非常轻量:

    pip install fastapi

    pip install "uvicorn[standard]" ——安装运行的服务器。

    hello FastApi:

    from typing import Union
    from fastapi import FastAPI
    
    app = FastAPI()
    
    
    @app.get("/")
    def read_root():
        return {"Hello": "基于Fastapi的AI量化投研平台!"}
    
    

    是不是非常简单,大家可能会想起Flask,是的。

    Fastapi还内置精美的swagger UI,

    让我们调试接口非常方便,这也是我优选Fastapi框架的原因之一。

    04 基于FastApi的定时任务

    传统的定时任务一般是apscheduler,它功能强大,但说实话,并不好用。

    我们的定时任务其实比较简单,就是定时去做一些事情,不需要特别精确,一般就是收盘后要去更新数据,然后做一些指标的预计算,或者数据同步之类的操作。

    fastapi本身是异步框架,内置了定时任务的功能。

    import asyncio
    from loguru import logger
    from functools import wraps
    from asyncio import ensure_future
    from starlette.concurrency import run_in_threadpool
    from typing import Any, Callable, Coroutine, Optional, Union
    
    NoArgsNoReturnFuncT = Callable[[], None]
    NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
    NoArgsNoReturnDecorator = Callable[
        [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]],
        NoArgsNoReturnAsyncFuncT
    ]
    
    
    def repeat_task(
        *,
        seconds: float,
        wait_first: bool = False,
        raise_exceptions: bool = False,
        max_repetitions: Optional[int] = None,
    ) -> NoArgsNoReturnDecorator:
        '''
        返回一个修饰器, 该修饰器修改函数, 使其在首次调用后定期重复执行.
        其装饰的函数不能接受任何参数并且不返回任何内容.
        参数:
            seconds: float
                等待重复执行的秒数        wait_first: bool (默认 False)
                如果为 True, 该函数将在第一次调用前先等待一个周期.
            raise_exceptions: bool (默认 False)
                如果为 True, 该函数抛出的错误将被再次抛出到事件循环的异常处理程序.
            max_repetitions: Optional[int] (默认 None)
                该函数重复执行的最大次数, 如果为 None, 则该函数将永远重复.
        '''
        def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
            '''
            将修饰函数转换为自身重复且定期调用的版本.
            '''
            is_coroutine = asyncio.iscoroutinefunction(func)
            had_run = False
            @wraps(func)
            async def wrapped() -> None:
                nonlocal had_run
                if had_run:
                    return
                had_run = True
                repetitions = 0
                async def loop() -> None:
                    nonlocal repetitions
                    if wait_first:
                        await asyncio.sleep(seconds)
                    while max_repetitions is None or repetitions < max_repetitions:
                        try:
                            if is_coroutine:
                                # 以协程方式执行
                                await func()  # type: ignore
                            else:
                                # 以线程方式执行
                                await run_in_threadpool(func)
                            repetitions += 1
                        except Exception as exc:
                            logger.error(f'执行重复任务异常: {exc}')
                            if raise_exceptions:
                                raise exc
                        await asyncio.sleep(seconds)
                ensure_future(loop())
            return wrapped
        return decorator
    

    我们基于异步功能来实现一个装饰器repeat_task。

    一次封装,再使用就无比简洁了:

    @app.on_event('startup')
    @repeat_task(seconds=6, wait_first=True)
    def repeat_task_aggregate_request_records() -> None:
        logger.info('触发重复任务: 聚合请求记录')
    
     
    

    05 可转债数据

    我们需要规划一下数据库表名字:

    债券:bond_, 股票:stock_, 基金:fund_,不带复数s。

    基础信息:_basic, 日线:_daily,财务指标 _fin

    一般而言,任何一个交易品种,我们都需要先入库两种数据:

    一是基础信息列表,二是交易日频数据。

    基本信息只需要构建一次,若有变动不定期手动刷新即可:

    def build_cb_basic():
        df = get_cb_basic()
        df['_id'] = df['ts_code']
        write_df('bond_basic', df, drop_tb_if_exist=True)

    可以看出,一共A股历史上发行了799支可转债,目前可交易的400多支。

    交易数据,这是需要每天收盘后去增量更新的,我们的逻辑是如果没有数据,从19900101开始读,如果有数据,则从当前日期增量更新即可。

    mongo的优点可以自动忽略_id相同的行,所以特别方便。

    def update_all_bond_daily():
        # 获取所有列表,有日期,从最近的日期开始读。
        items = list(get_db()['bond_basic'].find({}, {'ts_code': 1, '_id': 0}))
        if items and len(items) == 0:
            logger.error("读可转债列表为空")
            return
        for i, item in enumerate(items):
            code = item['ts_code']
            logger.debug("{}-{}-{}".format(i, code, i / len(items)))
            date = get_daily_last_date(code)
            df = get_bond_daily(code, date)
            df['_id'] = df['ts_code'] + '_' + df['trade_date']
            print(df.tail())
            write_df('bond_daily', df)
            break

    小结一下:

    我们开始体系化构建整体平台。

    后端框架是fastapi,从fastapi内置的定时任务开始,构建可转债的基本数据和日频数据。

    qlib因子分析之alphalens源码解读

    基于alphalens对qlib的alpha158做单因子分析

    人生B计划,不确定时代的应对之道

    飞狐,科技公司CTO,用AI技术做量化投资;以投资视角观历史,解时事;专注个人成长与财富自由。

  • 相关阅读:
    图片可以在固定区域放大缩小,且可以拖拽移动,有复位(vue3)
    Java File.list具有什么功能呢?
    [Spring] AOP面向切面编程
    【心电信号】Simulink胎儿心电信号提取【含Matlab源码 1550期】
    编程每日一练(多语言实现):判断偶数
    【计算机组成原理】第三章 存储系统
    【Unity】如何限制相机、物体的移动角度、位置等
    Python基础入门篇【37】-异常:finally关键字的使用、自定义异常类型及自定义异常抛出
    力扣 739. 每日温度
    编程式事务之基于XML的声明式事务控制
  • 原文地址:https://blog.csdn.net/weixin_38175458/article/details/126675935