AQS是一个FIFO的双向队列,其内部通过head和tail记录队首和队尾元素,队列元素的类型为 Node。
Node 中的 thread变量用来存放进入 AQS 队列的线程;
Node 中的 SHARED 用来标记该线程是获取共享资源时被阻塞挂起后放入 AQS 队列的;EXCLUSIVE 用来标记该线程是获取独占资源时被阻塞挂起后放入 AQS 队列的;
waitStatus 记录当前线程等待状态,可以为:CANCELLED (线程被取消了)、SIGNAL (线程需要被唤醒)、CONDITION (线程在条件队列里面等待〉、PROPAGATE (释放共享资源时需要通知其他节点);
prev 记录当前节点的前驱节点;next 记录当前节点的后继节点
AQS 的内部类 ConditionObject 结合锁实现线程同步。
ConditionObject 可以直接访问 AQS对象内部的变量,比如 state 和 AQS 队列。
ConditionObject 是条件变量,每个条件变量在内部维护了一个条件队列 (单向链表队列),这个条件队列和 AQS 队列不是一回事。
此处的队列是用来存放调用条件变量的 await 方法后被阻塞的线程,队列的头、尾元素分别为 firstWaiter 和 lastWaiter。
Note:
调用条件变量的 await()方法就相当于调用共享变量的 wait()方法,调用条件变量的 signal方法就相当于调用共享变量的 notify()方法,调用条件变量的 signa!All ( )方法就相当于调用共享变量的 notifyAll()方法。
至此,相信大家都已经知道条件变量是什么了,它能用来做什么。
示例中(2)处使用Lock 对象的 newCondition ()方法创建了一个 ConditionObject 变量,该变量就是 Lock锁对应的一个条件变量。
示例中(3)处获取了独占锁,示例中(4)处则调用了条件变量的 await ()方法阻塞挂起了当前线程。
当其他线程调用条件变量的 signal方法时,被阻塞的线程才会从 await处返回。
Note:
在调用条件变量的 signal 和 await方法前必须先获取条件变量对应的锁,如果在没有获取到锁之前调用了条件变量的 await方法则会抛出 java.lang.IllegalMonitorStateException异常。
一个 Lock对象可以创建多个条件变量。
AQS 只提供了 ConditionObject 的实现,并没有提供 newCondition 函数,该函数用来 new 一个 ConditionObject对象;需要由 AQS 的子类来实现 newCondition 函数。
小结:
下图反映了前面描述的关系:
详解:
当多个线程同时调用 lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为 Node 节点插入到 lock 锁对应的 AQS 阻塞队列里面,并做自旋 CAS 尝试获取锁。
如果获取到锁的线程调用了对应条件变量的 await()方法,则该线程会释放获取到的锁,并被转换为 Node 节点插入到条件变量对应的条件队列里面。
此时因调用 lock.lock() 方法被阻塞到 AQS 队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的 await ()方法则该线程也会被放入条件变量的条件队列里面。
当另外一个线程调用条件变量的 signal()或者 signa!All()方法时,会把条件队列里面的一个或者全部 Node节点移动到 AQS 的阻塞队列里面,等待时机获取锁。
在 AQS 中维持了一个状态值 state,可以通过 getState、setState、compareAndSetState 函数修改其值,代码如下:
/** * The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); }
对于 AQS 来说,线程同步的关键是对 state 进行操作。
根据 state 是否属于一个线程,操作 state 的方式分为独占方式和共享方式。
使用独占方式获取的资源是与具体线程绑定的,也就是说如果一个线程获取到了资源,则进行标记,其他线程尝试操作 state 获取资源时会发现当前该资源的持有者不是自己,于是在获取失败后被阻塞。
例子:独占锁 ReentrantLock 的实现
当一个线程获取了Reer rantLock的锁后,在 AQS 内部会使用 CAS 操作把状态值 state 从0 变为 1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则把状态值从 l 变为 2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己不是该锁的持有者就会被放入 AQS 阻塞队列后挂起。
在独占方式下获取和释放资源的方法为:
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } /** * Acquires in exclusive mode, aborting if interrupted. * Implemented by first checking interrupt status, then invoking * at least once {@link #tryAcquire}, returning on * success. Otherwise the thread is queued, possibly repeatedly * blocking and unblocking, invoking {@link #tryAcquire} * until success or the thread is interrupted. This method can be * used to implement method {@link Lock#lockInterruptibly}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
使用共享方式获取资源与具体线程不相关;当多个线程去请求资源时通过 CAS 方式竞争。
当一个线程获取到资源后,另一个线程尝试获取资源时,如果当前资源能满足它的需要,则当前线程只需要使用 CAS 方式进行获取即可。
例子:Semaphore 信号量
当一个线程通过 acquire()方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋 CAS 获取信号量。
在共享方式下获取和释放资源的方法为:
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
Note:
AQS是锁和同步容器的基础框架,AQS并没有提供可用的tryAcquire和tryRelease方法。
tryAcquire和 tryRelease需要由具体的子类来实现。
子类在实现 tryAcquire和 tryRelease时要根据具体场景使用 CAS算法尝试修改 state状态值, 成功则返回 true,否则返回 false。
子类还需要定义在调用 acquire 和 release 方法时状态值 state 的增减代表什么含义。
例子:
继承自 AQS 实现的独占锁 ReentrantLock,定义当 status 为 0 时表示锁空闲,为 1 时表示锁己经被占用。
故其在重写 tryAcquire 时,需要使用 CAS 算法查看当前 state 是否为 0,如果为 0 则使用 CAS 设置为 1,并设置当前锁的持有者为当前线程,而后返回true;如果 CAS 失败则返回 false。
同理,tryAcquireShared 和 tryReleaseShared 也需要由具体的子类来实现。
例子:
继承自 AQS 实现的读写锁 ReentrantReadWriteLock,读锁在重写 tryAcquireShared 时,首先查看写锁是否被其他线程持有,如果是则直接返回 false; 否则使用 CAS 递增 state 的高 16 位。
(在 ReentrantReadWriteLock 中, state 的高 16 位为获取读锁的次数)
基于 AQS 实现的锁除了需要重写上面介绍的方法外,还需要重写 isHeldExclusively 方法,来判断锁是被当前线程独占还是被共享。
问题:带有 Interruptibly关键字的函数和不带该关键字的函数有什么区别?
不带 Intenuptibly 关键字的方法意思是不对中断进行响应。
详解:线程在调用不带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,则该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是说不对中断进行响应,忽略中断。
带 Interruptibly关键字的方法要对中断进行响应。
详解:线程在调用带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,则该线程抛出 InterruptedException 异常而返回。