ConcurrentHashMap是 HashMap 的并发版本,它是线程安全的,并且在高并发的情境下,性能优于 HashMap 很多。
jdk 1.7 采用分段锁技术,整个 Hash 表被分成多个段,每个段中会对应一个 Segment 段锁,段与段之间可以并发访问,但是多线程想要操作同一个段是需要获取锁的。所有的 put,get,remove 等方法都是根据键的 hash 值对应到相应的段中,然后尝试获取锁进行访问。
jdk 1.8 取消了基于 Segment 的分段锁思想,改用 CAS + synchronized 控制并发操作,在某些方面提升了性能。并且追随 1.8 版本的 HashMap 底层实现,使用数组+链表+红黑树进行数据存储。本篇主要介绍 1.8 版本的 ConcurrentHashMap 的具体实现,有关其之前版本的实现情况,这里推荐几篇文章:
CAS和Unsafe类
ConcurrentHashMap的成员变量介绍(省略HashMap相同的参数)
private static final int MIN_TRANSFER_STRIDE = 16; /** * The number of bits used for generation stamp in sizeCtl. * Must be at least 6 for 32bit arrays. */ private static int RESIZE_STAMP_BITS = 16; /** * 扩容的最大的线程数,65535 */ private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; /** * 扩容的最大的线程数,65535 */ private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /** * The bit shift for recording size stamp in sizeCtl. */ private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; /* * 节点的hash值类型 */ // forwarding nodes节点的hash值,只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动。 static final int MOVED = -1; //树根节点的hash值 static final int TREEBIN = -2; // hash for roots of trees static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash /* * Volatile类型的Node类型的table,默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方。 */ transient volatile Node<K,V>[] table; /** * 默认为null,扩容时新生成的数组,其大小为原数组的两倍。 */ private transient volatile Node<K,V>[] nextTable; /** * counterCells 数组未初始化,在没有线程争用时,将 size 的变化写入此字段 * 初始化 counterCells 数组时,没有获取到 cellsBusy 锁,会再次尝试将 size 的变化写入此字段 */ private transient volatile long baseCount; /** *当前未初始化: * = 0 未指定初始容量 * > 0 由指定的初始容量计算而来,再找最近的2的幂次方。 * 比如传入6,计算公式为6+6/2+1=10,最近的2的幂次方为16,所以sizeCtl就为16。 * 初始化中: * = -1 //table正在初始化 * = -N //N是int类型,分为两部分,高15位是指定容量标识,低16位表示并行扩容线程数+1,-(1+n)表示此时有n个线程正在共同完成数组的扩容操作。 * 初始化完成: * =table.length * 0.75 扩容阈值调为table容量大小的0.75倍 * */ private transient volatile int sizeCtl; /** * The next table index (plus one) to split while resizing. */ private transient volatile int transferIndex; /** * 用于同步 counterCells 数组结构修改的乐观锁资源 */ private transient volatile int cellsBusy; /** * counterCells 数组一旦初始化,size 的变化将不再尝试写入 baseCount * 可以将 size 的变化写入数组中的任意元素 * 可扩容,长度保持为 2 的幂 */ private transient volatile CounterCell[] counterCells; /** * CounterCell 是 ConcurrentHashMap 的一个静态内部类。 * baseCount和counterCells一起保存着整个哈希表中存储的所有的节点的个数总和。 */ @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
节点类型:
这是一个重要的属性,无论是初始化哈希表,还是扩容 rehash 的过程,都是需要依赖这个关键属性的。该属性有以下几种取值:
0:默认值
-1:代表哈希表正在进行初始化
大于0:相当于 HashMap 中的 threshold,表示阈值
小于-1:代表有多个线程正在进行扩容
ConcurrentHashMap的构造方法
/** * 无参构造,默认构建的table数组长16 */ public ConcurrentHashMap() { } /** * 指定容量,为2的n次方,此时设置sizeCtl=容量 */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } /** * 指定元素,sizeCtl=16 */ public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); }
ConcurrentHashMap的节点类型,JDK1.8 中的 ConcurrentHashMap 对节点Node类中的共享变量使用Volatile关键字,保证多线程操作时,变量的可见行!
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; //Volatile类型 volatile Node<K,V> next; //Volatile类型 }
Node类型的节点有4个子类,具体如下:
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { //1. 判断key和value都不能为空,否则抛出异常 if (key == null || value == null) throw new NullPointerException(); //2. 计算key的hash值 int hash = spread(key.hashCode()); int binCount = 0; //3.遍历table数组 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //4. 如果数组为空,则执行初始化操作(第一次循环) if (tab == null || (n = tab.length) == 0) tab = initTable(); //5. 数组不为空(第n次循环,n>1),Unsafe获取数组该索引位置的节点f,且f为null else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //5.1 节点为null,直接用CAS在该索引新增一个节点,结束put if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //6. f不为null,判断该节点的类型是ForwardingNode 类型,如果是那就是说明有线程正在进行扩容操作 else if ((fh = f.hash) == MOVED) //6.1 那么当前线程就进入协助扩容 tab = helpTransfer(tab, f); //7. f不为null ,且该节点为普通节点,上锁(首节点对象) else { V oldVal = null; //8 上synchronized锁锁住头节点,拿到锁的线程执行下一步 synchronized (f) { //9 再次确定这个节点的确是数组中的这个头结点 if (tabAt(tab, i) == f) { //10 头节点的hash>=0,该节点为链表节点 if (fh >= 0) { binCount = 1;//设置binCount=1 //10.2 遍历节点,binCount++,binCount记录找到目标节点的链表索引 for (Node<K,V> e = f;; ++binCount) { K ek; //10.3 如果链表上的节点的key和hash值和插入的相等,替换value,结束 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //10.4 否则,将e的下个节点赋值给e,不为null继续循环 Node<K,V> pred = e; if ((e = e.next) == null) { //10.5 下个节点为null,直接添加新节点到尾部,结束 pred.next = new Node<K,V>(hash, key, value, null); break; } } } //11. 节点类型为红黑树节点 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2;//设置binCount=2 //调用putTreeVal找到目标红黑树节点p if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { //如果目标节点p不为null,替换value结束 oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //binCount != 0 说明向链表或者红黑树中添加或修改一个节点成功, //红黑树节点时候binCount =2 //binCount == 0 说明put操作将一个新节点添加成为某个桶的首节点 if (binCount != 0) { //11 如果binCount且>8,则将i位置链表转成红黑树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) //返回老节点 return oldVal; break; } } } //12 元素个数(类似HashMap的size)增加1, binCount表示当前插入数据所在桶元素序号(1,2...), 根据条件判断是否扩容 addCount(1L, binCount); return null; }
在put方法中,可以看出ConcurrentHashMap通过CAS和synchronized来保证线程安全的,当找到哈希桶上的节点为null,通过CAS增加一个节点,当找到哈希桶上的节点不为null,判断节点类型,如果节点类型是ForwardingNode 类型,说明此时正在扩容,当前线程协助扩容,否则通过synchronized锁住头节点,遍历节点,找到目标节点替换value或者在尾部新增一个节点。在put操作中,如果表没有初始化,则会有一个线程执行初始化操作,最后在添加完节点后,调用addCount方法增加元素个数(size),并且判断是否需要扩容。
sizeCtl 是一个ConcurrentHashMap的属性,使用了 volatile 关键字修饰保证并发的可见性,默认为0,用来控制table初始化和扩容操作,当使用非空参数构造完ConcurrentHashMap的时候会将sizeCtl设置成容量,初始化数组的时候,线程会通过Unsafe.compareAndSwapInt()方法(CAS)操作将sizeCtl设置成-1,其他线程判断sizeCtl的值如果是-1则让出CPU即可,初始化table完了,将sizeCtl设置成阈值。
/**
*当前未初始化:
* = 0 未指定初始容量
* > 0 由指定的初始容量计算而来,再找最近的2的幂次方。
* 比如传入6,计算公式为6+6/2+1=10,最近的2的幂次方为16,所以sizeCtl就为16。
* 初始化中:
* = -1 //table正在初始化
* = -N //N是int类型,分为两部分,高15位是指定容量标识,低16位表示并行扩容线程数+1,-(1+n)表示此时有n个线程正在共同完成数组的扩容操作。
* 初始化完成:
* =table.length * 0.75 扩容阈值调为table容量大小的0.75倍
*
*/
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //1. 在table为空的情况下执行初始化操作 while ((tab = table) == null || tab.length == 0) { //2. 如果一个线程发现sizeCtl<0,意味着有另外的线程执行CAS操作成功了,当前线程只需要让出CPU即可 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //3. 需要初始化,CAS将sizectl修改成-1,有且只有要给线程成功 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//SIZECTL是sizeCtl的位置 try { //4. 再次确认数组为空 if ((tab = table) == null || tab.length == 0) { //5. sc 大于零说明容量初始化map时已经指定容量,否则(使用无参构造的)使用默认容量16构造table int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //6. 初始化数组Node Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //7. 计算阈值,相当于n*0.75 sc = n - (n >>> 2); } } finally { //8. 设置阈值 sizeCtl = sc; } break; } } return tab; }
addCount方法主要有2个功能,其一就是将ConcurrentHashMap的元素+1,其二就是根据baseCount的值判断是否进行扩容,ConcurrentHashMap的元素个数的方法如下:
/** * 计算ConcurrentHashMap的size */ public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; //1. baseCount赋值给sum long sum = baseCount; //2. 如果CounterCell数组不为空,将CounterCell元素的value值求和后累加sum if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } //返回sum return sum; } /** * CounterCell 对象,有个volatile的value属性 */ @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
其一:如果线程安全的操作ConcurrenHashMap的size
可以看出ConcurrentHashMap的元素个数由baseCount和CounterCell数组(CounterCell[] as)的value的和累计而得,CounterCell含有volatile修饰的value属性,当多线程增加ConcurrentHashMap的元素个数时,首先会通过CAS操作修改baseCount+1,如果修改失败,每个线程会根据一个线程随机数&CounterCell数组长度-1(ThreadLocalRandom.getProbe() & as.length - 1)生成的索引值,通过CAS去操作CounterCell数组该索引值对应的CounterCell对象的value+1
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //1.1.判断如果计数盒子counterCells数组不为空,直接执行第一步,增加map元素个数 //1.2.判断如果技术盒子counterCells为空,则CAS执行baseCount+1(元素+1),CAS成功,执行第二步(增加map元素个数已成功),CAS失败,则执行第一步去增加map元素个数 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //第一步:增加ConcurrentHashMap的元素个数 //2.1 进入第一步该方法说明:counterCells数组不为空或者counterCells数组为空但是CAS baseCount+1失败 CounterCell a; long v; int m; boolean uncontended = true;// CAS 数组元素时,有没有发生线程争用的标志 //2.3 如果当前线程的数组元素非空,则尝试将 x 累加到对应数组元素 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //2.4 如果counterCells数组为空,执行fullAddCount方法,否则执行2.5 //2.5 判断counterCells数组的元素为空,执行fullAddCount方法,否则执行2.6 //2.6 当前线程随机数与counterCells数组-1的值对应的CounterCell为空,执行fullAddCount方法,否则执行2.7 //2.7 CAS拿锁(cellValue 0,1)失败,则执行fullAddCount方法 //2.8. 执行fullAddCount方法 fullAddCount(x, uncontended); //2.9 如果调用过 fullAddCount,则当前线程一定不会协助扩容 return; } //3. 如果check<1则啥都不做; binCount=1 说明是链表且替换了head节点的val值; 或者是数组单元是空的, 添加新的head; 就不做数组扩容的操作了; if (check <= 1) return; //4 用计数盒子保存所有总节点数量; 并返回总节点数量; s = sumCount(); } //第二步 检查是否要扩容,如需则扩容 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //3.1 此时sc=sizeCtl=阈值,s=baseCount + x,x=1,baseCount=元素数,s表示加入新元素后容量大小,表示此时的map的元素超过阈值且table不为空,length没有超过最大值 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //3.2 int rs = resizeStamp(n); //3.3 sc小于0 表示已经有线程处理扩容 if (sc < 0) { //3.3.1 如果sizeCtl无符号右移16不等于rs,则标识符变化了 //3.3.2 如果sizeCtl = rs+1,表示扩容结束了,不再有线程进行扩容 //3.3.3 如果sizeCtl = rs + 65535,表示已经达到最大帮助线程数量,即65535 //3.3.4 如果转移下标transferIndex <=0,表示扩容结束 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break;//直接结束 //3.4 有新线程参与扩容则sizeCtl加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //3.5 CAS将sizeCtl从阈值设置成负数,成功后扩容操作 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); //3.6 s=元素个数 s = sumCount(); } } }
从上2.5、2.6、2.7的步骤中可以看出在如果以下3种情况下执行fullAddCount方法
fullAddCount方法主要做了以下几件事
private final void fullAddCount(long x, boolean wasUncontended) { int h; //1. 将当前线程随机数赋值给h,并且判断是否为0 if ((h = ThreadLocalRandom.getProbe()) == 0) { //1.1 如果当前线程随机数为0,则初始化当前线程随机数,并将其赋值给h ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); //1.2 将wasUncontended 设置成true(wasUncontended 在addCount默认为true,若执行CAS拿锁(cellValue 0,1)且失败时wasUncontended 为false wasUncontended = true; } //2. 自旋操作 boolean collide = false; // 用来判断CounterCell数组是否扩容 for (;;) { CounterCell[] as; CounterCell a; int n; long v; //2.1 counterCells数组不为空(addCount中2.6或2.7进入,当前线程随机数与counterCells数组-1的值对应的CounterCell为空或CAS拿锁(cellValue 0,1)失败) if ((as = counterCells) != null && (n = as.length) > 0) { //2.2 如果当前线程随机数与counterCells数组-1的值对应的CounterCell为空判断是2.7的情况 if ((a = as[(n - 1) & h]) == null) { //2.3判断锁cellBusy是否是0 if (cellsBusy == 0) { // Try to attach new Cell //2.4 创建CounterCell,value=x CounterCell r = new CounterCell(x); // Optimistic create //2.5如果上锁成功 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //2.5.1 上锁成功, boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; //2.5.2 当前数组counterCell不为空且该索引位置为空,则将创建的CounterCell对象放到该索引位置,然后设置created为true,后面退出循环 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { //2.5.3 释放锁 cellsBusy = 0; } if (created) //2.5.3 成功了就退出循环,否则继续 break; continue; // Slot is now non-empty } } //2.6 如果上锁失败将collide设置成false collide = false; } //2.7counterCells数组的索引位置对象不为空,执行下面 //2.7.1 wasUncontended为false时候进入,且执行完后直接执行3 wasUncontended为false表示CounterCell数组不为空,且通过CAS修改CounterCell对象值时失败且当前线程随机数不为空,这时候会执行执行3,重新生成一个hash值生成一个索引然后再次循环 else if (!wasUncontended) // CAS already known to fail //2.7.2 设置成true wasUncontended = true; // Continue after rehash //2.7.3 再次通过CAS修改CounterCell对象的value,成功跳出,否则2.7.4 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; //2.7.4 如果当前线程发现counterCells 数组发生变化,或者数组的容量超过CPU核心数,设置collide = false,下一次循环则执行2.7.5 else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale //2.7.5 如果此时collide = false,则设置collide = true,下一次循环时候,collide = true时执行2.7.6 扩容CounterCell数组,执行2.7.6 else if (!collide) collide = true; //2.7.6 CAS拿锁,成功后给CounterCell数组双倍扩容且转移元素 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //3. 重新设置当前线程随机数 h = ThreadLocalRandom.advanceProbe(h); } //4.如果CounterCell数组为空,则拿锁然后初始化一个容量为2的CounterCell的数组 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { //4.1 数组没有发生变化的情况下,初始化CounterCell数组 CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { //4.2释放锁 cellsBusy = 0; } if (init) break; } //5. 如果上面2个操作都失败了,再次尝试CAS 将baseCount+1,成功break,否则自旋 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
注意
addCount的第二部分就是判断是否需要扩容,代码如下:
private final void addCount(long x, int check) { //省略第一步:给集合元素+1 //1. 根据check大小判断是需要扩容,这里的check就是put方法统计的baseCount if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //2. 如果集合容量>sizeCtl(此时为阈值),且数组不为空,集合容量小于最大值,则扩容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //3.根据数组长度得到一个标志符号 int rs = resizeStamp(n); //4. 如果sc=sizeCtl<0,表示正在扩容中,尝试帮助扩容 if (sc < 0) { //5.协助扩容判断 //5.1 如果sizeCtl无符号右移16位不等于rs,则标志符号变化了,不扩容,break //5.2 如果sizeCtl==rs+1,扩容已经结束,不扩容,break //5.3 如果sizeCtl==rs+65535,扩容线程已达最大数,不扩容,break //5.4 如果nextTable为空,扩容已经结束,不扩容,break //5.5 如果转移下标transferIndex<= 0,扩容已经结束,不扩容,break if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //6.满足扩容判断,执行CAS,将sizeCtl+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) //6.1 扩容操作 transfer(tab, nt); } //7. 如果没在扩容,则将sizeCtl更新成 标志符左移16位+2,也就是一个负数 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //7.1 扩容操作 transfer(tab, null); s = sumCount(); } } }
在put的时候,如果目标key所在的链表节点不为空,且节点类型是ForwardingNode节点(f.hash == -1)意味有其它线程正在扩容,则一起进行扩容操作,源码如下:
tab = helpTransfer(tab, f);
其中ForwardingNode 节点类型的源码如下: 继承自 Node 结点,并且它唯一的构造函数将构建一个键,值,next 都为 null 的结点,反正它就是个标识,无需那些属性。但是 hash 值却为 MOVED。
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; }
这个节点内部保存了 nextTable 引用,它指向一张 hash 表。在扩容操作中,我们需要对每个桶中的结点进行转移,如果某个桶结点中所有节点都已经转移完成了(已经被转移到新表 nextTable 中了),那么会在原 table 表的该位置挂上一个 ForwardingNode 结点,说明此桶已经完成迁移。所以,我们在 putVal 方法中遍历整个 hash 表的桶结点,如果遇到 hash 值等于 MOVED,说明已经有线程正在扩容 rehash 操作,整体上还未完成,不过我们要插入的桶的位置已经完成了所有节点的迁移。由于检测到当前哈希表正在扩容,于是让当前线程去协助扩容。方法如下:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //1. 如果table不为空且节点f的类型是转移类型,同时f的nextTable(新table)不为空 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { //2. 根据数组长度得到一个标志符号 int rs = resizeStamp(tab.length); //3. 如果nextTab没有并发修改,且tab也没有被并发修改,同时sizeCtl<0,说明还在扩容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //4. 对sizeCtl的值进行分析 //4.1 如果sizeCtl无符号右移16不等于rs,则标识符变化了 //4.2 如果sizeCtl = rs+1,表示扩容结束了,不再有线程进行扩容 //4.3 如果sizeCtl = rs + 65535,表示已经达到最大帮助线程数量,即65535 //4.4 如果转移下标transferIndex <=0,表示扩容结束 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) //5. 满足以上任意一个条件,即停止扩容 break; //6. 如果都不是,则将sizeCtl +1,表示增加一个帮助线程帮助扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //7. 对数组进行转移,执行完结束循环 transfer(tab, nextTab); break; } } return nextTab; } return table; }
什么时候会扩容或着协助扩容,除了上面介绍的2个情况外,如下:
put()方法时会判断节点类型如果是ForwardingNode 节点类型则会协助扩容
除了put()添加元素时会调用addCount(),内部检查sizeCtl看是否需要扩容。
在tryPresize()被调用也会有扩容操作,此方法被调用有两个调用点:
put()方法链表转红黑树时如果table容量小于64(MIN_TREEIFY_CAPACITY),则会触发扩容。
调用putAll()之类一次性加入大量元素,会触发扩容。
具体代码如下:
/** * 链表转红黑树 */ private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { //1. 如果数组长度小于64,调用tryPresize双倍扩容 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); //链表元素转红黑树,锁住头节点 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } } /** * 插入大量元素 */ public void putAll(Map<? extends K, ? extends V> m) { //1.调用tryPresize扩容 tryPresize(m.size()); for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) putVal(e.getKey(), e.getValue(), false); }
private final void tryPresize(int size) { //1. 计算容量 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; //2. sizeCtl >=0 时候循环 while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; //3. 判断表为空的情况下,进行初始化操作 if (tab == null || (n = tab.length) == 0) { //3.1 数组长度为sc和c的最大值 n = (sc > c) ? sc : c; //3.2 cas将sizeCtl设置成-1表示初始化操作,有且只有一个线程初始化 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") //3.3 初始化table Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; //3.4求出阈值 sc = n - (n >>> 2); } } finally { //3.5阈值赋值给sizeCtl sizeCtl = sc; } } } //4。数组不为空,判断数组长度如果小于阈值,或者超过最大容量,则break else if (c <= sc || n >= MAXIMUM_CAPACITY) break; //5. 在数组没有发生变化的情况下执行扩容判断 else if (tab == table) { //5.1 根据容量求出标志符号 int rs = resizeStamp(n); //5.2 判断是否有其他线程在扩容,5个判断如上 if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) //5.3 不满足扩容条件,直接break break; //5.4 尝试将sizeCtl+1成功后扩容操作,扩容线程+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //5.5 判断没有有其他线程在扩容,该线程扩容,将sizeCtl变成负数 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
get(key)方法比较简单,不涉及并发,直接取值即可
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //1. 根据传入的key求出hash值 int h = spread(key.hashCode()); //2. 如果table不为空,且该hash值对应的节点(首节点)不为空 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //3.首节点的hash值和key相等,返回首节点value if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //4.首节点的hash值小于0(红黑树结构),直接遍历,找到目标节点返回value,否则返回null else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //5.否则首节点是链表节点,遍历链表,找到目标节点返回value,否则返回null while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
总结
在 JDK1.7 中,ConcurrentHashMap 采用了分段锁策略,将一个 HashMap 切割成 Segment 数组,其中 Segment 可以看成一个 HashMap, 不同点是 Segment 继承自 ReentrantLock,在操作的时候给 Segment 赋予了一个对象锁,从而保证多线程环境下并发操作安全。
但是 JDK1.7 中,HashMap 容易因为冲突链表过长,造成查询效率低,所以在 JDK1.8 中,HashMap 引入了红黑树特性,当冲突链表长度大于 8 时,会将链表转化成红黑二叉树结构。
在 JDK1.8 中,与此对应的 ConcurrentHashMap 也是采用了与 HashMap 类似的存储结构,但是 JDK1.8 中 ConcurrentHashMap 并没有采用分段锁的策略,而是在元素的节点上采用CAS + synchronized操作来保证并发的安全性