• milvus upsert流程源码分析


    milvus版本:v2.3.2

    整体架构:

    在这里插入图片描述

    Upsert 的数据流向:

    在这里插入图片描述

    1.客户端sdk发出Upsert API请求。

    import numpy as np
    from pymilvus import (
        connections,
        Collection,
    )
    
    num_entities, dim = 4, 3
    
    print("start connecting to Milvus")
    connections.connect("default", host="192.168.230.71", port="19530")
    
    hello_milvus = Collection("hello_milvus")
    
    print("Start upsert entities")
    rng = np.random.default_rng(seed=19530)
    entities = [
        [0,1,2,4000],
        [10,11,12,4000],
        rng.random((num_entities, dim)),
    ]
    hello_milvus.upsert(entities)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    2.服务端接受API请求,将request封装为upsertTask,并压入dmQueue队列。

    注意这里是dmQueue。DDL类型的是ddQueue。

    代码路径:internal\proxy\impl.go

    // Upsert upsert records into collection.
    func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
    	......
        // request封装为upsertTask
    	it := &upsertTask{
    		baseMsg: msgstream.BaseMsg{
    			HashValues: request.HashKeys,
    		},
    		ctx:       ctx,
    		Condition: NewTaskCondition(ctx),
    		req:       request,
    		result: &milvuspb.MutationResult{
    			Status: merr.Success(),
    			IDs: &schemapb.IDs{
    				IdField: nil,
    			},
    		},
    
    		idAllocator:   node.rowIDAllocator,
    		segIDAssigner: node.segAssigner,
    		chMgr:         node.chMgr,
    		chTicker:      node.chTicker,
    	}
    
    	......
        // 将task压入dmQueue队列
    	if err := node.sched.dmQueue.Enqueue(it); err != nil {
    		......
    	}
    
    	......
        // 等待任务执行完
    	if err := it.WaitToFinish(); err != nil {
    		......
    	}
    
    	......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。

    PreExecute()一般为参数校验等工作。

    Execute()为真正执行逻辑。

    PostExecute()执行完后的逻辑,什么都不做,返回nil。

    代码路径:internal\proxy\task_upsert.go

    func (it *upsertTask) Execute(ctx context.Context) (err error) {
    	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")
    	defer sp.End()
    	log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))
    
    	tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))
    	// 拿到stream,类型为msgstream.mqMsgStream
        stream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)
    	if err != nil {
    		return err
    	}
        // 创建msgPack
    	msgPack := &msgstream.MsgPack{
    		BeginTs: it.BeginTs(),
    		EndTs:   it.EndTs(),
    	}
        // 添加insertMsgPack
    	err = it.insertExecute(ctx, msgPack)
    	if err != nil {
    		log.Warn("Fail to insertExecute", zap.Error(err))
    		return err
    	}
        // 添加deleteMsgPack
    	err = it.deleteExecute(ctx, msgPack)
    	if err != nil {
    		log.Warn("Fail to deleteExecute", zap.Error(err))
    		return err
    	}
    
    	tr.RecordSpan()
        // 发送数据至mq
    	err = stream.Produce(msgPack)
    	if err != nil {
    		it.result.Status = merr.Status(err)
    		return err
    	}
    	sendMsgDur := tr.RecordSpan()
    	metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))
    	totalDur := tr.ElapseSpan()
    	log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),
    		zap.Duration("total duration", totalDur))
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    msgPack变量:

    在这里插入图片描述

    msgPack包含了insertRequest和deleteRequest。

    在这里插入图片描述

    insertRequest包含了客户端的upsert数据,以及还会有rowid,用来唯一标识一列数据。

    在这里插入图片描述

    deleteRequest包含主键值。

  • 相关阅读:
    文科类文献综述怎么写?
    搭建游戏要选什么样的服务器?
    php加密解密
    图片上传怎么搞?!阿里云OSS对象存储教你快速实现!
    STM32串口详解
    羽夏壳世界——异或加密的实现
    并发程序的噩梦——数据竞争
    Tesco EDI需求分析
    SpringMVC基础:请求转发和重定向
    Day 62 单向循环链表 双链表
  • 原文地址:https://blog.csdn.net/shulu/article/details/136327180