C/C++教程

ConcurrentHashMap你到底了解多少?

本文主要是介绍ConcurrentHashMap你到底了解多少?,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!


简介
ConcurrentHashMap是一个经常被使用的数据结构,它在线程安全的基础上提供了更好的写并发能力。ConcurrentHashMap跟Map有很大的不同,内部大量使用volatile和CAS等减少锁竞争,当然代码也比HashMap难理解的多,本章基于JDK1.8对ConcurrentHashMap做基本介绍。

ConcurrentHashMap 类

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable

继承AbstractMap抽象类,实现ConcurrentMap接口

ConcurrentMap 接口

public interface ConcurrentMap<K, V> extends Map<K, V>

继承Map接口

ConcurrentMap 方法

// 为空返回默认值
@Override
default V getOrDefault(Object key, V defaultValue) {
    V v;
    return ((v = get(key)) != null) ? v : defaultValue;
}
// 如果存在键不覆盖(put 键存在新值覆盖原值)
V putIfAbsent(K key, V value);
// 删除
boolean remove(Object key, Object value);
// 替换,oldValue一样才替换
boolean replace(K key, V oldValue, V newValue);
// 替换
V replace(K key, V value);

这里忽略使用Function的方法

重要内部类 Node

static class Node<K,V> implements Map.Entry<K,V>

实现Map.Entry

Node 属性​

// 键的hash值
final int hash;
// 键
final K key;
// 值
volatile V val;
// 下一个节点
volatile Node<K,V> next;

Node 构造函数

Node(int hash, K key, V val, Node<K,V> next) {
    this.hash = hash;
    this.key = key;
    this.val = val;
    this.next = next;
}

Node 方法

// 获取键        
public final K getKey()       { return key; }
// 获取值
public final V getValue()     { return val; }
// 获取节点hashCode(键值异或)
public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
// toString方法
public final String toString(){ return key + "=" + val; }
// 直接对节点设置值抛异常
public final V setValue(V value) {
    throw new UnsupportedOperationException();
}
// 节点equals
public final boolean equals(Object o) {
    Object k, v, u; Map.Entry<?,?> e;
    // 必须是Map.Entry实例,key不能为空值不能为空,键值一样
    return ((o instanceof Map.Entry) &&
            (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
            (v = e.getValue()) != null &&
            (k == key || k.equals(key)) &&
            (v == (u = val) || v.equals(u)));
}
// 搜索键
Node<K,V> find(int h, Object k) {
    Node<K,V> e = this;
    if (k != null) {
        do {
            K ek;
            // 当前节点与h一样,key必须一样
            if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            // 节点下移,继续搜索
        } while ((e = e.next) != null);
    }
    return null;
}

重要内部类 TreeNode

static final class TreeNode<K,V> extends Node<K,V>

TreeNode 继承Node

TreeNode 属性

// 父级节点
TreeNode<K,V> parent;  // red-black tree links
// 左节点
TreeNode<K,V> left;
// 右节点
TreeNode<K,V> right;
// 前一个节点(跟Node中next组成双向链表)
TreeNode<K,V> prev;
// 颜色
boolean red;

TreeNode 构造函数

TreeNode(int hash, K key, V val, Node<K,V> next,
         TreeNode<K,V> parent) {
    // 初始化父类属性
    super(hash, key, val, next);
    // 初始化上级节点
    this.parent = parent;
}

TreeNode 方法

Node<K,V> find(int h, Object k) {
    return findTreeNode(h, k, null);
}
// 查找元素
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
    if (k != null) {
        // 当前节点
        TreeNode<K,V> p = this;
        do  {
            // 取左右节点
            int ph, dir; K pk; TreeNode<K,V> q;
            TreeNode<K,V> pl = p.left, pr = p.right;
            // h 小于当前节点hash走左边
            if ((ph = p.hash) > h)
                p = pl;
            // h 大于当前节点hash走右边
            else if (ph < h)
                p = pr;
            // hash一样,key一样
            else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                return p;
            // hash一样,key不一样,左节点为空
            else if (pl == null)
                p = pr;
            // ash一样,key不一样,右节点为空
            else if (pr == null)
                p = pl;
            // hash一样,key不一样,左右不为空
            else if ((kc != null ||
                    (kc = comparableClassFor(k)) != null) &&
                    (dir = compareComparables(kc, k, pk)) != 0)
                // 通过比较器比较结果,小于0走左边,大于0走右边
                p = (dir < 0) ? pl : pr;
            // 比较器为空先从右边递归
            else if ((q = pr.findTreeNode(h, k, kc)) != null)
                return q;
            // 右边也没找到,下次从左边找
            else
                p = pl;
        } while (p != null);
    }
    return null;
}

重要内部类 TreeBin

static final class TreeBin<K,V> extends Node<K,V>

TreeBin 也继承Node

TreeBin 属性

// 根节点
TreeNode<K,V> root;
// 头节点
volatile TreeNode<K,V> first;
// 等待
volatile Thread waiter;
// 锁定状态
volatile int lockState;
static final int WRITER = 1;
static final int WAITER = 2;
static final int READER = 4;
// 内存操作不安全类
private static final sun.misc.Unsafe U;
// lockState 偏移量
private static final long LOCKSTATE;

TreeBin 加载初始化

static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = TreeBin.class;
        LOCKSTATE = U.objectFieldOffset
                (k.getDeclaredField("lockState"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

TreeBin 构造函数

TreeBin(TreeNode<K,V> b) {
    // 初始化Node属性
    super(TREEBIN, null, null, null);
    // 设置当前元素b为头节点
    this.first = b;
    TreeNode<K,V> r = null;
    // 遍历b所有next
    for (TreeNode<K,V> x = b, next; x != null; x = next) {
        // 获取next
        next = (TreeNode<K,V>)x.next;
        // 清空左右节点
        x.left = x.right = null;
        // 初始化r,设置为根节点
        if (r == null) {
            x.parent = null;
            x.red = false;
            r = x;
        } else {
            // 获取键和hash
            K k = x.key;
            int h = x.hash;
            // 比较器
            Class<?> kc = null;
            // 遍历r
            for (TreeNode<K,V> p = r;;) {
                int dir, ph;
                K pk = p.key;
                // 确定走左边还是右边
                if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((kc == null &&
                        (kc = comparableClassFor(k)) == null) ||
                        (dir = compareComparables(kc, k, pk)) == 0)
                    dir = tieBreakOrder(k, pk);
                TreeNode<K,V> xp = p;
                // 确定的一边为空,就把x加上去
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    x.parent = xp;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    r = balanceInsertion(r, x);
                    break;
                }
            }
        }
    }
    // 赋值根节点
    this.root = r;
    // 在运行时,如果关闭了assertion功能,这些语句将不起任何作用。
    // 如果打开了assertion功能,那么将执行checkInvariants,
    // 如果它的值为false,该语句强抛出一个AssertionError对象。
    assert checkInvariants(root);
}

比较大小方法

static int tieBreakOrder(Object a, Object b) {
    int d;
    if (a == null || b == null ||
            (d = a.getClass().getName().
                    compareTo(b.getClass().getName())) == 0)
        d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
                -1 : 1);
    return d;
}

TreeBin 锁定节点

private final void lockRoot() {
    // 使用CAS把lockState从0改为WRITER(1)
    if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
        // 修改失败
        contendedLock();
}
private final void unlockRoot() {
    lockState = 0;
}
private final void contendedLock() {
    // 等待状态
    boolean waiting = false;
    for (int s;;) {
        // lockState为0结果一定是0
        if (((s = lockState) & ~WAITER) == 0) {
            // 使用CAS把lockState从0改为WRITER(1)
            if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
                // 等待状态
                if (waiting)
                    waiter = null;
                // 成功返回
                return;
            }
        }
        else if ((s & WAITER) == 0) {// s不为2时结果为0
            // 使用CAS把lockState从s修改为s | WAITER(2)
            if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
                // 设置是否需要等待为true
                waiting = true;
                // 等待线程
                waiter = Thread.currentThread();
            }
        }
        else if (waiting)
            // 阻塞当前线程
            LockSupport.park(this);
    }
}

TreeBin 查找

final Node<K,V> find(int h, Object k) {
    // 值不能为空
    if (k != null) {
        // 从头节点开始遍历
        for (Node<K,V> e = first; e != null; ) {
            int s; K ek;
            // 判断是否是锁定状态(lockState为0一定不能进入)
            if (((s = lockState) & (WAITER|WRITER)) != 0) {
                // 当前节点hash是否一样,key是否一样
                if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                // 节点下移
                e = e.next;
            }
            // 更新lockState
            else if (U.compareAndSwapInt(this, LOCKSTATE, s,
                    s + READER)) {
                TreeNode<K,V> r, p;
                try {
                    // 头不为空,从树中查找
                    p = ((r = root) == null ? null :
                            r.findTreeNode(h, k, null));
                } finally {
                    Thread w;
                    if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                            (READER|WAITER) && (w = waiter) != null)
                        // 唤醒阻塞的线程
                        LockSupport.unpark(w);
                }
                return p;
            }
        }
    }
    return null;
}

TreeBin 添加元素

final TreeNode<K,V> putTreeVal(int h, K k, V v) {
    Class<?> kc = null;
    // 是否搜索过
    boolean searched = false;
    // 从跟节点开始遍历
    for (TreeNode<K,V> p = root;;) {
        int dir, ph; K pk;
        // 根节点为空,构建新的根节点
        if (p == null) {
            first = root = new TreeNode<K,V>(h, k, v, null, null);
            break;
        }
        // 确定左边还是右边
        else if ((ph = p.hash) > h)
            dir = -1;
        else if (ph < h)
            dir = 1;
        else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
            return p;
        else if ((kc == null &&
                (kc = comparableClassFor(k)) == null) ||
                (dir = compareComparables(kc, k, pk)) == 0) {
            if (!searched) {
                TreeNode<K,V> q, ch;
                // hash一样,比较器失效,开始搜索
                searched = true;
                if (((ch = p.left) != null &&
                        (q = ch.findTreeNode(h, k, kc)) != null) ||
                        ((ch = p.right) != null &&
                                (q = ch.findTreeNode(h, k, kc)) != null))
                    return q;
            }
            dir = tieBreakOrder(k, pk);
        }
        // 下级有空节点开始追加
        TreeNode<K,V> xp = p;
        if ((p = (dir <= 0) ? p.left : p.right) == null) {
            // 获取一把头节点
            TreeNode<K,V> x, f = first;
            // 更新头节点
            first = x = new TreeNode<K,V>(h, k, v, f, xp);
            // 头节点不为空,原头节点上级设置为新节点
            if (f != null)
                f.prev = x;
            // 树结构中插入
            if (dir <= 0)
                xp.left = x;
            else
                xp.right = x;
            // 当前xp为叶子节点,叶子节点是红色需要重新平衡
            if (!xp.red)
                x.red = true;
            else {
                // 重新平衡需要锁定
                lockRoot();
                try {
                    // 插入平衡
                    root = balanceInsertion(root, x);
                } finally {
                    // 解除锁定
                    unlockRoot();
                }
            }
            break;
        }
    }
    assert checkInvariants(root);
    return null;
}

TreeBin 删除

final boolean removeTreeNode(TreeNode<K,V> p) {
    // 获取上下节点
    TreeNode<K,V> next = (TreeNode<K,V>)p.next;
    TreeNode<K,V> pred = p.prev;  // unlink traversal pointers
    TreeNode<K,V> r, rl;
    // 上级节点为空,设置next为头节点
    if (pred == null)
        first = next;
    else
        // 上级的下级指向当前下级
        pred.next = next;
    // 下级为空下级的上级指向当前上级
    if (next != null)
        next.prev = pred;
    // 头节点为空
    if (first == null) {
        // 设置root为空
        root = null;
        return true;
    }
    // 在树中删除
    if ((r = root) == null || r.right == null || // too small
            (rl = r.left) == null || rl.left == null)
        // 不成树,直接返回(只有一个元素)
        return true;
    // 锁定头节点
    lockRoot();
    try {
        TreeNode<K,V> replacement;
        TreeNode<K,V> pl = p.left;
        TreeNode<K,V> pr = p.right;
        // 红黑树删除节点参照HashMap
        if (pl != null && pr != null) {
            TreeNode<K,V> s = pr, sl;
            while ((sl = s.left) != null) // find successor
                s = sl;
            boolean c = s.red; s.red = p.red; p.red = c; // swap colors
            TreeNode<K,V> sr = s.right;
            TreeNode<K,V> pp = p.parent;
            if (s == pr) { // p was s's direct parent
                p.parent = s;
                s.right = p;
            }
            else {
                TreeNode<K,V> sp = s.parent;
                if ((p.parent = sp) != null) {
                    if (s == sp.left)
                        sp.left = p;
                    else
                        sp.right = p;
                }
                if ((s.right = pr) != null)
                    pr.parent = s;
            }
            p.left = null;
            if ((p.right = sr) != null)
                sr.parent = p;
            if ((s.left = pl) != null)
                pl.parent = s;
            if ((s.parent = pp) == null)
                r = s;
            else if (p == pp.left)
                pp.left = s;
            else
                pp.right = s;
            if (sr != null)
                replacement = sr;
            else
                replacement = p;
        }
        else if (pl != null) // 右边为空
            replacement = pl;
        else if (pr != null) // 左边为空
            replacement = pr;
        else
            replacement = p;
        if (replacement != p) {
            TreeNode<K,V> pp = replacement.parent = p.parent;
            if (pp == null)
                r = replacement;
            else if (p == pp.left)
                pp.left = replacement;
            else
                pp.right = replacement;
            p.left = p.right = p.parent = null;
        }
        // 删除并平衡
        root = (p.red) ? r : balanceDeletion(r, replacement);

        if (p == replacement) {  // detach pointers
            TreeNode<K,V> pp;
            if ((pp = p.parent) != null) {
                if (p == pp.left)
                    pp.left = null;
                else if (p == pp.right)
                    pp.right = null;
                p.parent = null;
            }
        }
    } finally {
        // 解锁
        unlockRoot();
    }
    assert checkInvariants(root);
    return false;
}

左旋右旋

static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root,
                                      TreeNode<K,V> p) {
    。。。
}
static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root,
                                       TreeNode<K,V> p) {
    。。。
}

插入删除平衡

static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root,
                                            TreeNode<K,V> x) {
    。。。
}
static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root,
                                           TreeNode<K,V> x) {
    。。。
}

重要内部类ForwardingNode

static final class ForwardingNode<K,V> extends Node<K,V>

ForwardingNode 在扩容移动时会使用

ForwardingNode 属性

// 扩容后数组
final Node<K,V>[] nextTable;

**ForwardingNode 构造函数**
ForwardingNode(Node<K,V>[] tab) {
    super(MOVED, null, null, null);
    this.nextTable = tab;
}

ForwardingNode 方法

Node<K,V> find(int h, Object k) {
    // 自旋
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        // 新数组为空或者头节点为空,返回
        if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        // 自旋
        for (;;) {
            int eh; K ek;
            // 头节点一样
            if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            // hash 小于 0(说明是treeBin)
            if (eh < 0) {
                // 头节点是ForwardingNode实例
                if (e instanceof ForwardingNode) {
                    // 获取扩容后新数组
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    // 继续查找
                    return e.find(h, k);
            }
            // 达到尾节点,返回
            if ((e = e.next) == null)
                return null;
        }
    }
}

ConcurrentHashMap 属性

// 数组最大长度
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 初始化默认长度
private static final int DEFAULT_CAPACITY = 16;
// 元素个数最大值(toArray中使用)
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默认并发级别
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 默认加载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转树阈值
static final int TREEIFY_THRESHOLD = 8;
// 树转链表阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 树转链表,table长度阈值
static final int MIN_TREEIFY_CAPACITY = 64;
// 扩容迁移元素时,每个线程处理16个槽
private static final int MIN_TRANSFER_STRIDE = 16;
// 用于生成每次扩容都唯一的生成戳的数
private static int RESIZE_STAMP_BITS = 16;
// 最大的扩容线程的数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 移位量,把生成戳移位后保存在sizeCtl中当做扩容线程计数的基数,
// 相反方向移位后能够反解出生成戳
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 表示正在转移
static final int MOVED     = -1;
// 表示已经转换成树
static final int TREEBIN   = -2;
// ReservationNode 初始化hash值
static final int RESERVED  = -3;
// int最大值
static final int HASH_BITS = 0x7fffffff;
// 获得可用的处理器个数
static final int NCPU = Runtime.getRuntime().availableProcessors();
private static final ObjectStreamField[] serialPersistentFields = {
        new ObjectStreamField("segments", Segment[].class),
        new ObjectStreamField("segmentMask", Integer.TYPE),
        new ObjectStreamField("segmentShift", Integer.TYPE)
};
// 元素数组
transient volatile Node<K,V>[] table;
// 扩容后的新的table数组
private transient volatile Node<K,V>[] nextTable;
// 元素个数计数器值
private transient volatile long baseCount;
// 扩容阈值
private transient volatile int sizeCtl;
// 下一个transfer任务的起始下标index
private transient volatile int transferIndex;
// counterCells 扩容标志
private transient volatile int cellsBusy;
// 并发时
private transient volatile CounterCell[] counterCells;
// 键集合
private transient KeySetView<K,V> keySet;
// 值集合
private transient ValuesView<K,V> values;
// 键值实体集合
private transient EntrySetView<K,V> entrySet;
// 内存操作不安全类
private static final sun.misc.Unsafe U;
// 扩容阈值偏移量
private static final long SIZECTL;
// transfer任务任务下标偏移量
private static final long TRANSFERINDEX;
// 元素个数偏移量
private static final long BASECOUNT;
// cellsBusy偏移量
private static final long CELLSBUSY;
// counterCells 偏移量
private static final long CELLVALUE;
// table偏移量
private static final long ABASE;
// table数组元素偏移量
private static final int ASHIFT;

ConcurrentHashMap 加载初始化

static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ConcurrentHashMap.class;
        SIZECTL = U.objectFieldOffset
                (k.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset
                (k.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset
                (k.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset
                (k.getDeclaredField("cellsBusy"));
        Class<?> ck = CounterCell.class;
        CELLVALUE = U.objectFieldOffset
                (ck.getDeclaredField("value"));
        Class<?> ak = Node[].class;
        ABASE = U.arrayBaseOffset(ak);
        int scale = U.arrayIndexScale(ak);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}

ConcurrentHashMap 构造函数

public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    // 计算扩容阈值
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
            MAXIMUM_CAPACITY :
            tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {

    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // concurrencyLevel 表示估计的参与并发更新的
    // 线程数量,必须比初始化容量的要大
    if (initialCapacity < concurrencyLevel)
        initialCapacity = concurrencyLevel;
    // 扩容阈值
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

####ConcurrentHashMap 的hash算法

static final int spread(int h) {
    // 混合高低位,并控制范围
    return (h ^ (h >>> 16)) & HASH_BITS;
}

在HashMap中hash算法是return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); 而spread方法其实就是hash算法,最后和int最大值取位与只是为了控制结果在int范围内。

ConcurrentHashMap 数组长度计算

private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

此方法请看HashMap篇,里面有详细推导

ConcurrentHashMap 操作数组头节点

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    // ((long)i << ASHIFT) + ABASE 元素位置
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    // 修改槽中元素从c改成v
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    // v放到对应槽中
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

ConcurrentHashMap 添加

public V put(K key, V value) {
    // 添加
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 键或值为空抛异常
    if (key == null || value == null) throw new NullPointerException();
    // 获取hash
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 遍历table
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // tab为空(第一次添加)
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
            // 查找hash所在的槽是否为空
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 创建新节点并放入对应槽中(失败重新来过)
            if (casTabAt(tab, i, null,
                    new Node<K,V>(hash, key, value, null)))
                break;
        }
        // 正在扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            // 原值
            V oldVal = null;
            // 锁定头节点
            synchronized (f) {
                // 头节点没有改变过
                if (tabAt(tab, i) == f) {
                    // 头节点hash 大于 0
                    if (fh >= 0) {
                        // 链表添加初始为1
                        binCount = 1;
                        // 从头节点开始遍历(每次binCount加1)
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 键是否一样
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {
                                // 获取原值
                                oldVal = e.val;
                                // 不覆盖开关是否打开,false就覆盖
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 键不一样e下移
                            Node<K,V> pred = e;
                            // 后面节点为空
                            if ((e = e.next) == null) {
                                // 追加元素
                                pred.next = new Node<K,V>(hash, key,
                                        value, null);
                                break;
                            }
                        }
                    }
                    // hash 小于 0 说明是TreeBin(hash为-2)
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        // 树添加设置为2
                        binCount = 2;
                        // 调用树结构添加
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                value)) != null) {
                            // 原值
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                // 覆盖原值
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // binCount 大于8,链表转树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 原值不为空,要么覆盖要么忽略
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 能到这儿一定添加成功(需不需要扩容)
    addCount(1L, binCount);
    return null;
}

如果只看putVal方法ConcurrentHashMap 跟HashMap差不多,就把头元素锁了,但是如果你真这么认为,只能说明没有看第三篇。

ConcurrentHashMap 初始化数组

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // table 不为空退出循环
    while ((tab = table) == null || tab.length == 0) {
        // 判断扩容阈值是否小于0
        if ((sc = sizeCtl) < 0)
            // 让出cpu
            Thread.yield();
            // 修改扩容阈值为-1
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // 判处table是否还没被初始化
                if ((tab = table) == null || tab.length == 0) {
                    // sc > 0 使用sc,否则使用默认初始长度
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    // 创建数组
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // 赋值
                    table = tab = nt;
                    // 计算下次扩容阈值
                    // n - n/4 等价于 n*0.75
                    sc = n - (n >>> 2);
                }
            } finally {
                // 设置新的扩容阈值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

ConcurrentHashMap 链表转树

// binCount(链表长度)大于8会调用这个方法
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    // 数组不能为空
    if (tab != null) {
        // 判断是在是否小于64
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 扩容第三篇会讲
            tryPresize(n << 1);
        // 大于64,获取头节点,判断是否已经是TreeBin
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            // 锁住头节点
            synchronized (b) {
                // 再取一次头节点判断跟b是否一样
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        // 构建树节点
                        TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                        null, null);
                        // 构建树节点的链表结构,
                        // 初始化TreeBin时才转树结构
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // 构建TreeBin,并把它放入当前槽
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

ConcurrentHashMap 树转链表

static <K,V> Node<K,V> untreeify(Node<K,V> b) {
    Node<K,V> hd = null, tl = null;
    // 遍历节点
    for (Node<K,V> q = b; q != null; q = q.next) {
        // 构建普通节点
        Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null);
        if (tl == null)
            hd = p;
        else
            tl.next = p;
        tl = p;
    }
    return hd;
}

ConcurrentHashMap 键取值

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算hash
    int h = spread(key.hashCode());
    // 判断数组是否为空,然后判断槽中头节点是否为空
    // (n - 1) & h 就是取余,请看HashMap篇推导过程
    if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
        // 头节点hash是否一样
        if ((eh = e.hash) == h) {
            // hash一样看key是否一样
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                // key一样返回节点中的值
                return e.val;
        }
        // hash不一样,判断hash是否小于0
        // TreeBin 的hash为-2
        else if (eh < 0)
            // 在红黑树中查找
            return (p = e.find(h, key)) != null ? p.val : null;
        // 能到这说明一定是链表,通过遍历链表查找
        while ((e = e.next) != null) {
            // 判断key是否一样,并节点下移
            if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

ConcurrentHashMap 删除

public V remove(Object key) {
    return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
    // 计算hash
    int hash = spread(key.hashCode());
    // 自旋
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 判断数组或头节点为空
        if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
            // 直接退出
            break;
        // 判断hash是否为移动状态
        else if ((fh = f.hash) == MOVED)
            // 如果是就帮忙移动,移动完自旋删除
            tab = helpTransfer(tab, f);
        else {
            // 能到这儿,说明不是再移动
            V oldVal = null;
            boolean validated = false;
            // 锁住头节点
            synchronized (f) {
                //  判断头节点是否变动
                if (tabAt(tab, i) == f) {
                    // 链表删除
                    if (fh >= 0) {
                        validated = true;
                        // 遍历链表
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            // 判断key是否一样
                            if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                            (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                // 判断值是否一样
                                if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                    // 原值
                                    oldVal = ev;
                                    // value不为空,使用value覆盖原值,
                                    // 否则移除节点,
                                    // 如果移除的是头节点,需要把设置新头节点
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            // 节点下移
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    // 树结构中删除
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        // 在树结构中查找
                        if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            // 判断值是否一样
                            if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                // value不为空,使用value覆盖原值,
                                // 否则,在树结构中删除
                                // 如果移除的是头节点,需要设置新头节点
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            // 是否查找过
            if (validated) {
                // 原值是否为空
                if (oldVal != null) {
                    // value是否为空,不为空只覆盖不删除
                    if (value == null)
                        // 长度减一
                        addCount(-1L, -1);
                    // 返回原值
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

ConcurrentHashMap 清空

public void clear() {
    long delta = 0L; 
    int i = 0;
    Node<K,V>[] tab = table;
    // 遍历数组所有元素
    while (tab != null && i < tab.length) {
        int fh;
        // 获取头节点
        Node<K,V> f = tabAt(tab, i);
        // 头节点为空,数组下标加1
        if (f == null)
            ++i;
        // 判断头节点状态
        else if ((fh = f.hash) == MOVED) {
            // 帮忙移动
            tab = helpTransfer(tab, f);
            // 移动完成,重置数组下标
            i = 0; 
        }
        // 正常删除
        else {
            // 锁住头节点
            synchronized (f) {
                // 判断头节点是否变化
                if (tabAt(tab, i) == f) {
                    // 获取链表或树的头节点
                    Node<K,V> p = (fh >= 0 ? f :
                            (f instanceof TreeBin) ?
                                    ((TreeBin<K,V>)f).first : null);
                    // 遍历个数
                    while (p != null) {
                        --delta;
                        p = p.next;
                    }
                    // 清空当前槽
                    setTabAt(tab, i++, null);
                }
            }
        }
    }
    // 修改总长度
    if (delta != 0L)
        addCount(delta, -1);
}

ConcurrentHashMap 在修改链表或者树时都会锁住头元素,其他修改操作就拿不到锁,他们就会等待。
ConcurrentHashMap 的addCount

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 先判断counterCells是否为空,为空通过cas修改baseCount
    // 要么修改baseCount失败(s=b + x 一样会执行)
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // 先判断counterCells为空,
        // 然后随机取余数获取counterCells下标a,并取值
        // (counterCells长度必须是2的n次方)
        // 取值为空修改counterCells数组a位置值,修改失败调用fullAddCount
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        // 链表长度小于等于1
        if (check <= 1)
            return;
        // 统计
        s = sumCount();
    }
    // 链表长度大于等于0
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 如果map.size() 大于 sizeCtl扩容阈值 且
        // table 不是空;且 table 的长度小于 1 << 30
        // 这时sc为原扩容阈值
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            // 生成一个扩容戳
            int rs = resizeStamp(n);
            // 原扩容阈值小于0(第一次一定不会进去)
            if (sc < 0) {
                // 能到这一定在扩容
                // 这里5个判断,意思是有任何一个满足就不能帮忙搬移元素
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    break;
                // 修改阈值加1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 帮忙搬移元素
                    transfer(tab, nt);
            }
            // 修改扩容阈值为(rs<<16)+2,一定是负的
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                // 开始迁移元素
                transfer(tab, null);
            // 统计元素个数
            s = sumCount();
        }
    }
}

这里逻辑非常绕,我们要拆开看。元素计数器counterCells,为什么要用元素计数器?
当并发比较高并且所有线程都是修改元素个数,会是什么情况?做到了高效添加删除节点,但是所有线程最后都去竞争改baseCount,性能损耗严重,性能又被拉下来了。counterCells就是用来解决修改baseCount性能损耗问题,没有并发时直接修改baseCount,有并发时一定有修改baseCount修改失败,失败又不能不管,这时失败的就在counterCells数组中随便找个元素记一下,记录方式就是数组中元素原值加1(元素是对象)。在获取元素个数时,需要把baseCount加上counterCells所有元素值加到一起就是总元素个数,这也是ConcurrentHashMap 分片统计原理。

ConcurrentHashMap 统计元素及使用

public int size() {
    // 返回当前元素个数
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                    (int)n);
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        // 遍历所有计数器
        for (int i = 0; i < as.length; ++i) {
            // 元素不为空时参与统计
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
// 统计数为0时Map为空
public boolean isEmpty() {
    return sumCount() <= 0L;
}

ConcurrentHashMap 扩容

// 第一个扩容扩容线程nextTab为null
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // stride是每个线程转移几个槽
    // 将 length/8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    // nextTab 为空(第一个线程调用)
    if (nextTab == null) {
        try {
            // 新数组(长度为原来2倍)
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {
            // 数组越界,超过int最大值
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        // 使用新数组赋值nextTable
        nextTable = nextTab;
        // 更新转移下标,就是 老的 tab 的 length
        transferIndex = n;
    }
    // 新数组长度
    int nextn = nextTab.length;
    // 创建迁移节点
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false;
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 领任务
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用
            else if (U.compareAndSwapInt
                    (this, TRANSFERINDEX, nextIndex,
                            nextBound = (nextIndex > stride ?
                                    nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 如果 i 小于0
        // 如果 i >= tab.length
        // 如果 i + tab.length >= nextTable.length
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 如果完成了扩容
            if (finishing) {
                // 置空nextTable,赋值新table,设置新扩容阈值
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 能到这说明还在扩容。设置sc-1
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 没有线程在帮助他们扩容了。也就是说,扩容结束了。
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 扩容完成
                finishing = advance = true;
                i = n;
            }
        }
        // 原槽为空时置入fwd
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 原槽头节点已经是ForwardingNode对象,忽略
        else if ((fh = f.hash) == MOVED)
            advance = true;
        // 开始迁移
        else {
            // 锁住头节点
            synchronized (f) {
                // 看一下头节点有没有改变
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // hash是否大于0
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        // 遍历链表
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            // 元素分类
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        // 高位位0(不需要移动)
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        // 高位位1(需要移动)
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // 组装新链表
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 放入新数组槽中
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 原槽中使用占位符
                        setTabAt(tab, i, fwd);
                        // 成功
                        advance = true;
                    }
                    // 头节点为TreeBin
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        // 遍历树取两个树(lo、hi)
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 长度小于6时,树退链
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // 放入新数组槽中
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 原槽中使用占位符
                        setTabAt(tab, i, fwd);
                        // 完成
                        advance = true;
                    }
                }
            }
        }
    }
}

扩容跟HashMap区别还是比较大的,单核时一个线程迁移所有槽,多线程时,每个线程最小迁移16个槽。

ConcurrentHashMap 链转树触发扩容

private final void tryPresize(int size) {
    // size在传入之前就已经*2了,判断size合法性
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // 数组为空
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            // 修改扩容阈值
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // 初始化table
                    if (table == tab) {
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 设置新的扩容阈值
                    sizeCtl = sc;
                }
            }
        }
        // 不需要扩容
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // table是否改变
        else if (tab == table) {
            // 下面这一坨跟addcount后半段是一个意思
            int rs = resizeStamp(n);
            // 阈值小于零(被扩容线程修改为负的)
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    break;
                // 帮忙迁移元素
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 打上迁移标示
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
这篇关于ConcurrentHashMap你到底了解多少?的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!