• milvus Delete API流程源码分析


    Delete API执行流程源码解析

    milvus版本:v2.3.2

    整体架构:

    [图:Milvus整体架构图(原图缺失)]

    Delete 的数据流向:

    [图:Delete的数据流向图(原图缺失)]

    1.客户端sdk发出Delete API请求。

    # Client side: connect to Milvus with pymilvus and issue a Delete request by expression.
    from pymilvus import (
        connections,
        Collection,
    )
    
    print("start connecting to Milvus")
    connections.connect("default", host="192.168.230.71", port="19530")
    
    hello_milvus = Collection("hello_milvus")
    
    print("Start delete entities")
    ## Alternative, primary-key expression: expr = "book_id in [0,1]"
    ## The original author labels the expr below as "non-primary key".
    ## NOTE(review): confirm that `pk` is not this collection's primary key -- if it is,
    ## the server can extract the keys directly and takes the simpleDelete path, not complexDelete.
    expr = "pk in [447868867306324066,447868867306324067]" ## non-primary key (per the original author)
    delete_result = hello_milvus.delete(expr)
    # Print whatever delete() returns for this request.
    print(delete_result)
    

    2.服务端(Proxy)接收API请求,将request封装为deleteTask,并压入dmQueue队列。

    注意这里使用的是dmQueue(DML任务队列);DDL类请求使用的是ddQueue。

    代码路径:internal\proxy\impl.go

    // Delete deletes records from a collection; once deleted, these records can no longer be searched.
    //
    // This is an excerpt: parts of the original function are elided ("......"). The visible
    // flow is: wrap the request in a deleteTask, enqueue it on dmQueue (the DML task queue),
    // then block until the task has finished.
    func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) (*milvuspb.MutationResult, error) {
    	......
        // Wrap the incoming request in a deleteTask.
    	dt := &deleteTask{
    		ctx:         ctx,
    		Condition:   NewTaskCondition(ctx),
    		req:         request,
    		idAllocator: node.rowIDAllocator,
    		chMgr:       node.chMgr,
    		chTicker:    node.chTicker,
    		lb:          node.lbPolicy,
    	}
    
    	......
        // Push the task onto dmQueue (DML queue; DDL requests use ddQueue instead).
    	// MsgID will be set by Enqueue()
    	if err := node.sched.dmQueue.Enqueue(dt); err != nil {
    		......
    	}
    
    	......
        // Block until the task has finished executing.
    	if err := dt.WaitToFinish(); err != nil {
    		......
    	}
    
    	......
    }
    

    DeleteRequest数据结构:

    // DeleteRequest is the request message received by Proxy.Delete. The XXX_* fields
    // suggest it is generated by the Go protobuf compiler.
    type DeleteRequest struct {
    	Base                 *commonpb.MsgBase
    	DbName               string
    	CollectionName       string
    	PartitionName        string
    	// Expr is the filter expression selecting the rows to delete; Execute rejects it when empty.
    	Expr                 string
    	HashKeys             []uint32
    	// Bookkeeping fields emitted by the protobuf code generator.
    	XXX_NoUnkeyedLiteral struct{}
    	XXX_unrecognized     []byte
    	XXX_sizecache        int32
    }
    

    3.执行deleteTask的3个方法PreExecute、Execute、PostExecute。

    PreExecute()一般为参数校验等工作。

    Execute()为真正执行逻辑。

    PostExecute()为执行完后的收尾逻辑,deleteTask中什么都不做,直接返回nil。

    代码路径:internal\proxy\task_delete.go

    // Execute runs the delete. It rejects an empty expr, obtains the collection's DML
    // stream, parses the expr into a retrieve plan, and then takes one of two paths:
    //   - simpleDelete: the primary keys can be read straight from the expr;
    //   - complexDelete: the matching primary keys must first be queried from the
    //     query nodes, then deleted.
    func (dt *deleteTask) Execute(ctx context.Context) (err error) {
    	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute")
    	defer sp.End()
    	log := log.Ctx(ctx)
    
    	// Reject requests that carry no filter expression.
    	if len(dt.req.GetExpr()) == 0 {
    		return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression")
    	}
    
    	dt.tr = timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))
        // Get the collection's DML stream (the original author notes its concrete type is msgstream.mqMsgStream).
    	stream, err := dt.chMgr.getOrCreateDmlStream(dt.collectionID)
    	if err != nil {
    		return err
    	}
    
    	plan, err := planparserv2.CreateRetrievePlan(dt.schema, dt.req.Expr)
    	if err != nil {
    		// NOTE(review): err from CreateRetrievePlan is dropped here, so callers only see
    		// the expr and not why parsing failed; wrapping it with %w would keep the cause.
    		return fmt.Errorf("failed to create expr plan, expr = %s", dt.req.GetExpr())
    	}
    	// Decide between simpleDelete and complexDelete based on the shape of the plan.
    	isSimple, termExp := getExpr(plan)
    	if isSimple {
    		// The primary keys can be taken directly from the delete expr (a term
    		// expression, e.g. `pk in [...]`), so no query is needed.
    		err := dt.simpleDelete(ctx, termExp, stream)
    		if err != nil {
    			return err
    		}
    	} else {
    		// Complex delete expr: the matching primary keys must be queried
    		// from the query nodes before they can be deleted.
    		err = dt.complexDelete(ctx, plan, stream)
    		if err != nil {
    			// Per the log message, some rows may already have been deleted before the failure.
    			log.Warn("complex delete failed,but delete some data", zap.Int("count", dt.count), zap.String("expr", dt.req.GetExpr()))
    			return err
    		}
    	}
    
    	return nil
    }
    

    如果可以直接从expr中提取出主键(即针对主键的in表达式,如 pk in [1,2]),则走simpleDelete;否则走complexDelete。

    4.simpleDelete

    // simpleDelete handles a delete whose expr is a term expression. It extracts the
    // primary keys from termExp and passes them directly to produce, without querying.
    func (dt *deleteTask) simpleDelete(ctx context.Context, termExp *planpb.Expr_TermExpr, stream msgstream.MsgStream) error {
    	primaryKeys, numRow, err := getPrimaryKeysFromExpr(dt.schema, termExp)
    	if err != nil {
    		// NOTE(review): the failure is logged at Info level and also returned to the caller.
    		log.Info("Failed to get primary keys from expr", zap.Error(err))
    		return err
    	}
    	// NOTE(review): the log key "partationID" is misspelled (should be "partitionID");
    	// it is kept unchanged because it is part of the quoted v2.3.2 source.
    	log.Debug("get primary keys from expr",
    		zap.Int64("len of primary keys", numRow),
    		zap.Int64("collectionID", dt.collectionID),
    		zap.Int64("partationID", dt.partitionID))
    	err = dt.produce(ctx, stream, primaryKeys)
    	if err != nil {
    		return err
    	}
    	return nil
    }
    

    函数getPrimaryKeysFromExpr()返回的主键类型为*schemapb.IDs。

    // IDs holds a list of primary keys, as int64 or as string depending on which IdField
    // variant is set. getPrimaryKeysFromExpr returns a *IDs, and produce consumes one.
    type IDs struct {
    	// Types that are valid to be assigned to IdField:
    	//
    	//	*IDs_IntId
    	//	*IDs_StrId
    	IdField              isIDs_IdField
    	// Bookkeeping fields emitted by the protobuf code generator.
    	XXX_NoUnkeyedLiteral struct{}
    	XXX_unrecognized     []byte
    	XXX_sizecache        int32
    }
    

    isIDs_IdField是一个接口类型。

    // isIDs_IdField is the marker interface for the IdField variants of IDs
    // (presumably a protobuf oneof); it is implemented by IDs_IntId and IDs_StrId.
    type isIDs_IdField interface {
    	isIDs_IdField()
    }
    

    isIDs_IdField有2个实现:

    • IDs_IntId
    • IDs_StrId
    // IDs_IntId is the IdField variant that carries int64 primary keys.
    type IDs_IntId struct {
    	IntId *LongArray
    }
    
    // LongArray wraps a list of int64 values in Data.
    type LongArray struct {
    	Data                 []int64
    	XXX_NoUnkeyedLiteral struct{}
    	XXX_unrecognized     []byte
    	XXX_sizecache        int32
    }
    
    // IDs_StrId is the IdField variant that carries string primary keys.
    type IDs_StrId struct {
    	StrId *StringArray
    }
    
    // StringArray wraps a list of string values in Data.
    type StringArray struct {
    	Data                 []string
    	XXX_NoUnkeyedLiteral struct{}
    	XXX_unrecognized     []byte
    	XXX_sizecache        int32
    }
    

    [图(原图缺失)]

    从expr提取主键存储到变量primaryKeys。

    5.dt.produce()

    // produce sends a delete for the given primary keys to the message queue.
    // It hashes each key onto one of dt.vChannels, builds one DeleteMsg per target
    // vchannel (every row stamped with dt.ts), wraps the messages in a single MsgPack,
    // and produces it on stream. Only after Produce succeeds is the row count added to
    // dt.result.DeleteCnt. Both simpleDelete and the complex-delete path end up here.
    func (dt *deleteTask) produce(ctx context.Context, stream msgstream.MsgStream, primaryKeys *schemapb.IDs) error {
        // Hash every primary key to the index of its target vchannel in dt.vChannels.
    	hashValues := typeutil.HashPK2Channels(primaryKeys, dt.vChannels)
    	// repack delete msg by dmChannel
    	result := make(map[uint32]msgstream.TsMsg)
    	numRows := int64(0)
    	for index, key := range hashValues {
    		vchannel := dt.vChannels[key]
    		_, ok := result[key]
    		if !ok {
                // First key routed to this vchannel: create its DeleteMsg.
    			deleteMsg, err := dt.newDeleteMsg(ctx)
    			if err != nil {
    				return err
    			}
    			deleteMsg.ShardName = vchannel
    			result[key] = deleteMsg
    		}
    		// Append this row (hash value, timestamp, primary key) to its vchannel's message.
    		curMsg := result[key].(*msgstream.DeleteMsg)
    		curMsg.HashValues = append(curMsg.HashValues, hashValues[index])
    		curMsg.Timestamps = append(curMsg.Timestamps, dt.ts)
    
    		typeutil.AppendIDs(curMsg.PrimaryKeys, primaryKeys, index)
    		curMsg.NumRows++
    		numRows++
    	}
    
    	// send delete request to log broker
    	msgPack := &msgstream.MsgPack{
    		BeginTs: dt.BeginTs(),
    		EndTs:   dt.EndTs(),
    	}
        // Collect the per-vchannel DeleteMsgs into the MsgPack. Go map iteration order is
        // random, so the order of Msgs within the pack is not deterministic.
    	for _, msg := range result {
    		if msg != nil {
    			msgPack.Msgs = append(msgPack.Msgs, msg)
    		}
    	}
    
    	log.Debug("send delete request to virtual channels",
    		zap.String("collectionName", dt.req.GetCollectionName()),
    		zap.Int64("collectionID", dt.collectionID),
    		zap.Strings("virtual_channels", dt.vChannels),
    		zap.Int64("taskID", dt.ID()),
    		zap.Duration("prepare duration", dt.tr.RecordSpan()))
        // Produce the MsgPack to the message queue.
    	err := stream.Produce(msgPack)
    	if err != nil {
    		return err
    	}
    	dt.result.DeleteCnt += numRows
    	return nil
    }
    

    msgstream.TsMsg是一个接口类型。

    有如下实现:

    • createCollectionMsg
    • CreateDatabaseMsg
    • CreateIndexMsg
    • createPartitionMsg
    • DataNodeTtMsg
    • DeleteMsg
    • DropCollectionMsg
    • DropDatabaseMsg
    • DropIndexMsg
    • DropPartitionMsg
    • FlushMsg
    • InsertMsg
    • LoadCollectionMsg
    • ReleaseCollectionMsg
    • TimeTickMsg

    6.complexDelete

    // complexDelete handles deletes whose expr cannot be reduced directly to a list of
    // primary keys. It asks the load balancer (dt.lb) to execute, for this collection
    // (nq = 1), the function built by getStreamingQueryAndDelteFunc. That function queries
    // the matching rows and then deletes them through the given stream.
    func (dt *deleteTask) complexDelete(ctx context.Context, plan *planpb.PlanNode, stream msgstream.MsgStream) error {
    	err := dt.lb.Execute(ctx, CollectionWorkLoad{
    		db:             dt.req.GetDbName(),
    		collectionName: dt.req.GetCollectionName(),
    		collectionID:   dt.collectionID,
    		nq:             1,
    		exec:           dt.getStreamingQueryAndDelteFunc(stream, plan),
    	})
    	if err != nil {
    		// NOTE(review): this message is misleading -- the error comes from dt.lb.Execute
    		// (the query-and-delete step), not from getting or creating the DML stream.
    		log.Warn("fail to get or create dml stream", zap.Error(err))
    		return err
    	}
    
    	return nil
    }
    

    最终会执行dt.getStreamingQueryAndDelteFunc。

    该函数在查询到数据后,会用查询结果中的主键调用:

    // Inside the function built by getStreamingQueryAndDelteFunc: send the IDs from the query result to produce().
    dt.produce(ctx, stream, result.GetIds())
    

    simpleDelete最终调用的也是dt.produce()。

    因此,complexDelete会先根据expr到querynode查询出满足条件的数据的主键,再根据这些主键删除数据。

    7.总结

    • delete API根据expr决定走simpleDelete还是complexDelete。
    • complexDelete会先查询出满足条件的主键,之后与simpleDelete一样调用dt.produce(),按主键向各vchannel发送删除消息。
  • 相关阅读:
    Python学习 -- logging模块
    stdmap和stdmultimap的使用总结
    我的docker随笔38:用 registry 搭建私有仓库
    JAVA设计模式第十讲:SPI - 业务差异解决方案
    为什么说“分布式架构”才是AR眼镜的未来
    纯Python实现遗传算法
    B_QuRT_User_Guide(26)
    Alpha-Beta 剪枝
    我的面试(1年经验)
    Android11.0(R) MTK 预置可卸载app恢复出厂不恢复(仿RK方案)
  • 原文地址:https://blog.csdn.net/shulu/article/details/136235697