Java教程

面试官:谈一谈java中基于AQS的并发锁原理

本文主要是介绍面试官:谈一谈java中基于AQS的并发锁原理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

面试官:谈一谈java中基于AQS的并发锁原理

我:java中的AQS是指AbstractQueuedSynchronizer类,java中并发锁控制逻辑都是基于这个类来实现的。

面试官:能说一下你用过的基于AQS的并发类有哪些吗?

我:首先是基于AQS在内部类实现了公平锁和非公平锁,具体有3个类:ReentrantLock、ReentrantReadWriteLock、Semaphore,UML类图如下:

还有2个Latch类基于AQS实现了并发控制,他们是CountDownLatch和LimitLatch,UML类图如下:

   

面试官:谈一下AQS是怎么获取锁的?

我:首先,AbstractQueuedSynchronizer是一个基于FIFO的队列实现的并发控制,队列中的元素通过操作共享资源state来获取和释放锁,state是一个volatile修饰的int类型变量。我以ReentrantLock中独占锁为例,如果有一个线程来获取锁,这时如果队列中没有元素,那就把这个线程加入队列,同时线程申请的数量加入到state变量。如果队列中已经有元素,这个线程入队尾,之后线程中断等待队列前一个元素释放锁后唤醒。
下面的流程是一个获取锁的流程,如果下面的流程返回false,则把当前线程加入到等待队列。

面试官:读过这部分的源代码吗,能不能讲一下?

我:看一下流程中的源码

final void lock() {
    acquire(1);//获取1个资源
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//获取锁失败,入队
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() && //是否有等待时间更长的元素
            compareAndSetState(0, acquires)) {//自旋锁
            setExclusiveOwnerThread(current);//设置当前线程为独占线程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
//当前队列中是否有等待时间更长的元素
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
下面的代码是加入到等待队列的过程
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {//前置节点是head并且获取成功
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {//注意:如果前置节点不通知,继续往前查找,找到一个可以通知的节点作为当前节点的node节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {//把前置节点的状态设置为通知状态-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //把当前节点加入到队尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { //队列为空,初始化队列,head=tail=node
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//队列非空,放到队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

 这儿要注意:队列上的元素是Node对象,node中定义了waitStatus变量,有4个状态,

static final int CANCELLED =  1;//当前线程已经取消锁等待
static final int SIGNAL    = -1;//下一个节点需要被唤醒
static final int CONDITION = -2;//当前线程正在等待condition,这个状态需要跟condition配合使用
static final int PROPAGATE = -3;//释放锁的时候使用,这个状态只给头节点,并且要不断传播下去

面试官:再说一下释放锁的过程?

我:锁的释放过程比较简单,还是以ReentrantLock为例。首先尝试释放锁(state变量中减去1),把当前锁的拥有者置空,通知队列中下一个节点。整个流程入下:

                                                 

面试官:这部分的源代码能不能讲一下?

我:以ReentrantLock为例,源码入下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {//尝试释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//通知链表中下个等待节点
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())//锁的当前拥有者不是当前线程,抛出异常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);//设置当前锁拥有者为空
    }
    setState(c);
    return free;
}
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}
private void unparkSuccessor(Node node) {
	//把当前节点的等待状态置为0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
	//如果队列中下一个节点为空或者下一个节点等待状态是取消状态(1),则从队尾开始查找,知道找到一个非空并且等待状态小于0的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)//为什么从队尾开始查找,这样遍历整个队列啊?但是如果当前节点的下一个是null,那就始终找不到下下个节点了,必须从队尾找。
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)//唤醒等待线程
        LockSupport.unpark(s.thread);
}

面试官:上面你讲的是独占锁,那AQS中的共享锁怎么用的?

我:首先共享锁是指多个线程可以同时使用这个锁,AQS中的使用是只要不超过共享锁允许的总数,都可以获取到。在获取读锁时,首先尝试获取共享锁,如果获取失败,入队后等待获取。以ReentrantReadWriteLock为例,获取共享锁流程入下:

 

从上面这个流程可以看到,如果获取不到锁,就会进入fullTryAcquireShared,这个方式是在死循环中不断尝试获取到锁,直到成功。
面试官:这部分的源代码能介绍一下吗?

我:源码入下

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)//先尝试获取,获取失败,则进入队列等待获取
        doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)//其他线程已经获取到排它锁
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() && //是否需要阻塞,在公平锁中,如果队列中已经有元素了那就返回true;在非公平锁中,队列中第二个元素不是共享节点,返回true
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {//cas失败
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {//下面的方法前面都讲过了
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//共享模式入队
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {//获取成功
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&  //失败后等待前置节点通知
                parkAndCheckInterrupt()) //睡眠后等待唤醒
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);//不再获取
    }
}
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//自己成功头结点从而唤醒
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();//唤醒下一个节点
        }
    }

面试官:好的,共享锁的释放流程是怎样的?

我:跟共享锁的获取流程一样,先尝试释放(state变量中减去1),成功后唤醒队列中下一个等待线程
这部分代码如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {//当前线程是第一个获取到锁的线程
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;//释放一个count数减1
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))//CAS修改state值
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//当前节点waiteStatus置为0
                    continue;            // loop to recheck cases
                unparkSuccessor(h);//唤醒下一个节点
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//头节点waiteStatus置为-3
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

面试官:前面你提到了公平锁和非公平锁,这2个有什么区别呢?
我:一般情况下,公平锁是指当前线程在释放锁的时候,会通知一个等待队列中等待时间最长的线程,而非公平锁,当前线程释放锁的时候,会随机通知一个线程。非公平锁代码如下:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

在ReentrantLock中,公平锁和非公平锁的不同就是非公平锁会直接尝试compareAndSetState(0, 1),失败后才走获取锁流程。而公平锁直接走获取锁流程。
在ReentrantReadWriteLock中,公平锁只要队列中有其他线程占用锁,读写锁就需要阻塞,而非公平锁中写锁不阻塞,读锁只有队列中第一个排队等待线程使用独占锁时才阻塞,代码如下

static final class NonfairSync extends Sync {
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();//队列中第一个等待线程以独占方式等待锁
    }
}
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}
static final class FairSync extends Sync {
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();//队列中有元素正在使用锁或者第一个等待的线程不是当前线程
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

面试官:好的,再谈一谈CountDownLatch的使用和原理?
我:上面的原理理解了之后,CountDownLatch的使用就非常简单了。示例代码如下:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);//将state置为3
        for (int i = 0; i < 3; i++){
            LocalThreadPool.run(new WorkerThread(latch));
        }
        System.out.println("latch test begin");
        latch.await();//获取到共享锁
        System.out.println("latch test end");
    }

    static class WorkerThread implements Runnable {
        private CountDownLatch latch;
        public WorkerThread(CountDownLatch latch) {
            this.latch = latch;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " finished");
            latch.countDown();//将state值减1,最后一个线程将state减为0后,释放上面await方法获取到的共享锁。
        }
    }
}

上面输出结果如下:

latch test begin
pool-1-thread-3 finished
pool-1-thread-2 finished
pool-1-thread-1 finished
latch test end

初始化的时候,批量将state值设置为线程数量,然后通过await获取共享锁。每个线程执行完成后调用释放锁的方法将state减1,最后一个线程将state减为0后返回true,这时CountDownLatch就会释放掉共享锁。看下面源代码

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//state减为0才会执行这个逻辑
        doReleaseShared();//释放await()方法获取到的共享锁
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c - 1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;//减为0时返回true
    }
}

面试官:那CountDownLatch在await方法中加入参数是怎么实现等待超时的呢?
我:这个就是在一个死循环中不断获取锁,直到超时,这个是AQS本身就支持的逻辑,代码如下:

private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;//设置申请锁失败时间
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {//超时,取消获取锁
                cancelAcquire(node);
                return false;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

面试官:那CountDownLatch设置等待超时时间,有什么好处呢?
我:这个主要作用是设置主线程等待时间,以免长期阻塞主线程。从上面源代码看出这个并不影响任务线程的执行,不过如果等待任务执行线程执行完成后再做一些日志或者通知,就会失败,因为超时后直接就会调用这些日志或通知,不一定真的所有任务都完成了。
比如下面的代码:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);

        for (int i = 0; i < 3; i++){
            LocalThreadPool.run(new WorkerThread(latch));
        }
        System.out.println("latch test begin");
        latch.await(1, TimeUnit.SECONDS);
        System.out.println("latch test end");
    }

    static class WorkerThread implements Runnable {
        private CountDownLatch latch;
        public WorkerThread(CountDownLatch latch) {
            this.latch = latch;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " finished");
            latch.countDown();
        }
    }
}

输出结果是

latch test begin
latch test end
pool-1-thread-2 finished
pool-1-thread-1 finished
pool-1-thread-3 finished

面试官:再聊一聊Semaphore的使用?
我:Semaphore的使用也是基于AQS的,它更像一个限流器,初始化是给state赋值,每次执行任务时Semaphore获取共享锁并且将state值减1。如果state值小于0则入队等待。任务执行完成后,Semaphore释放锁,首先state值加1,如果state小于0,则通知队列中下一个等待线程。我们可以用Semaphore实现一个数据库连接池,代码如下:

class ConnectionPool {
    final List<Connection> pool = new ArrayList<>(10);//这儿没有考虑线程安全
    final Semaphore sem = new Semaphore(10);//将state设置为10
    //构造函数初始化连接池
    ConnectionPool(int size) {
        for (int i = 0; i < size; i++) {
            pool.add(new Connection() {
                //省略实现代码
            });
        }
    }
    //执行任务过程
    void doExecution() throws InterruptedException {
        Connection connection = null;
        sem.acquire();//获取可中断共享锁,如果state小于1,则进入锁等待队列
        try {
            connection = pool.remove(0);//从连接池取连接
            connection.commit();//执行事务
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            pool.add(connection);//用完后归还连接池
            sem.release();//释放锁,将state值加1,如果state小于0,去等待队列唤醒下一个等待线程
        }
    }
}

面试官:再聊一聊Condition的使用?
我:Condition类的作用是可以实现wait-notify机制,比synchronized灵活,可以在一个Lock上创建多个Condition,线程选择注册不同的Condition进项调度。假设我们现在有一个需求,2个线程轮流打印出1到10的数字,这个时候我们可以用信号量来做,代码如下:

 

public class ConditionTest {
    private ReentrantLock lock = new ReentrantLock();
    public Condition conditionA = lock.newCondition();
    public Condition conditionB = lock.newCondition();
    private int count = 0;
    public String printThreadA() throws InterruptedException {
        while (count < 10){
            lock.lock();
            System.out.println(Thread.currentThread().getName() + ":" +  (++count));
            conditionB.signal();//打印后唤醒B线程打印
            conditionA.await();//打印后等待B线程唤醒
            lock.unlock();
        }
        return null;
    }

    public String printThreadB() throws InterruptedException {
        while (count < 10){
            lock.lock();
            System.out.println(Thread.currentThread().getName() + ":" + (++count));
            conditionA.signal();//打印后唤醒A线程打印
            conditionB.await();//打印后等待A线程唤醒
            lock.unlock();
        }
        return null;
    }

    public void printTenNumber(){
        LocalThreadPool.call(() -> printThreadA());
        LocalThreadPool.call(() -> printThreadB());
    }

    public static void main(String[] args){
        ConditionTest conditionTest = new ConditionTest();
        conditionTest.printTenNumber();
    }
}

上面signal()方法具体实现为将当前线程从condition等待队列转入锁等待队列队尾等待前一个节点唤醒,await()方法的具体实现为在condition等待队列队尾新加一个等待者,释放锁并且唤醒下一个等待线程。所以,condition的await和signal本质上是当前线程在condition等待队列和锁等待队列直接的转移。
代码实现如下:

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
        //waitStatus置为初始化
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
		//放入锁等待队列队尾并且把前置节点置为SIGNAL,以通知当前线程
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();//等待队列队尾新加一个等待者
    int savedState = fullyRelease(node);//释放锁并且唤醒下一个等待线程
    //删除部分代码
}

面试官:最后一个问题,如果我们自己实现一个AQS的类,需要实现哪些方法呢?
我:AQS类中以下5个方法没有实现,都是throw new UnsupportedOperationException(),这些方法留给自定义子类来实现。

protected boolean tryAcquire(int arg):独占方式获取锁
protected boolean tryRelease(int arg):独占方式释放锁
protected int tryAcquireShared(int arg):共享方式获取锁
protected boolean tryReleaseShared(int arg):共享方式释放锁
isHeldExclusively():判断当前线程是否正在独占锁

面试官:恭喜你,通过了。 

 

微信公众号,所有文章都是个人原创,欢迎关注

这篇关于面试官:谈一谈java中基于AQS的并发锁原理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!