Java教程

java8 ConcurrentHashMap源码&原理解读,并行/并发编程思想(持续更新)

本文主要是介绍java8 ConcurrentHashMap源码&原理解读,并行/并发编程思想(持续更新),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前言

之前研读过HashMap源码,实现过一遍HashMap源码。但是HashMap是线程不安全的,而ConcurrentHashMap是线程安全的,博主我觉得自己多线程方面差了点,遂决定去研究一番ConcurrentHashMap。这一看,emmm,妙呀,有时一个方法起码得研究好几天,打开一篇知识的海洋。笔者去年终于看完整个put方法的处理,〒▽〒,实现到扩容那里了。后面由于换工作,面试,这事就耽搁下了,如今我重新捡起来,主要是我刚写完一篇HashMap的源码解读。嗯,我最近也在并行了解分布式事务。换着来,比较不那么容易腻一些。

概述

ConcurrentHashMap是concurrent包下的线程安全的Map集合。HashMap是线程不安全,HashTable是线程安全,但是它是对整个hash表加锁,锁的范围太大。

java7时的ConcurrentHashMap

结构

java8之前ConcurrentHashMap结构

 由HashTable可知,在多核多CPU的情况下,多线程可以并行,但是当遇到了HashTable时就变得不一样了。由于整个HashTable加了锁,每次只有一个线程能拿到锁执行,其他线程等待。本来每次可以同时干多件事,变成每次只能干一件事,效率就低了。而1.7的ConcurrentHashMap将锁的范围变小,每次不锁整个表,可锁的范围分成多块,代表可以有多个线程同时干活。

 而这时的ConcurrentHashMap就不一样了。 当put一个元素时
 1.通过对key的hash计算得到一个下标,是Segment[]的下标
 2.线程申请该segment的锁,拿到segment[i]锁
  2.1.如果获取不到锁,循环一定次数去尝试拿锁,超过一定次数没获取到锁后,线程进入    阻塞状态等待。
 3.开始对segment[i]的hashEntry进行put操作。
  3.1Segment继承自ReentrantLock,独占的可重入式锁。所以拿到Segment后的操作不需要考虑并发,因为Segment是独占锁。

 由此可知,Segment[]代表着并发度,16个Segment代表可以同时有16线程并行,而Segment是不会扩容的,只有Segment[i]下的HashEntry会扩容。当并发量越来越来大,即使有个16个并发度,对于数量大的线程数面前是不够用的,供小于求,照样会有大量的线程进入阻塞状态。

 所以就有1.8的优化,直接抛弃segment这种方式。

java8的ConcurrentHashMap

如图,1.8时ConcurrentHashMap的结构与HashMap一样,就是一个单纯的链表数组,计算hash、寻址、扩容时链表拆分等基本计算方法和HashMap的方式是一样的。没有Segment[],所以它的并发度就是数组的长度。既然如此,没了Segement可重入锁,如何保证线程安全,put和扩容的线程安全如何保障?解决方法在于对CAS的使用,和对线程的控制。

前置知识点

在开始解析concurrentHashMap前,需要知道的一些知识点

CAS

 CAS即比较并交换,首先线程从内存中读取x的值a,而后再将a值于当前内存中x的值,设此时内存x的值为b;比较a和b,若期待值a与b相同,则修改内存中x的值为c,不同则不修改。这是一种乐观锁策略,不加锁。

在多线程的情况下,CAS会产生ABA的问题:
  内存中有X值为1,A线程拿到X的值为1,这时B线程抢占cpu时间片,将内存中的X先修改为2, 后又修改为1;这时线程A再回来查看,发现X的值还是1,遂修改为3。
 ABA的问题好像不是什么大问题,但在程序中,值一样不代表没有变化,比如内存地址可能变了。

· 解决这种有两个方法:
 ①使得比较修改这部分操作具有原子性,比如再秒杀场景中,使用lua脚本操作redis修改数据,  因为redis只支持乐观锁,lua程序执行是原子性的。
 ②增加版本号,每修改一次,版本号加1,每次比较根据版本号判断。

线程的内存模型

java的线程内存模型如下图:↓

 首先java内存模型规定所有变量都存在主存中,每个线程都有自己的工作内存,所有操作都在工作内存进行,不能直接操作主存,也不能操作其他线程的工作内存。

 线程修改一个变量,先修改工作内存中的值,再写入主存中(什么时候写入主存未知)。
 那么对于多个线程的共享的变量来说,假如线程a,线程b共同操作变量y,线程a修改了y变量值为2 ,还没有写入主存中,那线程b首先能直接操作的是自己工作内存,这时自己的工作内存中y的值不是 2,那么如何使得线程b知道变量y的值已变成2了呢?
· 对于这个问题,java提供一个关键字volatile,使得共享变量可见。具体步骤如下:
  当一个变量x被volatile关键字修饰时,当线程1在工作内存修改了x的值时,会强制将x的值写入主存中;
  其次由于线程1的修改操作,会导致线程2工作内存中x的缓存无效,所以当线程1要读取x时,会到主存中读取.

要注意的是,volatile关键字能保证操作的可见性,但它没法保证操作的原子性:
  · 比如,有一共享变量y值为1.
  · 线程1首先获取了共享变量y的值 1。
  · 而这时线程2抢占cpu时间片获取共享变量y,并修改将y的值加1等于2,随后刷新主存中y的 值;
  · 那么这时线程1将之前获取到的y的值加1,变成2,刷新主存y值变成2;
  · 按理来说,y是共享变量,两条线程都对其进行加1操作,主存中的值应该是3才对.但是由于线程对共享变量y的加1操作不具有原子性,导致最后结果的误差.仔细想想,这个事好像事务的隔离级别的那种.

线程的并发和并行

并发
并发是指,多个线程在一个cpu上交替执行,在整体上看来像是多个线程在同时做事,其实只是它们之间切换的快。

所以在单核单cpu的情况下,多线程可能还不如单线程,多线程还涉及到线程状态的切换带来的资源消耗,单也的确提高了cpu的利用率。

并行
并行是指,在同一时刻同时干两件事。在多核多cpu的情况下,多线程可能分布在不同的逻辑处理器中,它们不需要竞争cpu的资源。

一般开启多线程,这个线程数也是有讲究的,比如我的电脑是1个cpu,4内核,8个逻辑处理器,一个逻辑处理器一个线程,说明1个内核可以支持2个超线程。所以是8线程比较合适。

源码&原理解析

在解析put方法之前必须要知道的核心操作方法和核心全局变量

依赖的类

Unsafe类

Unsafe在java.util.conncurrent包下的类中占距很重要的位置,里面大多是CAS操作。Unsafe类使得java拥有像C语言指针一样操作内存空间的能力。
Unsafe可以直接操作内存,速度会更快,在并发的条件下能提供更好的效率。但也意味着不安全,不受jvm管理,无法被GC,需要自己释放,一个不小心会内存泄漏。
Unsafe类中大多方法被native修饰,意思是这些方法调用的都是c语言实现接口。

/**
* 比较地址相较于对象o地址偏移l 处的值是否等于o1,是则修改其值为o2,并返回true。
* 否,则返回false
* params:
* o-比较交换的对象实例
* l-偏移量,在o地址的基础上的偏移量
* o1-期望值,即期望主存中o的值
* o2-修改值
*/
public final native boolean compareAndSwapObject(java.lang.Object o, long l, java.lang.Object o1, java.lang.Object o2);
/**
* 这个方法与上一个方法的区别是,要比较修改的是一个int类型
*/
    public final native boolean compareAndSwapInt(java.lang.Object o, long l, int i, int i1);
/**
* 这个方法与上一个方法的区别是,要比较修改的是一个long类型
*/
    public final native boolean compareAndSwapLong(java.lang.Object o, long l, long l1, long l2);
/**
* 获取主存中o对象中偏移l的值
*/
    public native java.lang.Object getObjectVolatile(java.lang.Object o, long l);
/**
* 对象o中偏移l处的值修改为o1,并对其他线程立马可见
*/
    public native void putObjectVolatile(java.lang.Object o, long l, java.lang.Object o1);
    /**
    * 获取数组的第一个元素的地址,即数组首地址,知道c语言指针的应该秒懂。
    */
    public native int arrayBaseOffset(java.lang.Class<?> aClass);
    /**
    * 获取数组元素所占字节个数,比如int[]数组,int类型占4个字节,返回4
    */
    public native int arrayIndexScale(java.lang.Class<?> aClass);
复制代码

concurrentHashMap中对Unsafe的使用

关于ABASE,ASHIFT是用来获得数组元素的偏移量,根据偏移量访问并操作数组元素。
ABASE是数组首地址,ASHIFT表示数组元素占1<< ASHIFT个字节数;
举个例子:假设有个long[]数组,首地址ABASE=16,long占8个字节,那么ASHIFT=3,下标为i=3的元素的地址是 16+(3-0)×8=16+3×8=40=ABSE+(i-0)×(1<< ASHIFT)=ABASE+i<< ASHIFT

ps: 在sun公司的源码中,只有计算机的二进制运算,比如&、|、^、<<、>>>,因为相较于×、÷,二进制运算快得多。所以ASHIFT的存在只是为了方便二进制运算

// Unsafe mechanics
    private static final sun.misc.Unsafe U;
    //sizeCtl的偏移量
    private static final long SIZECTL;
    //transferIndex的偏移量
    private static final long TRANSFERINDEX;
    //baseCount的偏移量
    private static final long BASECOUNT;
    //cellsBusy的偏移量
    private static final long CELLSBUSY;
    //cellValue的偏移量
    private static final long CELLVALUE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            SIZECTL = U.objectFieldOffset
                (k.getDeclaredField("sizeCtl"));
            TRANSFERINDEX = U.objectFieldOffset
                (k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset
                (k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset
                (k.getDeclaredField("cellsBusy"));
            Class<?> ck = CounterCell.class;
            CELLVALUE = U.objectFieldOffset
                (ck.getDeclaredField("value"));
            Class<?> ak = Node[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }
复制代码

比较重要的共享变量

sizeCtl变量

这个变量简直贯穿ConcurrentHashMap扩容啊,我当时为了搞清它,都卡了好久。

sizeCtl 这个变量是个多线程共享变量,-1的时候代表有线程在初始化数组,小于-1代表有
(-sizeCtl-1)条线程在协调扩容,大于0时记录的是触发扩容的阈值。

/**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     */
    private transient volatile int sizeCtl;
复制代码

table[]变量

当前使用的数组对象

/**
     * The array of bins. Lazily initialized upon first insertion.
     * Size is always a power of two. Accessed directly by iterators.
     */
    transient volatile Node<K,V>[] table;
复制代码

nextTab[]变量

扩容时指向的新数组对象

/**
     * The next table to use; non-null only while resizing.
     */
    private transient volatile Node<K,V>[] nextTable;
复制代码

baseCount 变量

在put成功后通过CAS(Unsafe.compareAndSwapLong()方法)自增,如果CAS失败,说明有其他线程竞争,那么直接使用LongAdder的方法来计数,这涉及到另一个变量counterCells

/**
     * Base counter value, used mainly when there is no contention,
     * but also as a fallback during table initialization
     * races. Updated via CAS.
     */
    private transient volatile long baseCount;
复制代码

counterCells 变量

计数器数组,思想是longAdder原理。详细下面addCount()方法解析时会详细讲。
总之,concurrentHashMap的size = baseCount + (counterCells所有元素值累加值)

/**
     * Table of counter cells. When non-null, size is a power of 2.
     */
    private transient volatile CounterCell[] counterCells;
复制代码

transferIndex

在扩容时,需要处理的链表(链表数组嘛)所在的下标范围最大值+1,或者说要处理的链表的个数。比如,旧数组长度是16,扩容时,nextTab.length=32,那么要处理节点的总范围是第一个到第32个,即transferIndex=32.

/**
     * The next table index (plus one) to split while resizing.
     */
    private transient volatile int transferIndex;
复制代码

构造方法

这部分和HashMap一致,我就贴贴源码了,具体看我解析java8 HashMap源码的文章😂

· 无参构造方法

/**
     * Creates a new, empty map with the default initial table size (16).
     */
    public ConcurrentHashMap() {
    }
复制代码

· 有参构造方法1

与HashMap不同的一点是,将初始数组长度赋给sizeCtl

/**
     * Creates a new, empty map with an initial table size
     * accommodating the specified number of elements without the need
     * to dynamically resize.
     *
     * @param initialCapacity The implementation performs internal
     * sizing to accommodate this many elements.
     * @throws IllegalArgumentException if the initial capacity of
     * elements is negative
     */
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
复制代码

· 有参构造方法2

/**
     * Creates a new map with the same mappings as the given map.
     *
     * @param m the map
     */
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
复制代码

· tableSizeFor

与HashMap一样的原理,详见解析java8 HashMap源码的文章

/**
     * Returns a power of two table size for the given desired capacity.
     * See Hackers Delight, sec 3.2
     */
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
复制代码

put方法

重磅级,真的很复杂,光这个一个方法和调用的方法我都整了好久。

 

put(key,value)

/**
     * Maps the specified key to the specified value in this table.
     * Neither the key nor the value can be null.
     *
     * <p>The value can be retrieved by calling the {@code get} method
     * with a key that is equal to the original key.
     *
     * @param key key with which the specified value is to be associated
     * @param value value to be associated with the specified key
     * @return the previous value associated with {@code key}, or
     *         {@code null} if there was no mapping for {@code key}
     * @throws NullPointerException if the specified key or value is null
     */
    public V put(K key, V value) {
        return putVal(key, value, false);
    }
复制代码

initTable()

用到的全局共享变量sizeCtl。
功能:初始化数组,和sizeCtl变量为触发扩容阈值

/**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        //CAS操作标志,首先加上while循环,当数组尚未被初始化时,会一直尝试去初始化数组
        while ((tab = table) == null || tab.length == 0) {
            /**sizeCtl<0,两种可能,1)-1:初始化中;2)<-1:扩容中
            * 由上一步while循环的判断条件可知,显然是有其他线程在初始化中。
            * 所以当前线程让步,等待另外一条线程执行初始化完成。
            */
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            /**比较主存中sizeCtl是否还是上一步获取的值,是则说明当前没有其他线程正在初  
            * 始化中,修改其值为-1; 
            * 否,则说明已有其他线程在初始化中,继续循环
            */
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    //再次检查是否有其他线程初始化完成
                    if ((tab = table) == null || tab.length == 0) {
                        /**在未初始化之前,sizeCtl>0还有种情况,即调用了有参构造方法1时  
                        * 初始化为ConcurrentHashMap的初始大小
                        */
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    /**
                    * 若try代码块执行成功,是修改共享变量值为触发扩容阈值;
                    * 若try代码块执行失败,是将sizeCtl的值还原
                    **/
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
复制代码

caseTabAt(tab,i,c,v)

CAS修改tab[i]值

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
复制代码

putVal(key,value,onlyIfAbsent)

这个方法里面的spread(key.hashCode()) 方法和HashMap方式一毛一样,就不介绍了

/** Implementation for put and putIfAbsent 
*  onlyIfAbsent false-代表如果key键存在,并且已有相应的vlaue值,覆盖旧值
*               true-代表不覆盖旧值
*/
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        //计算hash值
        int hash = spread(key.hashCode());
        //
        int binCount = 0;
        //遍历数组
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //若数组尚未初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //hash寻址到的i位置为null
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
复制代码
这篇关于java8 ConcurrentHashMap源码&原理解读,并行/并发编程思想(持续更新)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!