C/C++教程

concurrentHashMap 源码理解

本文主要是介绍concurrentHashMap 源码理解,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一、table 的初始化

table 数组的第一次初始化会发成在 putVal 的时候,但是在构造器中已经设置了容量和阈值。

1、无参构造函数

// 默认容量是 16.
public ConcurrentHashMap() {}

##2、带初始容量的有参构造函数

如果提前能知道容量的大小,可以在创建对象时就给出初始容量。

    public ConcurrentHashMap(int initialCapacity) {
        // 下限检测
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        // cap 是大于 1.5*initialCapacity 的最小的 2^n, 最大不超过 2^30.
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        // table 为初始化,sizeCtl > 0,sizeCtl 表示 table 的初始容量。
        this.sizeCtl = cap;
    }

用户给定的初始容量不一定符合2^n,所以在构造方法中又对用户给定的初始容量进行了重新计算。

  initialCapacity + (initialCapacity >>> 1) + 1
= 1.5 * initialCapacity + 1
 之所以 +1 是为了防止 1.5 * initialCapacity 恰好等于 2 的次幂。

3、带初始容量和负载因子的构造函数

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    // 给定线程数为1,调用别的构造器。
    this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // 下界校验。
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 线程并发的数的上限是 table 的容量。因为多了没用。
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    // 接下来是采用给定的负载因子计算 sizeCtl。
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    // 赋值。
    this.sizeCtl = cap;
}

二、向 table 添加元素

添加操作和 HashMap添加的主体流程基本相同:

  1. table 如果为空,先初始化 table,也就是第一次扩容。
  2. 计算 key-value 要插入的桶的位置 i。
  3. table[i] 如果为空,直接将 key-value --> Node,添加到 table[i] 位置处。
  4. table[i] 不为空,那说明 i 位置的桶中有元素,要么是链表,要么是树。key-value要么追加,要么替代。
  5. 如果 table[i].hash == -1,说明此时的 table 正在执行扩容,而且 i 桶已经被迁移过了,此时再添加元素是无效的。所以当前线程执行协助扩容,在下一次循环的时候再添加元素。
  6. 当桶中是链表时,遍历链表,执行替代或者追加元素。
  7. 当桶中是树时,执行树的替代或者追加。
  8. 元素插入后,再统计 table 中所有元素的个数,如果超过了阈值,还要执行扩容。

ConcurrentHashMap 中不同的是,它在添加元素时能增加了防并发功能。

  1. 向 table 中添加元素的整个过程都放在死循环里面执行,当插入操作执行成功时,通过 break 跳出循环。死循环的作用是当 CAS 一次失败后,重新循环,可以有再次执行的机会。乐观锁的思路。

    for (Node<K,V>[] tab = table;;) {
        // 插入操作的所有代码都在里面。
    }
    
  2. 在第3 步中,使用 CAS 的方式向 table 中添加元素。别的线程如果比当前线程快一瞬间向 table[i] 处添加元素,那当前线程的 CAS 会执行失败,同样如果当前线程快一瞬间,那其他线程的 CAS 也就失败了。从而避免了 table[i] 处值的覆盖。

casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))
  1. 在第 6、7步执行的时候,使用了 synchronize 加锁。如果 i 位置是个链表或者树,线程对 i 都得先拿到链表的头结点 或者 树的根,这就是 table[i]。当前线程对 table[i] 加了悲观锁,别的线程也在 i 位置执行插入,再拿锁时就会陷入阻塞,直到当前线程执行释放锁。这就保证了线程在 i 桶位置执行插入操作时的线程同步。

    除此以外,当别的线程在 j 位置处执行插入操作时,可以直接执行,而不会被阻塞。这种细粒度的锁,既保证了 插入的线程安全,又保证了多线程同时插入的效率不至于太低。

    f = table[i];
    synchronized (f){
        //对桶遍历,执行替代或者追加。
    }
    

初始化table

这里需要注意 sizeCtl 在不同时候的代表的意思,而且 是 while 循环 + CAS 修改 sizeCtl 的值,还是乐观锁方式。

tablesizeCtlsizeCtl 表示的意思
0使用调用无参构造器的结果,table 的容量默认为 16。
大于0使用有参构造器的结果, sizeCtl 表示初始的 table 容量。
小于0表示有其他线程正在对 table 执行扩容,sizeCtl表示扩容的有多少个线程正在执行扩容。
等于 -1表示当前线程正在对 table 执行初始化。
不为空大于0表示 table 的扩容阈值。

初始化源码:

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //  只有当 table 是空的时候,才执行初始化。
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            // sizeCtl 小于 0,表示有其他线程正在对 table 执行初始化。
            // 当前线程要放弃 CPU 的使用,这轮循环就结束了。
            // (如果下轮循环,还是其他线程正在对 table 执行初始化,继续放弃。)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // CAS 将 sizeCtl 的值从 小于0 修改成 -1.
            // sizeCtl == -1,表示 table 正在执行初始化。
            // (当别的线程进到这个方法,执行初始化时,发现 sizeCtl<0,就放弃 CPU的使用权)
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // sc 的值是 table 的容量,如果 sc == 0,使用默认容量 16。
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // 初始化 table。
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 当 table 不为空时,sizeCtl 的值是扩容阈值。
                    // 这里先计算 sc == n - (n / 4) = 0.75 * n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 设置扩容阈值 sizeCtl = sc。
                sizeCtl = sc;
            }
            break;
        }
    }
    // 返回初始化后的 table。
    return tab;
}

添加元素源码:

public V put(K key, V value) {
    return putVal(key, value, false);
}
// onlyIfAbsent 控制是否允许替代。fasle 允许。
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 计算 hash 值,加了扰动的。
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 死循环。
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            // table 是空的,初始化 table。
            // 初始化完成后,第一次循环就结束了,第二次循环时,返现 table 非空,执行下面的逻辑。
            tab = initTable();
        // (n - 1) & hash : 计算 key 的桶。
        // tabAt(tab,i): 拿出 table[i]。
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // table[i] == null, 说明 i 桶中是空的,最简单的情况了。
            // CAS 将 k-v 添加到 table[i] 位置。
            // 如果 CAS 失败,那就是被其他线程抢先了,这个循环执行完毕,
            // 下一次循环中再添加 k-v,只是不一定会在这个else if 块中。
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                // CAS 成功,跳出死循环。
                break;                   // no lock when adding to empty bin
        }
        // 到这里,说明 i 位置的桶中有元素,
        // f=table[i],f.hash == MOVED == -1,表示 table 正在扩容。
        // 此时不允许加元素,得先协助去扩容。
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 对 table[i] 加锁,把锁加到了桶上。
            synchronized (f) {
                // 两次验证。有可能别的线程在 i 桶里添了个节点后,将链表变成了树。
                // 那这样的话 f 就不是原来的值了。
                if (tabAt(tab, i) == f) {
                    // hashcode 大于 0,是链表。
                    if (fh >= 0) {
                        // binCount 用来记录链表上的节点个数。
                        binCount = 1;
                        // 遍历链表,又是个死循环。
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果 hashcode 相同 且 key 相同,那是替代。
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 拿出老值。
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    // 新值替代老值。
                                    e.val = value;
                                // 跳出遍历链表的循环。
                                break;
                            }
                            // 到这里,key 与 e 的 hashcode不相同。
                            // pred 临时记录 e。
                            Node<K,V> pred = e;
                            // 换下一个节点。
                            // 如果下一个节点是 null,表示已经到链表
                            if ((e = e.next) == null) {
                                // 将 k-v 追加到链尾。
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                // 跳出遍历链表的循环。
                                break;
                            }
                        }
                    }
                    // i 位置的桶是树,执行树的替代或者插入。
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 链表上节点个数大于等于 8,且 table 容量大于等于 64 时,链表树化。
                // 链表上节点个数大于等于 8,但 table 容量小于 64 时,扩容。
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 如果是替代,会有老值,要将老值返回。
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 新的元素到这里插完了,接下来还要获取 table 中的总元素个数
    // 如果超过了阈值,那还得扩容。
    addCount(1L, binCount);
    return null;
}

三、table 扩容

向 table[] 中插入元素后还要再判断 当前 table 中的全部元素数量是否超过了阈值,如果超了,那要扩容。所以这里就需要做两件事情:

  1. 统计 table 中元素的总个数。
  2. (如果超过阈值)扩容。

1、统计 table 中元素个数。

因为ConcurrentHashMap 是针对多线程并发设计的, 在其中设计了两个字段用于统计 table 中元素的个数。

// 单线程下,线程添加的元素个数累加在 baseCount 字段上。
private transient volatile long baseCount;
// 多线程下,竞争失败的线程添加的元素个数x,封装在 new CounterCell(x) 对象中,最后放入 CounterCell[]。
// CounterCell[] 初始长度是2,但可以扩大器长度,长度上限是 CPU 的核数。
private transient volatile CounterCell[] counterCells;

全部元素的个数 = bashCount + counterCells 中所有元素的和。

HashMap 中 直接 ++size 就统计了元素个数。ConcurrentHashMap 采用 CAS 的方式增加元素个数。baseCount 是元素基数,x 是新插入的元素个数。

U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)

假设 t1、t2、t3、t4 四个线程每个线程都向 table 中添加了一个元素。接下来是统计当前 table 中元素的个数。

  1. 假设 t1 在 cas 时竞争成功了,则 t1 增加的个数计数到了 baseCount 上。

  2. t2 竞争失败,发现 countersCell 数组还是个空的,初始化数组,设定容量是2。将自己添加的元素个数放到数组中。

    CounterCell[] rs = new CounterCell[2];
    // 假设 t1 的 probe 计算出的索引是1。
    rs[1] = new CounterCell(1);
    
  3. t3 也竞争失败了,此时 countersCell 不为空,直接 rs[0] = new CounterCell(1); 这里假设 t3 的 probe 计算出的索引是 0。

  4. t4 也竞争失败了,此时 countersCell 不为空,但此时 countersCell 中放满了(不一定是满了,出现索引碰撞也可以)。

    结局1: t4 增加的元素的个数追加到 rs 其中的某个元素上,[counterCell[2],counterCell[1]] 或者 [counterCell[1],counterCell[2]]。具体是加谁上,要看 t4 和谁索引碰撞了。

    结局2: 将 countersCell 数组的长度扩大 2 倍。再将 t4 增加的个数添加进去。

最后 t4 计算 table 中全部元素的个数时,就等于:baseCount + countersCell 中全部元素求和。

1.1 源码:addCount(long x, int check)

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // counterCells 为空 且 CAS 修改 BASECOUNT 从  baseCount 到 b + x 成功。
    // 说明此时,没有发生并发,table 中总元素个数 s == b + x == baseCount + 1。

    // 如果 counterCells 或者 CAS 修改 BASECOUNT 失败,说明此时发生了并发。
    // 进到 if 代码块中采用 CounterCell 对 table 中的元素总数进行计数。
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        //
        boolean uncontended = true;
        // as 是空 || as 长度是空
        // || as[i] 是空
        // || CAS 给 as[i] 从当前值 a.value 修改到 v + x 失败
        // 进 if 代码块。
        if (as == null || (m = as.length - 1) < 0 ||
            // ThreadLocalRandom.getProbe() 相对于线程是固定的,就跟 key 的 hashcode 一样。
            // 用来计算数组的角标。
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // 将并发时,CAS 失败的线程增加的数量记录到 countersCell 中。
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 计算 table 中的全部节点数。
        // 全部节点数 = countersCell 数组中每个元素的和 + baseCount。
        s = sumCount();
    }
    // 检查是否需要执行扩容。如果 table 中元素总个数超过阈值,那就执行扩容。
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 元素总个数超过阈值 && table 不为空 && table 容量小于 2^30。
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // rs 后面会被用来将 sizeCtl 设置为负数。
            int rs = resizeStamp(n);
            // sc 小于 0,表示 table 正在扩容。
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // CAS 将 sizeCtl 修改成负值。表示 table 正在扩容中。
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 执行扩容。
                transfer(tab, null);
            // 统计 table 中全部元素的个数。
            s = sumCount();
        }
    }
}

1.2 源码: fullAddCount(long x, boolean wasUncontended)

private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    // 获取当前线程的 getProbe,如果是0。
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        // 则初始话当前线程的 probe 值,probe 是随机数。
        ThreadLocalRandom.localInit();      // force initialization
        // 赋值 probe 到 h。
        h = ThreadLocalRandom.getProbe();
        // counterCells 数组中为发生索引冲突。
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    // 死循环,也叫自旋。
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        // 如果 counterCells 已经被初始化过了。
        if ((as = counterCells) != null && (n = as.length) > 0) {
            // i = (n - 1) & h: 计算当前线程增加的元素个数放在 countersCell 的哪个索引上。
            // a[i] 处的没有元素。
            if ((a = as[(n - 1) & h]) == null) {
                // 如果 countersCell 不处在扩容或者初始化状态(忙碌状态),当前线程是可以直接往里面放值的。
                if (cellsBusy == 0) {            // Try to attach new Cell
                    // 创建 CounterCell(x)。
                    CounterCell r = new CounterCell(x); // Optimistic create
                    // CAS 修改 CELLSBUSY 从 清闲状态 到 忙碌状态。
                    // 别的线程看到 CELLSBUSY 是忙碌状态,就不能往 countersCell中写数据。
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        // 标识符,创建未完成。
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                //将 new CounterCell(x) 存放到 (m - 1) & h 位置上。
                                rs[j] = r;
                                // 设置标识符为创建完成。
                                created = true;
                            }
                        } finally {
                            // 设置设置标志符为"清闲状态"
                            cellsBusy = 0;
                        }
                        // 当 rs[j = (m - 1) & h] == null 成立,
                        // new CounterCell(x) 添加到 countersCell 成功。
                        // 否则,添加失败。

                        // 如果 new CounterCell(x) 添加到 countersCell 成功。
                        // 跳出死循环。
                        if (created)
                            break;
                        // 如果 new CounterCell(x) 添加到 countersCell 失败。
                        // 返回去再次执行循环。
                        continue;           // Slot is now non-empty
                    }
                }
                // countersCell[i] == null 但 busyCell != 0
                // 说明多个线程的结果都要存到 i 位置,导致好些线程竞争失败。
                // 也就是说 countersCell 长度太短了,需要为其扩容。
                collide = false;
            }
            // 在 addCount 方法中在 countersCell 数组的 i 位置已经有元素了,在其基础上
            // 再增加的时候失败了。
            else if (!wasUncontended)       // CAS already known to fail
                // 重新标记为未冲突。
                // 在后面重新获取线程的 probe,重新计算其在 countersCell 中的索引。
                wasUncontended = true;      // Continue after rehash
            // 因为计算的 i 位置出有元素,直接在其基础上再增加。
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                // 增加成功就跳出死循环。
                break;
            // counterCells != as 说明不是当前线程创建的 counterCells。
            // 或者 counterCells 大于 CPU的核数。
            else if (counterCells != as || n >= NCPU)
                // 将碰撞设置为false,不再对 countersCell 扩容。
                // 因为 countersCell 的上限是 NCPU。
                collide = false;            // At max size or stale
            // (经过了前面那么多判断,才执行到这里,说明扩容真的是最后一招了)
            else if (!collide)
                collide = true;
            // CAS 修改状态,并给 countersCell 执行扩容.
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        // 扩容为原来的两倍。
                        CounterCell[] rs = new CounterCell[n << 1];
                        // 将原来的元素一一赋值到扩容后的 countersCell 中。
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        // 赋值给 counterCells。
                        counterCells = rs;
                    }
                } finally {
                    // 标记会空闲状态。
                    cellsBusy = 0;
                }
                collide = false;
                // 继续下一次自旋。
                continue;                   // Retry with expanded table
            }
            // 重新获取当前线程的 probe。
            h = ThreadLocalRandom.advanceProbe(h);
        }
        // countersCell 没有被初始化。
        // cellsBusy == 0 表示没有别的线程在执行 counterCells 初始化 或 者扩容。(第一次扩容,不就是初始化吗)
        // CAS 将 cellsBusy 的值从 0 修改到 1 ,表示当前线程正在为 counterCells 做初始化。
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            // 标识符,初始化未完成。
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    // 初始化 rs 容量为 2。
                    CounterCell[] rs = new CounterCell[2];
                    // 将 x  也就是新增的元素个数放在 counterCells[i] 位置上。
                    rs[h & 1] = new CounterCell(x);
                    // 引用赋值,
                    counterCells = rs;
                    // counterCells 初始化结束。
                    init = true;
                }
            } finally {
                // 撤销 countersCell 的初始化标志符。
                cellsBusy = 0;
            }
            // 完成初始化了,直接跳出死循环。
            if (init)
                break;
        }
        // 如果 countersCell 没有被初始化,而且 它还忙碌着。
        // 那当前线程执行这个 CAS ,在 baseCount 的基础上增加。
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

###1.3 源码:sumCount()

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        // 遍历 countersCell 中的每一个元素。
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                // 将每个元素的 value 累加到 sum。
                sum += a.value;
        }
    }
    // 返回 sum。
    return sum;
}

1.4 size()

public int size() {
    // 调用了 sumCount() 统计 table 中的全部元素。
    long n = sumCount();
    // 如果没有超上、下限,返回元素个数。
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

2、执行扩容

当 table 中总元素的个数超过阈值时,就要执行扩容。ConcurrentHashMap 中采用多线程扩容,将 table 依据 CPU 的核数平均分成好几份,每一个线程只迁移它分到的这几个桶。采用这样的方式提高了扩容效率。

(https://raw.githubusercontent.com/diego1109/pictures/main/20210530171225.png)]

具体流程:

  1. 依据CPU 的核数,将 table 平分,每个线程只迁移其中它分到的这部分桶。每个线程分到的桶数不能小于16。

  2. 肯定有个线程是第一个执行扩容的,此时它要创建一个新 table 且容量是老 table 的 2 倍。

  3. 新 table 有了,开始迁移:(这个里面是个死循环)

    3.1. 计算当前线程负责将老 table 中的哪几个桶(假设是黄色块)迁移到新数组。从老 table 的末尾开始从右向左迁移。

    3.2. 如果老 table[i] == null,说明这里没有元素,那就不用执行迁移,也可以说 i 桶已经被迁移过了。设置 table[i] == new ForwardingNode(),该 node 的 hashcode 是 -1。表示 table 处于正在执行迁移,而且 table[i] 桶已经迁移完成了。

    3.3. 如果 table[i] .hashCode == -1 说明 i 桶已经被迁移过了。不用再次迁移。

    3.2. 如果 table[i] != null, 那就要迁移了,不管是链表还是树,都要 synchronize(table[i])。桶迁移的时候,不能允许再在桶中插入元素。接着按照 hashMap 中链表和树迁移的思想执行迁移。

    3.3. 当老table 的第 i 桶迁移完成后,在老 table[i] 处设置 forwardingNode,

    3.4. 第 i 个桶迁移完成了,--i ,准备迁移左边的下一个桶,–i 执行后,要再次判断下,是不是分配给自己的桶都迁移完了。如果没那继续迁移。

    3.5. 如果当前线程将分配给它的桶迁移完了,接下来再判断,是不是其他线程也都将分配给自己的桶迁移完了?

    • 如果没有,当前前程会被从新奉陪迁移任务,再去执行桶的迁移。
    • 如果都迁移完了,将 nextTable 置为空,将扩容后的新数组重读赋给 table,从新计算阈值。跳出死循环。

2.1 源码:transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 先计算出每个线程处理多少个桶。
    // 如果cpu是单核的,那不用分了,当前线程执行整个 table 的扩容。
    // 如果不是单核的,table.length/8,然后除以 CPU 的核数,求出每个线程处理多少个桶,
    // 每个线程处理的桶是不得低于 MIN_TRANSFER_STRIDE (16)。
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    // 是桶扩容,不是协同扩容。
    if (nextTab == null) {            // initiating
        try {
            // 创建临时table,容量是原来的两倍。
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // 将新建的 table 赋值给 nextTable。
        nextTable = nextTab;
        // 保存的是老数组的长度。
        transferIndex = n;
    }
    // 获取新表的长度。
    int nextn = nextTab.length;
    // 创建 forwarding 节点,其 hashcode == MOVED == -1。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    // 又是死循环。计算当前线程该负责哪几个桶。
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // 老 table 中的桶迁移时,是从最右端向最左端迁移。
            // --i , 给当前线程分了 n 个桶,下标 -1,迁移左边的桶。
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 第一个扩容线程会进到这里来,nextIndex = 老数组的长度。
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 当前线程负责的桶的下界。
                bound = nextBound;
                // 当前线程负责的桶的上界。 i 此时是新数组的最后一个索引。
                i = nextIndex - 1;
                // 当前线程负责的是老数组中 (bound,i] 这些桶。
                advance = false;
            }
        }
        // 当前线程处理完了分配给它的桶。
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 再判断是不是所有的线程都做完了分配给自己的桶。
            // 如果都完成了。
            if (finishing) {
                // 新 table 置为空
                nextTable = null;
                // 扩容并且迁移后的 nextTab 写回老 table。
                table = nextTab;
                // 扩容完了,接下来要算阈值了。
                // (n << 1) - (n >>> 1) == 2*n - 0.5*n == 2*n - 0.25*2*n
                // == (1- 0.25) * 2*n == 0.75 * 2*n。
                // 新的阈值 == 0.75 * 新的容量。
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 如果不是
            // 只有当 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 时,所有所有线程扩容执行完毕。
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 标记 finishing = true。
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 如果老数组中 i 位置出元素为空,是不用处理的,也可以认为 i 位置已经处理过了。
        else if ((f = tabAt(tab, i)) == null)
            // CAS 将 fwd 节点放在 table[i] 位置,表示已经处理过了。
            advance = casTabAt(tab, i, null, fwd);
        // table[i] 元素的 hashcode == -1,表示桶已经处理过了。
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 给 table[i] 加锁,也就是将桶锁起来。
            synchronized (f) {
                // 双重验证,确保 table[i] 没有被改变。
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // hashcode > 0,表示桶里面是链表,执行链表的迁移。
                    if (fh >= 0) {
                        // 这块是 1.8 hashMap 链表迁移的方式。
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // 拆分 table 中 i 桶中的链表,
                        // 拆成 1 个或者 2 个。
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 将低位元素赋值到 newTab 的 i 桶中。
                        setTabAt(nextTab, i, ln);
                        // 将高位元素赋值到 newTab 的 i+n 桶中。
                        setTabAt(nextTab, i + n, hn);
                        // 设置 tab 中 i 桶已经处理过了。
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 桶里面是树,执行树的迁移。
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                        (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                        (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

2.2 协助扩容

什么是协助扩容,T1 在 table[i] 插入了元素并且事后判断超阈值了,开始执行扩容,刚迁移完 table[i] 所在的桶,正在去迁移别的桶…,此时 T2 要插入 key-value,算出来也是要想 i 桶中插入,结果发现 table 正在执行扩容,T2 放弃了插入,也开始加入到对 table 进行扩容… ; 又来个 T3 向 table[j] 出插入了元素后, 发现 table 也在扩容,结果T3 也加入到了扩容大军中…。

T1、T2、T3 三个合起来叫多线程扩容;T2、T3 叫协住扩容。

由上面的描述可知,协同扩容有两种触发场景:

  1. 插入元素是发现 table[i] .hashcode. == -1。此时,当前线程放弃插入执行协同扩容,扩容完毕后,等下轮循环时再插入元素。

    // putVal(K key, V value, boolean onlyIfAbsent){} 方法中。
    else if ((fh = f.hash) == MOVED)
        tab = helpTransfer(tab, f);
    
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    // 通过这里执行协同扩容。
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
    
  2. 在 table[j] 上添加完元素了,判断需不需要扩容时,发现 table正在扩容,那就协助扩容。

    // private final void addCount(long x, int check) {}
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            // sc 小于 0,表示 table 正在扩容。
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // CAS 将 sizeCtl 在原值上+1,表示参与扩容的线程又多了一个。
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 协同扩容,nt 是扩容后 table 的引用。
                    transfer(tab, nt);
            }
            // 第一个扩容的线程会走这里。
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                // 带个线程开始扩容。
                transfer(tab, null);
            s = sumCount();
        }
    }
    

四:源码 get(Object key)

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算 key 的 hashcode。
    int h = spread(key.hashCode());
    // (n - 1) & h 计算要查找的元素在哪个桶里面。
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // hashcode 相等,key 值相等,说明找到了。
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                // 直接返回 value。
                return e.val;
        }
        // hashcode < 0,说明 table[i] 处目前是个 ForwardingNode。
        else if (eh < 0)
            // e.find(h, key) 去 nextTable 中去找 key 对应的 value。
            return (p = e.find(h, key)) != null ? p.val : null;
        // 从桶里找。
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
这篇关于concurrentHashMap 源码理解的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!