十年河东,十年河西,莫欺少年穷
学无止境,精益求精
上篇博客介绍了RabbitMQ的六种工作模式 RabbitMQ的六种工作模式
RabbitMQ的简单模式和Work工作模式请参考:NetCore RabbitMQ 简介及兔子生产者、消费者 【简单模式,work工作模式,竞争消费】
RabbitMQ发布订阅模式之消息广播请参考:NetCore RabbitMQ 发布订阅模式,消息广播
本篇博客使用NetCore完成RabbitMQ发布订阅模式中的定向模式
何为定向模式?
ExchangeType.Fanout【广播模式】
ExchangeType.Direct【定向模式】
ExchangeType.Topic【通配符模式】
ExchangeType.Headers 【参数匹配模式】
广播模式中,我们通过设定相同的RoutingKey,完成消息的广播,定向模式中,我们只需在队列绑定交换机时赋值不同的RoutingKey且生产者生产消息时指定响应的RoutingKey即可完成消息定向到对应队列。
定向模式创建生产者,分为如下步骤,
1、声明一个交换机
2、声明广播的队列
3、交换机和队列进行绑定【创建不同的routingKey,用于区分绑定规则】
4、生产消息,生产消息时,指定生产消息的routingKey,已达到消息定向的作用
以上步骤用NetCore 实现如下:
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMqProducer { class Program { /// <summary> /// 本示例红色预警为最高预警级别,蓝色预警次之,黄色预警等级最差 /// 红色预警队列中只展示红色预警消息, /// 蓝色预警队列中展示红色预警和蓝色预警 /// 黄色预警队列展示红、蓝、黄三种预警 /// </summary> /// <param name="args"></param> static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //主机名 factory.UserName = "guest";//使用的用户 factory.Password = "guest";//用户密码 factory.Port = 5672;//端口号 factory.VirtualHost = "/"; //虚拟主机 factory.MaxMessageSize = 1024; //消息最大字节数 using (var connection = factory.CreateConnection()) { //rabbitMQ 基于信道进行通信,因此,我们需要实例化信道Channel using (var channel = connection.CreateModel()) { //exchange 交换机名称 //type 交换机类型 ExchangeType.Direct【定向模式】 ExchangeType.Fanout【广播模式】 ExchangeType.Topic【通配符模式】 ExchangeType.Headers 【参数匹配模式】 //durable 是否持久化 //autoDelete 队列是否为临时队列 //arguments 其他配置 详见博客:https://www.cnblogs.com/chenwolong/p/RabbitMQ_S.html //void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments); //声明一个交换机 类型 : Direct string Ename = "ExRabbitMQ_Direct"; channel.ExchangeDeclare(Ename, ExchangeType.Direct, false, false, null); //声明广播的队列 string Qname_Red = "RabbitMQ_Queue_Red"; string Qname_Blue = "RabbitMQ_Queue_Blue"; string Qname_Yellow = "RabbitMQ_Queue_Yellow"; channel.QueueDeclare(Qname_Red, false, false, false, null); channel.QueueDeclare(Qname_Blue, false, false, false, null); channel.QueueDeclare(Qname_Yellow, false, false, false, null); //交换机 队列 绑定 //queue 队列名称 //exchange 交换机名称 //routingKey 路由规则 //void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments); string routingKey_Red = "Red"; // 红色预警 string routingKey_Blue = "Blue"; // 蓝色预警 string routingKey_Yellow = "Yellow"; // 黄色预警 //只接受红色预警 channel.QueueBind(Qname_Red, Ename, routingKey_Red); // Qname_Blue接受两种消息类型 channel.QueueBind(Qname_Blue, Ename, routingKey_Red); channel.QueueBind(Qname_Blue, Ename, routingKey_Blue); // Qname_Yellow 介绍三种消息类型 channel.QueueBind(Qname_Yellow, Ename, routingKey_Red); channel.QueueBind(Qname_Yellow, Ename, routingKey_Blue); channel.QueueBind(Qname_Yellow, Ename, routingKey_Yellow); //发送消息 for(int i = 0; i < 10 ; i++) { var messages_Red = "I am Red Police"; //传递的消息内容 var messages_Blue = "I am Blue Police"; //传递的消息内容 var messages_Yellow = "I am Yellow Police"; //传递的消息内容 //exchange 交换机,如果使用默认的交换机,那么routingKey要和队列的名称一致 //routingKey:路由 //basicProperties : 用于基础属性设置 ///BasicPublish(this IModel model, string exchange, string routingKey, IBasicProperties basicProperties, ReadOnlyMemory<byte> body); channel.BasicPublish(Ename, routingKey_Red, null, Encoding.UTF8.GetBytes(messages_Red + "_" + i)); //生产红色预警消息 channel.BasicPublish(Ename, routingKey_Blue, null, Encoding.UTF8.GetBytes(messages_Blue + "_" + i)); //生产蓝色预警消息 channel.BasicPublish(Ename, routingKey_Yellow, null, Encoding.UTF8.GetBytes(messages_Yellow + "_" + i)); //生产黄色预警消息 } } } Console.Read(); } } }View Code
执行上述代码,得到如下信息
此时:三个队列中红色预警队列有10条消息,蓝色预警队列有20条消息,黄色预警队列有30条消息
交换机信息,注意交换机的类型为定向模式
【红色预警队列消费者】
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMqConsumer { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //主机名 factory.UserName = "guest";//使用的用户 factory.Password = "guest";//用户密码 factory.Port = 5672;//端口号 factory.VirtualHost = "/"; //虚拟主机 factory.MaxMessageSize = 1024; //消息最大字节数 //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"红色预警队列消费者收到红色预警消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); Thread.Sleep(100); }; //启动消费者 string Qname = "RabbitMQ_Queue_Red"; channel.BasicConsume(Qname, false, consumer); Console.WriteLine("消费者已启动"); Console.ReadKey(); channel.Dispose(); connection.Close(); } } }View Code
因为红色预警队列只绑定了红色预警的RoutingKey,因此,该队列只接受红色预警消息
【蓝色预警队列消费者】
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMqConsumer { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //主机名 factory.UserName = "guest";//使用的用户 factory.Password = "guest";//用户密码 factory.Port = 5672;//端口号 factory.VirtualHost = "/"; //虚拟主机 factory.MaxMessageSize = 1024; //消息最大字节数 //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"蓝色预警队列消费者收到消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); Thread.Sleep(100); }; //启动消费者 string Qname = "RabbitMQ_Queue_Blue"; channel.BasicConsume(Qname, false, consumer); Console.WriteLine("消费者已启动"); Console.ReadKey(); channel.Dispose(); connection.Close(); } } }View Code
【黄色预警消费者】
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMqConsumer { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //主机名 factory.UserName = "guest";//使用的用户 factory.Password = "guest";//用户密码 factory.Port = 5672;//端口号 factory.VirtualHost = "/"; //虚拟主机 factory.MaxMessageSize = 1024; //消息最大字节数 //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"黄色预警队列消费者收到消息: {message}"); //确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); Thread.Sleep(100); }; //启动消费者 string Qname = "RabbitMQ_Queue_Yellow"; channel.BasicConsume(Qname, false, consumer); Console.WriteLine("消费者已启动"); Console.ReadKey(); channel.Dispose(); connection.Close(); } } }View Code
定向模式中的核心为路由Key的声明、路由Key的绑定、发送带有路由Key的消息,这3点为实现定向发送的核心。
@陈大六的博客