锁和协作类(信号量)有共同点:类似一个闸门(只允许部分线程通过),因为它们底层都用一个共同的基类AQS
因为上面的那些协作类,它们有很多工作类似,所以可以提取出一个工具类,就可以直接用,对于ReentrantLock和Semaphore而言就可以屏蔽很多细节,只关注它们自己的业务逻辑就可以。
AQS是一个用于构建锁、同步器、协作工具类的工具类(框架),有了AQS以后,更多的协作工具类都可以很方便的写出来。
Semaphore内部有一个Sync类,Sync类继承了AQS
类似面试中的群面和单面
安排就坐、叫号、先来后到等HR的工作就是AQS所做的,面试官(并发类)不关心两个面试者是不是号码会冲突,也不用管面试者需要一个地方坐关等待,这些都交给HR(AQS去做)
Semaphore:一个人面试完后,后一个人才能进来继续面试(许可证)
CountDownLatch:群面,需要等待10个人到齐一起面试。
Semaphore、CountDownLatch这些同步工具类,要做的就是写下自己”要人”的规则,比如”出一个,再进一个”或”凑齐10人一起面试”。
剩下的招呼面试者的活都由AQS来做。
比如在Semaphore里,它表示”剩余的许可证数量”
在CountDownLatch里,它表示”还需要倒数的数量”。
在ReentrantLock中,state表示锁的占有情况,包括可重入计数。
state是volatile修饰,会被并发地修改,所以所有修改state的方法都需要保证线程安全,比如getState、setState、compareAndSetState操作来读取和更新这个状态,这些方法都依赖于juc.atomic包支持。
这个队列用来存放”等待的线程”,AQS就是”排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚释放的锁。
AQS会维护一个等待的线程队列,把线程都放到这个双向链表队列中。
head是拿到锁的线程,后面是等待拿锁的线程。
这里获取和释放方法是利用AQS协作工具类里最重要的方法,是由协作类自己去实现,并且含义各不相同。
获取操作依赖state变量,经常会阻塞(获取不到锁时),如在Semaphore中,获取就是acquire方法,作用是获取一个许可证,如可state变量>0,则可以获取,获取到后值会减1
释放操作不会阻塞,如在Semaphore中,释放就是release方法,作用是释放一个许可证,使state变量加1
第一步:写一个类,想好协作逻辑,实现获取和释放方法
第二步:内部写一个Sync类继承AbstractQueuedSynchronized
第三步:根据是否独占或共享来重写tryAcquire/tryRelease或tryAcquireShared(int acquires)和tryReleaseShared(int release)等方法,在之前写的获取/释放方法中调用AQS的acquire/release或Shared方法。
aqs/OneShotLatch.java
package aqs; import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * 描述:自己用AQS实现一个简单的线程协作器 */ public class OneShotLatch { private final Sync sync = new Sync(); //获取锁,利用AQS提供的 public void await() { sync.acquireShared(0); } //释放锁 public void signal() { sync.releaseShared(0); } private class Sync extends AbstractQueuedSynchronizer { @Override protected int tryAcquireShared(int arg) { //state=1放行 return (getState() == 1) ? 1 : -1; } @Override protected boolean tryReleaseShared(int arg) { setState(1); //唤醒所有其它等待的线程 return true; } } public static void main(String[] args) throws InterruptedException { OneShotLatch oneShotLatch = new OneShotLatch(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待"); oneShotLatch.await(); System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行"); } }).start(); } //5秒后放闸 Thread.sleep(5000); //唤醒等待线程。 oneShotLatch.signal(); } }