RabbitMQ
镜像rabbitmq:3-management 默认安装并启用 rabbitmq_management
docker pull rabbitmq:3.10-management
RabbitMQ
容器docker run -d -p 5672:5672 -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ --hostname myRabbit \ --name rabbitmq \ rabbitmq:3.10-management
参数说明:
查看启动情况:
➜ bin docker ps -l CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 21dec23292a9 rabbitmq:3.10-management "docker-entrypoint.s…" About a minute ago Up About a minute 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
设置 docker 启动的时候自动启动(可选):
docker update rabbitmq --restart=always
RabbitMQ
后台管理浏览器输入地址:http://ip:15672
即可访问后台管理页面,这里的 ip
为运行 RabbitMQ 所在的服务器的 IP 地址;
账号密码是你创建容器时指定的账号密码
如果访问失败,请尝试关闭防火墙
public class ConnectionUtils { /** * 获取 RabbitMq 连接 * @return * @throws Exception */ public static Connection getConnection() throws Exception{ // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("dev.lzscxb.cn"); factory.setPort(5672); factory.setVirtualHost("/lzscxb"); factory.setUsername("admin"); factory.setPassword("admin"); return factory.newConnection(); } public static void main(String[] args) throws Exception { Connection connection = getConnection(); System.out.println(connection); connection.close(); } }
输出结果:
amqp://admin@1.14.160.174:5672//lzscxb
/** * 发送者 */ public class Sender { public static void main(String[] args) throws Exception { String msg = "hello 你好"; // 获取连接 Connection connection = ConnectionUtils.getConnection(); // 创建信道 Channel channel = connection.createChannel(); /** * 声明队列 * 参数1:队列的名称 * 参数2:队列中的数据是否持久化 * 参数了:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用) * 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据) * 参数5:队列参数(没有参数为nuLL) */ channel.queueDeclare("queue1", false, false, false, null); /** * 向指定队列发送消息 * 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为"" * 参数2:目标队列的名称 * 参数了:设置消息的属性(没有属性则为nuLL) * 参数4:消息的内容(只接收字节数组) */ channel.basicPublish("", "queue1", null, msg.getBytes()); // 释放资源 channel.close(); connection.close(); } }
运行结果:
public class Receiver { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("queue1", false, false, false, null); // 声明队列 // 从信道中获取信息 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * @param consumerTag * @param envelope * @param properties 协议 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("接收 = " + msg); } }; // 监听队列 true 自动确认消息 channel.basicConsume("queue1",true,consumer); } }
运行结果:
通过刚才的案例可以看出,消息一旦被消费,消息就会立刻从队列中移除
RabbitMQ如何得知消息被消费者接收?
// false:手动消息确认 channel.basicConsume("queue1", false, consumer);
完整代码:
public class Receiver { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 从信道中获取信息 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * @param consumerTag * @param envelope * @param properties 协议 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("接收 = " + msg); // 手动确认(收件人信息,是否同时确认多个消息) channel.basicAck(envelope.getDeliveryTag(),false); } }; // 监听队列 false手动确定消息 channel.basicConsume("queue1", false, consumer); } }
如果有两个员工,当所有 奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都 不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。 这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它 只是盲目地将每条第n个消息分派给第n个消费者。
为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉 RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并 确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的 worker。
/** * 消息接收者1 */ public class Receiver2 { static Integer i = 0; // 统计吃掉羊肉串的数量 public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work_queue", false, false, false, null); // 可以理解为快递一个一个送,送完在送下一个 channel.basicQos(1); // 从信道中获取信息 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * * @param consumerTag * @param envelope * @param properties 协议 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.printf("【顾客2】吃掉 = %s!总共吃[%d]串 \n", msg, ++i); // 模拟网络延迟 try { Thread.sleep(900); } catch (Exception e) { e.printStackTrace(); } // 手动确认(收件人信息,是否同时确认多个消息) channel.basicAck(envelope.getDeliveryTag(), false); } }; // 手动确定消息 channel.basicConsume("work_queue", false, consumer); } }
必须使用 ACK 确认消息才能生效
/** * 消息发送者 */ public class Sender { public static void main(String[] args) throws Exception { // 获取连接 Connection connection = ConnectionUtils.getConnection(); // 创建信道 Channel channel = connection.createChannel(); /** * 创建队列 * 参数1:队列的名称 * 参数2:队列中的数据是否持久化 * 参数了:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用) * 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据) * 参数5:队列参数(没有参数为nuLL) */ channel.queueDeclare("work_queue", false, false, false, null); // 发送 100 条数据 for (int i = 0; i < 100; i++) { String msg = "羊肉串 --> " + i; /** * 向指定队列发送消息 * 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为"" * 参数2:目标队列的名称 * 参数了:设置消息的属性(没有属性则为nuLL) * 参数4:消息的内容(只接收字节数组) */ channel.basicPublish("", "work_queue", null, msg.getBytes()); System.out.println("新鲜出炉:" + msg); } // 释放资源 channel.close(); connection.close(); } }
生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视 频通知
上图中,X就是视频主,红色的队列就是粉丝。binding是绑定的意思(关注)
P生产者发送信息给X路由,X将信息转发给绑定X的队列
X队列将信息通过信道发送给消费者,从而进行消费
整个过程,必须先创建路由
运行程序的顺序:
1. MessageSender
MessageReceiver1和MessageReceiver2
MessageSender
/** * 消息发送者 */ public class Sender { public static void main(String[] args) throws Exception { // 获取连接 Connection connection = ConnectionUtils.getConnection(); // 创建信道 Channel channel = connection.createChannel(); // 声明路由 (路由名,路由类型) // fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上 channel.exchangeDeclare("exchange_fanout", "fanout"); String msg = "Hello Java!"; /** * 向指定队列发送消息 * 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为"" * 参数2:目标队列的名称 * 参数了:设置消息的属性(没有属性则为nuLL) * 参数4:消息的内容(只接收字节数组) */ channel.basicPublish("exchange_fanout", "", null, msg.getBytes()); System.out.println("生产者:" + msg); // 释放资源 channel.close(); connection.close(); } }
/** * 消息接收者1 (粉丝1) */ public class Receiver1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("test_exchange_fanout_queue_1", false, false, false, null); // 绑定队列(关注) channel.queueBind("test_exchange_fanout_queue_1", "exchange_fanout", ""); // 从信道中获取信息 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * * @param consumerTag * @param envelope * @param properties 协议 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.printf("【消费者1】 %s \n", msg); } }; // 监听队列 true 自动确认消息 channel.basicConsume("test_exchange_fanout_queue_1", true, consumer); } }
路由会根据类型进行定向分发消息给不同的队列,如图所示
可以理解为是快递公司的分拣中心,整个小区,东面的楼小张送货,西面的楼小王送货
public class Sender { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明路由(路由名,路由类型) // direct:根据路由键进行定向分发消息 channel.exchangeDeclare("test_exchange_direct", "direct"); String msg = "用户注册,【userid=S101】"; channel.basicPublish("test_exchange_direct", "insert", null, msg.getBytes()); System.out.println("[用户系统]:" + msg); channel.close(); connection.close(); } }
public class Receiver1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("test_exchange_direct_queue_1", false, false, false, null); // 绑定路由(如果路由键的类型是 添加,删除,修改 的话,绑定到这个队列1上) // 2.3.4.3 消费者2 // 1. 记住运行程序的顺序,先运行一次sender(创建路由器), // 2. 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定 // 3. 再次运行sender,发出消息 // 2.3.5 通配符模式 channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "insert"); channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "update"); channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "delete"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " + s); } }; // 4.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_direct_queue_1", true, consumer); } }
/** * 消息接收者2 */ public class Receiver2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("test_exchange_direct_queue_2", false, false, false, null); // 绑定路由(如果路由键的类型是 查询 的话,绑定到这个队列2上) channel.queueBind("test_exchange_direct_queue_2", "test_exchange_direct", "select"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者2】 = " + s); } }; // 4.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_direct_queue_2", true, consumer); } }
和路由模式90%是一样的。
唯独的区别就是路由键支持模糊匹配
匹配符号
public class Sender { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明路由(路由名,路由类型,持久化) channel.exchangeDeclare("test_exchange_topic", "topic", true); String msg = "商品降价"; // 发送消息(第三个参数作用是让消息持久化) channel.basicPublish("test_exchange_topic", "product.price", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); System.out.println("[用户系统]:" + msg); channel.close(); connection.close(); } }
public class Receiver { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列( 第二个参数为true:支持持久化) channel.queueDeclare("test_exchange_topic_queue_1", true, false, false, null); channel.queueBind("test_exchange_topic_queue_1", "test_exchange_topic", "user.#"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " + s); } }; channel.basicConsume("test_exchange_topic_queue_1", true, consumer); } }