• 行情订阅&分钟合成


    海风/go_real_md

    https://www.bilibili.com/video/BV15T411A7n3/

    订阅

    版本 v0.6.5-20220722

    quote.go :181

     1// 订阅行情
    2var insts = make([]string0)
    3t.Instruments.Range(func(k, v interface{}) bool {
    4  if len(v.(*goctp.InstrumentField).ProductID) == 0 { // 过滤 非正常合约
    5    return true
    6  }
    7  insts = append(insts, k.(string))
    8  if len(insts) > 200 { // 一次订阅 200个合约
    9    q.ReqSubscript(insts)
    10    insts = make([]string0)
    11  }
    12  i++
    13  return true
    14})
    15q.ReqSubscript(insts)
    • 1

    合成分钟数据

    处理 Actionday

    OnRspLogin

    DCE 的 actionDay 可能为 tradingDay,夜盘数据不对。

    trade.go :69

    1    // 交易日/自然日
    2    t.TradingDay = login.TradingDay
    3    if t.Weekday() == time.Monday { // 周一
    4          actionDay = t.AddDate(00-3).Format("20060102")     // 上周五
    5          actionDayNext = t.AddDate(00-2).Format("20060102"// 上周六
    6    } else {
    7          actionDay = t.AddDate(00-1).Format("20060102"// 上一天
    8          actionDayNext = login.TradingDay                   // 本日
    9    }
    • 1

    OnTick

    quote.go

     1    // 取tick的分钟构造当前分钟时间
    2    var action = t.TradingDay
    3    // 夜盘
    4    hour, _ := strconv.Atoi(tick.UpdateTime[0:2])
    5    if hour <= 3 {
    6        action = actionDayNext
    7    } else if hour >= 20 {
    8        action = actionDay
    9    }
    10    // 分钟 yyyy-mm-dd hh:mm:00
    11    minDateTime := fmt.Sprintf("%s-%s-%s %s:00", action[0:4], action[4:6], action[6:], tick.UpdateTime[0:5])
    • 1

    首个 Tick

    开高低收值为 lastPrice,Volume当前Volume为总成交量,需要减去上一分钟最后一个tick的总成交量。

     1preVol := bar.PreVol + bar.Volume
    2&Bar{
    3  ID:           minDateTime,
    4  InstrumentID: tick.InstrumentID,
    5  OpenInterest: tick.OpenInterest,
    6  TradingDay:   t.TradingDay,
    7  Open:         tick.LastPrice,
    8  High:         tick.LastPrice,
    9  Low:          tick.LastPrice,
    10  Close:        tick.LastPrice,
    11  PreVol:       preVol,
    12  Volume:       tick.Volume - preVol,
    13}
    • 1

    K线更新

    最高/最低/收盘价/成交量/持仓量

     1const E = 0.000001
    2if tick.LastPrice-bar.High > E {
    3  bar.High = tick.LastPrice
    4}
    5if tick.LastPrice-bar.Low < E {
    6  bar.Low = tick.LastPrice
    7}
    8bar.Close = tick.LastPrice
    9bar.Volume = tick.Volume - bar.PreVol
    10bar.OpenInterest = tick.OpenInterest
    • 1

    数据存储

    分钟 tick 数 ≥ 3

    • 开盘前竞价 tick

    • 小节收盘时只收到 1 个tick

    • json

     1// 当前分钟未被记录
    2if bar.Ticks == 3 { // 控制分钟最小tick数量;避免盘歇的数据
    3  jsonStr, _ := json.Marshal(&bar)
    4  err := rdb.RPush(ctx, tick.InstrumentID, jsonStr).Err()
    5  if err != nil {
    6    logrus.Errorf("redis rpush error: %s %v", tick.InstrumentID, err)
    7  }
    8  // 发布分钟数据
    9  rdb.Publish(ctx, "md."+tick.InstrumentID, jsonStr)
    10else if bar.Ticks > 3 {
    11  jsonStr, _ := json.Marshal(&bar)
    12  err := rdb.LSet(ctx, tick.InstrumentID, -1, jsonStr).Err()
    13  if err != nil {
    14    logrus.Errorf("redis lset error: %s %v", tick.InstrumentID, err)
    15  }
    16  // 发布分钟数据
    17  rdb.Publish(ctx, "md."+tick.InstrumentID, jsonStr)
    18}
    • 1

    收盘时写入 postgres

    • 读取 redis 数据 (json)

    • 写入 pg

     1func InserrtPg() (err error) {
    2    // 取交易日
    3    tradingDay, err := rdb.HGet(ctx, "tradingday""curday").Result()
    4    if err != nil {
    5        return errors.Wrap(err, "get curday")
    6    }
    7    // 删除当前交易日数据(实现重复入库)
    8    err = GORMpg.Where(&Bar{TradingDay: tradingDay}).Delete(&Bar{}).Error
    9    if err != nil {
    10        return errors.Wrap(err, "delete")
    11    }
    12    // 取所有 key
    13    var bars = make([]*Bar, 0)
    14    insts, err := rdb.Keys(ctx, "*").Result()
    15    if err != nil {
    16        return errors.Wrap(err, "get redis keys")
    17    }
    18    // 按合约key入库
    19    for _, inst := range insts {
    20        if inst == "tradingday" {
    21            continue
    22        }
    23        var mins = []string{}
    24        if mins, err = rdb.LRange(ctx, inst, 0-1).Result(); err != nil {
    25            logrus.Error("取redis数据错误:", inst, err)
    26            continue
    27        }
    28        for _, bsMin := range mins {
    29            var bar Bar
    30            if err = json.Unmarshal([]byte(bsMin), &bar); err != nil {
    31                logrus.Error("解析bar错误:", bar, " ", err)
    32                continue
    33            }
    34            bars = append(bars, &bar)
    35            if len(bars) > 100000 { // 10W
    36                err = GORMpg.CreateInBatches(bars, 2000).Error
    37                if err != nil {
    38                    return errors.Wrap(err, "create batch")
    39                }
    40                bars = make([]*Bar, 0)
    41            }
    42        }
    43    }
    44    err = GORMpg.CreateInBatches(bars, 2000).Error
    45    if err != nil {
    46        return errors.Wrap(err, "create batch")
    47    }
    48    return
    49}
    • 1

    部署

    方式一

    修改 Makefile 中的环境变量

    • 前置、登录参数

    • redis IP

    • pg IP 不配置则不会在收盘时写入当日分钟数据

    1make docker
    2make local 或者 make srv
    • 1

    方式二

    docker-compose

    1docker-compose --compatibility up -d
    • 1

    查看数据

    redis

    1# 进入 docker redis cli
    2docker exec -it ${redis_docker} redis-cli
    3# 合约列表
    4> KEYS *
    5# 查看存储的行情
    6> LRANGE ${合约} 0 -1
    7# 查看存储的交易日
    8> HGET tradingday curday
    • 1

    实时分钟数据可用 pubsub 订阅

     1# pubsub 是一个查看订阅与发布系统状态的内省命令, 它由数个不同格式的子命令组成pubsub
    2pubsub channels [pattern] # 查询系统中符合模式的频道信息,pattern为空,则查询系统中所有存在的频道
    3pubsub numsub [channel] # 查询一个或多个频道的订阅数
    4pubsub numpat  #查询当前客户端订阅了多少频道
    5
    6# subscribe 订阅一个或多个频道
    7subscribe channel [channel...]
    8
    9# psubscribe 订阅符合一个或多个匹配模式的所有频道,psubscribe new.* 则是订阅所有new.开头的频道(new.log,new.studnet,etc…)
    10psubscribe pattern [pattern …]
    • 1
  • 相关阅读:
    手机微信里面的文件打印步骤
    微信公众号如何运营和管理?
    finalize()方法什么时候被调用?析构函数(finalization)的目的是什么?
    六、程序员指南:数据平面开发套件
    分布式-分布式选举算法
    【路径规划】基于遗传算法求解多车多类型车辆的车辆路径优化问题附matlab代码
    Python21天学习挑战赛Day(15-16)·lxml库与Xpath提取网页数据
    如何使用java语言下载https的网络文件
    博弈论
    银发经济崭露头角:海外网红营销如何助力假发品牌增长
  • 原文地址:https://blog.csdn.net/along1976/article/details/126459010