可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的的内层方法会自动获取锁(前提是锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。
Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
可重入锁的种类:
文档地址
LockSupport是一个编程工具类,主要是为了阻塞和唤醒线程用的,它的内部其实两类主要的方法:park(停车阻塞线程)和unpark(启动唤醒线程)。
3种让线程等待和唤醒的方法:
park()/park(Object blocker) - 阻塞当前线程阻塞传入的具体线程
public class LockSupport { ... public static void park() { UNSAFE.park(false, 0L); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); } ... }
permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时,park方法会被唤醒,然后会将permit再次设置为0并返回。
unpark(Thread thread) - 唤醒处于阻塞状态的指定线程
public class LockSupport { ... public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } ... }
调用unpark(thread)方法后,就会将thread线程的许可permit设置成1(注意多次调用unpark方法,不会累加,pemit值还是1)会自动唤醒thead线程,即之前阻塞中的LockSupport.park()方法会立即返回。
public class WaitNotifyDemo { static Object lock = new Object(); public static void main(String[] args) { new Thread(()->{ synchronized (lock) { System.out.println(Thread.currentThread().getName()+" come in."); try { lock.wait(); } catch (Exception e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName()+" 换醒."); }, "Thread A").start(); new Thread(()->{ synchronized (lock) { lock.notify(); System.out.println(Thread.currentThread().getName()+" 通知."); } }, "Thread B").start(); } }
结论:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ConditionAwaitSignalDemo { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+" come in."); lock.lock(); condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } System.out.println(Thread.currentThread().getName()+" 换醒."); },"Thread A").start(); new Thread(()->{ try { lock.lock(); condition.signal(); System.out.println(Thread.currentThread().getName()+" 通知."); }finally { lock.unlock(); } },"Thread B").start(); } }
结论:
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(permit),permit只有两个值1和零,默认是零。可以把许可看成是一种(0.1)信号量(Semaphore),但与Semaphore不同的是,许可的累加上限是1。
也就是说,调用LockSupport的park方法时,需要消耗一张许可证,如果没有获取到许可证(也就是permit的值为0)时,就会一直阻塞,等待许可证来进行消耗,如果有许可证可消耗,就会停止阻塞。调用LockSupport的unpark方法,就是生产一份许可证(permint的值加一,由原来的0变为1)。而且许可证的的累加上限是1。
线程阻塞需要消耗凭证(permit),这个凭证最多只有1个。当调用park方法时,如果有凭证,则会直接消耗掉这个凭证然后正常退出。如果无凭证,就必须阻塞等待凭证可用。而unpark则相反,它会增加一个凭证,但凭证最多只能有1个,累加无效。
public class LockSupportDemo { public static void main(String[] args) { Thread a = new Thread(()->{ System.out.println(Thread.currentThread().getName() + " come in. " + System.currentTimeMillis()); LockSupport.park(); System.out.println(Thread.currentThread().getName() + " 换醒. " + System.currentTimeMillis()); }, "Thread A"); a.start(); Thread b = new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } LockSupport.unpark(a); System.out.println(Thread.currentThread().getName()+" 通知."); }, "Thread B"); b.start(); } } ======================结果======================= Thread A come in. Thread B 通知. Thread A 换醒.
结论:
因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞。
因为凭证的数量最多为1(不能累加),连续调用两次 unpark和调用一次 unpark效果一样,只会增加一个凭证;而调用两次park却需要消费两个凭证,证不够,不能放行。
AQS指的是AbstractQueuedSynchronizer 抽象队列同步器。通过内置的FIFO队列来完成资源获取线程的排队工作,以及一个int类型变量表示持有锁的状态。是构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石。
CLH:Craig、Landin and Hagersten队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO。
进一步理解锁和同步器的关系
实现了AQS的锁有:自旋锁、互斥锁、读锁写锁、条件产量、信号量、栅栏都是AQS的衍生物。
多个线程共享同一个资源时,同一时刻只有一个线程获取到锁,抢占到资源,抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制,抢占资源失败的线程继续去等待但等候线程仍然保留获取锁的可能且获取锁流程仍在继续。
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node),通过CAS、自旋以及LockSupportpark)的方式,维护state变量的状态,使并发达到同步的控制效果。
以Lock lock = new ReentrantLock();
/** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); }
ReentrantLock默认是非公平锁,定义了一个最终类Sync来创建非公平锁。NonfairSync继承与Sync类,Sync类又继承于AbstractQueuedSynchronizer。
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
/** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }
AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFo队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node,节点来实现锁的分配,通过CAS完成对State值的修改。
static final class Node{ ...... } /** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. */ private volatile int state;
Lock接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的。
NonfairSync ,FairSync继承于 Sync类, ReentrantLock 是基于 NonfairSync ,FairSync来实现的,Sync类又继承于AbstractQueuedSynchronizer 。
公平锁与非公平锁抢占锁的区别:
非公平锁:
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平锁:
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()
hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法。
hasQueuedPredecessors() 在AbstractQueuedSynchronizer中
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;
非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。如果获取不到,再去队列中排队。
整个ReentrantLock 的加锁过程,可以分为三个阶段:
带入一个银行办理业务的案例来模拟我们的AQS 如何进行线程的管理和通知唤醒机制,3个线程模拟3个来银行网点,受理窗口办理业务的顾客。
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; public class AQSDemo { public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); //带入一个银行办理业务的案例来模拟我们的AQs 如何进行线程的管理和通知唤醒机制 //3个线程模拟3个来银行网点,受理窗口办理业务的顾客 //A顾客就是第一个顾客,此时受理窗口没有任何人,A可以直接去办理 new Thread(()->{ lock.lock(); try { System.out.println(Thread.currentThread().getName() + " come in."); try { TimeUnit.SECONDS.sleep(5);//模拟办理业务时间 } catch (InterruptedException e) { e.printStackTrace(); } } finally { lock.unlock(); } }, "Thread A").start(); //第2个顾客,第2个线程---->,由于受理业务的窗口只有一个(只能一个线程持有锁),此代B只能等待, //进入候客区 new Thread(()->{ lock.lock(); try { System.out.println(Thread.currentThread().getName() + " come in."); } finally { lock.unlock(); } }, "Thread B").start(); //第3个顾客,第3个线程---->,由于受理业务的窗口只有一个(只能一个线程持有锁),此代C只能等待, //进入候客区 new Thread(()->{ lock.lock(); try { System.out.println(Thread.currentThread().getName() + " come in."); } finally { lock.unlock(); } }, "Thread C").start(); } }
非公平锁为例进入源码: 首先是运行线程A,
public void lock() { sync.lock(); } /** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
线程A进入,先调用lock()方法,获取锁,此时state(锁的同步状态)处于初始状态,即为0,然后进行判断比较,使用的是compareAndSetState()方法,CAS的思想,底层实现是unsafe类,比较锁状态的期望值和预计值是否相同,相同则更新状态1,接着设置当前线程为独占的所有者线程。线程A占用锁成功,更改锁同步的状态为1,开始处理自己的业务。
接着线程B进入,也调用lock方法,进行状态比较的时候,发现线程A正在占用,所以就会走acquire(1);方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
先调用tryAcquire(arg),尝试再次获取锁
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 线程A已经占用,状态更改为1 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 获取当前获取独占的的所有者的线程 线程A int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; // 返回false }
接着进入addWaiter(Node.EXCLUSIVE)方法
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 该node节点是封装线程B的节点 Node node = new Node(Thread.currentThread(), mode); // 自定义一个节点根据当前的线程以及占用模式 获取当前的线程,已经占用模式 独占 // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 初始状态是null 将null赋值值新创建节点 if (pred != null) { node.prev = pred; // if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); // 将节点插入队列中 return node; // 返回线程B的节点 }
线程B将创建的节点插入队列
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { // 该node节点是封装线程B的节点 for (;;) { // 自旋 通过自旋 进入下一轮判断 Node t = tail; // 此时tail为null if (t == null) { // Must initialize if (compareAndSetHead(new Node())) // 插入一个哨兵节点 tail = head; // 此时head为传过来的node哨兵节点 赋值给尾节点 } else { // 第二次进来 t的值为哨兵节点 node.prev = t; //线程B的前置节点是哨兵节点 if (compareAndSetTail(t, node)) { // t是哨兵节点 node是线程B封装的节点 期望是哨兵节点 实际也是哨兵节点 尾节点更改为 线程B封装的节点 t.next = node; // 设置哨兵的后置节点是 线程B封装的节点 return t; // 返回哨兵节点 } } } } /** * CAS head field. Used only by enq. */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); // 将输入的节点和预期的节点进行比较 如果原来的为null 则设置为传过来的节点 } /** * CAS tail field. Used only by enq. */ private final boolean compareAndSetTail(Node expect, Node update) { // 比较期望的节点和实际的节点是否相同,相同则更新值 return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
然后线程B进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { // 该节点是线程B的节点 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 获取线程B节点的前置节点 if (p == head && tryAcquire(arg)) { // 判断是否是头节点并尝试获取锁 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 线程B返回flase 再次此自旋 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; // 获取哨兵节点 if (p == null) throw new NullPointerException(); else return p; // 返回 } /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // pred 哨兵节点 node 线程B节点 int ws = pred.waitStatus; // 获取节点的状态值 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // CAS比较 } return false; } /** * CAS waitStatus field of a node. */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 消耗许可证,未获取进入阻塞状态 return Thread.interrupted(); }
线程B加入等待队列。
线程A依然工作,线程C如线程B那样炮制加入等待队列。
双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是从第二个节点开始的。
假设线程A工作结束,调用unLock(),释放锁占用。
线程A调用unlock()方法
public void unlock() { sync.release(1); } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; // 获取到线程B if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); // 锁的状态更改为原始的 释放独占所有者的线程 return free; } /** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; // 获取线程B的下个节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); // 生产 许可证 } /** * CAS waitStatus field of a node. */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); }
获取到许可证 线程B不在阻塞,再次去尝试获取锁成功,线程B执行业务。线程C也是如此。