我想并行处理一个集合,但我在实现它时遇到了麻烦,因此我希望得到一些帮助。
如果我想在并行循环的lambda中调用c#中标记为async的方法,就会出现问题。例如:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
当计数为0时就会出现问题,因为创建的所有线程实际上都只是后台线程和并行线程。ForEach调用不等待完成。如果我删除async关键字,方法看起来像这样:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
它的工作,但它完全禁用等待聪明,我必须做一些手动异常处理..(为简洁起见,删除)。
我如何实现一个并行。ForEach循环,它在lambda?这可能吗?
平行飞船的原型。ForEach方法以Action<T>作为参数,但我希望它等待我的异步lambda。
下面的设置是使用IAsyncEnumerable,但可以通过更改类型和删除foreach上的“await”来修改为使用IEnumerable。它更适合于大型数据集,而不是创建无数并行任务,然后等待它们全部完成。
public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
{
ActionBlock<T> block = new ActionBlock<T>(
action,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
});
await foreach (T item in enumerable)
{
await block.SendAsync(item).ConfigureAwait(false);
}
block.Complete();
await block.Completion;
}
我的ParallelForEach异步的轻量级实现。
特点:
节流(最大并行度)。
异常处理(聚合异常将在完成时抛出)。
内存效率高(不需要存储任务列表)。
public static class AsyncEx
{
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
{
var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
var tcs = new TaskCompletionSource<object>();
var exceptions = new ConcurrentBag<Exception>();
bool addingCompleted = false;
foreach (T item in source)
{
await semaphoreSlim.WaitAsync();
asyncAction(item).ContinueWith(t =>
{
semaphoreSlim.Release();
if (t.Exception != null)
{
exceptions.Add(t.Exception);
}
if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
{
tcs.TrySetResult(null);
}
});
}
Volatile.Write(ref addingCompleted, true);
await tcs.Task;
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
}
}
使用的例子:
await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
var data = await GetData(i);
}, maxDegreeOfParallelism: 100);