参考地址:https://mp.weixin.qq.com/s/QG3uXhhpkE_Uo6Me15mxdg
https://www.bilibili.com/video/BV1GU4y1w7Yq?p=7
https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html
在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理
应用场景: 一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况
实现12306短信通知用户
项目运行环境:windows10 vs2019 RabbitMQ
Demo
建立三个控制台项目(RabbitMQ.Producer,RabbitMQ.Consumer01,RabbitMQ.Consumer02)一个类库项目(RabbitMQ.Common)
1.RabbitMQ.Common
(1)RabbitConstant
using System; using System.Collections.Generic; using System.Text; namespace RabbitMQ.Common { public class RabbitConstant { public const string QUEUE_HELLO_WORLD = "Hello"; public const string QUEUE_SMS = "WorkQueue"; } }
(2)RabbitUtils.cs
using System; using RabbitMQ.Client; namespace RabbitMQ.Common { public class RabbitUtils { public static ConnectionFactory GetConnection() { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; factory.Port = 5672;//是服务端的端口号,与页面的端口号15672区分开 factory.UserName = "guest"; factory.Password = "guest"; //factory.VirtualHost = "/"; return factory; } } }
2.RabbitMQ.Producer项目
Producer下的SmsSender.cs
using System; using System.Text; using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Common; namespace RabbitMQ.Producer.Producer { public class SmsSender { public static void SendMessage() { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { /* *创建队列,声明并创建一个队列,如果队列存在,则使用这个队列 *queue:队列名称ID *durable:是否持久化,false对应不持久化数据,MQ停掉数据就会数据丢失 *exclusive:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用 *exclusive:是否自动删除,false代表连接停掉后不自动删除这个队列 *arguments:其他额外参数为null */ channel.QueueDeclare(queue: RabbitConstant.QUEUE_SMS, durable: true, exclusive: false, autoDelete: false, arguments: null); for (int i = 0; i < 1000; i++) { Sms sms = new Sms($"乘客{i}", $"13600000{i}", "您的车票已预订成功"); string message = JsonConvert.SerializeObject(sms); var body = Encoding.UTF8.GetBytes(message); /* * exchange:交换机,暂时用不到,在进行发布订阅时才会用到 * routingKey:路由key * basicProperties:额外的设置属性 * body:要传递的消息字节数组 */ channel.BasicPublish(exchange: "", routingKey: RabbitConstant.QUEUE_SMS, basicProperties: null, body: body); Console.WriteLine($"正在发送内容{message}"); } Console.WriteLine("发送数据成功"); Console.WriteLine("Press [Enter] to exit"); Console.ReadLine(); } } } } }
Producer下的SmsSender.cs
using System; using System.Collections.Generic; using System.Text; namespace RabbitMQ.Producer.Producer { public class Sms { public Sms(string name,string mobile,string content) { Name =name; Moblie=mobile; Content=content; } public string Name { get; set; } public string Moblie { get; set; } public string Content { get; set; } } }
Producer下的Program.cs
using RabbitMQ.Producer.Producer; using System; namespace RabbitMQ.Producer { class Program { static void Main(string[] args) { SmsSender.SendMessage(); } } }
3.RabbitMQ.Consumer01和RabbitMQ.Consumer02(项目代码一致)
Consumer下SmsReceive.cs
using System; using System.Text; using System.Threading; using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Common; namespace RabbitMQ.Consumer01.Consumer { public class SmsReceive { public static void ReceiveMessage() { var connection = RabbitUtils.GetConnection().CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(queue: RabbitConstant.QUEUE_SMS, durable: true, exclusive: false, autoDelete: false, arguments: null); //如果不写BasicQos,则自动MQ会将所有请求平均发送给消费者 //BasicQos,MQ不再对消费者发送多个请求,而是消费者处理完一个消息后(确认后),在队列中获取一个新的 //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine("[*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); //Thread.Sleep(30); Console.WriteLine($"SmsSender-发送短信成功:{message}"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: RabbitConstant.QUEUE_SMS, autoAck: false, consumer: consumer); Console.WriteLine("Press [Enter] to exit"); Console.ReadLine(); } } }
Program.cs
using RabbitMQ.Consumer01.Consumer; using System; namespace RabbitMQ.Consumer1 { class Program { static void Main(string[] args) { SmsReceive.ReceiveMessage(); } } }
4.运行
设置多项目运行 右键解决方案--属性--多个启动项目