最近在做彩信下发,需要下发的内容是以消息的形式存放在rocektMQ,遇上彩信消息未下发的情况,需要实时去查各topic的消息积压量
1、启动时装配监控客户端的bean
@Component public class MQAdminExtConfig { private static final Logger log = LoggerFactory.getLogger(MQAdminExtConfig.class); @Value("${rocketmq.name-server}") private String nameServer; public static DefaultMQAdminExt defaultMQAdminExt; /** * 启动监控客户端 */ @PostConstruct public void initMqAdminExtConfig(){ //初始化一个生产者,用于初始化参数 log.info("init rocketMQ monitoer client,nameServer:{}....",nameServer); try { DefaultMQProducer producer = new DefaultMQProducer("GRP_P_MSG_PRIORITY_HIGH_BeiJing_8000"); producer.setNamesrvAddr(nameServer); producer.start(); } catch (MQClientException e) { e.printStackTrace(); } try { defaultMQAdminExt = new DefaultMQAdminExt(); defaultMQAdminExt.setNamesrvAddr(nameServer); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); defaultMQAdminExt.start(); } catch (Exception e) { e.printStackTrace(); } } }
/** * * @param consumerGroup 消费者组 * @param topic topic * @return 当前topic的积压量 */ private static long getBackLogMsg(String consumerGroup,String topic){ long diff=0; log.info("BacklogMonitorUtil--getBackLogMsg param:consumerGroup:{},topic:{} ",consumerGroup,topic); try { ConsumeStats consumeStats = MQAdminExtConfig.defaultMQAdminExt.examineConsumeStats(consumerGroup); List<MessageQueue> mqList = new LinkedList(); mqList.addAll(consumeStats.getOffsetTable().keySet()); Collections.sort(mqList); for(MessageQueue queue :mqList){ if(topic.equals(queue.getTopic())){ OffsetWrapper offsetWrapper = (OffsetWrapper)consumeStats.getOffsetTable().get(queue); log.info("getBrokerOffset----------------{}",offsetWrapper.getBrokerOffset()); log.info("getConsumerOffset-----------------{}",offsetWrapper.getConsumerOffset()); diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset(); } } } catch (Exception e) { //当消费者未消费时此除会报错 diff=0; log.error("get offset error -----------------{}",e); } return diff; }
这里本来想探究一下为什么当消费者不消费时会报错,先把错误贴出来
上面报的是一个topic路由找不到的错误,且topic是%RETRY%开头的,但是通过查看源码发现defaultMQAdminExt.examineConsumeStats的实现类,查询的topic直接就是
关于%RETRY%开头的topic,
consumer 消费失败,会把消息重新发往 %RETRY% + consumerGroup,这个 retry 消息会在一定时间后,真实送到 retry topic。
但是这里为什么会直接去查 %RETRY% + consumerGroup,没有搞明白,后续再继续跟踪~,有知道的老哥可以在评论区写下答案,感谢