以下源码分析基于 JDK 1.8。
写操作在一个复制的数组上进行,读操作还是在原始数组中进行,读写分离,互不影响。
写操作需要加锁,防止并发写入时导致写入数据丢失。
写操作结束之后需要把原始数组指向新的复制数组。
public boolean add(E e) { //加锁 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; // newElements 是一个复制的数组 Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; // 写操作在一个复制的数组上进行 setArray(newElements); return true; } finally { lock.unlock(); } } final void setArray(Object[] a) { array = a; }
@SuppressWarnings("unchecked") private E get(Object[] a, int index) { //读取操作仍然在原始的数组中 return (E) a[index]; }
CopyOnWriteArrayList 在写操作的同时允许读操作,大大提高了读操作的性能,很适合读多写少的应用场景。
CopyOnWriteArrayList 有其缺陷:
所以 CopyOnWriteArrayList 不适合内存敏感以及对实时性要求很高的场景。
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; }
ConcurrentHashMap 和 HashMap 实现上类似,最主要的差别是 ConcurrentHashMap 采用了分段锁(Segment),每个分段锁维护着几个桶(HashEntry),多个线程可以同时访问不同分段锁上的桶,
从而使其并发度更高(并发度就是 Segment 的个数)。
Segment 继承自 ReentrantLock。
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor; }
final Segment<K,V>[] segments;
默认的并发级别为 16,也就是说默认创建 16 个 Segment。
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
每个 Segment 维护了一个 count 变量来统计该 Segment 中的键值对个数。
/** * The number of elements. Accessed only either within locks * or among other volatile reads that maintain visibility. */ transient int count;
在执行 size 操作时,需要遍历所有 Segment 然后把 count 累计起来。
ConcurrentHashMap 在执行 size 操作时先尝试不加锁,如果连续两次不加锁操作得到的结果一致,那么可以认为这个结果是正确的。
尝试次数使用 RETRIES_BEFORE_LOCK 定义,该值为 2,retries 初始值为 -1,因此尝试次数为 3。
如果尝试的次数超过 3 次,就需要对每个 Segment 加锁。
/** * Number of unsynchronized retries in size and containsValue * methods before resorting to locking. This is used to avoid * unbounded retries if tables undergo continuous modification * which would make it impossible to obtain an accurate result. */ static final int RETRIES_BEFORE_LOCK = 2; public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { // 超过尝试次数,则对每个 Segment 加锁 if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } // 连续两次得到的结果一致,则认为这个结果是正确的 if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
ConcurrentHashMap 取消了 Segment 分段锁。
JDK 1.8 使用 CAS 操作来支持更高的并发度,在 CAS 操作失败时使用内置锁 synchronized。
数据结构与HashMap 1.8 的结构类似,数组+链表 / 红黑二叉树(链表长度 > 8 时,转换为红黑树 )。synchronized 只锁定当前链表或红黑二叉树的首节点,这样只要 Hash 值不冲突,就不会产生并发。
(1)hash 算法
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
(2)定位索引位置
i = (n - 1) & hash
(3)获取 table 中对应索引的元素 f
f = tabAt(tab, i = (n - 1) & hash // Unsafe.getObjectVolatile 获取 f // 因为可以直接指定内存中的数据,保证了每次拿到的数据都是新的 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }Copy to clipboardErrorCopied
(4)如果 f 是 null,说明 table 中是第一次插入数据,利用
(5)其余情况把新的 Node 节点按链表或红黑树的方式插入到合适位置,这个过程采用内置锁实现并发。
底层数据结构:
实现线程安全的方式