我想并行处理一个集合,但我在实现它时遇到了麻烦,因此我希望得到一些帮助。
如果我想在并行循环的lambda中调用c#中标记为async的方法,就会出现问题。例如:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
当计数为0时就会出现问题,因为创建的所有线程实际上都只是后台线程和并行线程。ForEach调用不等待完成。如果我删除async关键字,方法看起来像这样:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
它的工作,但它完全禁用等待聪明,我必须做一些手动异常处理..(为简洁起见,删除)。
我如何实现一个并行。ForEach循环,它在lambda?这可能吗?
平行飞船的原型。ForEach方法以Action<T>作为参数,但我希望它等待我的异步lambda。
对于一个更简单的解决方案(不确定是否是最优的),您可以简单地嵌套Parallel。ForEach在Task中-就像这样
var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }
Task.Run(() =>
{
Parallel.ForEach(myCollection, options, item =>
{
DoWork(item);
}
}
ParallelOptions将为您进行开箱即用的节流。
我在一个真实的场景中使用它在后台运行一个很长的操作。这些操作是通过HTTP调用的,它的设计目的是在运行长操作时不阻塞HTTP调用。
调用HTTP进行长时间后台操作。
操作从后台开始。
用户获得状态ID,可用于使用另一个HTTP调用检查状态。
后台操作更新其状态。
这样,CI/CD调用就不会因为长时间的HTTP操作而超时,而是每隔x秒循环一次状态,而不会阻塞进程
我的ParallelForEach异步的轻量级实现。
特点:
节流(最大并行度)。
异常处理(聚合异常将在完成时抛出)。
内存效率高(不需要存储任务列表)。
public static class AsyncEx
{
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
{
var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
var tcs = new TaskCompletionSource<object>();
var exceptions = new ConcurrentBag<Exception>();
bool addingCompleted = false;
foreach (T item in source)
{
await semaphoreSlim.WaitAsync();
asyncAction(item).ContinueWith(t =>
{
semaphoreSlim.Release();
if (t.Exception != null)
{
exceptions.Add(t.Exception);
}
if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
{
tcs.TrySetResult(null);
}
});
}
Volatile.Write(ref addingCompleted, true);
await tcs.Task;
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
}
}
使用的例子:
await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
从其他答案和接受的asnwer引用的文章编译的最简单的可能扩展方法:
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
var tasks = source.Select(async item =>
{
await throttler.WaitAsync();
try
{
await asyncAction(item).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
更新:这是一个简单的修改,也支持取消令牌,就像在评论中要求的(未经测试)
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
var tasks = source.Select(async item =>
{
await throttler.WaitAsync(cancellationToken);
if (cancellationToken.IsCancellationRequested) return;
try
{
await asyncAction(item, cancellationToken).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
一个新的。net 6 api是Parallel。ForEachAsync,一种调度异步工作的方法,允许你控制并行度:
var urls = new []
{
"https://dotnet.microsoft.com",
"https://www.microsoft.com",
"https://stackoverflow.com"
};
var client = new HttpClient();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);
var response = await client.GetAsync(url);
if (response.IsSuccessStatusCode)
{
using var target = File.OpenWrite(targetPath);
await response.Content.CopyToAsync(target);
}
});
另一个例子是Scott Hanselman的博客。
来源,供参考。