rocketmq-spring的consumer的相关属性配置有两种方式:
关于注解中的属性可以查看:org.apache.rocketmq.spring.annotation.RocketMQMessageListener,而在文件中可以配置的属性只有如下几个(并不遵守spring boot自动配置规范,所以在idea中不会有相关提示)
说明如下:
配置项 | 说明 |
---|---|
rocketmq.name-server | rocketmq的name server地址,格式:`主机:端口;主机:端口`,多个地址以英文分号分隔 |
rocketmq.consumer.secret-key | ACL的secret-key属性 |
rocketmq.consumer.access-key | ACL的access-key属性 |
rocketmq.consumer.customized-trace-topic | 自定义消费轨迹topic,不使用忽略 |
rocketmq.access-channe | 枚举类型,值为:【LOCAL, CLOUD】,值为CLOUD表示设置接入阿里云。忽略。 |
如果想要设置最大重试次数等一些相关初始化参数配置,很明显是不支持的。
同时,看一下构造consumer的源码,可以看到只配置了固定的几个属性:
private void initRocketMQPushConsumer() throws MQClientException { RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace(); if (Objects.nonNull(rpcHook)) { consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false); consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup)); } else { log.debug("Access-key or secret-key not configure in " + this + "."); consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer()); if (customizedNameServer != null) { consumer.setNamesrvAddr(customizedNameServer); } else { consumer.setNamesrvAddr(nameServer); } if (accessChannel != null) { consumer.setAccessChannel(accessChannel); } consumer.setConsumeThreadMax(consumeThreadMax); if (consumeThreadMax < consumer.getConsumeThreadMin()) { consumer.setConsumeThreadMin(consumeThreadMax); } consumer.setConsumeTimeout(consumeTimeout); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer); } }
但是看代码的最后几行,rocketMQListener如果实现了RocketMQPushConsumerLifecycleListener接口,则会调用RocketMQPushConsumerLifecycleListener的prepareStart(consumer)方法,很明显,可以在这里设置consuemr的参数。
说明:rocketMQListener就是类上带有RocketMQMessageListener的bean。
@RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_topic_consumer", selectorExpression = "*") class StringConsumer implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(String message) { LOGGER.info("receive message: {}", message); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 设置最大重试次数 consumer.setMaxReconsumeTimes(5); // 如下,设置其它consumer相关属性 consumer.setPullBatchSize(16); } }
我是在翻源码的才想到这个解决方案,我想既然提供有这个接口进行自定义配置,官方文档应该会有示例说明,然后翻了下github,是有类似的使用方式的,源码上还有其它示例,如果有其它问题,建议还是先看官方示例是否提供了相关解决方案。github地址:https://github.com/apache/rocketmq-spring/tree/master/rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer