RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。
1.使用docker环境,下载rabbitmq:management镜像
有压缩包的直接使用即可
docker pull rabbitmq:management
将压缩包放入root目录下并进行导入镜像:
docker load -i rabbit-image.gz #导入rabbit镜像
docker images #查看
2.关闭防火墙
systemctl stop firewalld systemctl disable firewalld # 重启 docker 系统服务 systemctl restart docker
3.配置管理员用户名和密码
mkdir /etc/rabbitmq vim /etc/rabbitmq/rabbitmq.conf # 添加两行配置: default_user = admin default_pass = admin
4.启动Rabbitmq
docker run -d --name rabbit \ -p 5672:5672 \ -p 15672:15672 \ -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \ -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \ --restart=always \ rabbitmq:management
访问管理控制台 http://192.168.64.140:15672
用户名密码是 admin
添加依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency>
生产者发送消息:
package rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Test1 { public static void main(String[] args) throws Exception { //创建连接工厂,并设置连接信息 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.141"); f.setPort(5672);//可选,5672是默认端口 f.setUsername("admin"); f.setPassword("admin"); /* * 与rabbitmq服务器建立连接, * rabbitmq服务器端使用的是nio,会复用tcp连接, * 并开辟多个信道与客户端通信 * 以减轻服务器端建立连接的开销 */ Connection c = f.newConnection(); //建立信道 Channel ch = c.createChannel(); /* * 声明队列,会在rabbitmq中创建一个队列 * 如果已经创建过该队列,就不能再使用其他参数来创建 * * 参数含义: * -queue: 队列名称 * -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在 * -exclusive: 排他,true表示限制仅当前连接可用 * -autoDelete: 当最后一个消费者断开后,是否删除队列 * -arguments: 其他参数 */ ch.queueDeclare("helloworld", false,false,false,null); /* * 发布消息 * 这里把消息向默认交换机发送. * 默认交换机隐含与所有队列绑定,routing key即为队列名称 * * 参数含义: * -exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null * -routingKey: 对于默认交换机,路由键就是目标队列名称 * -props: 其他参数,例如头信息 * -body: 消息内容byte[]数组 */ ch.basicPublish("", "helloworld", null, "Hello world!".getBytes()); System.out.println("消息已发送"); c.close(); } }
消费者接收消息:
package rabbitmq.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws Exception { //连接工厂 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.141"); f.setUsername("admin"); f.setPassword("admin"); //建立连接 Connection c = f.newConnection(); //建立信道 Channel ch = c.createChannel(); //声明队列,如果该队列已经创建过,则不会重复创建 ch.queueDeclare("helloworld",false,false,false,null); System.out.println("等待接收数据"); //收到消息后用来处理消息的回调对象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println("收到: "+msg); } }; //消费者取消时的回调对象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; ch.basicConsume("helloworld", true, callback, cancel); } }
生产者发送消息时会发送给每一个接收者
生产者:
import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //当你生产者有一个队列时,消费者也必须一个队列进行接收。 //通过交换机进行发送消息,可以定义消息是否持久。交换机不存储消息需要队列接收消息 ConnectionFactory f = new ConnectionFactory(); //通过连接工厂进行连接 f.setHost("192.168.64.141"); //进行连接对应服务的ip f.setPort(5672); //访问的消息服务的端口号 f.setUsername("admin"); //账号密码 f.setPassword("admin"); Connection connection = f.newConnection(); Channel c = connection.createChannel(); //通信通道 //创建Fanout交换机: logs为消息队列的名字 发布和订阅模式 群发机制 c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); while (true){ System.out.println("输入传递的信息:"); String s = new Scanner(System.in).nextLine(); c.basicPublish("logs","",null,s.getBytes()); } } }
消费者:
import com.rabbitmq.client.*; import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory f = new ConnectionFactory(); //通过连接工厂进行连接 f.setHost("192.168.64.141"); //进行连接对应服务的ip f.setPort(5672); //访问的消息服务的端口号 f.setUsername("admin"); //账号密码 f.setPassword("admin"); Connection connection = f.newConnection(); Channel c = connection.createChannel(); //通信通道 //1.创建队列 2.创建交换机 3.进行消息队列的绑定 String queue = UUID.randomUUID().toString();//创建随机队列 保存得到的消息 //队列名 非持久 独占 自动删除 c.queueDeclare(queue,false,true,true,null); //创建交换机 生产者是什么消息模式就创造什么消息模式 fanout交换机 c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //进行绑定 对fanout交换机来说 第三个参数是无效的 c.queueBind(queue,"logs",""); //正常接收消息 创建回调对象 DeliverCallback deliverCallback =(consumerTag,message) -> { //此处处理消息 String s = new String(message.getBody()); System.out.println("收到:"+s); }; CancelCallback cancelCallback =consumerTag -> { }; //开始接受消息,把消息传递给一个回调对象进行处理 c.basicConsume(queue,true,deliverCallback,cancelCallback); } }
路由模式,设定关键词,会根据生产者发送的关键词进行接收消息。如果关键词不匹配是不会接收消息的。
生产者:
import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class Producer1 { public static void main(String[] args) throws IOException, TimeoutException { //当你生产者有一个队列时,消费者也必须一个队列进行接收。 //通过交换机进行发送消息,可以定义消息是否持久。交换机不存储消息需要队列接收消息 ConnectionFactory f = new ConnectionFactory(); //通过连接工厂进行连接 f.setHost("192.168.64.141"); //进行连接对应服务的ip f.setPort(5672); //访问的消息服务的端口号 f.setUsername("admin"); //账号密码 f.setPassword("admin"); Connection connection = f.newConnection(); Channel c = connection.createChannel(); //通信通道 //创建路由模式交换机 : Direct c.exchangeDeclare("direct_logs",BuiltinExchangeType.DIRECT); //向交换机发送消息,并携带路由关键词 while (true){ System.out.println("输入消息:"); String s = new Scanner(System.in).nextLine(); System.out.println("输入路由键:"); String k = new Scanner(System.in).nextLine(); //对默认交换机“”,会自动使用队列名作为路由键 c.basicPublish("direct_logs",k,null,s.getBytes()); } } }
消费者:
import com.rabbitmq.client.*; import java.io.IOException; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.TimeoutException; public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory f = new ConnectionFactory(); //通过连接工厂进行连接 f.setHost("192.168.64.141"); //进行连接对应服务的ip f.setPort(5672); //访问的消息服务的端口号 f.setUsername("admin"); //账号密码 f.setPassword("admin"); Connection connection = f.newConnection(); Channel c = connection.createChannel(); //通信通道 //1.创建队列 2.创建交换机 3.进行消息队列的绑定 String queue = UUID.randomUUID().toString();//创建随机队列 保存得到的消息 //队列名 非持久 独占 自动删除 c.queueDeclare(queue,false,true,true,null); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);//创建路由交换机 System.out.println("输入绑定的关键词,用空格隔开"); String s = new Scanner(System.in).nextLine(); String[] a = s.split("\\s+"); // \s是空白格 + 指一到多个 for (String k : a){ c.queueBind(queue,"direct_logs",k); //进行遍历绑定 } //从队列接收消息 DeliverCallback deliverCallback =(consumerTag, message) -> { //此处处理消息 String msg = new String(message.getBody()); String key = message.getEnvelope().getRoutingKey();//得到路由键 System.out.println(key+"----"+msg); }; CancelCallback cancelCallback =consumerTag -> { }; //开始接受消息,把消息传递给一个回调对象进行处理 c.basicConsume(queue,true,deliverCallback,cancelCallback); } }
发送到Topic交换机的消息,它的的routingKey,必须是由点分隔的多个单词。单词可以是任何东西,但通常是与消息相关的一些特性。几个有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的单词,最多255个字节。
bindingKey也必须采用相同的形式。Topic交换机的逻辑与直连交换机类似——使用特定routingKey发送的消息将被传递到所有使用匹配bindingKey绑定的队列。bindingKey有两个重要的特殊点:
* 可以通配单个单词。
# 可以通配零个或多个单词
生产者:
public class Producer5 { public static void main(String[] args) throws IOException, TimeoutException { //生产者创建交换机 消费者创建队列 //连接 //连接 ConnectionFactory f = new ConnectionFactory();//连接工厂 f.setHost("192.168.64.129"); //进行连接对应服务的ip f.setPort(5672); //访问的消息服务端口号 f.setUsername("admin"); //账号 f.setPassword("admin"); //密码 Connection con = f.newConnection(); Channel c = con.createChannel();//通信通道 //创建Topic交换机 :topic_logs 会自动使用队列作为关键词 c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC); //向交换机发送消息,并携带路由关键词 while (true) { System.out.println("输入信息:"); String s = new Scanner(System.in).nextLine(); System.out.println("输入路由键:"); String k = new Scanner(System.in).nextLine(); //对默认交换机“”,会自动使用队列名作为路由键 c.basicPublish("topic_logs", k, null, s.getBytes()); } } }
消费者:
public class Consumer5 { public static void main(String[] args) throws IOException, TimeoutException { //连接 ConnectionFactory f = new ConnectionFactory();//连接工厂 f.setHost("192.168.64.141"); //进行连接对应服务的ip f.setPort(5672); //访问的消息服务端口号 f.setUsername("admin"); //账号 f.setPassword("admin"); //密码 Connection con = f.newConnection(); Channel c = con.createChannel();//通信通道 //1.创建随机队列 2.创建交换机 3.使用绑定建关键词绑定 String queue = UUID.randomUUID().toString(); //非持久,独占,自动删除 c.queueDeclare(queue, false, true, true, null); c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);//创建交换机 System.out.println("输入绑定建关键词,用空格隔开:"); String s = new Scanner(System.in).nextLine(); String[] a = s.split("\\s+"); // \s是空白字符 + 指一到多个 for (String k : a) { c.queueBind(queue, "topic_logs", k); //进行循环遍历绑定 } //正常接收消息 //正常从队列接收消息 DeliverCallback deliverCallback = (consumerTag, message) -> { //此处处理消息 String msg = new String(message.getBody()); String key = message.getEnvelope().getRoutingKey();//得到路由键 System.out.println(key + "---" + msg); }; CancelCallback cancelCallback = consumerTag -> { }; c.basicConsume(queue, true, deliverCallback, cancelCallback); } }
将创建的队列第二个false改成true即可变成持久操作。
已经创建的队列参数是不能修改的,因为已经上传到服务器中了,可以创建一个新的队列再进行修改后面的参数。生产者与消费者必须一直。