我想并行处理一个集合,但我在实现它时遇到了麻烦,因此我希望得到一些帮助。
如果我想在并行循环的lambda中调用c#中标记为async的方法,就会出现问题。例如:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
当计数为0时就会出现问题,因为创建的所有线程实际上都只是后台线程和并行线程。ForEach调用不等待完成。如果我删除async关键字,方法看起来像这样:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
它的工作,但它完全禁用等待聪明,我必须做一些手动异常处理..(为简洁起见,删除)。
我如何实现一个并行。ForEach循环,它在lambda?这可能吗?
平行飞船的原型。ForEach方法以Action<T>作为参数,但我希望它等待我的异步lambda。
对于一个更简单的解决方案(不确定是否是最优的),您可以简单地嵌套Parallel。ForEach在Task中-就像这样
var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }
Task.Run(() =>
{
Parallel.ForEach(myCollection, options, item =>
{
DoWork(item);
}
}
ParallelOptions将为您进行开箱即用的节流。
我在一个真实的场景中使用它在后台运行一个很长的操作。这些操作是通过HTTP调用的,它的设计目的是在运行长操作时不阻塞HTTP调用。
调用HTTP进行长时间后台操作。
操作从后台开始。
用户获得状态ID,可用于使用另一个HTTP调用检查状态。
后台操作更新其状态。
这样,CI/CD调用就不会因为长时间的HTTP操作而超时,而是每隔x秒循环一次状态,而不会阻塞进程
下面的设置是使用IAsyncEnumerable,但可以通过更改类型和删除foreach上的“await”来修改为使用IEnumerable。它更适合于大型数据集,而不是创建无数并行任务,然后等待它们全部完成。
public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
{
ActionBlock<T> block = new ActionBlock<T>(
action,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
});
await foreach (T item in enumerable)
{
await block.SendAsync(item).ConfigureAwait(false);
}
block.Complete();
await block.Completion;
}