table 数组的第一次初始化会发成在 putVal 的时候,但是在构造器中已经设置了容量和阈值。
// 默认容量是 16. public ConcurrentHashMap() {}
##2、带初始容量的有参构造函数
如果提前能知道容量的大小,可以在创建对象时就给出初始容量。
public ConcurrentHashMap(int initialCapacity) { // 下限检测 if (initialCapacity < 0) throw new IllegalArgumentException(); // cap 是大于 1.5*initialCapacity 的最小的 2^n, 最大不超过 2^30. int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // table 为初始化,sizeCtl > 0,sizeCtl 表示 table 的初始容量。 this.sizeCtl = cap; }
用户给定的初始容量不一定符合2^n,所以在构造方法中又对用户给定的初始容量进行了重新计算。
initialCapacity + (initialCapacity >>> 1) + 1 = 1.5 * initialCapacity + 1 之所以 +1 是为了防止 1.5 * initialCapacity 恰好等于 2 的次幂。
public ConcurrentHashMap(int initialCapacity, float loadFactor) { // 给定线程数为1,调用别的构造器。 this(initialCapacity, loadFactor, 1); }
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { // 下界校验。 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // 线程并发的数的上限是 table 的容量。因为多了没用。 if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads // 接下来是采用给定的负载因子计算 sizeCtl。 long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); // 赋值。 this.sizeCtl = cap; }
添加操作和 HashMap添加的主体流程基本相同:
ConcurrentHashMap 中不同的是,它在添加元素时能增加了防并发功能。
向 table 中添加元素的整个过程都放在死循环里面执行,当插入操作执行成功时,通过 break 跳出循环。死循环的作用是当 CAS 一次失败后,重新循环,可以有再次执行的机会。乐观锁的思路。
for (Node<K,V>[] tab = table;;) { // 插入操作的所有代码都在里面。 }
在第3 步中,使用 CAS 的方式向 table 中添加元素。别的线程如果比当前线程快一瞬间向 table[i] 处添加元素,那当前线程的 CAS 会执行失败,同样如果当前线程快一瞬间,那其他线程的 CAS 也就失败了。从而避免了 table[i] 处值的覆盖。
casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))
在第 6、7步执行的时候,使用了 synchronize 加锁。如果 i 位置是个链表或者树,线程对 i 都得先拿到链表的头结点 或者 树的根,这就是 table[i]。当前线程对 table[i] 加了悲观锁,别的线程也在 i 位置执行插入,再拿锁时就会陷入阻塞,直到当前线程执行释放锁。这就保证了线程在 i 桶位置执行插入操作时的线程同步。
除此以外,当别的线程在 j 位置处执行插入操作时,可以直接执行,而不会被阻塞。这种细粒度的锁,既保证了 插入的线程安全,又保证了多线程同时插入的效率不至于太低。
f = table[i]; synchronized (f){ //对桶遍历,执行替代或者追加。 }
初始化table
这里需要注意 sizeCtl 在不同时候的代表的意思,而且 是 while 循环 + CAS 修改 sizeCtl 的值,还是乐观锁方式。
table | sizeCtl | sizeCtl 表示的意思 |
---|---|---|
空 | 0 | 使用调用无参构造器的结果,table 的容量默认为 16。 |
空 | 大于0 | 使用有参构造器的结果, sizeCtl 表示初始的 table 容量。 |
空 | 小于0 | 表示有其他线程正在对 table 执行扩容,sizeCtl表示扩容的有多少个线程正在执行扩容。 |
空 | 等于 -1 | 表示当前线程正在对 table 执行初始化。 |
不为空 | 大于0 | 表示 table 的扩容阈值。 |
初始化源码:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // 只有当 table 是空的时候,才执行初始化。 while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) // sizeCtl 小于 0,表示有其他线程正在对 table 执行初始化。 // 当前线程要放弃 CPU 的使用,这轮循环就结束了。 // (如果下轮循环,还是其他线程正在对 table 执行初始化,继续放弃。) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // CAS 将 sizeCtl 的值从 小于0 修改成 -1. // sizeCtl == -1,表示 table 正在执行初始化。 // (当别的线程进到这个方法,执行初始化时,发现 sizeCtl<0,就放弃 CPU的使用权) try { if ((tab = table) == null || tab.length == 0) { // sc 的值是 table 的容量,如果 sc == 0,使用默认容量 16。 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 初始化 table。 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // 当 table 不为空时,sizeCtl 的值是扩容阈值。 // 这里先计算 sc == n - (n / 4) = 0.75 * n sc = n - (n >>> 2); } } finally { // 设置扩容阈值 sizeCtl = sc。 sizeCtl = sc; } break; } } // 返回初始化后的 table。 return tab; }
添加元素源码:
public V put(K key, V value) { return putVal(key, value, false); }
// onlyIfAbsent 控制是否允许替代。fasle 允许。 final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 计算 hash 值,加了扰动的。 int hash = spread(key.hashCode()); int binCount = 0; // 死循环。 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // table 是空的,初始化 table。 // 初始化完成后,第一次循环就结束了,第二次循环时,返现 table 非空,执行下面的逻辑。 tab = initTable(); // (n - 1) & hash : 计算 key 的桶。 // tabAt(tab,i): 拿出 table[i]。 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // table[i] == null, 说明 i 桶中是空的,最简单的情况了。 // CAS 将 k-v 添加到 table[i] 位置。 // 如果 CAS 失败,那就是被其他线程抢先了,这个循环执行完毕, // 下一次循环中再添加 k-v,只是不一定会在这个else if 块中。 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // CAS 成功,跳出死循环。 break; // no lock when adding to empty bin } // 到这里,说明 i 位置的桶中有元素, // f=table[i],f.hash == MOVED == -1,表示 table 正在扩容。 // 此时不允许加元素,得先协助去扩容。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 对 table[i] 加锁,把锁加到了桶上。 synchronized (f) { // 两次验证。有可能别的线程在 i 桶里添了个节点后,将链表变成了树。 // 那这样的话 f 就不是原来的值了。 if (tabAt(tab, i) == f) { // hashcode 大于 0,是链表。 if (fh >= 0) { // binCount 用来记录链表上的节点个数。 binCount = 1; // 遍历链表,又是个死循环。 for (Node<K,V> e = f;; ++binCount) { K ek; // 如果 hashcode 相同 且 key 相同,那是替代。 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 拿出老值。 oldVal = e.val; if (!onlyIfAbsent) // 新值替代老值。 e.val = value; // 跳出遍历链表的循环。 break; } // 到这里,key 与 e 的 hashcode不相同。 // pred 临时记录 e。 Node<K,V> pred = e; // 换下一个节点。 // 如果下一个节点是 null,表示已经到链表 if ((e = e.next) == null) { // 将 k-v 追加到链尾。 pred.next = new Node<K,V>(hash, key, value, null); // 跳出遍历链表的循环。 break; } } } // i 位置的桶是树,执行树的替代或者插入。 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 链表上节点个数大于等于 8,且 table 容量大于等于 64 时,链表树化。 // 链表上节点个数大于等于 8,但 table 容量小于 64 时,扩容。 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 如果是替代,会有老值,要将老值返回。 if (oldVal != null) return oldVal; break; } } } // 新的元素到这里插完了,接下来还要获取 table 中的总元素个数 // 如果超过了阈值,那还得扩容。 addCount(1L, binCount); return null; }
向 table[] 中插入元素后还要再判断 当前 table 中的全部元素数量是否超过了阈值,如果超了,那要扩容。所以这里就需要做两件事情:
因为ConcurrentHashMap 是针对多线程并发设计的, 在其中设计了两个字段用于统计 table 中元素的个数。
// 单线程下,线程添加的元素个数累加在 baseCount 字段上。 private transient volatile long baseCount; // 多线程下,竞争失败的线程添加的元素个数x,封装在 new CounterCell(x) 对象中,最后放入 CounterCell[]。 // CounterCell[] 初始长度是2,但可以扩大器长度,长度上限是 CPU 的核数。 private transient volatile CounterCell[] counterCells;
全部元素的个数 = bashCount + counterCells 中所有元素的和。
HashMap 中 直接 ++size
就统计了元素个数。ConcurrentHashMap 采用 CAS 的方式增加元素个数。baseCount 是元素基数,x 是新插入的元素个数。
U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
假设 t1、t2、t3、t4 四个线程每个线程都向 table 中添加了一个元素。接下来是统计当前 table 中元素的个数。
假设 t1 在 cas 时竞争成功了,则 t1 增加的个数计数到了 baseCount 上。
t2 竞争失败,发现 countersCell 数组还是个空的,初始化数组,设定容量是2。将自己添加的元素个数放到数组中。
CounterCell[] rs = new CounterCell[2]; // 假设 t1 的 probe 计算出的索引是1。 rs[1] = new CounterCell(1);
t3 也竞争失败了,此时 countersCell 不为空,直接 rs[0] = new CounterCell(1);
这里假设 t3 的 probe 计算出的索引是 0。
t4 也竞争失败了,此时 countersCell 不为空,但此时 countersCell 中放满了(不一定是满了,出现索引碰撞也可以)。
结局1: t4 增加的元素的个数追加到 rs 其中的某个元素上,[counterCell[2],counterCell[1]]
或者 [counterCell[1],counterCell[2]]
。具体是加谁上,要看 t4 和谁索引碰撞了。
结局2: 将 countersCell 数组的长度扩大 2 倍。再将 t4 增加的个数添加进去。
最后 t4 计算 table 中全部元素的个数时,就等于:baseCount + countersCell 中全部元素求和。
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // counterCells 为空 且 CAS 修改 BASECOUNT 从 baseCount 到 b + x 成功。 // 说明此时,没有发生并发,table 中总元素个数 s == b + x == baseCount + 1。 // 如果 counterCells 或者 CAS 修改 BASECOUNT 失败,说明此时发生了并发。 // 进到 if 代码块中采用 CounterCell 对 table 中的元素总数进行计数。 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // boolean uncontended = true; // as 是空 || as 长度是空 // || as[i] 是空 // || CAS 给 as[i] 从当前值 a.value 修改到 v + x 失败 // 进 if 代码块。 if (as == null || (m = as.length - 1) < 0 || // ThreadLocalRandom.getProbe() 相对于线程是固定的,就跟 key 的 hashcode 一样。 // 用来计算数组的角标。 (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 将并发时,CAS 失败的线程增加的数量记录到 countersCell 中。 fullAddCount(x, uncontended); return; } if (check <= 1) return; // 计算 table 中的全部节点数。 // 全部节点数 = countersCell 数组中每个元素的和 + baseCount。 s = sumCount(); } // 检查是否需要执行扩容。如果 table 中元素总个数超过阈值,那就执行扩容。 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 元素总个数超过阈值 && table 不为空 && table 容量小于 2^30。 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // rs 后面会被用来将 sizeCtl 设置为负数。 int rs = resizeStamp(n); // sc 小于 0,表示 table 正在扩容。 if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // CAS 将 sizeCtl 修改成负值。表示 table 正在扩容中。 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 执行扩容。 transfer(tab, null); // 统计 table 中全部元素的个数。 s = sumCount(); } } }
private final void fullAddCount(long x, boolean wasUncontended) { int h; // 获取当前线程的 getProbe,如果是0。 if ((h = ThreadLocalRandom.getProbe()) == 0) { // 则初始话当前线程的 probe 值,probe 是随机数。 ThreadLocalRandom.localInit(); // force initialization // 赋值 probe 到 h。 h = ThreadLocalRandom.getProbe(); // counterCells 数组中为发生索引冲突。 wasUncontended = true; } boolean collide = false; // True if last slot nonempty // 死循环,也叫自旋。 for (;;) { CounterCell[] as; CounterCell a; int n; long v; // 如果 counterCells 已经被初始化过了。 if ((as = counterCells) != null && (n = as.length) > 0) { // i = (n - 1) & h: 计算当前线程增加的元素个数放在 countersCell 的哪个索引上。 // a[i] 处的没有元素。 if ((a = as[(n - 1) & h]) == null) { // 如果 countersCell 不处在扩容或者初始化状态(忙碌状态),当前线程是可以直接往里面放值的。 if (cellsBusy == 0) { // Try to attach new Cell // 创建 CounterCell(x)。 CounterCell r = new CounterCell(x); // Optimistic create // CAS 修改 CELLSBUSY 从 清闲状态 到 忙碌状态。 // 别的线程看到 CELLSBUSY 是忙碌状态,就不能往 countersCell中写数据。 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // 标识符,创建未完成。 boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { //将 new CounterCell(x) 存放到 (m - 1) & h 位置上。 rs[j] = r; // 设置标识符为创建完成。 created = true; } } finally { // 设置设置标志符为"清闲状态" cellsBusy = 0; } // 当 rs[j = (m - 1) & h] == null 成立, // new CounterCell(x) 添加到 countersCell 成功。 // 否则,添加失败。 // 如果 new CounterCell(x) 添加到 countersCell 成功。 // 跳出死循环。 if (created) break; // 如果 new CounterCell(x) 添加到 countersCell 失败。 // 返回去再次执行循环。 continue; // Slot is now non-empty } } // countersCell[i] == null 但 busyCell != 0 // 说明多个线程的结果都要存到 i 位置,导致好些线程竞争失败。 // 也就是说 countersCell 长度太短了,需要为其扩容。 collide = false; } // 在 addCount 方法中在 countersCell 数组的 i 位置已经有元素了,在其基础上 // 再增加的时候失败了。 else if (!wasUncontended) // CAS already known to fail // 重新标记为未冲突。 // 在后面重新获取线程的 probe,重新计算其在 countersCell 中的索引。 wasUncontended = true; // Continue after rehash // 因为计算的 i 位置出有元素,直接在其基础上再增加。 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) // 增加成功就跳出死循环。 break; // counterCells != as 说明不是当前线程创建的 counterCells。 // 或者 counterCells 大于 CPU的核数。 else if (counterCells != as || n >= NCPU) // 将碰撞设置为false,不再对 countersCell 扩容。 // 因为 countersCell 的上限是 NCPU。 collide = false; // At max size or stale // (经过了前面那么多判断,才执行到这里,说明扩容真的是最后一招了) else if (!collide) collide = true; // CAS 修改状态,并给 countersCell 执行扩容. else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale // 扩容为原来的两倍。 CounterCell[] rs = new CounterCell[n << 1]; // 将原来的元素一一赋值到扩容后的 countersCell 中。 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 赋值给 counterCells。 counterCells = rs; } } finally { // 标记会空闲状态。 cellsBusy = 0; } collide = false; // 继续下一次自旋。 continue; // Retry with expanded table } // 重新获取当前线程的 probe。 h = ThreadLocalRandom.advanceProbe(h); } // countersCell 没有被初始化。 // cellsBusy == 0 表示没有别的线程在执行 counterCells 初始化 或 者扩容。(第一次扩容,不就是初始化吗) // CAS 将 cellsBusy 的值从 0 修改到 1 ,表示当前线程正在为 counterCells 做初始化。 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // 标识符,初始化未完成。 boolean init = false; try { // Initialize table if (counterCells == as) { // 初始化 rs 容量为 2。 CounterCell[] rs = new CounterCell[2]; // 将 x 也就是新增的元素个数放在 counterCells[i] 位置上。 rs[h & 1] = new CounterCell(x); // 引用赋值, counterCells = rs; // counterCells 初始化结束。 init = true; } } finally { // 撤销 countersCell 的初始化标志符。 cellsBusy = 0; } // 完成初始化了,直接跳出死循环。 if (init) break; } // 如果 countersCell 没有被初始化,而且 它还忙碌着。 // 那当前线程执行这个 CAS ,在 baseCount 的基础上增加。 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
###1.3 源码:sumCount()
final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { // 遍历 countersCell 中的每一个元素。 for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) // 将每个元素的 value 累加到 sum。 sum += a.value; } } // 返回 sum。 return sum; }
public int size() { // 调用了 sumCount() 统计 table 中的全部元素。 long n = sumCount(); // 如果没有超上、下限,返回元素个数。 return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }
当 table 中总元素的个数超过阈值时,就要执行扩容。ConcurrentHashMap 中采用多线程扩容,将 table 依据 CPU 的核数平均分成好几份,每一个线程只迁移它分到的这几个桶。采用这样的方式提高了扩容效率。
(https://raw.githubusercontent.com/diego1109/pictures/main/20210530171225.png)]
具体流程:
依据CPU 的核数,将 table 平分,每个线程只迁移其中它分到的这部分桶。每个线程分到的桶数不能小于16。
肯定有个线程是第一个执行扩容的,此时它要创建一个新 table 且容量是老 table 的 2 倍。
新 table 有了,开始迁移:(这个里面是个死循环)
3.1. 计算当前线程负责将老 table 中的哪几个桶(假设是黄色块)迁移到新数组。从老 table 的末尾开始从右向左迁移。
3.2. 如果老 table[i] == null
,说明这里没有元素,那就不用执行迁移,也可以说 i 桶已经被迁移过了。设置 table[i] == new ForwardingNode()
,该 node 的 hashcode 是 -1。表示 table 处于正在执行迁移,而且 table[i] 桶已经迁移完成了。
3.3. 如果 table[i] .hashCode == -1
说明 i 桶已经被迁移过了。不用再次迁移。
3.2. 如果 table[i] != null
, 那就要迁移了,不管是链表还是树,都要 synchronize(table[i])
。桶迁移的时候,不能允许再在桶中插入元素。接着按照 hashMap 中链表和树迁移的思想执行迁移。
3.3. 当老table 的第 i 桶迁移完成后,在老 table[i] 处设置 forwardingNode,
3.4. 第 i 个桶迁移完成了,--i
,准备迁移左边的下一个桶,–i 执行后,要再次判断下,是不是分配给自己的桶都迁移完了。如果没那继续迁移。
3.5. 如果当前线程将分配给它的桶迁移完了,接下来再判断,是不是其他线程也都将分配给自己的桶迁移完了?
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 先计算出每个线程处理多少个桶。 // 如果cpu是单核的,那不用分了,当前线程执行整个 table 的扩容。 // 如果不是单核的,table.length/8,然后除以 CPU 的核数,求出每个线程处理多少个桶, // 每个线程处理的桶是不得低于 MIN_TRANSFER_STRIDE (16)。 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 是桶扩容,不是协同扩容。 if (nextTab == null) { // initiating try { // 创建临时table,容量是原来的两倍。 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // 将新建的 table 赋值给 nextTable。 nextTable = nextTab; // 保存的是老数组的长度。 transferIndex = n; } // 获取新表的长度。 int nextn = nextTab.length; // 创建 forwarding 节点,其 hashcode == MOVED == -1。 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab // 又是死循环。计算当前线程该负责哪几个桶。 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; // 老 table 中的桶迁移时,是从最右端向最左端迁移。 // --i , 给当前线程分了 n 个桶,下标 -1,迁移左边的桶。 if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 第一个扩容线程会进到这里来,nextIndex = 老数组的长度。 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // 当前线程负责的桶的下界。 bound = nextBound; // 当前线程负责的桶的上界。 i 此时是新数组的最后一个索引。 i = nextIndex - 1; // 当前线程负责的是老数组中 (bound,i] 这些桶。 advance = false; } } // 当前线程处理完了分配给它的桶。 if (i < 0 || i >= n || i + n >= nextn) { int sc; // 再判断是不是所有的线程都做完了分配给自己的桶。 // 如果都完成了。 if (finishing) { // 新 table 置为空 nextTable = null; // 扩容并且迁移后的 nextTab 写回老 table。 table = nextTab; // 扩容完了,接下来要算阈值了。 // (n << 1) - (n >>> 1) == 2*n - 0.5*n == 2*n - 0.25*2*n // == (1- 0.25) * 2*n == 0.75 * 2*n。 // 新的阈值 == 0.75 * 新的容量。 sizeCtl = (n << 1) - (n >>> 1); return; } // 如果不是 // 只有当 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 时,所有所有线程扩容执行完毕。 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 标记 finishing = true。 finishing = advance = true; i = n; // recheck before commit } } // 如果老数组中 i 位置出元素为空,是不用处理的,也可以认为 i 位置已经处理过了。 else if ((f = tabAt(tab, i)) == null) // CAS 将 fwd 节点放在 table[i] 位置,表示已经处理过了。 advance = casTabAt(tab, i, null, fwd); // table[i] 元素的 hashcode == -1,表示桶已经处理过了。 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 给 table[i] 加锁,也就是将桶锁起来。 synchronized (f) { // 双重验证,确保 table[i] 没有被改变。 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // hashcode > 0,表示桶里面是链表,执行链表的迁移。 if (fh >= 0) { // 这块是 1.8 hashMap 链表迁移的方式。 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // 拆分 table 中 i 桶中的链表, // 拆成 1 个或者 2 个。 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 将低位元素赋值到 newTab 的 i 桶中。 setTabAt(nextTab, i, ln); // 将高位元素赋值到 newTab 的 i+n 桶中。 setTabAt(nextTab, i + n, hn); // 设置 tab 中 i 桶已经处理过了。 setTabAt(tab, i, fwd); advance = true; } // 桶里面是树,执行树的迁移。 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
什么是协助扩容,T1 在 table[i] 插入了元素并且事后判断超阈值了,开始执行扩容,刚迁移完 table[i] 所在的桶,正在去迁移别的桶…,此时 T2 要插入 key-value,算出来也是要想 i 桶中插入,结果发现 table 正在执行扩容,T2 放弃了插入,也开始加入到对 table 进行扩容… ; 又来个 T3 向 table[j] 出插入了元素后, 发现 table 也在扩容,结果T3 也加入到了扩容大军中…。
T1、T2、T3 三个合起来叫多线程扩容;T2、T3 叫协住扩容。
由上面的描述可知,协同扩容有两种触发场景:
插入元素是发现 table[i] .hashcode. == -1。此时,当前线程放弃插入执行协同扩容,扩容完毕后,等下轮循环时再插入元素。
// putVal(K key, V value, boolean onlyIfAbsent){} 方法中。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 通过这里执行协同扩容。 transfer(tab, nextTab); break; } } return nextTab; } return table; }
在 table[j] 上添加完元素了,判断需不需要扩容时,发现 table正在扩容,那就协助扩容。
// private final void addCount(long x, int check) {} if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // sc 小于 0,表示 table 正在扩容。 if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // CAS 将 sizeCtl 在原值上+1,表示参与扩容的线程又多了一个。 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 协同扩容,nt 是扩容后 table 的引用。 transfer(tab, nt); } // 第一个扩容的线程会走这里。 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 带个线程开始扩容。 transfer(tab, null); s = sumCount(); } }
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 计算 key 的 hashcode。 int h = spread(key.hashCode()); // (n - 1) & h 计算要查找的元素在哪个桶里面。 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // hashcode 相等,key 值相等,说明找到了。 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 直接返回 value。 return e.val; } // hashcode < 0,说明 table[i] 处目前是个 ForwardingNode。 else if (eh < 0) // e.find(h, key) 去 nextTable 中去找 key 对应的 value。 return (p = e.find(h, key)) != null ? p.val : null; // 从桶里找。 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }