• boltdb一瞥


    boltdb

    网上关于boltdb的文章有很多,特别是微信公众号上,例如:
    boltdb源码分析系列-事务-腾讯云开发者社区-腾讯云 (tencent.com)
    这些文章都写的挺好,但不一定覆盖了我所关注的几个点,下面我把我关注的几个点就来下来。

    node page bucket tx db的关系

    • 磁盘数据mmap到page内存区域,也可以理解为就是磁盘数据
      • page需要一段连续的内存
    • node封装的B+树节点数据结构
    • bucket一个B+树数据结构。可以理解成一个表
    • tx 读事务或读写事务
      • bucket是内存结构每个tx中都会生成一个
      • 会将tx中涉及到(读取过、修改过)的nodes都记录在bucket中
      • 读写事务最终写入磁盘时是需要重新申请新的page的,即不会修改原有的page
    • db整个数据库文件
      • db中的freelist记录了db文件中空闲的页(即已经可以释放掉的页)

    tx.commit

    在boltdb的 commit中才会执行b+树的rebalance操作,执行完后再进行写入磁盘的操作。也就是说在一个事务中涉及到的多次写操作,会最终在commit的时候同意执行写入磁盘spill操作。

    func (tx *Tx) Commit() error {
        _assert(!tx.managed, "managed tx commit not allowed")
        if tx.db == nil {
            return ErrTxClosed
        } else if !tx.writable {
            return ErrTxNotWritable
        }
           
        // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
    
        // Rebalance nodes which have had deletions.
        var startTime = time.Now()
        tx.root.rebalance()
        if tx.stats.Rebalance > 0 {
            tx.stats.RebalanceTime += time.Since(startTime)
        }
        
        // spill data onto dirty pages.
        startTime = time.Now()
        if err := tx.root.spill(); err != nil {
            tx.rollback()
            return err
        }
    

    也正因为txn中可能有多个key插入,所以split就可能会进行多次

    func (n *node) split(pageSize int) []*node {
        var nodes []*node
    
        node := n
        for {
            // Split node into two.
            a, b := node.splitTwo(pageSize)
            nodes = append(nodes, a)
    
            // If we can't split then exit the loop.
            if b == nil {
                break
            }   
    
            // Set node to b so it gets split on the next iteration.
            node = b 
        }   
    
        return nodes
    }
    
    node.go
    

    数据写入到磁盘的时候,是从下层节点往上层节点写的

    // spill writes the nodes to dirty pages and splits nodes as it goes.
    // Returns an error if dirty pages cannot be allocated.
    func (n *node) spill() error {
        var tx = n.bucket.tx
        if n.spilled {
            return nil
        }
        
        // Spill child nodes first. Child nodes can materialize sibling nodes in
        // the case of split-merge so we cannot use a range loop. We have to check
        // the children size on every loop iteration.
        sort.Sort(n.children)
        for i := 0; i < len(n.children); i++ {
            if err := n.children[i].spill(); err != nil {
                return err
            } 
        }
        
        // We no longer need the child list because it's only used for spill tracking.
        n.children = nil
    
        // Split nodes into appropriate sizes. The first node will always be n.
        var nodes = n.split(tx.db.pageSize)
    
    node.go
    

    数据较大如何处理?直接将构造一个大的page将数据存储进去。与此同时,原先node关联的page可以释放掉了。因为整个是一个append only模式,原先的page在新事务生成,且没有其他读事务访问后就可以释放掉了。

        for _, node := range nodes {
            // Add node's page to the freelist if it's not new.
            if node.pgid > 0 {
                tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid))
                node.pgid = 0
            }
        
            // Allocate contiguous space for the node.
            p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)
            if err != nil {
                return err
            }
    
    
    node.go
    

    哪些node需要rebalance呢,size < 25% page_size或者中间节点小于2个key,叶子节点小于1个key。

    func (n *node) rebalance() {
        if !n.unbalanced {
            return
        }
        n.unbalanced = false
            
        // Update statistics.
        n.bucket.tx.stats.Rebalance++
    
        // Ignore if node is above threshold (25%) and has enough keys.
        var threshold = n.bucket.tx.db.pageSize / 4
        if n.size() > threshold && len(n.inodes) > n.minKeys() {
            return
        } 
    
    node.go
    

    bucket中读到了node,就将node加入到bucket中,读到了就意味着这些node可能就会发生改变。它是在cursor移动的时候加入到bucket中的。

    func (c *Cursor) node() *node {
        _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack")
    
        // If the top of the stack is a leaf node then just return it.
        if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
            return ref.node
        }
        
        // Start from root and traverse down the hierarchy.
        var n = c.stack[0].node
        if n == nil {
            n = c.bucket.node(c.stack[0].page.id, nil)
        }
        for _, ref := range c.stack[:len(c.stack)-1] {
            _assert(!n.isLeaf, "expected branch node")
            n = n.childAt(int(ref.index))
        }   
        _assert(n.isLeaf, "expected leaf node")
        return n
    }  
    
    // node creates a node from a page and associates it with a given parent.
    func (b *Bucket) node(pgid pgid, parent *node) *node {
        _assert(b.nodes != nil, "nodes map expected")
    
        // Retrieve node if it's already been created.
        if n := b.nodes[pgid]; n != nil {
            return n
        }   
    
        // Otherwise create a node and cache it.
        n := &node{bucket: b, parent: parent}
        if parent == nil {
            b.rootNode = n 
        } else {
            parent.children = append(parent.children, n)
        }   
    
        // Use the inline page if this is an inline bucket.
        var p = b.page
        if p == nil {
            p = b.tx.page(pgid)
        }   
    
        // Read the page into the node and cache it.
        n.read(p)
        b.nodes[pgid] = n 
    
        // Update statistics.
        b.tx.stats.NodeCount++
    

    freelist

    它表示的是磁盘中已经释放的页

    结构

    • ids 所有空闲页
    • pending {txid, pageids[]}即将释放的txid以及其关联的pageid
    • cache map索引

    ->pending 释放实际

    • tx.commit时会将事务中涉及到的老的node对应的page都放到pending中
      • node.spill中将关联的旧node(node与page对应)放到freelist的pending中

    pending->release释放时机

    tx的commit阶段会将事务涉及的原先老page放到freelist的pending中。

    func (f *freelist) free(txid txid, p *page) {
        if p.id <= 1 {
            panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id))
        }       
            
        // Free page and all its overflow pages.
        var ids = f.pending[txid]
        for id := p.id; id <= p.id+pgid(p.overflow); id++ {
            // Verify that page is not already free.
            if f.cache[id] {
                panic(fmt.Sprintf("page %d already freed", id))
            }
            
            // Add to the freelist and cache.
            ids = append(ids, id)
            f.cache[id] = true
        }
        f.pending[txid] = ids
    }   
    

    db.beginRWTx 开启读写事务的时候会尝试将过期的page释放掉

    func (f *freelist) release(txid txid) {
        m := make(pgids, 0)
        for tid, ids := range f.pending {
            if tid <= txid {
                // Move transaction's pending pages to the available freelist.
                // Don't remove from the cache since the page is still free.
                m = append(m, ids...)
                delete(f.pending, tid)
            }
        }
        sort.Sort(m)
        f.ids = pgids(f.ids).merge(m)
    }
    
  • 相关阅读:
    Unity手机游戏性能优化系列:针对CPU端的性能调优
    无线遥控方案无线收发 soc 芯片 CI2451&CI2454
    学生静态HTML个人博客主页【Web大学生网页作业成品】HTML+CSS+JavaScript
    Java.lang.Class desiredAssertionStatus()方法有什么功能呢?
    特征数组的特征值 leetcode
    【uniapp】微信小程序canvas签名旋转生成图片
    婴儿沐浴座椅上架亚马逊美国站安全标准要求ASTM F1967-19测试,CPC认证
    你不用URL?
    【Python百日进阶-数据分析】Day119 - Plotly Figure参数: 散点图(一)
    基于Spark的气象数据处理与分析
  • 原文地址:https://www.cnblogs.com/bymzy/p/18298137