基于 ErLang
语言开发有高可用高并发的优点,适合集群。
开源、稳定、易用、跨平台、支持多种语言、文档齐全。
有消息确认机制和持久化机制,可靠性高。
Producer
:消息的生产者
Consumer
:消息的消费者
消息队列提供了 FIFO
的处理机制,具有缓存消息的能力。在 RabbitMQ
中,队列消息可以设置为持久化,临时或者自动删除。
如果是持久化的队列,Queue
中的消息会在 Server
本地硬盘存储一份,防止系统 Crash
数据丢失。
如果是临时的队列,Queue
中的数据在系统重启之后就会丢失。
如实是自动删除的队列,当不存在用户连接到 Server
,队列中的数据会被自动删除。
ExChange
类似于数据通信网络中的交换机,提供消息路由策略。
在 RabbitMQ
中,生产者不是将消息直接发送给 Queue
,而是先发送给 ExChange
,ExChange
根据生产者传递的 key
按照特定的路由算法将消息给指定的 Queue
。一个 ExChange
可以绑定多个 Queue
。和 Queue
一样,ExChange
也可以设置为持久化、临时或者自动删除。
所谓绑定就是将一个特定的 ExChange
和一个特定的 Queue
绑定起来。ExChange
和 Queue
的绑定可以是多对多的关系。
在 RabbitMQ Server
上可以创建多个虚拟的 Message Broker
(又叫做 Virtual Hosts
)。每一个 vhost
本质上是一个迷你的 RabbitMQ Server
,分别管理各自的 ExChange
和 binding
。生产者和消费者连接 RabbitMQ Server
需要指定一个 Virtual Host
。
客户端连接到消息队列服务器,打开一个 Channel
。
客户端声明一个 ExChange
,并设置相关属性。
客户端声明一个 Queue
,并设置相关属性。
客户端使用 Routing Key
,在 ExChange
和 Queue
之间建立好绑定关系。
客户端投递消息到 ExChange
。
ExChange
接收到消息后,就根据消息的 key
和已经设置的 bingding
,进行消息路由,将消息投递到一个或多个队列里。
创建 docker-compose.yml
version: '3.1’
services:
rabbitmq:
restart: always
image: rabbitmq:management
container_name: rabbitmq
ports:
访问地址:http://{ip}:15672
首页
Global counts
页
.png)
交换机页
.png)
队列页
.png)
Name
:消息队列的名称,这里是通过程序创建的
Features
:消息队列的类型,durable:true 为会持久化消息
Ready
:准备好的消息
Unacked
:未确认的消息
Total
:全部消息
如果都为 0 则说明全部消息处理完成
创建一个名为 spring-boot-amqp-provider
的生产者项目。
创建 application.yml 文件
spring:
application:
name: spring-boot-amqp
rabbitmq:
host: 192.168.75.133
port: 5672
username: rabbit
password: 123456
创建队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
@Bean
public Queue queue() {
return new Queue(“helloRabbitMQ”);
}
}
创建消息提供者
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
String context = “hello” + new Date();
System.out.println("Provider: " + context);
amqpTemplate.convertAndSend(“helloRabbitMQ”, context);
}
}
创建测试用例
import com.lusifer.spring.boot.amqp.Application;
import com.lusifer.spring.boot.amqp.provider.HelloRabbitProvider;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class AmqpTest {
@Autowired
private HelloRabbitProvider helloRabbitProvider;
@Test
public void testSender() {
for (int i = 0; i < 10; i++) {
RabbitMQProvider.send();
}
}
}
创建一个名为 spring-boot-amqp-consumer
的消费者项目。
创建 application.yml
文件
spring:
application:
name: spring-boot-amqp-consumer
rabbitmq:
host: 192.168.75.133
port: 5672
username: rabbit
password: 123456
创建消息的消费监听组件
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = “helloRabbitMQ”)
public class HelloRabbitConsumer {
@RabbitHandler
public void process(String message) {
System.out.println("Consumer: " + message);
}
}
文章作者:彭超
本文首发于个人博客:https://antoniopeng.com/2020/07/18/mq/%E6%B7%B1%E5%85%A5%E6%B5%85%E5%87%BA%20RabbitMQ/
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 彭超 | Blog!