<1> thread的内核数据结构,其中有osid,context => CPU寄存器的里面的一些变量。 30 ms
<2>. thread 环境块 :
tls【thread本地存储】, execptionList 的信息。。。。
WinDbg 来给大家演示。。。 32,64 =可以达到clr的层面给大家展示底层知识
.loadby sos clr
<3> 用户模式堆栈 内存溢出的一个异常 【堆栈溢出】
一个线程 分配 1M的堆栈空间,,【参数,局部变量】
<4> 内核模式堆栈
在CLR的线程操作,包括线程同步,大多都是调用底层的win32 函数 ,用户模式的参数需要传递到内核模式。。。
<1> 我们进程启动的时候,会加载很多的dll [托管和非托管的], exe,资源,元数据。。。。
进程启动的时候,默认会有三个应用程序域。system domain, shared domain[int,long....] ,domain1.
开启一个thread,销毁一个thread 都会通知进程中的dll,attach,detach 标志位。。。
通知dll的目的就是 给thread做准备工作,比如销毁,让这些dll做资源清理。。。。
<2> 时间片切换
比如说9个thread并发执行。 必然会有一个thread休眠30 ms。。。。
for => palleral for
了解Thread的实例方法。。。 【id,ThreadState】
管理Thread生命周期 Start, Suspend, Resume, Intterupt,Abort。。。 Join 在使用Thread的时候是用的非 常多的。。。
thread = new Thread(new ThreadStart(() => { while (true) { try { Thread.Sleep(1000); textBox1.Invoke(new Action(() =>{ textBox1.AppendText(string.Format("{0},", index++)); })); } catch (Exception ex) { MessageBox.Show(string.Format("{0}, {1}", ex.Message, index)); } } })); thread.Start();
0:017> !ThreadState ab024 User Suspend Pending Legal to Join CLR Owns CoInitialized In Multi Threaded Apartment Fully initialized Sync Suspended
while(true){ continue.... 效果}
0:007> !ThreadState 202b020 Legal to Join CLR Owns CoInitialized In Multi Threaded Apartment Fully initialized Interruptible
《1》 t1 ,t2 共享 变量 public注意有“锁”的概念
《2》 t1 , t2 各自有一个 变量 internel 没有锁争用的概念
static void Main(string[] args) { var slot = Thread.AllocateNamedDataSlot("username"); //主线程 上 设置槽位,, 也就是hello world 只能被主线程读取,其他线程无法读取 Thread.SetData(slot, "hello world!!!"); var t = new Thread(() => { var obj = Thread.GetData(slot); Console.WriteLine("当前工作线程:{0}", obj); }); t.Start(); var obj2 = Thread.GetData(slot); Console.WriteLine("主线程:{0}", obj2); Console.Read(); }
[ThreadStatic] static string username = string.Empty; static void Main(string[] args) { username = "hello world!!!"; var t = new Thread(() => { Console.WriteLine("当前工作线程:{0}", username); }); t.Start(); Console.WriteLine("主线程:{0}", username); Console.Read();
static void Main(string[] args) { ThreadLocal<string> local = new ThreadLocal<string>(); local.Value = "hello world!!!"; var t = new Thread(() => { Console.WriteLine("当前工作线程:{0}", local.Value); }); t.Start(); Console.WriteLine("主线程:{0}", local.Value); Console.Read(); }
MemoryBarrier、VolatileRead/Write 这些方法到底有什么用处。。。。
因为Release中做了一些代码和缓存的优化。。。 比如说将一些数据从memory中读取到CPU高速缓存中。
冒泡排序 O(N)2 1w * 1w = 1亿
class Program { static void Main(string[] args) { var path = Environment.CurrentDirectory + "//1.txt"; var list = System.IO.File.ReadAllLines(path).Select(i => Convert.ToInt32(i)).ToList(); for (int i = 0; i < 5; i++) { var watch = Stopwatch.StartNew(); var mylist = BubbleSort(list); watch.Stop(); Console.WriteLine(watch.Elapsed); } Console.Read(); } //冒泡排序算法 static List<int> BubbleSort(List<int> list) { int temp; //第一层循环: 表明要比较的次数,比如list.count个数,肯定要比较count-1次 for (int i = 0; i < list.Count - 1; i++) { //list.count-1:取数据最后一个数下标, //j>i: 从后往前的的下标一定大于从前往后的下标,否则就超越了。 for (int j = list.Count - 1; j > i; j--) { //如果前面一个数大于后面一个数则交换 if (list[j - 1] > list[j]) { temp = list[j - 1]; list[j - 1] = list[j]; list[j] = temp; } } } return list; } } }
static void Main(string[] args) { var isStop = false; var t = new Thread(() => { var isSuccess = false; while (!isStop) { isSuccess = !isSuccess; } }); t.Start(); Thread.Sleep(1000); isStop = true; t.Join(); Console.WriteLine("主线程执行结束!"); Console.ReadLine(); }
就是t这个线程会将isStop加载到Cpu Cache中。。。 【release大胆的优化】
static void Main(string[] args) { var isStop = 0; var t = new Thread(() => { var isSuccess = false; while (isStop == 0) { Thread.VolatileRead(ref isStop); isSuccess = !isSuccess; } }); t.Start(); Thread.Sleep(1000); isStop = 1; t.Join(); Console.WriteLine("主线程执行结束!"); Console.ReadLine(); }
thread 我如果想做一个异步任务,就需要开启一个Thread。 具有专有性。。。
ThreadPool =》 如果想做异步任务 只需要向租车公司借用 =》 使用完了就要归还
static void Main(string[] args) { ThreadPool.QueueUserWorkItem((obj) => { var func = obj as Func<string>; Console.WriteLine("我是工作线程:{0}, content={1}", Thread.CurrentThread.ManagedThreadId,func()); }, new Func<string>(() => "hello world")); Console.WriteLine("主线程ID:{0}", Thread.CurrentThread.ManagedThreadId); Console.Read(); }
1、区别: DeadThread: 10
static void Main(string[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(() => { for (int j = 0; j < 10; j++) { Console.WriteLine("work:{0},tid={1}", i, Thread.CurrentThread.ManagedThreadId); } }); thread.Name = "main" + i; thread.Start(); } Console.Read(); }
DeadThread: 0 0:014> !threadpool CPU utilization: 4% Worker Thread: Total: 8 Running: 0 Idle: 8 MaxLimit: 2047 MinLimit: 8 Work Request in Queue: 0 Number of Timers: 0 Completion Port Thread:Total: 0 Free: 0 MaxFree: 16 CurrentLimit: 0 MaxLimit: 1000 MinLimit: 8
其中有“工作线程” 和 “IO线程”
工作线程: 给一般的异步任务执行的。。其中不涉及到 网络,文件 这些IO操作。。。 【开发者调用】
IO线程: 一般用在文件,网络IO上。。。 【CLR调用】
8的又来就是因为我有 8个逻辑处理器,也就是说可以8个Thread 并行处理。。。。
1、threadPool 可以用8个线程 解决 thread 10个线程干的事情,
时间: 通过各个托管和非托管的dll。。。
空间:teb,osthread结构, 堆栈。
1、ThreadPool 定时器功能
static void Main(string[] args) { ThreadPool.RegisterWaitForSingleObject(new AutoResetEvent(true), new WaitOrTimerCallback((obj, b) => { //做逻辑判断,判断是否在否以时刻执行。。。 Console.WriteLine("obj={0},tid={1}, datetime={2}", obj, Thread.CurrentThread.ManagedThreadId, DateTime.Now); }), "hello world", 1000, false); Console.Read(); }
windbg 来看一下底层线程是什么样的。。。。
ID OSID ThreadOBJ State GC Mode GC Alloc Context Domain Count Apt Exception 0 1 3f54 01157bc8 2a020 Preemptive 02E8A3E4:00000000 01152258 1 MTA 5 2 2594 011678f8 2b220 Preemptive 00000000:00000000 01152258 0 MTA (Finalizer) 6 3 3c28 01189990 1020220 Preemptive 00000000:00000000 01152258 0 Ukn (Threadpool Worker) 7 4 121c 0118a2c0 8029220 Preemptive 02E8D8A4:00000000 01152258 0 MTA (Threadpool Completion Port) 8 5 28f4 0118bd70 8029220 Preemptive 00000000:00000000 01152258 0 MTA (Threadpool Completion Port) 0:009> !threadpool CPU utilization: 9% Worker Thread: Total: 0 Running: 0 Idle: 0 MaxLimit: 2047 MinLimit: 8 Work Request in Queue: 0 -------------------------------------- Number of Timers: 0 -------------------------------------- Completion Port Thread:Total: 2 Free: 2 MaxFree: 16 CurrentLimit: 2 MaxLimit: 1000 MinLimit: 8
System.threading 下面有timer
System.Timer 下面Timer。。。
System.Windows.Form 下面Timer。。。
System.Web.UI 下面Timer。。。
0:009> !threads ThreadCount: 4 UnstartedThread: 0 BackgroundThread: 3 PendingThread: 0 DeadThread: 0 Hosted Runtime: no Lock ID OSID ThreadOBJ State GC Mode GC Alloc Context Domain Count Apt Exception 0 1 2d74 00f785c8 2a020 Preemptive 02E360F0:00000000 00f72030 1 MTA 5 2 3784 00f87ea0 2b220 Preemptive 00000000:00000000 00f72030 0 MTA (Finalizer) 6 3 2dc4 00faae18 102a220 Preemptive 00000000:00000000 00f72030 0 MTA (Threadpool Worker) 7 4 3e34 00fab748 1029220 Preemptive 02E3D4D0:00000000 00f72030 0 MTA (Threadpool Worker)
底层有一个队列 TimerQueue instance2 = TimerQueue.Instance; internal class TimerQueue
Timer 首先是用 ThreadPool.UnsafeQueueUserWorkItem(waitCallback, timer); 来完成定时功能。。
Task task = new Task(() => { Console.WriteLine("我是工作线程: tid={0}", Thread.CurrentThread.ManagedThreadId); }); task.Start(); Console.Read();
//使用TaskFactory启动 var task = Task.Factory.StartNew(() => { Console.WriteLine("我是工作线程: tid={0}", Thread.CurrentThread.ManagedThreadId); });
//使用Task的Run方法 var task = Task.Run(() => { Console.WriteLine("我是工作线程: tid={0}", Thread.CurrentThread.ManagedThreadId); });
//这个是同步执行。。。。也就是阻塞执行。。。 var task = new Task(() => { Console.WriteLine("我是工作线程: tid={0}", Thread.CurrentThread.ManagedThreadId); }); task.RunSynchronously();
TaskScheduler 相当于Task的CPU处理器。。。
wpf中的TaskScheduler是 SynchronizationContextTaskScheduler
protected internal override void QueueTask(Task task) { if ((task.Options & TaskCreationOptions.LongRunning) != TaskCreationOptions.None) { new Thread(ThreadPoolTaskScheduler.s_longRunningThreadWork) { IsBackground = true }.Start(task); return; } bool forceGlobal = (task.Options & TaskCreationOptions.PreferFairness) > TaskCreationOptions.None; ThreadPool.UnsafeQueueCustomWorkItem(task, forceGlobal); }
让Task具有返回值。。。 它的父类其实就是Task。。
Thread t = new Thread(() => { System.Threading.Thread.Sleep(100); Console.WriteLine("我是工作线程1"); }); Thread t2 = new Thread(() => { System.Threading.Thread.Sleep(100); Console.WriteLine("我是工作线程2"); }); t.Start(); t2.Start(); t.Join(); // t1 && t2 都完成了 WaitAll操作。。。 WaitAny t1 || t2 t2.Join(); Console.WriteLine("我是主线程"); Console.Read();
static void Main(string[] args) { Task task1 = new Task(() =>{ System.Threading.Thread.Sleep(1000); Console.WriteLine("我是工作线程1:{0}", DateTime.Now); }); task1.Start(); Task task2 = new Task(() => { System.Threading.Thread.Sleep(2000); Console.WriteLine("我是工作线程2:{0}", DateTime.Now); }); task2.Start(); Task.WhenAll(task1, task2).ContinueWith(t => { //执行“工作线程3”的内容 Console.WriteLine("我是工作线程 {0}", DateTime.Now); }); Console.Read(); }
Task.Factory.ContinueWhenAll(new Task[2] { task1, task2 }, (t) => { //执行“工作线程3”的内容 Console.WriteLine("我是主线程 {0}", DateTime.Now); });
介绍Task的7种阻塞方式 + 延续
using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main(string[] args) { Task task1 = new Task(() => { System.Threading.Thread.Sleep(1000); Console.WriteLine("我是工作线程1:{0}", DateTime.Now); }); task1.Start(); Task task2 = new Task(() => { System.Threading.Thread.Sleep(2000); Console.WriteLine("我是工作线程2:{0}", DateTime.Now); }); task2.Start(); //Task.WhenAny(task1, task2).ContinueWith(t => //{ // //执行“工作线程3”的内容 // Console.WriteLine("我是主线程 {0}", DateTime.Now); //}); //Task.WhenAll(task1, task2).ContinueWith(t => //{ // //执行“工作线程3”的内容 // Console.WriteLine("我是工作线程 {0}", DateTime.Now); //}); Console.Read(); } } }
public Task(Action action, TaskCreationOptions creationOptions); // // 摘要: // 指定可控制任务的创建和执行的可选行为的标志。 [Flags] public enum TaskCreationOptions { // // 摘要: // 指定应使用默认行为。 None = 0, // // 摘要: // 提示 System.Threading.Tasks.TaskScheduler 以一种尽可能公平的方式安排任务,这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。 PreferFairness = 1, // // 摘要: // 指定任务将是长时间运行的、粗粒度的操作,涉及比细化的系统更少、更大的组件。它会向 System.Threading.Tasks.TaskScheduler // 提示,过度订阅可能是合理的。您可以通过过度订阅创建比可用硬件线程数更多的线程。 LongRunning = 2, // // 摘要: // 指定将任务附加到任务层次结构中的某个父级。有关详细信息,请参阅 已附加和已分离的子任务。 AttachedToParent = 4, // // 摘要: // 如果尝试附有子任务到创建的任务,指定 System.InvalidOperationException 将被引发。 DenyChildAttach = 8, // // 摘要: // 防止环境计划程序被视为已创建任务的当前计划程序。这意味着像 StartNew 或 ContinueWith 创建任务的执行操作将被视为 System.Threading.Tasks.TaskScheduler.Default // 当前计划程序。 HideScheduler = 16 }
public Task ContinueWith(Action continuationAction, TaskContinuationOptions continuationOptions);
TaskCreationOptions :
Task task = new Task(() => { Task task1 = new Task(() => { Thread.Sleep(100); Console.WriteLine("task1"); }, TaskCreationOptions.AttachedToParent); Task task2 = new Task(() => { Thread.Sleep(10); Console.WriteLine("task2"); }, TaskCreationOptions.AttachedToParent); task1.Start(); task2.Start(); }); task.Start(); task.Wait(); //task.WaitAll(task1,task2); Console.WriteLine("我是主线程!!!!"); Console.Read();
DenyChildAttach: 不让子任务附加到父任务上去。。。
static void Main(string[] args) { Task task = new Task(() => { Task task1 = new Task(() => { Thread.Sleep(100); Console.WriteLine("task1"); }, TaskCreationOptions.AttachedToParent); Task task2 = new Task(() => { Thread.Sleep(10); Console.WriteLine("task2"); }, TaskCreationOptions.AttachedToParent); task1.Start(); task2.Start(); }, TaskCreationOptions.DenyChildAttach); task.Start(); task.Wait(); //task.WaitAll(task1,task2); Console.WriteLine("我是主线程!!!!"); Console.Read(); }
HideScheduler: 子任务默认不使用父类的Task的Scheduler。。。而是使用默认的。
LongRunning:如果你明知道是长时间运行的任务,建议你使用此选项。。建议使用 “Thread” 而不是“threadPool"。
如果长期租用不还给threadPool,threadPool为了满足市场需求,会新开一些线程。。。 满足当前使用,如果此时租用线程被归还,这会导致ThreadPool的线程过多,销毁和 调度都是一个很大的麻烦。。。
catch (ThreadAbortException exceptionObject)
this.FinishThreadAbortedTask(true, false);
internal void InternalQueueTask(Task task)
protected internal override void QueueTask(Task task) { if ((task.Options & TaskCreationOptions.LongRunning) != TaskCreationOptions.None) { new Thread(ThreadPoolTaskScheduler.s_longRunningThreadWork) { IsBackground = true }.Start(task); return; } bool forceGlobal = (task.Options & TaskCreationOptions.PreferFairness) > TaskCreationOptions.None; ThreadPool.UnsafeQueueCustomWorkItem(task, forceGlobal); }
PreferFairness: 给你的感觉就是一个”queue“的感觉。。。
会将Task放入到ThreadPool的全局队列中。。。。 让work thread进行争抢。。。。
// System.Threading.ThreadPoolWorkQueue [SecurityCritical] public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) { ThreadPoolWorkQueueThreadLocals threadPoolWorkQueueThreadLocals = null; if (!forceGlobal) { threadPoolWorkQueueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals; } if (this.loggingEnabled) { FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback); } if (threadPoolWorkQueueThreadLocals != null) { threadPoolWorkQueueThreadLocals.workStealingQueue.LocalPush(callback); } else { ThreadPoolWorkQueue.QueueSegment queueSegment = this.queueHead; while (!queueSegment.TryEnqueue(callback)) { Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref queueSegment.Next, new ThreadPoolWorkQueue.QueueSegment(), null); while (queueSegment.Next != null) { Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, queueSegment.Next, queueSegment); queueSegment = this.queueHead; } } } this.EnsureThreadRequested(); }
// // 摘要: // Default = "Continue on any, no task options, run asynchronously" 指定应使用默认行为。默认情况下,完成前面的任务之后将安排运行延续任务,而不考虑前面任务的最终 // System.Threading.Tasks.TaskStatus。 None = 0, // // 摘要: // 提示 System.Threading.Tasks.TaskScheduler 以一种尽可能公平的方式安排任务,这意味着较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。 PreferFairness = 1, // // 摘要: // 指定某个任务将是运行时间长、粗粒度的操作。它会向 System.Threading.Tasks.TaskScheduler 提示,过度订阅可能是合理的。 LongRunning = 2, // // 摘要: // 指定将任务附加到任务层次结构中的某个父级。 AttachedToParent = 4, // // 摘要: // 如果尝试附有子任务到创建的任务,指定 System.InvalidOperationException 将被引发。 DenyChildAttach = 8, // // 摘要: // 防止环境计划程序被视为已创建任务的当前计划程序。这意味着像 StartNew 或 ContinueWith 创建任务的执行操作将被视为 System.Threading.Tasks.TaskScheduler.Default // 当前计划程序。 HideScheduler = 16, // // 摘要: // 在延续取消的情况下,防止延续的完成直到完成先前的任务。 LazyCancellation = 32, // // 摘要: // 指定不应在延续任务前面的任务已完成运行的情况下安排延续任务。此选项对多任务延续无效。 NotOnRanToCompletion = 65536, // // 摘要: // 指定不应在延续任务前面的任务引发了未处理异常的情况下安排延续任务。此选项对多任务延续无效。 NotOnFaulted = 131072, // // 摘要: // 指定只应在延续任务前面的任务已取消的情况下安排延续任务。此选项对多任务延续无效。 OnlyOnCanceled = 196608, // // 摘要: // 指定不应在延续任务前面的任务已取消的情况下安排延续任务。此选项对多任务延续无效。 NotOnCanceled = 262144, // // 摘要: // 指定只有在延续任务前面的任务引发了未处理异常的情况下才应安排延续任务。此选项对多任务延续无效。 OnlyOnFaulted = 327680, // // 摘要: // 指定只应在延续任务前面的任务已完成运行的情况下才安排延续任务。此选项对多任务延续无效。 OnlyOnRanToCompletion = 393216, // // 摘要: // 指定应同步执行延续任务。指定此选项后,延续任务将在导致前面的任务转换为其最终状态的相同线程上运行。如果在创建延续任务时已经完成前面的任务,则延续任务将在创建此延续任务的线程上运行。只应同步执行运行时间非常短的延续任务。 ExecuteSynchronously = 524288折叠
Cancellation 判断任务的取消。。。 Thread abort
task1 -> continuewith task2 -> continuewith -> task3
有因为task2 和task1已经没有延续关系了。。。所以 task1和task3可以并行,
TaskContinuationOptions.LazyCancellation 它的本质就是:
需要等待task1执行完成之后再判断source.token的状态。。。。 这样的话,
首先就形成了一条链: task1 -> task2 -> task3...
static void Main(string[] args) { CancellationTokenSource source = new CancellationTokenSource(); source.Cancel(); Task task1 = new Task(() => { Thread.Sleep(1000); Console.WriteLine("task1 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); }); var task2 = task1.ContinueWith(t => { Console.WriteLine("task2 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); }, source.Token, TaskContinuationOptions.LazyCancellation, TaskScheduler.Current); var task3 = task2.ContinueWith(t => { Console.WriteLine("task3 tid={0}, dt={1} {2}", Thread.CurrentThread.ManagedThreadId,DateTime.Now, task2.Status); }); task1.Start(); Console.Read(); }
Task task1 = new Task(() => { Thread.Sleep(1000); Console.WriteLine("task1 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); }); var task2 = task1.ContinueWith(t => { Console.WriteLine("task2 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); },TaskContinuationOptions.ExecuteSynchronously);
Task task1 = new Task(() => { Thread.Sleep(1000); Console.WriteLine("task1 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); throw new Exception("hello world"); }); var task2 = task1.ContinueWith(t => { Console.WriteLine("task2 tid={0}, dt={1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); }, TaskContinuationOptions.NotOnRanToCompletion);
static void Main(string[] args) { var isStop = false; var thread = new Thread(() => { while (!isStop) { Thread.Sleep(100); Console.WriteLine("当前thread={0} 正在运行", Thread.CurrentThread.ManagedThreadId); } }); thread.Start(); Thread.Sleep(1000); isStop = true; Console.Read(); }
CancellationTokenSource 远比 isStop这个变量强的多。。。
使用cancel实现isStop同样的功能。。。 强在哪里????
<1> 当任务取消的时候,我希望有一个函数能够被触发,这个触发可以做一些资源的清理,
CancellationTokenSource source = new CancellationTokenSource(); source.Token.Register(() => { //如果当前的token被取消,此函数将会被执行 Console.WriteLine("当前source已经被取消,现在可以做资源清理了。。。。"); }); var task = Task.Factory.StartNew(() => { while (!source.IsCancellationRequested) { Thread.Sleep(100); Console.WriteLine("当前thread={0} 正在运行", Thread.CurrentThread.ManagedThreadId); } }, source.Token); Thread.Sleep(1000); source.Cancel(); Console.Read();
我想2秒之后自动取消,N秒。。。。 webservice。。。wcf。
《1》 CancelAfter
source.CancelAfter(new TimeSpan(0, 0, 0, 1));
《2》 CancellationTokenSource 的构造函数中进行取消
CancellationTokenSource source = new CancellationTokenSource(1000);
<3> 取消的组合 将CancellationTokenSource 组合成一个链表
其中任何一个CancellationTokenSource被取消,组合source也会被取消。。。 var s3= s1 && s2; static void Main(string[] args) { CancellationTokenSource source1 = new CancellationTokenSource(); //现在要让source1取消 //source1.Cancel(); CancellationTokenSource source2 = new CancellationTokenSource(); source2.Cancel(); var combineSource = CancellationTokenSource.CreateLinkedTokenSource(source1.Token, source2.Token); Console.WriteLine("s1={0} s2={1} s3={2}", source1.IsCancellationRequested, source2.IsCancellationRequested, combineSource.IsCancellationRequested); Console.Read(); }
//如果一个任务被取消,我希望代码抛出一个异常。。。 if(IsCancellationRequested) throw new Exception("adasdaf"); // == 等价操作 == throwIfCancellationRequested(); CancellationTokenSource source1 = new CancellationTokenSource(); //现在要让source1取消 //source1.Cancel(); CancellationTokenSource source2 = new CancellationTokenSource(); source2.Cancel(); var combineSource = CancellationTokenSource.CreateLinkedTokenSource(source1.Token, source2.Token); Console.WriteLine("s1={0} s2={1} s3={2}", source1.IsCancellationRequested, source2.IsCancellationRequested, combineSource.IsCancellationRequested); Console.Read(); //---------------------------------------------------------------------------------------- //例: using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main(string[] args) { CancellationTokenSource source = new CancellationTokenSource(); source.Token.Register(() => { //如果当前的token被取消,此函数将会被执行 Console.WriteLine("当前source已经被取消,现在可以做资源清理了。。。。"); }); var task = Task.Factory.StartNew(() => { while (true) { source.Token.ThrowIfCancellationRequested(); Thread.Sleep(100); Console.WriteLine("当前thread={0} 正在运行", Thread.CurrentThread.ManagedThreadId); } }, source.Token); Thread.Sleep(1000); source.Cancel(); Thread.Sleep(100); Console.WriteLine(task.Status); Console.Read(); } } }折叠
class Program { private static int workThreadNums = 0; private static bool isStop = false; static void Main(string[] args) { var tasks = new Task[10]; for (int i = 0; i < 10; i++) { tasks[i] = Task.Factory.StartNew((obj) => { Run(); }, i); } //是否退出 string input = Console.ReadLine(); while ("Y".Equals(input, StringComparison.OrdinalIgnoreCase)) { break; } isStop = true; while (workThreadNums != 0) { Console.WriteLine("正在等待线程结束,当前还在运行线程有:{0}", workThreadNums); Thread.Sleep(10); } Console.WriteLine("准备退出了。。。"); Console.Read(); Environment.Exit(0); } static void Run() { try { workThreadNums++; while (true) { if (isStop) break; Thread.Sleep(1000); //执行业务逻辑 Console.WriteLine("我是线程:{0},正在执行业务逻辑", Thread.CurrentThread.ManagedThreadId); } } finally { workThreadNums--; } } }折叠
1、 如何实现多线程中的协作取消?
直接用一个bool变量来判断子线程是否退出的办法其实是很没有档次的,在net 4.0中有一个类(CancellationTokenSource)专门来解决使用bool变量来判断的这种很low的场景,而且比bool变量具有更强大的功能,这个会在以后的文章中跟大家去讲。
class Program { static void Main(string[] args) { CancellationTokenSource source = new CancellationTokenSource(); var tasks = new Task[10]; for (int i = 0; i < 10; i++) { tasks[i] = Task.Factory.StartNew((m) => { Run(source.Token); }, i); } Task.WhenAll(tasks).ContinueWith((t) => { Console.WriteLine("准备退出了。。。"); Console.Read(); Environment.Exit(0); }); string input = Console.ReadLine(); while ("Y".Equals(input, StringComparison.OrdinalIgnoreCase)) { source.Cancel(); } Console.Read(); } static void Run(CancellationToken token) { while (true) { if (token.IsCancellationRequested) break; Thread.Sleep(1000); //执行业务逻辑 Console.WriteLine("我是线程:{0},正在执行业务逻辑", Thread.CurrentThread.ManagedThreadId); } } }
protected List<long> ExecuteOR(IFilterCore filterCore, List<FilterValueItem> fieldValueItemList) { List<long> customerIDList = new List<long>(); try { //多线程处理 Task<List<long>>[] tasks = new Task<List<long>>[fieldValueItemList.Count]; for (int i = 0; i < fieldValueItemList.Count; i++) { tasks[i] = Task.Factory.StartNew((fieldValueItem) => { using (SearchStopWatch watch = new SearchStopWatch(string.Format("或者条件:{0}", filterCore.GetType().Name))) { List<long> smallCustomerIDList = null; try { smallCustomerIDList = filterCore.Filter((FilterValueItem)fieldValueItem); } catch (Exception ex) { LogHelper.WriteLog(ex); throw; } return smallCustomerIDList; } }, fieldValueItemList[i]); } Task.WhenAll(tasks).ContinueWith(t => { using (SearchStopWatch watch = new SearchStopWatch(string.Format("或者条件 追加List时间: {0}", filterCore.GetType().Name))) { foreach (var task in tasks) { customerIDList.AddRange(task.Result); } } }, TaskContinuationOptions.OnlyOnRanToCompletion).Wait(); } catch (Exception ex) { LogHelper.WriteLog(ex); throw; } return customerIDList; }折叠
1、Result 多了此属性
<1> 获取Task的返回值: Wait();
public class Task<TResult> : Task Task<int> task1 = Task.Factory.StartNew(() => { //做一些逻辑运算 return 1; }); task1.Wait(); Console.WriteLine(task1.Result); Console.Read(); // System.Threading.Tasks.Task [__DynamicallyInvokable] public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken) { if (millisecondsTimeout < -1) { throw new ArgumentOutOfRangeException("millisecondsTimeout"); } if (!this.IsWaitNotificationEnabledOrNotRanToCompletion) { return true; } if (!this.InternalWait(millisecondsTimeout, cancellationToken)) { return false; } if (this.IsWaitNotificationEnabledOrNotRanToCompletion) { this.NotifyDebuggerOfWaitCompletionIfNecessary(); if (this.IsCanceled) { cancellationToken.ThrowIfCancellationRequested(); } this.ThrowIfExceptional(true); } return true; }
<2> 直接TResult。。。
// System.Threading.Tasks.Task<TResult> internal TResult GetResultCore(bool waitCompletionNotification) { if (!base.IsCompleted) { base.InternalWait(-1, default(CancellationToken)); } if (waitCompletionNotification) { base.NotifyDebuggerOfWaitCompletionIfNecessary(); } if (!base.IsRanToCompletion) { base.ThrowIfExceptional(true); } return this.m_result; }
2、ContinueWith 也可以具有返回值
static void Main() { Task<int> task1 = Task.Factory.StartNew(() => { //做一些逻辑运算 return 1; }); var task2 = task1.ContinueWith<string>(t => { int num = t.Result; var sum = num + 10; return sum.ToString(); }); Console.WriteLine(task2.Result); Console.Read(); }
3、Task.WhenAll/ WhenAny
static void Main() { Task<int> task1 = Task.Factory.StartNew(() => { //做一些逻辑运算 return 1; }); Task<int> task2 = Task.Factory.StartNew(() => { //做一些逻辑运算 return 2; }); var task = Task.WhenAll<int>(new Task<int>[2] { task1, task2 }); var result = task.Result; Console.WriteLine(task2.Result); Console.Read(); }
public void Handle(Func<Exception, bool> predicate) { if (predicate == null) { throw new ArgumentNullException("predicate"); } List<Exception> list = null; for (int i = 0; i < this.m_innerExceptions.Count; i++) { if (!predicate(this.m_innerExceptions[i])) { if (list == null) { list = new List<Exception>(); } list.Add(this.m_innerExceptions[i]); } } if (list != null) { throw new AggregateException(this.Message, list); } } //********************************************************************************** //例: using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main() { var task = Task.Factory.StartNew(() => { var childTask1 = Task.Factory.StartNew(() => { throw new Exception("我是 childTask1 异常"); }, TaskCreationOptions.AttachedToParent); var childTask2 = Task.Factory.StartNew(() => { throw new Exception("我是 childTask2 异常"); }, TaskCreationOptions.AttachedToParent); }); try { try { task.Wait(); } catch (AggregateException ex) { ex.Handle(x => { if (x.InnerException.Message == "我是 childTask1 异常") { return true; } return false; }); } } catch (Exception ex) { } try { } catch (InvalidCastException ex) { throw; } Console.Read(); } } }折叠
Parallel for,foreach invoke
//串行计算 for (int i = 0; i < 100; i++) { Console.WriteLine(i); } //并行计算 Parallel.For(0, 100, (item) => { Console.WriteLine(item); });
<1> 我们在并行的时候,可以指定当前有几个线程参与计算。。。
<2> RangeManager rangeManager = new RangeManager((long)fromInclusive, (long)toExclusive, 1L, nNumExpectedWorkers);
分区函数 【0-100】
100/8 =12 100 % 8 = 4 t1 => 0-11 t2 => 12-24 t3 => 25-36 。。。 ParallelForReplicatingTask : Task
最后会使用ParallelForReplicatingTask 进行处理
static void Main() { ConcurrentStack<int> stack = new ConcurrentStack<int>(); //并行计算 Parallel.For(0, 100, (item, loop) => { if (item == 10) { loop.Stop(); return; } stack.Push(item); }); Console.WriteLine(string.Join(",", stack)); }
public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
聚合函数是一样的。。。 其实就是一个并行的聚合计算。
比如说:我想做一个并行的从 1-100的累积计算。。。
1+2+3+4+5+。。。。+99 =4950
Parallel.For 可以实现一些数组的累计运算。。。
Parallel.ForEach 应对一些集合运算 【非数组】
// System.Threading.Tasks.Parallel private static ParallelLoopResult ForEachWorker<TSource, TLocal>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource> body, Action<TSource, ParallelLoopState> bodyWithState, Action<TSource, ParallelLoopState, long> bodyWithStateAndIndex, Func<TSource, ParallelLoopState, TLocal, TLocal> bodyWithStateAndLocal, Func<TSource, ParallelLoopState, long, TLocal, TLocal> bodyWithEverything, Func<TLocal> localInit, Action<TLocal> localFinally) { if (parallelOptions.CancellationToken.IsCancellationRequested) { throw new OperationCanceledException(parallelOptions.CancellationToken); } TSource[] array = source as TSource[]; if (array != null) { return Parallel.ForEachWorker<TSource, TLocal>(array, parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal, bodyWithEverything, localInit, localFinally); } IList<TSource> list = source as IList<TSource>; if (list != null) { return Parallel.ForEachWorker<TSource, TLocal>(list, parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal, bodyWithEverything, localInit, localFinally); } return Parallel.PartitionerForEachWorker<TSource, TLocal>(Partitioner.Create<TSource>(source), parallelOptions, body, bodyWithState, bodyWithStateAndIndex, bodyWithStateAndLocal, bodyWithEverything, localInit, localFinally); }
internal InternalPartitionEnumerable(IEnumerator<TSource> sharedReader, bool useSingleChunking, bool isStaticPartitioning) { this.m_sharedReader = sharedReader; this.m_sharedIndex = new Partitioner.SharedLong(-1L); this.m_hasNoElementsLeft = new Partitioner.SharedBool(false); this.m_sourceDepleted = new Partitioner.SharedBool(false); this.m_sharedLock = new object(); this.m_useSingleChunking = useSingleChunking; if (!this.m_useSingleChunking) { int num = (PlatformHelper.ProcessorCount > 4) ? 4 : 1; this.m_FillBuffer = new KeyValuePair<long, TSource>[num * Partitioner.GetDefaultChunkSize<TSource>()]; } if (isStaticPartitioning) { this.m_activePartitionCount = new Partitioner.SharedInt(0); return; } this.m_activePartitionCount = null; }
字典的分区函数 。。。。
Dictionary<int, int> dic = new Dictionary<int, int>() { {1,100}, {2,200 }, {3,300 } }; Parallel.ForEach(dic, (item) => { Console.WriteLine(item.Key); });
//例: using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main() { //ConcurrentStack<int> stack = new ConcurrentStack<int>(); ////并行计算 //Parallel.For(0, 100, new ParallelOptions() //{ // MaxDegreeOfParallelism = Environment.ProcessorCount - 1, //保证当前还有一个线程不会参与 //}, (item, loop) => //{ // System.Threading.Thread.Sleep(10000000); // stack.Push(item); //}); //Console.WriteLine(string.Join(",", stack)); //var totalNums = 0; //Parallel.For<int>(1, 100, () => { return 0; }, (current, loop, total) => //{ // total += (int)current; // return total; //}, (total) => //{ // Interlocked.Add(ref totalNums, total); //}); //Console.WriteLine(totalNums); //int[] nums = new int[10]; //Parallel.For(0, nums.Length, (item) => //{ // //do logic // var temp = nums[item]; //}); //Dictionary<int, int> dic = new Dictionary<int, int>() //{ // {1,100}, // {2,200 }, // {3,300 } //}; //Parallel.ForEach(dic, (item) => //{ // Console.WriteLine(item.Key); //}); //Parallel.Invoke(() => //{ // Console.WriteLine("我是并行计算1 " + Thread.CurrentThread.ManagedThreadId); //}, () => //{ // Console.WriteLine("我是并行计算2 " + Thread.CurrentThread.ManagedThreadId); //}); } } }折叠
linq to object var nums = Enumerable.Range(0, 100); var query = from n in nums.AsParallel() select new { thread = Thread.CurrentThread.ManagedThreadId, num = n }; foreach (var item in query) { Console.WriteLine(item); }
AsParallel() 可以将串行的代码转换为并行
AsOrdered() 就是将并行结果还是按照 未排序的样子进行排序。。。
asOrdered => orderby
[10,1,2,3,4] => 并行计算.asOrderrd => [10,1,2,3,4]
[10,1,2,3,4] => orderby =>[1,2,3,4,10]
AsUnordered() 不按照原始的顺序排序。。。
AsSequential() <=> AsParallel() 是相对应的。。。。
前者将plinq转换为linq 后者将linq转换为plinq
0:010> !clrstack OS Thread Id: 0x579c (10) Child SP IP Call Site 053bf580 77c0e91c [HelperMethodFrame: 053bf580] System.Threading.Thread.SleepInternal(Int32) 053bf604 7029daba System.Threading.Thread.Sleep(Int32) 053bf60c 00ba18f4 *** WARNING: Unable to verify checksum for C:\1\ConsoleApplication2\ConsoleApplication2\bin\Debug\ConsoleApplication2.exe ConsoleApplication2.Program.GetThreadID() [C:\1\ConsoleApplication2\ConsoleApplication2\Program.cs @ 38] 053bf620 00ba188d ConsoleApplication2.Program+c.b__0_0(Int32) [C:\1\ConsoleApplication2\ConsoleApplication2\Program.cs @ 22] 053bf638 00ba184e System.Linq.Parallel.SelectQueryOperator`2+SelectQueryOperatorResults[[System.Int32, mscorlib],[System.__Canon, mscorlib]].GetElement(Int32) 053bf644 68466a7d System.Linq.Parallel.QueryResults`1[[System.__Canon, mscorlib]].get_Item(Int32) 053bf64c 6845712c System.Linq.Parallel.PartitionedDataSource`1+ListContiguousIndexRangeEnumerator[[System.__Canon, mscorlib]].MoveNext(System.__Canon ByRef, Int32 ByRef) 053bf668 00ba17cf System.Linq.Parallel.PipelineSpoolingTask`2[[System.__Canon, mscorlib],[System.Int32, mscorlib]].SpoolingWork() 053bf684 6846c222 System.Linq.Parallel.SpoolingTaskBase.Work() 053bf6bc 6845a7e0 System.Linq.Parallel.QueryTask.BaseWork(System.Object) 053bf6dc 6845aa42 System.Linq.Parallel.QueryTask+c.b__10_0(System.Object) 053bf6e0 7028dcff System.Threading.Tasks.Task.InnerInvoke() 053bf6ec 7028d934 System.Threading.Tasks.Task.Execute() 053bf710 7028dcba System.Threading.Tasks.Task.ExecutionContextCallback(System.Object) 053bf714 702e1512 System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 053bf780 702e1446 System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 053bf794 7028db38 System.Threading.Tasks.Task.ExecuteWithThreadLocal(System.Threading.Tasks.Task ByRef) 053bf7f8 7028da4c System.Threading.Tasks.Task.ExecuteEntry(Boolean) 053bf808 7028d98c System.Threading.Tasks.Task.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem() 053bf80c 7029b2d3 System.Threading.ThreadPoolWorkQueue.Dispatch() 053bf85c 7029b17a System.Threading._ThreadPoolWaitCallback.PerformWaitCallback() 053bfa80 7151ea96 [DebuggerU2MCatchHandlerFrame: 053bfa80]
plinq底层都是用task的。。。。 基于task的一些编程模型,让我们快速进行并行计算的。
WithDegreeOfParallelism(Environment.ProcessorCount) 告诉plinq当前8个线程都要参与。。。
WithCancellation: 如果执行之前被取消,那就不要执行了。。。
public enum ParallelExecutionMode { Default = 0, ForceParallelism = 1 }
Plinq :主要是划分区块,然后对区块进行聚合计算。。。从而达到分而治之。。。
smallsum smallsum smallsum smallsum
-> mergesum <- -> mergesum <-
-> totalsum <-
//例: using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main() { CancellationTokenSource source = new CancellationTokenSource(); var nums = Enumerable.Range(0, 2).ToList(); nums[0] = 10000; var query = from n in nums.AsParallel().WithDegreeOfParallelism(Environment.ProcessorCount) .WithCancellation(source.Token) .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .WithMergeOptions(ParallelMergeOptions.Default) select new { thread = GetThreadID(), num = n }; foreach (var item in query) { Console.WriteLine(item); } Console.Read(); } static int GetThreadID() { return Thread.CurrentThread.ManagedThreadId; } } }折叠
RunLonging... Thread。。。。
protected internal override void QueueTask(Task task) { if ((task.Options & TaskCreationOptions.LongRunning) != TaskCreationOptions.None) { new Thread(ThreadPoolTaskScheduler.s_longRunningThreadWork) { IsBackground = true }.Start(task); return; } bool forceGlobal = (task.Options & TaskCreationOptions.PreferFairness) > TaskCreationOptions.None; ThreadPool.UnsafeQueueCustomWorkItem(task, forceGlobal); }
private void button1_Click(object sender, EventArgs e) { Task task = new Task(() => { try { label1.Text = "你好"; } catch (Exception ex) { MessageBox.Show(ex.Message); } }); task.Start(TaskScheduler.FromCurrentSynchronizationContext()); }
var task = Task.Factory.StartNew(() => { //默认耗时操作 Thread.Sleep(1000 * 10); }); task.ContinueWith(t => { label1.Text = "你好"; }, TaskScheduler.FromCurrentSynchronizationContext());
//例: using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main() { Task task = new Task(() => { //System.Threading.Thread.Sleep(1000000); Console.WriteLine("hello world!"); }); task.Start(new PerThreadTaskScheduler()); Console.Read(); } } public class PerThreadTaskScheduler : TaskScheduler { /// <summary> /// 给debug用的。 /// </summary> /// <returns></returns> protected override IEnumerable<Task> GetScheduledTasks() { return Enumerable.Empty<Task>(); } /// <summary> /// 执行task /// </summary> /// <param name="task"></param> protected override void QueueTask(Task task) { var thread = new Thread(() => { TryExecuteTask(task); }); thread.Start(); } /// <summary> /// 同步执行 /// </summary> /// <param name="task"></param> /// <param name="taskWasPreviouslyQueued"></param> /// <returns></returns> protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return true; } } }折叠
xxxbegin xxxend 这么两个配对的经典方法。。。。 委托给线程池去执行的。。。 FileStream (ReadBegin, ReadEnd)配对方法 Action委托,都可以异步执行
static void Main() { FileStream fs = new FileStream(Environment.CurrentDirectory + "//1.txt", FileMode.Open); var bytes = new byte[fs.Length]; var task = Task.Factory.FromAsync(fs.BeginRead, fs.EndRead, bytes, 0, bytes.Length, string.Empty); var nums = task.Result; Console.WriteLine(nums); }
FileStream fs = new FileStream(Environment.CurrentDirectory + "//1.txt", FileMode.Open); var bytes = new byte[fs.Length]; fs.BeginRead(bytes, 0, bytes.Length, (aysc) => { var nums = fs.EndRead(aysc); Console.WriteLine(nums); }, string.Empty); Console.Read();
aysc await 本质也是用了一个包装器。。。
DownloadDataTaskAsync: 看看这个是怎么用的。。
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication2 { class Program { static void Main() { //FileStream fs = new FileStream(Environment.CurrentDirectory + "//1.txt", FileMode.Open); //var bytes = new byte[fs.Length]; //var task = Task.Factory.FromAsync(fs.BeginRead, fs.EndRead, bytes, 0, bytes.Length, string.Empty); //var nums = task.Result; //Console.WriteLine(nums); //FileStream fs = new FileStream(Environment.CurrentDirectory + "//1.txt", FileMode.Open); //var bytes = new byte[fs.Length]; //fs.BeginRead(bytes, 0, bytes.Length, (aysc) => //{ // var nums = fs.EndRead(aysc); // Console.WriteLine(nums); //}, string.Empty); //Console.Read(); //Action action = () => //{ // System.Threading.Thread.Sleep(100000); // Console.WriteLine("hello world!"); //}; //var task = Task.Factory.FromAsync(action.BeginInvoke, action.EndInvoke, string.Empty); ////task.Start(); //Console.Read(); var task = GetTaskAsyc("http://cnblogs.com"); var nums = task.Result; Console.WriteLine(nums.Length); } public static Task<byte[]> GetTaskAsyc(string url) { TaskCompletionSource<byte[]> source = new TaskCompletionSource<byte[]>(); WebClient client = new WebClient(); client.DownloadDataCompleted += (sender, e) => { try { //如果下载完成了,将当前的byte[]个数给task包装器 source.TrySetResult(e.Result); } catch (Exception ex) { source.TrySetException(ex); } }; client.DownloadDataAsync(new Uri(url)); return source.Task; } } }折叠
【潜规则】 ThreadPool IOthread网络IO,文件IO都有一些异步方法。 MemoryStream,FileStream。WebRequest
如果你用同步的思维去理解,容易出问题。。。返回值对不上。 我们在编译器层面看到的代码,不见得是真的代码。。。
private static void InvokeMoveNext(object stateMachine) { ((IAsyncStateMachine)stateMachine).MoveNext(); }
是clr反向通知的。。 如果使用同步IO,会是什么样的呢??? ↓
一个线程读,一个写,在release的某种情况下,会有debug。。。 Thread.MemoryBarrier , VolatileRead
1.不可以底层对代码进行优化。。 2.我的read和write都是从memrory中读取。。。【我读取的都是最新的】
class Program { public static volatile bool isStop = false; static void Main(string[] args) { //isStop = false; var t = new Thread(() => { var isSuccess = false; while (!isStop) { isSuccess = !isSuccess; } }); t.Start(); Thread.Sleep(1000); isStop = true; t.Join(); Console.WriteLine("主线程执行结束!"); Console.ReadLine(); } }
Interlocked 【只能做一些简单类型的计算】 Increment:自增操作
Add: 增加指定的值
Exchange: 赋值
CompareExchange: 比较赋值
特殊的业务逻辑让thread在用户模式下进行自选,欺骗cpu当前thread正在运行中。。。。 用户模式 -> 内核模式 -> 用户模式 数据递增
就是调用win32底层的代码,来实现thread的各种操作 Thread.Sleep
用户模式 + 内核模式 【场景是做多的】 xxxslim
多个线程对一个“共享资源”进行操作的时候,容易出问题。。。 共享资源混乱。。。
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { public static SpinLock spinLock = new SpinLock(); static void Main(string[] args) { var sum = 5; var num1 = 10; var num2 = 10; //Interlocked.Exchange(ref sum, 10); // sum=10; //Interlocked.CompareExchange(ref num1, sum, num2); // num1==num2 ; num1=sum; //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Console.Read(); } static int nums = 0; static void Run() { for (int i = 0; i < 100; i++) { try { var b = false; spinLock.Enter(ref b); Console.WriteLine(nums++); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { spinLock.Exit(); } } } } }折叠
在万不得已的情况下,不要使用内核模式的锁,因为代价太大。。。 其实我们有更多的方式可以替代:混合锁机制, lock
true:表示终止状态 false:表示非终止 现实中的场景: 进站火车闸机,我们用火车票来实现进站操作。 true: 终止表示: 闸机中没有火车票, 终止=> 初始状态 false: 非终止表示:闸机中此时有一张火车票 static AutoResetEvent areLock = new AutoResetEvent(false); static void Main(string[] args) { areLock.WaitOne(); //塞一张火车票到闸机中,因为此时有票在闸机,所以我只能等待 =》 mainthread Console.WriteLine("火车票检验通过,可以通行"); areLock.Set(); //从闸机中取火车票 }
WaitOne:用来将火车票塞入到闸机中 Set: 从闸机中把票取出来
static AutoResetEvent areLock = new AutoResetEvent(true); static void Main(string[] args) { areLock.WaitOne(); //塞一张火车票到闸机中 =》 mainthread Console.WriteLine("火车票检验通过,可以通行"); areLock.Set(); //从闸机中取火车票 }
ManualResetEvent :现实场景就是 => 有人看守的铁道栅栏
如果有火车马上要来了,这个栅栏会合围起来,阻止行人通过铁路。 如果火车走了,这个栅栏就会从合围状态转为两侧。 行人就可以通过了 true: 栅栏没有合围,没有阻止行人通过铁路 false:栅栏合围了, 阻止行人通过
两者 ManualResetEvent 和 AutoResetEvent 是不一样的,所以不能混用。。。
class Program { //static AutoResetEvent areLock = new AutoResetEvent(true); static ManualResetEvent mreLock = new ManualResetEvent(false); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Thread.Sleep(5000); //5s中之后,火车开走了,这个时候就要撤销栅栏 mreLock.Set(); Console.Read(); } static int nums = 0; static void Run() { for (int i = 0; i < 100; i++) { try { mreLock.WaitOne(); Console.WriteLine(nums++); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { } } } }
static Semaphore seLock = new Semaphore(1, 1); 我当前只能是一个线程通过。。 class Program { //static AutoResetEvent areLock = new AutoResetEvent(true); //static ManualResetEvent mreLock = new ManualResetEvent(false); static Semaphore seLock = new Semaphore(1, 10); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Console.Read(); } static int nums = 0; static void Run() { for (int i = 0; i < 100; i++) { try { seLock.WaitOne(); Console.WriteLine(nums++); seLock.Release(); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { } } } }
class Program { //static AutoResetEvent areLock = new AutoResetEvent(true); //static ManualResetEvent mreLock = new ManualResetEvent(false); //static Semaphore seLock = new Semaphore(1, 10); static Mutex mutex = new Mutex(); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Console.Read(); } static int nums = 0; static void Run() { for (int i = 0; i < 100; i++) { try { //seLock.WaitOne(); mutex.WaitOne(); Console.WriteLine(nums++); //seLock.Release(); mutex.ReleaseMutex(); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { } } } }
这三种锁,我们发现都有一个WaitOne方法。。。因为他们都是继承于WaitHandle。。。 三种锁都是同根生,其实底层都是通过SafeWaitHandle来对win32api的一个引用。
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { //static AutoResetEvent areLock = new AutoResetEvent(true); //static ManualResetEvent mreLock = new ManualResetEvent(false); //static Semaphore seLock = new Semaphore(1, 10); static Mutex mutex = new Mutex(); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Console.Read(); } static int nums = 0; static void Run() { for (int i = 0; i < 100; i++) { try { //seLock.WaitOne(); mutex.WaitOne(); Console.WriteLine(nums++); //seLock.Release(); mutex.ReleaseMutex(); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { } } } } }折叠
不是从限定个数的角度出发。 而是按照读写的角度进行功能分区。。。。
sqllite: 库锁
sqlserver: 行锁 【我只锁住行】
多个线程可以一起读, 只能让要给线程去写。。。
读写 8/2 开。。。
Ctrip。。。。。 机票db。。。
商旅事业部: orders,,,
机票事业部: orders。。。
order1 join order2...join plane 读取时间太长,也导致write线程长时间进不来,
namespace ConsoleApplication1 { class Program { static ReaderWriterLock rwlock = new ReaderWriterLock(); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Read(); }); } Task.Factory.StartNew(() => { Write(); }); Console.Read(); } static int nums = 0; /// <summary> /// 线程读 /// </summary> static void Read() { while (true) { Thread.Sleep(10); rwlock.AcquireReaderLock(int.MaxValue); Thread.Sleep(10); Console.WriteLine("当前 t={0} 进行读取 {1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); rwlock.ReleaseReaderLock(); } } /// <summary> /// 线程写 /// </summary> static void Write() { while (true) { //3s进行一次写操作 Thread.Sleep(3000); rwlock.AcquireWriterLock(int.MaxValue); Thread.Sleep(3000); Console.WriteLine("当前 t={0} 进行写入。。。。。。。。。。。。。。。。。。。。。。。{1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); rwlock.ReleaseWriterLock(); } } }折叠
比如说:Orders Products Users
比如说: Orders表 10w: 10个线程读取,一个线程1w
Products表:5w 5个线程 一个1w
Users 表 1w 2个线程 5w
xxxx.continuewithcontinuewith.... continuewith + TaskCreationOptions.AttachedToParent CountdownEvent cdeLock = new CountdownEvent(10);
初始化的时候设置一个 默认threadcount上线。。。
操作,相当于Task.Wait() 执行完成了。
Reset: 重置当前的threadcount上线
Wait: 相当于我们的Task.WaitAll
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { static CountdownEvent cdeLock = new CountdownEvent(10); static void Main(string[] args) { //加载Orders搞定 cdeLock.Reset(10); for (int i = 0; i < 10; i++) { Task.Factory.StartNew(() => { LoadOrders(); }); } cdeLock.Wait(); Console.WriteLine("所有的Orders都加载完毕。。。。。。。。。。。。。。。。。。。。。"); //加载Product搞定 cdeLock.Reset(5); for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { LoadProducts(); }); } cdeLock.Wait(); Console.WriteLine("所有的Products都加载完毕。。。。。。。。。。。。。。。。。。。。。"); //加载Users搞定 cdeLock.Reset(2); for (int i = 0; i < 2; i++) { Task.Factory.StartNew(() => { LoadUsers(); }); } cdeLock.Wait(); Console.WriteLine("所有的Users都加载完毕。。。。。。。。。。。。。。。。。。。。。"); Console.WriteLine("所有的表数据都执行结束了。。。恭喜恭喜。。。。"); Console.Read(); } /// <summary> /// 10 threads /// </summary> static void LoadOrders() { //具体的业务逻辑,不细说 Console.WriteLine("当前Orders正在加载中。。。{0}", Thread.CurrentThread.ManagedThreadId); cdeLock.Signal(); } /// <summary> /// 5 threads /// </summary> static void LoadProducts() { //具体的业务逻辑,不细说 Console.WriteLine("当前Products正在加载中。。。{0}", Thread.CurrentThread.ManagedThreadId); cdeLock.Signal(); } /// <summary> /// 2 threads /// </summary> static void LoadUsers() { //具体的业务逻辑,不细说 Console.WriteLine("当前Users正在加载中。。。{0}", Thread.CurrentThread.ManagedThreadId); cdeLock.Signal(); } } }折叠
限定线程个数的一把锁 :Monitor Enter 锁住某一个资源
Exit 退出某一个资源
static object lockMe = new object(); static void Run() { for (int i = 0; i < 100; i++) { var b = false; try { //SpinLock Monitor.Enter(lockMe, ref b); Console.WriteLine(nums++); //seLock.Release(); } catch (Exception ex) { Console.WriteLine(ex.Message); } finally { if (b) Monitor.Exit(lockMe); } } }
lock (lockMe) { Console.WriteLine(nums++); }
------------------------------------------ 编译器 ---------------------------------------
.method private hidebysig static void Run () cil managed { // Method begins at RVA 0x20a0 // Code size 72 (0x48) .maxstack 3 .locals init ( [0] int32, [1] object, [2] bool, [3] bool ) IL_0000: nop IL_0001: ldc.i4.0 IL_0002: stloc.0 IL_0003: br.s IL_003e // loop start (head: IL_003e) IL_0005: nop IL_0006: ldsfld object ConsoleApplication1.Program::lockMe IL_000b: stloc.1 IL_000c: ldc.i4.0 IL_000d: stloc.2 .try { IL_000e: ldloc.1 IL_000f: ldloca.s 2 IL_0011: call void [mscorlib]System.Threading.Monitor::Enter(object, bool&) IL_0016: nop IL_0017: nop IL_0018: ldsfld int32 ConsoleApplication1.Program::nums IL_001d: dup IL_001e: ldc.i4.1 IL_001f: add IL_0020: stsfld int32 ConsoleApplication1.Program::nums IL_0025: call void [mscorlib]System.Console::WriteLine(int32) IL_002a: nop IL_002b: nop IL_002c: leave.s IL_0039 } // end .try finally { IL_002e: ldloc.2 IL_002f: brfalse.s IL_0038 IL_0031: ldloc.1 IL_0032: call void [mscorlib]System.Threading.Monitor::Exit(object) IL_0037: nop IL_0038: endfinally } // end handler IL_0039: nop IL_003a: ldloc.0 IL_003b: ldc.i4.1 IL_003c: add IL_003d: stloc.0 IL_003e: ldloc.0 IL_003f: ldc.i4.s 100 IL_0041: clt IL_0043: stloc.3 IL_0044: ldloc.3 IL_0045: brtrue.s IL_0005 // end loop IL_0047: ret } // end of method Program::Run折叠
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication1 { class Program { static Person personLockMe = new Person(); static void Main(string[] args) { //比如开启5个task for (int i = 0; i < 5; i++) { Task.Factory.StartNew(() => { Run(); }); } Console.Read(); } static int nums = 0; static void Run() { Person person = new Person(); for (int i = 0; i < 100; i++) { lock (person) { Console.WriteLine(nums++); } //var b = false; //try //{ // //SpinLock // Monitor.Enter(lockMe, ref b); // Console.WriteLine(nums++); // //seLock.Release(); //} //catch (Exception ex) //{ // Console.WriteLine(ex.Message); //} //finally //{ // if (b) Monitor.Exit(lockMe); //} } } } class Person { } }折叠
Thread.Sleep(1) :让线程休眠1ms
Thread.Sleep(0) :让线程放弃当前的时间片,让本线程更高或者同等线程得到时间片运行。
Thread.Yield() :让线程立即放弃当前的时间片,可以让更低级别的线程得到运行,当其他thread时间片用完,本thread再度唤醒。。
Yield < Sleep(0) < Sleep(1) 一个时间片 = 30ms。。
通常会用到用户模式锁。。。while + 这些Thread。。。
ManualResetEventSlim: 有人看守的火车轨道标志,栅栏是合围状态
// System.Threading.WaitHandle [SecurityCritical] [MethodImpl(MethodImplOptions.InternalCall)] private static extern int WaitOneNative(SafeHandle waitableSafeHandle, uint millisecondsTimeout, bool hasThreadAffinity, bool exitContext);
for (int i = 0; i < spinCount; i++) { if (this.IsSet) { return true; } if (i < num2) { if (i == num2 / 2) { Thread.Yield(); } else { Thread.SpinWait(PlatformHelper.ProcessorCount * (4 << i)); } } else { if (i % num4 == 0) { Thread.Sleep(1); } else { if (i % num3 == 0) { Thread.Sleep(0); } else { Thread.Yield(); } } } if (i >= 100 && i % 10 == 0) { cancellationToken.ThrowIfCancellationRequested(); } }
class Program { //默认1个线程同时运行,最大10个 static SemaphoreSlim slim = new SemaphoreSlim(1, 10); static void Main(string[] args) { for (int i = 0; i < 10; i++) { Task.Run(() => { Run(); }); } //某一个时刻,我像改变默认的并行线程个数,从默认的1 改成10 System.Threading.Thread.Sleep(2000); slim.Release(10); Console.Read(); } static void Run() { slim.Wait(); Thread.Sleep(1000 * 5); Console.WriteLine("当前t1={0} 正在运行 {1}", Thread.CurrentThread.ManagedThreadId, DateTime.Now); slim.Release(); } }
用EnterReadLock 带代替AcquireReaderLock 方法,性能比内核版本要搞得多
ReaderWriterLockSlim slim = new ReaderWriterLockSlim(); slim.EnterReadLock(); slim.ExitReadLock(); slim.EnterWriteLock(); slim.ExitWriteLock(); ReaderWriterLock rwlock = new ReaderWriterLock(); //rwlock.AcquireReaderLock() Console.Read();
混合锁: 先在用户模式下内旋,如果超过一定的阈值,会切换到内核锁。。。
// System.Collections.Concurrent 1. ConcurrentQueue => Queue 2. ConcurrentDictionary<TKey, TValue> => Dictionary 3. ConcurrentStack<T> => Stack 4. ConcurrentBag<T> !=> List/ LinkList ???
ThreadLocal 是什么意思??? 每个线程有一个自己的备份(线程不可见)
每一个线程分配一个“链表” 这个链表可以任务是list(ThreadLocalList)
TryTake: 获取数据
t1: 1,2,3 locals
t2: 1,3,2 locals
t3: 2,3,4 locals
t1: 1,2,3 locals
t2: 1,3,2 locals
t3: empty locals
这个时候就到Bag的下一级的ttl head 和 next中去找。。。。【steal 偷盗的时候使用的】
for (threadLocalList = this.m_headList; threadLocalList != null; threadLocalList = threadLocalList.m_nextList) { list.Add(threadLocalList.m_version); if (threadLocalList.m_head != null && this.TrySteal(threadLocalList, out result, take)) { return true; } }
static void Main(string[] args) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); bag.Add(1); bag.Add(2); var result = 0; bag.TryTake(out result); }
线程安全的Stack是使用链表的形式,而同步版本是用 数组 实现的。。。
线程安全的Stack是使用Interlocked来实现线程安全。。 而没有使用 内核锁
static void Main(string[] args) { ConcurrentStack<int> stack = new ConcurrentStack<int>(); stack.Push(1); stack.Push(2); var result = 0; stack.TryPop(out result); Console.WriteLine(result); }
static void Main(string[] args) { ConcurrentQueue<int> queue = new ConcurrentQueue<int>(); queue.Enqueue(1); var result = 0; queue.TryDequeue(out result); Console.WriteLine(result); }
ConcurrentDictionary<int, int> dic = new ConcurrentDictionary<int, int>(); dic.TryAdd(1, 10); dic.ContainsKey(1); foreach (var item in dic) { Console.WriteLine(item.Key + item.Value); }
ConcurrentDictionary => 同步 + lock/其他锁机制 也是可以的。。。
1.现象: 门票的首页之后,页面一直在loading中。。。。
查下来是在一个while循环中做了一个 << >>操作,到时候while条件一直都是true。。。
定义为“一级事件” => 上报董事会 “二级事件” => 事业部CEO
while(true) { i=i<<2。。。 if(ddddd){ } }
如果用windbg去找到。。。 去调试dump文件
<1> 生成release x64
<2> 在“任务管理器”中生成一个dump文件
<3> 需要用x64 的windbg。。。
<4> !runaway 查看当前托管线程已执行时间
Thread Time 9:5ca8 0 days 0:00:37.796 0:2a68 0 days 0:00:00.015 8:5600 0 days 0:00:00.000 7:46fc 0 days 0:00:00.000 6:33d4 0 days 0:00:00.000 5:3498 0 days 0:00:00.000 4:5644 0 days 0:00:00.000 3:398 0 days 0:00:00.000 2:2a60 0 days 0:00:00.000 1:63c0 0 days 0:00:00.000
<5> 切换到指定的线程 ~~[5ca8]s
<6> 查看当前线程的调用堆栈 !clrstack
000000f4d63ff2a8 00007ff8d50405f7 *** WARNING: Unable to verify checksum for ConsoleApplication51.exe ConsoleApplication51.Program+c.b__1_0() [c:\users\hxc\documents\visual studio 2015\Projects\ConsoleApplication51\ConsoleApplication51\Program.cs @ 22] 000000f4d63ff2b0 00007ff932b10937 System.Threading.Tasks.Task.Execute() 000000f4d63ff2f0 00007ff932ac674e System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 000000f4d63ff3c0 00007ff932ac65e7 System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 000000f4d63ff3f0 00007ff932b10bdd System.Threading.Tasks.Task.ExecuteWithThreadLocal(System.Threading.Tasks.Task ByRef) 000000f4d63ff4a0 00007ff932b10303 System.Threading.Tasks.Task.ExecuteEntry(Boolean) 000000f4d63ff4e0 00007ff932acfa10 System.Threading.ThreadPoolWorkQueue.Dispatch() 000000f4d63ff978 00007ff934626a53 [DebuggerU2MCatchHandlerFrame: 000000f4d63ff978]
从调用堆栈上来看,当前线程 在 Program+c.b__1_0() 方法之后就没有调用堆栈了,说明方法在这个地方
<7> 最后到指定的b__1_0方法去寻找一下是否有异常。。。
<8> 通过windbg自己生成dll 【!help】
//通过下面命令生成,并查找 //!dumpdomain //!savemodule 00007ff8d4f350f0 c:\2\1.dll class Program { static void Main(string[] args) { Run(); Console.Read(); } static void Run() { var task = Task.Factory.StartNew(() => { var i = true; //这个地方是一个非常复杂的逻辑。导致死循环 while (true) { i = !i; } }); } }
=> 乱用lock语句,或者“锁机制” [这是一种情况]
0:007> !syncblk Index SyncBlock MonitorHeld Recursion Owning Thread Info SyncBlock Owner 7 0000020bf7522eb8 3 1 0000020bf74c7910 4e04 0 0000020b80007808 ConsoleApplication51.Program
0000005cf5ffea98 00007ff949476c24 [GCFrame: 0000005cf5ffea98] 0000005cf5ffebd8 00007ff949476c24 [GCFrame: 0000005cf5ffebd8] 0000005cf5ffec18 00007ff949476c24 [HelperMethodFrame_1OBJ: 0000005cf5ffec18] System.Threading.Monitor.Enter(System.Object) 0000005cf5ffed10 00007ff8d5030658 ConsoleApplication51.Program.Run2() [c:\users\hxc\documents\visual studio 2015\Projects\ConsoleApplication51\ConsoleApplication51\Program.cs @ 55] 0000005cf5ffed50 00007ff8d50305ec ConsoleApplication51.Program.b__1_0() [c:\users\hxc\documents\visual studio 2015\Projects\ConsoleApplication51\ConsoleApplication51\Program.cs @ 44] 0000005cf5ffed80 00007ff932b10937 System.Threading.Tasks.Task.Execute() 0000005cf5ffedc0 00007ff932ac674e System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 0000005cf5ffee90 00007ff932ac65e7 System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean) 0000005cf5ffeec0 00007ff932b10bdd System.Threading.Tasks.Task.ExecuteWithThreadLocal(System.Threading.Tasks.Task ByRef) 0000005cf5ffef70 00007ff932b10303 System.Threading.Tasks.Task.ExecuteEntry(Boolean) 0000005cf5ffefb0 00007ff932acfa10 System.Threading.ThreadPoolWorkQueue.Dispatch() 0000005cf5fff448 00007ff934626a53 [DebuggerU2MCatchHandlerFrame: 0000005cf5fff448]
1、!dumpheap -stat 查看clr的托管堆中的各个类型的占用情况
00007ff932cc2aa8 19 1296 System.String[] 00007ff932cc3698 58 3248 System.RuntimeType 00007ff932cc16b8 186 9218 System.String 000001358b1503d0 57 12824 Free 00007ff932cc1d30 6 35216 System.Object[] 00007ff932cc5dc0 13762 660576 System.Text.StringBuilder 00007ff932cc2860 13775 220334298 System.Char[]
!DumpHeap /d -mt 00007ff932cc2860 //查看当前的方法表 !DumpObj /d 00000135978d5340 //查看当前char[]的内容 !gcroot 00000135a60f4940 //查看当前地址的Root。。。 所以结合“StringBuilder”,结合 ”hello world“ 我们就找出了问题。。。
救火的问题。。。。给公司挽回损失。。。。 【.net高级调试】
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication51 { class Program { static StringBuilder sb = new StringBuilder(); static void Main(string[] args) { //Run(); //new Program().Run(); for (int i = 0; i < 10000000; i++) { sb.Append("hello world"); } Console.WriteLine("执行完毕!"); Console.Read(); } //static void Run() //{ // var task = Task.Factory.StartNew(() => // { // var i = true; // //这个地方是一个非常复杂的逻辑。导致死循环 // while (true) // { // i = !i; // } // }); //} //void Run() //{ // lock (this) // { // var task = Task.Run(() => // { // Console.WriteLine("----- start ---- "); // Thread.Sleep(1000); // Run2(); // Console.WriteLine("------ end -----"); // }); // task.Wait(); // } //} //void Run2() //{ // lock (this) // { // Console.WriteLine("我是 run2.。。。。"); // } //} } }折叠
场景: 1500w数据 800w会员,
全内存跑, 25G空间
最原始的时候,能够达到 90s -120s
别让技术阻碍了业务发展。 【技术一定要领先于业务 eric】
//1500w foreach(var item in list) { new class(){tradecount=10 ,customerid=1} } //tempCustomerEntityList : 1500w 30s var query = from item in tempCustomerEntityList group item by item.CustomerId into grp select new { key = grp.Key, list = grp.Count() };
Dictionary<int,int> dic
key: customerid
value: totalcount
int num = this.comparer.GetHashCode(key) & 2147483647; 1500w
int[] nums=new int[100]; 1s index: 0-99 是不是可以存放customerid=1 .。。。 99 value: totalcount foreach (var customerEntity in tempCustomerEntityList) { if (totalTradeCountArray[(int)customerEntity.CustomerId] == 0) { totalTradeCountArray[(int)customerEntity.CustomerId] = 1; } else { totalTradeCountArray[(int)customerEntity.CustomerId]++; } }
<2> 大数据下字典的性能特别烂,因为每次的Add操作都要计算 hashcode,记得使用天然的hash 形式【数组】
比如:Array中,i ndex=customerid,content=个数 总交易个数 BitArray中,index=customerid,content=true/false 是否包含 1s 【城市等级】
<3> 重点优化总交易金额排名 【2-3天】
a、原始的方式: dictionary + orderby【快排2/3】 20 - 30s
key: customerid value: payment 最后对字典进行orderby
b 、改进1: Array + 小根堆 10s-12s 原因在于小跟堆 太大
TopN的问题。 100个大小的小根堆 1500w * 0.25 % ~400w大小堆
c、桶排序 + TopDictonary 4s
payment: 81.12 => 81.12 * 100 =8112 (array index)
100w * 100 = 1个亿 (Array数组index 达到1个亿)
使用payment * 100 作为index,保持payment在1.5w以内。=1500000
100 - 5000 绝大多数人。。。
// Or 条件下的多线程 // And 条件下的多线程 // Customer 条件下的多线程 try { //多线程处理 Task<List<long>>[] tasks = new Task<List<long>>[fieldValueItemList.Count]; for (int i = 0; i < fieldValueItemList.Count; i++) { tasks[i] = Task.Factory.StartNew((fieldValueItem) => { using (SearchStopWatch watch = new SearchStopWatch(string.Format("或者条件:{0}", filterCore.GetType().Name))) { List<long> smallCustomerIDList = null; try { smallCustomerIDList = filterCore.Filter((FilterValueItem)fieldValueItem); } catch (Exception ex) { LogHelper.WriteLog(ex); throw; } return smallCustomerIDList; } }, fieldValueItemList[i]); } Task.WhenAll(tasks).ContinueWith(t => { using (SearchStopWatch watch = new SearchStopWatch(string.Format("或者条件 追加List时间: {0}", filterCore.GetType().Name))) { foreach (var task in tasks) { customerIDList.AddRange(task.Result); } } }, TaskContinuationOptions.OnlyOnRanToCompletion).Wait(); } catch (Exception ex) { LogHelper.WriteLog(ex); thr