我想并行处理一个集合,但我在实现它时遇到了麻烦,因此我希望得到一些帮助。

如果我想在并行循环的lambda中调用c#中标记为async的方法,就会出现问题。例如:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

当计数为0时就会出现问题,因为创建的所有线程实际上都只是后台线程和并行线程。ForEach调用不等待完成。如果我删除async关键字,方法看起来像这样:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

它的工作,但它完全禁用等待聪明,我必须做一些手动异常处理..(为简洁起见,删除)。

我如何实现一个并行。ForEach循环,它在lambda?这可能吗?

平行飞船的原型。ForEach方法以Action<T>作为参数,但我希望它等待我的异步lambda。


当前回答

你可以使用AsyncEnumerator NuGet包中的ParallelForEachAsync扩展方法:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

免责声明:我是AsyncEnumerator库的作者,该库是开源的,在MIT的授权下,我发布这条消息只是为了帮助社区。

其他回答

在接受的应答中,ConcurrentBag不是必需的。 下面是一个没有它的实现:

var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);

任何"// some pre stuff"和"// some post stuff"都可以进入GetData实现(或另一个调用GetData的方法)

除了更短之外,没有使用“async void”lambda,这是一种反模式。

如果你只想要简单的并行,你可以这样做:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

如果你需要更复杂的东西,看看Stephen Toub的ForEachAsync帖子。

你可以使用AsyncEnumerator NuGet包中的ParallelForEachAsync扩展方法:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

免责声明:我是AsyncEnumerator库的作者,该库是开源的,在MIT的授权下,我发布这条消息只是为了帮助社区。

我为此创建了一个扩展方法,它使用SemaphoreSlim,还允许设置最大并行度

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

示例用法:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

下面的设置是使用IAsyncEnumerable,但可以通过更改类型和删除foreach上的“await”来修改为使用IEnumerable。它更适合于大型数据集,而不是创建无数并行任务,然后等待它们全部完成。

    public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
    {
        ActionBlock<T> block = new ActionBlock<T>(
           action, 
           new ExecutionDataflowBlockOptions 
           { 
             MaxDegreeOfParallelism = maxDegreeOfParallelism, 
             BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3 
           });

        await foreach (T item in enumerable)
        {
           await block.SendAsync(item).ConfigureAwait(false);
        }

        block.Complete();
        await block.Completion;
    }