我想并行处理一个集合,但我在实现它时遇到了麻烦,因此我希望得到一些帮助。
如果我想在并行循环的lambda中调用c#中标记为async的方法,就会出现问题。例如:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
当计数为0时就会出现问题,因为创建的所有线程实际上都只是后台线程和并行线程。ForEach调用不等待完成。如果我删除async关键字,方法看起来像这样:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
它的工作,但它完全禁用等待聪明,我必须做一些手动异常处理..(为简洁起见,删除)。
我如何实现一个并行。ForEach循环,它在lambda?这可能吗?
平行飞船的原型。ForEach方法以Action<T>作为参数,但我希望它等待我的异步lambda。
一个新的。net 6 api是Parallel。ForEachAsync,一种调度异步工作的方法,允许你控制并行度:
var urls = new []
{
"https://dotnet.microsoft.com",
"https://www.microsoft.com",
"https://stackoverflow.com"
};
var client = new HttpClient();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);
var response = await client.GetAsync(url);
if (response.IsSuccessStatusCode)
{
using var target = File.OpenWrite(targetPath);
await response.Content.CopyToAsync(target);
}
});
另一个例子是Scott Hanselman的博客。
来源,供参考。
下面的设置是使用IAsyncEnumerable,但可以通过更改类型和删除foreach上的“await”来修改为使用IEnumerable。它更适合于大型数据集,而不是创建无数并行任务,然后等待它们全部完成。
public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
{
ActionBlock<T> block = new ActionBlock<T>(
action,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
});
await foreach (T item in enumerable)
{
await block.SendAsync(item).ConfigureAwait(false);
}
block.Complete();
await block.Completion;
}
一个新的。net 6 api是Parallel。ForEachAsync,一种调度异步工作的方法,允许你控制并行度:
var urls = new []
{
"https://dotnet.microsoft.com",
"https://www.microsoft.com",
"https://stackoverflow.com"
};
var client = new HttpClient();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);
var response = await client.GetAsync(url);
if (response.IsSuccessStatusCode)
{
using var target = File.OpenWrite(targetPath);
await response.Content.CopyToAsync(target);
}
});
另一个例子是Scott Hanselman的博客。
来源,供参考。