从本节内容开始我们要正式进入并发编程设计模式的学习了,首先讲的是观察者设计模式,我们先从java的设计模式开始,然后再过度到并发编程的观察者模式
关于什么是观察者模式?我的理解是这样的。首先观察者模式有观察者这么个角色,既然有观察者,那么就有被观察者。然后这个模式强调的是观察者两个字。也就是观察者实时观察这被观察者的一举一动。一旦被观察者的状态发生改变,那么所有的观察者就会做出相应的动作。而经过网上相关资料的查找,我对其又有了更深刻的理解:观察者又称为发布-订阅者模式,发布者作为被观察的对象,而订阅者是观察者。
最典型常见的的应用场景就是:微博订阅、关注微信公众号了。一旦发被关注的微博或者微信公众号发布了新消息,那么关注的账号就会看到发布小消息。发布消息其实就是一个通知过程。
发布者,也就是事件源。被观察者首先应该具备观察者的属性,可以是单个,也可以是列表。然后发布者要有状态属性,可以设置或者获取状态。被观察者状态改变就会通知观察者。要想通知观察者,被观察者就得具备通知方法。还有最重要的一点被观察者要具备注册方法,简单来讲就是被订阅方法,这样订阅者才能订阅发布者。具体代码如下
package observer; import java.util.ArrayList; import java.util.List; public class Subject { private Integer state; private List<Observer> observerList = new ArrayList<>(); public Integer getState() { return this.state; } public void setState(Integer state) { if (this.getState().equals(state)) { return; } notifyObServer(state); } public void attach (Observer observer) { this.observerList.add(observer); } private void notifyObServer(Integer state) { this.state = state; observerList.forEach(Observer::update); } }
观察者可以有多个,所以就需要用到接口或者抽象类去实现解耦合。观察者被新建的时候要将自己注册到发布者里(订阅过程:将自己添加到被观察者的观察者列表里),然后观察者被发布者通知之后就要将做出响应,所以观察者要有响应方法。
package observer; public abstract class Observer { protected Subject subject; Observer(Subject subject) { this.subject = subject; this.subject.attach(this); } public abstract void update(); } package observer; public class BinaryObserver extends Observer { public BinaryObserver(Subject subject) { super(subject); } @Override public void update() { System.out.println("binary String: " + Integer.toBinaryString(subject.getState())); } } package observer; public class OctalObserver extends Observer { public OctalObserver(Subject subject) { super(subject); } @Override public void update() { System.out.println("octal String: " + Integer.toOctalString(subject.getState())); } }
package observer; public class client { public static void main(String[] args) { Subject subject = new Subject(); new BinaryObserver(subject); new OctalObserver(subject); System.out.println("======================"); subject.setState(10); System.out.println("======================"); System.out.println("======================"); subject.setState(10); System.out.println("======================"); System.out.println("======================"); subject.setState(12); System.out.println("======================"); } }
我们这里定义实现Runnable的抽象类。跟上面一样,作为被观察者,首先要有观察者的属性,可以是一个或者多个,这里我们简单点,使用一对一的关系。其次也同样要有注册方法,这个跟上面稍微有点不一样,这里选择新建Runnable的时候将观察者注册进来,也就是在构造方法中实现。最后,同样要有通知方法。
package threadobserver; /** * 需求:观察线程的生命周期 * 1、可被观察的Runnable * 2、线程生命周期观察者(接口与实现类) */ public abstract class ObserverableRunnable implements Runnable { /** * 1、定义观察者 * 2、定义通知方法 */ private final LifecycleListener listener; public ObserverableRunnable(final LifecycleListener listener) { this.listener = listener; } public void notifyObserver(final RunnableEvent event) { this.listener.onEvent(event); } enum RunnableState { RUNNING, DONE, ERROR; } class RunnableEvent { private RunnableState state; private Thread thread; private Throwable cause; public RunnableEvent(RunnableState state, Thread thread, Throwable cause) { this.state = state; this.thread = thread; this.cause = cause; } public RunnableState getState() { return state; } public void setState(RunnableState state) { this.state = state; } public Thread getThread() { return thread; } public void setThread(Thread thread) { this.thread = thread; } public Throwable getCause() { return cause; } public void setCause(Throwable cause) { this.cause = cause; } } }
同样,如果要实现被观察者与观察者的一对多关系,就要用到接口或者抽象类实现解耦合,这里使用接口,接口只定义响应方法。然后;创建接口的实现类,实现响应方法。另外观察者还要添加创建并启动线程的方法,然后将自己注册进Runnable中实现监听。而线程要做的就是调用Runnable的通知方法(方法体里是观察者的响应方法)。这样,只要被观察者已调用通知方法,观察者就会做出响应。
package threadobserver; public interface LifecycleListener { void onEvent(ObserverableRunnable.RunnableEvent event); } package threadobserver; import java.util.List; /** * 生命周期观察者 */ public class LifecycleListenerObserver implements LifecycleListener{ /** * 1、实现onEvent方法 * 2、提供创建线程的方法 */ private static final Object LOCK = new Object(); public void concurrentQuery(List<String> ids) { ids.forEach(x -> { new Thread(new ObserverableRunnable(this) { @Override public void run() { try { notifyObserver(new RunnableEvent(RunnableState.RUNNING, Thread.currentThread(), null)); // Thread.sleep(10L); notifyObserver(new RunnableEvent(RunnableState.DONE, Thread.currentThread(), null)); } catch (Throwable e) { notifyObserver(new RunnableEvent(RunnableState.ERROR, Thread.currentThread(), e)); } } }, x).start(); }); } /** * * @param event */ @Override public void onEvent(ObserverableRunnable.RunnableEvent event) { System.out.println("当前线程【"+ event.getThread().getName() +"】" + "changed state and current state is 【"+ event.getState() +"】"); if (event.getCause() != null) { System.out.println("The runnable [" + event.getThread().getName() + "] process failed."); event.getCause().printStackTrace(); } } }
package threadobserver; import java.util.Arrays; public class client { public static void main(String[] args) { new LifecycleListenerObserver().concurrentQuery(Arrays.asList("1", "2", "3", "4")); } }