- ConcurrentHashMap:线程安全的HashMap
- CopyOnWriteArrayList:线程安全的List
- BlockingQueue:这是一个借口,表示阻塞队列,非常适合用于数据共享的通道
- ConcurrentLinkedQueue:高效的非阻塞并发队列,使用链表实现。可以看作是一个线程安全的LinkedList
- ConcurrentSkipListMap:是一个Map,使用跳表的数据结构进行快速查找
- Vector和HashTable性能差
Vector是直接在方法上添加synchronize关键字来解决并发。但是在并发量大的情况下,性能较差
public synchronized E get(int index) { if (index >= elementCount) throw new ArrayIndexOutOfBoundsException(index); return elementData(index); } public synchronized E set(int index, E element) { if (index >= elementCount) throw new ArrayIndexOutOfBoundsException(index); E oldValue = elementData(index); elementData[index] = element; return oldValue; }HashTable源码分析,直接在方法上加上synchronize,所以导致性能不好
public synchronized V put(K key, V value) { // Make sure the value is not null if (value == null) { throw new NullPointerException(); } // Makes sure the key is not already in the hashtable. Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; @SuppressWarnings("unchecked") Entry<K,V> entry = (Entry<K,V>)tab[index]; for(; entry != null ; entry = entry.next) { if ((entry.hash == hash) && entry.key.equals(key)) { V old = entry.value; entry.value = value; return old; } } addEntry(hash, key, value, index); return null; } public synchronized V get(Object key) { Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { return (V)e.value; } } return null; }
- 利用Collections.synchronizedList(new ArrayList<Integer>())实现线程安全,采用同步代码块来实现线程安全
public E get(int index) { synchronized (mutex) {return list.get(index);} } public E set(int index, E element) { synchronized (mutex) {return list.set(index, element);} }绝大多数并发情况下, ConcurrentHashMap和CopyOnWriteArrayList性能较好
- HashMap:根据hashcode来进行访问,允许一个key为null,对值为null不做限制,是线程非安全的
- HashTable:线程安全的,只能一个线程同时访问,不建议使用,效率低
- LinkedHashMap:保存了插入的顺序
- TreeMap:可以根据键进行排序
- 同时put碰撞导致数据丢失:hashcode值计算一样,就会丢失
- 同时put扩容导致数据丢失:多个线程同时需要扩容,就只会保留一个
- 死循环CPU100%:多个线程同时扩容的时候,链表会死循环,相互指向对方
JDK1.7中
- ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,依然是数组和链表组成的拉链法
- 每个segment独立上ReentrantLock锁,每个segment之期间互不影响,提高了并发效率
- ConcurrentHashMap默认有16个segments,所以最多支持16个线程并发写(操作分布在不同的segment上)。这个设置后,不可以扩容
JDK1.8中
在链表达到了一个阈值,就会转换为红黑树。
升级后的区别:
- 增加了并发性。从原来的16变成了每个node
- Hash碰撞:1.7使用拉链法;1.8先使用拉链法,达到了阈值,就采用红黑树
- 保证并发性安全:1.7采用分段锁,segment继承我们的ReentrantLock。1.8采用cas加上synchronize。
- 查询复杂度:1.7的链表为O(n),1.8的红黑树则是O(logn),1.8提高了效率
- 为什么超过8才采用红黑树:数据量小的情况下,效率差不多;红黑树的占用空间是链表的两倍,所以默认使用链表;通过概率的计算,8的概率很小,所以采用8。
JDK1.8中put()方法源码分析:
- 判断key、value不为空
- 计算hash值
- 根据对应位置的节点的类型,来赋值,或者helpTransfer,或者增长链表,或者给红黑树增加节点
- 检查满足阈值就“红黑树化”
- 返回oldVal
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { //如果key或者value是null直接抛出异常 if (key == null || value == null) throw new NullPointerException(); //计算hash值 int hash = spread(key.hashCode()); int binCount = 0; //利用for循环来完成插入工作 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //判断是否没有初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //判断当前计算的hash值的位置是否有值 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //没有值,利用cas操作,如果放入成功,返回true if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //判断是否正在扩容 else if ((fh = f.hash) == MOVED) //帮助进行扩容 tab = helpTransfer(tab, f); else { V oldVal = null; //利用synchronize锁住代码块 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; //进行链表的操作 for (Node<K,V> e = f;; ++binCount) { K ek; //判断当前是否存在这个key if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //不存在则创建一个节点 Node<K,V> pred = e; if ((e = e.next) == null) { //将节点放到链表的最后 pred.next = new Node<K,V>(hash, key, value, null); break; } } } //判断是红黑树 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //走到这里说明完成了添加工作 if (binCount != 0) { //判断是否到了阈值8 if (binCount >= TREEIFY_THRESHOLD) //treeifyBin()方法中,还需要满足容量为64,才会转为红黑树 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }JDK1.8中get()方法源码分析:
- 计算hash值
- 找到对应的位置,根据情况进行:
- 直接取值
- 红黑树里找值
- 便利链表取值
- 返回找到的结果
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //计算出hash值 int h = spread(key.hashCode()); //判断map是否初始化完毕了,否则直接跳出循环,返回null if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //判断第一个节点是不是 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //如果值为负数,说明可能是红黑树,或者是一个转移节点 else if (eh < 0) //查找红黑树需要用find()方法查找 return (p = e.find(h, key)) != null ? p.val : null; //说明是一个链表,所以需要用循环来查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
我们用代码来看一个现象
public class OptionsNotSafe implements Runnable{ private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<String, Integer>(); /** * 使用两个线程对小明的score值进行相加2000次 * 理想:结果为2000 */ public static void main(String[] args) throws InterruptedException { scores.put("小明", 0); Thread t1 = new Thread(new OptionsNotSafe()); Thread t2 = new Thread(new OptionsNotSafe()); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(scores.get("小明")); } public void run() { for (int i = 0; i < 1000; i++) { Integer score = scores.get("小明"); int newScore = score + 1; scores.put("小明", newScore); } } }理想状态,这个代码执行完后,结果应该是2000,但是事实是这样吗?
其实不是,这是因为我们的操作是组合操作,并不能保证组合操作的原子性。ConcurrentHashMap的put方法中保证数据不错乱
解决方案1:使用synchronize类锁的方式
public class OptionsNotSafe implements Runnable{ private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<String, Integer>(); /** * 使用两个线程对小明的score值进行相加2000次 * 理想:结果为2000 */ public static void main(String[] args) throws InterruptedException { scores.put("小明", 0); Thread t1 = new Thread(new OptionsNotSafe()); Thread t2 = new Thread(new OptionsNotSafe()); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(scores.get("小明")); } public void run() { for (int i = 0; i < 1000; i++) { synchronized (OptionsNotSafe.class) { Integer score = scores.get("小明"); int newScore = score + 1; scores.put("小明", newScore); } } } }但是这种方案并不好,因为这种方式破坏了ConcurrentHashMap的设计,而且这种方式用HashMap也是可以实现的。就没必要用ConcurrentHashMap了。
解决方案2:推荐使用ConcurrentHashMap提供的replace()
public class OptionsNotSafe implements Runnable{ private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<String, Integer>(); /** * 使用两个线程对小明的score值进行相加2000次 * 理想:结果为2000 */ public static void main(String[] args) throws InterruptedException { scores.put("小明", 0); Thread t1 = new Thread(new OptionsNotSafe()); Thread t2 = new Thread(new OptionsNotSafe()); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(scores.get("小明")); } public void run() { for (int i = 0; i < 1000; i++) { while (true) { Integer score = scores.get("小明"); int newScore = score + 1; //如果map中的值是score,就将值修改为newScore,修改成功返回tue,失败返回false boolean b = scores.replace("小明", score, newScore); if (b) break; } } } }replace()方法作用是,一个key的值是oldVal就将他修改为newVal,并返回true,否则返回false。replace能保证这个组合操作线程安全
7.1、ConcurrentHashMap提供的组合操作
putIfAbsent(K key, V value):判断这个key是否存在,存在就直接返回key的vlaue值,没有就将key,value存入
8.1、CopyOnWriteArrayList简介
代理Vector和SynchronizedList,就和ConcurrentHashMap代理synchronize的原因一样
Vector和SynchronizedList的锁的粒度太大,并发效率相比较低,并且迭代时无法编辑
8.2、CopyOnWriteArrayList使用场景
读操作尽可能地快,而写即使慢一些也没有太大的关系
读多写少:黑名单,每日更新;监听器,迭代操作远多于修改操作
8.3、CopyOnWriteArrayList读写规则
回顾读写锁:读读共享,其他都互斥(写写互斥、读写互斥、写读互斥)
读写锁规则升级:读取是完全不用加锁的,并且更厉害的是,写入写不会阻塞读取操作。只有写入和写入之间需要进行同步等待
- 演示ArrayList在迭代中进行修改报错
public class CopyOnWriteArrayListDemo1 { public static void main(String[] args) { ArrayList<String > list = new ArrayList<String>(); for (int i = 0; i < 5; i++) { list.add((i+1) + ""); } Iterator<String> iterator = list.iterator(); while (iterator.hasNext()){ System.out.println("list is " + list); String next = iterator.next(); System.out.println(next); if (next.equals("2")){ list.remove("5"); } if (next.equals("3")){ list.add("3 found"); } } } }抛出异常的源码分析:集合没进行一次修改操作(add,remove)都会对modCount进行加一
public E next() { //判断集合的修改次数 checkForComodification(); int i = cursor; if (i >= size) throw new NoSuchElementException(); Object[] elementData = ArrayList.this.elementData; if (i >= elementData.length) throw new ConcurrentModificationException(); cursor = i + 1; return (E) elementData[lastRet = i]; } final void checkForComodification() { //集合没修改一次,modCount就会加一,modCount是当前修改次数,expectedModCount是希望修改次数, if (modCount != expectedModCount) //如果次数不相等,就会抛出异常 throw new ConcurrentModificationException(); }
- 演示CopyOnWriteArrayList在迭代中可以修改
public class CopyOnWriteArrayListDemo1 { public static void main(String[] args) { // ArrayList<String > list = new ArrayList<String>(); CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>(); for (int i = 0; i < 5; i++) { list.add((i+1) + ""); } Iterator<String> iterator = list.iterator(); while (iterator.hasNext()){ System.out.println("list is " + list); String next = iterator.next(); System.out.println(next); //Exception in thread "main" java.util.ConcurrentModificationException //ArrayList不能在迭代的时候进行修改 if (next.equals("2")){ list.remove("5"); } if (next.equals("3")){ list.add("3 found"); } } } }
CopyOnWriteArrayList在添加元素的时候,会将这个容器拷贝一份新的出来,然后进行添加删除。结束后,再将对象引用指向这个新的容器里。
创建新副本,读写分离
不可变原理:因为每次都是创建一个新的副本,旧的副本就完全是线程安全的,就可以并发的去读取。
迭代的时候:不会报错,但是数据不是最新的
public class CopyOnWriteArrayListDemo2 { public static void main(String[] args) { CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<Integer>(new Integer[]{1, 2, 3}); System.out.println(list); // iterator的个数取决于诞生时间,而不是迭代时间 Iterator<Integer> itr1 = list.iterator(); list.add(4); System.out.println(list); Iterator<Integer> itr2 = list.iterator(); itr1.forEachRemaining(System.out::println); itr2.forEachRemaining(System.out::println); } }9.2、CopyOnWriteArrayList的缺点
- 数据一致性问题:只能保证数据的最终一致性,不能保证数据的事实一致性。所以如果希望写入的数据,马上可以读取到,请不要使用
- 内存占用问题:因为CopyOnWriteArrayList的写是复制机制,所以在进行写操作的时候,内存会同时驻扎两个对象的内存
数据结构是数组
public class CopyOnWriteArrayList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable { private static final long serialVersionUID = 8673264195747942595L; //使用的ReentrantLock来保证线程的安全性 final transient ReentrantLock lock = new ReentrantLock(); //用数组来存取数据 private transient volatile Object[] array; public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //复制出一个新的数组 Object[] newElements = Arrays.copyOf(elements, len + 1); //将添加的数据放入到新数组的最后一位 newElements[len] = e; //引用指向新的数组 setArray(newElements); return true; } finally { lock.unlock(); } } private E get(Object[] a, int index) { //直接返回下标的数据,没有进行加锁的操作 return (E) a[index]; } }
用队列可以在线程间传递数据:生产者消费者模式、银行转账
队列的关系图
11.1、ArrayBlockingQueue阻塞队列
public class ArrayBlockingQueueDemo { private static final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3); public static void main(String[] args) { Interviewer r1 = new Interviewer(queue); Consumer r2 = new Consumer(queue); new Thread(r1).start(); new Thread(r2).start(); } } class Interviewer implements Runnable{ BlockingQueue queue; public Interviewer(BlockingQueue queue){ this.queue = queue; } @Override public void run() { System.out.println("10个候选人都来啦"); for (int i = 0; i < 10; i++) { String candidate = "Candidate" + i; try { queue.put(candidate); System.out.println("安排好了"+candidate); }catch (InterruptedException e){ e.printStackTrace(); } } try { queue.put("stop"); } catch (InterruptedException e) { e.printStackTrace(); } } } class Consumer implements Runnable{ BlockingQueue<String> queue; public Consumer(BlockingQueue queue){ this.queue = queue; } @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String msg; try { while (!(msg = queue.take()).equals("stop")){ System.out.println(msg + "到了"); } System.out.println("所有候选人都结束了"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(); } }put源码
public void put(E e) throws InterruptedException { //判断是不是null checkNotNull(e); final ReentrantLock lock = this.lock; //等待的过程中可以被中断的 lock.lockInterruptibly(); try { //判断当前队列是否已经满了 while (count == items.length) notFull.await(); //完成入队 enqueue(e); } finally { lock.unlock(); } }11.2、LinkedBlockingQueue阻塞队列
LinkedBlockingQueue的put的源码
private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); public void put(E e) throws InterruptedException { //检查放的是不是空 if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; //保证了并发性 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { //判断队列是否满了 while (count.get() == capacity) { notFull.await(); } enqueue(node); //返回的是旧值,没有加1之前的值 c = count.getAndIncrement(); //判断当前容量是否还没有满 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }11.3、PriorityBlockingQueue阻塞队列
- 支持优先级
- 自然顺序(而不是先进先出)
- 无界队列
- PriorityQueue线程安全版本
11.4、SynchronousQueue阻塞队列
- 它的容量是0
- 没有peek等函数
- 他是一个非常好的直接传递的并发线程结构
- Executors.newCachedThreadPool()使用的阻塞队列
11.5、DelayQueue阻塞队列
- 延迟队列,根据延迟时间排序
- 元素需要实现Delay接口,规定排序规则
- 无界队列
非阻塞队列只有ConcurrentLinkedQueue这一种,是使用链表作为其数据结构,使用CAS非阻塞算法来实现线程安全的,适用于在对性能较高的并发场景
public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); //死循环,做cas操作 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } } else if (p == q) p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } } boolean casNext(Node<E> cmp, Node<E> val) { //采用UNSAFE来保证cas操作,保证线程安全 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }
根据:边界,空间,吞吐量 来进行选择