这篇是channel和外部消息中间件的绑定.
在这我们发现有一个 BindingServiceConfiguration 的自动装载的类.
主要绑定是在 outputBindingLifecycle 和 inputBindingLifecycle
发现这两个类继承了SmartLifecycle
废话不多说, 多说都是废话
直接看start方法
public void start() { ... this.bindables.values().forEach(this::doStartWithBindable); ... }
直接就盯着 doStartWithBindable看!!!
发现这个方法是实现类实现的.
我们以input为例子
看到InputBindingLifecycle#doStartWithBindable
void doStartWithBindable(Bindable bindable) { Collection<Binding<Object>> bindableBindings = bindable.createAndBindInputs(this.bindingService); }
有一个createAndBindInputs, 直接定位到了 AbstractBindableProxyFactory#createAndBindInputs这个方法(具体技巧大家要不自己悟一下吧).
发现里面有非常重要的方法调用 bindingService.bindConsumer
我们就是在找怎么绑定的.就是你了!
public <T> Collection<Binding<T>> bindConsumer(T input, String inputName) { ... Binder<T, ConsumerProperties, ?> binder = this.getBinder(inputName, inputClass); ... this.doBindConsumer(binder); ... } // 伪代码 public <T> Binding<T> doBindConsumer(Binder<T, ConsumerProperties, ?> binder) { binder.bindConsumer(); }
啊吧啊吧一堆代码, 终于在其中发现关键的两个方法 getBinder 和 doBindConsumer
会发现这个调用是DefaultBinderFactory#getBinder
他在里面调用了this.context.getBeansOfType(Binder.class)
取出了一个Binder.class类型的bean
记住这个Binder类型.
看doBindConsumer实现主线就是调用的binder的bindConsumer方法.
接下来我们看到rocketmq的注册.
要新版的rocketmq才有下述代码
我们找到spring.binders
发现RocketMQBinderAutoConfiguration里注册了一个Bean: RocketMQMessageChannelBinder
ChannelBinder 又有Channel 又有Binder 就是他了呀
发现他继承的是AbstractBindingLifecycle, 我们看看继承图, 有Binder.
![AbstractBindingLifecycle](
我们上面是不是get了一个Binder的Bean, 芜湖, 就是他了!
我们调用的bindConsumer就是调用他的bindConsumer呀.
我们看看他的实现是调用的doBindConsumer
public final Binding<T> bindConsumer(String name, String group, T target, C properties) { return this.doBindConsumer(name, group, target, properties); } protected abstract Binding<T> doBindConsumer(String name, String group, T inputTarget, C properties);
我们看看这个doBindConsumer
public final Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel, final C properties) throws BinderException { ... consumerEndpoint = this.createConsumerEndpoint(destination, group, properties); consumerEndpoint.setOutputChannel(inputChannel); ... }
发现主要调用的是createConsumerEndpoint
还有把inputChannel设置进了OutputChannel中.
那我们来看看createConsumerEndpoint做了什么.
//伪代码 protected MessageProducer createConsumerEndpoint() { RocketMQInboundChannelAdapter inboundChannelAdapter = new RocketMQInboundChannelAdapter(destination.getName(), extendedConsumerProperties); }
发现他主要是new了RocketMQInboundChannelAdapter这个类然后对属性进行设置.
我们发现RocketMQInboundChannelAdapter有一个onInit的方法, 他会被IntegrationObjectSupport#afterPropertiesSet调用, 具体说明省略, 有兴趣的同学可以自己挖掘一下吧~~~
protected void onInit() { this.pushConsumer = RocketMQConsumerFactory.initPushConsumer(this.extendedConsumerProperties); this.pushConsumer.registerMessageListener((msgs, context) -> { return (ConsumeOrderlyStatus)this.consumeMessage(msgs, () -> { ... }, () -> { return ConsumeOrderlyStatus.SUCCESS; }); }); }
初始化了一个Consumer并把rocketmq的监听注册了进去. 写过rocketmq的是不是很熟悉, 这就是原本的写法.
那这个监听做了啥????
private <R> R consumeMessage(List<MessageExt> messageExtList, Supplier<R> failSupplier, Supplier<R> sucSupplier) { this.sendMessage(message); }
sendMessage??? 发送了出去...!!!!
这时候我们不禁化身柯南, 我们之前把channel和function连接在了一起, 这里是不是直接通过channel发送到了我们的function里!~真相???
离答案原来越近了, 万恶剧透..确实是这样!
我们来看看sendMessage做了什么
protected void sendMessage(Message<?> messageArg) { this.messagingTemplate.send(this.getRequiredOutputChannel(), message); }
getRequiredOutputChannel看到这个方法是不是想到上面有个方法往adapter里设置了一个channel啊.
想到(三)中channel在function初始化的时候是不是已经和function进行了绑定了, 那外部消息中间件, channel, function???? 芜湖, 真相越来越近了. 三者的联系. 明朗了!!!
那我们就可以猜测流程, 外部消息中间件接收到了消息, 通过channel将信息转发到了function中进行处理.
那我们继续看来验证一下我们的猜测!~
看到messagingTemplate.send这个方法
我们打个断点能发现执行到了GenericMessagingTemplate#doSend这个方法, 发现他最核心的就是调用了channel.send
看过第三章的应该知道这个channel就是DirectWithAttributesChannel, 所以我们看看里面的send方法.
发现在父类AbstractMessageChannel调用了dosend, doSend的实现又在他的子类AbstractSubscribableChannel中.
protected boolean doSend(Message<?> message, long timeout) { try { return this.getRequiredDispatcher().dispatch(message); } catch (MessageDispatchingException var6) { ... } }
getRequiredDispatcher(), Dispatcher是不是很熟悉, 在(三)中我们往其中handler加入了messageHandler.
private boolean doDispatch(Message<?> message) { ... Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message); ... handler.handleMessage(message); ... } // 返回一个handler Iterator private Iterator<MessageHandler> getHandlerIterator(Message<?> message) { Set<MessageHandler> handlers = this.getHandlers(); return this.loadBalancingStrategy != null ? this.loadBalancingStrategy.getHandlerIterator(message, handlers) : handlers.iterator(); }
我们再深入看代码会发现其实就是拿出messageHandler对消息进行了处理, messageHandler其实就是框架对function进行的再次封装实际就是调用了我们function...至此, 我们的猜想是不是验证成立了!!!~
芜湖~ 终于完结, 把所有流程都串起来了!!!
调用流程图传送门
至此, channel和外部消息中间件的绑定完成, 其实就是把channel传入对应的binder中, 等消息到来的时候用对应的channel进行消息的转发.