之前研读过HashMap源码,实现过一遍HashMap源码。但是HashMap是线程不安全的,而ConcurrentHashMap是线程安全的,博主我觉得自己多线程方面差了点,遂决定去研究一番ConcurrentHashMap。这一看,emmm,妙呀,有时一个方法起码得研究好几天,打开一篇知识的海洋。笔者去年终于看完整个put方法的处理,〒▽〒,实现到扩容那里了。后面由于换工作,面试,这事就耽搁下了,如今我重新捡起来,主要是我刚写完一篇HashMap的源码解读。嗯,我最近也在并行了解分布式事务。换着来,比较不那么容易腻一些。
ConcurrentHashMap是concurrent包下的线程安全的Map集合。HashMap是线程不安全,HashTable是线程安全,但是它是对整个hash表加锁,锁的范围太大。
结构
由HashTable可知,在多核多CPU的情况下,多线程可以并行,但是当遇到了HashTable时就变得不一样了。由于整个HashTable加了锁,每次只有一个线程能拿到锁执行,其他线程等待。本来每次可以同时干多件事,变成每次只能干一件事,效率就低了。而1.7的ConcurrentHashMap将锁的范围变小,每次不锁整个表,可锁的范围分成多块,代表可以有多个线程同时干活。
而这时的ConcurrentHashMap就不一样了。 当put一个元素时:
1.通过对key的hash计算得到一个下标,是Segment[]的下标
2.线程申请该segment的锁,拿到segment[i]锁
2.1.如果获取不到锁,循环一定次数去尝试拿锁,超过一定次数没获取到锁后,线程进入 阻塞状态等待。
3.开始对segment[i]的hashEntry进行put操作。
3.1Segment继承自ReentrantLock,独占的可重入式锁。所以拿到Segment后的操作不需要考虑并发,因为Segment是独占锁。
由此可知,Segment[]代表着并发度,16个Segment代表可以同时有16线程并行,而Segment是不会扩容的,只有Segment[i]下的HashEntry会扩容。当并发量越来越来大,即使有个16个并发度,对于数量大的线程数面前是不够用的,供小于求,照样会有大量的线程进入阻塞状态。
所以就有1.8的优化,直接抛弃segment这种方式。
如图,1.8时ConcurrentHashMap的结构与HashMap一样,就是一个单纯的链表数组,计算hash、寻址、扩容时链表拆分等基本计算方法和HashMap的方式是一样的。没有Segment[],所以它的并发度就是数组的长度。既然如此,没了Segement可重入锁,如何保证线程安全,put和扩容的线程安全如何保障?解决方法在于对CAS的使用,和对线程的控制。
在开始解析concurrentHashMap前,需要知道的一些知识点
CAS即比较并交换,首先线程从内存中读取x的值a,而后再将a值于当前内存中x的值,设此时内存x的值为b;比较a和b,若期待值a与b相同,则修改内存中x的值为c,不同则不修改。这是一种乐观锁策略,不加锁。
在多线程的情况下,CAS会产生ABA的问题:
内存中有X值为1,A线程拿到X的值为1,这时B线程抢占cpu时间片,将内存中的X先修改为2, 后又修改为1;这时线程A再回来查看,发现X的值还是1,遂修改为3。
ABA的问题好像不是什么大问题,但在程序中,值一样不代表没有变化,比如内存地址可能变了。
· 解决这种有两个方法:
①使得比较修改这部分操作具有原子性,比如再秒杀场景中,使用lua脚本操作redis修改数据,
因为redis只支持乐观锁,lua程序执行是原子性的。
②增加版本号,每修改一次,版本号加1,每次比较根据版本号判断。
java的线程内存模型如下图:↓
首先java内存模型规定所有变量都存在主存中,每个线程都有自己的工作内存,所有操作都在工作内存进行,不能直接操作主存,也不能操作其他线程的工作内存。
线程修改一个变量,先修改工作内存中的值,再写入主存中(什么时候写入主存未知)。
那么对于多个线程的共享的变量来说,假如线程a,线程b共同操作变量y,线程a修改了y变量值为2 ,还没有写入主存中,那线程b首先能直接操作的是自己工作内存,这时自己的工作内存中y的值不是 2,那么如何使得线程b知道变量y的值已变成2了呢?
· 对于这个问题,java提供一个关键字volatile,使得共享变量可见。具体步骤如下:
当一个变量x被volatile关键字修饰时,当线程1在工作内存修改了x的值时,会强制将x的值写入主存中;
其次由于线程1的修改操作,会导致线程2工作内存中x的缓存无效,所以当线程1要读取x时,会到主存中读取.
要注意的是,volatile关键字能保证操作的可见性,但它没法保证操作的原子性:
· 比如,有一共享变量y值为1.
· 线程1首先获取了共享变量y的值 1。
· 而这时线程2抢占cpu时间片获取共享变量y,并修改将y的值加1等于2,随后刷新主存中y的 值;
· 那么这时线程1将之前获取到的y的值加1,变成2,刷新主存y值变成2;
· 按理来说,y是共享变量,两条线程都对其进行加1操作,主存中的值应该是3才对.但是由于线程对共享变量y的加1操作不具有原子性,导致最后结果的误差.仔细想想,这个事好像事务的隔离级别的那种.
并发
并发是指,多个线程在一个cpu上交替执行,在整体上看来像是多个线程在同时做事,其实只是它们之间切换的快。
并行
并行是指,在同一时刻同时干两件事。在多核多cpu的情况下,多线程可能分布在不同的逻辑处理器中,它们不需要竞争cpu的资源。
一般开启多线程,这个线程数也是有讲究的,比如我的电脑是1个cpu,4内核,8个逻辑处理器,一个逻辑处理器一个线程,说明1个内核可以支持2个超线程。所以是8线程比较合适。
在解析put方法之前必须要知道的核心操作方法和核心全局变量
Unsafe在java.util.conncurrent包下的类中占距很重要的位置,里面大多是CAS操作。Unsafe类使得java拥有像C语言指针一样操作内存空间的能力。
Unsafe可以直接操作内存,速度会更快,在并发的条件下能提供更好的效率。但也意味着不安全,不受jvm管理,无法被GC,需要自己释放,一个不小心会内存泄漏。
Unsafe类中大多方法被native修饰,意思是这些方法调用的都是c语言实现接口。
/** * 比较地址相较于对象o地址偏移l 处的值是否等于o1,是则修改其值为o2,并返回true。 * 否,则返回false * params: * o-比较交换的对象实例 * l-偏移量,在o地址的基础上的偏移量 * o1-期望值,即期望主存中o的值 * o2-修改值 */ public final native boolean compareAndSwapObject(java.lang.Object o, long l, java.lang.Object o1, java.lang.Object o2); /** * 这个方法与上一个方法的区别是,要比较修改的是一个int类型 */ public final native boolean compareAndSwapInt(java.lang.Object o, long l, int i, int i1); /** * 这个方法与上一个方法的区别是,要比较修改的是一个long类型 */ public final native boolean compareAndSwapLong(java.lang.Object o, long l, long l1, long l2); /** * 获取主存中o对象中偏移l的值 */ public native java.lang.Object getObjectVolatile(java.lang.Object o, long l); /** * 对象o中偏移l处的值修改为o1,并对其他线程立马可见 */ public native void putObjectVolatile(java.lang.Object o, long l, java.lang.Object o1); /** * 获取数组的第一个元素的地址,即数组首地址,知道c语言指针的应该秒懂。 */ public native int arrayBaseOffset(java.lang.Class<?> aClass); /** * 获取数组元素所占字节个数,比如int[]数组,int类型占4个字节,返回4 */ public native int arrayIndexScale(java.lang.Class<?> aClass); 复制代码
关于ABASE,ASHIFT是用来获得数组元素的偏移量,根据偏移量访问并操作数组元素。
ABASE是数组首地址,ASHIFT表示数组元素占1<< ASHIFT个字节数;
举个例子:假设有个long[]数组,首地址ABASE=16,long占8个字节,那么ASHIFT=3,下标为i=3的元素的地址是 16+(3-0)×8=16+3×8=40=ABSE+(i-0)×(1<< ASHIFT)=ABASE+i<< ASHIFT
ps: 在sun公司的源码中,只有计算机的二进制运算,比如&、|、^、<<、>>>,因为相较于×、÷,二进制运算快得多。所以ASHIFT的存在只是为了方便二进制运算
// Unsafe mechanics private static final sun.misc.Unsafe U; //sizeCtl的偏移量 private static final long SIZECTL; //transferIndex的偏移量 private static final long TRANSFERINDEX; //baseCount的偏移量 private static final long BASECOUNT; //cellsBusy的偏移量 private static final long CELLSBUSY; //cellValue的偏移量 private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } 复制代码
这个变量简直贯穿ConcurrentHashMap扩容啊,我当时为了搞清它,都卡了好久。
sizeCtl 这个变量是个多线程共享变量,-1的时候代表有线程在初始化数组,小于-1代表有
(-sizeCtl-1)条线程在协调扩容,大于0时记录的是触发扩容的阈值。
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl; 复制代码
当前使用的数组对象
/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node<K,V>[] table; 复制代码
扩容时指向的新数组对象
/** * The next table to use; non-null only while resizing. */ private transient volatile Node<K,V>[] nextTable; 复制代码
在put成功后通过CAS(Unsafe.compareAndSwapLong()方法)自增,如果CAS失败,说明有其他线程竞争,那么直接使用LongAdder的方法来计数,这涉及到另一个变量counterCells
/** * Base counter value, used mainly when there is no contention, * but also as a fallback during table initialization * races. Updated via CAS. */ private transient volatile long baseCount; 复制代码
计数器数组,思想是longAdder原理。详细下面addCount()方法解析时会详细讲。
总之,concurrentHashMap的size = baseCount + (counterCells所有元素值累加值)
/** * Table of counter cells. When non-null, size is a power of 2. */ private transient volatile CounterCell[] counterCells; 复制代码
在扩容时,需要处理的链表(链表数组嘛)所在的下标范围最大值+1,或者说要处理的链表的个数。比如,旧数组长度是16,扩容时,nextTab.length=32,那么要处理节点的总范围是第一个到第32个,即transferIndex=32.
/** * The next table index (plus one) to split while resizing. */ private transient volatile int transferIndex; 复制代码
这部分和HashMap一致,我就贴贴源码了,具体看我解析java8 HashMap源码的文章😂
/** * Creates a new, empty map with the default initial table size (16). */ public ConcurrentHashMap() { } 复制代码
与HashMap不同的一点是,将初始数组长度赋给sizeCtl
/** * Creates a new, empty map with an initial table size * accommodating the specified number of elements without the need * to dynamically resize. * * @param initialCapacity The implementation performs internal * sizing to accommodate this many elements. * @throws IllegalArgumentException if the initial capacity of * elements is negative */ public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } 复制代码
/** * Creates a new map with the same mappings as the given map. * * @param m the map */ public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } 复制代码
与HashMap一样的原理,详见解析java8 HashMap源码的文章
/** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 */ private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; } 复制代码
重磅级,真的很复杂,光这个一个方法和调用的方法我都整了好久。
/** * Maps the specified key to the specified value in this table. * Neither the key nor the value can be null. * * <p>The value can be retrieved by calling the {@code get} method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with {@code key}, or * {@code null} if there was no mapping for {@code key} * @throws NullPointerException if the specified key or value is null */ public V put(K key, V value) { return putVal(key, value, false); } 复制代码
用到的全局共享变量sizeCtl。
功能:初始化数组,和sizeCtl变量为触发扩容阈值
/** * Initializes table, using the size recorded in sizeCtl. */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //CAS操作标志,首先加上while循环,当数组尚未被初始化时,会一直尝试去初始化数组 while ((tab = table) == null || tab.length == 0) { /**sizeCtl<0,两种可能,1)-1:初始化中;2)<-1:扩容中 * 由上一步while循环的判断条件可知,显然是有其他线程在初始化中。 * 所以当前线程让步,等待另外一条线程执行初始化完成。 */ if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin /**比较主存中sizeCtl是否还是上一步获取的值,是则说明当前没有其他线程正在初 * 始化中,修改其值为-1; * 否,则说明已有其他线程在初始化中,继续循环 */ else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //再次检查是否有其他线程初始化完成 if ((tab = table) == null || tab.length == 0) { /**在未初始化之前,sizeCtl>0还有种情况,即调用了有参构造方法1时 * 初始化为ConcurrentHashMap的初始大小 */ int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { /** * 若try代码块执行成功,是修改共享变量值为触发扩容阈值; * 若try代码块执行失败,是将sizeCtl的值还原 **/ sizeCtl = sc; } break; } } return tab; } 复制代码
CAS修改tab[i]值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } 复制代码
这个方法里面的spread(key.hashCode()) 方法和HashMap方式一毛一样,就不介绍了
/** Implementation for put and putIfAbsent * onlyIfAbsent false-代表如果key键存在,并且已有相应的vlaue值,覆盖旧值 * true-代表不覆盖旧值 */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //计算hash值 int hash = spread(key.hashCode()); // int binCount = 0; //遍历数组 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //若数组尚未初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //hash寻址到的i位置为null else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; } 复制代码