即 Guarded Suspension
,用在一个线程等待另一个线程的执行结果
要点
资源类
public class GuardedObject { private Object response; /** * 获取结果 */ public Object get(){ synchronized (this){ while (response == null) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } /** * 产生结果 */ public void complete(Object response){ synchronized (this) { this.response = response; this.notifyAll(); } } }
测试
public static void main(String[] args) { GuardedObject obj = new GuardedObject(); new Thread(() -> { log.debug("等待结果。。。"); List<String> download = (List<String>) obj.get(); log.debug("结果大小:" + download.size()); }, "t1").start(); new Thread(() -> { log.debug("执行下载。。。"); try { List<String> download = Downloader.download(); obj.complete(download); } catch (IOException e) { e.printStackTrace(); } }, "t2").start(); }
优点:
V1版本 如果生产资源的速度非常慢 获取资源时就必须一直等待。V2版本给get方法加上超时时间
public Object get(long timeout){ synchronized (this){ //开始时间 long start = System.currentTimeMillis(); //经历的时间 long passedTime = 0; while (response == null) { //经历的时间超过了最大时间 退出循环 if(passedTime >= timeout) { log.debug("超时{}", timeout); break; } try { // 避免虚假唤醒导致 timeout重新等待 this.wait(timeout - passedTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - start; } return response; } }
测试:
public static void main(String[] args) { GuardedObjectV2 obj = new GuardedObjectV2(); new Thread(() -> { log.debug("begin..."); Object response = obj.get(2000); log.debug("结果是:" + response); }, "t1").start(); new Thread(() -> { try { log.debug("begin..."); // TimeUnit.SECONDS.sleep(1); // 测试正常获取 // obj.complete(new Object()); // TimeUnit.SECONDS.sleep(3); // 测试超时 // obj.complete(new Object()); TimeUnit.SECONDS.sleep(1); // 测试虚假唤醒 obj.complete(null); } catch (InterruptedException e) { log.error("被打断"); } }, "t2").start(); }
查看join源码 :
轮询检查线程 alive 状态,join 体现的正是【保护性暂停】
模式,
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
资源类加上唯一标识
//用来唯一标记 GuardedObject private int id; public int getId() { return id; } public GuardedObjectV3(int id) { this.id = id; }
邮箱类
class MailBoxes{ private static Map<Integer, GuardedObjectV3> boxes = new Hashtable<>(); private static int id = 1; //产生唯一id private static synchronized int generateId(){ return id++; } public static GuardedObjectV3 createGuardedObjectV3(){ GuardedObjectV3 objectV3 = new GuardedObjectV3(generateId()); boxes.put(objectV3.getId(), objectV3); return objectV3; } public static GuardedObjectV3 getGuardedObject(int id){ //需要清除不需要的信件 防止内存溢出 return boxes.remove(id); } //Hashtable是线程安全的 不需要自己加上锁 public static Set<Integer> getIds(){ return boxes.keySet(); } }
邮件员
class Postman extends Thread{ private int id; private String mail; public Postman(int id, String mail){ this.id = id; this.mail = mail; } @Override public void run() { GuardedObjectV3 obj = MailBoxes.getGuardedObject(id); log.debug("开始送信:id = {} 内容 = {} ", obj.getId(), mail); obj.complete(mail); } }
居民
class People extends Thread{ @Override public void run() { GuardedObjectV3 obj = MailBoxes.createGuardedObjectV3(); log.debug("开始收信:id = {}", obj.getId()); Object mail = obj.get(5000); log.debug("收到信: id = {}, 内容:{}", obj.getId(), mail); } }
测试类
public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { new People().start(); } TimeUnit.SECONDS.sleep(2); //同步模式缺点: 必须一个生产者对应一个消费者 for (int id : MailBoxes.getIds()) { new Postman(id, "内容 " + id).start(); } }