本文详细介绍了RabbitMQ入门知识,包括RabbitMQ的基本概念、安装配置、核心组件和常用插件,帮助读者快速掌握RabbitMQ入门技巧。
RabbitMQ简介RabbitMQ 是一个由Erlang语言开发的开源消息队列系统。它是实现高级消息队列协议(AMQP)的分布式消息中间件,广泛应用于异步通信、解耦微服务、分布式任务调度等场景。RabbitMQ以其可靠性和灵活性在业界获得了广泛认可。
RabbitMQ 主要用于在分布式系统中实现异步通信。它的作用不仅局限于简单的消息传递,还可以实现以下功能:
RabbitMQ 拥有一系列的特点和优势,使其成为企业级消息传递的首选方案:
在 Windows 上安装 RabbitMQ 需要先安装 Erlang 和 RabbitMQ 本身。
安装 Erlang
rabbitmq-plugins enable rabbitmq_management
命令启用管理插件,然后输入 rabbitmqctl start
启动 RabbitMQ 服务。在 Linux 上安装 RabbitMQ 通常使用包管理工具。
安装 Erlang
sudo apt-get update sudo apt-get install erlang
sudo yum install erlang
sudo apt-get update sudo apt-get install rabbitmq-server sudo service rabbitmq-server start sudo rabbitmq-plugins enable rabbitmq_management
sudo yum install rabbitmq-server sudo systemctl start rabbitmq-server sudo rabbitmq-plugins enable rabbitmq_management
RabbitMQ 的基本配置可以通过修改配置文件或使用命令行工具完成。
修改配置文件
/etc/rabbitmq/rabbitmq.conf
,可以编辑此文件来配置 RabbitMQ 的各种参数。listeners.tcp.default = 5672 default_vhost = /myvhost
rabbitmqctl
命令行工具可以方便地进行操作。例如:
rabbitmqctl set_vhost /myvhost rabbitmqctl set_user_tags myuser admin rabbitmqctl add_user myuser mypassword rabbitmqctl set_permissions -p /myvhost myuser ".*" ".*" ".*"
交换器是 RabbitMQ 的核心组件,负责将消息路由到队列。消息首先发送到交换器,由交换器根据路由键(Routing Key)和绑定规则(Binding)将消息转发到合适的队列中。
direct
:严格匹配路由键。fanout
:广播模式,将消息发送给所有绑定的队列。topic
:模糊匹配模式,基于路由键模式匹配队列。headers
:基于路由键的 headers 匹配。import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='my_exchange', exchange_type='direct') message = "Hello, World!" channel.basic_publish(exchange='my_exchange', routing_key='my_routing_key', body=message)
队列是消息的临时存储区。消息由生产者发送到交换器,然后由交换器根据绑定规则将消息路由到队列。队列负责存储消息,直到消费者消费这些消息。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='my_queue') def callback(ch, method, properties, body): print(f"Received {body}") channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True) channel.start_consuming()
绑定用于将队列与交换器关联起来。交换器根据绑定规则将消息路由到队列。绑定可以配置路由键来进一步细化消息路由规则。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='my_exchange', exchange_type='direct') channel.queue_declare(queue='my_queue') channel.queue_bind(exchange='my_exchange', queue='my_queue', routing_key='my_routing_key')
消息是通过 RabbitMQ 进行传输的最小单位。每个消息都包含一个负载(Body)和一些元数据(如路由键、时间戳等)。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='my_exchange', exchange_type='direct') message = "Hello, World!" channel.basic_publish(exchange='my_exchange', routing_key='my_routing_key', body=message)RabbitMQ基本操作
发送消息的基本步骤:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='my_exchange', exchange_type='direct') channel.queue_declare(queue='my_queue') channel.queue_bind(exchange='my_exchange', queue='my_queue', routing_key='my_routing_key') message = "Hello, World!" channel.basic_publish(exchange='my_exchange', routing_key='my_routing_key', body=message) connection.close()
接收消息的基本步骤:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='my_queue') def callback(ch, method, properties, body): print(f"Received {body}") channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True) channel.start_consuming()
RabbitMQ 提供了消息确认机制,确保消息已被正确接收和处理。消费者在接收到消息后,需显式地确认消息,否则 RabbitMQ 会将未确认的消息重新发送。
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='my_queue') def callback(ch, method, properties, body): print(f"Received {body}") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False) channel.start_consuming()RabbitMQ常用插件介绍
RabbitMQ 提供了一个 Web 管理界面,方便用户管理和监控 RabbitMQ 服务。可以通过 rabbitmq-plugins enable rabbitmq_management
命令启用管理插件。
启用后,可以在浏览器中访问 http://localhost:15672
,使用默认的用户名和密码(通常是 guest
)登录。
除了管理插件外,RabbitMQ 还提供了一些其他有用的插件,如:
这些插件可以通过命令行工具进行启用:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange rabbitmq-plugins enable rabbitmq_shovel rabbitmq-plugins enable rabbitmq_stompRabbitMQ的简单案例演示
发布/订阅模式是 RabbitMQ 中最常用的模式之一。生产者将消息发送到交换器,交换器再将消息广播到所有绑定的队列中。
# 生产者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='my_exchange', exchange_type='fanout') message = "Hello, World!" channel.basic_publish(exchange='my_exchange', routing_key='', body=message) connection.close() # 消费者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='my_exchange', exchange_type='fanout') channel.queue_declare(queue='my_queue') channel.queue_bind(exchange='my_exchange', queue='my_queue') def callback(ch, method, properties, body): print(f"Received {body}") channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True) channel.start_consuming()
点对点模式下,消息只会被发送到一个消费者。生产者将消息发送到队列,消费者从队列中消费消息。
# 生产者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='my_queue') message = "Hello, World!" channel.basic_publish(exchange='', routing_key='my_queue', body=message) connection.close() # 消费者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='my_queue') def callback(ch, method, properties, body): print(f"Received {body}") channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True) channel.start_consuming()
除了发布/订阅模式和点对点模式外,RabbitMQ 还支持其他模式,如路由模式(Routing)、通配符模式(Topic)等。
路由模式下,生产者根据路由键将消息发送到交换器,交换器根据路由键将消息路由到特定队列。
# 生产者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='my_exchange', exchange_type='direct') message = "Hello, World!" channel.basic_publish(exchange='my_exchange', routing_key='my_routing_key', body=message) connection.close() # 消费者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='my_exchange', exchange_type='direct') channel.queue_declare(queue='my_queue') channel.queue_bind(exchange='my_exchange', queue='my_queue', routing_key='my_routing_key') def callback(ch, method, properties, body): print(f"Received {body}") channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True) channel.start_consuming()
通配符模式允许生产者根据路由键模糊匹配将消息发送到特定队列。
# 生产者 import pika connection = pikA.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='my_exchange', exchange_type='topic') message = "Hello, World!" channel.basic_publish(exchange='my_exchange', routing_key='my_routing_key', body=message) connection.close() # 消费者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='my_exchange', exchange_type='topic') channel.queue_declare(queue='my_queue') channel.queue_bind(exchange='my_exchange', queue='my_queue', routing_key='my_routing_key') def callback(ch, method, properties, body): print(f"Received {body}") channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=True) channel.start_consuming()