声明:此文是小白本人学习Spring所写,主要参考(搬运)了:
MQ(Message Quene) : 翻译为消息队列,就是指存储消息的一个容器。它是一个典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,可以轻松的实现系统间解耦。别名为:消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
下面是MQ最简单的模型,它包含了四个关键词:生产者、消费者、消息和队列。
使用消息中间件最主要的目的:
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。
传统模式的缺点:
引入消息队列后的做法:
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
(1) 串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
(2) 并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
(3) 引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
应用场景:系统其他时间A系统每秒请求量就100个,系统可以稳定运行。系统每天晚间八点有秒杀活动,每秒并发请求量增至1万条,但是系统最大的处理能力只能每秒处理1000个请求,于是系统崩溃,服务器宕机。
传统架构:大量用户(100万用户)通过浏览器在晚上八点高峰期同时参与秒杀活动。大量的请求涌入我们的系统中,高峰期达到每秒钟5000个请求,大量的请求打到MySQL上,每秒钟预计执行3000条SQL。但是一般的MySQL每秒钟扛住2000个请求就不错了,如果达到3000个请求的话可能MySQL直接就瘫痪了,从而系统无法被使用。但是高峰期过了之后,就成了低峰期,可能也就1万用户访问系统,每秒的请求数量也就50个左右,整个系统几乎没有任何压力。
引入MQ:100万用户在高峰期的时候,每秒请求有5000个请求左右,将这5000请求写入MQ里面,系统A每秒最多只能处理2000请求,因为MySQL每秒只能处理2000个请求。系统A从MQ中慢慢拉取请求,每秒就拉取2000个请求,不要超过自己每秒能处理的请求数量即可。MQ,每秒5000个请求进来,结果只有2000个请求出去,所以在秒杀期间(将近一小时)可能会有几十万或者几百万的请求积压在MQ中。这个短暂的高峰期积压是没问题的,因为高峰期过了之后,每秒就只有50个请求进入MQ了,但是系统还是按照每秒2000个请求的速度在处理,所以说,只要高峰期一过,系统就会快速将积压的消息消费掉。我们在此计算一下,每秒在MQ积压3000条消息,1分钟会积压18万,1小时积压1000万条消息,高峰期过后,1个多小时就可以将积压的1000万消息消费掉。
消息队列的优点:
消息队列的缺点:
市场上常见的消息队列有如下:
注:每种MQ没有绝对的好坏,主要依据使用场景,扬长避短,利用其优势,规避其劣势。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
厂商 | Rabbit | Apache | Alibaba | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | AMQP,OpenWire,STOMP,XMPP,REST | 自定义 | 自定义(基于TCP),社区封装了Http协议支持 |
优点 | 基于erlang,并发能力强,性能极好,延迟极低,稳定性和安全性很高,对性能和吞吐量的要求在其次,管理界面美观,社区非常活跃。 | 遵循JMS规范,安装方便,业界成为老牌,丰富的API和参考文档 | 在阿里被广泛应用于交易、充值、流计算、消 息推送、日志流式处理、binglog分发等场景。 | 依赖zk,可动态扩展节点,提供超高的吞吐量、极高的可用性以及可靠性 |
缺点 | Erlang语言难较大,不支持动态扩展;吞吐量会低一些,Rabbitmq的集群动态扩展很麻烦 | 有可能会丢失消息,该MQ不再维护,重心在下一代产品apolle | 社区活跃一般,随时会被阿里抛弃 | 严格的顺序机制,不支持消息优先级,不支持标准的消息协议,不利于平台迁移 |
时效 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒以内 |
可用性 | 高,基于主从架构实现高可用 | 高,基于主从架构实现高可用 | 非常高,分布式架构 | 非常高,分布式架构 |
可靠性 | 基本不丢数据 | 有较低的概率丢失数据 | 通过参数优化,可以做到0丢失 | 通过参数优化,可以做到0丢失 |
应用 | 适合对稳定性要求高的企业级应用 | 适合中小企业,不适合上千个队列的应用 | 适合大型企业和大规模分布式系统应用 | 应用在大数据日志处理或对实时性、可靠性要求较低的应用场景(少量数据丢失) |
[1]、QPS(Queries Per Second):即每秒查询率,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。每秒查询率:在因特网上,经常用每秒查询率来衡量域名系统服务器的机器的性能,即为QPS。对应fetches/sec,即每秒的响应请求数,也即是最大吞吐能力。
[2]、TPS(Transactions Per Second):每秒事务数,每秒系统能够处理的事务次数。
[3]、PV(page view):即页面浏览量,或点击量;通常是衡量一个网络新闻频道或网站甚至一条网络新闻的主要指标。对PV的解释是,一个访问者在24小时(0点到24点)内到底看了你网站几个页面。这里需要强调:同一个人浏览你网站同一个页面,不重复计算PV量,点100次也算1次。说白了,PV就是一个访问者打开了你的几个页面。PV之于网站,就像收视率之于电视,从某种程度上已成为投资者衡量商业网站表现的最重要尺度。
PV的计算:当一个访问着访问的时候,记录他所访问的页面和对应的IP,然后确定这个IP今天访问了这个页面没有。如果你的网站到了23点,单纯IP有60万条的话,每个访问者平均访问了3个页面,那么pv表的记录就要有180万条。
[4]、UV(Unique Visitor):指访问某个站点或点击某条新闻的不同IP地址的人数。在同一天内,uv只记录第一次进入网站的具有独立IP的访问者,在同一天内再次访问该网站则不计数。独立IP访问者提供了一定时间内不同观众数量的统计指标,而没有反应出网站的全面活动。
[5]、PR值:即PageRank,网页的级别技术,用来标识网页的等级/重要性。级别从1到10级,10级为满分。PR值越高说明该网页越受欢迎(越重要)。例如:一个PR值为1的网站表明这个网站不太具有流行度,而PR值为7到10则表明这个网站非常受欢迎(或者说极其重要)。
[6]、计算关系:
参考资料:
RabbitMQ是由erlang语言开发,基于AMQP协议实现的消息队列,它的并发能力强,性能极好,延迟极低,稳定性和安全性很高,同时还支持集群。RabbitMQ在分布式系统开发中应用非常广泛,是最受欢迎的开源消息中间件之一。
注意:由于RabbitMQ是采用erlang语言开发的,所以必须有erlang环境才可以运行
AMQP 协议:AMQP(advanced message queuing protocol)在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议),提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端、中间件等不同产品,不同开发语言等条件的限制。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。
JMS:即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
AMQP 与 JMS 区别
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模式;而AMQP的消息模式更加丰富
RabbitMQ最初起源于金融系统,用在分布式系统中存储转发消息,在易用性、扩展性、高可用等方面表现不俗,具体特点包括:
RabbitMQ是AMQP协议的一个开源实现,所以其内部实际上也是AMQP中的基本概念,如下图所示:
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers,由于headers交换器和direct交换器完全一致,且性能差很多,目前几乎用不到。这里只看direct、fanout、topic这三种类型:
RabbitMQ提供了6种模式:
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
rabbitmq和erlang的版本对应关系(注意:它两版本关系必须对应):https://www.rabbitmq.com/which-erlang.html
由于rabbitmq是基于erlang语言开发的,所以必须先安装erlang。
这里使用包云进在线下载:https://packagecloud.io/rabbitmq
然后选择需要下载的版本,erlang的版本一定要与RabbitMQ的版本相对应才行。
进入之后分别复制最右边的两个命令执行。
①、首先执行安装脚本(注:每次下载前都需执行此脚本,可能有点慢,耐心等待)
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
②、然后下载安装erlang
#如果wget命令未找到,则执行下面的命令,有则忽略本命令 yum -y install wget ##注:下面的操作二选一 #[1]、下载并自动安装erlang sudo yum install erlang-23.3.4.4-1.el7.x86_64 -y #[2]、下载并手动安装erlang wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.4-1.el7.x86_64.rpm/download.rpm rpm -ivh erlang-23.3.4.4-1.el7.x86_64.rpm
③、检查erlang的版本号
erl -version
RabbitMQ的安装操作和erlang几乎一致。
①、执行安装脚本(注:每次下载前都需执行此脚本,可能有点慢,耐心等待)
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
②、下载并安装rabbitmq
##注:下面的操作二选一 #[1]、下载并自动安装rabbitmq sudo yum install rabbitmq-server-3.8.17-1.el7.noarch -y #[2]、下载并手动安装rabbitmq wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.17-1.el7.noarch.rpm/download.rpm rpm -ivh rabbitmq-server-3.8.17-1.el7.noarch.rpm
③、查看下载的安装包
rpm -qa | grep erlang rpm -qa | grep rabbitmq-server
④、启用管理平台插件,启用插件后,可以可视化管理RabbitMQ
rabbitmq-plugins enable rabbitmq_management
⑤、RabbitMQ的相关命令
#启动 systemctl start rabbitmq-server #重启 systemctl restart rabbitmq-server #状态 systemctl status rabbitmq-server #停止 systemctl stop rabbitmq-server #开机自启 systemctl enable rabbitmq-server
#查看已经开放的端口 firewall-cmd --list-ports #开放指定端口 firewall-cmd --zone=public --add-port=5672/tcp --permanent firewall-cmd --zone=public --add-port=15672/tcp --permanent #重启防火墙 firewall-cmd --reload
注:由于使用默认的用户名和密码guest需要修改配置文件,这里就不去改了,所以直接添加一个自定义登录用户,步骤如下。
①、添加一个用户名为admin,密码123456的用户
rabbitmqctl add_user admin 123456
②、设置admin为超级管理员
rabbitmqctl set_user_tags admin administrator
③、授权远程访问(也可以登录后,可视化配置)
rabbitmqctl set_permissions -p / admin "." "." ".*"
④、创建完成后,重启RabbitMQ
systemctl restart rabbitmq-server
⑤、查看当前可登录用户
rabbitmqctl list_users
⑥、删除相关用户
rabbitmqctl delete_user admin
#关闭rabbitmq systemctl stop rabbitmq-server #查看相关进程 ps aux | grep rabbitmq #查看下载的安装包 rpm -qa | grep erlang rpm -qa | grep rabbitmq-server #卸载MQ(下面二选一) #[1]推荐 rpm -qa | grep rabbitmq-server rpm -evh rabbitmq-server-3.8.17-1.el7.noarch --nodeps #[2] yum list|grep rabbitmq yum -y remove rabbitmq-server.noarch #卸载erlang(下面二选一) #[1]推荐 rpm -qa | grep erlang rpm -evh erlang-23.3.4.4-1.el7.x86_64 --nodeps #[2] yum list | grep erlang yum -y remove erlang.x86_64 #删除相关文件 rm -rf /usr/lib64/erlang rm -rf /var/lib/rabbitmq rm -rf /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.17/ rm -rf /etc/rabbitmq/ rm -rf /var/log/rabbitmq
打开浏览器输入网址:http://192.168.43.128:15672/
输入用户名(admin)和密码(123456),进入后台管理页面:
注:RabbitMQ的Web页面多点点就熟悉了。
相关端口介绍:
简单模式:该模式是个一对一模式,只有一个生产者(用于生产消息),一个队列 Queue(用于存储消息),一个消费者 C (用于接收消息)。
注:简单模式也用到了交换机,使用的是默认的交换机(AMQP default)。
[1] 创建一个Maven项目
[2] 导入依赖
在父工程中的 pom.xml 文件导入如下依赖:
<!-- mq的依赖 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.2</version> </dependency> <!-- 日志处理 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
[3] 封装工具类
在rabbitmq-commons模块中封装 rabbitmq 连接的工具类(注意:其它模块要使用工具类只需引入本模块即可!)
/** * 封装连接工具类 */ public class ConnectionUtils { public static Connection getConnection() throws Exception { // 1.定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.设置服务器地址 factory.setHost("192.168.43.128"); // 3.设置协议端口号 factory.setPort(5672); // 4.虚拟主机名称;默认为 / factory.setVirtualHost("/"); // 5.设置用户名称 factory.setUsername("admin"); // 6.设置用户密码 factory.setPassword("123456"); // 7.创建连接 Connection connection = factory.newConnection(); return connection; } }
[4] 创建生产者
生产者负责创建消息并且将消息发送至指定的队列中,简单分为5步:
/** * 生产者(简单模式) */ public class Producer { // 队列名称 private static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { // 1、获取连接 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、声明(创建)队列 /* * queue 参数1:声明通道中对应的队列名称 * durable 参数2:是否定义持久化队列,当mq重启之后队列还在 * exclusive 参数3:是否独占本次连接,为true则只能有一个消费者监听这个队列 * autoDelete 参数4:是否自动删除队列,如果为true表示没有消息也没有消费者连接自动删除队列 * arguments 参数5:队列其它参数(额外配置) */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4.发送消息 /* * exchange 参数1:交换机名称,如果没有指定则使用默认Default Exchange * routingKey 参数2:队列名称或者routingKey,如果指定了交换机就是routingKey路由key,简单模式可以传递队列名称 * props 参数3:消息的配置信息 * body 参数4:要发送的消息内容 */ String msg = "Hello RabbitMQ!!!"; System.out.println("生产者发送的消息:" + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); //关闭资源 channel.close(); connection.close(); } }
[5] 创建消费者
消费者实现和生产者实现过程差不多,但是没有关闭通道和连接,因为消费者要一直等待随时可能发来的消息,大致分为如下3步:
/** * 消费者(简单模式) */ public class Consumer { // 队列名称 private static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3. 创建队列Queue,如果没有一个名字叫simple_world的队列,则会创建该队列,如果有则不会创建. // 这里可有可无,但是发送消息是必须得有该队列,否则消息会丢失 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { /* * handleDelivery回调方法,当收到消息后,会自动执行该方法 * consumerTag 参数1:消费者标识 * envelope 参数2:可以获取一些信息,如交换机,路由key... * properties 参数3:配置信息 * body 参数4:读取到的消息 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获取消息:" + new String(body)); } }; /* * queue 参数1:队列名称 * autoAck 参数2:是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列移除。否则需要手动ack消息 * callback 参数3:回调对象,在上面定义了 */ channel.basicConsume(QUEUE_NAME, true, defaultConsumer); //注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态 } }
[6] 运行结果
把生产者的代码运行三次,表示向队列中发送了三次消息。
查看RabbitMQ控制台中的内容。
最后启动消费者,查看控制台打印的数据。
简单模式的不足之处:这种模式是一对一,一个生产者向一个队列中发送消息,一个消费者从绑定的队列中获取消息,这样耦合性过高,如果有多个消费者想消费队列中信息就无法实现了。
工作模式:也被称为任务模型(Task Queues)。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用 work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行。
这种模式只有一个生产者 P,一个用于存储消息的队列 Queue、多个消费者 C 用于接收消息。
工作队列模式的特点有三:
[1] 创建生产者
向队列中发送10条消息。
/** * 生产者(工作模式) */ public class Producer { // 队列名称 private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 1、创建连接 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道 Channel channel = connection.createChannel(); // 3、声明队列 queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加属性参数) channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4、发送10条消息 for (int i = 1; i <= 10; i++) { String msg = "Hello RabbitMQ!!!~~~" + i; System.out.println("生产者发送消息:" + msg); // basicPublish(交换机名称-""表示不用交换机,队列名称或者routingKey, 消息的属性信息, 消息内容的字节数组); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } //释放资源 channel.close(); connection.close(); } }
[2] 创建消费者
下面分别创建两个消费者Consumer1和Consumer2。
消费者Consumer1:
/** * 消费者1(工作模式) */ public class Consumer1 { // 队列名称 private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、创建队列Queue,如果没有一个名字叫work_queue的队列,则会创建该队列,如果有则不会创建. // 这里可有可无,但是发送消息是必须得有该队列,否则消息会丢失 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获取消息:" + new String(body)); // 模拟消息处理延时,加个线程睡眠时间 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }; // basicConsume(队列名称, 是否自动确认, 回调对象) channel.basicConsume(QUEUE_NAME, true, defaultConsumer); //注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态 } }
消费者Consumer2:和消费者1几乎一模一样。
/** * 消费者2(工作模式) */ public class Consumer2 { // 队列名称 private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、创建队列Queue,如果没有一个名字叫work_queue的队列,则会创建该队列,如果有则不会创建. // 这里可有可无,但是发送消息是必须得有该队列,否则消息会丢失 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 4、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获取消息:" + new String(body)); // 模拟消息处理延时,加个线程睡眠时间 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; // basicConsume(队列名称, 是否自动确认, 回调对象) channel.basicConsume(QUEUE_NAME, true, defaultConsumer); //注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态 } }
[3] 运行结果
首先分别启动两个消费者(注意这里一定要先启动消费者)。
然后启动生产者,分别查看消费者控制台的打印信息,如下所示。
从结果来看,两个消费者对应的控制台是否竞争性的接收到消息。
上面的代码实现就是轮询分发的方式。现象:消费者1 处理完消息之后,消费者2 才能处理,它两这样轮着来处理消息,直到消息处理完成,这种方式叫轮询分发(round-robin)
,结果就是不管两个消费者谁忙,「数据总是你一个我一个」,不管消费者处理数据的性能,此时 autoAck = true。
注意:autoAck属性设置为true,表示消息自动确认。消费者在消费时消息的确认模式可以分为『自动确认和手动确认』。
使用轮询分发的方式会有一个明显的缺点,例如消费者1 处理数据的效率很慢,消费者2 处理数据的效率很高,正常情况下消费者2处理的数据应该多一点才对,而轮询分发则不管你的性能如何,反正就是每次处理一个消息,对于这种情况可以使用公平分发的方式来解决。
要实现公平分发
,操作分为两个步骤:
【1】、保证消息一次只分发一次,加一段关键性代码:
【2】、关闭自动确认,并且手动发送ACK给队列:
完整代码如下所示(分别修改两个消费者):
/** * 消费者1(工作模式) */ public class Consumer1 { // 队列名称 private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、创建队列Queue,如果没有一个名字叫work_queue的队列,则会创建该队列,如果有则不会创建. // 这里可有可无,但是发送消息是必须得有该队列,否则消息会丢失 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 channel.basicQos(1); // 4、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { // handleDelivery(消费者标识, 消息包的内容, 属性信息(生产者的发送时指定), 读取到的消息) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者获取消息:" + new String(body)); // 模拟消息处理延时,加个线程睡眠时间 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; // basicConsume(队列名称, 是否自动确认, 回调对象) channel.basicConsume(QUEUE_NAME, false, defaultConsumer); //注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态 } }
修改完成之后再次运行,由于消费者1 设置处理完一个消息后睡眠2秒,而消费者2 为1 秒,所以预计输出的结果为: 消费者2 处理的消息大概是消费者1 的两倍左右,结果如下图所示。
发布订阅模式(Publish/Subscribe):这种模式需要涉及到交换机了,也可以称它为广播模式,消息通过交换机广播到所有与其绑定的队列中。
详细介绍:一个消费者将消息首先发送到交换机上(这里的交换机类型为fanout),然后交换机绑定到多个队列,这样每个发到fanout类型交换器的消息会被分发到所有的队列中,最后被监听该队列的消费者所接收并消费。如下图所示:
[1] 创建生产者
/** * 生产者(发布订阅模式) */ public class Producer { // 交换机名称 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { // 1、创建连接 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道 Channel channel = connection.createChannel(); // 3、连续发送10条消息 for (int i = 1; i <= 10; i++) { String msg = "Hello RabbitMQ!!!~~~" + i; System.out.println("生产者发送的消息:" + msg); //basicPublish(交换机名称[默认Default Exchage],路由key[简单模式可以传递队列名称],消息其它属性,发送的消息内容) channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); } //关闭资源 channel.close(); connection.close(); } }
[2] 创建消费者
由于从这里开始涉及到交换机了,使用这里介绍一下四种交换机的类型:
消费者1:
注意:在发送消息前,RabbitMQ服务器中必须的有队列,否则消息可能会丢失,如果还涉及到交换机与队列绑定,那么就得先声明交换机、队列并且设置绑定的路由值(Routing Key),以免程序出现异常,由于本例所有的声明都是在消费者中,所以我们首先要启动消费者。如果RabbitMQ服务器中已经存在了声明的队列或者交换机,那么就不在创建,如果没有则创建相应名称的队列或者交换机。
/** * 消费者1(发布订阅模式) */ public class Consumer1 { // 队列名称 private static final String QUEUE_NAME1 = "fanout_queue1"; // 交换机名称 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); /* 3、声明交换机 * exchange 参数1:交换机名称 * type 参数2:交换机类型 * durable 参数3:交换机是否持久化 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true); // 4、声明队列Queue queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数) channel.queueDeclare(QUEUE_NAME1, true, false, false, null); // 5、绑定队列和交换机 queueBind(队列名, 交换机名, 路由key[交换机的类型为fanout ,routingKey设置为""]) channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, ""); // 6、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取交换机信息 String exchange = envelope.getExchange(); //获取消息信息 String message = new String(body, "utf-8"); System.out.println("交换机名称:" + exchange + ",消费者获取消息: " + message); } }; channel.basicConsume(QUEUE_NAME1, true, defaultConsumer); //注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态 } }
消费者2:和消费者1几乎一模一样
/** * 消费者2(发布订阅模式) */ public class Consumer2 { // 队列名称 private static final String QUEUE_NAME2 = "fanout_queue2"; // 交换机名称 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、声明交换机,如果没有名称为EXCHANGE_NAME的交换机则创建,有则不创建 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true); // 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数) channel.queueDeclare(QUEUE_NAME2, true, false, false, null); // 5、绑定队列和交换机。channel.queueBind(队列名, 交换机名, 路由key[fanout交换机的routingKey设置为""]) channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, ""); // 6、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取交换机信息 String exchange = envelope.getExchange(); //获取消息信息 String message = new String(body, "utf-8"); System.out.println("交换机名称:" + exchange + ",消费者获取消息: " + message); } }; channel.basicConsume(QUEUE_NAME2, true, defaultConsumer); //注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态 } }
[3] 运行结果
首先分别启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达『广播』的效果,如下所示。
在执行完测试代码后,可以到RabbitMQ的管理后台找到Exchanges
选项卡,点击说明的 fanout_exchange
交换机,可以查看到如下的绑定:
[4] 简单总结
发布订阅模式引入了交换机的概念,所以相对前面的类型更加灵活广泛一些。这种模式需要设置类型为fanout的交换机,并且将交换机和队列进行绑定,当消息发送到交换机后,交换机会将消息发送到所有绑定的队列,最后被监听该队列的消费者所接收并消费。发布订阅模式也可以叫广播模式,不需要RoutingKey的判断。
发布订阅模式与工作队列模式的区别:
1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
路由模式(Routing)的特点:
RoutingKey
(路由key)RoutingKey
。RoutingKey
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息。详细介绍:生产者将消息发送到direct交换器,同时生产者在发送消息的时候会指定一个路由key,而在绑定队列和交换器的时候又会指定一个路由key,那么消息只会发送到相应routing key相同的队列,然后由监听该队列的消费者进行消费消息。模型如下图所示:
[1] 创建生产者
/** * 生产者(路由模式) */ public class Producer { // 交换机名称 private static final String EXCHANGE_NAME = "routing_exchange"; public static void main(String[] args) throws Exception { // 1、创建连接 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、发送消息,连续发3条 for (int i = 0; i < 3; i++) { String routingKey = ""; //发送消息的时候根据相关逻辑指定相应的routing key。 switch (i) { case 0: //假设i=0,为error消息 routingKey = "error"; break; case 1: //假设i=1,为info消息 routingKey = "info"; break; case 2: //假设i=2,为warning消息 routingKey = "warning"; break; } // 要发送的消息 String message = "Hello Message!!!~~~" + routingKey; // 消息发送 channel.basicPublish(交换机名称,路由key,消息其它属性,消息内容) channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8")); System.out.println("生产者发送的消息:" + message); } //释放资源 channel.close(); connection.close(); } }
[2] 创建消费者
消费者1:
/** * 消费者1(路由模式) */ public class Consumer1 { // 队列名称 private static final String QUEUE_NAME1 = "routing_queue1"; // 交换机名称 private static final String EXCHANGE_NAME = "routing_exchange"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); // 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数) channel.queueDeclare(QUEUE_NAME1, true, false, false, null); // 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key) channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "error"); // 6、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息信息 String message = new String(body, "utf-8"); System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message); } }; channel.basicConsume(QUEUE_NAME1, true, defaultConsumer); //注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态 } }
消费者2:
/** * 消费者2(路由模式) */ public class Consumer2 { // 队列名称 private static final String QUEUE_NAME2 = "routing_queue2"; // 交换机名称 private static final String EXCHANGE_NAME = "routing_exchange"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); // 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数) channel.queueDeclare(QUEUE_NAME2, true, false, false, null); // 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key) channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "warning"); // 6、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息信息 String message = new String(body, "utf-8"); System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message); } }; channel.basicConsume(QUEUE_NAME2, true, defaultConsumer); } }
[3] 运行结果
首先分别启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达『按照需要接收』的效果。
消费者1绑定的交换机和队列的路由Key为error,所以只要生产者发送消息时带有error的routingKey它都能够获取到消息。
消费者2绑定的交换机和队列的路由Key为error、info、warning,所以只要生产者发送消息时带有这3种的routingKey它都能够获取到消息。
[4] 简单总结
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。但是Topic类型的Exchange可以让队列在绑定Routing key 的时候使用通配符进行匹配,也就是模糊匹配,这样与之前的模式比起来,它更加的灵活!
Topic主题模式的Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: log.insert ,它的通配符规则如下:
简单举例:
log.*:只能匹配log.error,log.info 等 log.#:能够匹配log.insert,log.insert.abc,log.news.update.abc 等
图解:
usa.#
,因此凡是以 usa.
开头的routing key
都会被匹配到#.news
,因此凡是以 .news
结尾的 routing key
都会被匹配[1] 创建生产者
/** * 生产者(Topic主题模式) */ public class Producer { // 交换机名称 private static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception { // 1、创建连接 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、发送消息 for (int i = 0; i < 4; i++) { String routingKey = ""; //发送消息的时候根据相关逻辑指定相应的routing key。 switch (i) { case 0: //假设i=0,为select消息 routingKey = "log.select"; break; case 1: //假设i=1,为info消息 routingKey = "log.delete"; break; case 2: //假设i=2,为log.news.add消息 routingKey = "log.news.add"; break; case 3: //假设i=3,为log.news.update消息 routingKey = "log.news.update"; break; } // 要发送的消息 String message = "Hello Message!!!~~~" + routingKey; // 消息发送 channel.basicPublish(交换机名称,路由key,消息其它属性,消息内容) channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8")); System.out.println("生产者发送的消息:" + message); } // 关闭资源 channel.close(); connection.close(); } }
[2] 创建消费者
消费者1:接收所有与log.*
相匹配的路由key队列中的消息
/** * 消费者(Topic模式) */ public class Consumer1 { // 队列名称 private static final String QUEUE_NAME1 = "topic_queue1"; // 交换机名称 private static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); // 4、声明队列Queue channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数) channel.queueDeclare(QUEUE_NAME1, true, false, false, null); // 5、根据指定的routingKey绑定队列和交换机,设置路由key channel.queueBind(队列名, 交换机名, 路由key) channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "log.*"); // 6、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息信息 String message = new String(body, "utf-8"); System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message); } }; channel.basicConsume(QUEUE_NAME1, true, defaultConsumer); //注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态 } }
消费者2:接收所有与log.#
相匹配的路由key队列中的消息
/** * 消费者(Topic模式) */ public class Consumer2 { // 队列名称 private static final String QUEUE_NAME2 = "topic_queue2"; // 交换机名称 private static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception { // 1、获取连接对象 Connection connection = ConnectionUtils.getConnection(); // 2、创建通道(频道) Channel channel = connection.createChannel(); // 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); // 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数) channel.queueDeclare(QUEUE_NAME2, true, false, false, null); // 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key) channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "log.#"); // 6、监听队列,接收消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //获取路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息信息 String message = new String(body, "utf-8"); System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message); } }; channel.basicConsume(QUEUE_NAME2, true, defaultConsumer); } }
[3] 运行结果
首先分别启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达『按照需要接收』的效果。
消费者1的路由key匹配规则为log.*
,所有该路由规则的绑定的队列应该只有2条信息,结果如下所示:
消费者2的路由key匹配规则为log.#
,它能够匹配以log.
开头的所有路由key,所有该路由规则的绑定的队列应该只有4条信息,结果如下所示:
最后查看一下交换机与队列绑定的相关信息。
[4] 简单总结
Topic主题模式
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。Topic主题模式
可以实现 Publish/Subscribe发布与订阅模式
和 Routing路由模式
的功能;只是Topic在配置routing key 的时候可以使用通配符,所以显得更加灵活。