在上一篇文章已经演示了RocketMQ 入门使用,接下来通过一些简单例子,深入了解下怎么使用。
RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。这种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
package com.linhuaming.rocketmq.demo.producer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * 可靠同步发送 */ public class SyncProducer { public static void main(String[] args) { // 1.创建一个生产者对象,并且指定一个生产者组 DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer"); // 2.指定mq服务器地址 producer.setNamesrvAddr("127.0.0.1:9876"); try { // 3.启动生产者 producer.start(); for(int i=0; i<10; i++){ int num = i + 1; // 4.创建一个消息,参数分别为:主题、标签、消息体 String sendContent = "测试消息,你好,我是消息"+num; Message message = new Message("topic1","tag1",sendContent.getBytes("utf-8")); // 5.发送消息 SendResult send = producer.send(message); System.out.printf("第"+num+"条消息:%s%n", send); } // 6.关闭资源 producer.shutdown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (MQClientException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
package com.linhuaming.rocketmq.demo.producer; import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * 可靠异步发送 */ public class ASyncProducer { public static void main(String[] args) { // 1.创建一个生产者对象,并且指定一个生产者组 DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer"); // 2.指定mq服务器地址 producer.setNamesrvAddr("127.0.0.1:9876"); try { // 3.启动生产者 producer.start(); for(int i=0; i<5; i++){ int num = i + 1; // 4.创建一个消息,参数分别为:主题、标签、消息体 String sendContent = "测试消息,你好,我是消息"+num; Message message = new Message("topic1","tag1",sendContent.getBytes("utf-8")); // 5.发送同步消息到一个Broker producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("消息发送成功"); System.out.println(JSON.toJSONString(sendResult)); } @Override public void onException(Throwable e) { System.out.println("消息发送失败"+e.getMessage()); System.out.println("处理失败消息"); } }); } // 让线程不要终止,否则会报错 Thread.sleep(30000000); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
3、单向发送
单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
package com.linhuaming.rocketmq.demo.producer; import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; /** * 单向消息 */ public class OneWayProducer { public static void main(String[] args) { // 1.创建一个生产者对象,并且指定一个生产者组 DefaultMQProducer producer = new DefaultMQProducer("xiaolin-producer"); // 2.指定mq服务器地址 producer.setNamesrvAddr("127.0.0.1:9876"); try { // 3.启动生产者 producer.start(); for(int i=0; i<5; i++){ int num = i + 1; // 4.创建一个消息,参数分别为:主题、标签、消息体 String sendContent = "测试消息,你好,我是消息"+num; Message message = new Message("topic1","tag1",sendContent.getBytes("utf-8")); // 5.发送单向消息到一个Broker producer.sendOneway(message); } // 6.如果不再发送消息,关闭Producer实例。 producer.shutdown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
发送方式 | 发送的时间 | 发送反馈结果 | 是否丢失数据 |
---|---|---|---|
同步发送 | 快 | 有 | 不丢失 |
异步发送 | 快 | 有 | 不丢失 |
单向消息 | 较快 | 无 | 可能丢失 |
虽然RocketMQ的数据结构是队列,看起来天生支持顺序消息,当只有一个队列的时候,他就天生支持顺序消息,但是Brocket内部有多个队列,发送多条消息的时候,Broker会按照轮询的方式将多个消息放在不同的队列,消费者采用多线程的方式去消费消息,所以无法保证消费消息的方式和发送消息的方式一样的。解决方式是将消息全部发送到一个队列里面。
比如一个订单的流程是:创建、付款、推送、完成。订单号相同的
顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。
OrderStep类
package com.linhuaming.rocketmq.demo.domain; import java.util.ArrayList; import java.util.List; /** * 订单构建者 */ public class OrderStep { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}'; } public static List<OrderStep> buildOrders() { // 1039L : 创建 付款 推送 完成 // 1065L : 创建 付款 // 7235L :创建 付款 List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } }
OrderProducer类
package com.linhuaming.rocketmq.demo.producer; import com.linhuaming.rocketmq.demo.domain.OrderStep; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import java.util.List; /** * 顺序消息 */ public class OrderProducer { public static void main(String[] args) throws Exception { // 1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("xiaoxi-producer"); // 2.指定Nameserver地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 3.启动producer producer.start(); // 构建消息集合 List<OrderStep> orderSteps = OrderStep.buildOrders(); // 遍历发送消息 for (int i = 0; i < orderSteps.size(); i++) { // 4.创建一个消息 String body = orderSteps.get(i) + ""; Message message = new Message("topic1", "tag1", "i" + i, body.getBytes()); // 5.发送消息 SendResult sendResult = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) { long orderId = (long) arg; long index = orderId % mqs.size(); return mqs.get((int) index); } },orderSteps.get(i).getOrderId()); System.out.println("发送结果:" + sendResult); } // 6.关闭资源 producer.shutdown(); } }
OrderConsumer类
package com.linhuaming.rocketmq.demo.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class OrderConsumer { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaoxi-consumer"); // 2.指定Nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); // 3.订阅主题Topic和Tag consumer.subscribe("topic1", "*"); // 4.注册消息监听器,MessageListenerOrderly是按照顺序来消费的 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) { for (MessageExt msg : msgs) { System.out.println("线程名称:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); //5.启动消费者 consumer.start(); System.out.println("消费者启动中..."); } }
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。事务消息交互的过程如下:
事务消息的基本概念:
事务消息的发送步骤:
事务消息回查步骤:
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。我们就可以使用延时消息来完成这个功能。
延时消息的使用限制,现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关。
package com.linhuaming.rocketmq.demo.producer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class ScheduledMessageProducer { private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; public static void main(String[] args) throws Exception { // 1.实例化一个生产者来产生延时消息 DefaultMQProducer producer = new DefaultMQProducer("xiaoxi-producer"); // 2.指定mq服务器地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 3.启动生产者 producer.start(); Message message = new Message("topic1", ("delay message").getBytes()); // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel) message.setDelayTimeLevel(3); // 发送消息 producer.send(message); // 关闭生产者 producer.shutdown(); } }
package com.linhuaming.rocketmq.demo.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaoxi-consumer"); // 2.指定mq服务器地址 consumer.setNamesrvAddr("127.0.0.1:9876"); // 3.订阅Topics consumer.subscribe("topic1", "*"); // 4.注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt message : messages) { System.out.println("Receive message[msgId=" + message.getMsgId() + "]:"+ new String(message.getBody()) ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者 consumer.start(); } }
在消费消息的时候,我们可以指定消费哪些消息,这个时候就需要用到消息的过滤,他分为两种过滤:
生产者
package com.linhuaming.rocketmq.demo.producer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; /** * 标签过滤-生产者 */ import java.util.concurrent.TimeUnit; public class FilterProducer { public static void main(String[] args) throws Exception { // 1.创建消息生产者producer,并指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("xiaoxi-producer"); // 2.指定mq服务器地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 3.启动producer producer.start(); for (int i = 0; i < 3; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 Message msg = new Message("topic1", "tag2", ("消息的过滤" + i).getBytes()); // 5.发送消息 SendResult result = producer.send(msg); System.out.printf(result.toString()); //线程睡1秒 TimeUnit.SECONDS.sleep(1); } //6.关闭生产者producer producer.shutdown(); } }
消费者
package com.linhuaming.rocketmq.demo.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 标签过滤-消费者 */ public class FilterConsumer { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaoxi-consumer"); // 2.指定mq服务器地址 consumer.setNamesrvAddr("127.0.0.1:9876"); // 3.订阅Topics consumer.subscribe("topic1", "tag1 || tag2"); // 4.注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt message : messages) { System.out.println("Receive message[msgId=" + message.getMsgId() + "]:"+ new String(message.getBody()) ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者 consumer.start(); } }
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
常量支持类型为:
生产者
发送消息时,你能通过putUserProperty来设置消息的属性。
package com.linhuaming.rocketmq.demo.producer; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; /** * SQL过滤-生产者 */ public class SQLFilterProducer { public static void main(String[] args) throws Exception { // 1.创建消息生产者producer,并指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("xiaoxi-producer"); // 2.指定mq服务器地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 3.启动producer producer.start(); for (int i = 0; i < 3; i++) { // 4.创建消息对象,指定主题Topic、Tag和消息体 Message msg = new Message("topic1", "tag1", ("消息的过滤" + i).getBytes()); // 给消息对象设置一些属性 msg.putUserProperty("a", String.valueOf(i)); // 5.发送消息 SendResult result = producer.send(msg); System.out.println(result.toString()); } //6.关闭生产者producer producer.shutdown(); } }
消费者
用MessageSelector.bySql来使用sql筛选消息。
package com.linhuaming.rocketmq.demo.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * SQL过滤-消费者 */ public class SQLFilterConsumer { public static void main(String[] args) throws Exception { // 1.创建消费者Consumer,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaoxi-consumer"); // 2.指定mq服务器地址 consumer.setNamesrvAddr("127.0.0.1:9876"); // 3.订阅Topics consumer.subscribe("topic1", MessageSelector.bySql("a between 0 and 1")); // 此处使用sql来过滤消息 // 4.注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt message : messages) { System.out.println("Receive message[msgId=" + message.getMsgId() + "]:"+ new String(message.getBody()) ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者 consumer.start(); } }
“127.0.0.1:9876”);
// 3.订阅Topics consumer.subscribe("topic1", MessageSelector.bySql("a between 0 and 1")); // 此处使用sql来过滤消息 // 4.注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt message : messages) { System.out.println("Receive message[msgId=" + message.getMsgId() + "]:"+ new String(message.getBody()) ); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5.启动消费者 consumer.start(); }
}