本文章首发自本人公众号:壹枝花算不算浪漫,如若转载请标明来源!
感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫
之前出过一个设计模式的系列文章,这些文章和其他讲设计模式的文章 有些不同
文章没有拘泥于讲解设计模式的原理,更多的是梳理工作中实际用到的一些设计模式,并提取出对应业务模型进行总结,回顾下之前的一些文章:(PS:之前的文章都在其他平台发布的,以后也会一起发布到掘金)
【一起学设计模式】策略模式实战一:基于消息发送的策略模式实战
【一起学习设计模式】策略模式实战二:配合注解 干掉业务代码中冗余的if else...
【一起学设计模式】访问者模式实战:权限管理树删除节点操作
【一起学设计模式】命令模式+模板方法+工厂方法实战: 如何优雅的更新商品库存...
【一起学设计模式】状态模式+装饰器模式+简单工厂模式实战:(一)提交个订单我到底经历了什么鬼?
【一起学设计模式】中介者模式+观察者模式+备忘录模式实战:(二)提交个订单我到底经历了什么鬼?
所以:任何脱离实际业务的设计模式都是耍流氓
最近项目在对接神策埋点相关需求。 有一个场景是:产品自定义了很多埋点事件,有些事件需要后端进行一定的业务处理,然后进行埋点。
业务其实很简单,就是前端请求到后端,后端进行一定业务处理组装后将数据发送到神策后台。
说到这里是不是还有小伙伴没听懂??那么就画张图吧:
这里只是简单的举个栗子,说明下业务场景。
针对这个业务场景,最开始的想法是尽量少的侵入原有业务方法,所以这里选择使用观察者模式。
原有业务场景中加入发布事件的能力,然后订阅者自己消费进行埋点数据逻辑。做到尽可能的业务解耦。
这里还是要多啰嗦几句,说下观察者模式原理:
所谓的观察者模式也称为发布订阅模式,这里肯定至少存在两种角色:发布者/订阅者
接着看下UML图:
所涉及到的角色如下:
在上述类图中,ConcreteSubject中有一个存储Observer的列表,这意味着ConcreteSubject并不需要知道引用了哪些ConcreteObserver,只要实现(继承)了Observer的对象都可以存到该列表中。在需要的时候调用Observer的update方法。
话不多说,我们自己动手来模拟一个简单的观察者模式:
/** * 观察者模式测试代码 * * @author wangmeng * @date 2020/4/25 19:38 */ public class ObserverTest { public static void main(String[] args) { Subject subject = new Subject(); Task1 task1 = new Task1(); subject.addObserver(task1); Task2 task2 = new Task2(); subject.addObserver(task2); subject.notifyObserver("xxxx"); } } class Subject { // observer集合 private List<Observer> observerList = Lists.newArrayList(); // add public void addObserver(Observer observer) { observerList.add(observer); } // remove public void removeObserver(Observer observer) { observerList.remove(observer); } // 通知观察者 public void notifyObserver(Object object) { for (Observer item : observerList) { item.update(object); } } } interface Observer { void update(Object object); } class Task1 implements Observer { @Override public void update(Object object) { System.out.println("task1 received: " + object); } } class Task2 implements Observer { @Override public void update(Object object) { System.out.println("task2 received: " + object); } } 复制代码
针对于观察者模式,JDK和Spring也有一些内置实现,具体可以参见:JDK中Observable
,Spring中ApplicationListener
这里就不再赘述了,想深入了解的小伙伴可执行谷歌,毕竟我们这次文章的重点还是Guava
中观察者模式的使用实现原理。
这里使用的是Guava中自带的EventBus组件,我们继续用取消订单业务场景做示例,这里抽离了部分代码,只展示核心的一些代码:
/** * 事件总线服务 * * @author wangmeng * @date 2020/4/14 */ @Service public class EventBusService { /** * 订阅者异步执行器,如果同步可以使用EventBus **/ @Autowired private AsyncEventBus asyncEventBus; /** * 订阅者集合,里面方法通过@Subscribe进行事件订阅 **/ @Autowired private EventListener eventListener; /** * 注册方法,启动的时候将所有的订阅者进行注册 **/ @PostConstruct public void register() { asyncEventBus.register(eventListener); } /** * 消息投递,根据入参自动投递到对应的方法中去消费。 */ public void post(Object object) { asyncEventBus.post(object); } } 复制代码
这里使用了异步的实现方式,如果使用同步的方式可以将AsyncEventBus
改为EventBus
/** * AsyncEventBus 线程池配置 * * @author wangmeng * @date 2020/04/14 */ @Configuration public class EventBusConfiguration { /** Set the ThreadPoolExecutor's core pool size. */ private int corePoolSize = 10; /** Set the ThreadPoolExecutor's maximum pool size. */ private int maxPoolSize = 30; /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */ private int queueCapacity = 500; @Bean public AsyncEventBus asyncEventBus() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix("jv-mall-user-sensorsData:"); executor.initialize(); return new AsyncEventBus(executor); } } 复制代码
线程池数据大家可以随意配置,这里只是参考。
/** * 观察者代码 * * @author wangmeng * @date 2020/4/14 */ @Service @Slf4j public class EventListener { @Autowired private SensorsDataManager sensorsDataManager; /** * 观察者处理数据埋点方法 */ @Subscribe @AllowConcurrentEvents public void handleCancelOrderEvent(TrackCancelOrderDTO cancelOrderDTO) { Map<String, Object> propertyMap = this.buildBasicProperties(cancelOrderDTO); propertyMap.put(SensorsDataConstants.ORDER_ID, registerDTO.getOrderId()); // 各种属性赋值,这里只截取一点 propertyMap.put(SensorsDataConstants.PROPERTY_IS_SUCCESS, registerDTO.getIsSuccess()); propertyMap.put(SensorsDataConstants.PROPERTY_FAIL_REASON, registerDTO.getFailReason()); sensorsDataManager.send(registerDTO.getUserId(), SensorsEventConstants.EVENT_CANCEL_ORDER, propertyMap); } } 复制代码
这个EventLister
是我们在上面EventBusService
中注册的类,观察者方法上面添加@Subscribe
即可对发布者的数据进行订阅。
@AllowConcurrentEvents
注解字面意思是允许事件并发执行,这个原理后面会讲。
PS:这里sensorsDataManager
是封装生成埋点相关的类。
在业务逻辑中加入埋点数据发布的方法:
@Autowired private EventBusService eventBusService; public void cancelOrder(Long orderId) { // 业务逻辑执行 // 埋点数据 TrackCancelOrderDTO trackCancelOrderDTO = trackBaseOrderInfoManager.buildTrackBaseOrderDTO(orderInfoDO, context.getOrderParentInfoDO(), TrackCancelOrderDTO.class); trackCancelOrderDTO.setCancelReason(orderInfoDO.getCancelReason()); trackCancelOrderDTO.setCancelTime(orderInfoDO.getCancelTime()); trackCancelOrderDTO.setPlatformName(SensorsDataConstants.PLATFORM_APP); trackCancelOrderDTO.setUserId(orderInfoDO.getUserId().toString()); eventBusService.post(trackCancelOrderDTO); } 复制代码
到了这里所有的如何使用EventBus
的代码都已经贴出来了,下面就看看具体的源码实现吧
com.google.common.eventbus.SubscriberRegistry#register:
void register(Object listener) { //查找所有订阅者,维护了一个key是事件类型,value是定订阅这个事件类型的订阅者集合的一个map Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { //获取事件类型 Class<?> eventType = entry.getKey(); //获取这个事件类型的订阅者集合 Collection<Subscriber> eventMethodsInListener = entry.getValue(); //从缓存中按事件类型查找订阅者集合 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { //从缓存中取不到,更新缓存 CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } } 复制代码
事件和订阅事件的订阅者集合是在com.google.common.eventbus.SubscriberRegistry这里维护的:
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap(); 复制代码
到这里,订阅者已经准备好了,准备接受事件了。通过debug 看下subscribers
中数据:
com.google.common.eventbus.EventBus#post
public void post(Object event) { //获取事件的订阅者集合 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { //转发事件 dispatcher.dispatch(event, eventSubscribers); //如果不是死亡事件,重新包装成死亡事件重新发布 } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } } Iterator<Subscriber> getSubscribers(Object event) { //获取事件类型类的超类集合 ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size()); for (Class<?> eventType : eventTypes) { //获取事件类型的订阅者集合 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers != null) { // eager no-copy snapshot subscriberIterators.add(eventSubscribers.iterator()); } } return Iterators.concat(subscriberIterators.iterator()); } 复制代码
事件转发器有三种实现:
第一种是立即转发,实时性比较高,其他两种都是队列实现。
我们使用的是AsyncEventBus
,其中指定的事件转发器是:LegacyAsyncDispatcher
,接着看看其中的dispatch()
方法的实现:
com.google.common.eventbus.Dispatcher.LegacyAsyncDispatcher
private static final class LegacyAsyncDispatcher extends Dispatcher { private final ConcurrentLinkedQueue<EventWithSubscriber> queue = Queues.newConcurrentLinkedQueue(); @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { // 先将所有发布的事件放入队列中 queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { // 消费队列中的消息 e.subscriber.dispatchEvent(e.event); } } } 复制代码
接着看subscriber.dispatchEvent()
方法实现:
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); } 复制代码
执行订阅方法都是异步实现,我们在上面初始化AsyncEventBus
的时候有为其构造线程池,就是在这里使用的。
在看invokeSubscriberMethod()
具体代码之前,我们先来看看@AllowConcurrentEvents
,我们在订阅方法上有加这个注解,来看看这个注解的作用吧:
在我们执行register()
方法的时候,会为每一个订阅者构造一个Subscriber
对象,如果配置了@AllowConcurrentEvents
注解,就会为它配置一个允许并发的Subscriber
对象。
class Subscriber { /** * Creates a {@code Subscriber} for {@code method} on {@code listener}. */ static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); } private static boolean isDeclaredThreadSafe(Method method) { // 如果有AllowConcurrentEvents注解,则返回true return method.getAnnotation(AllowConcurrentEvents.class) != null; } @VisibleForTesting void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { // 通过反射直接执行订阅者中方法 method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } } @VisibleForTesting static final class SynchronizedSubscriber extends Subscriber { private SynchronizedSubscriber(EventBus bus, Object target, Method method) { super(bus, target, method); } @Override void invokeSubscriberMethod(Object event) throws InvocationTargetException { // SynchronizedSubscriber不支持并发,这里用synchronized修饰,所有执行都串行化执行 synchronized (this) { super.invokeSubscriberMethod(event); } } } } 复制代码
这里面包含了invokeSubscriberMethod()
方法的实现原理,其实就是通过反射去执行订阅者中的方法。
还有就是如果没有添加注解,就会走SynchronizedSubscriber
中invokeSubscriberMethod()
逻辑,添加了synchronized
关键字,不支持并发执行。
这里主要是整理了guava 中实现观察者模式的使用及原理。
大家如果有类似的业务场景也可以使用到自己项目中。