RabbitMQ 是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com
在介绍RabbitMQ之前,我们先来看下面一个电商项目的场景:
如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?
我们可能会想到这么做:
这两种方案都有个严重的问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立
原则。
这时,我们就会采用另外一种解决办法,那就是 消息队列!
商品服务对商品增删改以后,无需去操作索引库和静态页面,只需向MQ发送一条消息(比如包含商品id的消息),也不关心消息被谁接收。 搜索服务和静态页面服务监听MQ,接收消息,然后分别去处理索引库和静态页面(根据商品id去更新索引库和商品详情静态页面)。
MQ 全称为 Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
开发中消息队列通常有如下应用场景
1.1 任务异步处理:
高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。减少了应用程序的响应时间。
1.2 应用程序解耦合:
MQ相当于一个中介,生产者通过MQ与消费者交互,它将应用程序进行解耦合。
MQ是消息通信的模型,并发具体实现。目前实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
JMS 消息服务支持两种消息模型:
在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到对应的队列。这是一种点对点的消息模型,这种模式被概括为:
发布者/订阅者模型支持向一个特定的消息主题发布消息,消费者则可以定义自己感兴趣的主题,这是一种点对面的消息模型,这种模式可以被概括为:
这张图中涉及到如下一些概念:
咦?这个咋没有交换机?这个其实是默认的交换机,我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下:
生产者
@Slf4j @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = SpringApplication.class) public class MQt { private static final String QUEUE_NAME = "q1"; @Autowired private Connection connection; @Test public void producer() throws IOException, TimeoutException { // 1. 建立连接 // 2. 创建通道 Channel channel = connection.createChannel(); // 3. 创建队列 // 参数1: 队列名称 如果队列不存在自动创建 // 参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化 // 参数3: exclusive 是否独占队列 true 独占队列 false 不独占 // 参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除 // 参数5: 额外附加参数 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 4. 生成消息 String message = "消费者 hello world "; // 5. 向指定的队列发送消息 // exchange 指定交换机,不指定,mq 默认交换机 “” // routingKey 路由Key,交换机根据路由key将消息发送到指定的队列,如果使用默认交换机,routingKey 就设置为队列名称 // props 消息的属性 // body 消息内容 channel.basicPublish("",QUEUE_NAME, null,message.getBytes()); channel.close(); connection.close();; } }
消费者
public class Consumer1 { private static final String QUEUE_NAME = "q1"; public static void main(String[] args) throws Exception { Connection connection = ConnUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [x] received : " + msg + "!"); // 手动进行ACK // void basicAck(long deliveryTag, boolean multiple) throws IOException; // deliveryTag:用来标识消息的id // multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。 channel.basicAck(envelope.getDeliveryTag(), true); } }; // 参数明细: // 1、queue 队列名称 // 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复 // 3、callback,消费方法,当消费者接收到消息要执行的方法 channel.basicConsume(QUEUE_NAME, false, consumer); } }
这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key
相同的 Queue 上,例如消息队列名为 “q1”,则 routingkey 为 “q1” 的消息会被该消息队列接收。
一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。
再来看发布订阅模式,这种情况是这样:
一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,如下图:
这种情况下,我们有四种交换机可供选择,分别是:
DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上;
生产者
@Slf4j @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = SpringApplication.class) public class MQDirect { // 交换机名称 private static final String EXCHANGE_NAME = "E1"; // 路由 KEY private static final String ROUTING_KEY = "k1"; // 1. 注入连接 @Autowired private Connection connection; @Test public void producer() throws IOException, TimeoutException { // 2. 创建通道 Channel channel = connection.createChannel(); // 3. 创建交换机 // 参数说明 1. 交换机名称 2 交换机类型 3 是否持久化 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); // 4. 生成消息 String message = " 发送消息 Direct 直连交换机 "; // 5. 向指定的队列发送消息 // exchange 指定交换机,不指定,mq 默认交换机 “” // routingKey 路由Key,交换机根据路由key将消息发送到指定的队列,如果使用默认交换机,routingKey 就设置为队列名称 // props 消息的属性 // body 消息内容 channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, null,message.getBytes()); channel.close(); connection.close(); } }
消费者1
public class Consumer1 { // 交换机名称 private static final String EXCHANGE_NAME = "E1"; // 路由 KEY private static final String ROUTING_KEY = "k1"; public static void main(String[] args) throws Exception { Connection connection = ConnUtil.getConnection(); Channel channel = connection.createChannel(); String queue = channel.queueDeclare("q1", true, false, false, null).getQueue(); channel.queueBind(queue,EXCHANGE_NAME,ROUTING_KEY); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [x] direct 消费端 : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queue, false, consumer); } }
消费者2
public class Consumer2 { // 交换机名称 private static final String EXCHANGE_NAME = "E1"; // 路由 KEY private static final String ROUTING_KEY = "k1"; public static void main(String[] args) throws Exception { Connection connection = ConnUtil.getConnection(); Channel channel = connection.createChannel(); String queue = channel.queueDeclare("q1", true, false, false, null).getQueue(); channel.queueBind(queue,EXCHANGE_NAME,ROUTING_KEY); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [x] direct 消费端 : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queue, false, consumer); } }
在广播模式下,消息发送流程是这样的:
可以有多个消费者
每个消费者有自己的 queue(队列)
每个队列都要绑定到 FanoutExchange(交换机)
生产者发送的消息,只能发送到交换机,
交换机来决定要发给哪个队列,生产者无法决定。
交换机把消息发送给绑定过的所有队列
队列的消费者都能拿到消息。实现一条消息被多个消费者消费
生产者
@Slf4j @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = SpringApplication.class) public class MQFanout { // 交换机名称 private static final String EXCHANGE_NAME = "Fanout"; // 1. 注入连接 @Autowired private Connection connection; @Test public void producer() throws IOException, TimeoutException { // 2. 创建通道 Channel channel = connection.createChannel(); // 3. 创建交换机 参数说明 1. 交换机名称 2 交换机类型 3 是否持久化 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true); // 4. 生成消息 String message = " 广播: fanout 扇形交换机 "; // 5. 向指定的队列发送消息,扇形交换机不需要指定路由KEY channel.basicPublish(EXCHANGE_NAME,"", null,message.getBytes()); channel.close(); connection.close(); } }
消费者1 其他消费者类似
public class Consumer1 { // 交换机名称 private static final String EXCHANGE_NAME = "Fanout"; public static void main(String[] args) throws Exception { Connection connection = ConnUtil.getConnection(); Channel channel = connection.createChannel(); String queue = channel.queueDeclare("q1", true, false, false, null).getQueue(); // 绑定交换机 channel.queueBind(queue,EXCHANGE_NAME,""); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" 消费者 1 号 : " + msg + "!"); } }; channel.basicConsume(queue, true, consumer); } }
注意: 这里发送消息时不需要 routingkey
,指定 exchange
即可,routingkey
可以直接传一个 null
。
在Fanout模式中,一条消息,会被所有订阅的队列都消费。
但是,在某些场景下,我们希望不同的消息被不同的队列消费。
这时就要用到 DirectExchange。
在Direct模型下:队列与交换机的绑定,不能是任意绑定了,
而是要指定一个RoutingKey(路由key)
消息的发送方在 向 Exchange发送消息时,
也必须指定消息的 RoutingKey。
Exchange不再把消息交给每一个绑定的队列,
而是根据消息的Routing Key进行判断,
只有队列的Routingkey与消息的 Routing key完全一致,
才会接收到消息;
生产者
@Slf4j @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = SpringApplication.class) public class MQDirect { // 交换机名称 private static final String EXCHANGE_NAME = "E1"; // 路由 KEY private static final String ROUTING_KEY = "info"; // 1. 注入连接 @Autowired private Connection connection; @Test public void producer() throws IOException, TimeoutException { // 2. 创建通道 Channel channel = connection.createChannel(); // 3. 创建交换机 // 参数说明 1. 交换机名称 2 交换机类型 3 是否持久化 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); // 4. 生成消息 String message = " 发送消息 Direct 直连交换机 "; // 5. 向指定的队列发送消息 // exchange 指定交换机,不指定,mq 默认交换机 “” // routingKey 路由Key,交换机根据路由key将消息发送到指定的队列,如果使用默认交换机,routingKey 就设置为队列名称 // props 消息的属性 // body 消息内容 channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, null,message.getBytes()); channel.close(); connection.close(); } }
消费者1
public class Consumer1 { // 交换机名称 private static final String EXCHANGE_NAME = "E1"; // 路由 KEY private static final String ROUTING_KEY = "error"; public static void main(String[] args) throws Exception { Connection connection = ConnUtil.getConnection(); Channel channel = connection.createChannel(); String queue = channel.queueDeclare("q1", true, false, false, null).getQueue(); channel.queueBind(queue,EXCHANGE_NAME,ROUTING_KEY); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [x] direct 消费端 : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queue, false, consumer); } }
消费者2
public class Consumer2 { // 交换机名称 private static final String EXCHANGE_NAME = "E1"; // 路由 KEY private static final String ROUTING_KEY1 = "info"; private static final String ROUTING_KEY2 = "error"; public static void main(String[] args) throws Exception { Connection connection = ConnUtil.getConnection(); Channel channel = connection.createChannel(); String queue = channel.queueDeclare("q2", true, false, false, null).getQueue(); // 根据 路由 KEY 接受消息 绑定多个Routing Key, channel.queueBind(queue,EXCHANGE_NAME,ROUTING_KEY1); channel.queueBind(queue,EXCHANGE_NAME,ROUTING_KEY2); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [x] direct 消费端 : " + msg + "!"); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(queue, false, consumer); } }
注意:必须是两个消费者在各自的队列中,否则 RoutingKey 不起作用
结果:发送了三条消息
TopicExchange 与 DirectExchange 相比,都是可以根据 RoutingKey 把消息路由到不同的队列,不过 TopicExchange 可以让队列在绑定Routing key的时候使用 通配符 !这种模型 Routingkey 一般都是由一个或多个单词组成,多个单词之间以" . "分割;
通配符
* (star) can substitute for exactly one word. 匹配不多不少恰好1个词 # (hash) can substitute for zero or more words. 匹配零个、一个或多个词
如:
audit.# 匹配audit、audit.irs 、或者audit.irs.corporate等 audit.* 只能匹配 audit.irs
生产者
@Slf4j @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = SpringApplication.class) public class MQTopic { // 交换机名称 private static final String EXCHANGE_NAME = "Topic"; // 路由 KEY private static final String ROUTING_KEY1 = "user.info"; private static final String ROUTING_KEY2 = "del.user.info"; private static final String ROUTING_KEY3 = "user.info.save"; // 1. 注入连接 @Autowired private Connection connection; @Test public void producer() throws IOException, TimeoutException { // 2. 创建通道 Channel channel = connection.createChannel(); // 3. 创建交换机 // 参数说明 1. 交换机名称 2 交换机类型 3 是否持久化 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true); // 4.向指定的队列发送消息 // exchange 指定交换机,不指定,mq 默认交换机 “” // routingKey 路由Key,交换机根据路由key将消息发送到指定的队列,如果使用默认交换机,routingKey 就设置为队列名称 // props 消息的属性 // body 消息内容 channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY1, null,"KEY : user.info.update ".getBytes()); channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY2, null,"KEY : user.info.del ".getBytes()); channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY3, null,"KEY : user.info.save ".getBytes()); channel.close(); connection.close(); } }
消费者1
public class Consumer1 { // 交换机名称 private static final String EXCHANGE_NAME = "Topic"; public static void main(String[] args) throws Exception { Connection conn = ConnUtil.getConnection(); Channel channel = conn.createChannel(); String queue = channel.queueDeclare("q1", true, false, true, null).getQueue(); channel.queueBind(queue,EXCHANGE_NAME,"user.*"); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" 消费者 1 号 : " + msg + "!"); } }; channel.basicConsume(queue, true, consumer); } }
消费者2
public class Consumer2 { // 交换机名称 private static final String EXCHANGE_NAME = "Topic"; public static void main(String[] args) throws Exception { Connection conn = ConnUtil.getConnection(); Channel channel = conn.createChannel(); String queue = channel.queueDeclare("q2", true, false, true, null).getQueue(); channel.queueBind(queue,EXCHANGE_NAME,"user.#"); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" 消费者 2 号 : " + msg + "!"); } }; channel.basicConsume(queue, true, consumer); } }
消费者3
public class Consumer3 { // 交换机名称 private static final String EXCHANGE_NAME = "Topic"; public static void main(String[] args) throws Exception { Connection conn = ConnUtil.getConnection(); Channel channel = conn.createChannel(); String queue = channel.queueDeclare("q3", true, false, true, null).getQueue(); channel.queueBind(queue,EXCHANGE_NAME,"#.info.#"); // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" 消费者 3 号 : " + msg + "!"); } }; channel.basicConsume(queue, true, consumer); } }
结果: