C# 线程、任务和同步
1,线程概述
线程是程序汇中独立的指令流。线程有一个优先级,实际上正在处理的程序的位置计数器,一个存储其局部变量的栈。每个线程都有自己的栈。但应用程序的内存和堆由一个进程的所有线程共享。
进程包含资源,如windows句柄,文件句柄或其他内核对象。每个进程都分配了虚拟内存。一个进程至少包含一个线程。操作系统会调度线程。
总结:
同步代码区域(代码块):lock, Monitor, SpinLock,
Mutex,WaitHandle,Semaphore,EventWaitHandle,AutoRestEvent/ManualResetEvent.
Barrier,
ReadWriterLock(Slim)
多线程变量同步:InterLocked,
进程间同步:
Mutex, Semaphore,
2,异步委托:
创建线程的一种简单方式是定义一个委托,并异步调用它。委托时方法类型安全的引用。Delegate类还支持异步调用委托,在后头创建一个执行任务的线程。
委托使用线程池来完成异步调用。
public
delegate int TakesAWhileDelegate(int data, int ms);
2.1投票:
IAsyncResult
ar=al.BeginInvoke(1,3000, null, null);
int
result=dl.EndInvoke(ar);
2.2 等待句柄 (WaitHandle)
1 class Program 2 { 3 public delegate int TakesAWhileDelegate(int data, int ms); 4 static int TakesAWhile(int data, int ms) 5 { 6 Console.WriteLine("TakesAWhile started"); 7 Thread.Sleep(ms); 8 Console.WriteLine("TakesAWhile completed"); 9 return ++data; 10 } 11 private static void Main(string[] args) 12 { 13 Console.WriteLine("Main Begin."); 14 TakesAWhileDelegate dl = TakesAWhile; 15 IAsyncResult ar = dl.BeginInvoke(2, 3000, null, null); 16 17 //ar.IsCompleted 18 //ar.AsyncWaitHandle.WaitOne(50) 19 20 dl.EndInvoke(ar); 21 Console.WriteLine("Main() end."); 22 Console.ReadLine(); 23 } 24 }
2.3 异步回调 (dl.BeginInvoke(1,3000, TakesAWhileCompleted, dl)
)
传入一个回调函数委托,来异步执行。
1 class Program 2 { 3 public delegate int TakesAWhileDelegate(int data, int ms); 4 static int TakesAWhile(int data, int ms) 5 { 6 Console.WriteLine("TakesAWhile started"); 7 Thread.Sleep(ms); 8 Console.WriteLine("TakesAWhile completed"); 9 return ++data; 10 } 11 private static void Main(string[] args) 12 { 13 Console.WriteLine("Main Begin."); 14 TakesAWhileDelegate dl = TakesAWhile; 15 dl.BeginInvoke(2, 3000, ar => 16 { 17 if (ar == null) 18 throw new ArgumentNullException("ar"); 19 TakesAWhileDelegate dl1 = ar.AsyncState as TakesAWhileDelegate; 20 Trace.Assert(dl1 != null, "Invalid object type"); 21 int result = dl1.EndInvoke(ar); 22 Console.WriteLine("result: {0}", result); 23 }, null); 24 25 Console.WriteLine("Main() end."); 26 Console.ReadLine(); 27 } 28 }
3,Thread类
3.1
给线程传递数据
1,使用带ParameterizedThreadStart委托参数的Thread构造函数。2,创建自定义类,把线程的方法定位实例方法,这样就可以初始化实例的数据,之后启动线程。
3.2
后台线程:
只要有一个前台线程在运行,应用程序的进程就在运行。如果多个前台线程在运行,而Main()方法结束了,应用程序的进程依然是激活的,直到所有前台线程完成其任务为止。
private static void Main(string[] args) { var t1 = new Thread(ThreadMain) {Name = "MyNewThread", IsBackground = false}; t1.Start(); Console.WriteLine("Main Thread ending now."); } static void ThreadMain() { Console.WriteLine("Thread {0} started", Thread.CurrentThread.Name); Thread.Sleep(3000); Console.WriteLine("Thread {0} Completed",Thread.CurrentThread.Name); }
3.3 线程的优先级
4 线程池
5,任务
5.1 启动任务:
TaskFactory tf = new TaskFactory(); Task t1 = tf.StartNew(TaskMethod); Task t2 = Task.Factory.StartNew(TaskMethod); Task t3 = new Task(TaskMethod); t3.Start();
5.2 连续的任务
Task t1=new Task(DoOnFirst);
Task
t2=t1.ContinueWith(DoOnSecond);
5.3任务层次结构
6 Parallel
类
Parallel.For
Parallel.ForEach()
Parallel.Invoke(fun1,fun2);
1 private static void Main(string[] args) 2 { 3 Parallel.Invoke(TaskMethod1,TaskMethod2); 4 5 Console.ReadLine(); 6 } 7 8 static void TaskMethod1() 9 { 10 Console.WriteLine("1running in a task."); 11 Console.WriteLine("Task id: {0}", Task.CurrentId); 12 } 13 static void TaskMethod2() 14 { 15 Console.WriteLine("2running in a task."); 16 Console.WriteLine("Task id: {0}", Task.CurrentId); 17 }
7. 取消架构
8, 线程问题: 争用条件和死锁
8.1 争用条件:
1 public class StateObject 2 { 3 private int state = 5; 4 5 public void ChangeState(int loop) 6 { 7 lock (this) 8 { 9 if (state == 5) 10 { 11 state++; 12 Trace.Assert(state == 6, "Race condition ocurred after " + loop + " loops " + Task.CurrentId); 13 if (loop % 1000000 == 0) 14 { 15 Console.WriteLine("after " + loop + " loops " + Task.CurrentId); 16 } 17 } 18 state = 5; 19 } 20 21 } 22 } 23 24 public class SampleTask 25 { 26 public void RaceCondition(object o) 27 { 28 Trace.Assert(o is StateObject, "o must be of type StateObject."); 29 StateObject state = o as StateObject; 30 int i = 0; 31 while (true) 32 { 33 state.ChangeState(i++); 34 } 35 } 36 } 37 38 public class SampleThread 39 { 40 public SampleThread(StateObject s1, StateObject s2) 41 { 42 this.s1 = s1; 43 this.s2 = s2; 44 } 45 46 private StateObject s1; 47 private StateObject s2; 48 49 public void Deadlock1() 50 { 51 int i = 0; 52 while (true) 53 { 54 lock (s1) 55 { 56 lock (s2) 57 { 58 s1.ChangeState(i); 59 s2.ChangeState(i++); 60 Console.WriteLine("still running, {0}", i); 61 62 } 63 } 64 //Thread.Yield(); 65 } 66 } 67 public void Deadlock2() 68 { 69 int i = 0; 70 while (true) 71 { 72 lock (s2) 73 { 74 lock (s1) 75 { 76 s1.ChangeState(i); 77 s2.ChangeState(i++); 78 Console.WriteLine("still running, {0}", i); 79 80 } 81 } 82 //Thread.Yield(); 83 } 84 } 85 } 86 87 private static void Main(string[] args) 88 { 89 var state1 = new StateObject(); 90 var state2 = new StateObject(); 91 92 SampleThread st = new SampleThread(state1, state2); 93 94 Task.Factory.StartNew(st.Deadlock1); 95 Task.Factory.StartNew(st.Deadlock2); 96 97 Console.ReadLine(); 98 }
9, 同步
9.1 Lock
语句
栈是线程独立的,但不是私有的。所有线程的栈内所有内容,都可以被其他线程访问。
为什么不用 lock(this)
?
因为这通常超出我们的控制,因为其他人也有可能lock这个对象。一个私有的对象是更好的选择。避免lock一个公开类型,或者超出你代码的控制的实例。
Tips:可以提供线程安全的原子操作。
1 class Program 2 { 3 public class SharedState 4 { 5 public int State { get; set; } 6 } 7 8 public class Job 9 { 10 private SharedState sharedState; 11 12 public Job(SharedState sharedState) 13 { 14 this.sharedState = sharedState; 15 } 16 17 public void DoTheJob() 18 { 19 for (int i = 0; i < 50000; i++) 20 { 21 sharedState.State +=1; 22 } 23 } 24 } 25 26 private static void Main(string[] args) 27 { 28 int numTasks = 20 29 ; 30 var state = new SharedState(); 31 var tasks = new Task[numTasks]; 32 for (int j = 0; j < 5; j++) 33 { 34 state.State = 0; 35 for (int i = 0; i < numTasks; i++) 36 { 37 tasks[i] = new Task(new Job(state).DoTheJob); 38 tasks[i].Start(); 39 40 } 41 42 for (int i = 0; i < numTasks; i++) 43 { 44 tasks[i].Wait(); 45 46 } 47 Console.WriteLine("summarized {0}", state.State); 48 } 49 50 51 } 52 53 }
9.2
Interlocked类
Interlock类用于使变量的简单语句原子化,线程安全方式递增、递减、交换和读取。i++不是线程安全的(包含3个操作:从内存获取、递增1、存储回内存,这些操作都可以被线程调度器打断)。
9.3 Monitor类
lock语句由编译器解释为Monitor类: Moniter.Enter(obj) ;
Monitor.Exit(obj);
Monitor类的一个优点:可以添加一个等待被锁定的超市值。Monitor.TryEnter(lockObj,500,ref
lockToken);
1 public void DoTheJob() 2 { 3 for (int i = 0; i < 50000; i++) 4 { 5 bool isLocked = false; 6 goLabel: 7 Monitor.TryEnter(sharedState, 500, ref isLocked); 8 if (isLocked) 9 { 10 sharedState.State += 1; 11 Monitor.Exit(sharedState); 12 } 13 else 14 { 15 Console.WriteLine("lock failed."); 16 goto goLabel; 17 } 18 19 } 20 }
9.4
SpinLock结构
适合于有大量的锁定,而且锁定的时间非常短。用法非常接近于Monitor类。获得锁使用Enter()或者TryEnter(),释放锁使用Exit()方法。小心SpinLock的传送,因为是结构,所以会复制。
9.5 WaitHandle基类
Delegate BeginInvoke()
用waithandle.WaitOne(50,false)来bolck当前线程,
WaitHandle是一个抽象基类,用于等待一个信号量的设置。可以等待不同的信号,因为WaitHandle是一个基类,可以派生一些类。
private static void Main(string[] args) { Action ac = () => { Console.WriteLine("Action Begin."); Thread.Sleep(2000); Console.WriteLine("Action End."); }; AsyncCallback callback = (o) => { var cb = (Action)o.AsyncState; cb.EndInvoke(o); Console.WriteLine("Callback finished."); }; IAsyncResult ar = ac.BeginInvoke(callback, ac); while (true) { Console.Write("."); if (ar.AsyncWaitHandle.WaitOne(50, true)) { Console.WriteLine("Can get the result now."); break; } } Console.ReadLine(); }
WaitOne() 等待一个,waitAll()等待多个对象,WaitAny等待多个对象的一个。WaitAll和WaitAny是静态方法。
WaitHandle基类有一个SafeWaitHandle属性,其中可以将本机句柄赋予一个操作系统资源,并等待该句柄。
Mutex、EventWaitHandle
和 Semaphore类继承自WaitHandle基类。所以可以等到使用它们。
9.6 Mutex类
Mutex(mutual
exclusion,互斥)是.net
Framework中提供多个集成同步访问的一个类。它非常类似于Monitor,只有一个线程能拥有锁定。只有一个线程能获得互斥锁定,访问受互斥访问的同步代码区域。
在Mutex类的构造函数中,可以指定互斥是否最初由主调线程拥有。定义互斥的名称,获得互斥是否存在的信息。
系统能识别有名称的Mutex
private static void Main(string[] args) { bool isNew; using (Mutex mutex = new Mutex(false, "ProMutext", out isNew)) { if (isNew) { Console.WriteLine("Get mutex lock."); } else { Console.WriteLine("can‘t get mutex lock."); } Thread.Sleep(3000); } Thread.Sleep(1000000); Console.ReadLine(); }
9.7
Semaphore类
信号量非常类似于互斥,其区别是多个线程使用。信号量是一种技术的互斥锁定。使用信号量可以定义同时访问旗语锁定保护的资源的线程个数。
Semaphore类:可以命名,使用系统范围内的资源,允许不同进程间同步。
static void Main() { int threadCount = 6; int semaphoreCount = 4; var semaphore = new Semaphore( semaphoreCount, semaphoreCount,"ProSemaphore"); var threads = new Thread[threadCount]; for (int i = 0; i < threadCount; i++) { threads[i] = new Thread(ThreadMain); threads[i].Start(semaphore); } for (int i = 0; i < threadCount; i++) { threads[i].Join(); } Console.WriteLine("All threads finished"); } static void ThreadMain(object o) { Semaphore semaphore = o as Semaphore; Trace.Assert(semaphore != null, "o must be a Semaphore type"); bool isCompleted = false; while (!isCompleted) { if (semaphore.WaitOne(600)) { try { Console.WriteLine("Thread {0} locks the semaphore", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(4000); } finally { semaphore.Release(); Console.WriteLine("Thread {0} releases the semaphore", Thread.CurrentThread.ManagedThreadId); isCompleted = true; } } else { Console.WriteLine("Timeout for thread {0}; wait again", Thread.CurrentThread.ManagedThreadId); } } }
SemaphoreSlim类是对于较短等待时间进行了优化的轻型版本,不能跨进程。不能命名,不使用内核信号量,不能跨进程。
private static void Main(string[] args) { int threadCount = 6; int semaphoreCount = 4; var semaphore = new SemaphoreSlim(semaphoreCount, semaphoreCount); Thread[] threads = new Thread[threadCount]; for (int i = 0; i < threadCount; i++) { threads[i] = new Thread(ThreadMain); threads[i].Start(semaphore); } for (int i = 0; i < threadCount; i++) { threads[i].Join(); } Console.WriteLine("AllThread finished!"); Console.ReadLine(); } static void ThreadMain(object o) { SemaphoreSlim semaphore = o as SemaphoreSlim; Trace.Assert(semaphore != null, "o must be a Semphore type."); bool isCompleted = false; while (!isCompleted) { if (semaphore.Wait(100)) { try { Console.WriteLine("thread {0} locks the semaphore", Thread.CurrentThread.ManagedThreadId); Thread.Sleep(3000); } finally { semaphore.Release(); Console.WriteLine("Thread {0} release the semaphore", Thread.CurrentThread.ManagedThreadId); isCompleted = true; } } else { Console.WriteLine("Timeout for thread {0}; wait again ", Thread.CurrentThread.ManagedThreadId); } } }
9.8 Event类
事件是另一个系统范围内的资源同步方法。为了从托管代码中使用系统事件,.net
framework提供了ManualResetEvent、AutoResetEvent、ManualResetEventSlim和CountdownEvent类。
private static void Main(string[] args) { const int taskCount = 10; var mEvents = new ManualResetEventSlim[taskCount]; var waitHandles = new WaitHandle[taskCount]; var calcs = new Calculator[taskCount]; TaskFactory taskFactory = new TaskFactory(); for (int i = 0; i < taskCount; i++) { mEvents[i] = new ManualResetEventSlim(false); waitHandles[i] = mEvents[i].WaitHandle; calcs[i] = new Calculator(mEvents[i]); taskFactory.StartNew(calcs[i].Calculation, Tuple.Create(i + 1, i + 3)); } for (int i = 0; i < taskCount; i++) { int index = WaitHandle.WaitAny(waitHandles); if (index == WaitHandle.WaitTimeout) { Console.WriteLine("Timeout!!"); } else { mEvents[index].Reset(); Console.WriteLine("finished task for {0}, result: {1}", index, calcs[index].Result); Thread.Sleep(100); } } Console.ReadLine(); } public class Calculator { private ManualResetEventSlim mEvent; public int Result { get; private set; } public Calculator(ManualResetEventSlim ev) { this.mEvent = ev; } public void Calculation(Object obj) { Tuple<int, int> data = (Tuple<int, int>)obj; Console.WriteLine("Task {0} starts calculation", Task.CurrentId); Thread.Sleep((3000)); Result = data.Item1 + data.Item2; Console.WriteLine("Task {0} is ready", Task.CurrentId); mEvent.Set(); } }
把任务分支到多个任务中,并在以后合并结果,使用新的CountdownEvent类很有用。
不需要位每个任务创建一个单独的事件对象,而只需要创建一个事件对象。
var mEvents = new ManualResetEventSlim[taskCount];
// var
cEvent = new CountdownEvent(taskCount);
var waitHandles = new WaitHandle[taskCount];
var
calcs = new Calculator[taskCount];
int index = WaitHandle.WaitAny(waitHandles);//wait
//tasks
mEvent.Set();//all thread set;
//continue
private static void Main(string[] args) { const int taskCount = 10; var cEvent = new CountdownEvent(taskCount); var calcs = new Calculator[taskCount]; TaskFactory taskFactory = new TaskFactory(); for (int i = 0; i < taskCount; i++) { calcs[i] = new Calculator(cEvent); taskFactory.StartNew(calcs[i].Calculation, Tuple.Create(i + 1, i + 3)); } cEvent.Wait(); Console.WriteLine("All finished."); Console.ReadLine(); } public class Calculator { private CountdownEvent cEvent; public int Result { get; private set; } public Calculator(CountdownEvent ev) { this.cEvent = ev; } public void Calculation(Object obj) { Tuple<int, int> data = (Tuple<int, int>)obj; Console.WriteLine("Task {0} starts calculation", Task.CurrentId); Thread.Sleep((3000)); Result = data.Item1 + data.Item2; Console.WriteLine("Task {0} is ready", Task.CurrentId); cEvent.Signal(); } }
9.9 Barrier 类
适合于工作有多个任务分支且以后又需要合并工作的情况。
var barrier = new Barrier(numberTasks +
1);
barrier.SignalAndWait();//wait
//tasks
barrier.RemoveParticipant();//2
left
barrier.RemoveParticipant();//1 left
//continue.
9.10
ReadWriterLockSlim类
允许多个读取器。同时只有一个写入器工作,此时读取器不能工作。
private static List<int> items = new List<int>() { 0, 1, 2, 3, 4, 5 }; static ReaderWriterLockSlim rwl = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); static void ReadMethod(object reader) { try { rwl.EnterReadLock(); for (int i = 0; i < items.Count; i++) { Console.WriteLine("read {0}, loop: {1}, item:{2}", reader, i, items[i]); Thread.Sleep(40); } } finally { rwl.ExitReadLock(); } } static void WriterMethod(object writer) { try { while (!rwl.TryEnterWriteLock(50)) { Console.WriteLine("Writer {0} waiting ,current reader count: {1}", writer, rwl.CurrentReadCount); } Console.WriteLine("Writer{0} acquired the lock.", writer); for (int i = 0; i < items.Count; i++) { items[i]++; Thread.Sleep(50); } Console.WriteLine("Writer {0} finished.", writer); } finally { rwl.ExitWriteLock(); } } private static void Main(string[] args) { var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None); var tasks = new Task[6]; tasks[0] = taskFactory.StartNew(WriterMethod, 1); tasks[1] = taskFactory.StartNew(ReadMethod, 1); tasks[2] = taskFactory.StartNew(ReadMethod, 2); tasks[3] = taskFactory.StartNew(WriterMethod, 2); tasks[4] = taskFactory.StartNew(ReadMethod, 3); tasks[5] = taskFactory.StartNew(ReadMethod, 4); foreach (Task task in tasks) { task.Wait(); } Console.WriteLine("All finished."); Console.ReadLine(); }
10 Timer类