C/C++教程

rocketmq广播消息的(五)

本文主要是介绍rocketmq广播消息的(五),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一、简介

广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。

二、代码

/**
 * 发布订阅消息生产者
 */
public class BroadcastProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException {
        // 1. 创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");

        // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
        producer.setNamesrvAddr("192.168.32.128:9876");

        // 3. 启动生产者
        producer.start();

        // 4. 生产者发送消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message("TopicTest", "TagA", "OrderID_" + i, ("Hello Broadcast:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult result = producer.send(message);

            System.out.printf("发送结果:%s%n", result);
        }

        // 5. 停止生产者
        producer.shutdown();
    }
}

 

/**
 * 发布订阅消息生产者
 */
public class BroadcastProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException {
        // 1. 创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");

        // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步
        producer.setNamesrvAddr("192.168.32.128:9876");

        // 3. 启动生产者
        producer.start();

        // 4. 生产者发送消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message("TopicTest", "TagA", "OrderID_" + i, ("Hello Broadcast:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult result = producer.send(message);

            System.out.printf("发送结果:%s%n", result);
        }

        // 5. 停止生产者
        producer.shutdown();
    }
}

 

这篇关于rocketmq广播消息的(五)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!