这篇文章讨论读写锁stampedLock。文章根据《Java并发编程的艺术》这本书以及黑马的视频 黑马多线程 做的笔记。
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用 加解读锁
long stamp = lock.readLock(); lock.unlockRead(stamp);
加解写锁
long stamp = lock.writeLock(); lock.unlockWrite(stamp);
乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁升级成真正的读锁,保证数据安全。
long stamp = lock.tryOptimisticRead(); // 验戳 if(!lock.validate(stamp)){ // 锁升级 }
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
class DataContainerStamped { //数据 private int data; //StampedLock 锁 private final StampedLock lock = new StampedLock(); public DataContainerStamped(int data) { this.data = data; } //读取操作 public int read(int readTime) { //首先获取stamp long stamp = lock.tryOptimisticRead(); log.debug("optimistic read locking...{}", stamp); sleep(readTime); //验证如果是有效的,证明这期间没有写操作,直接返回即可,这时还是乐观锁 if (lock.validate(stamp)) { //就可以读到数据 log.debug("read finish...{}, data:{}", stamp, data); return data; } // 否则证明已经有写锁修改过了,这里需要再次获取读锁,升级为真正的读锁 // 锁升级 - 读锁 log.debug("updating to read lock... {}", stamp); try { //获取stamp stamp = lock.readLock(); log.debug("read lock {}", stamp); sleep(readTime); log.debug("read finish...{}, data:{}", stamp, data); return data; } finally { log.debug("read unlock {}", stamp); lock.unlockRead(stamp); } } public void write(int newData) { //获取戳 long stamp = lock.writeLock(); log.debug("write lock {}", stamp); try { sleep(2); this.data = newData; } finally { log.debug("write unlock {}", stamp); lock.unlockWrite(stamp); } } }
public class TestStampedLock { public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); sleep(0.5); new Thread(() -> { dataContainer.read(0); }, "t2").start(); } }
输出结果:下面结果中可以看到两个线程同时获取读锁并执行读操作,没有先后的关系。
19:09:07.857 [t1] DEBUG c.DataContainerStamped - optimistic read locking...256 19:09:08.361 [t2] DEBUG c.DataContainerStamped - optimistic read locking...256 19:09:08.362 [t2] DEBUG c.DataContainerStamped - read finish...256, data:1 19:09:08.873 [t1] DEBUG c.DataContainerStamped - read finish...256, data:1
public class TestStampedLock { public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1); new Thread(() -> { dataContainer.read(1); }, "t1").start(); sleep(0.5); new Thread(() -> { dataContainer.write(0); }, "t2").start(); } }
结果输出:一开始是读操作先睡眠一秒,在睡眠之前已经获取了戳了,在 t1 线程睡眠期间 t2 线程获取到了写锁,并将数据修改,而且戳也改成了384.此时 t1 线程醒过来校验发现戳已经被修改了,所以这时候 t1 线程会等待 t2 线程释放写锁之后去获取读锁。完成从乐观读 -> 读锁 的升级。
19:10:49.987 [t1] DEBUG c.DataContainerStamped - optimistic read locking...256 19:10:50.485 [t2] DEBUG c.DataContainerStamped - write lock 384 19:10:50.998 [t1] DEBUG c.DataContainerStamped - updating to read lock... 256 19:10:52.498 [t2] DEBUG c.DataContainerStamped - write unlock 384 19:10:52.498 [t1] DEBUG c.DataContainerStamped - read lock 513 19:10:53.508 [t1] DEBUG c.DataContainerStamped - read finish...513, data:0 19:10:53.508 [t1] DEBUG c.DataContainerStamped - read unlock 513
信号量,用来限制能同时访问共享资源的线程上限。它通过协调各个线程,以保证合理的使用公共资源。
@Slf4j public class TestSemaphore { public static void main(String[] args) { // 1. 创建 semaphore 对象 //这里设置上限为3,表示线程只支持三个,达到了3个线程之后这个变量就为0了 //第二个参数是表示公平非公平:其他线程来了如果是公平是不可能竞争的 //如果是非公平是可以和等待队列里面的线程竞争的 Semaphore semaphore = new Semaphore(3); // 2. 10个线程同时运行 for (int i = 0; i < 10; i++) { new Thread(() -> { try { //3. 获取许可,acquire后semaphore-1变成2 semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..."); sleep(1); log.debug("end..."); } finally { semaphore.release(); } }).start(); } } }
结果输出:很明显的是,首先 0,1,2三个线程先执行,然后其他线程暂停,然后线程0,1,2执行完成之后其他线程接着竞争三个名额。
Semaphore 可以用于流量控制,特别是公共资源有限的应用场景,比如数据库连接。加入有一个需求,需要读取几万个文件的数据,这属于 IO 密集型任务,我们可以启动几十个线程去读取文件,但是在读取到本地之后要写入数据库时我们就得控制连接数,否则连接数过多会报错无法获取连接。这时候就可以使用 Semaphore 来控制并发数。
public class TestPoolSemaphore { public static void main(String[] args) { Pool pool = new Pool(2); for (int i = 0; i < 5; i++) { new Thread(() -> { Connection conn = pool.borrow(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } pool.free(conn); }).start(); } } } class Pool { // 1. 连接池大小 private final int poolSize; // 2. 连接对象数组 private Connection[] connections; // 3. 连接状态数组 0 表示空闲, 1 表示繁忙 private AtomicIntegerArray states; private Semaphore semaphore; // 4. 构造方法初始化 public Pool(int poolSize) { this.poolSize = poolSize; // 让许可数与资源数一致,保证一个线程一个资源 this.semaphore = new Semaphore(poolSize); this.connections = new Connection[poolSize]; this.states = new AtomicIntegerArray(new int[poolSize]); for (int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("连接" + (i+1)); } } // 5. 借连接 public Connection borrow() {// t1, t2, t3 // 获取许可 try { semaphore.acquire(); // 没有许可的线程,在此等待 } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < poolSize; i++) { // 获取空闲连接 if(states.get(i) == 0) { if (states.compareAndSet(i, 0, 1)) { log.debug("borrow {}", connections[i]); return connections[i]; } } } // 不会执行到这里,肯定能找到空闲连接的,线程数和连接数一样 return null; } // 6. 归还连接 public void free(Connection conn) { for (int i = 0; i < poolSize; i++) { if (connections[i] == conn) { states.set(i, 0); log.debug("free {}", conn); //归还许可 semaphore.release(); break; } } } } class MockConnection implements Connection{}
输出结果:可以看到,这里我们设置了大小为2之后,每次获取都是两个两个获取的。其他线程就在等着,下面就以这个为例,讲讲原理
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一
1、刚开始,假设permits(state)为 3,这时 5 个线程来获取资源,下面时构造方法的调用链
其实到这里,本质上就是赋值给了 AQS 中的 state
2、假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,这时候就进入 AQS 队列park 阻塞
3、这时 Thread-4 释放了 permits,状态如下
接下来 Thread-0 竞争成功,permits 从1再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
加锁
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //tryAcquireShared返回值时剩余的资源数,当满了之后就返回负数 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //tryAcquireShared里面调用了一个这个方法 final int nonfairTryAcquireShared(int acquires) { for (;;) { //获取状态,还剩多少名额 int available = getState(); //减去许可数 3-1 = 2,如果这时候已经为0了,那么就返回负数 int remaining = available - acquires; //设置剩余的许可数 if (remaining < 0 || compareAndSetState(available, remaining)) //返回剩余的数 return remaining; } } //doAcquireSharedInterruptibly已经说过了 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //创建 Share 节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { //找到前驱节点 final Node p = node.predecessor(); //判断是不是头 if (p == head) { //此时当前线程时老二,尝试再次获取 int r = tryAcquireShared(arg); if (r >= 0) { //获取成功了就设置头节点为空,并且唤醒后面所有的共享节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //然后在这里park住 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
解锁
public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } //释放 protected final boolean tryReleaseShared(int releases) { for (;;) { //拿到状态,就是剩余的资源数,此时是0 int current = getState(); //释放了一个当然要加上1了 int next = current + releases; //如果next < 当前的,证明加法溢出了 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //CAS设置 if (compareAndSetState(current, next)) return true; } } private void doReleaseShared() { //for循环 for (;;) { //获取头节点,如果是tail,证明此时没有其他等待节点了 Node h = head; if (h != null && h != tail) { //获取状态 int ws = h.waitStatus; //如果是-1,证明有义务唤醒下一个节点 if (ws == Node.SIGNAL) { //把状态从 -1 改成 0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //唤醒下一个节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
用来进行线程同步协作,等待所有线程完成倒计时
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数 -1
下面是内部的锁的实现
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //设置状态,就是计数值 Sync(int count) { setState(count); } int getCount() { return getState(); } //这个方法就是看看线程有没有都执行完了,如果是0就表示线程都执行完了 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //释放锁 protected boolean tryReleaseShared(int releases) { //一个线程用完就让计数值-1 for (;;) { //获取状态 int c = getState(); if (c == 0) return false; //-1 int nextc = c-1; //CAS修改 if (compareAndSetState(c, nextc)) return nextc == 0; } } }
@Slf4j(topic = "c.TestCountDownLatch") public class TestCountDownLatch { public static void main(String[] args) throws InterruptedException, ExecutionException { CountDownLatch latch = new CountDownLatch(3); new Thread(()->{ log.debug("线程 t1 开始"); sleep(1); latch.countDown(); log.debug("线程 t1 结束"); }, "t1").start(); new Thread(()->{ log.debug("线程 t2 开始"); sleep(2); latch.countDown(); log.debug("线程 t2 结束"); }, "t2").start(); new Thread(()->{ log.debug("线程 t3 开始"); sleep(5); latch.countDown(); log.debug("线程 t3 结束"); }, "t3").start(); log.debug("主线程等待其他线程执行完成再往下执行"); latch.await(); log.debug("主线程执行完成"); }
输出结果:可以看到下面主线程是等待其他线程执行完成了才继续往下执行的,至于下面的 t3 结束打印在最后是因为 t3 结束先唤醒了主线程
其实后来的线程的使用基本都是使用线程池的,所以线程一般不会轻易结束,这时候用 join 那些 api 就不行了。
我们来看下面的线程池用法:我们使用三个线程来执行任务,然后一个线程等待任务结束返回结果
@Slf4j(topic = "c.TestCountDownLatch") public class TestCountDownLatch { public static void main(String[] args) throws InterruptedException, ExecutionException { CountDownLatch latch = new CountDownLatch(3); ExecutorService service = Executors.newFixedThreadPool(4); service.submit(()->{ log.debug("线程开始"); sleep(1); latch.countDown(); log.debug("线程结束"); }); service.submit(()->{ log.debug("线程开始"); sleep(1.5); latch.countDown(); log.debug("线程结束"); }); service.submit(()->{ log.debug("线程开始"); sleep(2); latch.countDown(); log.debug("线程结束"); }); service.submit(()->{ try { log.debug("线程等待结果"); latch.await(); log.debug("线程获取到结果"); } catch (InterruptedException e) { e.printStackTrace(); } }); } }
输出结果:
我们定义下面这个方法,然后实现10个用户加载的情况,等到加载完成打印一句游戏开始
private static void wangzherongyao() throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(10); CountDownLatch latch = new CountDownLatch(10); Random r = new Random(); String[] all = new String[10]; //10个玩家 for (int j = 0; j < 10; j++) { //final是因为lambda表达式接收的是常量 final int k = j; service.submit(()->{ for (int i = 0; i <= 100; i++) { try { Thread.sleep(r.nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } all[k] = i + "%"; //使用 \r 可以回退到最开始的输出位置然后对原来的输出进行覆盖 System.out.print("\r" + Arrays.toString(all)); } //一个任务运行结束了就 -1 latch.countDown(); }); } //主线程等待任务执行完成 latch.await(); System.out.println("\n游戏开始"); }
最终输出结果:
下面是使用restTemplate来发送请求,在工作中的应用可以是当要不断一个接口请求多此的时候,此时可以用到多线程配合CountDown 来进行计数,当读取完成接口之后就可以对这些获取到的数据进行调用
private static void waitForObject() throws Exception { RestTemplate restTemplate = new RestTemplate(); log.debug("等待远程服务调用返回结果"); ExecutorService service = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(4); service.submit(() -> { Map<String, Object> response = restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1); latch.countDown(); }); service.submit(() -> { Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1); latch.countDown(); }); service.submit(() -> { Map<String, Object> response1 = restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2); latch.countDown(); }); service.submit(() -> { Map<String, Object> response3 = restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1); latch.countDown(); }); latch.await(); log.debug("执行完毕"); service.shutdown(); }
如果想要返回结果,那么还是使用 Future 更合使:
循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。
为什么要用 CyclicBarrier,这个类作用和 CountdownLatch 类似,但是不同点就在于 CyclicBarrier是支持重用的,使用里面的 reset 方法进行重置。所以 CyclicBarrier可以处理更加复杂的业务,比如在计数错误的时候可以进行重置。此外 CyclicBarrier提供了 getNumberWaiting 方法可以获取阻塞的线程个数。isBroken() 方法可以用来了解阻塞的线程是否被中断
private static void test2() { //创建大小为3的线程池对象 ExecutorService service = Executors.newFixedThreadPool(2); //我们设置计数为2,第二个参数是任务,实际是其他两个任务执行完成之后会执行这个任务 CyclicBarrier barrier = new CyclicBarrier(2, ()-> { log.debug("任务1任务2结束"); }); for (int i = 0; i < 3; i++) { // task1 task2 task1 service.submit(() -> { log.debug("任务1开始运行"); sleep(1); try { //调用 await 方法进行等待 barrier.await(); // 2-1=1 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); service.submit(() -> { log.debug("任务2开始运行"); sleep(2); try { barrier.await(); // 1-1=0 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } service.shutdown(); }
输出结果:
注意线程池的数量和计数值要一样,上面这个例子,如果设置线程为3,那么一次执行的顺序是 第1个任务1、第1个任务2 和 第2个任务1,注意任务二这时睡眠了2s,也就是说三个线程执行的时间刚刚好是2s,和线程数是2的时候的结果是一样的,此时执行完成后计数器重置。这样就不对了,这样就是第一个任务2 和 第二个 任务1 让计数器变为 0.
使用《Java并发编程的艺术》这本书里面给出的例子。这个计数器可以用于多线程计算数据,最终合并计算结果的场景。例如,用一个 Excel 保存了所有的银行流水,每个 Sheet 保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完成之后,得到每个 sheet 的日军银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均 银行流水,代码如下:
public class BankWaterService implements Runnable{ /** * 创建4个屏障,处理完之后执行当前类的 run 方法 */ private CyclicBarrier c = new CyclicBarrier(4, this); /** * 假设只有四个 sheet,那么启动四个线程 */ private Executor executor = Executors.newFixedThreadPool(4); /** * 保存每个sheet处理出来的结果 */ private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>(); private void count(){ for (int i = 0; i < 4; i++) { executor.execute(new Runnable() { @Override public void run() { //计算当前sheet的影流数据,代码略,下面直接模拟结果 sheetBankWaterCount.put(Thread.currentThread().getName(), 1); try { c.await(); } catch (Exception e) { e.printStackTrace(); } } }); } } @Override public void run() { int result = 0; //汇总每个sheet计算出的结果 for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) { result += sheet.getValue(); } //将结果输出 sheetBankWaterCount.put("result", result); System.out.println(result); } public static void main(String[] args) { BankWaterService bankWaterService = new BankWaterService(); bankWaterService.count(); //4 } }
如有错误,欢迎指出!!!