C/C++教程

RocketMQ——总结(一)

本文主要是介绍RocketMQ——总结(一),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
官网:https://rocketmq.apache.org/ 源码地址:https://github.com/apache/rocketmq  

一、RocketMQ的优点

1、天然支持集群模式、负载均衡、水平扩展能力 2、上亿级别的消息堆积能力 3、采用零拷贝的原理、顺序写盘、随机读(借鉴kafka) 4、丰富的API使用,支持顺序消息,事务消息,rabbitmq不支持 5、代码优秀,底层通信通过Netty NIO框架 6、NameServer代替Zookeeper(2.x时还是zk) 7、强调集群无单点,可扩展,任意一点高可用,水平扩展 8、消息失败重试机制(rabbit没有)、消息可查询 9、开源社区活跃、成熟度高(经过双十一的考验) 10,支持类sql表达式过滤,可能借鉴activemq  

二、概念

Producer: 消息生产者,负责产生消息,一般由业务系统负责产生消息。 Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。 Push Consumer: Consumer的一 种,需要向Consumer对象注册监听。 Pull Consumer: Consumer的一种,需要主动请求Broker拉取消息。 Producer Group:生产者集合, 一般用于发送一类消息,可用于事务消息 Consumer Group:消费者集合,一般用于接受一类消息进行消费 Broker : MQ消息服务(中转角色,用于消息存储与生产消费转发)   其他核心概念参见:https://rocketmq.apache.org/docs/core-concept/ 说明:推拉两种消费模式,类似rabbitmq,但推送实现机制不同,这里的推模式实际是基于长轮询  

三、集群架构

整体集群架构

 

常见部署模式: 1.单点模式 2.主从模式 3.双主模式 4.双主双从模式、多主多从模式  

四、生产者

使用说明: 1.创建生产者对象DefaultMQProducer 2.设置NamesrvAddr 3.启动生产者服务 4.创建消息并发送 5.关闭生产者   其他说明: topic是某一类主题, tag是标签用来过滤,key一般是用来作为用户自定义的Key,唯一标识 topic对消息队列是一对多的关系,默认队列有4个,没有主题的可自动创建 rocketmq的异常区分较细,异常区分细说明rocketmq的源码较为细致   示例代码 发送消息的示例代码:org.apache.rocketmq.example.quickstart.Producer
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);
                /*
                 * There are different ways to send message, if you don't care about the send result,you can use this way
                 * {@code
                 * producer.sendOneway(msg);
                 * }
                 */

                /*
                 * if you want to get the send result in a synchronize way, you can use this send method
                 * {@code
                 * SendResult sendResult = producer.send(msg);
                 * System.out.printf("%s%n", sendResult);
                 * }
                 */

                /*
                 * if you want to get the send result in a asynchronize way, you can use this send method
                 * {@code
                 *
                 *  producer.send(msg, new SendCallback() {
                 *  @Override
                 *  public void onSuccess(SendResult sendResult) {
                 *      // do something
                 *  }
                 *
                 *  @Override
                 *  public void onException(Throwable e) {
                 *      // do something
                 *  }
                 *});
                 *
                 *}
                 */

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}

 

 

五、消费者

使用说明: 1.创建消费者对象DefaultMQPushConsumer 2.设置NamesrvAddr及其消费位置ConsumeFromWhere 3.进行订阅主题subscribe 4.注册监听并消费registerMessageListener   其他说明: rocketmq的订阅模式不是服务端真的推,而是客户端长轮询的机制 消费成功返回:ConsumeConcurrentlyStatus.CONSUME_SUCCESS 消费失败了可以自动重试,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER 生产环境下重试多次失败后,需要自己做补偿和记录日志等 重试策略:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9r 10m 20m 30m 1h 2h   示例代码 消费者的示例代码:org.apache.rocketmq.example.quickstart.Consumer
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Specify where to start in case the specific consumer group is a brand-new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more topic to consume.
         */
        consumer.subscribe("TopicTest", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

 

这篇关于RocketMQ——总结(一)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!