我想并行处理一个集合,但我在实现它时遇到了麻烦,因此我希望得到一些帮助。

如果我想在并行循环的lambda中调用c#中标记为async的方法,就会出现问题。例如:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

当计数为0时就会出现问题,因为创建的所有线程实际上都只是后台线程和并行线程。ForEach调用不等待完成。如果我删除async关键字,方法看起来像这样:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

它的工作,但它完全禁用等待聪明,我必须做一些手动异常处理..(为简洁起见,删除)。

我如何实现一个并行。ForEach循环,它在lambda?这可能吗?

平行飞船的原型。ForEach方法以Action<T>作为参数,但我希望它等待我的异步lambda。


当前回答

在接受的应答中,ConcurrentBag不是必需的。 下面是一个没有它的实现:

var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);

任何"// some pre stuff"和"// some post stuff"都可以进入GetData实现(或另一个调用GetData的方法)

除了更短之外,没有使用“async void”lambda,这是一种反模式。

其他回答

如果你只想要简单的并行,你可以这样做:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

如果你需要更复杂的东西,看看Stephen Toub的ForEachAsync帖子。

对于一个更简单的解决方案(不确定是否是最优的),您可以简单地嵌套Parallel。ForEach在Task中-就像这样

var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }
Task.Run(() =>
{
    Parallel.ForEach(myCollection, options, item =>
    {
        DoWork(item);
    }
}

ParallelOptions将为您进行开箱即用的节流。

我在一个真实的场景中使用它在后台运行一个很长的操作。这些操作是通过HTTP调用的,它的设计目的是在运行长操作时不阻塞HTTP调用。

调用HTTP进行长时间后台操作。 操作从后台开始。 用户获得状态ID,可用于使用另一个HTTP调用检查状态。 后台操作更新其状态。

这样,CI/CD调用就不会因为长时间的HTTP操作而超时,而是每隔x秒循环一次状态,而不会阻塞进程

我的ParallelForEach异步的轻量级实现。

特点:

节流(最大并行度)。 异常处理(聚合异常将在完成时抛出)。 内存效率高(不需要存储任务列表)。

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var tcs = new TaskCompletionSource<object>();
        var exceptions = new ConcurrentBag<Exception>();
        bool addingCompleted = false;

        foreach (T item in source)
        {
            await semaphoreSlim.WaitAsync();
            asyncAction(item).ContinueWith(t =>
            {
                semaphoreSlim.Release();

                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.TrySetResult(null);
                }
            });
        }

        Volatile.Write(ref addingCompleted, true);
        await tcs.Task;
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}

使用的例子:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);

从其他答案和接受的asnwer引用的文章编译的最简单的可能扩展方法:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

更新:这是一个简单的修改,也支持取消令牌,就像在评论中要求的(未经测试)

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        if (cancellationToken.IsCancellationRequested) return;

        try
        {
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

一个新的。net 6 api是Parallel。ForEachAsync,一种调度异步工作的方法,允许你控制并行度:

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);

    var response = await client.GetAsync(url);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target);
    }
});

另一个例子是Scott Hanselman的博客。

来源,供参考。