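In a Metro app, I need to execute a number of WCF calls. There are a significant number of calls to make, so I need to run them in a parallel loop. The problem is that the parallel loop exits before all of the WCF calls have completed.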

How would you refactor this so that it works as expected?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();

Current answer

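You can save effort by using the new AsyncEnumerator NuGet package, which didn't exist 4 years ago when the question was originally posted. It allows you to control the degree of parallelism: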

using System.Collections.Async;
...

await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT. I'm posting this message only to help the community.

Other answers

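The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.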

You could "fix" this by blocking the ForEach() threads, but that defeats the whole point of async-await.
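The failure mode described above can be reproduced with a small sketch. Parallel.ForEach expects an Action<T>, so an async lambda is compiled as an async void method: each invocation hands control back to Parallel.ForEach at its first await, and the loop returns while the awaited work is still pending. The class name AsyncVoidDemo and the 200 ms delay are illustrative assumptions standing in for the WCF calls.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Returns how many loop bodies had finished at the moment Parallel.ForEach returned.
    public static int CountCompletedRightAfterForEach()
    {
        int completed = 0;

        // The async lambda binds to Action<int>, i.e. it becomes "async void".
        Parallel.ForEach(Enumerable.Range(1, 5), async i =>
        {
            await Task.Delay(200); // stand-in for an async WCF call
            Interlocked.Increment(ref completed);
        });

        // Parallel.ForEach has already returned while the delays are still pending,
        // so this is typically 0 rather than 5.
        return Volatile.Read(ref completed);
    }
}
```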

What you could do instead is use TPL Dataflow rather than Parallel.ForEach(), since it supports asynchronous Tasks well.

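Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.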

In code:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

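That said, you will probably want to limit the parallelism of the TransformBlock to some small constant. You could also limit the capacity of the TransformBlock and add items to it asynchronously using SendAsync(), for example if the collection is too big.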
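A sketch of those two adjustments, assuming the same pipeline shape as above: MaxDegreeOfParallelism is set to a small constant, BoundedCapacity caps how many ids the TransformBlock holds at once, and SendAsync() waits asynchronously for free space (Post() would instead return false when the block is full). GetCustomerNameAsync is a hypothetical stand-in for repo.GetCustomer(), and the limits 4 and 8 are arbitrary example values.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedPipeline
{
    // Hypothetical stand-in for repo.GetCustomer(id): simulates an async service call.
    static async Task<string> GetCustomerNameAsync(string id)
    {
        await Task.Delay(10);
        return "Customer " + id;
    }

    public static async Task<List<string>> RunAsync(IEnumerable<string> ids)
    {
        var results = new List<string>();

        var getCustomerBlock = new TransformBlock<string, string>(
            id => GetCustomerNameAsync(id),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4, // a small constant instead of Unbounded
                BoundedCapacity = 8         // at most 8 ids held by the block at once
            });

        // Default MaxDegreeOfParallelism is 1, so List<T>.Add is never called concurrently.
        var collectBlock = new ActionBlock<string>(name => results.Add(name));

        getCustomerBlock.LinkTo(collectBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            // SendAsync waits asynchronously while the block is full; Post would return false.
            await getCustomerBlock.SendAsync(id);
        }

        getCustomerBlock.Complete();
        await collectBlock.Completion;

        // TransformBlock preserves input order by default, so results follow the order of ids.
        return results;
    }
}
```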

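An added benefit compared to your code (if it worked) is that writing starts as soon as a single item is finished, rather than waiting until all of the processing is done.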

Here is an extension method that uses SemaphoreSlim and also allows setting the maximum degree of parallelism:

    /// <summary>
    /// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the items in the IEnumerable</typeparam>
    /// <param name="enumerable">Instance of <see cref="IEnumerable{T}"/></param>
    /// <param name="action">An async <see cref="Func{T, TResult}"/> to execute for each item</param>
    /// <param name="maxDegreeOfParallelism">Optional. An integer that represents the maximum degree of parallelism.
    /// Must be greater than 0.</param>
    /// <returns>A Task representing the async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            if (maxDegreeOfParallelism.Value < 1)
                throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));

            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than the limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        try
                        {
                            await action(item);
                        }
                        finally
                        {
                            // The action has completed (or failed), so decrement the number of running tasks.
                            // try/finally (unlike ContinueWith) lets exceptions propagate to Task.WhenAll.
                            semaphoreSlim.Release();
                        }
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler);
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Example usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

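The problem of parallelizing asynchronous operations has been solved with the introduction of the Parallel.ForEachAsync API in .NET 6, but people using older .NET platforms might still need a decent substitute. A simple way to implement one is to use the ActionBlock<T> component from the TPL Dataflow library. This library is included in the standard .NET libraries (.NET Core and .NET 5+), and is available as a NuGet package for the .NET Framework. Here is how it can be used: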

public static Task Parallel_ForEachAsync<T>(ICollection<T> source,
    int maxDegreeOfParallelism, Func<T, Task> action)
{
    var options = new ExecutionDataflowBlockOptions();
    options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
    var block = new ActionBlock<T>(action, options);
    foreach (var item in source) block.Post(item);
    block.Complete();
    return block.Completion;
}

This solution is only suitable for materialized source sequences, hence the type of the parameter is ICollection<T> instead of the more common IEnumerable<T>. It also has the surprising behavior of ignoring any OperationCanceledExceptions thrown by the action. Addressing these nuances and attempting to replicate precisely the features and behavior of the Parallel.ForEachAsync is doable, but it requires almost as much code as if more primitive tools were used. I've posted such an attempt in the 9th revision of this answer.
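For comparison, on .NET 6 or later the built-in API can be called directly. A minimal sketch, assuming a .NET 6+ target; the Net6Example class, the doubling body, and the degree of parallelism of 3 are illustrative. Note that the body receives a CancellationToken and is converted to a ValueTask-returning delegate:

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class Net6Example
{
    public static async Task<int[]> RunAsync(int count)
    {
        var results = new ConcurrentBag<int>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };

        // Parallel.ForEachAsync awaits every body before its returned task completes.
        await Parallel.ForEachAsync(Enumerable.Range(1, count), options,
            async (item, cancellationToken) =>
            {
                // Stand-in for an async I/O call such as repo.GetCustomer(i)
                await Task.Delay(10, cancellationToken);
                results.Add(item * 2);
            });

        // ConcurrentBag has no defined order, so sort before returning.
        return results.OrderBy(x => x).ToArray();
    }
}
```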


Below is a different attempt to implement the Parallel.ForEachAsync method, offering exactly the same features as the .NET 6 API and mimicking its behavior as closely as possible. It uses only basic TPL tools. The idea is to create a number of worker tasks equal to the desired MaxDegreeOfParallelism, with each task enumerating the same enumerator in a synchronized fashion. This is similar to how Parallel.ForEachAsync is implemented internally. The difference is that the .NET 6 API starts with a single worker and progressively adds more, while the implementation below creates all the workers from the start:

public static Task Parallel_ForEachAsync<T>(IEnumerable<T> source,
    ParallelOptions parallelOptions,
    Func<T, CancellationToken, Task> body)
{
    if (source == null) throw new ArgumentNullException("source");
    if (parallelOptions == null) throw new ArgumentNullException("parallelOptions");
    if (body == null) throw new ArgumentNullException("body");
    int dop = parallelOptions.MaxDegreeOfParallelism;
    if (dop < 0) dop = Environment.ProcessorCount;
    CancellationToken cancellationToken = parallelOptions.CancellationToken;
    TaskScheduler scheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;

    IEnumerator<T> enumerator = source.GetEnumerator();
    var cts = new CancellationTokenSource();
    var semaphore = new SemaphoreSlim(1, 1); // Synchronizes the enumeration
    var workerTasks = new Task[dop];
    for (int i = 0; i < dop; i++)
    {
        workerTasks[i] = Task.Factory.StartNew(async () =>
        {
            try
            {
                while (!cts.IsCancellationRequested)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    T item;
                    await semaphore.WaitAsync(); // Continue on captured context.
                    try
                    {
                        if (!enumerator.MoveNext()) break;
                        item = enumerator.Current;
                    }
                    finally { semaphore.Release(); } 
                    await body(item, cts.Token); // Continue on captured context.
                }
            }
            catch { cts.Cancel(); throw; }
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler)
            .Unwrap();
    }
    return Task.WhenAll(workerTasks).ContinueWith(t =>
    {
        // Clean up
        try { semaphore.Dispose(); cts.Dispose(); } finally { enumerator.Dispose(); }
        return t;
    }, CancellationToken.None, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

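There is a difference in the signature. The body parameter is of type Func<TSource, CancellationToken, Task> instead of Func<TSource, CancellationToken, ValueTask>. This is because value-tasks are a relatively recent feature that is not built into the .NET Framework.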

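There is also a difference in behavior. This implementation reacts to OperationCanceledExceptions thrown by the body by completing as canceled. The correct behavior would be to propagate these exceptions as individual errors and complete as faulted. Fixing this minor flaw is doable, but I preferred not to make this relatively short and readable implementation more complex.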

I'm a little late to the party, but you may want to consider using GetAwaiter().GetResult() to run your async code in a synchronous context, as shown below:

Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Blocks the thread that Parallel.ForEach assigned to this item until the call completes.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
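A sketch of the same blocking approach with a cap on concurrency, assuming the awaited method does not need to resume on the calling thread's SynchronizationContext (otherwise blocking on it can deadlock, for example on a UI thread). ParallelOptions.MaxDegreeOfParallelism limits how many threads are blocked at once; GetCustomerAsync is a hypothetical stand-in for repo.GetCustomer():

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingForEach
{
    // Hypothetical stand-in for repo.GetCustomer(id). ConfigureAwait(false) keeps the
    // continuation off any captured SynchronizationContext, which matters when blocking.
    static async Task<string> GetCustomerAsync(string id)
    {
        await Task.Delay(20).ConfigureAwait(false);
        return "Customer " + id;
    }

    public static List<string> Run(IEnumerable<string> ids, int maxDegreeOfParallelism)
    {
        var customers = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        // The lambda is synchronous, so Parallel.ForEach only returns after every body has finished.
        Parallel.ForEach(ids, options, id =>
        {
            // Blocks this worker thread until the async call completes.
            string customer = GetCustomerAsync(id).GetAwaiter().GetResult();
            customers.Add(customer);
        });

        return new List<string>(customers);
    }
}
```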

A simple native approach without TPL:

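int totalThreads = 0; int maxThreads = 3;

foreach (var item in YouList)
{
    // Poll until a slot frees up (only this loop increments, so check-then-increment is safe)
    while (Volatile.Read(ref totalThreads) >= maxThreads) await Task.Delay(500);
    Interlocked.Increment(ref totalThreads);

    // Note: exceptions thrown by MyAsyncTask are not observed here
    _ = MyAsyncTask(item).ContinueWith(res => Interlocked.Decrement(ref totalThreads));
}

// Wait for the remaining in-flight tasks before continuing
while (Volatile.Read(ref totalThreads) > 0) await Task.Delay(500);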

You can test this solution with the following task:

async static Task MyAsyncTask(string item)
{
    await Task.Delay(2500);
    Console.WriteLine(item);
}
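A variation on the counter-based loop above, sketched under the assumption that polling is undesirable: instead of polling with Task.Delay(500), it uses Task.WhenAny to wait until a slot frees up, and finishes with Task.WhenAll so that the method completes only after every item has been processed and any exception has been observed. ThrottleWithWhenAny and RunAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThrottleWithWhenAny
{
    public static async Task RunAsync<T>(IEnumerable<T> items, int maxConcurrency, Func<T, Task> action)
    {
        var running = new List<Task>();

        foreach (var item in items)
        {
            if (running.Count >= maxConcurrency)
            {
                // Wait for any in-flight task to finish instead of polling.
                Task finished = await Task.WhenAny(running);
                running.Remove(finished);
                await finished; // rethrows if that task failed
            }

            running.Add(action(item));
        }

        // Wait for the remaining tasks; WhenAll surfaces any exception.
        await Task.WhenAll(running);
    }
}
```

Usage mirrors the loop above, e.g. `await ThrottleWithWhenAny.RunAsync(YouList, 3, MyAsyncTask);`.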