(1)使用EventBus作为实现类,其构造方法有:
public EventBus() { this("default"); } public EventBus(String identifier) { this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE); } public EventBus(SubscriberExceptionHandler exceptionHandler) { this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler); }
(2)使用AsyncEventBus作为实现类,其构造方法为:
public AsyncEventBus(String identifier, Executor executor) { super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); } public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) { super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler); } public AsyncEventBus(Executor executor) { super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); }
统一调用的构造方法为:
EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) { this.subscribers = new SubscriberRegistry(this); this.identifier = (String)Preconditions.checkNotNull(identifier); this.executor = (Executor)Preconditions.checkNotNull(executor); this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher); this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler); }
参数的意义分别是:
identifier:类似当前EventBus对象的别名,可以描述该EventBus的用途,默认为“default”
executor:使用异步执行时传入的自定义线程池
dispatcher:指定分发消息的模式
exceptionHandler:处理订阅消息异常的方法
subscribers:注册订阅者的类
(1)EventBus实现:
(2)AsyncEventBus实现:
(3)Dispatcher调度器:
eventbus包下的Dispatcher类提供了三种类型的调度器,分别为:
(4)Executor执行器
使用this.subscribers对象的register方法注册,此处的subscribers对象为SubscriberRegistry类。
使用Muitmap集合存储一个对象下有哪些方法订阅了,具体实现findAllSubscribers方法,该方法内部如下:
(1)该方法内部使用getAnnotatedMethods方法获取clazz及其多级父类以及实现的接口中所有方法上有@Subscribe注解的方法。方法具体实现如下:
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { //获取到传递的class对象的类以及父类以及实现的接口 Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); //创建一个Map集合 Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); //遍历得到的class对象 for (Class<?> supertype : supertypes) { //获取class对象的所有方法 for (Method method : supertype.getDeclaredMethods()) { //如果方法上有Subscribe注解,并且isSynthetic表示方法不是由java编译器生成的 if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { // 获取该方法的参数类型 Class<?>[] parameterTypes = method.getParameterTypes(); //检查方法的参数只能是1个 checkArgument( parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); //根据方法创建MethodIdentifier对象,其中包含方法名、方法的参数类型 MethodIdentifier ident = new MethodIdentifier(method); //如果map集合中不包含该对象,就将ident和method对象存储到identifiers的map集合中 if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } //返回map集合中的方法 return ImmutableList.copyOf(identifiers.values()); }
(2)遍历该对象中添加了@Subscribe注解的方法集合
(3)获取该方法的参数类型,将第0个参数类型赋值给eventType
再获取eventType时,会查找监听对象的父类以及接口,查看有没有订阅方法。
(4)将参数类型作为key,订阅者作为value,存储到Multimap集合中。value中包含被监听的bus,被监听的bus中的执行器,监听该bus的对象listener,以及监听参数类型的方法。
会去检查订阅方法上有没有注解AllowConcurrentEvents,如果有该注解,在使用create方法创建订阅者对象时,订阅者对象使用Subcriber,如果没有注解使用SynchronizedSubscriber对象。这两种对象在真正分发事件时区别才会体现出来。
示例如下:说明objA中的三个方法都有@Subcribe注解
对象继承情况:
对象实现接口,接口内使用JDK8提供的default来实现方法,并添加注解,也可以订阅。
(5)获取到该对象的所偶遇订阅者后,返回Multimap集合
(1)遍历得到的Multimap集合
全局变量:
当前EventBus对象中的subscribers是SubscriberRegistry类的对象,执行该类中的unregister方法
使用findAllSubscribers方法,与注册对象中的方法相同,都是查找该类以及其多级父类中的订阅类型和方法。
(1)获取到集合中的key,也就是该对象中订阅的类型(方法上的参数类型)
(2)获取到集合中key对应的value,也就是该对象中的订阅方法。
(3)获取到全局变量subscribers中缓存的数据,赋值给currentSubscribers
(4)如果currentSubscribers为null,则抛出异常,如果移除该对象中的所有订阅者返回结果为false,也抛出异常,如果为true,则正常移除。
通过subscribers(SubscriberRegistry类)中的全局变量获取订阅该事件以及该事件父类和接口的所有方法。
获取传递的event事件的类、父类以及实现的接口。
例如:如果发送的是MsgA这个消息,那么就会找到Msg类,获取到的集合中就包含了MsgA类型和Msg类型,所有订阅了这两个类型的都会接收到该消息。接口也是一样的(MsgA和MsgB都实现了Msg接口)。
下图中就描述了发送的消息在实现了接口的情况。
如图中所示,MsgA和MsgB实现了Msg接口,如果有订阅者订阅的类型(参数类型)是Msg,那么发送的时候不管发送MsgA还是MsgB,订阅Msg的方法都可以接收到。
下图中表示发送的消息有父类的情况:
如图中所示,MsgA和MsgB继承了Msg类,如果有订阅者订阅的类型(参数类型)是Msg,那么发送的时候不管发送MsgA还是MsgB,订阅Msg的方法都可以接收到。如果发送的是Msg类,那就只有订阅了Msg类的方法可以接收到。
总结:发送消息的时候,会查找该消息的父类以及接口,查看有没有订阅的方法,如果有就会都发送一次。
根据构造方法调度器使用了两种类型,分别是:
下图为SynchronizedSubscriber中的方法,在原有基础上,使用synchronized锁住该对象,然后去执行。
两者的区别就是:如果添加了注解,那就直接使用Subscriber中的方法,如果没有添加注解,则使用加锁的方法去执行。
在缓存中找不到订阅者并且它本身不是一个DeadEvent事件时,就会发送一个DeadEvent。如果找不到DeadEvent事件的订阅者,就会不进行处理。
(1)两种创建方式对比
类型 | EventBus | AsyncEventBus |
---|---|---|
identifier | 默认“default” | 默认“default” |
executor | DirectExecutor | 自定义 |
dispatcher | PerThreadQueuedDispatcher | LegacyAsyncDispatcher |
subscribers | SubscriberRegistry | SubscriberRegistry |
exceptionHandler | 默认LoggingHandler | 默认LoggingHandler |
两种方式的相同点:
两种方式的区别:
(2)方法执行流程