Lock是J.U.C中最核心的工具,它的作用和前面所讲解的synchronized一样,也是用来解决多线程环境下的线程安全性问题。在J.U.C这个包中,很多的地方都有用到Lock这个机制。
J.U.C全称是java.util.concurrent,是并发编程中比较常用的工具包,这个包中包含很多用来在并发场景中使用的组件,比如线程池、阻塞队列、计时器、同步器、并发集合等等。并发包的作者是大名鼎鼎的Doug Lea。
在Lock接口出现之前,Java中的应用程序对于多线程的并发安全处理只能基于synchronized关键字来解决。但是synchronized在有些场景中会存在一些短板,也就是它并不适合于所有的并发场景。但是在Java5以后,Lock的出现可以解决synchronized在某些场景中的短板,它比synchronized更加灵活。
Lock是一个接口,它定义了释放锁和获得锁的抽象方法,实现Lock接口的类有很多,以下为几个常见的锁实现:
ReentrantLock:表示重入锁,它是唯一一个实现了Lock接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数
ReentrantReadWriteLock:重入读写锁,它实现了ReadWriteLock接口,在这个类中维护了两个锁,一个是ReadLock,一个是WriteLock,他们都分别实现了Lock接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是: 读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。
StampedLock: stampedLock是JDK8引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock是一种乐观的读策略,使得乐观锁完全不会阻塞写线程
之前在《大话Synchronized及锁升级》中讲过count++ 10000得到的结果是小于一万的,原因是非原子性,一种办法是加synchronized,另一种办法是加lock锁,代码如下:
public class ReentrantLockDemo { static ReentrantLock lock = new ReentrantLock(); static int count = 0; public static void incr() { //抢占锁,如果没有抢占到锁,会阻塞 lock.lock(); try { count++; } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10000; i++) { new Thread(ReentrantLockDemo::incr).start(); } Thread.sleep(6000); System.out.println("result:" + count); } }
最终结果是10000。
上面就是用法,需要注意的是一定要在 finally 块中使用 unlock() 来解锁。
重入锁,表示同一个线程可以重复获得同一把锁,也就是说,如果当前线程t1通过调用lock方法获取了
锁之后,再次调用lock,是不会再阻塞去获取锁的,直接增加重试次数就行了。synchronized和
ReentrantLock都是可重入锁。
下面就是ReentrantLock里面的类图关系:
ReentrantLock构造函数默认是非公平锁,也可以指定。
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
下面进入重点部分,讲解 AbstractQueuedSynchronizer(AQS) 源码。
我们先来介绍一下 AQS(AbstractQueuedSynchronizer)的重要性,来看看 AQS 被用在了哪些类里面。
如图所示,AQS 在 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch、ThreadPoolExcutor 的 Worker 中都有运用(JDK 1.8),AQS 是这些类的底层原理。
而以上这些类,很多都是我们经常使用的类,大部分我们在前面课时中也已经详细介绍过,所以说 JUC 包里很多重要的工具类背后都离不开 AQS 框架,因此 AQS 的重要性不言而喻,AQS是J.U.C的基石。
首先进入公平锁的lock方法,看到一个acquire方法
点进去,进入到父类AQS类里面去
点击 tryAcquire 再回到具体的子类 tryAcquire 里面来
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); //这里c就是锁的标记,0就是没抢到锁,1就是抢到锁了,2,3,4,5..就代表重入次数 int c = getState(); //表示无锁状态 if (c == 0) { //如果队列里面是空的,然后去CAS抢锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //如果抢成功了,就把当前线程存到exclusiveOwnerThread中去 setExclusiveOwnerThread(current); return true; } } //如果已经有锁了,看下是不是自己的 else if (current == getExclusiveOwnerThread()) { //是的话就进去把state的状态+1 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
下面看下非公平锁的lock方法
final void lock() { //这个就是非公平锁的体现了,管你队列里有没有等待的线程,一上来先CAS一下,看下能不能抢成功 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //没成功的话继续acquire acquire(1); }
还是进到AQS类里面的方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
还是点击tryAcquire进入到非公平锁里面的实现
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
又跳回到Sync类里面的,注意啊这里面很绕,几个类来回跳,所以要先看清楚上面几个类的继承关系
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //无锁 if (c == 0) { //CAS抢锁 if (compareAndSetState(0, acquires)) { //成功的话把当前线程记录一下 setExclusiveOwnerThread(current); return true; } } //如果已经有锁并且是自己的话,把state+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
讲到这里,就要讲下公平锁和非公平锁的区别了。
假设线程 A 持有一把锁,线程 B 请求这把锁,由于线程 A 已经持有这把锁了,所以线程 B 会陷入等待,在等待的时候线程 B 会被挂起,也就是进入阻塞状态,那么当线程 A 释放锁的时候,本该轮到线程 B 苏醒获取锁,但如果此时突然有一个线程 C 插队请求这把锁,那么根据非公平的策略,会把这把锁给线程 C,这是因为唤醒线程 B 是需要很大开销的,很有可能在唤醒之前,线程 C 已经拿到了这把锁并且执行完任务释放了这把锁。相比于等待唤醒线程 B 的漫长过程,插队的行为会让线程 C 本身跳过陷入阻塞的过程,如果在锁代码中执行的内容不多的话,线程 C 就可以很快完成任务,并且在线程 B 被完全唤醒之前,就把这个锁交出去,这样是一个双赢的局面,对于线程 C 而言,不需要等待提高了它的效率,而对于线程 B 而言,它获得锁的时间并没有推迟,因为等它被唤醒的时候,线程 C 早就释放锁了,因为线程 C 的执行速度相比于线程 B 的唤醒速度,是很快的,所以 Java 设计者设计非公平锁,是为了提高整体的运行效率。
体现在上面的源码中,tryAcquire和nonfairTryAcquire方法长的差不多,唯一的区别就在于公平锁多了一个!hasQueuedPredecessors()
判断条件,如果是公平锁,那么一旦已经有线程在排队了,当前线程就不再尝试获取锁;对于非公平锁而言,无论是否已经有线程在排队,都会尝试获取一下锁,获取不到的话,再去排队。非公平锁介于上一个线程刚释放锁,这时候state为0,然后唤醒下一个线程去CAS抢锁,这时候另一个线程正好进来,不守规矩CAS一下给先抢到了,这样的一个临界区。
ReentrantLock默认是非公平锁。
public ReentrantLock() { sync = new NonfairSync(); }
上面acquire失败的话证明有线程在使用,或者不是当前线程不能重入,所以需要把当前线程加入到双向链表中去,进行等待。 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)把它拆成两个方法。
private Node addWaiter(Node mode) { //把当前线程封装成一个Node节点。 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //尾节点不为空 if (pred != null) { node.prev = pred; //尝试CAS插入到链尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果为空或者插入到尾节点失败了 enq(node); return node; } private Node enq(final Node node) { //自旋 for (;;) { Node t = tail; //尾节点为空,进行初始化 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) //头节点和尾节点都指向一个空节点 tail = head; } else { node.prev = t; //反复的CAS插入到链尾直到成功 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
再看acquireQueued()
//添加到链尾以后,进来先自旋一遍 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //自旋 for (;;) { final Node p = node.predecessor(); //如果前面一个节点是头结点,那证明自己是排第一个位置,然后进行tryAcquire //tryAcquire上面分析过了,分公平锁和非公平锁 if (p == head && tryAcquire(arg)) { //如果抢到了把自己设为头节点,然后直接返回 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //否则,让线程去阻塞(park) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //此方法主要用于检查状态,看看自己是否真的可以去休息了,即进入waiting状态, //万一队列前边的线程都放弃了,却在那里干等着,这样肯定是不行的 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { //LockSupport.park 阻塞 LockSupport.park(this); //中断状态 return Thread.interrupted(); }
接下来看解锁的过程
public void unlock() { sync.release(1); }
进入release方法
public final boolean release(int arg) { if (tryRelease(arg)) { //得到当前AQS队列中的head节点。 Node h = head; //head节点不为空 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
解锁
protected final boolean tryRelease(int releases) { //这里是减1,因为可能是重入的 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //如果是0了,就完全释放了 if (c == 0) { //标记为释放 free = true; //标记当前线程的拥有者为空 setExclusiveOwnerThread(null); } setState(c); return free; }
private void unparkSuccessor(Node node) { int ws = node.waitStatus; //表示可以唤醒状态 if (ws < 0) //恢复成0 compareAndSetWaitStatus(node, ws, 0); //头结点的下一个结点 Node s = node.next; //如果结点为空或者线程已经被销毁、出现异常等等 if (s == null || s.waitStatus > 0) { //将节点置为空 s = null; //从链尾开始找,查找小于等于0的节点唤醒 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //非空的话唤醒线程 if (s != null) LockSupport.unpark(s.thread); }
好,AQS的源码到这里就分析结束了,很清晰明了,很简单,把上面的流程用一张图演示,加深下印象。
最后感谢大家的收看~