ConcurrentHashMap(JDK8)源码分析及夺命9连问
ConcurrentHashMap(jdk1.8)
底层操作
//数组对象 = 头对象(8bytes) + 指针对象(4bytes) + 数组长度(4bytes) + 数据(xxxbytes)
Class<?> ak = Node[].class; //Node[]Class 对象ak
ABASE = U.arrayBaseOffset(ak); //头对象 + 指针对象 + 数组对象 = 16
int scale = U.arrayIndexScale(ak); //单个元素的大小 int(4bytes) long(8bytes)
if ((scale & (scale - 1)) != 0) //scale必须是2的幂
throw new Error("data type scale not a power of two");
/**
Integer.numberOfLeadingZeros(n) 计算n的二进制从最高有效位往左有几个0
案例: n = 4 二进制 100 32-3=29
ASHIFT = 31-29 = 2
**/
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
/**
该方法的目的: 取出table[i]
(i << ASHIFT) + ABASE i是角标
接着上面的案例: i* 4 + 16
相当于数组的寻址公式 a[i]_address = data_type_size*i + base_adress
**/
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/**
该方法的目的: CAS操作 更新值为v table[i] = v
((long)i << ASHIFT) + ABASE 这个上面说了就是寻址公式能够找到该元素 在底层所在的位置
**/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
一、 容器初始化
1.源码分析
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
//基于一个Map集合,构建一个ConcurrentHashMap
//初始容量为16
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
2.sizeCtl含义解释
注意: 以上这些构造方法中都涉及到一个变量sizeCtl,这个变量是一个非常重要的变量。
而且具有非常丰富的含义,它的值不同,对应的含义也不一样,这里先做一些简单说明:
sizeCtl
为0, 代表数组未初始化,且数组的初始容量为16
sizeCtl
为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值 (数组的初始容量*0.75 sc = n - (n >>> 2)??
sizeCtl
为-1,表示数组正在进行初始化
sizeCtl
小于0,并且不是-1,表示数组正在扩容 。高16位表示扩容标识戳,低16位标识当前正在参与扩容的个数-1
二、添加安全
1.源码分析
1.1 添加元素putVal方法
final V putVal(K key, V value, boolean onlyIfAbsent) {
//如果有空值或者空键,直接抛出异常
if (key == null || value == null) throw new NullPointerException();
//基于Key计算hash值,并进行一定的扰动
//这个值一定是一个正数(正数很重要),方便后面添加元素判断该节点的类型
int hash = spread(key.hashCode());
int binCount = 0;
//开始死循环 这个死循环 对应着接下来的casTabAt
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//判断该数组是否已经初始化
if (tab == null || (n = tab.length) == 0)
//懒加载初始化
tab = initTable();
/**
f = table[i]
判断 该桶位的第一个值是否为空
如果 f == null 说明该桶位无其它值,可以直接赋值
赋值 使用CAS操作 并发控制对该桶位的赋值操作 防止其它线程同时操作该桶位
table[i] = new Node(hash,key,value,null)
**/
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
/**以下情况都是 该桶位有值的情况 但是该桶位分为3种
1. 仅有一个Node 还未链化 和 树化
2. 链化
3. 树化
**/
//判断该桶位第一个值的hash 是否为MOVED(-1) 若为-1 说明数组正在扩容
//并执行协助扩容 helpTransfer() 该方法后面会详解
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//做添加 或者 更新 节点的操作
else {
V oldVal = null;
//这里锁住了当前的桶位 该锁的粒度仅限于数组的各个桶位
synchronized (f) {
/**
Double Check (双端校验机制) 加固保险 这一步很重要
防止在进入该锁之前 有其它的线程操作了该桶位
**/
if (tabAt(tab, i) == f) {
//fh是 f的hash值 大于0 说明该桶位是链表
if (fh >= 0) {
//与hashMap类似 bitCount记录的是 该桶位下链表的长度
binCount = 1;
//又是一个死循环,实际上就是遍历该桶位下的链表所有节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
//判断该链表下所有节点的key是否和要put的key相等
//若相等做更新操作
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//若到链表末尾都不相等 则尾插入该链表
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//若hash值不>=0 说明 该桶位有可能树化了 那么接下来做关于红黑树添加或更新的操作
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
/**
判断链表是否为0
若为0 说明没有对 链表进行过操作
若不为0 说明操作过该桶位的链表 那么就要判断该链表是否需要树化
**/
if (binCount != 0) {
// binCount >= 8时数化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//触发数组的扩容
addCount(1L, binCount);
return null;
}
总结: 通过以上源码,我们可以看到,当需要添加元素时,会针对当前元素
加锁操作,这样一方面保障元素添加时,多线程的安全,同时对某个桶位加锁不会影响
其他桶位的操作,进一步提升多线程的并发效率
1.2 数组初始化,initTable方法
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//cas+自旋,保证线程安全,对数组进行初始化操作
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl的值(-1)小于0,说明此时正在初始化,让出cpu
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//cas修改sizeCtl值为-1,修改成功,进行数组初始化,失败,继续自旋
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
//sizeCtl为0,取默认长度16,否则取sizeCtl的值
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//基于初始长度 构建数组对象
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算扩容阈值,并赋值给sc 相当于0.75*初始长度
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
总结: initTable方法中关键点:
1. sizeCtl这个参数的意义。 (详情参考上一小节)
2. 当数组初始化成功后, sizeCtl 就会变为 该数组的扩容阈值 n- n>>>2 相当于 n*0.75
三、 扩容安全
1.源码分析
1.1 触发扩容addCount方法
在putVal方法最后会 执行 addCount(1L, binCount); bitCount =0
private final void addCount(long x, int check) {
/**
该段代码先略过 后面会详细讲
这里是需要知道 s 是当前存入Map中的key-value 总数量
**/
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//这里从putVal 方法进来 check=0 一定会成立的
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
/**
分解while 判断的3个条件 可见这3个条件都是 && 必须所有条件成立才能进入循环
1.s >= (long)(sc = sizeCtl)
此时 sc是扩容阈值 s是Map中的元素总数量
若数组需要扩容 该条件必成立
2.(tab = table) != null
数组不为空 这是必然的
3.(n = tab.length) < MAXIMUM_CAPACITY
数组的长度要小于 1<<30 一般情况下也是必然的
则进入此循环
**/
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
/**
private static int RESIZE_STAMP_BITS = 16
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
举例计算:
数组长度为16
1.Integer.numberOfLeadingZeros(n)
该方法前面说过了 16的2进制为 10000
那么该值为:32-5=27 二进制为 0000 0000 0000 0000 0000 0000 0001 1011
2.(1 << (RESIZE_STAMP_BITS - 1))
该步骤加上后面的左移16 意义就是将sizeCtl变为负数
001 右移15位 结果是 0000 0000 0000 0000 1000 0000 0000 0000
3.前后两者值进行 或位运算
rs = 0000 0000 0000 0000 1000 0000 0001 1011
**/
int rs = resizeStamp(n);
/**
当sc = sizeCtl 小于0时有两种情况
1. sc=-1 数组正在初始化
2. sc=-(1+n) 数组正在扩容 n是线程个数
**/
if (sc < 0) {
/**
这样一段代码 主要是限制
在对数组扩容前期,要初始化新数组以及赋值一些扩容所需的初始值等这些操作仅只能一个线程来完成,该阶段限制其它线程进入
**/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//当新数组以及其它扩容初始值设置完成后,才允许其它线程进入协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
/**
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
private static int RESIZE_STAMP_BITS = 16;
第一个参与扩容弄个的线程 会先走该判断
else if中的代码分两块看
1. (rs << RESIZE_STAMP_SHIFT) + 2)
由上面给出的常量可知 RESIZE_STAMP_SHIFT = 16
继续按照之前例子得出的结果执行
rs = 0000 0000 0000 0000 1000 0000 0001 1011
结果 = 1000 0000 0001 1011 0000 0000 0010
1.前16位中有数组的信息且 sizeCtl会变为负数(负数我就不用多说了吧)
2.后16位记录当前有多少个线程参与扩容
2. U.compareAndSwapInt(this, SIZECTL, sc,上面的结果)
将sizeCtl 置为小于-1的负数 然后执行transfer(tab, null) 扩容方法。
**/
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
1.2 扩容transfer方法
该部分代码很长 也很难读。 在解析该部分代码之前,我先举个场景有助于大家理解。
场景:农民工搬砖,要求农民工们把 工地1的砖头 搬到新盖的工地2中去。
原数组: 工地1 nextTable(新数组):工地2 线程: 农民工 数组角标(桶位): 一块砖头
stride: 一位农民工的小目标(假设为16块砖头)
注意:砖头代表散列表中的桶位 且并不是一次搬运16个 而是1个个搬 16仅仅是个阶段性的目标transferIndex:工地1中砖头的总工作量
nextIndex: transferIndex = n
nextBound: transferIndex - stride
bound = nextBound
[nexBound ,nextIndex-1] :农民工小目标要搬运从nextBound位置到nextIndex位置的砖块
注意: 有一个重要的一点, 原数组迁移到新数组的过程, 是从原数组的末尾开始 往数组开头 做迁移
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//n为数组长度 stride为步长
int n = tab.length, stride;
/**
这里会根据CPU的数量来决定步长(stride)
该算法目的是:当CPU个数多,且需要扩容的数组长度越大 会充分利用CPU的性能。
**/
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
/**
第一个线程参与扩容时会进入此判断 addCount-> transfer(tab, null)
该段代码相当于执行扩容之前的一个初始化操作
**/
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//创建一个原数组容量2倍的 新数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//相当于所有线程要工作的目的地 新数组
nextTable = nextTab;
//目标量 要迁移的桶位量 也就是原数组的长度
transferIndex = n;
}
//新数组的长度
int nextn = nextTab.length;
//转发节点 一个特殊的Node节点(属性: hash:-1,nextTable:nextTab)
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//显然又是一个死循环
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
/**
该while循环中有三个判断,不管进入哪一个最终都会advance=false 退出该while循环
1.(--i >= bound || finishing)
2.(nextIndex = transferIndex) <= 0
以上两个判断先不看, 只需要从第2个条件中得到个赋值操作:nextIndex = transferIndex
3.U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound)
nextBound = (nextIndex > stride ?nextIndex - stride : 0)
nextBound = nextIndex-stride
当前线程的阶段性工作量 [nextBound,nextIndex]
下次的总目标就变成了: [0,nextBound] 也就是 transferIndex = nextBound
nextIndex - 1
**/
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
//由于数组角标从0开始 则需要-1
i = nextIndex - 1;
advance = false;
}
}
//这段是结束扩容的代码
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//判断当前桶位是否为空,若为空,则往该位置放入一个转发节点 标记该桶位在做扩容
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果该桶位不为空,则判断该桶位的第一个值的hash值是否为-1
//若为-1则说明该桶位已经被处理过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//若该桶位既不为空 hash也不为-1 则对该桶位进行迁移
else {
//锁住该桶位
synchronized (f) {
//DCL Double Check Lock 双端检索机制
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//如果fh>=0 说明该桶位是一个链表
if (fh >= 0) {
int runBit = fh & n; //该值用来区分高位链表 和低位链表的
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//如果fh <0 就要判断该桶位是否 树化了
else if (f instanceof TreeBin) {
.... //树化桶位的迁移操作 本次解析不考虑红黑树相关的
}
}
}
}
}
}
}
四、计数
1.源码分析
1.1 sumCount方法
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
代码很清晰,也就是 baseCount 与 CounterCell数组中所有元素的value属性 相加的总和。
private transient volatile long baseCount;
/**
* size is a power of 2. 该数组的大小是2的N次幂
*/
private transient volatile CounterCell[] counterCells;
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value; //volatile修饰的value
CounterCell(long x) { value = x; }
}
下面主要说说CounterCell的工作原理,以及在多线程环境下,如何计数的。
1.2 CounterCell[]
先明确两点,1. 计数的方式 2.在哪里会调用计数
1.计数的方式: 要么修改了baseCount 要么修改了CounterCell对象中的value值
2.在哪里会调用计数:
? addCount()方法中有关于计数的代码
putVal() 中调用了 addCount(1L,binCount)
remove() 调用了 addCount(-1L,-1)
那么我们直接来看看 addCount,也就是我们在触发扩容中留下的第一段代码
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
........... //触发扩容的方法,与计数无关,所以省略
}
as是CounterCell数组的引用
b=0
s=0
CounterCell[] as; long b, s;
案例假设1:
假设counterCells没有初始化,且有4个线程,同时执行该方法。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))
(as = counterCells) != null
, 因为counterCells未初始化 所以为false
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
目的是修改 baseCount
的值
四个线程同时执行该CAS 只能有一个线程执行成功。 那么只有一个返回true 其它三个返回false
这里因为是取反操作,所以会有三个线程进入方法体。
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
按照之前的假设来说CounterCells没有初始化, 因此as ==null
返回true。因此会进入该方法体。
三个线程都会执行 fullAddCount(x, uncontended);
这个方法就是精确计算计数的。
案例假设2:
假设counterCells 已经初始化,且size大于0, 同样有4个线程进来。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))
这个判断4个线程都会成功,四个线程都会执行
(a = as[ThreadLocalRandom.getProbe() & m]) == null
ThreadLocalRandom.getProbe()
这个方法是返回一个随机数,彼此之间不同。
as[ThreadLocalRandom.getProbe() & m]
相当于寻址算法
假设在极端情况下 4个线程计算出的下标是同一个。
若垓下表处元素为null, 那么4个线程都会去执行 fullAddCount(x, uncontended)
若该下表处元素都不为null,那4个线程都会执行
!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
由于该操作是CAS操作,那么只有一个线程成功修改CounterCell中的value值,完成这个线程的计数。
那么剩下3个线程都会执行fullAddCount(x, uncontended)
1.3 fullAddCount(x,uncontended) 方法
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
这段是为了让线程获取随机数
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
// 修改某个CounterCell,有可能会对counterCells 进行扩容
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//初始化counterCells
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
// 修改baseCount的值
}
外层是一个无线循环, 三个判断分支只要执行成功一个,就可以跳出循环。
否则一直执行。最终结果要么修改 baseCount,要么修改CounterCell
先看第二个分支 初始化counterCells
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
....
}
cellsBusy == 0 && counterCells == as
这两个判断是并发的控制
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)
这个是CAS修改cellsBusy,
cellsBusy 是1指在初始化或是在操作CounterCells.
执行成功的那个线程,初始化counterCells。 然后看方法体
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
//这里又是个寻址算法
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
counterCell 的初始容量是2,并且把new 一个CounterCell对象,记录了数值 初始化完毕后
将cellsBusy设置为0。
再来看第一个分支
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) { // 此时该CounterCell数组的这个桶位是空
if (cellsBusy == 0) { //且当前没有线程操作该数组
CounterCell r = new CounterCell(x); // new 一个CounterCell
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //CAS设置cellsBusy为1 锁定该数组
boolean created = false; //开关
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) { //DCL 判断
rs[j] = r; //将前面new的对象放入该数组中
created = true; //开关
}
} finally {
cellsBusy = 0; //操作完成后 改为0
}
if (created)
break; // 设置成功的情况下,退出
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break; //若对应数组角标位置的值 不为空 则在其基础上 增加 并退出
else if (counterCells != as || n >= NCPU) //当数组size 大于CPU 数量,不让数组扩容
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
五、面试问题
1.并发Map存储数据结构什么样子?
与HashMap一样 Node数组 + 链表 + 红黑树
Node类中的属性有 hash,key,value,next
2.ConcurrentHashMap负载因子可以指定吗?
HashMap的负载因子可以修改, 但是ConcurrentHashMap不可以
3.Node.hash字段一般情况下值必须>=0,为什么?
1.首先这个hash字段的值 在HashMap与ConcurrentHashMap中是有一点不同, 在HashMap中由key的hashcode值 高16为异或其低16位,而在ConcurrentHashMap中多了个 或运算 0x7FFFFFFF 目的是让最终的hash值要大于等于0.
2.在添加k-v 时,会判断该桶位的第一个值的hash值是否大于等于0
当大于0时,说明该桶位可以添加
若等于-1 ,说明该桶位已经被扩容迁移。 因为原散列表扩容迁移到新散列表,是按照从后往前一个桶一个桶迁移,每迁移一个桶,那么会在这个桶的首节点放置一个ForwardingNode,其hash值是MOVED-1,其中还有一个属性是nextTable是指向新散列表的位置>
4.讲一讲sizeCtl字段?
sizeCtl在HashMap中是个很重要的字段,也是能够控制并发的一个关键字段。
sizeCtl为0时,代表该散列表还未初始化,初始化长度的默认值为16
sizeCtl大于0时,有两种情况
1.若该散列表还未初始化,初始化散列表的默认长度为该sizeCtl值的大小
2.若该散列表已经初始化,sizeCtl代表的是该散列表的扩容阈值
sizeCtl小于0时,同样有两种情况。
1.sizeCtl为-1 说明该散列表正在初始化
2.sizeCtl小于-1 说明该散列表正在扩容
且sizeCtl的 高16位为该散列表扩容前的信息 低16位代表有多少个线程参与扩容
5.扩容准备时的sizeCtl的值是怎么设置的?
先使用Integer.numberOfLeadingZeros() 这个方法计算散列表的长度的前面有几个0并得出结果,然后跟1<<15位进行或运算,之后再 <<16位 得到个负数,再加2。
6.ConcurrentHashMap如何保证写数据安全的?
1.如果要写的数据的该桶位在散列表是空的,那么就会依赖CAS自旋来实现线程安全的。
2.如果要写的数据该桶位不为空,则使用synchronized来所住该桶位,来保证写操作的线程安全。
7.描述一下hash寻址算法?
寻址算法与普通的HashMap是一致的。
8.ConcurrentHashMap如何统计当前散列表数据量的?
在普通的HashMap中使用的是int型的size来统计散列表中的数据量。
而在ConcurrentHashMap中,为了保证在多线程环境下统计散列表的线程安全,采用了LongAdder的方式统计数量。而没有使用AtomicLong的形式。
9.那AtomicLong与LongAdder有什么区别,为什么在ConcurrentHashMap中要使用LongAdder?
先说下他们两者的相同点:AtomicLong 与LongAdder都是能够保证自增情况下的线程安全
不同点:他们在应对更高并发的情况下,LongAdder的性能要远高于AtomicLong.
举个多线程并发下的例子:
AtomicLong当多个线程同时操作AtomicLong 自增,那么由于底层是使用CAS操作一个变量,那么同一时刻只能由一个线程执行成功,那么其它线程都会进入下一次自旋CAS中,且其它线程手中的值也都是过期数据,他们还需要从主内存中获取最新值后再去执行CAS,这样会大大浪费了CPU的性能,大部分时间耗费在,循环和获取最新值的过程中。
LongAdder 它底层是由一个BaseCount 和 CounterCells数组以及该数组内部的元素是CounterCell组成。最终的计算值 是BaseCount和所有的CounterCell的value值的总和。 在单线程环境下,只需要去增加这个BaseCount字段就行,但是在多线程环境下,只有一个线程会操作成功BaseCount,那剩下的线程会先根据自己的hash值 经过寻址运算计算出在CounterCells中的角标位,去增加对应桶位的CounterCell的value值,如果此时有其它线程正好也要修改该桶位的value值的话,cellsBusy这个字段会来控制这个COunterCells数组是否正在被操作,当cellsBusy为1时,说明该CounterCells数组正在被操作,那么该线程会去修改BaseCount字段,若BaseCount也未成功修改,那么会进入下一次循环。
AtomicLong是把所有线程的请求打在一个点上, 而LongAdder则是使用空间换时间的方法,让进来的多个线程的请求打在多个点上,且不需要重复从主内存中获取新值减少不必要的IO操作,这样效率会明显比AtomicLong高很多。