https://www.rabbitmq.com/getstarted.html
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
在上图的模型中,有以下概念:
- 生产者,也就是要发送消息的程序
- 消费者:消息的接受者,会一直等待消息到来。
- 消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
所有的中间件技术都是基于
tcp/ip
协议基础之上构建新型的协议规范,只不过rabbitmq
遵循的是amqp
实现步骤:
- 创建连接工程
- 创建连接 connection
- 通过连接获取通道 Channel
- 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
- 准备消息内容
- 发送消息给队列 queue
- 关闭连接
- 关闭通道
public class Producer { public static void main(String[] args) { //1. 创建连接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); //这里要使用自己的IP地址 connectionFactory.setHost("192.168.57.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //2. 创建连接 connection connection = connectionFactory.newConnection("生产者"); //3. 通过连接获取通道 Channel channel = connection.createChannel(); //4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息 String quequeName = "queuel"; /** * @params1 队列的名称 * @params2 是否要持久化 durable-false * @params3 排他性,是否是独占独立 * @params4 是否自动删除,随着最后一个消费者消息完毕以后是否把队列自动删除 * @params5 携带的附属参数 */ channel.queueDeclare(quequeName,false,false,false,null); //5. 准备消息内容 String message = "Hello,Consumer"; //6. 发送消息给队列 queue channel.basicPublish("",quequeName,null,message.getBytes()); System.out.println("消息发送成功"); } catch (Exception e) { e.printStackTrace(); }finally { //7. 关闭连接 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } //8. 关闭通道 if (connection != null && connection.isOpen()){ try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
public class Consumer { public static void main(String[] args) { //1. 创建连接工程 ConnectionFactory connectionFactory = new ConnectionFactory(); //这里要使用自己的IP地址 connectionFactory.setHost("192.168.57.129"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //2. 创建连接 connection connection = connectionFactory.newConnection("消费者"); //3. 通过连接获取通道 Channel channel = connection.createChannel(); //4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息 String quequeName = "queue1"; channel.queueDeclare(quequeName,false,false,false,null); //5.监听消息 DefaultConsumer consumer = new DefaultConsumer(channel){ /* consumerTag:消息者标签,channel.basicConsume可以指定 envelope:消息包内容,可从中获取消息id,消息routing key,交换机,消息和重装标记(收到消息失败后是否需要重新发送) properties:消息属性 body;消息 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key为:"+ envelope.getRoutingKey()); //交换机 System.out.println("交换机为:"+ envelope.getExchange()); //消息id System.out.println("消息id为:"+ envelope.getDeliveryTag()); //收到的消息 System.out.println("接收到的消息:"+ new String(body,"UTF-8")); System.out.println(""); System.out.println("======================================================"); System.out.println(""); } }; channel.basicConsume("queue1", true, consumer); } catch (Exception e) { e.printStackTrace(); }finally { //6. 不关闭资源,一直监听 } } }
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP是一个二进制协议,拥有一些现代化特点:
多信道
、协商式
,异步
,安全
,扩平台
,中立
,高效
。RabbitMQ 是 AMQP协议 的 Erlang的实现。
概念 | 说明 |
---|---|
连接 Connection | 一个网络连接,例如:TCP/IP套接字连接。 |
会话 Session | 端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。 |
信道 Channel | 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。 |
客户端 Client | AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。 |
服务节点Broker | 消息中间件的服务节点。一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。 |
端点 | AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。 |
消费者 Consumer | 一个从消息队列里请求消息的客户端程序。 |
生产者 Producer | 一个向交换机发布消息的客户端应用程序。 |
以 入门案例 为例
newConnection()
方法 , 会进一步封装 Protocol Header 0-9-1
的报文头发送给 Broker
,以此通知Broker
本次交互采用的是 AMQP 0-9-1
协议,紧接着 Broker
返回 Connection.Start
来建立连接,在连接的过程中涉及 Connection.Start/.Start-OK
、Connection.Tune/.Tune-Ok
,Connection.Open/ .Open-Ok
这6 个命令的交互。connection.createChannel
方法。此方法开启信道,其包装的 channel.open
命令发送给 Broker
, 等待 channel.basicPublish
方法,对应的AMQP命令为 Basic.Publish
, 这个命令包含了content Header
和content Body()
。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。Channel.Close和Channl.Close-Ok
与Connetion.Close和Connection.Close-Ok
的命令交互。newConnection()
方法,这个方法会进一步封装 Protocol Header 0-9-1
的报文头发送给Broker ,以此通知Broker 本次交互采用的是 AMQP 0-9-1
协议,紧接着Broker 返回Connection.Start
来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok
这6 个命令的交互。connection.createChannel
方法。和生产者客户端一样,协议涉及Channel . Open/Open-Ok
命令。Basic.Consume
命令(即调用channel.basicConsume
方法〉将Channel 置为接收模式,之后Broker 回执 Basic . Consume - Ok
以告诉消费者客户端准备好消费消息。Basic.Deliver
命令,这个命令和 Basic.Publish
命令一样会携带 Content Header 和Content Body。
Basic.Ack
命令。Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok
的命令交互。个人博客为:
MoYu's HomePage