Java教程

spring cloud stream 3.1.2 源码搭配rocketmq学习 (三)

本文主要是介绍spring cloud stream 3.1.2 源码搭配rocketmq学习 (三),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

(二)中介绍了函数的注册, 这篇介绍一下函数的初始化

这文章涉及到了大量响应式编程的方式, reactor 需要补一下

前言

这个 functionInitializer 其实是 channelfunction bean的绑定

响应式的doOn

同步钩子方法,在subscriber触发一系列事件的时候触发

先来熟悉一下doOn系列的方法. 这个方法在subscriber的时候如果没触发对应的钩子, 是不会执行的.

doOn资料传送门

热身

@Bean
public Function<Flux<Message<String>>, Mono<Void>> demo() {
    return flux -> flux.map(message -> {
        System.out.println("接收到了: " + message);
        return message;
    }).then();
}

@Component
static class DemoRunner implements CommandLineRunner {
    @Autowired
    Wrapper wrapper;

    @Override
    public void run(String... args) throws Exception {
        InputChannel inputChannel = new InputChannel();
        Flux<Message<String>> input = Flux.defer(() -> {
            Sinks.Many<Message<String>> sink = Sinks.many().unicast().onBackpressureError();
            System.out.println("初始化了inputChannel");
            MessageHandler messageHandler = message -> {
                System.out.println("处理信息");
                sink.tryEmitNext((Message<String>) message);
            };
            inputChannel.subscribe(messageHandler);
            return sink.asFlux().doOnCancel(() -> {
                // ...
            });
        });

        Mono<Void> result = wrapper.apply(input);

        // 上面这一段操作等同于 操作 flux 合并成了一个大的响应式
//          Mono<Void> result = Flux.defer(() -> {
//                Sinks.Many<Message<String>> sink = Sinks.many().unicast().onBackpressureError();
//                System.out.println("初始化了inputChannel");
//                MessageHandler messageHandler = message -> {
//                    System.out.println("处理信息");
//                    sink.tryEmitNext((Message<String>) message);
//                };
//                inputChannel.subscribe(messageHandler);
//                return sink.asFlux().doOnCancel(() -> {
//                    // ...
//                });
//            }).map(message -> {
//                System.out.println("接收到了: " + message);
//                return message;
//            }).then()
//            .doOnSubscribe(message -> {
//                System.out.println("在Wrapper.apply我加入了");
//            });

        result.subscribe();
        inputChannel.handle(MessageBuilder.withPayload("aaaa").build());
    }
}

static class InputChannel {
    final List<MessageHandler> messageHandlers = new ArrayList<>();

    public void subscribe(MessageHandler messageHandler) {
        messageHandlers.add(messageHandler);
    }

    public void handle(Message<String> message) {
        messageHandlers.get(0).handleMessage(message);
    }
}

@Component
static class Wrapper {
    @Autowired
    Function<Flux<Message<String>>, Mono<Void>> demo;

    public Mono<Void> apply(Flux<Message<String>> input) {
        System.out.println("---------");
        return demo.apply(input).doOnSubscribe(message -> {
            System.out.println("在Wrapper.apply我加入了");
        });
    }
}

这一段简单的响应式, 是functionInitializer核心的部分.

先组装flux然后调用我们注册的Bean把初始化的东西传入并生成一个总的响应式, 类似于合体一样. 上面注释部分的result就是最终生成的响应式.

functionInitializer就是把注册的Function Bean的调用某些注册方法加入到channel中和增加一些响应式的钩子达到统一处理某些信息的注册.

下面我们一起来看看源码

functionInitializer

初始化了一个这样的Bean--new FunctionConfiguration.FunctionToDestinationBinder

public void afterPropertiesSet() throws Exception {
    Map<String, BindableProxyFactory> beansOfType = this.applicationContext.getBeansOfType(BindableProxyFactory.class);
}

首先把BindableProxyFactory.class的Bean都取出来了.

看到BindableProxyFactory是不是很熟悉, 点进去发现, 他是BindableFunctionProxyFactory的父类.

BindableFunctionProxyFactory是不是(二)中用definition注册的Bean.

接着我们看到下面的这个bindFunctionToDestinations函数

只有这个函数不是提供者的时候才能绑定函数到目的地

if (function != null && !function.isSupplier()) {
    this.bindFunctionToDestinations(bindableProxyFactory, functionDefinition);
}

从下述代码发现inputs/outputs 就是(二)中注册的Input/Output

Set<String> inputBindingNames = bindableProxyFactory.getInputs();
Set<String> outputBindingNames = bindableProxyFactory.getOutputs();

public Set<String> getInputs() {
    return this.inputHolders.keySet();
}

我们看到其中有一段关键的代码

SubscribableChannel, 是不是在(二)中注册的DirectWithAttributesChannel的Bean.
把对应inputBindingName的取了出来并做了对应的封装.
组合成一个Publisher

SubscribableChannel inputChannel = (SubscribableChannel)this.applicationContext.getBean(inputBindingName, SubscribableChannel.class);

IntegrationReactiveUtils.messageChannelToFlux(inputChannel);

进入messageChannelToFlux方法我们发现会调用adaptSubscribableChannelToPublisher

 private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
    return Flux.defer(() -> {
        Many<Message<T>> sink = Sinks.many().unicast().onBackpressureError();
        MessageHandler messageHandler = (message) -> {
            while(true) {
                switch(sink.tryEmitNext(message)) {
                case FAIL_NON_SERIALIZED:
                case FAIL_OVERFLOW:
                    LockSupport.parkNanos(1000L);
                    break;
                case FAIL_ZERO_SUBSCRIBER:
                    throw new IllegalStateException("The [" + sink + "] doesn't have subscribers to accept messages");
                case FAIL_TERMINATED:
                case FAIL_CANCELLED:
                    throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink for message channel: " + inputChannel);
                default:
                    return;
                }
            }
        };
        inputChannel.subscribe(messageHandler);
        return sink.asFlux().doOnCancel(() -> {
            inputChannel.unsubscribe(messageHandler);
        });
    });
}

会发现有一行

inputChannel.subscribe(messageHandler);

把处理message的处理器注册进了inputChannel中

因为这个inputChannel就是DirectWithAttributesChannel, 所以我们直接关注到DirectWithAttributesChannel的subscibe方法.

 MessageDispatcher dispatcher = this.getRequiredDispatcher();
        boolean added = dispatcher.addHandler(handler);

把这个handler加进了dispatcher中, 那这个dispatcher是一个什么呢?

我们查阅继承关系发现DirectChannel这个类初始化的时候初始化了一个dispathcher

public DirectChannel(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
    this.dispatcher = new UnicastingDispatcher();
    ...
}

这样 messageHander 就注册进了DirectWithAttributesChannel的dispatcher中.

我们回到bindFunctionToDestinations中, 然后我们关注到这一行代码

Object resultPublishers = ((Function)functionToInvoke).apply(inputPublishers.length == 1 ? inputPublishers[0] : Tuples.fromArray(inputPublishers));

functionToInvoke 就是FunctionWrapper, 所以我们看看FunctionInvocationWrapper的apply方法
点进去看看

public Object apply(Object input) {
    // ...
    Object result = this.doApply(input);
    // ...
    return result;
}

看到doApply中, 因为我们注册的Bean是Function类型的, 所以我们直接看到 invokeFunction
发现有关键的一行 invokeFunctionAndEnrichResultIfNecessary

result = this.invokeFunctionAndEnrichResultIfNecessary(convertedInput);

private Object invokeFunctionAndEnrichResultIfNecessary(Object value) {
    //...

    // target就是注册的Function Bean的函数.
    // 在此处我们对他进行调用并把输入传入.
    // intputValue是对inputChannel内的信息进行了处理并封装成了Message
    // 想知道怎么处理的朋友可以看看源码, 就在这个函数里
    Object result = ((Function)this.target).apply(inputValue);

    //...
}

那这个target是什么呢, 这个是时候我们可以打个断点看看, 发现他就是我们注册的Function.

然后他调用了apply, 证明调用了这个方法, 并且传入了inputValue

然后我们发现functionToInvoke.apply这个函数将上述封装的inputChannel响应式进行传入, 并调用对应的function Bean, 得到完整的响应式函数. 合并了两段响应式函数.

这里的resultPublishers实际上就是我们配置的Function调用后的返回的值.

接着对resultPublishers进行判断, 是否有输出需要处理, 有的话做个doOnNext的钩子, 并封装对应的发送和错误处理逻辑.
没有则进行subscribe, 让之前的inputChannel的调用进行消费注册.

((Iterable)resultPublishers).forEach((publisher) -> {
    Flux flux = Flux.from((Publisher)publisher);
    if (!CollectionUtils.isEmpty(outputBindingNames)) {
       //  ...发送逻辑
    }
    // 如果不是消费者 则消费.
    // 这会subscribe上面配置的Flux, 进行对应的初始化.
    // 但是doOn的方法是钩子, 这边只是简单的subscribe所以不会被触发
    if (!function.isConsumer()) {
        flux.subscribe();
    }
});

至此, 我们才完整的注册了一个Function Bean.

总结

  1. 找到(二)中注册的Bean
  2. 找到(二)中注册的对应的Input/Output的Bean
  3. 将channel和这个Function bean绑定到一起, 并加入统一的处理方法

ps 响应式其实不是直接调用, 是配置了一堆东西, 等同 于配置文件. 等到一个命令来的时候例如类被new的时候, 再进行统一的执行.

好, 这能这篇文章比较干, 也可能比较乱, 如果有不好的地方, 欢迎讨论改进, 谢谢!

然后channel的注册 --- (二)
function Bean 和 channel 的绑定 (三) 已经说完了
那是不是还缺一个channel 和外部消息中间件的绑定呢, 我们下一篇文章继续!

Wish. Do.

这篇关于spring cloud stream 3.1.2 源码搭配rocketmq学习 (三)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!