在地铁应用程序中,我需要执行一些WCF调用。有大量的调用要执行,所以我需要在并行循环中执行它们。问题是并行循环在WCF调用全部完成之前就退出了。

您将如何重构它以使其按预期工作?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();

当前回答

随着Parallel的引入,异步操作的并行化问题得到了解决。在。net 6中使用ForEachAsync API,但是使用旧的。net平台的人可能仍然需要一个不错的替代品。实现一个简单的方法是使用来自TPL Dataflow库的ActionBlock<T>组件。这个库包含在标准的. net库中(。NET Core和.NET 5+),并且可以作为。NET框架的NuGet包使用。下面是它的用法:

public static Task Parallel_ForEachAsync<T>(ICollection<T> source,
    int maxDegreeOfParallelism, Func<T, Task> action)
{
    var options = new ExecutionDataflowBlockOptions();
    options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
    var block = new ActionBlock<T>(action, options);
    foreach (var item in source) block.Post(item);
    block.Complete();
    return block.Completion;
}

This solution is only suitable for materialized source sequences, hence the type of the parameter is ICollection<T> instead of the more common IEnumerable<T>. It also has the surprising behavior of ignoring any OperationCanceledExceptions thrown by the action. Addressing these nuances and attempting to replicate precisely the features and behavior of the Parallel.ForEachAsync is doable, but it requires almost as much code as if more primitive tools were used. I've posted such an attempt in the 9th revision of this answer.


Below is a different attempt to implement the Parallel.ForEachAsync method, offering exactly the same features as the .NET 6 API, and mimicking its behavior as much as possible. It uses only basic TPL tools. The idea is to create a number of worker tasks equal to the desirable MaxDegreeOfParallelism, with each task enumerating the same enumerator in a synchronized fashion. This is similar to how the Parallel.ForEachAsync is implemented internally. The difference is that the .NET 6 API starts with a single worker and progressively adds more, while the implementation below creates all the workers from the start:

public static Task Parallel_ForEachAsync<T>(IEnumerable<T> source,
    ParallelOptions parallelOptions,
    Func<T, CancellationToken, Task> body)
{
    if (source == null) throw new ArgumentNullException("source");
    if (parallelOptions == null) throw new ArgumentNullException("parallelOptions");
    if (body == null) throw new ArgumentNullException("body");
    int dop = parallelOptions.MaxDegreeOfParallelism;
    if (dop < 0) dop = Environment.ProcessorCount;
    CancellationToken cancellationToken = parallelOptions.CancellationToken;
    TaskScheduler scheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;

    IEnumerator<T> enumerator = source.GetEnumerator();
    var cts = new CancellationTokenSource();
    var semaphore = new SemaphoreSlim(1, 1); // Synchronizes the enumeration
    var workerTasks = new Task[dop];
    for (int i = 0; i < dop; i++)
    {
        workerTasks[i] = Task.Factory.StartNew(async () =>
        {
            try
            {
                while (!cts.IsCancellationRequested)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    T item;
                    await semaphore.WaitAsync(); // Continue on captured context.
                    try
                    {
                        if (!enumerator.MoveNext()) break;
                        item = enumerator.Current;
                    }
                    finally { semaphore.Release(); } 
                    await body(item, cts.Token); // Continue on captured context.
                }
            }
            catch { cts.Cancel(); throw; }
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler)
            .Unwrap();
    }
    return Task.WhenAll(workerTasks).ContinueWith(t =>
    {
        // Clean up
        try { semaphore.Dispose(); cts.Dispose(); } finally { enumerator.Dispose(); }
        return t;
    }, CancellationToken.None, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

签名有区别。正文参数的类型是Func<TSource, CancellationToken, Task>,而不是Func<TSource, CancellationToken, ValueTask>。这是因为值-任务是一个相对较新的特性,在. net Framework中是不可用的。

在行为上也有不同。这个实现对主体抛出的OperationCanceledExceptions做出反应,完成为取消。正确的行为应该是将这些异常作为单个错误传播,并将其作为错误完整传播。修复这个小缺陷是可行的,但我不想让这个相对简短且可读的实现变得更加复杂。

其他回答

这是一种使用SemaphoreSlim的扩展方法,还允许设置最大并行度

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

示例用法:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

我有点晚了,但你可能想考虑使用GetAwaiter.GetResult()在同步上下文中运行你的异步代码,但如下所示;

 Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});

在介绍了一堆helper方法之后,你将能够使用以下简单的语法运行并行查询:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

这里发生的事情是:我们将源集合分成10个块(. split (DegreeOfParallelism)),然后运行10个任务,每个任务逐个处理它的项(. selectmanyasync(…)),并将它们合并回一个列表。

值得一提的是,有一个更简单的方法:

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

但是它需要一个预防措施:如果您有一个太大的源集合,它将立即为每个项目安排一个Task,这可能会导致显著的性能损失。

上面例子中使用的扩展方法如下所示:

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }

            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}

这应该是非常有效的,并且比整个TPL数据流工作更容易:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});

...

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();

    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }

    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}

环绕平行线。Foreach到Task.Run(),而不是await关键字使用[yourasyncmethod]。结果

(你需要完成任务。运行事情不阻塞UI线程)

就像这样:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;