加载中...

第四天 同步机制(上)


在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合。虽然task自带了两个方法:task.ContinueWith()和Task.Factory

.ContinueWhenAll()来实现任务串行化,但是这些简单的方法远远不能满足我们实际的开发需要,从.net 4.0开始,类库给我们提供了很多

的类来帮助我们简化并行计算中复杂的数据同步问题。

 

大体上分为二种:

①   并发集合类:           这个在先前的文章中也用到了,他们的出现不再让我们过多的关注同步细节。

②  轻量级同步机制:      相对于老版本中那些所谓的重量级同步机制而言,新的机制更加节省cpu的额外开销。

 

关于并发集合类没什么好讲的,如果大家熟悉非线程安全的集合,那么这些并发的集合对你来说小菜一碟,这一篇和下一篇我们仔细来玩玩这

些轻量级的同步机制。

一:Barrier(屏障同步)

1:基本概念

    msdn对它的解释是:使多个任务能够采用并行方式依据某种算法在多个阶段协同工作。乍一看有点不懂,没关系,我们采取提干法。

”多个任务“,”多个阶段”,“协同”,仔细想想知道了,下一阶段的执行必须等待上一个阶段中多task全部执行完,那么我们实际中有这样

的需求吗?当然有的,比如我们数据库中有100w条数据需要导入excel,为了在数据库中加速load,我们需要开多个任务去跑,比如这

里的4个task,要想load产品表,必须等4个task都跑完用户表才行,那么你有什么办法可以让task为了你两肋插刀呢?它就是Barrier。

 

好,我们知道barrier叫做屏障,就像下图中的“红色线”,如果我们的屏障设为4个task就认为已经满了的话,那么执行中先到的task必须等待

后到的task,通知方式也就是barrier.SignalAndWait(),屏障中线程设置操作为new Barrier(4,(i)=>{})。

 

啰嗦了半天,还是上下代码说话:

2:死锁问题

    先前的例子我们也知道,屏障必须等待4个task通过SignalAndWait()来告知自己已经到达,当4个task全部达到后,我们可以通过

barrier.ParticipantsRemaining来获取task到达状态,那么如果有一个task久久不能到达那会是怎样的情景呢?好,我举个例子。

  1. using System.Collections.Concurrent;
  2. using System.Threading.Tasks;
  3. using System;
  4. using System.Diagnostics;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Threading;
  8.  
  9. class Program
  10. {
  11. //四个task执行
  12. static Task[] tasks = new Task[4];
  13.  
  14. static Barrier barrier = null;
  15.  
  16. static void Main(string[] args)
  17. {
  18. barrier = new Barrier(tasks.Length, (i) =>
  19. {
  20. Console.WriteLine("**********************************************************");
  21. Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);
  22. Console.WriteLine("**********************************************************");
  23. });
  24.  
  25. for (int j = 0; j < tasks.Length; j++)
  26. {
  27. tasks[j] = Task.Factory.StartNew((obj) =>
  28. {
  29. var single = Convert.ToInt32(obj);
  30.  
  31. LoadUser(single);
  32. barrier.SignalAndWait();
  33.  
  34. LoadProduct(single);
  35. barrier.SignalAndWait();
  36.  
  37. LoadOrder(single);
  38. barrier.SignalAndWait();
  39.  
  40. }, j);
  41. }
  42.  
  43. Task.WaitAll(tasks);
  44.  
  45. barrier.Dispose();
  46.  
  47. Console.WriteLine("指定数据库中所有数据已经加载完毕!");
  48.  
  49. Console.Read();
  50. }
  51.  
  52. static void LoadUser(int num)
  53. {
  54. Console.WriteLine("\n当前任务:{0}正在加载User部分数据!", num);
  55.  
  56. if (num == 0)
  57. {
  58. //num=0:表示0号任务
  59. //barrier.ParticipantsRemaining == 0:表示所有task到达屏障才会退出
  60. // SpinWait.SpinUntil: 自旋锁,相当于死循环
  61. SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0);
  62. }
  63. }
  64.  
  65. static void LoadProduct(int num)
  66. {
  67. Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
  68. }
  69.  
  70. static void LoadOrder(int num)
  71. {
  72. Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
  73. }
  74. }

我们发现程序在加载User表的时候卡住了,出现了类似死循环,这句SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0)中

的ParticipantsRemaining==0 永远也不能成立,导致task0永远都不能退出,然而barrier还在一直等待task0调用SignalAndWait来结束屏障。

结果就是造成了相互等待的尴尬局面,我们下个断点看看情况。

3:超时机制

    当我们coding的时候遇到了这种问题还是很纠结的,所以我们必须引入一种“超时机制”,如果在指定的时候内所有的参与者(task)都

没有到达屏障的话,我们就需要取消这些参与者的后续执行,幸好SignalAndWait给我们提供了超时的重载,为了能够取消后续执行,我们

还要采用CancellationToken机制。

  1. using System.Collections.Concurrent;
  2. using System.Threading.Tasks;
  3. using System;
  4. using System.Diagnostics;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Threading;
  8.  
  9. class Program
  10. {
  11. //四个task执行
  12. static Task[] tasks = new Task[4];
  13.  
  14. static Barrier barrier = null;
  15.  
  16. static void Main(string[] args)
  17. {
  18. CancellationTokenSource cts = new CancellationTokenSource();
  19.  
  20. CancellationToken ct = cts.Token;
  21.  
  22. barrier = new Barrier(tasks.Length, (i) =>
  23. {
  24. Console.WriteLine("**********************************************************");
  25. Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);
  26. Console.WriteLine("**********************************************************");
  27. });
  28.  
  29. for (int j = 0; j < tasks.Length; j++)
  30. {
  31. tasks[j] = Task.Factory.StartNew((obj) =>
  32. {
  33. var single = Convert.ToInt32(obj);
  34.  
  35. LoadUser(single);
  36.  
  37. if (!barrier.SignalAndWait(2000))
  38. {
  39. //抛出异常,取消后面加载的执行
  40. throw new OperationCanceledException(string.Format("我是当前任务{0},我抛出异常了!", single), ct);
  41. }
  42.  
  43. LoadProduct(single);
  44. barrier.SignalAndWait();
  45.  
  46. LoadOrder(single);
  47. barrier.SignalAndWait();
  48.  
  49. }, j, ct);
  50. }
  51.  
  52. //等待所有tasks 4s
  53. Task.WaitAll(tasks, 4000);
  54.  
  55. try
  56. {
  57. for (int i = 0; i < tasks.Length; i++)
  58. {
  59. if (tasks[i].Status == TaskStatus.Faulted)
  60. {
  61. //获取task中的异常
  62. foreach (var single in tasks[i].Exception.InnerExceptions)
  63. {
  64. Console.WriteLine(single.Message);
  65. }
  66. }
  67. }
  68.  
  69. barrier.Dispose();
  70. }
  71. catch (AggregateException e)
  72. {
  73. Console.WriteLine("我是总异常:{0}", e.Message);
  74. }
  75.  
  76. Console.Read();
  77. }
  78.  
  79. static void LoadUser(int num)
  80. {
  81. Console.WriteLine("\n当前任务:{0}正在加载User部分数据!", num);
  82.  
  83. if (num == 0)
  84. {
  85. //自旋转5s
  86. if (!SpinWait.SpinUntil(() => barrier.ParticipantsRemaining == 0, 5000))
  87. return;
  88. }
  89.  
  90. Console.WriteLine("当前任务:{0}正在加载User数据完毕!", num);
  91. }
  92.  
  93. static void LoadProduct(int num)
  94. {
  95. Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
  96. }
  97.  
  98. static void LoadOrder(int num)
  99. {
  100. Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
  101. }
  102. }

二:spinLock(自旋锁)

    我们初识多线程或者多任务时,第一个想到的同步方法就是使用lock或者Monitor,然而在4.0 之后给我们提供了另一把武器spinLock,

如果你的任务持有锁的时间非常短,具体短到什么时候msdn也没有给我们具体的答案,但是有一点值得确定的时,如果持有锁的时候比较

短,那么它比那些重量级别的Monitor具有更小的性能开销,它的用法跟Monitor很相似,下面举个例子,Add2方法采用自旋锁。


还没有评论.