简介
ConcurrentHashMap是一个经常被使用的数据结构,它在线程安全的基础上提供了更好的写并发能力。ConcurrentHashMap跟Map有很大的不同,内部大量使用volatile和CAS等减少锁竞争,当然代码也比HashMap难理解的多,本章基于JDK1.8对ConcurrentHashMap做基本介绍。
ConcurrentHashMap 类
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable
继承AbstractMap抽象类,实现ConcurrentMap接口
ConcurrentMap 接口
public interface ConcurrentMap<K, V> extends Map<K, V>
继承Map接口
ConcurrentMap 方法
// 为空返回默认值 @Override default V getOrDefault(Object key, V defaultValue) { V v; return ((v = get(key)) != null) ? v : defaultValue; } // 如果存在键不覆盖(put 键存在新值覆盖原值) V putIfAbsent(K key, V value); // 删除 boolean remove(Object key, Object value); // 替换,oldValue一样才替换 boolean replace(K key, V oldValue, V newValue); // 替换 V replace(K key, V value);
这里忽略使用Function的方法
重要内部类 Node
static class Node<K,V> implements Map.Entry<K,V>
实现Map.Entry
Node 属性
// 键的hash值 final int hash; // 键 final K key; // 值 volatile V val; // 下一个节点 volatile Node<K,V> next;
Node 构造函数
Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; }
Node 方法
// 获取键 public final K getKey() { return key; } // 获取值 public final V getValue() { return val; } // 获取节点hashCode(键值异或) public final int hashCode() { return key.hashCode() ^ val.hashCode(); } // toString方法 public final String toString(){ return key + "=" + val; } // 直接对节点设置值抛异常 public final V setValue(V value) { throw new UnsupportedOperationException(); } // 节点equals public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; // 必须是Map.Entry实例,key不能为空值不能为空,键值一样 return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } // 搜索键 Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; // 当前节点与h一样,key必须一样 if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; // 节点下移,继续搜索 } while ((e = e.next) != null); } return null; }
重要内部类 TreeNode
static final class TreeNode<K,V> extends Node<K,V>
TreeNode 继承Node
TreeNode 属性
// 父级节点 TreeNode<K,V> parent; // red-black tree links // 左节点 TreeNode<K,V> left; // 右节点 TreeNode<K,V> right; // 前一个节点(跟Node中next组成双向链表) TreeNode<K,V> prev; // 颜色 boolean red;
TreeNode 构造函数
TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { // 初始化父类属性 super(hash, key, val, next); // 初始化上级节点 this.parent = parent; }
TreeNode 方法
Node<K,V> find(int h, Object k) { return findTreeNode(h, k, null); } // 查找元素 final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) { if (k != null) { // 当前节点 TreeNode<K,V> p = this; do { // 取左右节点 int ph, dir; K pk; TreeNode<K,V> q; TreeNode<K,V> pl = p.left, pr = p.right; // h 小于当前节点hash走左边 if ((ph = p.hash) > h) p = pl; // h 大于当前节点hash走右边 else if (ph < h) p = pr; // hash一样,key一样 else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; // hash一样,key不一样,左节点为空 else if (pl == null) p = pr; // ash一样,key不一样,右节点为空 else if (pr == null) p = pl; // hash一样,key不一样,左右不为空 else if ((kc != null || (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0) // 通过比较器比较结果,小于0走左边,大于0走右边 p = (dir < 0) ? pl : pr; // 比较器为空先从右边递归 else if ((q = pr.findTreeNode(h, k, kc)) != null) return q; // 右边也没找到,下次从左边找 else p = pl; } while (p != null); } return null; }
重要内部类 TreeBin
static final class TreeBin<K,V> extends Node<K,V>
TreeBin 也继承Node
TreeBin 属性
// 根节点 TreeNode<K,V> root; // 头节点 volatile TreeNode<K,V> first; // 等待 volatile Thread waiter; // 锁定状态 volatile int lockState; static final int WRITER = 1; static final int WAITER = 2; static final int READER = 4; // 内存操作不安全类 private static final sun.misc.Unsafe U; // lockState 偏移量 private static final long LOCKSTATE;
TreeBin 加载初始化
static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = TreeBin.class; LOCKSTATE = U.objectFieldOffset (k.getDeclaredField("lockState")); } catch (Exception e) { throw new Error(e); } }
TreeBin 构造函数
TreeBin(TreeNode<K,V> b) { // 初始化Node属性 super(TREEBIN, null, null, null); // 设置当前元素b为头节点 this.first = b; TreeNode<K,V> r = null; // 遍历b所有next for (TreeNode<K,V> x = b, next; x != null; x = next) { // 获取next next = (TreeNode<K,V>)x.next; // 清空左右节点 x.left = x.right = null; // 初始化r,设置为根节点 if (r == null) { x.parent = null; x.red = false; r = x; } else { // 获取键和hash K k = x.key; int h = x.hash; // 比较器 Class<?> kc = null; // 遍历r for (TreeNode<K,V> p = r;;) { int dir, ph; K pk = p.key; // 确定走左边还是右边 if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; // 确定的一边为空,就把x加上去 if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break; } } } } // 赋值根节点 this.root = r; // 在运行时,如果关闭了assertion功能,这些语句将不起任何作用。 // 如果打开了assertion功能,那么将执行checkInvariants, // 如果它的值为false,该语句强抛出一个AssertionError对象。 assert checkInvariants(root); }
比较大小方法
static int tieBreakOrder(Object a, Object b) { int d; if (a == null || b == null || (d = a.getClass().getName(). compareTo(b.getClass().getName())) == 0) d = (System.identityHashCode(a) <= System.identityHashCode(b) ? -1 : 1); return d; }
TreeBin 锁定节点
private final void lockRoot() { // 使用CAS把lockState从0改为WRITER(1) if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER)) // 修改失败 contendedLock(); } private final void unlockRoot() { lockState = 0; } private final void contendedLock() { // 等待状态 boolean waiting = false; for (int s;;) { // lockState为0结果一定是0 if (((s = lockState) & ~WAITER) == 0) { // 使用CAS把lockState从0改为WRITER(1) if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) { // 等待状态 if (waiting) waiter = null; // 成功返回 return; } } else if ((s & WAITER) == 0) {// s不为2时结果为0 // 使用CAS把lockState从s修改为s | WAITER(2) if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) { // 设置是否需要等待为true waiting = true; // 等待线程 waiter = Thread.currentThread(); } } else if (waiting) // 阻塞当前线程 LockSupport.park(this); } }
TreeBin 查找
final Node<K,V> find(int h, Object k) { // 值不能为空 if (k != null) { // 从头节点开始遍历 for (Node<K,V> e = first; e != null; ) { int s; K ek; // 判断是否是锁定状态(lockState为0一定不能进入) if (((s = lockState) & (WAITER|WRITER)) != 0) { // 当前节点hash是否一样,key是否一样 if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; // 节点下移 e = e.next; } // 更新lockState else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) { TreeNode<K,V> r, p; try { // 头不为空,从树中查找 p = ((r = root) == null ? null : r.findTreeNode(h, k, null)); } finally { Thread w; if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER|WAITER) && (w = waiter) != null) // 唤醒阻塞的线程 LockSupport.unpark(w); } return p; } } } return null; }
TreeBin 添加元素
final TreeNode<K,V> putTreeVal(int h, K k, V v) { Class<?> kc = null; // 是否搜索过 boolean searched = false; // 从跟节点开始遍历 for (TreeNode<K,V> p = root;;) { int dir, ph; K pk; // 根节点为空,构建新的根节点 if (p == null) { first = root = new TreeNode<K,V>(h, k, v, null, null); break; } // 确定左边还是右边 else if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) { if (!searched) { TreeNode<K,V> q, ch; // hash一样,比较器失效,开始搜索 searched = true; if (((ch = p.left) != null && (q = ch.findTreeNode(h, k, kc)) != null) || ((ch = p.right) != null && (q = ch.findTreeNode(h, k, kc)) != null)) return q; } dir = tieBreakOrder(k, pk); } // 下级有空节点开始追加 TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) { // 获取一把头节点 TreeNode<K,V> x, f = first; // 更新头节点 first = x = new TreeNode<K,V>(h, k, v, f, xp); // 头节点不为空,原头节点上级设置为新节点 if (f != null) f.prev = x; // 树结构中插入 if (dir <= 0) xp.left = x; else xp.right = x; // 当前xp为叶子节点,叶子节点是红色需要重新平衡 if (!xp.red) x.red = true; else { // 重新平衡需要锁定 lockRoot(); try { // 插入平衡 root = balanceInsertion(root, x); } finally { // 解除锁定 unlockRoot(); } } break; } } assert checkInvariants(root); return null; }
TreeBin 删除
final boolean removeTreeNode(TreeNode<K,V> p) { // 获取上下节点 TreeNode<K,V> next = (TreeNode<K,V>)p.next; TreeNode<K,V> pred = p.prev; // unlink traversal pointers TreeNode<K,V> r, rl; // 上级节点为空,设置next为头节点 if (pred == null) first = next; else // 上级的下级指向当前下级 pred.next = next; // 下级为空下级的上级指向当前上级 if (next != null) next.prev = pred; // 头节点为空 if (first == null) { // 设置root为空 root = null; return true; } // 在树中删除 if ((r = root) == null || r.right == null || // too small (rl = r.left) == null || rl.left == null) // 不成树,直接返回(只有一个元素) return true; // 锁定头节点 lockRoot(); try { TreeNode<K,V> replacement; TreeNode<K,V> pl = p.left; TreeNode<K,V> pr = p.right; // 红黑树删除节点参照HashMap if (pl != null && pr != null) { TreeNode<K,V> s = pr, sl; while ((sl = s.left) != null) // find successor s = sl; boolean c = s.red; s.red = p.red; p.red = c; // swap colors TreeNode<K,V> sr = s.right; TreeNode<K,V> pp = p.parent; if (s == pr) { // p was s's direct parent p.parent = s; s.right = p; } else { TreeNode<K,V> sp = s.parent; if ((p.parent = sp) != null) { if (s == sp.left) sp.left = p; else sp.right = p; } if ((s.right = pr) != null) pr.parent = s; } p.left = null; if ((p.right = sr) != null) sr.parent = p; if ((s.left = pl) != null) pl.parent = s; if ((s.parent = pp) == null) r = s; else if (p == pp.left) pp.left = s; else pp.right = s; if (sr != null) replacement = sr; else replacement = p; } else if (pl != null) // 右边为空 replacement = pl; else if (pr != null) // 左边为空 replacement = pr; else replacement = p; if (replacement != p) { TreeNode<K,V> pp = replacement.parent = p.parent; if (pp == null) r = replacement; else if (p == pp.left) pp.left = replacement; else pp.right = replacement; p.left = p.right = p.parent = null; } // 删除并平衡 root = (p.red) ? r : balanceDeletion(r, replacement); if (p == replacement) { // detach pointers TreeNode<K,V> pp; if ((pp = p.parent) != null) { if (p == pp.left) pp.left = null; else if (p == pp.right) pp.right = null; p.parent = null; } } } finally { // 解锁 unlockRoot(); } assert checkInvariants(root); return false; }
左旋右旋
static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, TreeNode<K,V> p) { 。。。 } static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, TreeNode<K,V> p) { 。。。 }
插入删除平衡
static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) { 。。。 } static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root, TreeNode<K,V> x) { 。。。 }
重要内部类ForwardingNode
static final class ForwardingNode<K,V> extends Node<K,V>
ForwardingNode 在扩容移动时会使用
ForwardingNode 属性
// 扩容后数组 final Node<K,V>[] nextTable; **ForwardingNode 构造函数** ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; }
ForwardingNode 方法
Node<K,V> find(int h, Object k) { // 自旋 outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; // 新数组为空或者头节点为空,返回 if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; // 自旋 for (;;) { int eh; K ek; // 头节点一样 if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; // hash 小于 0(说明是treeBin) if (eh < 0) { // 头节点是ForwardingNode实例 if (e instanceof ForwardingNode) { // 获取扩容后新数组 tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else // 继续查找 return e.find(h, k); } // 达到尾节点,返回 if ((e = e.next) == null) return null; } } }
ConcurrentHashMap 属性
// 数组最大长度 private static final int MAXIMUM_CAPACITY = 1 << 30; // 初始化默认长度 private static final int DEFAULT_CAPACITY = 16; // 元素个数最大值(toArray中使用) static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 默认并发级别 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 默认加载因子 private static final float LOAD_FACTOR = 0.75f; // 链表转树阈值 static final int TREEIFY_THRESHOLD = 8; // 树转链表阈值 static final int UNTREEIFY_THRESHOLD = 6; // 树转链表,table长度阈值 static final int MIN_TREEIFY_CAPACITY = 64; // 扩容迁移元素时,每个线程处理16个槽 private static final int MIN_TRANSFER_STRIDE = 16; // 用于生成每次扩容都唯一的生成戳的数 private static int RESIZE_STAMP_BITS = 16; // 最大的扩容线程的数量 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 移位量,把生成戳移位后保存在sizeCtl中当做扩容线程计数的基数, // 相反方向移位后能够反解出生成戳 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // 表示正在转移 static final int MOVED = -1; // 表示已经转换成树 static final int TREEBIN = -2; // ReservationNode 初始化hash值 static final int RESERVED = -3; // int最大值 static final int HASH_BITS = 0x7fffffff; // 获得可用的处理器个数 static final int NCPU = Runtime.getRuntime().availableProcessors(); private static final ObjectStreamField[] serialPersistentFields = { new ObjectStreamField("segments", Segment[].class), new ObjectStreamField("segmentMask", Integer.TYPE), new ObjectStreamField("segmentShift", Integer.TYPE) }; // 元素数组 transient volatile Node<K,V>[] table; // 扩容后的新的table数组 private transient volatile Node<K,V>[] nextTable; // 元素个数计数器值 private transient volatile long baseCount; // 扩容阈值 private transient volatile int sizeCtl; // 下一个transfer任务的起始下标index private transient volatile int transferIndex; // counterCells 扩容标志 private transient volatile int cellsBusy; // 并发时 private transient volatile CounterCell[] counterCells; // 键集合 private transient KeySetView<K,V> keySet; // 值集合 private transient ValuesView<K,V> values; // 键值实体集合 private transient EntrySetView<K,V> entrySet; // 内存操作不安全类 private static final sun.misc.Unsafe U; // 扩容阈值偏移量 private static final long SIZECTL; // transfer任务任务下标偏移量 private static final long TRANSFERINDEX; // 元素个数偏移量 private static final long BASECOUNT; // cellsBusy偏移量 private static final long CELLSBUSY; // counterCells 偏移量 private static final long CELLVALUE; // table偏移量 private static final long ABASE; // table数组元素偏移量 private static final int ASHIFT;
ConcurrentHashMap 加载初始化
static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } }
ConcurrentHashMap 构造函数
public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); // 计算扩容阈值 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // concurrencyLevel 表示估计的参与并发更新的 // 线程数量,必须比初始化容量的要大 if (initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel; // 扩容阈值 long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }
####ConcurrentHashMap 的hash算法
static final int spread(int h) { // 混合高低位,并控制范围 return (h ^ (h >>> 16)) & HASH_BITS; }
在HashMap中hash算法是return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); 而spread方法其实就是hash算法,最后和int最大值取位与只是为了控制结果在int范围内。
ConcurrentHashMap 数组长度计算
private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }
此方法请看HashMap篇,里面有详细推导
ConcurrentHashMap 操作数组头节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { // ((long)i << ASHIFT) + ABASE 元素位置 return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { // 修改槽中元素从c改成v return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { // v放到对应槽中 U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
ConcurrentHashMap 添加
public V put(K key, V value) { // 添加 return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { // 键或值为空抛异常 if (key == null || value == null) throw new NullPointerException(); // 获取hash int hash = spread(key.hashCode()); int binCount = 0; // 遍历table for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // tab为空(第一次添加) if (tab == null || (n = tab.length) == 0) tab = initTable(); // 查找hash所在的槽是否为空 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 创建新节点并放入对应槽中(失败重新来过) if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } // 正在扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { // 原值 V oldVal = null; // 锁定头节点 synchronized (f) { // 头节点没有改变过 if (tabAt(tab, i) == f) { // 头节点hash 大于 0 if (fh >= 0) { // 链表添加初始为1 binCount = 1; // 从头节点开始遍历(每次binCount加1) for (Node<K,V> e = f;; ++binCount) { K ek; // 键是否一样 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 获取原值 oldVal = e.val; // 不覆盖开关是否打开,false就覆盖 if (!onlyIfAbsent) e.val = value; break; } // 键不一样e下移 Node<K,V> pred = e; // 后面节点为空 if ((e = e.next) == null) { // 追加元素 pred.next = new Node<K,V>(hash, key, value, null); break; } } } // hash 小于 0 说明是TreeBin(hash为-2) else if (f instanceof TreeBin) { Node<K,V> p; // 树添加设置为2 binCount = 2; // 调用树结构添加 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { // 原值 oldVal = p.val; if (!onlyIfAbsent) // 覆盖原值 p.val = value; } } } } if (binCount != 0) { // binCount 大于8,链表转树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 原值不为空,要么覆盖要么忽略 if (oldVal != null) return oldVal; break; } } } // 能到这儿一定添加成功(需不需要扩容) addCount(1L, binCount); return null; }
如果只看putVal方法ConcurrentHashMap 跟HashMap差不多,就把头元素锁了,但是如果你真这么认为,只能说明没有看第三篇。
ConcurrentHashMap 初始化数组
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // table 不为空退出循环 while ((tab = table) == null || tab.length == 0) { // 判断扩容阈值是否小于0 if ((sc = sizeCtl) < 0) // 让出cpu Thread.yield(); // 修改扩容阈值为-1 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 判处table是否还没被初始化 if ((tab = table) == null || tab.length == 0) { // sc > 0 使用sc,否则使用默认初始长度 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 创建数组 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 赋值 table = tab = nt; // 计算下次扩容阈值 // n - n/4 等价于 n*0.75 sc = n - (n >>> 2); } } finally { // 设置新的扩容阈值 sizeCtl = sc; } break; } } return tab; }
ConcurrentHashMap 链表转树
// binCount(链表长度)大于8会调用这个方法 private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; // 数组不能为空 if (tab != null) { // 判断是在是否小于64 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 扩容第三篇会讲 tryPresize(n << 1); // 大于64,获取头节点,判断是否已经是TreeBin else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 锁住头节点 synchronized (b) { // 再取一次头节点判断跟b是否一样 if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { // 构建树节点 TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); // 构建树节点的链表结构, // 初始化TreeBin时才转树结构 if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } // 构建TreeBin,并把它放入当前槽 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
ConcurrentHashMap 树转链表
static <K,V> Node<K,V> untreeify(Node<K,V> b) { Node<K,V> hd = null, tl = null; // 遍历节点 for (Node<K,V> q = b; q != null; q = q.next) { // 构建普通节点 Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null); if (tl == null) hd = p; else tl.next = p; tl = p; } return hd; }
ConcurrentHashMap 键取值
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 计算hash int h = spread(key.hashCode()); // 判断数组是否为空,然后判断槽中头节点是否为空 // (n - 1) & h 就是取余,请看HashMap篇推导过程 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 头节点hash是否一样 if ((eh = e.hash) == h) { // hash一样看key是否一样 if ((ek = e.key) == key || (ek != null && key.equals(ek))) // key一样返回节点中的值 return e.val; } // hash不一样,判断hash是否小于0 // TreeBin 的hash为-2 else if (eh < 0) // 在红黑树中查找 return (p = e.find(h, key)) != null ? p.val : null; // 能到这说明一定是链表,通过遍历链表查找 while ((e = e.next) != null) { // 判断key是否一样,并节点下移 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
ConcurrentHashMap 删除
public V remove(Object key) { return replaceNode(key, null, null); } final V replaceNode(Object key, V value, Object cv) { // 计算hash int hash = spread(key.hashCode()); // 自旋 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 判断数组或头节点为空 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) // 直接退出 break; // 判断hash是否为移动状态 else if ((fh = f.hash) == MOVED) // 如果是就帮忙移动,移动完自旋删除 tab = helpTransfer(tab, f); else { // 能到这儿,说明不是再移动 V oldVal = null; boolean validated = false; // 锁住头节点 synchronized (f) { // 判断头节点是否变动 if (tabAt(tab, i) == f) { // 链表删除 if (fh >= 0) { validated = true; // 遍历链表 for (Node<K,V> e = f, pred = null;;) { K ek; // 判断key是否一样 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; // 判断值是否一样 if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { // 原值 oldVal = ev; // value不为空,使用value覆盖原值, // 否则移除节点, // 如果移除的是头节点,需要把设置新头节点 if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } // 节点下移 pred = e; if ((e = e.next) == null) break; } } // 树结构中删除 else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; // 在树结构中查找 if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; // 判断值是否一样 if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; // value不为空,使用value覆盖原值, // 否则,在树结构中删除 // 如果移除的是头节点,需要设置新头节点 if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } // 是否查找过 if (validated) { // 原值是否为空 if (oldVal != null) { // value是否为空,不为空只覆盖不删除 if (value == null) // 长度减一 addCount(-1L, -1); // 返回原值 return oldVal; } break; } } } return null; }
ConcurrentHashMap 清空
public void clear() { long delta = 0L; int i = 0; Node<K,V>[] tab = table; // 遍历数组所有元素 while (tab != null && i < tab.length) { int fh; // 获取头节点 Node<K,V> f = tabAt(tab, i); // 头节点为空,数组下标加1 if (f == null) ++i; // 判断头节点状态 else if ((fh = f.hash) == MOVED) { // 帮忙移动 tab = helpTransfer(tab, f); // 移动完成,重置数组下标 i = 0; } // 正常删除 else { // 锁住头节点 synchronized (f) { // 判断头节点是否变化 if (tabAt(tab, i) == f) { // 获取链表或树的头节点 Node<K,V> p = (fh >= 0 ? f : (f instanceof TreeBin) ? ((TreeBin<K,V>)f).first : null); // 遍历个数 while (p != null) { --delta; p = p.next; } // 清空当前槽 setTabAt(tab, i++, null); } } } } // 修改总长度 if (delta != 0L) addCount(delta, -1); }
ConcurrentHashMap 在修改链表或者树时都会锁住头元素,其他修改操作就拿不到锁,他们就会等待。
ConcurrentHashMap 的addCount
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 先判断counterCells是否为空,为空通过cas修改baseCount // 要么修改baseCount失败(s=b + x 一样会执行) if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 先判断counterCells为空, // 然后随机取余数获取counterCells下标a,并取值 // (counterCells长度必须是2的n次方) // 取值为空修改counterCells数组a位置值,修改失败调用fullAddCount if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } // 链表长度小于等于1 if (check <= 1) return; // 统计 s = sumCount(); } // 链表长度大于等于0 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 如果map.size() 大于 sizeCtl扩容阈值 且 // table 不是空;且 table 的长度小于 1 << 30 // 这时sc为原扩容阈值 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // 生成一个扩容戳 int rs = resizeStamp(n); // 原扩容阈值小于0(第一次一定不会进去) if (sc < 0) { // 能到这一定在扩容 // 这里5个判断,意思是有任何一个满足就不能帮忙搬移元素 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 修改阈值加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 帮忙搬移元素 transfer(tab, nt); } // 修改扩容阈值为(rs<<16)+2,一定是负的 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 开始迁移元素 transfer(tab, null); // 统计元素个数 s = sumCount(); } } }
这里逻辑非常绕,我们要拆开看。元素计数器counterCells,为什么要用元素计数器?
当并发比较高并且所有线程都是修改元素个数,会是什么情况?做到了高效添加删除节点,但是所有线程最后都去竞争改baseCount,性能损耗严重,性能又被拉下来了。counterCells就是用来解决修改baseCount性能损耗问题,没有并发时直接修改baseCount,有并发时一定有修改baseCount修改失败,失败又不能不管,这时失败的就在counterCells数组中随便找个元素记一下,记录方式就是数组中元素原值加1(元素是对象)。在获取元素个数时,需要把baseCount加上counterCells所有元素值加到一起就是总元素个数,这也是ConcurrentHashMap 分片统计原理。
ConcurrentHashMap 统计元素及使用
public int size() { // 返回当前元素个数 long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { // 遍历所有计数器 for (int i = 0; i < as.length; ++i) { // 元素不为空时参与统计 if ((a = as[i]) != null) sum += a.value; } } return sum; } // 统计数为0时Map为空 public boolean isEmpty() { return sumCount() <= 0L; }
ConcurrentHashMap 扩容
// 第一个扩容扩容线程nextTab为null private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // stride是每个线程转移几个槽 // 将 length/8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // nextTab 为空(第一个线程调用) if (nextTab == null) { try { // 新数组(长度为原来2倍) Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // 数组越界,超过int最大值 sizeCtl = Integer.MAX_VALUE; return; } // 使用新数组赋值nextTable nextTable = nextTab; // 更新转移下标,就是 老的 tab 的 length transferIndex = n; } // 新数组长度 int nextn = nextTab.length; // 创建迁移节点 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 领任务 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // 如果 i 小于0 // 如果 i >= tab.length // 如果 i + tab.length >= nextTable.length if (i < 0 || i >= n || i + n >= nextn) { int sc; // 如果完成了扩容 if (finishing) { // 置空nextTable,赋值新table,设置新扩容阈值 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } // 能到这说明还在扩容。设置sc-1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 没有线程在帮助他们扩容了。也就是说,扩容结束了。 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 扩容完成 finishing = advance = true; i = n; } } // 原槽为空时置入fwd else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 原槽头节点已经是ForwardingNode对象,忽略 else if ((fh = f.hash) == MOVED) advance = true; // 开始迁移 else { // 锁住头节点 synchronized (f) { // 看一下头节点有没有改变 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // hash是否大于0 if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; // 遍历链表 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; // 元素分类 if (b != runBit) { runBit = b; lastRun = p; } } // 高位位0(不需要移动) if (runBit == 0) { ln = lastRun; hn = null; } // 高位位1(需要移动) else { hn = lastRun; ln = null; } // 组装新链表 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 放入新数组槽中 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 原槽中使用占位符 setTabAt(tab, i, fwd); // 成功 advance = true; } // 头节点为TreeBin else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 遍历树取两个树(lo、hi) for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 长度小于6时,树退链 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 放入新数组槽中 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 原槽中使用占位符 setTabAt(tab, i, fwd); // 完成 advance = true; } } } } } }
扩容跟HashMap区别还是比较大的,单核时一个线程迁移所有槽,多线程时,每个线程最小迁移16个槽。
ConcurrentHashMap 链转树触发扩容
private final void tryPresize(int size) { // size在传入之前就已经*2了,判断size合法性 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // 数组为空 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; // 修改扩容阈值 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 初始化table if (table == tab) { Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { // 设置新的扩容阈值 sizeCtl = sc; } } } // 不需要扩容 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; // table是否改变 else if (tab == table) { // 下面这一坨跟addcount后半段是一个意思 int rs = resizeStamp(n); // 阈值小于零(被扩容线程修改为负的) if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 帮忙迁移元素 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 打上迁移标示 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }