我们知道跳表是Redis
中的一个重要数据结构,其中Sorted Set
这种集合底层就用到了跳表。后来我才知道,原来Java
当中也有着跳表的集合实现。而我对此却非常陌生。因此趁此机会学习下。
我们先来看下一个简单的链表:
倘若我们要查找5,7,11这3个数字,那么我们分别需要比较5次,7次,11次。共计23次。
可见比较次数是非常多的,那么如果我们能够将某些比较过程跳过,那是不是可以减少比较的次数呢?我们在上述链表中,挑选几个数字作为一个比较“索引”,如图:
那么我们在查找5,7,11这3个数字的时候,可以做什么样的优化操作呢?
那么总的来说,比较次数由原来的23次降低为14次。当然,我们可以再加一层链表来提高查询速度,如图:
那么这样一层套一层的形式,就形成了所谓的跳表。跳表的特性有这么几点:
Level1
)包含了所有的元素。LevelN
的链表中(N>1
),那么这个元素必定在下层链表出现。还是以上图为例,那么想查找元素11,在跳表中的流程图如下:
跳表插入数据的流程如下:
K
,这里的K
采用随机的方式。若K
大于跳表的总层级,那么开辟新的一层,否则在对应的层级插入。假设我要插入元素13(图画的都是自然升序,比较特殊…)原有的层级是3级,假设K
=4:
倘若K
=2:
ConcurrentSkipListMap
的底层就是用了跳表来实现。我们可以发现,跳表有这么几个重要的元素信息:
Key-Value
。我们先来看下这个类当中的几个重要的静态内部类:
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
// ...
}
static class Index<K,V> {
final Node<K,V> node;
final Index<K,V> down;
volatile Index<K,V> right;
// ...
}
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}
}
Node
类:表示单链表的有序节点。里面存了Key、Value
以及指向下一个节点的next指针。Index
类:基于Node
类的一个索引层。上文说到每个节点有两个指针,在这里体现的就是down
(下一层链表的节点)指针和right
指针(同层的下一个节点)。HeadIndex
:维护索引层次,有等级的概念了。从静态内部类我们就可以看出ConcurrentSkipListMap
满足了跳表的基本特征。接下来我们看下构造函数。我们看下最常用的无参构造:
public ConcurrentSkipListMap() {
// 这个就是排序器,默认升序
this.comparator = null;
// 初始化动作
initialize();
}
private void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
null, null, 1);
}
我们可以看到,最主要的就是初始化了一个HeadIndex
(维护索引的一个结构),我们来将参数进行一一对应:
new HeadIndex<K,V>(
(前三个参数实际上用于父类的构造HeadIndex的父类是Index)
Index.node ---> new Node<K,V>(null, BASE_HEADER, null),
Index.down ---> null,
Index.right ---> null,
HeadIndex.level ---> 1
);
总的来说,就是创建出一个层级为1,最底层链表有一个初始元素。同时其相关指针都没有指向null
。
而接下来,就可以进行元素的插入操作了。
我们从插入元素最外层调用的API:put()
函数看起:
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
return doPut(key, value, false);
}
我们可以得知,ConcurrentSkipListMap
不允许插入的值为null
,否则抛异常,紧接着继续往下走,这里我先把代码进行简化,方便大家理解,分开来看有时候会更直观
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z;
// key同样不能为null,否则报空指针
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// 根据key找到对应的位置,并插入到最底层的链表中
findInsertLocationAndInsert();
// 建立新的索引K,决定新元素建立的索引需要有几层,两种情况,一种是在已有的层级去建立,一种是新建立一个层级。
buildIndexAndUpdateHeadIndex();
return null;
}
步骤大致分为两步:
Key
需要插入的位置在哪,然后插入。代码主要通过findPredecessor()
函数来寻找元素的插入位置:
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
// 同样判断了key是否为null,可见这个校验方面还是比较多的
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
// 从Head节点开始,最高等级的Level处开始往后查找
for (Index<K,V> q = head, r = q.right, d;;) {
// 一般先向同级链表的右侧开始查找,元素单调递增
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
// 代表该节点已经被删除
if (n.value == null) {
// 删除该节点,即改变下指针指向,然后停止遍历
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
// 往后找,直到右边Node节点的Key > 当前Key
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
// 向下寻找
if ((d = q.down) == null)
return q.node;
q = d;
r = d.right;
}
}
}
翻译成大白话就是:
Key
的节点的时候,改为向下级链表查找。然后重复步骤1。最终找到的节点是插入元素的上一个节点。即前置节点。 例如,我想插入元素10,然后其上一个最接近的元素是9,那么9就是其前置节点。
元素的插入操作主要分为两个步骤:
next
指针指向前置节点原有的next
指针。前置节点的next
指针再指向当前插入元素。findInsertLocationAndInsert(){
outer: for (;;) {
// findPredecessor 函数主要用来确认key要插入的位置
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
// b是前置节点。n初始化是前置节点的next节点,即后继节点。当前元素理论上应该插入b和n之间
// 即 b(前置节点) --> 当前插入的节点 ---> n(后继节点)
Object v; int c;
// 先记录一下后继节点的后继节点,用于下面的后移操作。
Node<K,V> f = n.next;
// 虽然一开始n赋值为b.next,但是在多并发的情况下,可能会不一致,防止不一致读
if (n != b.next)
break;
// 若该节点已经被删除,则不执行后续的插入操作。
if ((v = n.value) == null) {
n.helpDelete(b, f);
break;
}
// 前置节点已经被删除(b = findPredecessor(key, cmp))
if (b.value == null || v == n)
break;
// 若当前需要插入的位置还要大一点,那么整体向后移动一位,本质上通过compareTo进行比较
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
// c==0说明,当前想要插入的位置,已经存在一个值了
if (c == 0) {
// onlyIfAbsent的作用:false-->如果待插入的元素存在则替换。 true-->如果待插入的元素存在则直接返回
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
// 将当前k-v进行包装,在插入。
z = new Node<K,V>(key, value, n);
if (!b.casNext(n, z))
break; // restart if lost race to append to b
break outer;
}
}
}
到这里为止,待插入的元素已经在跳表中最底层链表上插入成功了,接下来就该去维护整个跳表的结构了:
buildIndexAndUpdateHeadIndex(){
// 生成随机数
int rnd = ThreadLocalRandom.nextSecondarySeed();
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
// 计算层级
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
// 这个head是HeadIndex类型的,里面有个level属性,记录着当前跳表的层级数
HeadIndex<K,V> h = head;
// 在已有的层级中建立索引
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
// 主要是从最底层向上寻找,直到指定的level层。
idx = new Index<K,V>(z, idx, null);
}
// 新创建一层链表结构,并维护索引
else {
// 索引层级数+1,然后建立索引
// 注意:此时只是新增了新节点的索引,并没有关联到跳表的真实体中
level = max + 1;
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
// 此时说明已经有其他的线程修改了头索引层数,因此退出循环
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
// 生成新的HeadIndex节点,指向最新的层级。一般新创建的链表中,只有两个索引,一个是头索引,一个是新增节点的索引。前者next指向后者。
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
// 整个过程就是复制一个新的HeadIndex节点,然后通过CAS操作,将新老HeadIndex对象进行替换。
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
// 插入操作
headIndexInsert();
}
到这里为止,ConcurrentSkipListMap
主要目的是根据待插入元素Node创建出一个新的索引。但是并没有将这个索引插入到对应的层级中。 即上述一大串代码都是创建操作,只有最后一句话(伪代码)是插入操作,我们来看下。
headIndexInsert(){
// 获取level,该level为原头节点的层数,不包括新层
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
// 若头索引或者新增节点索引为null,则退出插入操作。可能是其他线程删除了头索引或者新增节点的索引
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// 进行 Key值的比较
int c = cpr(cmp, key, n.key);
// 删除空值索引
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
// 同理,如果当前插入的Key大于(预想插入位置)n的Key,往后移动
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
// 上面找到节点要插入的位置,这里就插入(最开始是最顶层)
if (j == insertionLevel) {
// 关联两个节点
if (!q.link(r, t))
break; // restart
// 如果新增节点的值为null,表示该节点已经被其他线程删除
if (t.node.value == null) {
findNode(key);
break splice;
}
// 若插入到最后一个节点,则停止
if (--insertionLevel == 0)
break splice;
}
// 从上往插入
if (--j >= insertionLevel && j < level)
t = t.down;
// 更新当前节点的两个指针
q = q.down;
r = q.right;
}
}
}
然后我们关注下这个代码q.link(r, t)
:
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
//获取调用索引对象的节点
Node<K,V> n = node;
//将新索引的链表后续索引(newSucc)设为老索引(succ)
newSucc.right = succ;
//如果调用索引对象的值不为null,通过无锁竞争CAS操作,将新索引替换老索引
return n.value != null && casRight(succ, newSucc);
}
假设我们原跳表如下:
上面的步骤分为4个:
第一步:寻找插入的位置,假设我们要插入的元素是13。示意图如下:
构建Node
节点,并在跳表的最底层执行插入:
经过计算(随机),计算出新节点所需要插入的层级为3:那么在指定层开始构建索引:
将新的索引插入到各个层级链表当中:
流程图可能会有误,但是总体流程是不会错的,概括如下:
Key
,应该插入到跳表的哪一个位置,最终找到最底层链表的一个前置节点。K-V
构建出一个新的Node
节点,并插入到最底层节点中。Node
节点的索引Index
。Index
不断地插入,维护到HeadIndex
中,同时更新HeadIndex
的层级。其中总层级可能会多一层也可能不会,这个过程是随机的(抛硬币).K
和V
都不允许为null
,同时每个步骤多会很多校验,比如是否有多线程并发操作、是否节点被删除等等。CAS
操作的,比如HeadIndex
的替换,新老索引的替换等操作。获取流程和插入流程相比,就太简单了,毕竟插入操作,还需要维护跳表的结构,而元素获取操作并不会改变底层的数据结构,因此较为简单。但是我们还是大概了解下:
public V get(Object key) {
return doGet(key);
}
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// 同样,findPredecessor函数获取最底层链表的前置节点
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
// 若找到的前置节点没有后续节点,直接结束循环
if (n == null)
break outer;
Node<K,V> f = n.next;
// 同理,多线程的情况下可能造成数据不一致,此时跳出循环
if (n != b.next)
break;
// 如果对应的值为null,跳出循环
if ((v = n.value) == null) {
n.helpDelete(b, f);
break;
}
// 若前置节点值为null或者后续节点为null,同样跳出循环
if (b.value == null || v == n)
break;
// 若查找的键和后续节点的键相同,那么返回后续节点。
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
// 否则,跳出循环
if (c < 0)
break outer;
// 继续向当前链表的后续节点查找
b = n;
n = f;
}
}
return null;
}
总的来说就是: