前言
上篇分析完HashMap之后,这次来分析下ConcurrentHashMap这个并发条件下线程安全的HashMap又有哪些精妙绝伦、惊为天人的设计呢🤔
本文同样主要分析JDK1.8版本的ConcurrentHashMap
sizeCtl的作用
这个变量的作用比较复杂,起到标识位作用的同时也可以记录阈值等实际意义,主要有以下几种情况
sizeCtl值的情况 | 意义 |
---|---|
0 | 代表数组未初始化,且数组的初始容量为16 |
正数 | 如果数组未初始化,那么其记录的是数组的初始容量;如果数组已经初始化,那么其记录的就是i扩容阈值(数组的初始容量*0.75) |
-1 | 表示数组正在进行初始化 |
负数且不是-1 | 表示数组正在扩容,高16位是扩容标识戳,低16位是扩容线程数+1 |
扰动函数
static final int spread(int h) {
// HASH_BITS(01111111111111111111111111111111)保证hash值一定是为正数,因为符号位为0
// 高低位去异或运算,这里和HashMap的类似,让高位参与运算是为了哈希得更均匀
return (h ^ (h >>> 16)) & HASH_BITS;
}
table数组的初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// 如果sizeCtl小于0说明数组要么在初始化,要么在扩容,所以当前线程在让出CPU资源,也就是自旋
Thread.yield(); // lost initialization race; just spin
// 这里就是用CAS操作吧sizeCtl修改为-1,表示数组数组正在扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 这里采用双重校验锁的形式,防止别的线程拿到锁之后重复初始化操作
if ((tab = table) == null || tab.length == 0) {
// 下面就是简单的初始化Node数组并赋给table成员变量的过程
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// (n >>> 2)就是n/4,所以这里相当于 n * 0.75 ,也就是计算扩容阈值
sc = n - (n >>> 2);
}
} finally {
// 初始化完成后,sizeCtl记录的就是数组的扩容阈值了
sizeCtl = sc;
}
break;
}
}
return tab;
}
putVal()方法分析
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 扰动函数计算哈希值
int hash = spread(key.hashCode());
int binCount = 0;
// 这里是一个死循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 判断到table数组为null或者长度为0,此时要先进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// tabAt()就是用UnSafe类去获取对应下标的节点,下标的计算方式和HashMap的一样都是(n - 1) & hash)
// 如果桶位为null的话,可以CAS放到桶中,结束循环
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 这里的MOVED表示这是一个Forward节点,说明在扩容过程中已经迁移到新数组中去了
// 所以不能直接放进数组中,要进行协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 这里就用synchronized锁住了当前的桶位
// 注意这里只是锁住了当前这一个桶位以及其链表或红黑树,不影响其它桶位
// 相比HashTable一锁就是锁住整个数组来说提高了并发度,同时也保证了线程安全
synchronized (f) {
// 下面就是比较常规的变量这个链表中是否存在这个key,去决定下一步是覆盖还是插在链表中
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 这就是红黑树的情况了
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 这里就是判断链表节点是否达到8个,以及treeifyBin()方法中判断数组长度是否到达64来决定是否树化
// 这里的判断逻辑就和HashMap基本是一样的
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 维护集合的长度,包含fullAddCount()方法
addCount(1L, binCount);
return null;
}
当要往一个桶位添加节点的时候,synchronized只会锁住当前这个桶位,不会影响其它桶位,如图:
集合长度的维护
这里其实就和LongAdder
的设计基本一样:
就是把集合里元素的数量拆分成一个baseCount
和一个CounterCell
数组之和,每次新增一个元素需要对元素的数量+1,首先优先会对baseCount进行CAS+1的操作,如果失败则跳到fullAddCount
方法。
fullAddCount
方法会来到CounterCell数组中找一个位置进行+1,数组中的每一个CounterCell对象就维护这个一个value来+1,而找到我们要操作的位置就是通过当前线程进行哈希来定位,之后的+1也是通过CAS。而且这个过程中可能还会对CounterCell数组进行扩容,但是数组长度受到当前机器CPU核数的限制,因为超过同时竞争的线程数再扩容就意义不大。
其中fullAddCount
方法中是通过一个for的死循环,会进行很多次CAS操作直到成功为止。涉及到以下几个变量:
cellsBusy
CountCell 的操作标记位,如果正在修改、新建、操作 CountCell 数组中的元素会,会将其 cas 为 1,否则为0。wasUncontended
表示 cas 是否失败,如果失败则考虑操作升级。collide
是否冲突,如果冲突,则考虑扩容 CountCell 数组的长度。
其中涉及到一些double check保证线程安全,还有尝试更新失败后的操作可能是下一个循环重试、或者对线程的rehash再重试、或者是对baseCount进行CAS重试等操作,基本都是逐步升级的,以最小代价完成这个+1的操作。
这个fullAddCount
方法基本上思路就是这样,这里就不一行行分析了,因为这个方法非常复杂,可以看这个视频的讲解就带你一行行去看懂。(ConcurrentHashMap的fullAddCount方法就对应LongAdder的longAccumulate方法)
这样做的意义其实是为了提高高并发下统计数量的性能,也就是与AtomicLong
这种对一个value值进行CAS的操作相比,在大量线程竞争的时候,会有很多线程自旋造成CPU的较大的开销;而使用LongAdder
这种设计思路来维护一个数组就让多个线程来增加这个统计数的时候可以分散的操作,减少竞争冲突,提高了并发度,而我们要得到这个统计数就只需用baseCount去加上数组里所有元素之和就可以了。
高并发情况下LongAdder
性能是比AtomicLong
要好的,但是LongAdder
在计算统计值的时候如果有线程在修改那么可能就会有些许误差,所以一般也用于允许一定误差的场景来提高性能。
扩容
扩容标识sizeCtl
首先在扩容过程中,sizeCtl的值会经过一系列操作:
下面是resizeStamp()方法:
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
首先numberOfLeadingZeros(n)的作用是返回n的最高非0位前面的0的个数,比如n=16,因为16是 10000。一个int是32位,32-5就是27。那么 Integer.numberOfLeadingZeros(n)返回值就是27。
然后RESIZE_STAMP_BITS是16,1 << (RESIZE_STAMP_BITS - 1)
就是让1左移15位,最后得到1后面15个0。
两者相或,会让这个numberOfLeadingZeros(n)得到的这个数的第16位变成1,这个1待会会用到。addCount()方法中把这个resizeStamp()方法的返回值赋给变量rs
。
然后看到addCount()方法中在发现元素数量已经超过扩容阈值时,有这么一段:
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
rs << RESIZE_STAMP_SHIFT) + 2
这个表达式的作用就是把rs左移16位,这样一来上面原本rs的第16位的1就变成了符号位,这个也就变成了负数,原本的rs被移到了高16位;然后加2,低16位就是2,也就是表示当前扩容的线程数+1
。而上面这个CAS操作成功的话,sizeCtl就变成了这个很大的负数,也就标志着当前容器正式进入了扩容状态。
判断扩容是否结束也是靠这个标识,在transfer()方法中:
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
先对线程数-1,判断(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT
是否为真,其实就是形式变换了一下,本质上还是判断这个扩容标识值是否还原为原来初始化时的值,如果是则证明扩容迁移过程都完成了,finishing
标志为置为true。
协助扩容的时机
有两种情况会让这个线程一起协助扩容
-
addCount()方法
addCount方法在判断sizeCtl<0后会有这么一段:
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt);
这里CAS是把当前协助扩容的线程数+1,因为只有低16位与扩容线程数有关,所以尽管此时sizeCtl为负数也是直接+1的。CAS成功后就进入transfer()方法进行扩容
-
putVal()方法
else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
上面分析putVal()方法中也分析了这一点,
MOVED
表示这是一个Forward节点,说明在扩容过程中已经迁移到新数组中去了,所以不能直接放进数组中,要进行协助扩容。helpTransfer
方法中同样有对线程数+1的操作:if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; }
多线程协助扩容
正式开始扩容后,新建一个新的数组,长度为原来的2倍,然后我们就需要把原数组中的数据迁移过去。由于是多线程环境,所以迁移过程由多个线程共同完成,它是将数组划分成了几个部分,每个部分由一个线程来进行迁移操作,顺序是从后往前迁移的。每个线程迁移的最小任务量是16个桶位。
每迁移完一个节点,就会把这个桶位的节点变为Forward节点(fwd)
,这种节点的hash为常量MOVED
,相当于一个标识位,表示当前节点已经迁移到新数组中去了。当某个putVal时遇到这种节点就要去协助扩容而不能直接插入了。
在多线程环境下,如果该线程触发了上面说的协助扩容的时机,就会去“领任务”,也就是被分配一段自己负责的迁移数据的范围,由transferIndex
记录下一个分配工作范围开始的位置,根据一个线程的任务量 stride
去计算此次分配任务的左边界bound
,再CAS去更新transferIndex
的值让他向左移动一个stride
为下一次分配做准备。
这里图解一下,为了容易看一点这里假设最小任务量是4,原数组有16个桶位,所以可以划分出4个工作量。每次分配完transferIndex就左移一个线程工作量,当扩容时有多个线程触发协助扩容条件就会进来拿到一个自己要帮助迁移的工作范围,做完之后可以领下一个直到全部迁移完。所以同一时刻可能有多个不同的线程在一起帮助扩容,利用了多线程的特点提高了迁移的效率。
transfer()方法源码解析
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride就是每次给线程分配的任务数
// 如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果是第一个发起扩容的线程(非协助迁移的),此时新数组为null
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 两倍扩容创建新数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 记录线程开始迁移的桶位,从后往前迁移
transferIndex = n;
}
// 记录新数组的末尾
int nextn = nextTab.length;
// 已经迁移的桶位,会用ForwardingNode节点占位(这个节点的hash值为-1(MOVED))
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// i记录当前正在迁移桶位的索引值
// bound记录下一次任务迁移的开始桶位
// --i也是从后往前迁移的体现
// --i >= bound 成立表示当前线程分配的迁移任务还没有完成
if (--i >= bound || finishing)
advance = false;
// 没有元素需要迁移 => 后续会去将扩容线程数减1,并判断扩容是否完成
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
// 因为迁移是从后往前的,所以用 nextIndex - stride
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 如果没有更多的需要迁移的桶位,就进入该if
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl
if (finishing) {
nextTable = null;
table = nextTab;
// 这里实际上是0.75n * 2,用位运算加速
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 扩容任务线程数减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断当前所有扩容任务线程是否都执行完成
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 所有扩容线程都执行完,标识结束
finishing = advance = true;
i = n; // recheck before commit
}
}
// 当前迁移的桶位没有元素,直接在该位置添加一个fwd节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 当前节点已经被迁移
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 当前节点需要迁移,加锁迁移,保证多线程安全
// 此处的迁移逻辑和HashMap的基本一样,也就是拆分高低位链表,所以就不再重复分析了
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
与JDK1.7相比
1.7 中的 concurrentHashMap 使用了分段锁的机制,定义了Segment
,每个 Segment 是 一个单独的容器,单独获取一把ReentrantLock
锁。Segment 都是一个类似 HashMap 数组的结构,它可以扩容,它的冲突会转化为链表。但是 Segment 的个数一但初始化就不能改变。
1.8的ConCurrentHashMap大部分采用的是synchronized与CAS操作去保证线程安全的,synchronized现在有锁升级机制性能好很多了,CAS是用sun.misc.Unsafe
类实现的底层是CMPXCHG指令,putVal的时候加锁只会锁住一个桶位不影响其他的,容器内元素的长度采用的是LongAdder的数组累加的提高并发的设计,扩容时允许多个线程一起协助迁移数据,还有一个sizeCtl的变量来标识当前容器的各种状态、扩容阈值或者扩容线程数。
关于ConcurrentHashMap不能存null值
ConcurrentHashMap无论是key还是value,都是不能为null的!
二义性问题
如果ConcurrentHashMap允许插入 null,那么此时就会有二义性问题:
- key不存在,所以返回 null。
- key的value就是 null,所以返回的就是它原本的 null 值。
可以看出这就是 ConcurrentHashMap 的二义性问题,那为什么 HashMap 就不怕二义性问题呢?
可证伪的 HashMap
这是因为 HashMap 的设计是给单线程使用的,所以如果查询到了 null 值,我们可以通过hashMap.containsKey(key)
的方法来区分到底是key本来就不存在?还是key的value就为null?这样二义性问题就得到了解决,所以 HashMap 不怕二义性问题。
不可证伪的 ConcurrentHashMap
而 ConcurrentHashMap 就不一样了,因为 ConcurrentHashMap 使用的场景是多线程,所以它的情况更加复杂。
我们假设 ConcurrentHashMap 可以存入 null 值,有这样一个场景,现在有一个线程 A 调用了get方法获得一个key的值,得到null,这个key在这一刻实际上是不存在的key,但是因为二义性我们无法确定是哪一种情况,所以再调用了 containsKey(key)
,原本应该返回false的,但是由于是多线程环境,在这之前又有一个线程B去put了这个key进去(value不为null),导致 containsKey(key)
返回变成true了,最后让我们以为这个key是存在的只是value为null,实际上那个时候key压根就不存在。
也就是说,多线程的状况非常复杂,我们没办法判断某一个时刻返回的 null 值,到底是值为 null,还是压根就不存在,也就是二义性问题不可被证伪,所以 ConcurrentHashMap 才会在源码中这样设计,直接杜绝 key 或 value 为 null 的歧义问题。
对于这个问题,有人问过 ConcurrentHashMap 的作者 Doug Lea,他的回复就是:不容忍在并发场景下出现歧义!
还有一点就是Hashtable也是不允许key和value为null的,所以也是为了这两个线程安全的哈希表在做迁移的时候保持一致性。
一些感想
看了ConcurrentHashMap的源码以及之前的HashMap源码不得不感叹真的是太精妙了,不仅大量使用位运算来提高性能,甚至一个变量都掰成高16位和低16位使用,大量使用CAS能不加锁的地方绝不加锁,最绝的是一个统计长度的变量都能拆成一个数组的和来优化,真的是对性能追求到了极致,优化到了极致。最后说一句:Doug Lea大神牛逼!
参考
https://xilidou.com/2018/11/27/LongAdder/