面试官:谈一谈java中基于AQS的并发锁原理
我:java中的AQS是指AbstractQueuedSynchronizer类,java中并发锁控制逻辑都是基于这个类来实现的。
面试官:能说一下你用过的基于AQS的并发类有哪些吗?
我:首先是基于AQS在内部类实现了公平锁和非公平锁,具体有3个类:ReentrantLock、ReentrantReadWriteLock、Semaphore,UML类图如下:
还有2个Latch类基于AQS实现了并发控制,他们是CountDownLatch和LimitLatch,UML类图如下:
面试官:谈一下AQS是怎么获取锁的?
我:首先,AbstractQueuedSynchronizer是一个基于FIFO的队列实现的并发控制,队列中的元素通过操作共享资源state来获取和释放锁,state是一个volatile修饰的int类型变量。我以ReentrantLock中独占锁为例,如果有一个线程来获取锁,这时如果队列中没有元素,那就把这个线程加入队列,同时线程申请的数量加入到state变量。如果队列中已经有元素,这个线程入队尾,之后线程中断等待队列前一个元素释放锁后唤醒。
下面的流程是一个获取锁的流程,如果下面的流程返回false,则把当前线程加入到等待队列。
面试官:读过这部分的源代码吗,能不能讲一下?
我:看一下流程中的源码
final void lock() { acquire(1);//获取1个资源 } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//获取锁失败,入队 selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && //是否有等待时间更长的元素 compareAndSetState(0, acquires)) {//自旋锁 setExclusiveOwnerThread(current);//设置当前线程为独占线程 return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } //当前队列中是否有等待时间更长的元素 public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 下面的代码是加入到等待队列的过程 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//前置节点是head并且获取成功 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) {//注意:如果前置节点不通知,继续往前查找,找到一个可以通知的节点作为当前节点的node节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {//把前置节点的状态设置为通知状态-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //把当前节点加入到队尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { //队列为空,初始化队列,head=tail=node if (compareAndSetHead(new Node())) tail = head; } else {//队列非空,放到队尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
这儿要注意:队列上的元素是Node对象,node中定义了waitStatus变量,有4个状态,
static final int CANCELLED = 1;//当前线程已经取消锁等待 static final int SIGNAL = -1;//下一个节点需要被唤醒 static final int CONDITION = -2;//当前线程正在等待condition,这个状态需要跟condition配合使用 static final int PROPAGATE = -3;//释放锁的时候使用,这个状态只给头节点,并且要不断传播下去
面试官:再说一下释放锁的过程?
我:锁的释放过程比较简单,还是以ReentrantLock为例。首先尝试释放锁(state变量中减去1),把当前锁的拥有者置空,通知队列中下一个节点。整个流程入下:
面试官:这部分的源代码能不能讲一下?
我:以ReentrantLock为例,源码入下:
public final boolean release(int arg) { if (tryRelease(arg)) {//尝试释放锁 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//通知链表中下个等待节点 return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread())//锁的当前拥有者不是当前线程,抛出异常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null);//设置当前锁拥有者为空 } setState(c); return free; } protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } private void unparkSuccessor(Node node) { //把当前节点的等待状态置为0 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //如果队列中下一个节点为空或者下一个节点等待状态是取消状态(1),则从队尾开始查找,知道找到一个非空并且等待状态小于0的节点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev)//为什么从队尾开始查找,这样遍历整个队列啊?但是如果当前节点的下一个是null,那就始终找不到下下个节点了,必须从队尾找。 if (t.waitStatus <= 0) s = t; } if (s != null)//唤醒等待线程 LockSupport.unpark(s.thread); }
面试官:上面你讲的是独占锁,那AQS中的共享锁怎么用的?
我:首先共享锁是指多个线程可以同时使用这个锁,AQS中的使用是只要不超过共享锁允许的总数,都可以获取到。在获取读锁时,首先尝试获取共享锁,如果获取失败,入队后等待获取。以ReentrantReadWriteLock为例,获取共享锁流程入下:
从上面这个流程可以看到,如果获取不到锁,就会进入fullTryAcquireShared,这个方式是在死循环中不断尝试获取到锁,直到成功。
面试官:这部分的源代码能介绍一下吗?
我:源码入下
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//先尝试获取,获取失败,则进入队列等待获取 doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)//其他线程已经获取到排它锁 return -1; int r = sharedCount(c); if (!readerShouldBlock() && //是否需要阻塞,在公平锁中,如果队列中已经有元素了那就返回true;在非公平锁中,队列中第二个元素不是共享节点,返回true r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {//cas失败 if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } final int fullTryAcquireShared(Thread current) {//下面的方法前面都讲过了 HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//共享模式入队 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) {//获取成功 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && //失败后等待前置节点通知 parkAndCheckInterrupt()) //睡眠后等待唤醒 interrupted = true; } } finally { if (failed) cancelAcquire(node);//不再获取 } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node);//自己成功头结点从而唤醒 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared();//唤醒下一个节点 } }
面试官:好的,共享锁的释放流程是怎样的?
我:跟共享锁的获取流程一样,先尝试释放(state变量中减去1),成功后唤醒队列中下一个等待线程
这部分代码如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) {//当前线程是第一个获取到锁的线程 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count;//释放一个count数减1 } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc))//CAS修改state值 // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//当前节点waiteStatus置为0 continue; // loop to recheck cases unparkSuccessor(h);//唤醒下一个节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//头节点waiteStatus置为-3 continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
面试官:前面你提到了公平锁和非公平锁,这2个有什么区别呢?
我:一般情况下,公平锁是指当前线程在释放锁的时候,会通知一个等待队列中等待时间最长的线程,而非公平锁,当前线程释放锁的时候,会随机通知一个线程。非公平锁代码如下:
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
在ReentrantLock中,公平锁和非公平锁的不同就是非公平锁会直接尝试compareAndSetState(0, 1),失败后才走获取锁流程。而公平锁直接走获取锁流程。
在ReentrantReadWriteLock中,公平锁只要队列中有其他线程占用锁,读写锁就需要阻塞,而非公平锁中写锁不阻塞,读锁只有队列中第一个排队等待线程使用独占锁时才阻塞,代码如下
static final class NonfairSync extends Sync { final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive();//队列中第一个等待线程以独占方式等待锁 } } final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; } static final class FairSync extends Sync { final boolean writerShouldBlock() { return hasQueuedPredecessors();//队列中有元素正在使用锁或者第一个等待的线程不是当前线程 } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
面试官:好的,再谈一谈CountDownLatch的使用和原理?
我:上面的原理理解了之后,CountDownLatch的使用就非常简单了。示例代码如下:
public class Test { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3);//将state置为3 for (int i = 0; i < 3; i++){ LocalThreadPool.run(new WorkerThread(latch)); } System.out.println("latch test begin"); latch.await();//获取到共享锁 System.out.println("latch test end"); } static class WorkerThread implements Runnable { private CountDownLatch latch; public WorkerThread(CountDownLatch latch) { this.latch = latch; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " finished"); latch.countDown();//将state值减1,最后一个线程将state减为0后,释放上面await方法获取到的共享锁。 } } }
上面输出结果如下:
latch test begin pool-1-thread-3 finished pool-1-thread-2 finished pool-1-thread-1 finished latch test end
初始化的时候,批量将state值设置为线程数量,然后通过await获取共享锁。每个线程执行完成后调用释放锁的方法将state减1,最后一个线程将state减为0后返回true,这时CountDownLatch就会释放掉共享锁。看下面源代码
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//state减为0才会执行这个逻辑 doReleaseShared();//释放await()方法获取到的共享锁 return true; } return false; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0;//减为0时返回true } }
面试官:那CountDownLatch在await方法中加入参数是怎么实现等待超时的呢?
我:这个就是在一个死循环中不断获取锁,直到超时,这个是AQS本身就支持的逻辑,代码如下:
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout;//设置申请锁失败时间 final Node node = addWaiter(Node.SHARED); try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) {//超时,取消获取锁 cancelAcquire(node); return false; } if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }
面试官:那CountDownLatch设置等待超时时间,有什么好处呢?
我:这个主要作用是设置主线程等待时间,以免长期阻塞主线程。从上面源代码看出这个并不影响任务线程的执行,不过如果等待任务执行线程执行完成后再做一些日志或者通知,就会失败,因为超时后直接就会调用这些日志或通知,不一定真的所有任务都完成了。
比如下面的代码:
public class Test { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); for (int i = 0; i < 3; i++){ LocalThreadPool.run(new WorkerThread(latch)); } System.out.println("latch test begin"); latch.await(1, TimeUnit.SECONDS); System.out.println("latch test end"); } static class WorkerThread implements Runnable { private CountDownLatch latch; public WorkerThread(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " finished"); latch.countDown(); } } }
输出结果是
latch test begin latch test end pool-1-thread-2 finished pool-1-thread-1 finished pool-1-thread-3 finished
面试官:再聊一聊Semaphore的使用?
我:Semaphore的使用也是基于AQS的,它更像一个限流器,初始化是给state赋值,每次执行任务时Semaphore获取共享锁并且将state值减1。如果state值小于0则入队等待。任务执行完成后,Semaphore释放锁,首先state值加1,如果state小于0,则通知队列中下一个等待线程。我们可以用Semaphore实现一个数据库连接池,代码如下:
class ConnectionPool { final List<Connection> pool = new ArrayList<>(10);//这儿没有考虑线程安全 final Semaphore sem = new Semaphore(10);//将state设置为10 //构造函数初始化连接池 ConnectionPool(int size) { for (int i = 0; i < size; i++) { pool.add(new Connection() { //省略实现代码 }); } } //执行任务过程 void doExecution() throws InterruptedException { Connection connection = null; sem.acquire();//获取可中断共享锁,如果state小于1,则进入锁等待队列 try { connection = pool.remove(0);//从连接池取连接 connection.commit();//执行事务 } catch (SQLException e) { e.printStackTrace(); } finally { pool.add(connection);//用完后归还连接池 sem.release();//释放锁,将state值加1,如果state小于0,去等待队列唤醒下一个等待线程 } } }
面试官:再聊一聊Condition的使用?
我:Condition类的作用是可以实现wait-notify机制,比synchronized灵活,可以在一个Lock上创建多个Condition,线程选择注册不同的Condition进项调度。假设我们现在有一个需求,2个线程轮流打印出1到10的数字,这个时候我们可以用信号量来做,代码如下:
public class ConditionTest { private ReentrantLock lock = new ReentrantLock(); public Condition conditionA = lock.newCondition(); public Condition conditionB = lock.newCondition(); private int count = 0; public String printThreadA() throws InterruptedException { while (count < 10){ lock.lock(); System.out.println(Thread.currentThread().getName() + ":" + (++count)); conditionB.signal();//打印后唤醒B线程打印 conditionA.await();//打印后等待B线程唤醒 lock.unlock(); } return null; } public String printThreadB() throws InterruptedException { while (count < 10){ lock.lock(); System.out.println(Thread.currentThread().getName() + ":" + (++count)); conditionA.signal();//打印后唤醒A线程打印 conditionB.await();//打印后等待A线程唤醒 lock.unlock(); } return null; } public void printTenNumber(){ LocalThreadPool.call(() -> printThreadA()); LocalThreadPool.call(() -> printThreadB()); } public static void main(String[] args){ ConditionTest conditionTest = new ConditionTest(); conditionTest.printTenNumber(); } }
上面signal()方法具体实现为将当前线程从condition等待队列转入锁等待队列队尾等待前一个节点唤醒,await()方法的具体实现为在condition等待队列队尾新加一个等待者,释放锁并且唤醒下一个等待线程。所以,condition的await和signal本质上是当前线程在condition等待队列和锁等待队列直接的转移。
代码实现如下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { //waitStatus置为初始化 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; //放入锁等待队列队尾并且把前置节点置为SIGNAL,以通知当前线程 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter();//等待队列队尾新加一个等待者 int savedState = fullyRelease(node);//释放锁并且唤醒下一个等待线程 //删除部分代码 }
面试官:最后一个问题,如果我们自己实现一个AQS的类,需要实现哪些方法呢?
我:AQS类中以下5个方法没有实现,都是throw new UnsupportedOperationException(),这些方法留给自定义子类来实现。
protected boolean tryAcquire(int arg):独占方式获取锁 protected boolean tryRelease(int arg):独占方式释放锁 protected int tryAcquireShared(int arg):共享方式获取锁 protected boolean tryReleaseShared(int arg):共享方式释放锁 isHeldExclusively():判断当前线程是否正在独占锁
面试官:恭喜你,通过了。
微信公众号,所有文章都是个人原创,欢迎关注