自Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,目前最新版本为3.1.3。
自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。下面就介绍下以函数式编程的方式代替StreamListener的方法
来自Spring的博客文章(https://spring.io/blog/2019/10/17/spring-cloud-stream-functional-and-reactive)上面写着a functional programming model in Spring Cloud Stream (SCSt). It’s less code, less configuration. Most importantly, though, your code is completely decoupled and independent from the internals of SCSt。这有利于使用Project Reactor提供的事件流抽象(如Flux和Mono)(https://projectreactor.io/). 命令函数在每个单独的事件上触发,而reactive函数只触发一次。
官方文档目前只集成了RabbitMQ和Kafka,查看Spring Cloud Alibaba的官方示例,没有提供集成方法。所以就按照RabbitMQ的方式,尝试在RocketMQ上测试验证,发现完全可用,说明rocketmq已经实现了Stream的函数式编辑的相关方法。这样就好办,直接开搞。
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>
没有带版本号,需要先依赖
<dependencyManagement> <dependencies> <dependency> <groupId>vip.mate</groupId> <artifactId>mate-starter-dependencies</artifactId> <version>3.3.8</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
spring: cloud: stream: bindings: sms-out-0: destination: sms-topic content-type: application/json sms-in-0: destination: sms-topic content-type: text/plain group: sms-group
注意:
消费者和生产者的使用方式与之前有变化:采用名称-out-数字的方式,用于生产者,名称-in-数字的方式用于消费者。
参考:https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/3.1.3/reference/html/spring-cloud-stream-binder-rabbit.html
以下摘录rabbitmq的官方示例:
@Autowired private StreamBridge bridge; @Bean Consumer<List<String>> input() { return list -> { List<MyCorrelationData> results = new ArrayList<>(); list.forEach(str -> { log.info("Received: " + str); MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str); results.add(corr); this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase()) .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr) .build()); }); results.forEach(correlation -> { try { Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS); log.info(confirm + " for " + correlation.getPayload()); if (correlation.getReturnedMessage() != null) { log.error("Message for " + correlation.getPayload() + " was returned "); // throw some exception to invoke binder retry/error handling } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); } catch (ExecutionException | TimeoutException e) { throw new IllegalStateException(e); } }; }
package vip.mate.message.service.impl; import lombok.AllArgsConstructor; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Service; import vip.mate.core.rocketmq.constant.MessageConstant; import vip.mate.message.service.ISmsService; /** * 发送短信实现类 * * @author xuzhanfu */ @Service @AllArgsConstructor public class SmsServiceImpl implements ISmsService { private final StreamBridge streamBridge; /** * 采用StreamBridge的发送方式 * * @param message 短消息 * @link https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_binding_and_binding_names */ @Override public void sendSms(String message) { streamBridge.send(MessageConstant.SMS_MESSAGE_OUTPUT, message); } }
package vip.mate.message.service; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; import java.util.function.Consumer; /** * 短信消费者业务 * * @author xuzhanfu */ @Slf4j @Service public class SmsConsumerService { /** * 函数式编辑接收消息 * * @return */ @Bean public Consumer<String> sms() { return message -> { log.info("接收的普通消息为:{}", message); }; } }
这就是函数式编程的方式,其中方法名,要与通道名的名称一致。
项目 | GITHUB | 码云 |
---|---|---|
MateCloud后端源码 | https://github.com/matevip/matecloud | https://gitee.com/matevip/matecloud |
Artemis前端源码 | https://github.com/matevip/artemis | https://gitee.com/matevip/artemis |