核心依赖:
<!--引入rabbitmq相关依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.2</version> </dependency>
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱, 可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
生产者:
package com.eddie.helloworld; import com.eddie.utiils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Provider { //生产消息 @Test public void testSendMessage() throws IOException, TimeoutException { // //创建连接mq的连接工厂对象 // ConnectionFactory connectionFactory = new ConnectionFactory(); // //设置连接rabbitmq主机 // connectionFactory.setHost("192.168.2.2"); // //设置端口号 // connectionFactory.setPort(5672); // //设置连接那个虚拟主机 // connectionFactory.setVirtualHost("/ems"); // //设置访问虚拟主机的用户名和密码 // connectionFactory.setUsername("ems"); // connectionFactory.setPassword("123"); // // //获取连接对象 // Connection connection = connectionFactory.newConnection(); //通过工具类获取连接对象 Connection connection = RabbitMQUtils.getConnection(); //获取连接中通道 Channel channel = connection.createChannel(); //通道绑定对应消息队列 //参数1: 队列名称 如果队列不存在自动创建 //参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化(服务重启后队列还保存,但不保存消息) //参数3: exclusive 是否独占队列 true 独占队列 false 不独占 //参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除 //参数5: 额外附加参数 channel.queueDeclare("hello",true,false,true,null); //发布消息 //参数1: 交换机名称 参数2:队列名称 参数3:传递消息额外设置(如下参数保证服务重启后消息还存在,可继续消费) 参数4:消息的具体内容 channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes()); // channel.close(); // connection.close(); //调用工具类 RabbitMQUtils.closeConnectionAndChanel(channel,connection); } }
消费者:
package com.eddie.helloworld; import com.eddie.utiils.RabbitMQUtils; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { //生产消息 public static void main(String[] args) throws IOException, TimeoutException { // //创建连接mq的连接工厂对象 // ConnectionFactory connectionFactory = new ConnectionFactory(); // //设置连接rabbitmq主机 // connectionFactory.setHost("192.168.2.2"); // //设置端口号 // connectionFactory.setPort(5672); // //设置连接那个虚拟主机 // connectionFactory.setVirtualHost("/ems"); // //设置访问虚拟主机的用户名和密码 // connectionFactory.setUsername("ems"); // connectionFactory.setPassword("123"); // // //获取连接对象 // Connection connection = connectionFactory.newConnection(); // //通过工具类获取连接对象 Connection connection = RabbitMQUtils.getConnection(); //获取连接中通道 Channel channel = connection.createChannel(); //通道绑定对应消息队列 //参数1: 队列名称 如果队列不存在自动创建 //参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化 //参数3: exclusive 是否独占队列 true 独占队列 false 不独占 //参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除 //参数5: 额外附加参数 channel.queueDeclare("hello",true,false,true,null); channel.basicConsume("hello", true, new DefaultConsumer(channel){ //最后一个参数:消息队列取出的消息 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("new String(body): " + new String(body)); } }); // channel.close(); // connection.close(); } }
封装的公共类:
package com.eddie.utiils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMQUtils { private static ConnectionFactory connectionFactory; //private static Properties properties; static{ //重量级资源 类加载执行之执行一次 //创建连接mq的连接工厂对象 connectionFactory = new ConnectionFactory(); //设置连接rabbitmq主机 connectionFactory.setHost("192.168.2.2"); //设置端口号 connectionFactory.setPort(5672); //设置连接那个虚拟主机 connectionFactory.setVirtualHost("/ems"); //设置访问虚拟主机的用户名和密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("123"); } //定义提供连接对象的方法 public static Connection getConnection() { try { return connectionFactory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; } //关闭通道和关闭连接工具方法 public static void closeConnectionAndChanel(Channel channel, Connection conn) { try { if(channel!=null) channel.close(); if(conn!=null) conn.close(); } catch (Exception e) { e.printStackTrace(); } } }
Work queues,也被称为(Task queues),任务模型。
当消息处理比较耗时的时候,可能生产消息的速度会
远远大于消息的消费速度。长此以往,消息就会堆积
越来越多,无法及时处理。此时就可以使用work 模型:
让多个消费者绑定到一个队列,共同消费队列中的消息。
队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
P:生产者:任务的发布者
C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
C2:消费者-2:领取任务并完成任务,假设完成速度快
总结:默认情况下,RabbitMQ将按顺序将每个消息发送给 下一个使用者。平均而言,每个消费者都会收到相同数量 的消息。这种分发消息的方式称为循环。
假如生产者发送了10条消息,每个消费者将获取5条数据,其中一个消费者读取到第三条的时候发生宕机,则剩下的消息就会丢失,这样我们就需要关闭rabbitmq自动确认机制,手动去给一个确认标识,即可以解决宕机消息丢失问题,又可以产生能者多劳的效果
生产者:
package com.eddie.workqueues; import com.eddie.utiils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Provider2 { //生产消息 @Test public void testSendMessage() throws IOException, TimeoutException { //通过工具类获取连接对象 Connection connection = RabbitMQUtils.getConnection(); //获取连接中通道 Channel channel = connection.createChannel(); //通道绑定对应消息队列 //参数1: 队列名称 如果队列不存在自动创建 //参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化(服务重启后队列还保存,但不保存消息) //参数3: exclusive 是否独占队列 true 独占队列 false 不独占 //参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除 //参数5: 额外附加参数 channel.queueDeclare("hello",true,false,true,null); //发布消息 //参数1: 交换机名称 参数2:队列名称 参数3:传递消息额外设置(如下参数保证服务重启后消息还存在,可继续消费) 参数4:消息的具体内容 for(int i = 0; i<20; i++){ channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,(i+"hello rabbitmq").getBytes()); } //调用工具类 RabbitMQUtils.closeConnectionAndChanel(channel,connection); } }
消费者1:
package com.eddie.workqueues; import com.eddie.utiils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws IOException { //通过工具类获取连接对象 Connection connection = RabbitMQUtils.getConnection(); //获取连接中通道 Channel channel = connection.createChannel(); channel.basicQos(1); //每次只能消费一条消息 channel.queueDeclare("hello",true,false,true,null); //参数1:队列名称 参数2:消息确认 true消费者自动向rabbitmq确认消费 false不会自动确认 channel.basicConsume("hello", false, new DefaultConsumer(channel){ //最后一个参数:消息队列取出的消息 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("111new String(body): " + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
消费者2:
package com.eddie.workqueues; import com.eddie.utiils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; import static java.lang.Thread.sleep; public class Consumer2 { public static void main(String[] args) throws IOException { //通过工具类获取连接对象 Connection connection = RabbitMQUtils.getConnection(); //获取连接中通道 Channel channel = connection.createChannel(); channel.basicQos(1); //每次只能消费一条消息 channel.queueDeclare("hello",true,false,true,null); //参数1:队列名称 参数2:消息确认 true消费者自动向rabbitmq确认消费 false不会自动确认 channel.basicConsume("hello", false, new DefaultConsumer(channel){ //最后一个参数:消息队列取出的消息 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("222new String(body): " + new String(body)); //手动确认 参数1:手动确认消息标识 参数2:是否开启多个消息同时确认 channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
设置通道一次只能消费一个消息 关闭消息的自动确认,开启手动确认消息