为了测试方便代码复用这里封装了一个简单的连接mq的工具类
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName RabbitMqUtil * @Author ZC * @Date 2022/7/2 21:21 * @Version 1.0 * @Description */ public class RabbitMqUtil { private final String host = "192.168.232.119"; private final int port = 5672; private final String username = "test"; private final String password = "test12"; private final String virtualHost = "test_mq"; /** * 连接RabbitMq * @return * @throws IOException * @throws TimeoutException */ public Connection getConnection() throws IOException, TimeoutException { //连接Mq服务器、主机、端口、用户名、密码 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); //设置虚拟主机 factory.setVirtualHost(virtualHost); Connection connection = factory.newConnection(); return connection; } }
默认
direct
类型交换机,队列名充当路由键。
默认模式-------------消费者间交替接收消息
弊端:
无法做到性能好的消费者多消费、性能差点的少消费(资源的浪费)
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName SimpleProvider * @Author ZC * @Date 2022/7/2 21:33 * @Version 1.0 * @Description 消息提供者 */ public class SimpleProvider { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { RabbitMqUtil rabbitMqUtil = new RabbitMqUtil(); //1、连接mq服务 Connection connection = rabbitMqUtil.getConnection(); //2、通过连接对象获得一个连接通道 Channel channel = connection.createChannel(); //3、声明队列 /** * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, * Map<String, Object> arguments) * queue: 队列名称 * durable: 是否持久化(即服务重启时是否还存在) * exclusive: 是否独占(即当前队列是否只被这一个队列消费) * autoDelete: 是否自动删除(即当该队列没有被连接使用后是否删除) * arguments: 队列其他参数的设置 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); //4、发送信息到mq String message = "hello rabbitMq"; /** * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * exchange:交换机 * routingKey: 如果交换机是默认交换机那么这个相当于队列名称 * props:消息相关参数信息 * body: 具体消息 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("send: message<"+message+">发送成功"); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName SimpleConsumer * @Author ZC * @Date 2022/7/2 22:02 * @Version 1.0 * @Description */ public class SimpleConsumer { private final static String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //1、获取连接 Connection connection = new RabbitMqUtil().getConnection(); //2、获取一个连接通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //3、获取队列中的消息 //3。1、处理消息-回调之后使用 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; //监听队列 /** * basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) * * queue: 队列名称 * autoAck: * deliverCallback: 消息传递时回调 * cancelCallback: 消费者取消时回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); //非lambada表达式接收消息 // Consumer consumer = new DefaultConsumer(channel){ // public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // String message = new String(body,"utf-8"); // System.out.println(" [x] Received '" + message + "'"); // } // }; // //监听队列 // channel.basicConsume(QUEUE_NAME,true,consumer); } }
同一队列不同消费者的消息分发机制。
解决简单队列模式中无法实现能者多劳的问题
简单队列和work 模式的不同:
简单队列只要消息从队列中获取,无论消费者获取到消息后是否成功消费,比如遇到状况:断电,都认为是消息已经成功消费;
work模式消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者反馈,如果消费这一直没有反馈,则该消息一直处于不可用状态。
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName WorkProvider * @Author ZC * @Date 2022/7/3 10:11 * @Version 1.0 * @Description */ public class WorkProvider { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //发送消息 for (int i=0;i<30;i++){ String message = "hellwork"+i; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); Thread.sleep(1000); System.out.println("send ["+(i+1)+"]:"+message); } } }
消费者1,接收一次消息休息1s
import com.rabbitmq.client.*; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.sql.SQLOutput; import java.util.concurrent.TimeoutException; /** * @ClassName WorkConsumer_1 * @Author ZC * @Date 2022/7/3 10:24 * @Version 1.0 * @Description */ public class WorkConsumer_1 { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //工作模式的话,设置一直只接收一条,接收后手动回复 channel.basicQos(1); //接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } String message = new String(body,"utf-8"); System.out.println("消费者(1)接收到的消息:"+message); //接收消息完成后需要手动回复 channel.basicAck(envelope.getDeliveryTag(),false); } }; //监听消息队列-手动回复时候关闭自动回复 channel.basicConsume(QUEUE_NAME,false,consumer); } }
消费者2,接收消息后休息2s
import com.rabbitmq.client.*; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName WorkConsumer_2 * @Author ZC * @Date 2022/7/3 10:38 * @Version 1.0 * @Description */ public class WorkConsumer_2 { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicQos(1);//跟简单模式的区别 //接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } String message = new String(body,"utf-8"); System.out.println("消费者(2)接收到的消息:"+message); //手动回复 channel.basicAck(envelope.getDeliveryTag(),false);//跟简单模式的区别 } }; //监听队列 channel.basicConsume(QUEUE_NAME,false,consumer); //跟简单模式的区别 } }
消费者1比消费者2性能高,所以消费者处理消息快
同一个交换机绑定多个队列,实现广播消息。
交换机类型为:fanout
一个消息同时发送给多个消费者消费,
消息发送到交换机(变换机中绑定了多个队列)服务消费者绑定了队列,
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName Provider * @Author ZC * @Date 2022/7/3 16:17 * @Version 1.0 * @Description 发布订阅模式下的 */ public class Provider { private final static String EXCHANGE_NAME="test_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); //发布订阅模式,消费者的消息发送到交换机上,routingkey为空 //声明交换机,RabbitMQ常用的交换器类型有: fanout 、 direct 、 topic 、 headers 四种。 /** * 路由模式使用: direct * 通配符模式使用: topic */ channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //发送消息 for(int i=0;i<5;i++){ String message = "测试订阅/发布模式"+i; System.out.println("send [x]"+message); channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); } } }
消费者1
import com.rabbitmq.client.*; import com.sun.javaws.jnl.RContentDesc; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName Consumer1 * @Author ZC * @Date 2022/7/3 16:35 * @Version 1.0 * @Description */ public class Consumer1 { private final static String QUEUE_NAME = "ex_queue_1"; private final static String EXCHANGE_NAME="test_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); //创建连接通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //队列绑定交换机 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); //接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("消费者1接收到消息:"+message); } }; channel.basicConsume(QUEUE_NAME,consumer); } }
消费者2
import com.rabbitmq.client.*; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName Consumer2 * @Author ZC * @Date 2022/7/3 16:45 * @Version 1.0 * @Description */ public class Consumer2 { private final static String QUEUE_NAME="ex_queue_2"; private final static String EXCHANGE_NAME = "test_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); //接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("消费者2接收到消息:"+message); } }; channel.basicConsume(QUEUE_NAME,consumer); } }
交换机根据分发规则将消息分发到不同的队列。
交换机类型为:direct
发布订阅模式下:提供者发送消息时的routingkey为
“ ”
,消费者队列绑定交换机时也没有指定routingkey,那么绑定在交换机上的所有队列都将消费消息ps: 路由模式下,指定routingkey ,消费者消费绑定的队列与发布时绑定的routingkey对应时才能消费消息
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import jdk.nashorn.internal.ir.CallNode; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName RoutingProvider * @Author ZC * @Date 2022/7/3 17:31 * @Version 1.0 * @Description */ public class RoutingProvider { private final static String EXCHANGE_NAME = "test_routing"; private final static String ROUTING_KEY = "rk"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); //发送消息 for(int i = 0;i<5;i++){ String message = "测试routing模式"+i; channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,message.getBytes()); System.out.println("send [x]:"+message); } } }
消费者1 routingkey = rk,根据routingkey指定绑定交换机中的队列
import com.rabbitmq.client.*; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName Routingconsumer1 * @Author ZC * @Date 2022/7/3 17:32 * @Version 1.0 * @Description */ public class Routingconsumer1 { private final static String QUEUE_NAME = "rk"; private final static String EXCHANGE_NAME = "test_routing"; private final static String ROUTING_KEY = "rk"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"direct"); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY); //接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("消费者1接收到消息:"+message); } }; channel.basicConsume(QUEUE_NAME,consumer); } }
消费者2 routingkey=rk2,
import com.rabbitmq.client.*; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName Routingconsumer2 * @Author ZC * @Date 2022/7/3 17:32 * @Version 1.0 * @Description */ public class Routingconsumer2 { private final static String QUEUE_NAME = "rk1"; private final static String EXCHANGE_NAME = "test_routing"; private final static String ROUTING_KEY = "rk1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); //声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //队列绑定交换机 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY); //接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("消费者2接收到的消息:"+message); } }; channel.basicConsume(QUEUE_NAME,consumer); } }
交换机可以根据模糊的分发规则将消息分发到不同的队列。
交换机类型为: topic
消费者routingKey中使用*或#,实现交换机跟队列中的模糊匹配
实例:test.add、test.del、test.other.other。
*单层匹配
- test.*,只能匹配到test.add、test.del
多成匹配
- test.# 能匹配所有以test开头的
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName provider * @Author ZC * @Date 2022/7/3 21:02 * @Version 1.0 * @Description */ public class Provider { private final static String EXCHANGE_NAME="ex_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); String message = "测试通配符模式"; channel.basicPublish(EXCHANGE_NAME,"test.topic",null,message.getBytes()); channel.basicPublish(EXCHANGE_NAME,"test.routing",null,message.getBytes()); channel.basicPublish(EXCHANGE_NAME,"test.other.other",null,message.getBytes()); System.out.println("send[x]:"+message); } }
消费者1,*匹配
import com.rabbitmq.client.*; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName Consumer1 * @Author ZC * @Date 2022/7/3 21:13 * @Version 1.0 * @Description */ public class Consumer1{ private final static String EXCHANGE_NAME="ex_topic"; private final static String QUEUE_NAME="queue_topic_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"test.*"); //*匹配 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("消费者1:"+message); } }; channel.basicConsume(QUEUE_NAME,consumer); } }
消费者2,#匹配
import com.rabbitmq.client.*; import org.example.utils.RabbitMqUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @ClassName Consumer2 * @Author ZC * @Date 2022/7/3 21:13 * @Version 1.0 * @Description */ public class Consumer2 { private final static String EXCHANGE_NAME="ex_topic"; private final static String QUEUE_NAME="queue_topic_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = new RabbitMqUtil().getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"test.#"); //#匹配 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("消费者2:"+message); } }; channel.basicConsume(QUEUE_NAME,consumer); } }
客户端发送请求到请求队列,并设置临时的响应队列,服务端订阅请求队列,并发送响应到临时的响应队列。
消息可靠发送。