小提示:阿里云打开namesrv和broker的端口
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>mq3</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <parent> <artifactId>spring-boot-starter-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.4.4</version> </parent> <dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> </dependencies> </project>
[root@iZ2ze5v2vdwv6veyksylhxZ /]# cd /usr/local/ [root@iZ2ze5v2vdwv6veyksylhxZ local]# ls aegis bin include libexec nginx rocketmq src apache-maven-3.6.3 curl jdk1.8.0_281 libiconv openssl rocketmq-console-ng-1.0.1.jar tengine apache-tomcat-7.0.61 etc lib man redis_bloom sbin yum-3.2.28 apache-tomcat-7.0.61.tar.gz games lib64 mysql redisbloom.so share yum-3.2.28.tar.gz [root@iZ2ze5v2vdwv6veyksylhxZ local]# cd rocketmq [root@iZ2ze5v2vdwv6veyksylhxZ rocketmq]# ls benchmark bin conf lib LICENSE NOTICE README.md [root@iZ2ze5v2vdwv6veyksylhxZ rocketmq]# cd bin/ [root@iZ2ze5v2vdwv6veyksylhxZ bin]# ls cachedog.sh mqadmin mqbroker.numanode1 mqshutdown play.sh runserver.sh cleancache.sh mqadmin.cmd mqbroker.numanode2 mqshutdown.cmd README.md setcache.sh cleancache.v1.sh mqbroker mqbroker.numanode3 nohup.out runbroker.cmd startfsrv.sh dledger mqbroker.cmd mqnamesrv os.sh runbroker.sh tools.cmd hs_err_pid20448.log mqbroker.numanode0 mqnamesrv.cmd play.cmd runserver.cmd tools.sh [root@iZ2ze5v2vdwv6veyksylhxZ bin]# ./mqnamesrv Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release. The Name Server boot success. serializeType=JSON
[root@iZ2ze5v2vdwv6veyksylhxZ bin]# ./mqbroker -n 8.131.84.120:9876 -c ../conf/broker.conf The broker[broker-a, 8.131.84.120:10911] boot success. serializeType=JSON and name server is 8.131.84.120:9876
[root@iZ2ze5v2vdwv6veyksylhxZ ~]# cd /usr/local/ [root@iZ2ze5v2vdwv6veyksylhxZ local]# ls aegis bin include libexec nginx rocketmq src apache-maven-3.6.3 curl jdk1.8.0_281 libiconv openssl rocketmq-console-ng-1.0.1.jar tengine apache-tomcat-7.0.61 etc lib man redis_bloom sbin yum-3.2.28 apache-tomcat-7.0.61.tar.gz games lib64 mysql redisbloom.so share yum-3.2.28.tar.gz [root@iZ2ze5v2vdwv6veyksylhxZ local]# java -jar rocketmq-console-ng-1.0.1.jar
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; /** * 生产者、消费者、namesrv、broker 之间关系 * * 1、namesrv 类似注册中心,生产者、消费者、broker 都会注册到namesrv上 * 2、生产者 发送消息给 namesrv * 3、namesrv 将消息转发给 broker的topic上 * 4、broker的topic 再将消息传给 消费者 */ public class Producer { public static void main(String[] args) throws Exception{ //1、先创建了生产者 DefaultMQProducer producer = new DefaultMQProducer("mygroup"); //2、生产者要主动联系namesrvAddr producer.setNamesrvAddr("8.131.84.120:9876"); //3、连接成功后要启动生产者 producer.start(); //4、创建消息类,包含topic和body Message message = new Message("mytopic","hello world".getBytes()); //5、生产者将消息发送出去 System.out.println(producer.send(message)); //6、关闭生产者 producer.shutdown(); } }
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class Consumer { public static void main(String[] args) throws Exception { //1、创建DefaultMQPushConsumer DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("demo-consumer-group"); //2、设置namesrv地址 mqPushConsumer.setNamesrvAddr("8.131.84.120:9876"); //3、设置subscribe读取主题信息 /** * 生产者类似一个作者,namesrv类似杂志社,消费者必须先订阅某家报社,才可以收到生产者给报社写的文章 * 每个消费者只能订阅一个topic * topic:关注消息的地址 * 过滤器 * :表示不过滤 */ mqPushConsumer.subscribe("mytopic","*"); //4、消费者注册个监听器,这样namesrv里传进来生产者提供的消息后,就可以及时知道了 //MessageListenerConcurrently 是普通消息接收,MessageListenerOrderly 是顺序消息接收 mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : msgs) { try { //获取主题 System.out.println(msg.getTopic()); //获取标签 System.out.println(msg.getTags()); //获取消息 System.out.println(msg.getBody().toString()); } catch (Exception e) { e.printStackTrace(); //重新再消费一次 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } //5、默认情况下,这条消息只会被一个consumer消费到,点对点消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //开启Consumer mqPushConsumer.start(); System.out.println("消费者启动..."); } }