using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Siia.Model.Infrastructure; using Siia.RedisCache; using Siia.Utility; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace Siia.Service.Message { class MessageQueueWorker { private readonly Task _task; private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); private readonly EventWaitHandle _readyToStart = new EventWaitHandle(false, EventResetMode.AutoReset); private readonly ConcurrentQueue<MessageQueueRequest> _messagesQueue = new ConcurrentQueue<MessageQueueRequest>(); private readonly ILogger<MessageQueueWorker> _logger; private readonly IRedisService _redisService; public MessageQueueWorker(IServiceProvider serviceProvider) { _redisService = serviceProvider.GetService<IRedisService>(); this._logger = serviceProvider.GetService<ILogger<MessageQueueWorker>>(); this._task = Task.Factory.StartNew(async () => { this._readyToStart.WaitOne(); while (!this._cancellationTokenSource.IsCancellationRequested) { if (this._messagesQueue.TryDequeue(out var message)) { switch (message.MessageQueueType) { case MessageQueueType.SheepRecord: message.Waiter.TrySetResult(await CheckSheepRecord(message.Body as MQ_CreateSheepRecordQueueRequest)); break; default: break; } } else Thread.Sleep(1); } }, this._cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } public void TryProcessMessage(MessageQueueRequest message) => this._messagesQueue.Enqueue(message); public void Start() => this._readyToStart.Set(); public void Shutdown() { this._cancellationTokenSource.Cancel(); _task?.Wait(); this._readyToStart.Dispose(); this._cancellationTokenSource.Dispose(); } public async Task<ActionResponse> CheckSheepRecord(MQ_CreateSheepRecordQueueRequest request) { var existTagList = new List<string>(); foreach (var item in request.EarTagCodeList) { var codeNum = await _redisService.HashIncrementAsync("EarCode_Record_Item", item, async () => { await Task.CompletedTask; return 0; }, 1); if (codeNum > 1) existTagList.Add(item); } await _redisService.KeyExpireAsync("EarCode_Record_Item", DateTime.Now.AddHours(1)); if (existTagList.Any()) return ActionResponse.Succeed(new { ReturnCode = -1, Message = "耳标已建档", Data = existTagList }); return ActionResponse.Fail(-1,""); } } public class MessageQueueFunc { private readonly int _workerCount = 2; private readonly List<MessageQueueWorker> _workerList = new List<MessageQueueWorker>(); public MessageQueueFunc(IServiceProvider serviceProvider) { for (var i = 0; i < _workerCount; i++) { var worker = new MessageQueueWorker(serviceProvider); this._workerList.Add(worker); } } public void Start() { for (int i = 0; i < this._workerCount; i++) { this._workerList[i].Start(); } } public void Shutdown() { for (int i = 0; i < this._workerCount; i++) { this._workerList[i].Shutdown(); } } public async Task<ActionResponse> MessageQueue(MessageQueueRequest request) { var index = (int)(Farmhash.Hash32(request.RoutingKey) % this._workerCount); var worker = this._workerList[index]; var body = request; worker.TryProcessMessage(body); return await body.Waiter.Task; } } public class MessageQueueRequest { public MessageQueueRequest() { Waiter = new TaskCompletionSource<ActionResponse>(); } public MessageQueueType MessageQueueType { get; set; } public string RoutingKey { get; set; } public object Body { get; set; } public TaskCompletionSource<ActionResponse> Waiter { get; set; } } public enum MessageQueueType : byte { /// <summary> /// 羊只建档 /// </summary> SheepRecord = 1 } public class MQ_CreateSheepRecordQueueRequest { public List<string> EarTagCodeList { get; set; } } }View Code