我需要修改一个现有的程序,它包含以下代码:
var inputs = events.Select(async ev => await ProcessEventAsync(ev))
.Select(t => t.Result)
.Where(i => i != null)
.ToList();
但这对我来说很奇怪,首先使用async和await select。根据Stephen Cleary的回答,我应该可以放弃这些。
然后是第二个Select,它选择结果。这是否意味着任务根本不是异步的,而是同步执行的(如此多的努力是徒劳的),或者任务将异步执行,当它完成时,执行查询的其余部分?
根据Stephen Cleary的另一个回答,我应该像下面这样写上面的代码吗?
var tasks = await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev)));
var inputs = tasks.Where(result => result != null).ToList();
这是完全一样的吗?
var inputs = (await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev))))
.Where(result => result != null).ToList();
当我在这个项目上工作时,我想改变第一个代码示例,但我不太热衷于改变(显然工作)异步代码。也许我只是担心什么,所有3个代码样本做完全相同的事情?
processevensasync看起来像这样:
async Task<InputResult> ProcessEventAsync(InputEvent ev) {...}
var inputs = events.Select(async ev => await ProcessEventAsync(ev))
.Select(t => t.Result)
.Where(i => i != null)
.ToList();
但这对我来说很奇怪,首先在选择中使用async和await。根据Stephen Cleary的回答,我应该可以放弃这些。
调用Select是有效的。这两行基本上是相同的:
events.Select(async ev => await ProcessEventAsync(ev))
events.Select(ev => ProcessEventAsync(ev))
(关于如何从proces76async抛出同步异常有一个微小的区别,但在这段代码的上下文中,这一点都不重要。)
然后是第二个Select,它选择结果。这是否意味着任务根本不是异步的,而是同步执行的(如此多的努力是徒劳的),或者任务将异步执行,当它完成时,执行查询的其余部分?
这意味着查询正在阻塞。所以这并不是真正的异步。
分解一下:
var inputs = events.Select(async ev => await ProcessEventAsync(ev))
将首先为每个事件启动异步操作。然后这一行:
.Select(t => t.Result)
将等待这些操作一次完成一个(首先等待第一个事件的操作,然后是下一个,然后是下一个,等等)。
这是我不关心的部分,因为它会阻塞并将任何异常包装在AggregateException中。
这是完全一样的吗?
var tasks = await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev)));
var inputs = tasks.Where(result => result != null).ToList();
var inputs = (await Task.WhenAll(events.Select(ev => ProcessEventAsync(ev))))
.Where(result => result != null).ToList();
是的,这两个例子是等价的。它们都启动所有异步操作(events.Select(…)),然后异步地等待所有操作以任意顺序完成(await Task.WhenAll(…)),然后继续其余的工作(Where…)。
这两个示例都与原始代码不同。原始代码是阻塞的,将在AggregateException中包装异常。
我使用了下面的代码:
public static async Task<IEnumerable<TResult>> SelectAsync<TSource,TResult>(
this IEnumerable<TSource> source, Func<TSource, Task<TResult>> method)
{
return await Task.WhenAll(source.Select(async s => await method(s)));
}
是这样的:
var result = await sourceEnumerable.SelectAsync(async s=>await someFunction(s,other params));
编辑:
有些人提出了并发性的问题,比如当你访问一个数据库时,你不能同时运行两个任务。所以这里有一个更复杂的版本,也允许特定的并发级别:
public static async Task<IEnumerable<TResult>> SelectAsync<TSource, TResult>(
this IEnumerable<TSource> source, Func<TSource, Task<TResult>> method,
int concurrency = int.MaxValue)
{
var semaphore = new SemaphoreSlim(concurrency);
try
{
return await Task.WhenAll(source.Select(async s =>
{
try
{
await semaphore.WaitAsync();
return await method(s);
}
finally
{
semaphore.Release();
}
}));
} finally
{
semaphore.Dispose();
}
}
如果没有参数,它的行为与上面的简单版本完全相同。当参数为1时,它将依次执行所有任务:
var result = await sourceEnumerable.SelectAsync(async s=>await someFunction(s,other params),1);
注意:按顺序执行任务并不意味着执行将在错误时停止!
就像使用更大的并发值或不指定参数一样,所有任务都将执行,如果其中任何任务失败,生成的AggregateException将包含抛出的异常。
如果您希望一个接一个地执行任务,并且在第一个任务中失败,请尝试其他解决方案,例如xhafan (https://stackoverflow.com/a/64363463/379279)建议的解决方案。