Spring Cloud Stream provides a unified abstraction over message middleware configuration and introduces the concepts of publish-subscribe, consumer groups, and partitions.
Binder: The component that integrates with external message middleware and creates Bindings. Each message middleware product has its own Binder implementation. For example, Kafka uses KafkaMessageChannelBinder, RabbitMQ uses RabbitMessageChannelBinder, and RocketMQ uses RocketMQMessageChannelBinder.
Binding: Includes Input Binding and Output Binding. A Binding serves as the bridge between the message middleware and the producers and consumers in the application. Developers only need to use a producer to produce data or a consumer to consume it, and do not need to worry about the interaction with the message middleware. In a distributed Spring Boot application, the flow between Spring Cloud Stream and the message middleware is roughly as follows:
RocketMQMessageChannelBinder is a standard implementation of Binder. Internally it builds a RocketMQInboundChannelAdapter and a RocketMQMessageHandler. RocketMQMessageHandler constructs a RocketMQTemplate based on the binding configuration in the Spring Boot configuration file application.yml. RocketMQTemplate converts the org.springframework.messaging.Message class of the spring-messaging module into the RocketMQ message class org.apache.rocketmq.common.message.Message and then sends it. RocketMQInboundChannelAdapter likewise constructs a RocketMQListenerBindingContainer based on the binding configuration, and that container starts the RocketMQ Consumer to receive messages.
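To make the conversion step more concrete, here is a minimal, dependency-free sketch. SimpleSpringMessage and SimpleRocketMessage are hypothetical stand-ins for org.springframework.messaging.Message and org.apache.rocketmq.common.message.Message, and the "topic:tags" destination format is assumed from the handler code in this article. The real conversion inside RocketMQTemplate covers more cases (payload serialization, keys, flags and so on), so treat this only as an illustration of the idea.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for org.springframework.messaging.Message<String>.
final class SimpleSpringMessage {
    final String payload;
    final Map<String, Object> headers;

    SimpleSpringMessage(String payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

// Hypothetical stand-in for org.apache.rocketmq.common.message.Message.
final class SimpleRocketMessage {
    final String topic;
    final String tags;
    final byte[] body;
    final Map<String, String> properties = new LinkedHashMap<>();

    SimpleRocketMessage(String topic, String tags, byte[] body) {
        this.topic = topic;
        this.tags = tags;
        this.body = body;
    }
}

final class MessageConversionSketch {

    // Converts the Spring-style message into the RocketMQ-style message.
    // The destination is either "topic" or "topic:tags".
    static SimpleRocketMessage convert(String destination, SimpleSpringMessage source) {
        String[] parts = destination.split(":", 2);
        String topic = parts[0];
        String tags = parts.length > 1 ? parts[1] : "";

        // The payload travels as raw bytes on the RocketMQ side.
        byte[] body = source.payload.getBytes(StandardCharsets.UTF_8);
        SimpleRocketMessage target = new SimpleRocketMessage(topic, tags, body);

        // Headers are copied as string properties; null values are skipped.
        for (Map.Entry<String, Object> header : source.headers.entrySet()) {
            if (header.getValue() != null) {
                target.properties.put(header.getKey(), String.valueOf(header.getValue()));
            }
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("orderId", 42);
        SimpleRocketMessage converted =
                convert("orders:paid", new SimpleSpringMessage("hello", headers));
        System.out.println(converted.topic + " / " + converted.tags
                + " / " + converted.properties);
    }
}
```

The point of the sketch is that application code only ever sees the Spring-style message; the topic, tags and byte body are details that the binder layer fills in.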
Binder interface specification
package org.springframework.cloud.stream.binder;

/**
 * A strategy interface used to bind an app interface to a logical name. The name is
 * intended to identify a logical consumer or producer of messages. This may be a queue, a
 * channel adapter, another message channel, a Spring bean, etc.
 *
 * @param <T> the primary binding type (e.g. MessageChannel).
 * @param <C> the consumer properties type.
 * @param <P> the producer properties type.
 * @author Mark Fisher
 * @author David Turanski
 * @author Gary Russell
 * @author Jennifer Hickey
 * @author Ilayaperumal Gopinathan
 * @author Marius Bogoevici
 * @since 1.0
 */
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {

    /**
     * Bind the target component as a message consumer to the logical entity identified by
     * the name.
     * @param name the logical identity of the message source
     * @param group the consumer group to which this consumer belongs - subscriptions are
     * shared among consumers in the same group (a <code>null</code> or empty String, must
     * be treated as an anonymous group that doesn't share the subscription with any other
     * consumer)
     * @param inboundBindTarget the app interface to be bound as a consumer
     * @param consumerProperties the consumer properties
     * @return the setup binding
     */
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget,
            C consumerProperties);

    /**
     * Bind the target component as a message producer to the logical entity identified by
     * the name.
     * @param name the logical identity of the message target
     * @param outboundBindTarget the app interface to be bound as a producer
     * @param producerProperties the producer properties
     * @return the setup binding
     */
    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);

}
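The group semantics described in the Javadoc above (consumers in the same group share a subscription, while a null or empty group is anonymous and shares with nobody) can be modelled with a toy in-memory binder. This is only a sketch: ToyBinding and InMemoryBinder are hypothetical names, the method signatures are deliberately simplified compared with the real Binder interface (no properties objects, and bindProducer returns a sender instead of taking a bind target), and the round-robin choice within a group is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified counterpart of Binding<T>: a handle that can be released.
interface ToyBinding {
    void unbind();
}

// Hypothetical in-memory binder; no real middleware is involved.
final class InMemoryBinder {

    // destination name -> group name -> consumers that belong to the group
    private final Map<String, Map<String, List<Consumer<String>>>> destinations =
            new LinkedHashMap<>();
    // "destination/group" -> number of messages delivered so far (drives round robin)
    private final Map<String, Integer> delivered = new HashMap<>();
    private int anonymousCounter = 0;

    ToyBinding bindConsumer(String name, String group, Consumer<String> target) {
        // A null or empty group is anonymous: it gets a unique group of its own.
        String effectiveGroup = (group == null || group.isEmpty())
                ? "anonymous-" + (anonymousCounter++)
                : group;
        List<Consumer<String>> members = destinations
                .computeIfAbsent(name, key -> new LinkedHashMap<>())
                .computeIfAbsent(effectiveGroup, key -> new ArrayList<>());
        members.add(target);
        return () -> members.remove(target);
    }

    // Returns a sender that publishes payloads to the named destination.
    Consumer<String> bindProducer(String name) {
        return payload -> publish(name, payload);
    }

    private void publish(String name, String payload) {
        Map<String, List<Consumer<String>>> groups =
                destinations.getOrDefault(name, Collections.emptyMap());
        // Every group receives the message once; inside a group one member is picked.
        for (Map.Entry<String, List<Consumer<String>>> entry : groups.entrySet()) {
            List<Consumer<String>> members = entry.getValue();
            if (members.isEmpty()) {
                continue;
            }
            int index = delivered.merge(name + "/" + entry.getKey(), 1, Integer::sum) - 1;
            members.get(index % members.size()).accept(payload);
        }
    }

    public static void main(String[] args) {
        InMemoryBinder binder = new InMemoryBinder();
        binder.bindConsumer("orders", "billing", m -> System.out.println("billing-1: " + m));
        binder.bindConsumer("orders", "billing", m -> System.out.println("billing-2: " + m));
        binder.bindConsumer("orders", null, m -> System.out.println("anonymous: " + m));
        Consumer<String> output = binder.bindProducer("orders");
        output.accept("m1");
        output.accept("m2");
    }
}
```

In this model the two "billing" consumers split the messages between them, while the anonymous consumer sees every message, which matches the contract stated for the group parameter.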
package com.alibaba.cloud.stream.binder.rocketmq;

/**
 * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
 */
public class RocketMQMessageChannelBinder extends
        AbstractMessageChannelBinder<ExtendedConsumerProperties<RocketMQConsumerProperties>, ExtendedProducerProperties<RocketMQProducerProperties>, RocketMQTopicProvisioner>
        implements
        ExtendedPropertiesBinder<MessageChannel, RocketMQConsumerProperties, RocketMQProducerProperties> {

    private RocketMQExtendedBindingProperties extendedBindingProperties = new RocketMQExtendedBindingProperties();

    private final RocketMQBinderConfigurationProperties rocketBinderConfigurationProperties;

    private final RocketMQProperties rocketMQProperties;

    private final InstrumentationManager instrumentationManager;

    // Creates the message handler used to produce (send) messages
    @Override
    protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
            ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
            MessageChannel errorChannel) throws Exception {
        ...
    }

    // Creates the endpoint used to consume messages
    @Override
    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
            String group,
            ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties)
            throws Exception {
        ...
    }

}
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
        rocketMQTemplate, destination.getName(), producerGroup,
        producerProperties.getExtension().getTransactional(),
        instrumentationManager);

RocketMQMessageHandler starts the message handler in its start() method, and its handleMessageInternal() method handles the RocketMQ messages to be sent. Part of the code is as follows:
public class RocketMQMessageHandler extends AbstractMessageHandler implements Lifecycle {

    // Fields, constructor and other methods are omitted in this excerpt.

    @Override
    public void start() {
        // Non-transactional handlers start the template and record the health status.
        if (!transactional) {
            instrumentationManager
                    .addHealthInstrumentation(new Instrumentation(destination));
            try {
                rocketMQTemplate.afterPropertiesSet();
                instrumentationManager.getHealthInstrumentation(destination)
                        .markStartedSuccessfully();
            }
            catch (Exception e) {
                instrumentationManager.getHealthInstrumentation(destination)
                        .markStartFailed(e);
                log.error("RocketMQTemplate startup failed, Caused by " + e.getMessage());
                throw new MessagingException(MessageBuilder.withPayload(
                        "RocketMQTemplate startup failed, Caused by " + e.getMessage())
                        .build(), e);
            }
        }
        running = true;
    }

    @Override
    protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
            throws Exception {
        try {
            // Build the send destination: "topic" or "topic:tags".
            final StringBuilder topicWithTags = new StringBuilder(destination);
            String tags = Optional
                    .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
                    .toString();
            if (!StringUtils.isEmpty(tags)) {
                topicWithTags.append(":").append(tags);
            }
            SendResult sendRes = null;
            if (transactional) {
                sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
                        topicWithTags.toString(), message, message.getHeaders()
                                .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
                log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
            }
            else {
                // Read the optional delay level from the headers; default to 0.
                int delayLevel = 0;
                try {
                    Object delayLevelObj = message.getHeaders()
                            .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
                    if (delayLevelObj instanceof Number) {
                        delayLevel = ((Number) delayLevelObj).intValue();
                    }
                    else if (delayLevelObj instanceof String) {
                        delayLevel = Integer.parseInt((String) delayLevelObj);
                    }
                }
                catch (Exception e) {
                    // ignore
                }
                if (sync) {
                    sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(), message,
                            rocketMQTemplate.getProducer().getSendMsgTimeout(),
                            delayLevel);
                    log.debug("sync send to topic " + topicWithTags + " " + sendRes);
                }
                else {
                    rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
                            new SendCallback() {
                                @Override
                                public void onSuccess(SendResult sendResult) {
                                    log.debug("async send to topic " + topicWithTags + " "
                                            + sendResult);
                                }

                                @Override
                                public void onException(Throwable e) {
                                    log.error(
                                            "RocketMQ Message hasn't been sent. Caused by "
                                                    + e.getMessage());
                                    if (getSendFailureChannel() != null) {
                                        getSendFailureChannel().send(
                                                RocketMQMessageHandler.this.errorMessageStrategy
                                                        .buildErrorMessage(
                                                                new MessagingException(
                                                                        message, e),
                                                                null));
                                    }
                                }
                            });
                }
            }
            // A synchronous or transactional send that did not return SEND_OK is a failure.
            if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
                if (getSendFailureChannel() != null) {
                    this.getSendFailureChannel().send(message);
                }
                else {
                    throw new MessagingException(message,
                            new MQClientException("message hasn't been sent", null));
                }
            }
        }
        catch (Exception e) {
            log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
            if (getSendFailureChannel() != null) {
                getSendFailureChannel().send(this.errorMessageStrategy
                        .buildErrorMessage(new MessagingException(message, e), null));
            }
            else {
                throw new MessagingException(message, e);
            }
        }
    }

}
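Two small pieces of header handling in handleMessageInternal() are easy to miss: the destination becomes "topic:tags" when a tags header is present, and the delay level is accepted either as a number or as a numeric string, falling back to 0 otherwise. The sketch below isolates that logic in plain Java. SendHeaderSketch is a hypothetical class, and TAGS_HEADER and DELAY_HEADER are placeholder names standing in for RocketMQHeaders.TAGS and MessageConst.PROPERTY_DELAY_TIME_LEVEL; their actual values are not assumed here.

```java
import java.util.HashMap;
import java.util.Map;

final class SendHeaderSketch {

    // Placeholder header names for this sketch only.
    static final String TAGS_HEADER = "TAGS";
    static final String DELAY_HEADER = "DELAY";

    // Returns "destination" when no tags are set, otherwise "destination:tags".
    static String topicWithTags(String destination, Map<String, Object> headers) {
        Object tags = headers.get(TAGS_HEADER);
        String value = tags == null ? "" : tags.toString();
        return value.isEmpty() ? destination : destination + ":" + value;
    }

    // Accepts a Number or a numeric String; anything else yields the default 0.
    static int delayLevel(Map<String, Object> headers) {
        Object raw = headers.getOrDefault(DELAY_HEADER, 0);
        try {
            if (raw instanceof Number) {
                return ((Number) raw).intValue();
            }
            if (raw instanceof String) {
                return Integer.parseInt((String) raw);
            }
        }
        catch (NumberFormatException e) {
            // Mirrors the "ignore" branch of the handler: keep the default.
        }
        return 0;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TAGS_HEADER, "paid");
        headers.put(DELAY_HEADER, "3");
        System.out.println(topicWithTags("orders", headers)); // orders:paid
        System.out.println(delayLevel(headers));              // 3
    }
}
```

Silently falling back to 0 means a malformed delay header results in an immediate send rather than an error, which is worth keeping in mind when debugging delayed messages.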
// Part of the createConsumerEndpoint method in RocketMQMessageChannelBinder:
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
        listenerContainer, consumerProperties, instrumentationManager);

The container held by the RocketMQInboundChannelAdapter starts the RocketMQ consumer. An excerpt of the code is as follows:
package com.alibaba.cloud.stream.binder.rocketmq.consuming;

/**
 * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
 */
public class RocketMQListenerBindingContainer
        implements InitializingBean, RocketMQListenerContainer, SmartLifecycle {

    private final static Logger log = LoggerFactory
            .getLogger(RocketMQListenerBindingContainer.class);

    private String nameServer;

    private String consumerGroup;

    private String topic;

    private int consumeThreadMax = 64;

    private String charset = "UTF-8";

    private RocketMQListener rocketMQListener;

    private DefaultMQPushConsumer consumer;

    private boolean running;

    @Override
    public void start() {
        if (this.isRunning()) {
            throw new IllegalStateException(
                    "container already running. " + this.toString());
        }

        try {
            consumer.start();
        }
        catch (MQClientException e) {
            throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
        }
        this.setRunning(true);

        log.info("running container: {}", this.toString());
    }

}
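The start() method above follows a common lifecycle pattern: refuse a second start, wrap the client's checked exception in an unchecked one, and only mark the container as running once the consumer has really started. A minimal sketch of that pattern, with ToyListenerContainer and ToyConsumer as hypothetical stand-ins for RocketMQListenerBindingContainer and DefaultMQPushConsumer, could look like this:

```java
final class ToyListenerContainer {

    // Hypothetical stand-in for the RocketMQ push consumer.
    interface ToyConsumer {
        void start() throws Exception;
    }

    private final ToyConsumer consumer;
    private boolean running;

    ToyListenerContainer(ToyConsumer consumer) {
        this.consumer = consumer;
    }

    void start() {
        // Starting twice is treated as a programming error.
        if (running) {
            throw new IllegalStateException("container already running");
        }
        try {
            consumer.start();
        }
        catch (Exception e) {
            // Surface the checked failure as an unchecked one, keeping the cause.
            throw new IllegalStateException("Failed to start consumer", e);
        }
        // Only reached when the consumer started without an exception.
        running = true;
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        ToyListenerContainer container = new ToyListenerContainer(() -> { });
        container.start();
        System.out.println("running: " + container.isRunning());
    }
}
```

Because the running flag is set last, a failed start leaves the container in a state where start() can be retried.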