Java教程

spring cloud stream 3.1.2 源码搭配rocketmq学习 (四)

本文主要是介绍spring cloud stream 3.1.2 源码搭配rocketmq学习 (四),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前言

这篇是channel和外部消息中间件的绑定.


spring-cloud-stream的spring.factories

在这我们发现有一个 BindingServiceConfiguration 的自动装载的类.

主要绑定是在 outputBindingLifecycleinputBindingLifecycle

发现这两个类继承了SmartLifecycle

废话不多说, 多说都是废话

直接看start方法

public void start() {
    ...
    this.bindables.values().forEach(this::doStartWithBindable);
    ...

}

直接就盯着 doStartWithBindable看!!!

发现这个方法是实现类实现的.

我们以input为例子

看到InputBindingLifecycle#doStartWithBindable

void doStartWithBindable(Bindable bindable) {
    Collection<Binding<Object>> bindableBindings = bindable.createAndBindInputs(this.bindingService);
}

有一个createAndBindInputs, 直接定位到了 AbstractBindableProxyFactory#createAndBindInputs这个方法(具体技巧大家要不自己悟一下吧).

发现里面有非常重要的方法调用 bindingService.bindConsumer

我们就是在找怎么绑定的.就是你了!

public <T> Collection<Binding<T>> bindConsumer(T input, String inputName) {
    ...
    Binder<T, ConsumerProperties, ?> binder = this.getBinder(inputName, inputClass);
    ...
    this.doBindConsumer(binder);
    ...
}

// 伪代码
public <T> Binding<T> doBindConsumer(Binder<T, ConsumerProperties, ?> binder) {
    binder.bindConsumer();
}

啊吧啊吧一堆代码, 终于在其中发现关键的两个方法 getBinderdoBindConsumer

会发现这个调用是DefaultBinderFactory#getBinder

他在里面调用了this.context.getBeansOfType(Binder.class)

取出了一个Binder.class类型的bean

记住这个Binder类型.

看doBindConsumer实现主线就是调用的binder的bindConsumer方法.

接下来我们看到rocketmq的注册.


rocketmq注册

要新版的rocketmq才有下述代码

我们找到spring.binders

发现RocketMQBinderAutoConfiguration里注册了一个Bean: RocketMQMessageChannelBinder

ChannelBinder 又有Channel 又有Binder 就是他了呀

发现他继承的是AbstractBindingLifecycle, 我们看看继承图, 有Binder.

![AbstractBindingLifecycle](AbstractMessageChannelBinder继承关系.png

我们上面是不是get了一个Binder的Bean, 芜湖, 就是他了!

我们调用的bindConsumer就是调用他的bindConsumer呀.

我们看看他的实现是调用的doBindConsumer

public final Binding<T> bindConsumer(String name, String group, T target, C properties) {
    return this.doBindConsumer(name, group, target, properties);
}

protected abstract Binding<T> doBindConsumer(String name, String group, T inputTarget, C properties);

我们看看这个doBindConsumer

public final Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel, final C properties) throws BinderException {
    ...
    consumerEndpoint = this.createConsumerEndpoint(destination, group, properties);
    consumerEndpoint.setOutputChannel(inputChannel);
    ...
}

发现主要调用的是createConsumerEndpoint

还有把inputChannel设置进了OutputChannel中.

那我们来看看createConsumerEndpoint做了什么.

//伪代码
protected MessageProducer createConsumerEndpoint() {
    RocketMQInboundChannelAdapter inboundChannelAdapter = new RocketMQInboundChannelAdapter(destination.getName(), extendedConsumerProperties);
}

发现他主要是new了RocketMQInboundChannelAdapter这个类然后对属性进行设置.

我们发现RocketMQInboundChannelAdapter有一个onInit的方法, 他会被IntegrationObjectSupport#afterPropertiesSet调用, 具体说明省略, 有兴趣的同学可以自己挖掘一下吧~~~

protected void onInit() {
    this.pushConsumer = RocketMQConsumerFactory.initPushConsumer(this.extendedConsumerProperties);
    
    this.pushConsumer.registerMessageListener((msgs, context) -> {
        return (ConsumeOrderlyStatus)this.consumeMessage(msgs, () -> {
           ...
        }, () -> {
            return ConsumeOrderlyStatus.SUCCESS;
        });
    });
}

初始化了一个Consumer并把rocketmq的监听注册了进去. 写过rocketmq的是不是很熟悉, 这就是原本的写法.

那这个监听做了啥????

private <R> R consumeMessage(List<MessageExt> messageExtList, Supplier<R> failSupplier, Supplier<R> sucSupplier) {
    this.sendMessage(message);
}

sendMessage??? 发送了出去...!!!!

这时候我们不禁化身柯南, 我们之前把channel和function连接在了一起, 这里是不是直接通过channel发送到了我们的function里!~真相???

离答案原来越近了, 万恶剧透..确实是这样!

我们来看看sendMessage做了什么

protected void sendMessage(Message<?> messageArg) {
   this.messagingTemplate.send(this.getRequiredOutputChannel(), message);
}

getRequiredOutputChannel看到这个方法是不是想到上面有个方法往adapter里设置了一个channel啊.

想到(三)中channel在function初始化的时候是不是已经和function进行了绑定了, 那外部消息中间件, channel, function???? 芜湖, 真相越来越近了. 三者的联系. 明朗了!!!

那我们就可以猜测流程, 外部消息中间件接收到了消息, 通过channel将信息转发到了function中进行处理.

那我们继续看来验证一下我们的猜测!~

看到messagingTemplate.send这个方法

我们打个断点能发现执行到了GenericMessagingTemplate#doSend这个方法, 发现他最核心的就是调用了channel.send

看过第三章的应该知道这个channel就是DirectWithAttributesChannel, 所以我们看看里面的send方法.

发现在父类AbstractMessageChannel调用了dosend, doSend的实现又在他的子类AbstractSubscribableChannel中.

protected boolean doSend(Message<?> message, long timeout) {
    try {
        return this.getRequiredDispatcher().dispatch(message);
    } catch (MessageDispatchingException var6) {
        ...
    }
}

getRequiredDispatcher(), Dispatcher是不是很熟悉, 在(三)中我们往其中handler加入了messageHandler.

private boolean doDispatch(Message<?> message) {
    ...
    Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
    ...
    handler.handleMessage(message);
    ...
}

// 返回一个handler Iterator
private  Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
    Set<MessageHandler> handlers = this.getHandlers();
    return this.loadBalancingStrategy != null ? this.loadBalancingStrategy.getHandlerIterator(message, handlers) : handlers.iterator();
}

我们再深入看代码会发现其实就是拿出messageHandler对消息进行了处理, messageHandler其实就是框架对function进行的再次封装实际就是调用了我们function...至此, 我们的猜想是不是验证成立了!!!~
芜湖~ 终于完结, 把所有流程都串起来了!!!

调用流程图传送门


总结

  1. inputBindingLifecycle/outputBindingLifecycle的Bean进行初始化, 调用getBinder获取到了外部消息中间件的Bean
  2. 调用binder的bindConsumer进行绑定, 并传入了对应的channel.
  3. 外部消息中间件调用createConsumerEndpoint生成一个adapter, 根据配置生成初始化配置, 并把channel设置进adapter中.
  4. adapter初始化被调用, 把消息处理注册进pushConsumer中, 等待消息的触发.
  5. 消息到来触发注册的函数, 调用sendMessage方法.
  6. 使用上述设置的channel进行发送, 会执行在(三)中注册的messageHandler, 调用对应的function.

至此, channel和外部消息中间件的绑定完成, 其实就是把channel传入对应的binder中, 等消息到来的时候用对应的channel进行消息的转发.

Wish. Do.

这篇关于spring cloud stream 3.1.2 源码搭配rocketmq学习 (四)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!