多线程一知半解?看这篇就够了!转自https://www.cnblogs.com/HaoYangkun/p/14185008.html
<1> thread的内核数据结构,其中有osid,context => CPU寄存器的里面的一些变量。 30 ms
<2>. thread 环境块 :
tls【thread本地存储】, execptionList 的信息。。。。
WinDbg 来给大家演示。。。 32,64 =可以达到clr的层面给大家展示底层知识
.loadby sos clr
<3> 用户模式堆栈 内存溢出的一个异常 【堆栈溢出】
一个线程 分配 1M的堆栈空间,,【参数,局部变量】
<4> 内核模式堆栈
在CLR的线程操作,包括线程同步,大多都是调用底层的win32 函数 ,用户模式的参数需要传递到内核模式。。。
<1> 我们进程启动的时候,会加载很多的dll [托管和非托管的], exe,资源,元数据。。。。
进程启动的时候,我怎么没有看到应用程序域。。。
进程启动的时候,默认会有三个应用程序域。system domain, shared domain[int,long....] ,domain1.
开启一个thread,销毁一个thread 都会通知进程中的dll,attach,detach 标志位。。。
通知dll的目的就是 给thread做准备工作,比如销毁,让这些dll做资源清理。。。。
<2> 时间片切换
8个逻辑处理器,可供8个thread并行执行。。。。
比如说9个thread并发执行。 必然会有一个thread休眠30 ms。。。。
for => palleral for
了解Thread的实例方法。。。 【id,ThreadState】
管理Thread生命周期 Start, Suspend, Resume, Intterupt,Abort。。。 Join 在使用Thread的时候是用的非 常多的。。。
thread = new Thread(new ThreadStart(() => { while (true) { try { Thread.Sleep(1000); textBox1.Invoke(new Action(() =>{ textBox1.AppendText(string.Format("{0},", index++)); })); } catch (Exception ex) { MessageBox.Show(string.Format("{0}, {1}", ex.Message, index)); } } })); thread.Start();
0:017> !ThreadState ab024 User Suspend Pending Legal to Join CLR Owns CoInitialized In Multi Threaded Apartment Fully initialized Sync Suspended
用来中断处于WaitSleepJoin状态的线程。。。。
while(true){ continue.... 效果}
当你调用interrupt的时候,会抛出一个interrupt的异常。。。。
0:007> !ThreadState 202b020 Legal to Join CLR Owns CoInitialized In Multi Threaded Apartment Fully initialized Interruptible
[线程本地存储]
《1》 t1 ,t2 共享 变量 public注意有“锁”的概念
《2》 t1 , t2 各自有一个 变量 internel 没有锁争用的概念
static void Main(string[] args) { var slot = Thread.AllocateNamedDataSlot("username"); //主线程 上 设置槽位,, 也就是hello world 只能被主线程读取,其他线程无法读取 Thread.SetData(slot, "hello world!!!"); var t = new Thread(() => { var obj = Thread.GetData(slot); Console.WriteLine("当前工作线程:{0}", obj); }); t.Start(); var obj2 = Thread.GetData(slot); Console.WriteLine("主线程:{0}", obj2); Console.Read(); }
[ThreadStatic] static string username = string.Empty; static void Main(string[] args) { username = "hello world!!!"; var t = new Thread(() => { Console.WriteLine("当前工作线程:{0}", username); }); t.Start(); Console.WriteLine("主线程:{0}", username); Console.Read();
}
static void Main(string[] args) { ThreadLocal<string> local = new ThreadLocal<string>(); local.Value = "hello world!!!"; var t = new Thread(() => { Console.WriteLine("当前工作线程:{0}", local.Value); }); t.Start(); Console.WriteLine("主线程:{0}", local.Value); Console.Read(); }
【如何禁止编译器优化和Cache读取】
MemoryBarrier、VolatileRead/Write 这些方法到底有什么用处。。。。
在实际项目中,我们都喜欢用Release版本,而不是Debug。。。。
因为Release中做了一些代码和缓存的优化。。。 比如说将一些数据从memory中读取到CPU高速缓存中。
冒泡排序 O(N)2 1w * 1w = 1亿
从结果中可以看到,大概有5倍的差距。。。
在任何时候,不见得release都是好的。。有可能会给你引入一些bug。。。
class Program { static void Main(string[] args) { var path = Environment.CurrentDirectory + "//1.txt"; var list = System.IO.File.ReadAllLines(path).Select(i => Convert.ToInt32(i)).ToList(); for (int i = 0; i < 5; i++) { var watch = Stopwatch.StartNew(); var mylist = BubbleSort(list); watch.Stop(); Console.WriteLine(watch.Elapsed); } Console.Read(); } //冒泡排序算法 static List<int> BubbleSort(List<int> list) { int temp; //第一层循环: 表明要比较的次数,比如list.count个数,肯定要比较count-1次 for (int i = 0; i < list.Count - 1; i++) { //list.count-1:取数据最后一个数下标, //j>i: 从后往前的的下标一定大于从前往后的下标,否则就超越了。 for (int j = list.Count - 1; j > i; j--) { //如果前面一个数大于后面一个数则交换 if (list[j - 1] > list[j]) { temp = list[j - 1]; list[j - 1] = list[j]; list[j] = temp; } } } return list; } } }
static void Main(string[] args) { var isStop = false; var t = new Thread(() => { var isSuccess = false; while (!isStop) { isSuccess = !isSuccess; } }); t.Start(); Thread.Sleep(1000); isStop = true; t.Join(); Console.WriteLine("主线程执行结束!"); Console.ReadLine(); }
上面这段代码在release环境下出现问题了。。。主线程不能执行结束。。。。
从代码中可以发现,有两个线程在共同一个isStop变量。。。
就是t这个线程会将isStop加载到Cpu Cache中。。。 【release大胆的优化】
两种方法解决:
static void Main(string[] args) { var isStop = 0; var t = new Thread(() => { var isSuccess = false; while (isStop == 0) { Thread.VolatileRead(ref isStop); isSuccess = !isSuccess; } }); t.Start(); Thread.Sleep(1000); isStop = 1; t.Join(); Console.WriteLine("主线程执行结束!"); Console.ReadLine(); }
thread 我如果想做一个异步任务,就需要开启一个Thread。 具有专有性。。。
ThreadPool =》 如果想做异步任务 只需要向租车公司借用 =》 使用完了就要归还
static void Main(string[] args) { ThreadPool.QueueUserWorkItem((obj) => { var func = obj as Func<string>; Console.WriteLine("我是工作线程:{0}, content={1}", Thread.CurrentThread.ManagedThreadId,func()); }, new Func<string>(() => "hello world")); Console.WriteLine("主线程ID:{0}", Thread.CurrentThread.ManagedThreadId); Console.Read(); }
现在有10个任务,如果用Thread来做,需要开启10个Thread
如果用ThreadPool来做,只需要将10个任务丢给线程池
windbg的角度来看一下两者的区别。。。。
1、区别: DeadThread: 10
static void Main(string[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(() => { for (int j = 0; j < 10; j++) { Console.WriteLine("work:{0},tid={1}", i, Thread.CurrentThread.ManagedThreadId); } }); thread.Name = "main" + i; thread.Start(); } Console.Read(); }
2.threadPool解决同样的问题。。。
从windbg中可以看到,当前没有死线程,而是都是默认初始化的。。。
DeadThread: 0 0:014> !threadpool CPU utilization: 4% Worker Thread: Total: 8 Running: 0 Idle: 8 MaxLimit: 2047 MinLimit: 8 Work Request in Queue: 0 Number of Timers: 0 Completion Port Thread:Total: 0 Free: 0 MaxFree: 16 CurrentLimit: 0 MaxLimit: 1000 MinLimit: 8
看到了当前的threadpool,
其中有“工作线程” 和 “IO线程”
工作线程: 给一般的异步任务执行的。。其中不涉及到 网络,文件 这些IO操作。。。 【开发者调用】
IO线程: 一般用在文件,网络IO上。。。 【CLR调用】
8的又来就是因为我有 8个逻辑处理器,也就是说可以8个Thread 并行处理。。。。
总结:
1、threadPool 可以用8个线程 解决 thread 10个线程干的事情,
节省了空间和时间:
时间: 通过各个托管和非托管的dll。。。
空间:teb,osthread结构, 堆栈。
1、ThreadPool 定时器功能
static void Main(string[] args) { ThreadPool.RegisterWaitForSingleObject(new AutoResetEvent(true), new WaitOrTimerCallback((obj, b) => { //做逻辑判断,判断是否在否以时刻执行。。。 Console.WriteLine("obj={0},tid={1}, datetime={2}", obj, Thread.CurrentThread.ManagedThreadId, DateTime.Now); }), "hello world", 1000, false); Console.Read(); }
一般在使用Timer的时候,有一个延期执行的功能。
windbg 来看一下底层线程是什么样的。。。。
ID OSID ThreadOBJ State GC Mode GC Alloc Context Domain Count Apt Exception 0 1 3f54 01157bc8 2a020 Preemptive 02E8A3E4:00000000 01152258 1 MTA 5 2 2594 011678f8 2b220 Preemptive 00000000:00000000 01152258 0 MTA (Finalizer) 6 3 3c28 01189990 1020220 Preemptive 00000000:00000000 01152258 0 Ukn (Threadpool Worker) 7 4 121c 0118a2c0 8029220 Preemptive 02E8D8A4:00000000 01152258 0 MTA (Threadpool Completion Port) 8 5 28f4 0118bd70 8029220 Preemptive 00000000:00000000 01152258 0 MTA (Threadpool Completion Port) 0:009> !threadpool CPU utilization: 9% Worker Thread: Total: 0 Running: 0 Idle: 0 MaxLimit: 2047 MinLimit: 8 Work Request in Queue: 0 -------------------------------------- Number of Timers: 0 -------------------------------------- Completion Port Thread:Total: 2 Free: 2 MaxFree: 16 CurrentLimit: 2 MaxLimit: 1000 MinLimit: 8
System.threading 下面有timer
System.Timer 下面Timer。。。
System.Windows.Form 下面Timer。。。
System.Web.UI 下面Timer。。。
0:009> !threads ThreadCount: 4 UnstartedThread: 0 BackgroundThread: 3 PendingThread: 0 DeadThread: 0 Hosted Runtime: no Lock ID OSID ThreadOBJ State GC Mode GC Alloc Context Domain Count Apt Exception 0 1 2d74 00f785c8 2a020 Preemptive 02E360F0:00000000 00f72030 1 MTA 5 2 3784 00f87ea0 2b220 Preemptive 00000000:00000000 00f72030 0 MTA (Finalizer) 6 3 2dc4 00faae18 102a220 Preemptive 00000000:00000000 00f72030 0 MTA (Threadpool Worker) 7 4 3e34 00fab748 1029220 Preemptive 02E3D4D0:00000000 00f72030 0 MTA (Threadpool Worker)
底层有一个队列 TimerQueue instance2 = TimerQueue.Instance; internal class TimerQueue
Timer 首先是用 ThreadPool.UnsafeQueueUserWorkItem(waitCallback, timer); 来完成定时功能。。
因为处理的功能太少:
例:1、我希望早上8点执行。。。
2、我希望明天8点执行。。。
3、我希望每天8点执行。。。
4、我希望每个月的8号执行。。。
5、我希望下个月8号执行,排除双休日。。。
6、半个小时执行一次。。。
所以用第四种方法执行这些任务
↓
Quartz.dll(详情请查文档)
Task task = new Task(() => { Console.WriteLine("我是工作线程: tid={0}", Thread.CurrentThread.ManagedThreadId); }); task.Start(); Console.Read();
//使用TaskFactory启动 var task = Task.Factory.StartNew(() => { Console.WriteLine("我是工作线程: tid={0}", Thread.CurrentThread.ManagedThreadId); });
//使用Task的Run方法 var task = Task.Run(() => { Console.WriteLine("我是工作线程: tid={0}", Thread.CurrentThread.ManagedThreadId); });
//这个是同步执行。。。。也就是阻塞执行。。。 var task = new Task(() => { Console.WriteLine("我是工作线程: tid={0}", Thread.CurrentThread.ManagedThreadId); }); task.RunSynchronously();
我们的Task底层都是由不同的TaskScheduler支撑的。。。
TaskScheduler 相当于Task的CPU处理器。。。
默认的TaskScheduler是ThreadPoolTaskScheduler。。。
wpf中的TaskScheduler是 SynchronizationContextTaskScheduler
ThreadPoolTaskScheduler
this.m_taskScheduler.InternalQueueTask(this);
大家也可以自定义一些TaskScheduler。。。。
protected internal override void QueueTask(Task task) { if ((task.Options & TaskCreationOptions.LongRunning) != TaskCreationOptions.None) { new Thread(ThreadPoolTaskScheduler.s_longRunningThreadWork) { IsBackground = true }.Start(task); return; } bool forceGlobal = (task.Options & TaskCreationOptions.PreferFairness) > TaskCreationOptions.None; ThreadPool.UnsafeQueueCustomWorkItem(task, forceGlobal); }
让Task具有返回值。。。 它的父类其实就是Task。。
具体的启动方式和Task是一样的。。。
【这些都是task的核心】
Thread t = new Thread(() => { System.Threading.Thread.Sleep(100); Console.WriteLine("我是工作线程1"); }); Thread t2 = new Thread(() => { System.Threading.Thread.Sleep(100); Console.WriteLine("我是工作线程2"); }); t.Start(); t2.Start(); t.Join(); // t1 && t2 都完成了 WaitAll操作。。。 WaitAny t1 || t2 t2.Join(); Console.WriteLine("我是主线程"); Console.Read();
Task:
static void Main(string[] args) { Task task1 = new Task(() =>{ System.Threading.Thread.Sleep(1000); Console.WriteLine("我是工作线程1:{0}", DateTime.Now); }); task1.Start(); Task task2 = new Task(() => { System.Threading.Thread.Sleep(2000); Console.WriteLine("我是工作线程2:{0}", DateTime.Now); }); task2.Start(); Task.WhenAll(task1, task2).ContinueWith(t => { //执行“工作线程3”的内容 Console.WriteLine("我是工作线程 {0}", DateTime.Now); }); Console.Read(); }
WhenAll
WhenAny
Task工厂中的一些延续操作。。。
ContinueWhenAll
Task.Factory.ContinueWhenAll(new Task[2] { task1, task2 }, (t) => { //执行“工作线程3”的内容 Console.WriteLine("我是主线程 {0}", DateTime.Now); });
ContinueWhenAny
介绍Task的7种阻塞方式 + 延续
如果会打组合拳,task异步任务还是写的非常漂亮。。。。
using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main(string[] args) { Task task1 = new Task(() => { System.Threading.Thread.Sleep(1000); Console.WriteLine("我是工作线程1:{0}", DateTime.Now); }); task1.Start(); Task task2 = new Task(() => { System.Threading.Thread.Sleep(2000); Console.WriteLine("我是工作线程2:{0}", DateTime.Now); }); task2.Start(); //Task.WhenAny(task1, task2).ContinueWith(t => //{ // //执行“工作线程3”的内容 // Console.WriteLine("我是主线程 {0}", DateTime.Now); //}); //Task.WhenAll(task1, task2).ContinueWith(t => //{ // //执行“工作线程3”的内容 // Console.WriteLine("我是工作线程 {0}", DateTime.Now); //}); Console.Read(); } } }
public Task(Action action, TaskCreationOptions creationOptions); // // 摘要: // 指定可控制任务的创建和执行的可选行为的标志。 [Flags] public enum TaskCreationOptions { // // 摘要: // 指定应使用默认行为。 None = 0, // // 摘要: // 提示 System.Threading.Tasks.TaskScheduler 以一种尽可能公平的方式安排任务,这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。 PreferFairness = 1, // // 摘要: // 指定任务将是长时间运行的、粗粒度的操作,涉及比细化的系统更少、更大的组件。它会向 System.Threading.Tasks.TaskScheduler // 提示,过度订阅可能是合理的。您可以通过过度订阅创建比可用硬件线程数更多的线程。 LongRunning = 2, // // 摘要: // 指定将任务附加到任务层次结构中的某个父级。有关详细信息,请参阅 已附加和已分离的子任务。 AttachedToParent = 4, // // 摘要: // 如果尝试附有子任务到创建的任务,指定 System.InvalidOperationException 将被引发。 DenyChildAttach = 8, // // 摘要: // 防止环境计划程序被视为已创建任务的当前计划程序。这意味着像 StartNew 或 ContinueWith 创建任务的执行操作将被视为 System.Threading.Tasks.TaskScheduler.Default // 当前计划程序。 HideScheduler = 16 }
public Task ContinueWith(Action continuationAction, TaskContinuationOptions continuationOptions);
TaskCreationOptions :
Task task = new Task(() => { Task task1 = new Task(() => { Thread.Sleep(100); Console.WriteLine("task1"); }, TaskCreationOptions.AttachedToParent); Task task2 = new Task(() => { Thread.Sleep(10); Console.WriteLine("task2"); }, TaskCreationOptions.AttachedToParent); task1.Start(); task2.Start(); }); task.Start(); task.Wait(); //task.WaitAll(task1,task2); Console.WriteLine("我是主线程!!!!"); Console.Read();
DenyChildAttach: 不让子任务附加到父任务上去。。。
static void Main(string[] args) { Task task = new Task(() => { Task task1 = new Task(() => { Thread.Sleep(100); Console.WriteLine("task1"); }, TaskCreationOptions.AttachedToParent); Task task2 = new Task(() => { Thread.Sleep(10); Console.WriteLine("task2"); }, TaskCreationOptions.AttachedToParent); task1.Start(); task2.Start(); }, TaskCreationOptions.DenyChildAttach); task.Start(); task.Wait(); //task.WaitAll(task1,task2); Console.WriteLine("我是主线程!!!!"); Console.Read(); }
HideScheduler: 子任务默认不使用父类的Task的Scheduler。。。而是使用默认的。
LongRunning:如果你明知道是长时间运行的任务,建议你使用此选项。。建议使用 “Thread” 而不是“threadPool"。
如果长期租用不还给threadPool,threadPool为了满足市场需求,会新开一些线程。。。 满足当前使用,如果此时租用线程被归还,这会导致ThreadPool的线程过多,销毁和 调度都是一个很大的麻烦。。。
try
{
this.m_taskScheduler.InternalQueueTask(this);
}
catch (ThreadAbortException exceptionObject)
{
this.AddException(exceptionObject);
this.FinishThreadAbortedTask(true, false);
}
internal void InternalQueueTask(Task task)
{
task.FireTaskScheduledIfNeeded(this);
this.QueueTask(task);
}
protected internal override void QueueTask(Task task) { if ((task.Options & TaskCreationOptions.LongRunning) != TaskCreationOptions.None) { new Thread(ThreadPoolTaskScheduler.s_longRunningThreadWork) { IsBackground = true }.Start(task); return; } bool forceGlobal = (task.Options & TaskCreationOptions.PreferFairness) > TaskCreationOptions.None; ThreadPool.UnsafeQueueCustomWorkItem(task, forceGlobal); }
PreferFairness: 给你的感觉就是一个”queue“的感觉。。。
会将Task放入到ThreadPool的全局队列中。。。。 让work thread进行争抢。。。。
默认情况会放到task的一个本地队列中。。。。
// System.Threading.ThreadPoolWorkQueue [SecurityCritical] public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) { ThreadPoolWorkQueueThreadLocals threadPoolWorkQueueThreadLocals = null; if (!forceGlobal) { threadPoolWorkQueueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals; } if (this.loggingEnabled) { FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback); } if (threadPoolWorkQueueThreadLocals != null) { threadPoolWorkQueueThreadLocals.workStealingQueue.LocalPush(callback); } else { ThreadPoolWorkQueue.QueueSegment queueSegment = this.queueHead; while (!queueSegment.TryEnqueue(callback)) { Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref queueSegment.Next, new ThreadPoolWorkQueue.QueueSegment(), null); while (queueSegment.Next != null) { Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, queueSegment.Next, queueSegment); queueSegment = this.queueHead; } } } this.EnsureThreadRequested(); }
// // 摘要: // Default = "Continue on any, no task options, run asynchronously" 指定应使用默认行为。默认情况下,完成前面的任务之后将安排运行延续任务,而不考虑前面任务的最终 // System.Threading.Tasks.TaskStatus。 None = 0, // // 摘要: // 提示 System.Threading.Tasks.TaskScheduler 以一种尽可能公平的方式安排任务,这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。 PreferFairness = 1, // // 摘要: // 指定某个任务将是运行时间长、粗粒度的操作。它会向 System.Threading.Tasks.TaskScheduler 提示,过度订阅可能是合理的。 LongRunning = 2, // // 摘要: // 指定将任务附加到任务层次结构中的某个父级。 AttachedToParent = 4, // // 摘要: // 如果尝试附有子任务到创建的任务,指定 System.InvalidOperationException 将被引发。 DenyChildAttach = 8, // // 摘要: // 防止环境计划程序被视为已创建任务的当前计划程序。这意味着像 StartNew 或 ContinueWith 创建任务的执行操作将被视为 System.Threading.Tasks.TaskScheduler.Default // 当前计划程序。 HideScheduler = 16, // // 摘要: // 在延续取消的情况下,防止延续的完成直到完成先前的任务。 LazyCancellation = 32, // // 摘要: // 指定不应在延续任务前面的任务已完成运行的情况下安排延续任务。此选项对多任务延续无效。 NotOnRanToCompletion = 65536, // // 摘要: // 指定不应在延续任务前面的任务引发了未处理异常的情况下安排延续任务。此选项对多任务延续无效。 NotOnFaulted = 131072, // // 摘要: // 指定只应在延续任务前面的任务已取消的情况下安排延续任务。此选项对多任务延续无效。 OnlyOnCanceled = 196608, // // 摘要: // 指定不应在延续任务前面的任务已取消的情况下安排延续任务。此选项对多任务延续无效。 NotOnCanceled = 262144, // // 摘要: // 指定只有在延续任务前面的任务引发了未处理异常的情况下才应安排延续任务。此选项对多任务延续无效。 OnlyOnFaulted = 327680, // // 摘要: // 指定只应在延续任务前面的任务已完成运行的情况下才安排延续任务。此选项对多任务延续无效。 OnlyOnRanToCompletion = 393216, // // 摘要: // 指定应同步执行延续任务。指定此选项后,延续任务将在导致前面的任务转换为其最终状态的相同线程上运行。如果在创建延续任务时已经完成前面的任务,则延续任务将在创建此延续任务的线程上运行。只应同步执行运行时间非常短的延续任务。 ExecuteSynchronously = 524288折叠
Cancellation 判断任务的取消。。。 Thread abort
task1 -> continuewith task2 -> continuewith -> task3
就是说,continuewith的时候,预先判断了source.token的值,结果发现任务已经取消。
这个时候,task2就不会执行了。。,但是task3和task2有延续。。。
有因为task2 和task1已经没有延续关系了。。。所以 task1和task3可以并行,
看似continuewith的关系得不到延续。。。。【并行】
TaskContinuationOptions.LazyCancellation 它的本质就是:
需要等待task1执行完成之后再判断source.token的状态。。。。 这样的话,
首先就形成了一条链: task1 -> task2 -> task3...
static void Main(string[] args) { CancellationTokenSource source = new CancellationTokenSource(); source.Cancel(); Task task1 = new Task(() => { Thread.Sleep(1000); Console.WriteLine("task1 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); }); var task2 = task1.ContinueWith(t => { Console.WriteLine("task2 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); }, source.Token, TaskContinuationOptions.LazyCancellation, TaskScheduler.Current); var task3 = task2.ContinueWith(t => { Console.WriteLine("task3 tid={0}, dt={1} {2}", Thread.CurrentThread.ManagedThreadId,DateTime.Now, task2.Status); }); task1.Start(); Console.Read(); }
Task task1 = new Task(() => { Thread.Sleep(1000); Console.WriteLine("task1 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); }); var task2 = task1.ContinueWith(t => { Console.WriteLine("task2 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); },TaskContinuationOptions.ExecuteSynchronously);
前面表示延续任务必须在前面task非完成状态才能执行。。
后面表示延续任务必须在前面task完成状态才能执行。。。
Task task1 = new Task(() => { Thread.Sleep(1000); Console.WriteLine("task1 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); throw new Exception("hello world"); }); var task2 = task1.ContinueWith(t => { Console.WriteLine("task2 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); }, TaskContinuationOptions.NotOnRanToCompletion);
static void Main(string[] args) { var isStop = false; var thread = new Thread(() => { while (!isStop) { Thread.Sleep(100); Console.WriteLine("当前thread={0} 正在运行", Thread.CurrentThread.ManagedThreadId); } }); thread.Start(); Thread.Sleep(1000); isStop = true; Console.Read(); }
CancellationTokenSource 远比 isStop这个变量强的多。。。
使用cancel实现isStop同样的功能。。。 强在哪里????
<1> 当任务取消的时候,我希望有一个函数能够被触发,这个触发可以做一些资源的清理,
又或者是更新数据库信息。。。
CancellationTokenSource source = new CancellationTokenSource(); source.Token.Register(() => { //如果当前的token被取消,此函数将会被执行 Console.WriteLine("当前source已经被取消,现在可以做资源清理了。。。。"); }); var task = Task.Factory.StartNew(() => { while (!source.IsCancellationRequested) { Thread.Sleep(100); Console.WriteLine("当前thread={0} 正在运行", Thread.CurrentThread.ManagedThreadId); } }, source.Token); Thread.Sleep(1000); source.Cancel(); Console.Read();
我想2秒之后自动取消,N秒。。。。 webservice。。。wcf。
《1》 CancelAfter
source.CancelAfter(new TimeSpan(0, 0, 0, 1));
《2》 CancellationTokenSource 的构造函数中进行取消
CancellationTokenSource source = new CancellationTokenSource(1000);
<3> 取消的组合 将CancellationTokenSource 组合成一个链表
其中任何一个CancellationTokenSource被取消,组合source也会被取消。。。 var s3= s1 && s2; static void Main(string[] args) { CancellationTokenSource source1 = new CancellationTokenSource(); //现在要让source1取消 //source1.Cancel(); CancellationTokenSource source2 = new CancellationTokenSource(); source2.Cancel(); var combineSource = CancellationTokenSource.CreateLinkedTokenSource(source1.Token, source2.Token); Console.WriteLine("s1={0} s2={1} s3={2}", source1.IsCancellationRequested, source2.IsCancellationRequested, combineSource.IsCancellationRequested); Console.Read(); }
//如果一个任务被取消,我希望代码抛出一个异常。。。 if(IsCancellationRequested) throw new Exception("adasdaf"); // == 等价操作 == throwIfCancellationRequested(); CancellationTokenSource source1 = new CancellationTokenSource(); //现在要让source1取消 //source1.Cancel(); CancellationTokenSource source2 = new CancellationTokenSource(); source2.Cancel(); var combineSource = CancellationTokenSource.CreateLinkedTokenSource(source1.Token, source2.Token); Console.WriteLine("s1={0} s2={1} s3={2}", source1.IsCancellationRequested, source2.IsCancellationRequested, combineSource.IsCancellationRequested); Console.Read(); //---------------------------------------------------------------------------------------- //例: using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main(string[] args) { CancellationTokenSource source = new CancellationTokenSource(); source.Token.Register(() => { //如果当前的token被取消,此函数将会被执行 Console.WriteLine("当前source已经被取消,现在可以做资源清理了。。。。"); }); var task = Task.Factory.StartNew(() => { while (true) { source.Token.ThrowIfCancellationRequested(); Thread.Sleep(100); Console.WriteLine("当前thread={0} 正在运行", Thread.CurrentThread.ManagedThreadId); } }, source.Token); Thread.Sleep(1000); source.Cancel(); Thread.Sleep(100); Console.WriteLine(task.Status); Console.Read(); } } }折叠
最近在看一个同事的代码,代码的本意是在main方法中开启10个线程,用这10个线程来处理一批业务逻辑,在某一时刻当你命令console退出的时候,这个时候不是立即让console退出,而是需要等待10个线程把检测状态之后的业务逻辑执行完之后再退出,这样做是有道理的,如果强行退出会有可能造成子线程的业务数据损坏,没毛病吧,业务逻辑大概就是这样。
一、现实场景
由于真实场景的代码比较复杂和繁琐,为了方便演示,我将同事所写的代码抽象一下,类似下面这样,看好了咯~~~
class Program { private static int workThreadNums = 0; private static bool isStop = false; static void Main(string[] args) { var tasks = new Task[10]; for (int i = 0; i < 10; i++) { tasks[i] = Task.Factory.StartNew((obj) => { Run(); }, i); } //是否退出 string input = Console.ReadLine(); while ("Y".Equals(input, StringComparison.OrdinalIgnoreCase)) { break; } isStop = true; while (workThreadNums != 0) { Console.WriteLine("正在等待线程结束,当前还在运行线程有:{0}", workThreadNums); Thread.Sleep(10); } Console.WriteLine("准备退出了。。。"); Console.Read(); Environment.Exit(0); } static void Run() { try { workThreadNums++; while (true) { if (isStop) break; Thread.Sleep(1000); //执行业务逻辑 Console.WriteLine("我是线程:{0},正在执行业务逻辑", Thread.CurrentThread.ManagedThreadId); } } finally { workThreadNums--; } } }折叠
其实扫一下上面的代码应该就知道是用来干嘛的,业务逻辑没毛病,基本可以实现刚才的业务场景,在console退出的时候可以完全确保10个线程都把自己的业务逻辑处理完毕了。不过从美观角度上来看,这种代码就太low了。。。一点档次都没有,比如存在下面两点问题:
第一点:局部变量太多,又是isStop又是workThreaNums,导致业务逻辑Run方法中掺杂了很多的非业务逻辑,可读性和维护性都比较low。
第二点:main函数在退出的时候用while检测workThreadNums是否为“0”,貌似没问题,但仔细想想这段代码有必要吗?
接下来我把代码跑一下,可以看到这个while检测到了在退出时的workThredNums的中间状态“7”,有点意思吧
二、代码优化
那上面这段代码怎么优化呢?如何踢掉业务逻辑方法中的非业务代码呢?当然应该从业务逻辑上考虑一下了,其实这个问题的核心就是两点:
1、 如何实现多线程中的协作取消?
2、如何实现多线程整体执行完毕通知主线程?
这种场景优化千万不要受到前人写的代码所影响,最好忘掉就更好了,不然你会下意识的受到什么workthreadnums,isstop这些变量的左右,不说废话了,如果你对task并发模型很熟悉的话,你的优化方案很快就会出来的。。。
1、协作取消:
直接用一个bool变量来判断子线程是否退出的办法其实是很没有档次的,在net 4.0中有一个类(CancellationTokenSource)专门来解决使用bool变量来判断的这种很low的场景,而且比bool变量具有更强大的功能,这个会在以后的文章中跟大家去讲。
2、多线程整体执行完毕通知主线程
目前我们看到的方式是主线程通过轮询workthreadnums这种没有档次的方式去做的,其实这种方式本质上就是任务串行,而如果你明白task的话,你就知道有很多的手段是执行任务串行的,比如什么ContinueWith,WhenAll,WhenAny等等方式,所以你只需要将一组task串联到WhenAll之后就可以了。好了,上面就是我的解决思路,接下来看一下代码吧:
class Program { static void Main(string[] args) { CancellationTokenSource source = new CancellationTokenSource(); var tasks = new Task[10]; for (int i = 0; i < 10; i++) { tasks[i] = Task.Factory.StartNew((m) => { Run(source.Token); }, i); } Task.WhenAll(tasks).ContinueWith((t) => { Console.WriteLine("准备退出了。。。"); Console.Read(); Environment.Exit(0); }); string input = Console.ReadLine(); while ("Y".Equals(input, StringComparison.OrdinalIgnoreCase)) { source.Cancel(); } Console.Read(); } static void Run(CancellationToken token) { while (true) { if (token.IsCancellationRequested) break; Thread.Sleep(1000); //执行业务逻辑 Console.WriteLine("我是线程:{0},正在执行业务逻辑", Thread.CurrentThread.ManagedThreadId); } } }
单从代码量上面看就缩减了17行代码,而且业务逻辑也非常的简单明了,然后再看业务逻辑方法Run,其实你根本就不需要所谓的workThreadNums++,--的操作,而且多线程下不用锁的话,还容易出现竞态的问题,解决方案就是使用WhenAll等待一组Tasks完成任务,之后再串行要退出的Task任务,是不是很完美,而协作取消的话,只需将取消的token传递给业务逻辑方法,当主线程执行source.Cancel()方法取消的时候,子线程就会通过IsCancellationRequested感知到主线程做了取消操作。
protected List<long> ExecuteOR(IFilterCore filterCore, List<FilterValueItem> fieldValueItemList) { List<long> customerIDList = new List<long>(); try { //多线程处理 Task<List<long>>[] tasks = new Task<List<long>>[fieldValueItemList.Count]; for (int i = 0; i < fieldValueItemList.Count; i++) { tasks[i] = Task.Factory.StartNew((fieldValueItem) => { using (SearchStopWatch watch = new SearchStopWatch(string.Format("或者条件:{0}", filterCore.GetType().Name))) { List<long> smallCustomerIDList = null; try { smallCustomerIDList = filterCore.Filter((FilterValueItem)fieldValueItem); } catch (Exception ex) { LogHelper.WriteLog(ex); throw; } return smallCustomerIDList; } }, fieldValueItemList[i]); } Task.WhenAll(tasks).ContinueWith(t => { using (SearchStopWatch watch = new SearchStopWatch(string.Format("或者条件 追加List时间: {0}", filterCore.GetType().Name))) { foreach (var task in tasks) { customerIDList.AddRange(task.Result); } } }, TaskContinuationOptions.OnlyOnRanToCompletion).Wait(); } catch (Exception ex) { LogHelper.WriteLog(ex); throw; } return customerIDList; }折叠
1、Result 多了此属性
<1> 获取Task的返回值: Wait();
public class Task<TResult> : Task Task<int> task1 = Task.Factory.StartNew(() => { //做一些逻辑运算 return 1; }); task1.Wait(); Console.WriteLine(task1.Result); Console.Read(); // System.Threading.Tasks.Task [__DynamicallyInvokable] public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken) { if (millisecondsTimeout < -1) { throw new ArgumentOutOfRangeException("millisecondsTimeout"); } if (!this.IsWaitNotificationEnabledOrNotRanToCompletion) { return true; } if (!this.InternalWait(millisecondsTimeout, cancellationToken)) { return false; } if (this.IsWaitNotificationEnabledOrNotRanToCompletion) { this.NotifyDebuggerOfWaitCompletionIfNecessary(); if (this.IsCanceled) { cancellationToken.ThrowIfCancellationRequested(); } this.ThrowIfExceptional(true); } return true; }
<2> 直接TResult。。。
// System.Threading.Tasks.Task<TResult> internal TResult GetResultCore(bool waitCompletionNotification) { if (!base.IsCompleted) { base.InternalWait(-1, default(CancellationToken)); } if (waitCompletionNotification) { base.NotifyDebuggerOfWaitCompletionIfNecessary(); } if (!base.IsRanToCompletion) { base.ThrowIfExceptional(true); } return this.m_result; }
2、ContinueWith 也可以具有返回值
static void Main() { Task<int> task1 = Task.Factory.StartNew(() => { //做一些逻辑运算 return 1; }); var task2 = task1.ContinueWith<string>(t => { int num = t.Result; var sum = num + 10; return sum.ToString(); }); Console.WriteLine(task2.Result); Console.Read(); }
3、Task.WhenAll/ WhenAny
static void Main() { Task<int> task1 = Task.Factory.StartNew(() => { //做一些逻辑运算 return 1; }); Task<int> task2 = Task.Factory.StartNew(() => { //做一些逻辑运算 return 2; }); var task = Task.WhenAll<int>(new Task<int>[2] { task1, task2 }); var result = task.Result; Console.WriteLine(task2.Result); Console.Read(); }
public void Handle(Func<Exception, bool> predicate) { if (predicate == null) { throw new ArgumentNullException("predicate"); } List<Exception> list = null; for (int i = 0; i < this.m_innerExceptions.Count; i++) { if (!predicate(this.m_innerExceptions[i])) { if (list == null) { list = new List<Exception>(); } list.Add(this.m_innerExceptions[i]); } } if (list != null) { throw new AggregateException(this.Message, list); } } //********************************************************************************** //例: using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main() { var task = Task.Factory.StartNew(() => { var childTask1 = Task.Factory.StartNew(() => { throw new Exception("我是 childTask1 异常"); }, TaskCreationOptions.AttachedToParent); var childTask2 = Task.Factory.StartNew(() => { throw new Exception("我是 childTask2 异常"); }, TaskCreationOptions.AttachedToParent); }); try { try { task.Wait(); } catch (AggregateException ex) { ex.Handle(x => { if (x.InnerException.Message == "我是 childTask1 异常") { return true; } return false; }); } } catch (Exception ex) { } try { } catch (InvalidCastException ex) { throw; } Console.Read(); } } }折叠
Parallel for,foreach invoke
For
//串行计算 for (int i = 0; i < 100; i++) { Console.WriteLine(i); } //并行计算 Parallel.For(0, 100, (item) => { Console.WriteLine(item); });
3.从源码中看一下Parallel.For实现结构
<1> 我们在并行的时候,可以指定当前有几个线程参与计算。。。
<不让所有的thread参与计算,不让cpu过于消耗了。。。>
<2> RangeManager rangeManager = new RangeManager((long)fromInclusive, (long)toExclusive, 1L, nNumExpectedWorkers);
分区函数 【0-100】
8个thread去计算。
100/8 =12 100 % 8 = 4 t1 => 0-11 t2 => 12-24 t3 => 25-36 。。。 ParallelForReplicatingTask : Task
最后会使用ParallelForReplicatingTask 进行处理
不要在Parallel.For中使用break或者stop,或许会给你引入一些不必要的bug。。。
因为大家都是并行执行的,所以别的线程是刹不住车的。。。
static void Main() { ConcurrentStack<int> stack = new ConcurrentStack<int>(); //并行计算 Parallel.For(0, 100, (item, loop) => { if (item == 10) { loop.Stop(); return; } stack.Push(item); }); Console.WriteLine(string.Join(",", stack)); }
3.For的高级重载
public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
聚合函数是一样的。。。 其实就是一个并行的聚合计算。
比如说:我想做一个并行的从 1-100的累积计算。。。
1+2+3+4+5+。。。。+99 =4950
Parallel.For 可以实现一些数组的累计运算。。。
Parallel.ForEach 应对一些集合运算 【非数组】
// System.Threading.Tasks.Parallel private static ParallelLoopResult ForEachWorker<TSource, TLocal>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource> body, Action<TSource, ParallelLoopState> bodyWithState, Action<TSource, ParallelLoopState, long> bodyWithStateAndIndex, Func<TSource, ParallelLoopState, TLocal, TLocal> bodyWithStateAndLocal, Func<TSource, ParallelLoopState, long, TLocal, TLocal> bodyWithEverything, Func<TLocal> localInit, Action<TLocal> localFinally) { if (parallelOptions.CancellationToken.IsCancellationRequested) { throw new OperationCanceledException(parallelOptions.CancellationToken); } TSource[] array = source as TSource[]; if (array != null) { return Parallel.ForEachWorker<TSource, TLocal>(array, parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal, bodyWithEverything, localInit, localFinally); } IList<TSource> list = source as IList<TSource>; if (list != null) { return Parallel.ForEachWorker<TSource, TLocal>(list, parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal, bodyWithEverything, localInit, localFinally); } return Parallel.PartitionerForEachWorker<TSource, TLocal>(Partitioner.Create<TSource>(source), parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal, bodyWithEverything, localInit, localFinally); }
就是说Parallel函数,第一点就是要分区。。
internal InternalPartitionEnumerable(IEnumerator<TSource> sharedReader, bool useSingleChunking, bool isStaticPartitioning) { this.m_sharedReader = sharedReader; this.m_sharedIndex = new Partitioner.SharedLong(-1L); this.m_hasNoElementsLeft = new Partitioner.SharedBool(false); this.m_sourceDepleted = new Partitioner.SharedBool(false); this.m_sharedLock = new object(); this.m_useSingleChunking = useSingleChunking; if (!this.m_useSingleChunking) { int num = (PlatformHelper.ProcessorCount > 4) ? 4 : 1; this.m_FillBuffer = new KeyValuePair<long, TSource>[num * Partitioner.GetDefaultChunkSize<TSource>()]; } if (isStaticPartitioning) { this.m_activePartitionCount = new Partitioner.SharedInt(0); return; } this.m_activePartitionCount = null; }
字典的分区函数 。。。。
Dictionary<int, int> dic = new Dictionary<int, int>() { {1,100}, {2,200 }, {3,300 } }; Parallel.ForEach(dic, (item) => { Console.WriteLine(item.Key); });
//例: using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main() { //ConcurrentStack<int> stack = new ConcurrentStack<int>(); ////并行计算 //Parallel.For(0, 100, new ParallelOptions() //{ // MaxDegreeOfParallelism = Environment.ProcessorCount - 1, //保证当前还有一个线程不会参与 //}, (item, loop) => //{ // System.Threading.Thread.Sleep(10000000); // stack.Push(item); //}); //Console.WriteLine(string.Join(",", stack)); //var totalNums = 0; //Parallel.For<int>(1, 100, () => { return 0; }, (current, loop, total) => //{ // total += (int)current; // return total; //}, (total) => //{ // Interlocked.Add(ref totalNums, total); //}); //Console.WriteLine(totalNums); //int[] nums = new int[10]; //Parallel.For(0, nums.Length, (item) => //{ // //do logic // var temp = nums[item]; //}); //Dictionary<int, int> dic = new Dictionary<int, int>() //{ // {1,100}, // {2,200 }, // {3,300 } //}; //Parallel.ForEach(dic, (item) => //{ // Console.WriteLine(item.Key); //}); //Parallel.Invoke(() => //{ // Console.WriteLine("我是并行计算1 " + Thread.CurrentThread.ManagedThreadId); //}, () => //{ // Console.WriteLine("我是并行计算2 " + Thread.CurrentThread.ManagedThreadId); //}); } } }折叠
为了能够达到最大的灵活度,linq有了并行的版本。。。
linq to object var nums = Enumerable.Range(0, 100); var query = from n in nums.AsParallel() select new { thread = Thread.CurrentThread.ManagedThreadId, num = n }; foreach (var item in query) { Console.WriteLine(item); }
AsParallel() 可以将串行的代码转换为并行
AsOrdered() 就是将并行结果还是按照 未排序的样子进行排序。。。
asOrdered => orderby
[10,1,2,3,4] => 并行计算.asOrderrd => [10,1,2,3,4]
[10,1,2,3,4] => orderby =>[1,2,3,4,10]
AsUnordered() 不按照原始的顺序排序。。。
AsSequential() <=> AsParallel() 是相对应的。。。。
前者将plinq转换为linq 后者将linq转换为plinq
0:010> !clrstack OS Thread Id: 0x579c (10) Child SP IP Call Site 053bf580 77c0e91c [HelperMethodFrame: 053bf580] System.Threading.Thread.SleepInternal(Int32) 053bf604 7029daba System.Threading.Thread.Sleep(Int32) 053bf60c 00ba18f4 *** WARNING: Unable to verify checksum for C:\1\ConsoleApplication2\ConsoleApplication2\bin\Debug\ConsoleApplication2.exe ConsoleApplication2.Program.GetThreadID() [C:\1\ConsoleApplication2\ConsoleApplication2\Program.cs @ 38] 053bf620 00ba188d ConsoleApplication2.Program+c.b__0_0(Int32) [C:\1\ConsoleApplication2\ConsoleApplication2\Program.cs @ 22] 053bf638 00ba184e System.Linq.Parallel.SelectQueryOperator`2+SelectQueryOperatorResults[[System.Int32, mscorlib],[System.__Canon, mscorlib]].GetElement(Int32) 053bf644 68466a7d System.Linq.Parallel.QueryResults`1[[System.__Canon, mscorlib]].get_Item(Int32) 053bf64c 6845712c System.Linq.Parallel.PartitionedDataSource`1+ListContiguousIndexRangeEnumerator[[System.__Canon, mscorlib]].MoveNext(System.__Canon ByRef, Int32 ByRef) 053bf668 00ba17cf System.Linq.Parallel.PipelineSpoolingTask`2[[System.__Canon, mscorlib],[System.Int32, mscorlib]].SpoolingWork() 053bf684 6846c222 System.Linq.Parallel.SpoolingTaskBase.Work() 053bf6bc 6845a7e0 System.Linq.Parallel.QueryTask.BaseWork(System.Object) 053bf6dc 6845aa42 System.Linq.Parallel.QueryTask+c.b__10_0(System.Object) 053bf6e0 7028dcff System.Threading.Tasks.Task.InnerInvoke() 053bf6ec 7028d934 System.Threading.Tasks.Task.Execute() 053bf710 7028dcba System.Threading.Tasks.Task.ExecutionContextCallback(System.Object) 053bf714 702e1512 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 053bf780 702e1446 System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 053bf794 7028db38 System.Threading.Tasks.Task.ExecuteWithThreadLocal(System.Threading.Tasks.Task ByRef) 053bf7f8 7028da4c System.Threading.Tasks.Task.ExecuteEntry(Boolean) 053bf808 7028d98c System.Threading.Tasks.Task.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem() 053bf80c 7029b2d3 System.Threading.ThreadPoolWorkQueue.Dispatch() 053bf85c 7029b17a System.Threading._ThreadPoolWaitCallback.PerformWaitCallback() 053bfa80 7151ea96 [DebuggerU2MCatchHandlerFrame: 053bfa80]
plinq底层都是用task的。。。。 基于task的一些编程模型,让我们快速进行并行计算的。
WithDegreeOfParallelism:
WithDegreeOfParallelism(Environment.ProcessorCount) 告诉plinq当前8个线程都要参与。。。
WithCancellation: 如果执行之前被取消,那就不要执行了。。。
WithExecutionMode:此参数可以告诉系统当前是否强制并行。。。
public enum ParallelExecutionMode { Default = 0, ForceParallelism = 1 }
Plinq :主要是划分区块,然后对区块进行聚合计算。。。从而达到分而治之。。。
smallsum smallsum smallsum smallsum
-> mergesum <- -> mergesum <-
-> totalsum <-
最灵活的东西莫过于自己去写业务逻辑。。封装的越厉害,灵活性越差,性能自然也越差。。。
//例: using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main() { CancellationTokenSource source = new CancellationTokenSource(); var nums = Enumerable.Range(0, 2).ToList(); nums[0] = 10000; var query = from n in nums.AsParallel().WithDegreeOfParallelism(Environment.ProcessorCount) .WithCancellation(source.Token) .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .WithMergeOptions(ParallelMergeOptions.Default) select new { thread = GetThreadID(), num = n }; foreach (var item in query) { Console.WriteLine(item); } Console.Read(); } static int GetThreadID() { return Thread.CurrentThread.ManagedThreadId; } } }折叠
taskSchedule是干嘛的???
我们发现我们的任务执行都要经过Schedule。。。task的核心就是这个Schedule。。。因为他要把任务
安排在线程或者线程池中。。。
RunLonging... Thread。。。。
也就是Task的默认调度形式。。。。ThreadPool
protected internal override void QueueTask(Task task) { if ((task.Options & TaskCreationOptions.LongRunning) != TaskCreationOptions.None) { new Thread(ThreadPoolTaskScheduler.s_longRunningThreadWork) { IsBackground = true }.Start(task); return; } bool forceGlobal = (task.Options & TaskCreationOptions.PreferFairness) > TaskCreationOptions.None; ThreadPool.UnsafeQueueCustomWorkItem(task, forceGlobal); }
在winform,或者wpf中如果给一个控件赋值,都是调用invoke方法。。。
private void button1_Click(object sender, EventArgs e) { Task task = new Task(() => { try { label1.Text = "你好"; } catch (Exception ex) { MessageBox.Show(ex.Message); } }); task.Start(TaskScheduler.FromCurrentSynchronizationContext()); }
var task = Task.Factory.StartNew(() => { //默认耗时操作 Thread.Sleep(1000 * 10); }); task.ContinueWith(t => { label1.Text = "你好"; }, TaskScheduler.FromCurrentSynchronizationContext());
我自定义的scheduler,需要将每一个task都委托到一个thread中去执行。。。
PerThreadTaskScheduler
通过windbg去查看是否真的使用Thread去执行的。。。
//例: using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main() { Task task = new Task(() => { //System.Threading.Thread.Sleep(1000000); Console.WriteLine("hello world!"); }); task.Start(new PerThreadTaskScheduler()); Console.Read(); } } public class PerThreadTaskScheduler : TaskScheduler { /// <summary> /// 给debug用的。 /// </summary> /// <returns></returns> protected override IEnumerable<Task> GetScheduledTasks() { return Enumerable.Empty<Task>(); } /// <summary> /// 执行task /// </summary> /// <param name="task"></param> protected override void QueueTask(Task task) { var thread = new Thread(() => { TryExecuteTask(task); }); thread.Start(); } /// <summary> /// 同步执行 /// </summary> /// <param name="task"></param> /// <param name="taskWasPreviouslyQueued"></param> /// <returns></returns> protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return true; } } }折叠
xxxbegin xxxend 这么两个配对的经典方法。。。。 委托给线程池去执行的。。。 FileStream (ReadBegin, ReadEnd)配对方法 Action委托,都可以异步执行
xxxAsync这样的事件模型。。。。【WebClient】
微软大力推广Task的时候,APM和EAP都能包装成Task使用。。。。微软想用Task来统治我们的异步编程领域。
如果真的被大一统了,大家都可以使用Task的一切东西。。。
static void Main() { FileStream fs = new FileStream(Environment.CurrentDirectory + "//1.txt", FileMode.Open); var bytes = new byte[fs.Length]; var task = Task.Factory.FromAsync(fs.BeginRead, fs.EndRead, bytes, 0, bytes.Length, string.Empty); var nums = task.Result; Console.WriteLine(nums); }
《1》代码量比较小
《2》使用task更方便,更强大。。。
FileStream fs = new FileStream(Environment.CurrentDirectory + "//1.txt", FileMode.Open); var bytes = new byte[fs.Length]; fs.BeginRead(bytes, 0, bytes.Length, (aysc) => { var nums = fs.EndRead(aysc); Console.WriteLine(nums); }, string.Empty); Console.Read();
包装EAP的话呢,我们需要用TaskCompletionSource包装器进行包装。。。
aysc await 本质也是用了一个包装器。。。
DownloadDataTaskAsync: 看看这个是怎么用的。。
用task解决了大一统的问题。。
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main() { //FileStream fs = new FileStream(Environment.CurrentDirectory + "//1.txt", FileMode.Open); //var bytes = new byte[fs.Length]; //var task = Task.Factory.FromAsync(fs.BeginRead, fs.EndRead, bytes, 0, bytes.Length, string.Empty); //var nums = task.Result; //Console.WriteLine(nums); //FileStream fs = new FileStream(Environment.CurrentDirectory + "//1.txt", FileMode.Open); //var bytes = new byte[fs.Length]; //fs.BeginRead(bytes, 0, bytes.Length, (aysc) => //{ // var nums = fs.EndRead(aysc); // Console.WriteLine(nums); //}, string.Empty); //Console.Read(); //Action action = () => //{ // System.Threading.Thread.Sleep(100000); // Console.WriteLine("hello world!"); //}; //var task = Task.Factory.FromAsync(action.BeginInvoke, action.EndInvoke, string.Empty); ////task.Start(); //Console.Read(); var task = GetTaskAsyc("http://cnblogs.com"); var nums = task.Result; Console.WriteLine(nums.Length); } public static Task<byte[]> GetTaskAsyc(string url) { TaskCompletionSource<byte[]> source = new TaskCompletionSource<byte[]>(); WebClient client = new WebClient(); client.DownloadDataCompleted += (sender, e) => { try { //如果下载完成了,将当前的byte[]个数给task包装器 source.TrySetResult(e.Result); } catch (Exception ex) { source.TrySetException(ex); } }; client.DownloadDataAsync(new Uri(url)); return source.Task; } } }折叠
【潜规则】 ThreadPool IOthread网络IO,文件IO都有一些异步方法。 MemoryStream,FileStream。WebRequest
如果你用同步的思维去理解,容易出问题。。。返回值对不上。 我们在编译器层面看到的代码,不见得是真的代码。。。
通过ILSpy去反编译这段代码。
如果返回值都是Task,你都可以用await进行等待。。
private static void InvokeMoveNext(object stateMachine) { ((IAsyncStateMachine)stateMachine).MoveNext(); }
是应用程序主动使用
是clr反向通知的。。 如果使用同步IO,会是什么样的呢??? ↓
.net锁机制太多了:时间锁,信号量,互斥锁,读写锁,互锁,易变构造
【就是通过一些cpu指令或者一个死循环】在达到thread等待和休眠
一个线程读,一个写,在release的某种情况下,会有debug。。。 Thread.MemoryBarrier , VolatileRead
1.不可以底层对代码进行优化。。 2.我的read和write都是从memrory中读取。。。【我读取的都是最新的】
class Program { public static volatile bool isStop = false; static void Main(string[] args) { //isStop = false; var t = new Thread(() => { var isSuccess = false; while (!isStop) { isSuccess = !isSuccess; } }); t.Start(); Thread.Sleep(1000); isStop = true; t.Join(); Console.WriteLine("主线程执行结束!"); Console.ReadLine(); } }
Interlocked 【只能做一些简单类型的计算】 Increment:自增操作
Decrement:自减操作
Add: 增加指定的值
Exchange: 赋值
CompareExchange: 比较赋值
特殊的业务逻辑让thread在用户模式下进行自选,欺骗cpu当前thread正在运行中。。。。 用户模式 -> 内核模式 -> 用户模式 数据递增
就是调用win32底层的代码,来实现thread的各种操作 Thread.Sleep
用户模式 + 内核模式 【场景是做多的】 xxxslim
多个线程对一个“共享资源”进行操作的时候,容易出问题。。。 共享资源混乱。。。
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { public static SpinLock spinLock = new SpinLock(); static void Main(string[] args) { var sum = 5; var num1 = 10; var num2 = 10; //Interlocked.Exchange(ref sum, 10); // sum=10; //Interlocked.CompareExchange(ref num1, sum, num2); // num1==num2 ; num1=sum; //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Console.Read(); } static int nums = 0; static void Run() { for (int i = 0; i < 100; i++) { try { var b = false; spinLock.Enter(ref b); Console.WriteLine(nums++); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { spinLock.Exit(); } } } } }折叠
在万不得已的情况下,不要使用内核模式的锁,因为代价太大。。。 其实我们有更多的方式可以替代:混合锁机制, lock
事件锁
信号量
互斥锁
true:表示终止状态 false:表示非终止 现实中的场景: 进站火车闸机,我们用火车票来实现进站操作。 true: 终止表示: 闸机中没有火车票, 终止=> 初始状态 false: 非终止表示:闸机中此时有一张火车票 static AutoResetEvent areLock = new AutoResetEvent(false); static void Main(string[] args) { areLock.WaitOne(); //塞一张火车票到闸机中,因为此时有票在闸机,所以我只能等待 =》 mainthread Console.WriteLine("火车票检验通过,可以通行"); areLock.Set(); //从闸机中取火车票 }
WaitOne:用来将火车票塞入到闸机中 Set: 从闸机中把票取出来
static AutoResetEvent areLock = new AutoResetEvent(true); static void Main(string[] args) { areLock.WaitOne(); //塞一张火车票到闸机中 =》 mainthread Console.WriteLine("火车票检验通过,可以通行"); areLock.Set(); //从闸机中取火车票 }
ManualResetEvent :现实场景就是 => 有人看守的铁道栅栏
如果有火车马上要来了,这个栅栏会合围起来,阻止行人通过铁路。 如果火车走了,这个栅栏就会从合围状态转为两侧。 行人就可以通过了 true: 栅栏没有合围,没有阻止行人通过铁路 false:栅栏合围了, 阻止行人通过
两者 ManualResetEvent 和 AutoResetEvent 是不一样的,所以不能混用。。。
class Program { //static AutoResetEvent areLock = new AutoResetEvent(true); static ManualResetEvent mreLock = new ManualResetEvent(false); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Thread.Sleep(5000); //5s中之后,火车开走了,这个时候就要撤销栅栏 mreLock.Set(); Console.Read(); } static int nums = 0; static void Run() { for (int i = 0; i < 100; i++) { try { mreLock.WaitOne(); Console.WriteLine(nums++); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { } } } }
static Semaphore seLock = new Semaphore(1, 1); 我当前只能是一个线程通过。。 class Program { //static AutoResetEvent areLock = new AutoResetEvent(true); //static ManualResetEvent mreLock = new ManualResetEvent(false); static Semaphore seLock = new Semaphore(1, 10); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Console.Read(); } static int nums = 0; static void Run() { for (int i = 0; i < 100; i++) { try { seLock.WaitOne(); Console.WriteLine(nums++); seLock.Release(); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { } } } }
class Program { //static AutoResetEvent areLock = new AutoResetEvent(true); //static ManualResetEvent mreLock = new ManualResetEvent(false); //static Semaphore seLock = new Semaphore(1, 10); static Mutex mutex = new Mutex(); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Console.Read(); } static int nums = 0; static void Run() { for (int i = 0; i < 100; i++) { try { //seLock.WaitOne(); mutex.WaitOne(); Console.WriteLine(nums++); //seLock.Release(); mutex.ReleaseMutex(); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { } } } }
这三种锁,我们发现都有一个WaitOne方法。。。因为他们都是继承于WaitHandle。。。 三种锁都是同根生,其实底层都是通过SafeWaitHandle来对win32api的一个引用。
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { //static AutoResetEvent areLock = new AutoResetEvent(true); //static ManualResetEvent mreLock = new ManualResetEvent(false); //static Semaphore seLock = new Semaphore(1, 10); static Mutex mutex = new Mutex(); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Console.Read(); } static int nums = 0; static void Run() { for (int i = 0; i < 100; i++) { try { //seLock.WaitOne(); mutex.WaitOne(); Console.WriteLine(nums++); //seLock.Release(); mutex.ReleaseMutex(); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { } } } } }折叠
不是从限定个数的角度出发。 而是按照读写的角度进行功能分区。。。。
sqllite: 库锁
sqlserver: 行锁 【我只锁住行】
多个线程可以一起读, 只能让要给线程去写。。。
模拟:多个线程读,一个线程写,那么写的线程是否会阻止读取的线程。。。。
读写 8/2 开。。。
如果你的写入线程时间太久。。。比如说:10s/20s
这个时候你的读线程会被卡死,从而超时。。。。
Ctrip。。。。。 机票db。。。
商旅事业部: orders,,,
机票事业部: orders。。。
给腾讯做对外接口【企业商旅】
order1 join order2...join plane 读取时间太长,也导致write线程长时间进不来,
同样也导致了写入线程超时。。。
namespace ConsoleApplication1 { class Program { static ReaderWriterLock rwlock = new ReaderWriterLock(); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Read(); }); } Task.Factory.StartNew(() => { Write(); }); Console.Read(); } static int nums = 0; /// <summary> /// 线程读 /// </summary> static void Read() { while (true) { Thread.Sleep(10); rwlock.AcquireReaderLock(int.MaxValue); Thread.Sleep(10); Console.WriteLine("当前 t={0} 进行读取 {1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); rwlock.ReleaseReaderLock(); } } /// <summary> /// 线程写 /// </summary> static void Write() { while (true) { //3s进行一次写操作 Thread.Sleep(3000); rwlock.AcquireWriterLock(int.MaxValue); Thread.Sleep(3000); Console.WriteLine("当前 t={0} 进行写入。。。。。。。。。。。。。。。。。。。。。。。{1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); rwlock.ReleaseWriterLock(); } } }折叠
限制线程数的一个机制。。。而且这个也非常实用。
Shopex。。。。
多个线程从某一张表中读取数据:
比如说:Orders Products Users
每张表我都喜欢通过多个线程去读取。。。。
比如说: Orders表 10w: 10个线程读取,一个线程1w
Products表:5w 5个线程 一个1w
Users 表 1w 2个线程 5w
xxxx.continuewithcontinuewith.... continuewith + TaskCreationOptions.AttachedToParent CountdownEvent cdeLock = new CountdownEvent(10);
初始化的时候设置一个 默认threadcount上线。。。
当你使用一个thread。这个threacount就会--操作。。直到为0之后,继续下一步
操作,相当于Task.Wait() 执行完成了。
Reset: 重置当前的threadcount上线
Signal:将当前的threadcount--操作
Wait: 相当于我们的Task.WaitAll
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { static CountdownEvent cdeLock = new CountdownEvent(10); static void Main(string[] args) { //加载Orders搞定 cdeLock.Reset(10); for (int i = 0; i < 10; i++) { Task.Factory.StartNew(() => { LoadOrders(); }); } cdeLock.Wait(); Console.WriteLine("所有的Orders都加载完毕。。。。。。。。。。。。。。。。。。。。。"); //加载Product搞定 cdeLock.Reset(5); for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { LoadProducts(); }); } cdeLock.Wait(); Console.WriteLine("所有的Products都加载完毕。。。。。。。。。。。。。。。。。。。。。"); //加载Users搞定 cdeLock.Reset(2); for (int i = 0; i < 2; i++) { Task.Factory.StartNew(() => { LoadUsers(); }); } cdeLock.Wait(); Console.WriteLine("所有的Users都加载完毕。。。。。。。。。。。。。。。。。。。。。"); Console.WriteLine("所有的表数据都执行结束了。。。恭喜恭喜。。。。"); Console.Read(); } /// <summary> /// 10 threads /// </summary> static void LoadOrders() { //具体的业务逻辑,不细说 Console.WriteLine("当前Orders正在加载中。。。{0}", Thread.CurrentThread.ManagedThreadId); cdeLock.Signal(); } /// <summary> /// 5 threads /// </summary> static void LoadProducts() { //具体的业务逻辑,不细说 Console.WriteLine("当前Products正在加载中。。。{0}", Thread.CurrentThread.ManagedThreadId); cdeLock.Signal(); } /// <summary> /// 2 threads /// </summary> static void LoadUsers() { //具体的业务逻辑,不细说 Console.WriteLine("当前Users正在加载中。。。{0}", Thread.CurrentThread.ManagedThreadId); cdeLock.Signal(); } } }折叠
限定线程个数的一把锁 :Monitor Enter 锁住某一个资源
Exit 退出某一个资源
static object lockMe = new object(); static void Run() { for (int i = 0; i < 100; i++) { var b = false; try { //SpinLock Monitor.Enter(lockMe, ref b); Console.WriteLine(nums++); //seLock.Release(); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { if (b) Monitor.Exit(lockMe); } } }
给一个什么感觉:为了严谨性,是不是为了加锁区域我都有trycatchfinally..而且还要用
if来判断。。太麻烦。
lock (lockMe) { Console.WriteLine(nums++); }
凡是简化我们编程的方式,基本上都叫语法糖
语法糖:
------------------------------------------ 编译器 ---------------------------------------
该是什么样的还是什么样。
.method private hidebysig static void Run () cil managed { // Method begins at RVA 0x20a0 // Code size 72 (0x48) .maxstack 3 .locals init ( [0] int32, [1] object, [2] bool, [3] bool ) IL_0000: nop IL_0001: ldc.i4.0 IL_0002: stloc.0 IL_0003: br.s IL_003e // loop start (head: IL_003e) IL_0005: nop IL_0006: ldsfld object ConsoleApplication1.Program::lockMe IL_000b: stloc.1 IL_000c: ldc.i4.0 IL_000d: stloc.2 .try { IL_000e: ldloc.1 IL_000f: ldloca.s 2 IL_0011: call void [mscorlib]System.Threading.Monitor::Enter(object, bool&) IL_0016: nop IL_0017: nop IL_0018: ldsfld int32 ConsoleApplication1.Program::nums IL_001d: dup IL_001e: ldc.i4.1 IL_001f: add IL_0020: stsfld int32 ConsoleApplication1.Program::nums IL_0025: call void [mscorlib]System.Console::WriteLine(int32) IL_002a: nop IL_002b: nop IL_002c: leave.s IL_0039 } // end .try finally { IL_002e: ldloc.2 IL_002f: brfalse.s IL_0038 IL_0031: ldloc.1 IL_0032: call void [mscorlib]System.Threading.Monitor::Exit(object) IL_0037: nop IL_0038: endfinally } // end handler IL_0039: nop IL_003a: ldloc.0 IL_003b: ldc.i4.1 IL_003c: add IL_003d: stloc.0 IL_003e: ldloc.0 IL_003f: ldc.i4.s 100 IL_0041: clt IL_0043: stloc.3 IL_0044: ldloc.3 IL_0045: brtrue.s IL_0005 // end loop IL_0047: ret } // end of method Program::Run折叠
因为众多的锁机制中,唯独只有Monitor有专用的语法糖。。。。所以说非常受重视。。。
本质就是利用堆上的同步块实现资源锁定。。。
Enter中添加的对象,相当于把对象的同步块索引和CLR的同步块数组了关联。
Exit中释放的资源,相当于把对象的同步快索引和CLR的同步块数组进行了解绑。
你锁住的资源一定要让你的可访问的线程必须能够访问到。。。
所以锁住的资源千万不要使用值类型。。。
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { static Person personLockMe = new Person(); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Console.Read(); } static int nums = 0; static void Run() { Person person = new Person(); for (int i = 0; i < 100; i++) { lock (person) { Console.WriteLine(nums++); } //var b = false; //try //{ // //SpinLock // Monitor.Enter(lockMe, ref b); // Console.WriteLine(nums++); // //seLock.Release(); //} //catch (Exception ex) //{ // Console.WriteLine(ex.Message); //} //finally //{ // if (b) Monitor.Exit(lockMe); //} } } } class Person { } }折叠
Thread.Sleep(1) :让线程休眠1ms
Thread.Sleep(0) :让线程放弃当前的时间片,让本线程更高或者同等线程得到时间片运行。
Thread.Yield() :让线程立即放弃当前的时间片,可以让更低级别的线程得到运行,当其他thread时间片用完,本thread再度唤醒。。
Yield < Sleep(0) < Sleep(1) 一个时间片 = 30ms。。
通常会用到用户模式锁。。。while + 这些Thread。。。
SemaphoreSlim:
ManualResetEventSlim: 有人看守的火车轨道标志,栅栏是合围状态
ReaderWriterLockSlim:不用说,他们比之前的内核版本,性能要高得多。。。
具体使用,前面的课程已经和大家聊过了,这次只是看一下不同。。。
// System.Threading.WaitHandle [SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] private static extern int WaitOneNative(SafeHandle waitableSafeHandle, uint millisecondsTimeout, bool hasThreadAffinity, bool exitContext);
for (int i = 0; i < spinCount; i++) { if (this.IsSet) { return true; } if (i < num2) { if (i == num2 / 2) { Thread.Yield(); } else { Thread.SpinWait(PlatformHelper.ProcessorCount * (4 << i)); } } else { if (i % num4 == 0) { Thread.Sleep(1); } else { if (i % num3 == 0) { Thread.Sleep(0); } else { Thread.Yield(); } } } if (i >= 100 && i % 10 == 0) { cancellationToken.ThrowIfCancellationRequested(); } }
其他的方式基本上和原来的内核版本保持一致。。。
class Program { //默认1个线程同时运行,最大10个 static SemaphoreSlim slim = new SemaphoreSlim(1, 10); static void Main(string[] args) { for (int i = 0; i < 10; i++) { Task.Run(() => { Run(); }); } //某一个时刻,我像改变默认的并行线程个数,从默认的1 改成10 System.Threading.Thread.Sleep(2000); slim.Release(10); Console.Read(); } static void Run() { slim.Wait(); Thread.Sleep(1000 * 5); Console.WriteLine("当前t1={0} 正在运行 {1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); slim.Release(); } }
用EnterReadLock 带代替AcquireReaderLock 方法,性能比内核版本要搞得多
ReaderWriterLockSlim slim = new ReaderWriterLockSlim(); slim.EnterReadLock(); slim.ExitReadLock(); slim.EnterWriteLock(); slim.ExitWriteLock(); ReaderWriterLock rwlock = new ReaderWriterLock(); //rwlock.AcquireReaderLock() Console.Read();
混合锁: 先在用户模式下内旋,如果超过一定的阈值,会切换到内核锁。。。
在内旋的情况下,我们会看到大量的Sleep(0),Sleep(1),Yield等语法。。。
我们目前的所有集合都是线程不安全。。。
// System.Collections.Concurrent 1. ConcurrentQueue => Queue 2. ConcurrentDictionary<TKey, TValue> => Dictionary 3. ConcurrentStack<T> => Stack 4. ConcurrentBag<T> !=> List/ LinkList ???
ThreadLocal 是什么意思??? 每个线程有一个自己的备份(线程不可见)
每一个线程分配一个“链表” 这个链表可以任务是list(ThreadLocalList)
当你Add操作的时候,locals里面有一份新增的数据,【只有本线程看得见】
同时head和next也是有数据的。。。。为什么有??因为我们的算法有一个“偷盗”的行为。。。
TryTake: 获取数据
如果有三个线程做Add操作,那么三个线程的数据槽中都有一份子集数据。。。
t1: 1,2,3 locals
t2: 1,3,2 locals
t3: 2,3,4 locals
这个时候,如果你在t3线程中执行了三个TryTake。。
t1: 1,2,3 locals
t2: 1,3,2 locals
t3: empty locals
如果这个时候我在t3线程上进行tryTake,怎么办???
这个时候就到Bag的下一级的ttl head 和 next中去找。。。。【steal 偷盗的时候使用的】
for (threadLocalList = this.m_headList; threadLocalList != null; threadLocalList = threadLocalList.m_nextList) { list.Add(threadLocalList.m_version); if (threadLocalList.m_head != null && this.TrySteal(threadLocalList, out result, take)) { return true; } }
ConcurrentBag的所有数据都是放置在多个插入线程的槽位中。。每个线程一个子集。。。
static void Main(string[] args) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); bag.Add(1); bag.Add(2); var result = 0; bag.TryTake(out result); }
线程安全的Stack是使用链表的形式,而同步版本是用 数组 实现的。。。
线程安全的Stack是使用Interlocked来实现线程安全。。 而没有使用 内核锁
static void Main(string[] args) { ConcurrentStack<int> stack = new ConcurrentStack<int>(); stack.Push(1); stack.Push(2); var result = 0; stack.TryPop(out result); Console.WriteLine(result); }
static void Main(string[] args) { ConcurrentQueue<int> queue = new ConcurrentQueue<int>(); queue.Enqueue(1); var result = 0; queue.TryDequeue(out result); Console.WriteLine(result); }
ConcurrentDictionary<int, int> dic = new ConcurrentDictionary<int, int>(); dic.TryAdd(1, 10); dic.ContainsKey(1); foreach (var item in dic) { Console.WriteLine(item.Key + item.Value); }
ConcurrentDictionary => 同步 + lock/其他锁机制 也是可以的。。。
1.现象: 门票的首页之后,页面一直在loading中。。。。
查下来是在一个while循环中做了一个 << >>操作,到时候while条件一直都是true。。。
定义为“一级事件” => 上报董事会 “二级事件” => 事业部CEO
while(true) { i=i<<2。。。 if(ddddd){ } }
如果用windbg去找到。。。 去调试dump文件
演示步骤:
<1> 生成release x64
<2> 在“任务管理器”中生成一个dump文件
<3> 需要用x64 的windbg。。。
<4> !runaway 查看当前托管线程已执行时间
Thread Time 9:5ca8 0 days 0:00:37.796 0:2a68 0 days 0:00:00.015 8:5600 0 days 0:00:00.000 7:46fc 0 days 0:00:00.000 6:33d4 0 days 0:00:00.000 5:3498 0 days 0:00:00.000 4:5644 0 days 0:00:00.000 3:398 0 days 0:00:00.000 2:2a60 0 days 0:00:00.000 1:63c0 0 days 0:00:00.000
<5> 切换到指定的线程 ~~[5ca8]s
<6> 查看当前线程的调用堆栈 !clrstack
000000f4d63ff2a8 00007ff8d50405f7 *** WARNING: Unable to verify checksum for ConsoleApplication51.exe ConsoleApplication51.Program+c.b__1_0() [c:\users\hxc\documents\visual studio 2015\Projects\ConsoleApplication51\ConsoleApplication51\Program.cs @ 22] 000000f4d63ff2b0 00007ff932b10937 System.Threading.Tasks.Task.Execute() 000000f4d63ff2f0 00007ff932ac674e System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 000000f4d63ff3c0 00007ff932ac65e7 System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 000000f4d63ff3f0 00007ff932b10bdd System.Threading.Tasks.Task.ExecuteWithThreadLocal(System.Threading.Tasks.Task ByRef) 000000f4d63ff4a0 00007ff932b10303 System.Threading.Tasks.Task.ExecuteEntry(Boolean) 000000f4d63ff4e0 00007ff932acfa10 System.Threading.ThreadPoolWorkQueue.Dispatch() 000000f4d63ff978 00007ff934626a53 [DebuggerU2MCatchHandlerFrame: 000000f4d63ff978]
从调用堆栈上来看,当前线程 在 Program+c.b__1_0() 方法之后就没有调用堆栈了,说明方法在这个地方
停滞不前了。
<7> 最后到指定的b__1_0方法去寻找一下是否有异常。。。
<8> 通过windbg自己生成dll 【!help】
//通过下面命令生成,并查找 //!dumpdomain //!savemodule 00007ff8d4f350f0 c:\2\1.dll class Program { static void Main(string[] args) { Run(); Console.Read(); } static void Run() { var task = Task.Factory.StartNew(() => { var i = true; //这个地方是一个非常复杂的逻辑。导致死循环 while (true) { i = !i; } }); } }
=> 乱用lock语句,或者“锁机制” [这是一种情况]
0:007> !syncblk Index SyncBlock MonitorHeld Recursion Owning Thread Info SyncBlock Owner 7 0000020bf7522eb8 3 1 0000020bf74c7910 4e04 0 0000020b80007808 ConsoleApplication51.Program
可以看得出“主线程”持有当前的同步锁
说先通过syncblk找到了持有锁的线程,那么肯定有其他的线程在执行Monitor.Enter的时候
进行不下去。。。也就是调用堆栈顶部到这个地方为止。。。。
0000005cf5ffea98 00007ff949476c24 [GCFrame: 0000005cf5ffea98] 0000005cf5ffebd8 00007ff949476c24 [GCFrame: 0000005cf5ffebd8] 0000005cf5ffec18 00007ff949476c24 [HelperMethodFrame_1OBJ: 0000005cf5ffec18] System.Threading.Monitor.Enter(System.Object) 0000005cf5ffed10 00007ff8d5030658 ConsoleApplication51.Program.Run2() [c:\users\hxc\documents\visual studio 2015\Projects\ConsoleApplication51\ConsoleApplication51\Program.cs @ 55] 0000005cf5ffed50 00007ff8d50305ec ConsoleApplication51.Program.b__1_0() [c:\users\hxc\documents\visual studio 2015\Projects\ConsoleApplication51\ConsoleApplication51\Program.cs @ 44] 0000005cf5ffed80 00007ff932b10937 System.Threading.Tasks.Task.Execute() 0000005cf5ffedc0 00007ff932ac674e System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 0000005cf5ffee90 00007ff932ac65e7 System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 0000005cf5ffeec0 00007ff932b10bdd System.Threading.Tasks.Task.ExecuteWithThreadLocal(System.Threading.Tasks.Task ByRef) 0000005cf5ffef70 00007ff932b10303 System.Threading.Tasks.Task.ExecuteEntry(Boolean) 0000005cf5ffefb0 00007ff932acfa10 System.Threading.ThreadPoolWorkQueue.Dispatch() 0000005cf5fff448 00007ff934626a53 [DebuggerU2MCatchHandlerFrame: 0000005cf5fff448]
1、!dumpheap -stat 查看clr的托管堆中的各个类型的占用情况
00007ff932cc2aa8 19 1296 System.String[] 00007ff932cc3698 58 3248 System.RuntimeType 00007ff932cc16b8 186 9218 System.String 000001358b1503d0 57 12824 Free 00007ff932cc1d30 6 35216 System.Object[] 00007ff932cc5dc0 13762 660576 System.Text.StringBuilder 00007ff932cc2860 13775 220334298 System.Char[]
然后看到了有13775个char[]数组
!DumpHeap /d -mt 00007ff932cc2860 //查看当前的方法表 !DumpObj /d 00000135978d5340 //查看当前char[]的内容 !gcroot 00000135a60f4940 //查看当前地址的Root。。。 所以结合“StringBuilder”,结合 ”hello world“ 我们就找出了问题。。。
救火的问题。。。。给公司挽回损失。。。。 【.net高级调试】
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication51 { class Program { static StringBuilder sb = new StringBuilder(); static void Main(string[] args) { //Run(); //new Program().Run(); for (int i = 0; i < 10000000; i++) { sb.Append("hello world"); } Console.WriteLine("执行完毕!"); Console.Read(); } //static void Run() //{ // var task = Task.Factory.StartNew(() => // { // var i = true; // //这个地方是一个非常复杂的逻辑。导致死循环 // while (true) // { // i = !i; // } // }); //} //void Run() //{ // lock (this) // { // var task = Task.Run(() => // { // Console.WriteLine("----- start ---- "); // Thread.Sleep(1000); // Run2(); // Console.WriteLine("------ end -----"); // }); // task.Wait(); // } //} //void Run2() //{ // lock (this) // { // Console.WriteLine("我是 run2.。。。。"); // } //} } }折叠
场景: 1500w数据 800w会员,
全内存跑, 25G空间
多条件在1500w数据中做快速检索。。。。最终目的
最原始的时候,能够达到 90s -120s
别让技术阻碍了业务发展。 【技术一定要领先于业务 eric】
耗费内存不可怕,可怕的是速度提不起来,别让技术阻碍了业务发展。
//1500w foreach(var item in list) { new class(){tradecount=10 ,customerid=1} } //tempCustomerEntityList : 1500w 30s var query = from item in tempCustomerEntityList group item by item.CustomerId into grp select new { key = grp.Key, list = grp.Count() };
Dictionary<int,int> dic
key: customerid
value: totalcount
因为dictionary本身底层实现就是通过数组的。。。
int num = this.comparer.GetHashCode(key) & 2147483647; 1500w
int[] nums=new int[100]; 1s index: 0-99 是不是可以存放customerid=1 .。。。 99 value: totalcount foreach (var customerEntity in tempCustomerEntityList) { if (totalTradeCountArray[(int)customerEntity.CustomerId] == 0) { totalTradeCountArray[(int)customerEntity.CustomerId] = 1; } else { totalTradeCountArray[(int)customerEntity.CustomerId]++; } }
数组天然就是一个hash。
<2> 大数据下字典的性能特别烂,因为每次的Add操作都要计算 hashcode,记得使用天然的hash 形式【数组】
比如:Array中,i ndex=customerid,content=个数 总交易个数 BitArray中,index=customerid,content=true/false 是否包含 1s 【城市等级】
<3> 重点优化总交易金额排名 【2-3天】
a、原始的方式: dictionary + orderby【快排2/3】 20 - 30s
key: customerid value: payment 最后对字典进行orderby
b 、改进1: Array + 小根堆 10s-12s 原因在于小跟堆 太大
TopN的问题。 100个大小的小根堆 1500w * 0.25 % ~400w大小堆
c、桶排序 + TopDictonary 4s
payment: 81.12 => 81.12 * 100 =8112 (array index)
100w * 100 = 1个亿 (Array数组index 达到1个亿)
使用payment * 100 作为index,保持payment在1.5w以内。=1500000
大于1.5w的单独用dictionary存起来,这样的大客户毕竟不多
100 - 5000 绝大多数人。。。
d、太漂亮的代码性能基本都不高,返朴归真的代码性能才是最高的,也是最难写的。
// Or 条件下的多线程 // And 条件下的多线程 // Customer 条件下的多线程 try { //多线程处理 Task<List<long>>[] tasks = new Task<List<long>>[fieldValueItemList.Count]; for (int i = 0; i < fieldValueItemList.Count; i++) { tasks[i] = Task.Factory.StartNew((fieldValueItem) => { using (SearchStopWatch watch = new SearchStopWatch(string.Format("或者条件:{0}", filterCore.GetType().Name))) { List<long> smallCustomerIDList = null; try { smallCustomerIDList = filterCore.Filter((FilterValueItem)fieldValueItem); } catch (Exception ex) { LogHelper.WriteLog(ex); throw; } return smallCustomerIDList; } }, fieldValueItemList[i]); } Task.WhenAll(tasks).ContinueWith(t => { using (SearchStopWatch watch = new SearchStopWatch(string.Format("或者条件 追加List时间: {0}", filterCore.GetType().Name))) { foreach (var task in tasks) { customerIDList.AddRange(task.Result); } } }, TaskContinuationOptions.OnlyOnRanToCompletion).Wait(); } catch (Exception ex) { LogHelper.WriteLog(ex); thr