ConcurrentHashMap 是Java并发包中提供的一个线程安全且高效的HashMap实现
HashMap的缺点:
多线程环境下HashMap会有线程安全问题,扩容可能会造成环形链表,使cpu空转达到100%,但是HashTable可以保证线程安全
HashTable缺点:
底层使用synchronized锁保证线程安全问题,但是将整个数组锁住了,最终只有一个线程能够调用get或者是put方法,如果没有获取锁的线程就会变为阻塞状态,效率非常低。
多线程环境下建议使用ConcurrentHashMap
集合
ConcurrentHashMap
相当于把一个大的ConcurrentHashMap
集合拆分成16个HashTable
类,每次存放先要计算存放在哪个HashTable
里面,然后还要计算存放在HashTable
里面的哪个HashEntry<K,V>
里面,相当于把锁的粒度进行拆分了,把大锁拆分成小锁。
核心构造参数分析
//初始的容量 private static final int DEFAULT_CAPACITY = 16; //最大容量 private static final int MAXIMUM_CAPACITY = 1 << 30; //HashEntry table的扩容因子 private static final float LOAD_FACTOR = 0.75f; // 默认的并发度 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //最小的segment数量 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //最大的segment数量 static final int MAX_SEGMENTS = 1 << 16;
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //验证参数 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //并发级别大于并发值的话 if (concurrencyLevel > MAX_SEGMENTS) //最大为 concurrencyLevel = MAX_SEGMENTS; //sshift=ssize平方的次 =4 int sshift = 0; //ssize=segments数组的容量=16 int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; //32-4=28 计算index segmentMask = ssize - 1; //16-1=15 与运算的时候均匀的存放到Segment对象中 this.segments = Segment.newArray(ssize); //数组容量最大不能大于2的三十次幂 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; //HashEntry 的初始容量 16/16=1 int c = initialCapacity / ssize; //如果1*16<16 if (c * ssize < initialCapacity) ++c; //HashEntry table默认大小为2 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; //创建第一个Segment,并放入Segment[]数组中,作为第一个Segment Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]); //初始化16大小的Segment Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; //利用cas把s0放入ss中 ss[0]=s0 UNSAFE.putOrderedObject(ss, SBASE, s0); //赋值给全局segments this.segments = ss; }
这里在构造函数初始化S0,目的就是方便其他的key落到不同的Segment
中,能够知道创建的Segment
的参数,直接采用这个参数
public V put(K key, V value) { Segment<K,V> s; //参数判断 if (value == null) throw new NullPointerException(); //这里对key求hash值,并确定应该放到segment数组的索引位置 int hash = hash(key); //j为索引位置 segmentShift=28 segmentMask=15 hash >>> segmentShift保留最高位4位 int j = (hash >>> segmentShift) & segmentMask; //判断索引下是否有Segment对象,没有帮忙创建 s=Segment[j]=null if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) //创建一个Segment对象 s = ensureSegment(j); //这里很关键,找到了对应的Segment,则把元素放到Segment中去 return s.put(key, hash, value, false); }
根据key的hash值,在Segment数组中找到相应的位置,如果相应位置的Segment还未初始化,则通过CAS进行赋值,接着执行Segment对象的put方法通过加锁机制插入数据
private Segment<K,V> ensureSegment(int k) { //拿到Segments[]数组 final Segment<K,V>[] ss = this.segments; //获取k所在的segment在内存中的偏移量 long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; //获取k所在的segmen 判断k所在的segmen是否为null if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //初始化一个segment,直接使用Segments[0]的属性,这就是为什么在构造方法中初始化Segments[0]的原因 //Segments[0]在初始化ConcurrentHashMap时,已经初始化了一个segment放到Segments[0] Segment<K,V> proto = ss[0]; // use segment 0 as prototype //然后就是获取Segments[0]中HashEntry数组的数据 int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); //初始化一个HashEntry数组,大小和Segments[0]中的HashEntry一样。 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; //第二次获取k所在的segment(防止其他线程已经初始化好) if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck //如果还是null,创建一个segment并通过cas设置到对应的位置 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); //第三次获取k所在的segmen,判断segmen是否为null,自旋CAS来创建对应Segment while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // CAS if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
这里有三个CAS查询
第一个判断==null,获取s0默认参数信息、加载因子、初始容量,tab = (HashEntry<K,V>[])new HashEntry[cap]
第二个判断==null, new Segment<K,V>(lf, threshold, tab)
第三个判断==null,把s赋值给seg,整个方法就是相当于创建一个Segment<K,V>
自旋CAS来创建对应Segment,即使不加锁也能保障线程安全
final V put(K key, int hash, V value, boolean onlyIfAbsent) { //先尝试对segment加锁,如果直接加锁成功,那么node=null;如果加锁失败,则会调用scanAndLockForPut方法去获取锁, //在这个方法中,获取锁后会返回对应HashEntry(要么原来就有要么新建一个) scanAndLockForPut自旋的方式再次尝试获取锁 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { //这里是一个优化点,由于table自身是被volatile修饰的,然而put这一块代码本身是加锁了的,所以同一时间内只会有一个线程操作这部分内容, //所以不再需要对这一块内的变量做任何volatile修饰,因为变量加了volatile修饰后,变量无法进行编译优化等,会对性能有一定的影响 //故将table赋值给put方法中的一个局部变量,从而使得能够减少volatile带来的不必要消耗。 HashEntry<K,V>[] tab = table; //计算当前元素所在HashEntry的index int index = (tab.length - 1) & hash; //先获取需要put的<k,v>对在当前这个segment中对应的链表的表头结点。 HashEntry<K,V> first = entryAt(tab, index); //开始遍历first为头结点的链表 for (HashEntry<K,V> e = first;;) { //e不为空,说明当前键值对需要存储的位置有hash冲突 if (e != null) { //依据onlyIfAbsent来判断是否覆盖已有的value值 K k; //说明键的Hash值一样,内容也一样 if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) { //key相同,那么直接用新的元素覆盖旧的元素 oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } //未进入上面的if条件中,说明当前e节点对应的key不是需要的,直接遍历下一个节点。 e = e.next; } else {//<2> 头插法 //进入到这个else分支,说明e为空,对应有两种情况下e可能会为空,即: // 1>. <1>中进行循环遍历,遍历到了链表的表尾仍然没有满足条件的节点。 // 2>. e=first一开始就是null(可以理解为即一开始就遍历到了尾节点) if (node != null) //这里有可能获取到锁是通过scanAndLockForPut方法内自旋获取到的,这种情况下依据找好或者说是新建好了对应节点,node不为空 node.setNext(first); // 当然也有可能是这里直接第一次tryLock就获取到了锁,从而node没有分配对应节点,即需要给依据插入的k,v来创建一个新节点 else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //总数+1 在这里依据获取到了锁,即是线程安全的!对应了上述对count变量的使用规范说明。 if (c > threshold && tab.length < MAXIMUM_CAPACITY)//判断是否需要进行扩容 //扩容是直接重新new一个新的HashEntry数组,这个数组的容量是老数组的两倍, //新数组创建好后再依次将老的table中的HashEntry插入新数组中,所以这个过程是十分费时的,应尽量避免。 //扩容完毕后,还会将这个node插入到新的数组中。 rehash(node); else //数组无需扩容,那么就直接插入node到指定index位置,这个方法里用的是UNSAFE.putOrderedObject //网上查阅到的资料关于使用这个方法的原因都是说因为它使用的是StoreStore屏障,而不是十分耗时的StoreLoad屏障 //给我个人感觉就是putObjectVolatile是对写入对象的写入赋予了volatile语义,但是代价是用了StoreLoad屏障 //而putOrderedObject则是使用了StoreStore屏障保证了写入顺序的禁止重排序,但是未实现volatile语义导致更新后的不可见性, //当然这里由于是加锁了,所以在释放锁前会将所有变化从线程自身的工作内存更新到主存中。 //这一块对于putOrderedObject和putObjectVolatile的区别有点混乱,不是完全理解,网上也没找到详细解答,查看了C源码也是不大确定。 //希望有理解的人看到能指点一下,后续如果弄明白了再更新这一块。 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
多线程下put方法获取锁失败,就会使用scanAndLockForPut
遍历获取锁然后进行数据插入
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { //获取k所在的segment中的HashEntry的头节点(segment中放得是HashEntry数组,HashEntry又是个链表结构) HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first;//缓存需要插入的数据 HashEntry<K,V> node = null; int retries = -1; // negative while locating node while (!tryLock()) {//自旋尝试获取k所在segment的锁 HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) {//所在HashEntry链表不存在,则根据传过来的key-value创建一个HashEntry if (node == null) node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key))//找到要放得值,则设置segment重试次数为0 retries = 0; else //从头节点往下寻找key对应的HashEntry e = e.next; } else if (++retries > MAX_SCAN_RETRIES) {//超过最大重试次数就将当前操作放入到Lock的队列中 阻塞了 lock(); break; //retries只有0和1 如果发现头节点有变化,就重新获取HashEntry链表的头结点 } else if ((retries & 1) == 0 &&(f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
trylock尝试获取锁不会阻塞,lock获取锁不成功会阻塞,先缓存了当前冲突的链表,然后每次检查当前链表头节点是否有变化,如果有变化说明被修改过,因为采用的是头插法。在发生变化的情况下,修改为最新的。
通过MAX_SCAN_RETRIES控制自旋次数最大为64,防止无限制的重复自旋浪费资源。
底层实现原理:
由多个不同的Segment对象组成,lock锁锁住,用Unsafe类查询主内存数据,使用cas做修改
假如有16个线程均匀分布在每个Segment上,往里面添加数据,锁没有产生竞争,那么效率是没有任何影响的。但如果有三个线程同时都获取到同一个Segment中,那么这三个key都会上锁,JDK1.7里面采用乐观锁lock保证线程安全。
//默认为0 -1标示其他线程已经在扩容了 -2标示当前有两个线程在扩容 private transient volatile int sizeCtl;
final V putVal(K key, V value, boolean onlyIfAbsent) { //key不允许为空 if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); //记录链表长度 大于8的情况下转为红黑树 int binCount = 0; //死循环 相当于自旋 table是全局可见变量 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //第一次循环如果tab为空就做初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //第二次循环tab不为空 计算key所在的index 查找该node节点 如果没有发生index冲突 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //CAS 在当前位置赋值一个node节点 if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //其他线程在扩容,参与一起扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //index节点发生冲突之后 else { V oldVal = null; //给当前index位的node节点加锁 synchronized (f) { //再次赋值判断 如果是当前缓存的节点 if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //key值比较相等就修改覆盖 if (e.hash == hash &&((ek = e.key) == key ||(ek != null&& key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //不相等的话 追加在链表后面 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //判断是否为红黑树 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // if (binCount != 0) { //binCount>8 转换为红黑树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //对每个线程里面保存的size进行求和 算出整个集合的元素 addCount(1L, binCount); return null; }
加锁分为了两种:
1、没有发生hash冲突的时候,如果添加的元素的位置在数组中是空的话,那么就使用CAS的方式来加入元素,这里加锁的粒度是数组中的元素
2、如果出现了hash冲突,添加的元素的位置在数组中已经有了值,使用的synchronized,那么又存在三种情况
(1)key相同,那么直接用新的元素覆盖旧的元素
(2)如果数组中的元素是链表的形式,那么将新的元素挂载在链表尾部
(3)如果数组中的元素是红黑树的形式,那么将新的元素加入到红黑树
锁住的对象就是数组中的node元素,加锁的粒度和第一种情况相同
初始化Table
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //tab为空的时候 while ((tab = table) == null || tab.length == 0) { //sizeCtl如果小于0表示已经有其他线程在做扩容了 当前如果只有一个线程那么sizeCtl=0 if ((sc = sizeCtl) < 0) //当前线程就释放cpu执行权 如果线程被唤醒那么tab肯定不为空 Thread.yield(); //CAS修改SIZECTL为-1 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //如果当前tab为空的情况下 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //对node初始化长度默认16 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //n=16 16>>>2=4 sc=12 扩容大小 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } //初始化完毕 return tab; }
只有一个线程情况下就会把SIZECTL改为-1,然后赋值table结束循环返回数据,但是16个线程进来同时,只有一个能够进入到tab为空的方法判断,SIZECTL为0,这时候剩下15个一直在自旋等待SIZECTL修改为-1,假如第一个线程还没执行完就挂机,就会导致剩下的一直进行自旋,使CPU飙升。
ReentrantLock+Segment+HashEntry
一个 ConcurrentHashMap 里包含一个 Segment 数组。一个Segment中包含一个HashEntry数组,每个HashEntry又是一个链表结构,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment的锁。
put操作时需要计算两次hash索引,先计算所在Segment的下标,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后计算在该Segment中HashEntry 的下标,ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,使用尾插法。如果Segment被其他线程占有,当前线程会以自旋的方式继续尝试获取锁知道达到最大重试次数就挂起等待唤醒。
ConcurrentHashMap 的扩容是仅仅和每个Segment元素中HashEntry数组的长度有关,但需要扩容时,只扩容当前Segment中HashEntry数组即可。也就是说ConcurrentHashMap中Segment[]数组的长度是在初始化的时候就确定了默认16,后面扩容不会改变这个长度。
程序运行时能够同时更新 ConccurentHashMap且不产生锁竞争的最大线程数。在JDK1.7中,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度,默认是16,这个值可以在构造函数中设置。
如果自己设置了并发度,ConcurrentHashMap 会使用大于等于该值的最小的2的幂指数作为实际并发度,也就是比如你设置的值是17,那么实际并发度是32。
如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。
实际上就是HashMap的数据结构+synchronized锁,只需要锁住这个链表的头节点就不会影响其他的哈希桶数组元素的读写,大大提高了并发度。
JVM
的内存压力,基于API
的ReentrantLock
会开销更多的内存根据 key 计算出 hash 值,判断是否需要进行初始化;
定位到 Node,拿到首节点 f,判断首节点 f:
如果为 null ,则通过 CAS 的方式尝试添加;
如果为 f.hash = MOVED = -1 ,说明其他线程在扩容,参与一起扩容;
如果都不满足 ,synchronized 锁住 f 节点,判断是链表还是红黑树,遍历插入;
1.7lock锁是加上了while循环实现自旋,效率不高,JDK1.6以后 对 synchronized锁做了优化,
hash没有发生冲突的时候使用CAS,已经发生冲突的时候使用synchronized锁。
不一定,极低的概率下会发生CPU飙升的情况。