• Java - 跳表在ConcurrentSkipListMap的运用及其原理


    前言

    我们知道跳表是Redis中的一个重要数据结构,其中Sorted Set这种集合底层就用到了跳表。后来我才知道,原来Java当中也有着跳表的集合实现。而我对此却非常陌生。因此趁此机会学习下。

    一. 跳表的产生和原理

    我们先来看下一个简单的链表
    在这里插入图片描述
    倘若我们要查找5,7,11这3个数字,那么我们分别需要比较5次,7次,11次。共计23次。

    可见比较次数是非常多的,那么如果我们能够将某些比较过程跳过,那是不是可以减少比较的次数呢?我们在上述链表中,挑选几个数字作为一个比较“索引”,如图:
    在这里插入图片描述
    那么我们在查找5,7,11这3个数字的时候,可以做什么样的优化操作呢?

    1. 先从最顶层链表开始查找。5这个数字在顶层链表可以直接找到,比较次数为3次。
    2. 顶层链表中,值为5的元素下一个指针(当前层级)指向元素9,大于7,因此在元素5的地方,可以向下层的链表去查找,再比较2次就可以找到元素7。比较次数为5次
    3. 找元素11同理,在顶层链表中值为9的元素处,向下层链表查找,再比较2次就可以找到元素11。比较次数为6次。

    那么总的来说,比较次数由原来的23次降低为14次。当然,我们可以再加一层链表来提高查询速度,如图:
    在这里插入图片描述

    那么这样一层套一层的形式,就形成了所谓的跳表。跳表的特性有这么几点:

    1. 一个跳表结构由很多层数据结构组成。
    2. 每一层都是一个有序的链表,默认是升序。也可以自定义排序方法。
    3. 最底层链表(图中所示Level1)包含了所有的元素
    4. 如果每一个元素出现在LevelN的链表中(N>1),那么这个元素必定在下层链表出现。
    5. 每一个节点都包含了两个指针,一个指向同一级链表中的下一个元素,一个指向下一层级别链表中的相同值元素。

    1.1 跳表的查找示意图

    还是以上图为例,那么想查找元素11,在跳表中的流程图如下:
    在这里插入图片描述

    1.2 跳表的插入示意图

    跳表插入数据的流程如下:

    1. 找到元素适合的插入层级K,这里的K采用随机的方式。K大于跳表的总层级,那么开辟新的一层,否则在对应的层级插入。
    2. 申请新的节点。
    3. 调整对应的指针。

    假设我要插入元素13(图画的都是自然升序,比较特殊…)原有的层级是3级,假设K=4:
    在这里插入图片描述
    倘若K=2:
    在这里插入图片描述

    二. ConcurrentSkipListMap解析

    ConcurrentSkipListMap的底层就是用了跳表来实现。我们可以发现,跳表有这么几个重要的元素信息:

    1. 每层是一个链表。
    2. 链表中的每个元素又是一个整体,存储着对应的Key-Value
    3. 跳表的层级。

    2.1 内部结构

    我们先来看下这个类当中的几个重要的静态内部类:

    public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
        implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
        
    	static final class Node<K,V> {
    		final K key;
            volatile Object value;
            volatile Node<K,V> next;
            // ...
    	}
    	
     	static class Index<K,V> {
            final Node<K,V> node;
            final Index<K,V> down;
            volatile Index<K,V> right;
            // ...
    	}
    	
    	static final class HeadIndex<K,V> extends Index<K,V> {
    		final int level;
            HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
                super(node, down, right);
                this.level = level;
            }
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • Node类:表示单链表的有序节点。里面存了Key、Value以及指向下一个节点的next指针。
    • Index类:基于Node类的一个索引层。上文说到每个节点有两个指针,在这里体现的就是down(下一层链表的节点)指针和right指针(同层的下一个节点)。
    • HeadIndex:维护索引层次,有等级的概念了。

    从静态内部类我们就可以看出ConcurrentSkipListMap满足了跳表的基本特征。接下来我们看下构造函数。我们看下最常用的无参构造:

    public ConcurrentSkipListMap() {
    	// 这个就是排序器,默认升序
        this.comparator = null;
        // 初始化动作
        initialize();
    }
    
    private void initialize() {
        keySet = null;
        entrySet = null;
        values = null;
        descendingMap = null;
        head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                                  null, null, 1);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    我们可以看到,最主要的就是初始化了一个HeadIndex(维护索引的一个结构),我们来将参数进行一一对应:

    new HeadIndex<K,V>(
    	(前三个参数实际上用于父类的构造HeadIndex的父类是IndexIndex.node ---> new Node<K,V>(null, BASE_HEADER, null),
        Index.down ---> null, 
        Index.right ---> null,
        HeadIndex.level ---> 1
    );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    总的来说,就是创建出一个层级为1,最底层链表有一个初始元素。同时其相关指针都没有指向null

    而接下来,就可以进行元素的插入操作了。

    2.2 元素插入流程

    我们从插入元素最外层调用的API:put()函数看起:

    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    我们可以得知,ConcurrentSkipListMap不允许插入的值为null,否则抛异常,紧接着继续往下走,这里我先把代码进行简化,方便大家理解,分开来看有时候会更直观

    private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;   
        // key同样不能为null,否则报空指针          
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        // 根据key找到对应的位置,并插入到最底层的链表中
        findInsertLocationAndInsert();
    	// 建立新的索引K,决定新元素建立的索引需要有几层,两种情况,一种是在已有的层级去建立,一种是新建立一个层级。
        buildIndexAndUpdateHeadIndex();
        return null;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    步骤大致分为两步:

    1. 找到该Key需要插入的位置在哪,然后插入。
    2. 生成对应的索引,并维护起来。

    2.2.1 寻找插入位置

    代码主要通过findPredecessor()函数来寻找元素的插入位置:

    private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    	// 同样判断了key是否为null,可见这个校验方面还是比较多的
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
        	// 从Head节点开始,最高等级的Level处开始往后查找
            for (Index<K,V> q = head, r = q.right, d;;) {
            	// 一般先向同级链表的右侧开始查找,元素单调递增
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
                    // 代表该节点已经被删除
                    if (n.value == null) {
                    	// 删除该节点,即改变下指针指向,然后停止遍历
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }
                    // 往后找,直到右边Node节点的Key > 当前Key
                    if (cpr(cmp, key, k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
               	// 向下寻找
                if ((d = q.down) == null)
                    return q.node;
                q = d;
                r = d.right;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    翻译成大白话就是:

    1. 从顶层的链表开始向后查找,优先在同级链表往后查找。
    2. 找到大于当前Key的节点的时候,改为向下级链表查找。然后重复步骤1。
    3. 最终找到的元素是最底层链表中的节点。

    最终找到的节点是插入元素的上一个节点。即前置节点。 例如,我想插入元素10,然后其上一个最接近的元素是9,那么9就是其前置节点。

    2.2.2 元素的插入(创建新节点Node

    元素的插入操作主要分为两个步骤:

    1. 元素的相关校验。
    2. 执行插入。本质上就是当前元素的next指针指向前置节点原有的next指针。前置节点的next指针再指向当前插入元素。
    findInsertLocationAndInsert(){
    	outer: for (;;) {
    		// findPredecessor 函数主要用来确认key要插入的位置
    		for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
    		    if (n != null) {
    		    	// b是前置节点。n初始化是前置节点的next节点,即后继节点。当前元素理论上应该插入b和n之间
    		    	// 即 b(前置节点)  --> 当前插入的节点 ---> n(后继节点)
    		        Object v; int c;
    		        // 先记录一下后继节点的后继节点,用于下面的后移操作。
    		        Node<K,V> f = n.next;
    		        // 虽然一开始n赋值为b.next,但是在多并发的情况下,可能会不一致,防止不一致读
    		        if (n != b.next)               
    		            break;
    		        // 若该节点已经被删除,则不执行后续的插入操作。 
    		        if ((v = n.value) == null) {   
    		            n.helpDelete(b, f);
    		            break;
    		        }
    		        // 前置节点已经被删除(b = findPredecessor(key, cmp))
    		        if (b.value == null || v == n) 
    		            break;
    		        // 若当前需要插入的位置还要大一点,那么整体向后移动一位,本质上通过compareTo进行比较
    		        if ((c = cpr(cmp, key, n.key)) > 0) {
    		            b = n;
    		            n = f;
    		            continue;
    		        }
    		        // c==0说明,当前想要插入的位置,已经存在一个值了
    		        if (c == 0) {
    		        	// onlyIfAbsent的作用:false-->如果待插入的元素存在则替换。 true-->如果待插入的元素存在则直接返回
    		            if (onlyIfAbsent || n.casValue(v, value)) {
    		                @SuppressWarnings("unchecked") V vv = (V)v;
    		                return vv;
    		            }
    		            break; // restart if lost race to replace value
    		        }
    		        // else c < 0; fall through
    		    }
    			// 将当前k-v进行包装,在插入。
    		    z = new Node<K,V>(key, value, n);
    		    if (!b.casNext(n, z))
    		        break;         // restart if lost race to append to b
    		    break outer;
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    2.2.3 创建新索引(Index)

    到这里为止,待插入的元素已经在跳表中最底层链表上插入成功了,接下来就该去维护整个跳表的结构了:

    1. 计算这个待插入元素插入的层级。即需要几层链表都建立这个索引?
    buildIndexAndUpdateHeadIndex(){
    	// 生成随机数
    	int rnd = ThreadLocalRandom.nextSecondarySeed();
    	if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
    	    int level = 1, max;
    	    // 计算层级
    	    while (((rnd >>>= 1) & 1) != 0)
    	        ++level;
    	    Index<K,V> idx = null;
    	    // 这个head是HeadIndex类型的,里面有个level属性,记录着当前跳表的层级数
    	    HeadIndex<K,V> h = head;
    	    // 在已有的层级中建立索引
    	    if (level <= (max = h.level)) {
    	        for (int i = 1; i <= level; ++i)
    	        	// 主要是从最底层向上寻找,直到指定的level层。
    	            idx = new Index<K,V>(z, idx, null);
    	    }
    	    // 新创建一层链表结构,并维护索引
    	    else { 
    	    	// 索引层级数+1,然后建立索引
    	    	// 注意:此时只是新增了新节点的索引,并没有关联到跳表的真实体中
    	        level = max + 1; 
    	        @SuppressWarnings("unchecked")Index<K,V>[] idxs =
    	            (Index<K,V>[])new Index<?,?>[level+1];
    	        for (int i = 1; i <= level; ++i)
    	            idxs[i] = idx = new Index<K,V>(z, idx, null);
    	        for (;;) {
    	            h = head;
    	            int oldLevel = h.level;
    	            // 此时说明已经有其他的线程修改了头索引层数,因此退出循环
    	            if (level <= oldLevel) // lost race to add level
    	                break;
    	            HeadIndex<K,V> newh = h;
    	            Node<K,V> oldbase = h.node;
    	            // 生成新的HeadIndex节点,指向最新的层级。一般新创建的链表中,只有两个索引,一个是头索引,一个是新增节点的索引。前者next指向后者。
    	            for (int j = oldLevel+1; j <= level; ++j)
    	                newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
    	            // 整个过程就是复制一个新的HeadIndex节点,然后通过CAS操作,将新老HeadIndex对象进行替换。
    	            if (casHead(h, newh)) {
    	                h = newh;
    	                idx = idxs[level = oldLevel];
    	                break;
    	            }
    	        }
    	    }
    	    // 插入操作
    	    headIndexInsert();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    到这里为止,ConcurrentSkipListMap主要目的是根据待插入元素Node创建出一个新的索引。但是并没有将这个索引插入到对应的层级中。 即上述一大串代码都是创建操作,只有最后一句话(伪代码)是插入操作,我们来看下。

    2.2.4 新建索引的插入

    headIndexInsert(){
    	// 获取level,该level为原头节点的层数,不包括新层
        splice: for (int insertionLevel = level;;) {
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
            	// 若头索引或者新增节点索引为null,则退出插入操作。可能是其他线程删除了头索引或者新增节点的索引
                if (q == null || t == null)
                    break splice;
                if (r != null) {
                    Node<K,V> n = r.node;
                    // 进行 Key值的比较
                    int c = cpr(cmp, key, n.key);
                    // 删除空值索引
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    // 同理,如果当前插入的Key大于(预想插入位置)n的Key,往后移动
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
    			// 上面找到节点要插入的位置,这里就插入(最开始是最顶层)
                if (j == insertionLevel) {
                	// 关联两个节点
                    if (!q.link(r, t))
                        break; // restart
                    // 如果新增节点的值为null,表示该节点已经被其他线程删除
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    // 若插入到最后一个节点,则停止
                    if (--insertionLevel == 0)
                        break splice;
                }
    			// 从上往插入
                if (--j >= insertionLevel && j < level)
                    t = t.down;
                // 更新当前节点的两个指针
                q = q.down;
                r = q.right;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    然后我们关注下这个代码q.link(r, t)

    final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
        //获取调用索引对象的节点
        Node<K,V> n = node;
        //将新索引的链表后续索引(newSucc)设为老索引(succ)
        newSucc.right = succ;
        //如果调用索引对象的值不为null,通过无锁竞争CAS操作,将新索引替换老索引
        return n.value != null && casRight(succ, newSucc);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2.2.5 流程图总结代码含义

    假设我们原跳表如下:
    在这里插入图片描述

    上面的步骤分为4个:

    1. 第一步:寻找插入的位置,假设我们要插入的元素是13。示意图如下:
      在这里插入图片描述

    2. 构建Node节点,并在跳表的最底层执行插入:
      在这里插入图片描述

    3. 经过计算(随机),计算出新节点所需要插入的层级为3:那么在指定层开始构建索引:
      在这里插入图片描述

    4. 将新的索引插入到各个层级链表当中:
      在这里插入图片描述

    流程图可能会有误,但是总体流程是不会错的,概括如下:

    1. 计算出插入节点的Key,应该插入到跳表的哪一个位置,最终找到最底层链表的一个前置节点。
    2. 根据当前插入的K-V构建出一个新的Node节点,并插入到最底层节点中。
    3. 从最底层链表向上。不断构建新Node节点的索引Index
    4. 最后将Index不断地插入,维护到HeadIndex中,同时更新HeadIndex的层级。其中总层级可能会多一层也可能不会,这个过程是随机的(抛硬币).
    5. 最后注意一点的是,插入过程中,KV都不允许为null,同时每个步骤多会很多校验,比如是否有多线程并发操作、是否节点被删除等等。
    6. 同时整个操作流程中,还是有很多地方用到了CAS操作的,比如HeadIndex的替换,新老索引的替换等操作。

    2.3 元素获取流程

    获取流程和插入流程相比,就太简单了,毕竟插入操作,还需要维护跳表的结构,而元素获取操作并不会改变底层的数据结构,因此较为简单。但是我们还是大概了解下:

    public V get(Object key) {
        return doGet(key);
    }
    
    private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
        	// 同样,findPredecessor函数获取最底层链表的前置节点
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                // 若找到的前置节点没有后续节点,直接结束循环
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                // 同理,多线程的情况下可能造成数据不一致,此时跳出循环
                if (n != b.next)                
                    break;
                // 如果对应的值为null,跳出循环
                if ((v = n.value) == null) {    
                    n.helpDelete(b, f);
                    break;
                }
                // 若前置节点值为null或者后续节点为null,同样跳出循环
                if (b.value == null || v == n)  
                    break;
                // 若查找的键和后续节点的键相同,那么返回后续节点。
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                // 否则,跳出循环
                if (c < 0)
                    break outer;
                // 继续向当前链表的后续节点查找
                b = n;
                n = f;
            }
        }
        return null;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    总的来说就是:

    1. 校验(值是否被删除,是否不一致等)
    2. 根据跳表的结构,从上往下,从左往右去寻找节点。
  • 相关阅读:
    【编译原理】LL(1)文法
    pytorch模型可视化的方法总结
    嵌入式分享合集54
    【面试题01】找出数组中的最长前缀
    Docker Cgroups资源控制
    多御安全浏览器更新了这些设置,还是你熟悉的吗?
    vue项目 Editor.md使用示例
    什么是域控服务器?域控服务器功能?部署域控需要考虑因素?域控组策略功能?
    Echart 非apache 托管
    golang的垃圾回收算法之九写屏障
  • 原文地址:https://blog.csdn.net/Zong_0915/article/details/126139005