Net Core教程

.net core 内存队列,解决并发

本文主要是介绍.net core 内存队列,解决并发,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Siia.Model.Infrastructure;
using Siia.RedisCache;
using Siia.Utility;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Siia.Service.Message
{
    class MessageQueueWorker
    {
        private readonly Task _task;
        private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
        private readonly EventWaitHandle _readyToStart = new EventWaitHandle(false, EventResetMode.AutoReset);
        private readonly ConcurrentQueue<MessageQueueRequest> _messagesQueue = new ConcurrentQueue<MessageQueueRequest>();
        private readonly ILogger<MessageQueueWorker> _logger;
        private readonly IRedisService _redisService;
        public MessageQueueWorker(IServiceProvider serviceProvider)
        {
            _redisService = serviceProvider.GetService<IRedisService>();
            this._logger = serviceProvider.GetService<ILogger<MessageQueueWorker>>();
            this._task = Task.Factory.StartNew(async () =>
            {
                this._readyToStart.WaitOne();
                while (!this._cancellationTokenSource.IsCancellationRequested)
                {
                    if (this._messagesQueue.TryDequeue(out var message))
                    {
                        switch (message.MessageQueueType)
                        {
                            case MessageQueueType.SheepRecord:
                                message.Waiter.TrySetResult(await CheckSheepRecord(message.Body as MQ_CreateSheepRecordQueueRequest));
                                break;
                            default:
                                break;
                        }
                    }
                    else Thread.Sleep(1);
                }
            }, this._cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
        public void TryProcessMessage(MessageQueueRequest message) => this._messagesQueue.Enqueue(message);
        public void Start() => this._readyToStart.Set();
        public void Shutdown()
        {
            this._cancellationTokenSource.Cancel();
            _task?.Wait();
            this._readyToStart.Dispose();
            this._cancellationTokenSource.Dispose();
        }

        public async Task<ActionResponse> CheckSheepRecord(MQ_CreateSheepRecordQueueRequest request)
        {
            var existTagList = new List<string>();
            foreach (var item in request.EarTagCodeList)
            {
                var codeNum = await _redisService.HashIncrementAsync("EarCode_Record_Item", item, async () => { await Task.CompletedTask; return 0; }, 1);
                if (codeNum > 1) existTagList.Add(item);
            }
            await _redisService.KeyExpireAsync("EarCode_Record_Item", DateTime.Now.AddHours(1));
            if (existTagList.Any()) return ActionResponse.Succeed(new { ReturnCode = -1, Message = "耳标已建档", Data = existTagList });
            return ActionResponse.Fail(-1,"");
        }
    }


    public class MessageQueueFunc
    {
        private readonly int _workerCount = 2;
        private readonly List<MessageQueueWorker> _workerList = new List<MessageQueueWorker>();

        public MessageQueueFunc(IServiceProvider serviceProvider)
        {
            for (var i = 0; i < _workerCount; i++)
            {
                var worker = new MessageQueueWorker(serviceProvider);
                this._workerList.Add(worker);
            }
        }
        public void Start()
        {
            for (int i = 0; i < this._workerCount; i++)
            {
                this._workerList[i].Start();
            }
        }
        public void Shutdown()
        {
            for (int i = 0; i < this._workerCount; i++)
            {
                this._workerList[i].Shutdown();
            }
        }
        public async Task<ActionResponse> MessageQueue(MessageQueueRequest request)
        {
            var index = (int)(Farmhash.Hash32(request.RoutingKey) % this._workerCount);
            var worker = this._workerList[index];
            var body = request;
            worker.TryProcessMessage(body);
            return await body.Waiter.Task;
        }

    }

    public class MessageQueueRequest
    {
        public MessageQueueRequest()
        {
            Waiter = new TaskCompletionSource<ActionResponse>();
        }
        public MessageQueueType MessageQueueType { get; set; }
        public string RoutingKey { get; set; }
        public object Body { get; set; }
        public TaskCompletionSource<ActionResponse> Waiter { get; set; }
    }

    public enum MessageQueueType : byte
    {
        /// <summary>
        /// 羊只建档
        /// </summary>
        SheepRecord = 1
    }
    public class MQ_CreateSheepRecordQueueRequest
    {
        public List<string> EarTagCodeList { get; set; }
    }

}
View Code

 

这篇关于.net core 内存队列,解决并发的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!