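With the introduction of the Parallel.ForEachAsync API in .NET 6, the problem of parallelizing asynchronous operations has been solved. People using older .NET platforms, however, might still need a decent substitute. An easy way to implement one is to use the ActionBlock<T> component from the TPL Dataflow library. This library is part of the standard .NET libraries (.NET Core and .NET 5+), and is available for the .NET Framework as the System.Threading.Tasks.Dataflow NuGet package. Here is how it can be used: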
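using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package on .NET Framework

public static Task Parallel_ForEachAsync<T>(ICollection<T> source,
    int maxDegreeOfParallelism, Func<T, Task> action)
{
    var options = new ExecutionDataflowBlockOptions();
    options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
    var block = new ActionBlock<T>(action, options);
    foreach (var item in source) block.Post(item); // Feed all items up front
    block.Complete(); // Signal that no more items will be posted
    return block.Completion; // Completes when all posted items are processed
}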
This solution is only suitable for materialized source sequences, because every item is posted to the block up front (the block's input buffer is unbounded by default); hence the type of the parameter is ICollection<T> instead of the more common IEnumerable<T>. It also has the surprising behavior of ignoring any OperationCanceledExceptions thrown by the action. Addressing these nuances and attempting to precisely replicate the features and behavior of Parallel.ForEachAsync is doable, but it requires almost as much code as building the method from more primitive tools. I've posted such an attempt in the 9th revision of this answer.
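The OperationCanceledException quirk is easy to observe with a small experiment that drives an ActionBlock<T> directly, the same component the helper above wraps. This is a sketch under stated assumptions: the class name ActionBlockDemo and the choice of item 5 as the failing item are arbitrary, and the System.Threading.Tasks.Dataflow assembly is assumed to be available (built in on .NET Core/.NET 5+, a NuGet package on .NET Framework).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    // Feeds items 1..10 to an ActionBlock whose action throws an
    // OperationCanceledException for item 5 (a hypothetical failure), then
    // reports the block's final status and how many items completed normally.
    public static async Task<(TaskStatus Status, int Processed)> RunAsync()
    {
        int processed = 0;
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 };
        var block = new ActionBlock<int>(async item =>
        {
            await Task.Yield();
            if (item == 5) throw new OperationCanceledException(); // Simulated failure
            Interlocked.Increment(ref processed);
        }, options);

        for (int i = 1; i <= 10; i++) block.Post(i);
        block.Complete();
        try { await block.Completion; }
        catch (Exception) { /* Reached only if the block faulted or was canceled */ }
        return (block.Completion.Status, processed);
    }
}
```

With the behavior described above, RunAsync reports RanToCompletion with 9 processed items: the canceled item is silently skipped instead of faulting the block, so a caller awaiting the returned task never learns that one item failed.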
Below is a different attempt to implement the Parallel.ForEachAsync method, offering exactly the same features as the .NET 6 API, and mimicking its behavior as much as possible. It uses only basic TPL tools. The idea is to create a number of worker tasks equal to the desired MaxDegreeOfParallelism, with each task enumerating the same enumerator in a synchronized fashion. This is similar to how Parallel.ForEachAsync is implemented internally. The difference is that the .NET 6 API starts with a single worker and progressively adds more, while the implementation below creates all the workers from the start:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static Task Parallel_ForEachAsync<T>(IEnumerable<T> source,
    ParallelOptions parallelOptions,
    Func<T, CancellationToken, Task> body)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (parallelOptions == null) throw new ArgumentNullException(nameof(parallelOptions));
    if (body == null) throw new ArgumentNullException(nameof(body));
    int dop = parallelOptions.MaxDegreeOfParallelism;
    if (dop < 0) dop = Environment.ProcessorCount;
    CancellationToken cancellationToken = parallelOptions.CancellationToken;
    TaskScheduler scheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;
    IEnumerator<T> enumerator = source.GetEnumerator();
    // Canceled when the external token is canceled, or when any worker fails,
    // so the body observes both kinds of cancellation (as in the .NET 6 API).
    var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    var semaphore = new SemaphoreSlim(1, 1); // Synchronizes the enumeration
    var workerTasks = new Task[dop];
    for (int i = 0; i < dop; i++)
    {
        workerTasks[i] = Task.Factory.StartNew(async () =>
        {
            try
            {
                while (true)
                {
                    if (cts.IsCancellationRequested)
                    {
                        // External cancellation completes the worker as canceled;
                        // otherwise another worker failed, so just stop.
                        cancellationToken.ThrowIfCancellationRequested();
                        break;
                    }
                    T item;
                    await semaphore.WaitAsync(); // Continue on captured context.
                    try
                    {
                        if (!enumerator.MoveNext()) break;
                        item = enumerator.Current;
                    }
                    finally { semaphore.Release(); }
                    await body(item, cts.Token); // Continue on captured context.
                }
            }
            catch { cts.Cancel(); throw; } // Stop the other workers, then propagate
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler)
            .Unwrap();
    }
    return Task.WhenAll(workerTasks).ContinueWith(t =>
    {
        // Clean up
        try { semaphore.Dispose(); cts.Dispose(); } finally { enumerator.Dispose(); }
        return t;
    }, CancellationToken.None, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
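The worker tasks above are created with Task.Factory.StartNew(async () => ...) followed by .Unwrap(), and the Unwrap call matters. StartNew with an async lambda returns a Task<Task> whose outer task only represents starting the lambda, up to its first incomplete await. The sketch below (UnwrapDemo and the gate are illustrative names) shows that difference deterministically by holding the lambda on a TaskCompletionSource.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    public static async Task<(bool OuterDone, bool ProxyDoneBeforeGate, bool ProxyDoneAfterGate)> RunAsync()
    {
        // Keeps the async lambda suspended until it is released below.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // An async lambda passed to StartNew binds to Func<Task>, so the result is Task<Task>.
        Task<Task> outer = Task.Factory.StartNew(async () =>
        {
            await gate.Task;
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

        // Unwrap returns a proxy that completes only when the inner task completes.
        Task proxy = outer.Unwrap();

        await outer; // Completes as soon as the lambda returns its (still pending) inner task
        bool outerDone = outer.IsCompleted;
        bool proxyDoneBeforeGate = proxy.IsCompleted; // Inner task is still waiting on the gate

        gate.SetResult(true);
        await proxy;
        return (outerDone, proxyDoneBeforeGate, proxy.IsCompleted);
    }
}
```

Awaiting only the outer tasks in Task.WhenAll would therefore let the method "complete" while the bodies are still running; unwrapping makes each worker task track the whole async loop.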
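There is a difference in the signature. The body parameter is of type Func<T, CancellationToken, Task> instead of Func<TSource, CancellationToken, ValueTask>. This is because value-tasks are a relatively recent feature, and are not built into the .NET Framework.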
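On .NET 6 and later, the Task/ValueTask mismatch is easy to bridge, because ValueTask has a constructor that wraps an existing Task. A minimal sketch, assuming .NET 6+; ForEachWithTaskBodyAsync and SumAsync are hypothetical names used only for illustration, not part of any library:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAdapter
{
    // Hypothetical helper: forwards a Task-returning body to the built-in
    // Parallel.ForEachAsync, which expects a ValueTask-returning body.
    public static Task ForEachWithTaskBodyAsync<T>(IEnumerable<T> source,
        ParallelOptions parallelOptions, Func<T, CancellationToken, Task> body)
    {
        if (body == null) throw new ArgumentNullException(nameof(body));
        // new ValueTask(Task) wraps the task returned by the body.
        return Parallel.ForEachAsync(source, parallelOptions,
            (item, ct) => new ValueTask(body(item, ct)));
    }

    // Usage example: sums the numbers with at most 4 concurrent bodies.
    public static async Task<int> SumAsync(IEnumerable<int> numbers)
    {
        int sum = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
        await ForEachWithTaskBodyAsync(numbers, options, async (n, ct) =>
        {
            await Task.Delay(1, ct);
            Interlocked.Add(ref sum, n); // Thread-safe accumulation
        });
        return sum;
    }
}
```

Code written against the Task-based signature can thus switch to the built-in API when targeting .NET 6+, at the cost of one ValueTask wrapper per item.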
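There is also a difference in behavior. This implementation reacts to OperationCanceledExceptions thrown by the body by completing as canceled. The correct behavior would be to propagate these exceptions as individual errors, and complete as faulted. Fixing this minor flaw is doable, but I preferred not to further complicate this relatively short and readable implementation.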
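The canceled outcome comes from how async methods report exceptions: when an async Task method (or async lambda, like the workers above) throws an OperationCanceledException, its task ends up Canceled rather than Faulted, and Task.WhenAll reports Canceled when none of its inputs faulted. The sketch below (class and method names are illustrative) shows this, and contrasts it with TaskCompletionSource.SetException, which faults the task even for an OperationCanceledException; that is one possible building block for a fix, not a complete one.

```csharp
using System;
using System.Threading.Tasks;

public static class CancellationStatusDemo
{
    // Stands in for a worker whose body threw an OperationCanceledException.
    private static async Task WorkerAsync()
    {
        await Task.Yield();
        throw new OperationCanceledException();
    }

    public static (TaskStatus Worker, TaskStatus WhenAll, TaskStatus ViaTcs) Run()
    {
        Task worker = WorkerAsync();
        Task all = Task.WhenAll(worker, Task.CompletedTask);
        try { all.Wait(); } catch (AggregateException) { /* Expected: the task is canceled */ }

        // Setting the same exception type through a TaskCompletionSource
        // produces a Faulted task instead.
        var tcs = new TaskCompletionSource<object>();
        tcs.SetException(new OperationCanceledException());

        return (worker.Status, all.Status, tcs.Task.Status);
    }
}
```

Run returns (Canceled, Canceled, Faulted), which is why the workers' async lambdas alone cannot produce the faulted outcome of the .NET 6 API.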