• milvus Delete api写s3的流程


    Delete api写s3的流程

    milvus版本:v2.3.2

    整体架构:

    在这里插入图片描述

    Delete 的数据流向

    在这里插入图片描述

    delete相关配置

    dataNode:
      segment:
        insertBufSize: 16777216 # Max buffer size to flush for a single segment.
        deleteBufBytes: 67108864 # Max buffer size to flush del for a single channel
        syncPeriod: 600 # The period to sync segments if buffer is not empty.
    
    • 1
    • 2
    • 3
    • 4
    • 5

    当collection已经有flushed文件,如果后续有insert和delete操作,这个配置文件控制这个行为。

    s3文件不支持进行文件内容的编辑。因此需要有一种机制能够进行insert和delete。

    delete在内存中(buffer)的流程

    堆栈:

    start()(internal\util\flowgraph\node.go)
      |--go nodeCtx.work()(同上)
        |--n.Operate(input)(同上)
          |--dn.bufferDeleteMsg()(internal\datanode\flow_graph_delete_node.go)
            |--dn.delBufferManager.StoreNewDeletes()(同上)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    func (m *DeltaBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey,
    	tss []Timestamp, tr TimeRange, startPos, endPos *msgpb.MsgPosition,
    ) {
        // 获取delDataBuf
    	buffer, loaded := m.Load(segID)
        // 如果不存在则新建
    	if !loaded {
    		buffer = newDelDataBuf(segID)
    	}
        // 将pks存入buffer
    	size := buffer.Buffer(pks, tss, tr, startPos, endPos)
    
    	m.pushOrFixHeap(segID, buffer)
    	m.updateMeta(segID, buffer)
    	m.usedMemory.Add(size)
    
    	metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(
    		fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Add(float64(len(pks)))
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    pks存储的是主键值。

    这是对内存的操作。

    delete写入s3的流程

    Start()(internal\util\flowgraph\node.go)
      |--go nodeCtx.work()(同上)
        |--n.Operate(input)(同上)
          |--dn.flushManager.flushDelData()(internal\datanode\flow_graph_delete_node.go)
            |--m.handleDeleteTask()(internal\datanode\flush_manager.go)
              |--m.getFlushQueue(segmentID).enqueueDelFlush()(同上)
                |--q.getFlushTaskRunner(pos).runFlushDel()(同上)
                  |--runFlushDel()(internal\datanode\flush_task.go)
                    |--task.flushDeleteData()(同上)
                      |--t.MultiWrite(ctx, t.data)(internal\datanode\flush_manager.go)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    当达到syncPeriod或者buffer满或者执行flush操作,会触发写s3操作。

    看看这个函数flushDelData():

    // notify flush manager del buffer data
    func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID,
    	pos *msgpb.MsgPosition,
    ) error {
    	// del signal with empty data
    	if data == nil || data.delData == nil {
    		m.handleDeleteTask(segmentID, &flushBufferDeleteTask{}, nil, pos)
    		return nil
    	}
    
    	collID, partID, err := m.getCollectionAndPartitionID(segmentID)
    	if err != nil {
    		return err
    	}
        // 编码解码器,提供序列化,反序列化功能
    	delCodec := storage.NewDeleteCodec()
        // 序列化
    	blob, err := delCodec.Serialize(collID, partID, segmentID, data.delData)
    	if err != nil {
    		return err
    	}
    
    	logID, err := m.AllocOne()
    	if err != nil {
    		log.Error("failed to alloc ID", zap.Error(err))
    		return err
    	}
    
    	blobKey := metautil.JoinIDPath(collID, partID, segmentID, logID)
    	blobPath := path.Join(m.ChunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)
        // 合成kvs
    	kvs := map[string][]byte{blobPath: blob.Value[:]}
    	data.LogSize = int64(len(blob.Value))
    	data.LogPath = blobPath
    	log.Info("delete blob path", zap.String("path", blobPath))
    	m.handleDeleteTask(segmentID, &flushBufferDeleteTask{
    		ChunkManager: m.ChunkManager,
    		data:         kvs,
    	}, data, pos)
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    delCodec.Serialize()返回的变量blob类型为*Blob。

    // Blob is a pack of key&value
    type Blob struct {
    	Key    string
    	Value  []byte
    	Size   int64
    	RowNum int64
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    blobPath为s3的文件路径

    files/delta_log/447918772784340274/447918772784340275/447918772784340283/447918772783939606
    
    • 1

    delta_log存储的是insert和delete增量数据。

    s3的截图:

    在这里插入图片描述

    在这里插入图片描述

    总结

    1.delete/insert增量数据写入buffer。

    2.满足一定条件buffer刷入s3。

  • 相关阅读:
    C++:类模板的应用实现动态数组
    SpringCloud Gateway 网关的请求体body的读取和修改
    oracle -- 表操作
    Java并发编程学习十四:AQS框架
    python之df.index.to_native_types()应用举例
    win11开机动画关闭教程
    深入浅出Spring注解ConfigurationProperties
    计算机网络【CN】TCP报文段格式【20B】
    自己动手实现rpc框架(一) 实现点对点的rpc通信
    cdp集群Hbase组件HRegionServer服务停止原因以及排查
  • 原文地址:https://blog.csdn.net/shulu/article/details/136259494