在地铁应用程序中,我需要执行一些WCF调用。有大量的调用要执行,所以我需要在并行循环中执行它们。问题是并行循环在WCF调用全部完成之前就退出了。

您将如何重构它以使其按预期工作?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();

当前回答

这应该是非常有效的,并且比整个TPL数据流工作更容易:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}

其他回答

像svick建议的那样使用DataFlow可能有些过度,而且Stephen的回答并没有提供控制操作并发性的方法。然而,这可以很简单地实现:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

ToArray()调用可以通过使用数组而不是列表来优化,并替换已完成的任务,但我怀疑它在大多数情况下不会有太大区别。OP问题的使用示例:

RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

EDIT Fellow SO用户和TPL wiz Eli Arbel向我指出了Stephen Toub的一篇相关文章。像往常一样,他的实现既优雅又高效:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });
                      
        })); 
}

斯维克的回答(一如既往)极好。

但是,我发现当您需要传输大量数据时,Dataflow会更有用。或者当您需要异步兼容队列时。

在你的情况下,一个更简单的解决方案是使用异步风格的并行:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);

foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}

Console.ReadKey();

无需TPL的简单本地方式:

int totalThreads = 0; int maxThreads = 3;

foreach (var item in YouList)
{
    while (totalThreads >= maxThreads) await Task.Delay(500);
    Interlocked.Increment(ref totalThreads);

    MyAsyncTask(item).ContinueWith((res) => Interlocked.Decrement(ref totalThreads));
}

你可以在下一个任务中检查这个解决方案:

async static Task MyAsyncTask(string item)
{
    await Task.Delay(2500);
    Console.WriteLine(item);
}

这是一种使用SemaphoreSlim的扩展方法,还允许设置最大并行度

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

示例用法:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

随着Parallel的引入,异步操作的并行化问题得到了解决。在。net 6中使用ForEachAsync API,但是使用旧的。net平台的人可能仍然需要一个不错的替代品。实现一个简单的方法是使用来自TPL Dataflow库的ActionBlock<T>组件。这个库包含在标准的. net库中(。NET Core和.NET 5+),并且可以作为。NET框架的NuGet包使用。下面是它的用法:

public static Task Parallel_ForEachAsync<T>(ICollection<T> source,
    int maxDegreeOfParallelism, Func<T, Task> action)
{
    var options = new ExecutionDataflowBlockOptions();
    options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
    var block = new ActionBlock<T>(action, options);
    foreach (var item in source) block.Post(item);
    block.Complete();
    return block.Completion;
}

This solution is only suitable for materialized source sequences, hence the type of the parameter is ICollection<T> instead of the more common IEnumerable<T>. It also has the surprising behavior of ignoring any OperationCanceledExceptions thrown by the action. Addressing these nuances and attempting to replicate precisely the features and behavior of the Parallel.ForEachAsync is doable, but it requires almost as much code as if more primitive tools were used. I've posted such an attempt in the 9th revision of this answer.


Below is a different attempt to implement the Parallel.ForEachAsync method, offering exactly the same features as the .NET 6 API, and mimicking its behavior as much as possible. It uses only basic TPL tools. The idea is to create a number of worker tasks equal to the desirable MaxDegreeOfParallelism, with each task enumerating the same enumerator in a synchronized fashion. This is similar to how the Parallel.ForEachAsync is implemented internally. The difference is that the .NET 6 API starts with a single worker and progressively adds more, while the implementation below creates all the workers from the start:

public static Task Parallel_ForEachAsync<T>(IEnumerable<T> source,
    ParallelOptions parallelOptions,
    Func<T, CancellationToken, Task> body)
{
    if (source == null) throw new ArgumentNullException("source");
    if (parallelOptions == null) throw new ArgumentNullException("parallelOptions");
    if (body == null) throw new ArgumentNullException("body");
    int dop = parallelOptions.MaxDegreeOfParallelism;
    if (dop < 0) dop = Environment.ProcessorCount;
    CancellationToken cancellationToken = parallelOptions.CancellationToken;
    TaskScheduler scheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;

    IEnumerator<T> enumerator = source.GetEnumerator();
    var cts = new CancellationTokenSource();
    var semaphore = new SemaphoreSlim(1, 1); // Synchronizes the enumeration
    var workerTasks = new Task[dop];
    for (int i = 0; i < dop; i++)
    {
        workerTasks[i] = Task.Factory.StartNew(async () =>
        {
            try
            {
                while (!cts.IsCancellationRequested)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    T item;
                    await semaphore.WaitAsync(); // Continue on captured context.
                    try
                    {
                        if (!enumerator.MoveNext()) break;
                        item = enumerator.Current;
                    }
                    finally { semaphore.Release(); } 
                    await body(item, cts.Token); // Continue on captured context.
                }
            }
            catch { cts.Cancel(); throw; }
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler)
            .Unwrap();
    }
    return Task.WhenAll(workerTasks).ContinueWith(t =>
    {
        // Clean up
        try { semaphore.Dispose(); cts.Dispose(); } finally { enumerator.Dispose(); }
        return t;
    }, CancellationToken.None, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

签名有区别。正文参数的类型是Func<TSource, CancellationToken, Task>,而不是Func<TSource, CancellationToken, ValueTask>。这是因为值-任务是一个相对较新的特性,在. net Framework中是不可用的。

在行为上也有不同。这个实现对主体抛出的OperationCanceledExceptions做出反应,完成为取消。正确的行为应该是将这些异常作为单个错误传播,并将其作为错误完整传播。修复这个小缺陷是可行的,但我不想让这个相对简短且可读的实现变得更加复杂。