在forEach循环中使用async/await有什么问题吗?我正在尝试循环浏览一系列文件,并等待每个文件的内容。

import fs from 'fs-promise'

async function printFiles () {
  const files = await getFilePaths() // Assume this works fine

  files.forEach(async (file) => {
    const contents = await fs.readFile(file, 'utf8')
    console.log(contents)
  })
}

printFiles()

这段代码确实有效,但这段代码会出错吗?我有人告诉我,你不应该在这样的高阶函数中使用async/await,所以我只想问问这是否有问题。


当前回答

@贝吉已经给出了如何正确处理这一特殊案件的答案。我不会在这里重复。

我想解决在异步和等待时使用forEach和for循环之间的区别

forEach的工作原理

让我们看看forEach是如何工作的。根据ECMAScript规范,MDN提供了一种可以用作polyfill的实现。我将其复制并粘贴到此处,并删除注释。

Array.prototype.forEach = function (callback, thisArg) {
  if (this == null) { throw new TypeError('Array.prototype.forEach called on null or undefined'); }
  var T, k;
  var O = Object(this);
  var len = O.length >>> 0;
  if (typeof callback !== "function") { throw new TypeError(callback + ' is not a function'); }
  if (arguments.length > 1) { T = thisArg; }
  k = 0;
  while (k < len) {
    var kValue;
    if (k in O) {
      kValue = O[k];
      callback.call(T, kValue, k, O); // pay attention to this line
    }
    k++;
  }
};

让我们回到代码,将回调作为函数提取。

async function callback(file){
  const contents = await fs.readFile(file, 'utf8')
  console.log(contents)
}

所以,回调基本上返回一个promise,因为它是用异步声明的。在forEach内部,回调只是以正常方式调用,如果回调本身返回一个promise,javascript引擎不会等待它被解析或拒绝。相反,它将承诺放入作业队列中,并继续执行循环。

如何在回调中等待fs.readFile(文件,'utf8')?

基本上,当异步回调有机会被执行时,js引擎将暂停,直到fs.readFile(文件,'utf8')被解析或拒绝,并在完成后继续执行异步函数。因此contents变量存储fs.readFile的实际结果,而不是promise。因此,console.log(contents)注销文件内容,而不是Promise

为什么。。。作品?

当我们编写循环的泛型for时,我们获得了比forEach更多的控制权。让我们重构printFiles。

async function printFiles () {
  const files = await getFilePaths() // Assume this works fine

  for (const file of files) {
    const contents = await fs.readFile(file, 'utf8')
    console.log(contents)
    // or await callback(file)
  }
}

当为循环求值时,我们在异步函数中有await promise,执行将暂停,直到await promice得到解决。因此,您可以认为文件是按确定的顺序逐个读取的。

按顺序执行

有时,我们确实需要以顺序执行异步函数。例如,我有几个新记录存储在一个数组中,要保存到数据库中,我希望它们按顺序保存,这意味着数组中的第一个记录应该先保存,然后再保存,直到保存最后一个记录。

下面是一个示例:

常量记录=[1,2,3,4];异步函数saveRecord(record){return new Promise((已解决,已拒绝)=>{setTimeout(()=>{已解析(`record${record}已保存`)},数学随机(*500)});}EachSaveRecords(记录)的异步函数{records.forEach(异步(记录)=>{const res=等待saveRecord(记录);console.log(res);})}SaveRecords(记录)的异步函数{for(记录的常量记录){const res=等待saveRecord(记录);console.log(res);}}(异步()=>{console.log(“===保存记录的===”)等待保存记录(记录)console.log(“==对于每个保存记录==”)等待EachSaveRecords(记录)})()

我使用setTimeout来模拟将记录保存到数据库的过程——这是异步的,花费了随机时间。使用forEach,记录将按未确定的顺序保存,但使用for。。的,它们按顺序保存。

其他回答

除了@Bergi的回答,我想提供第三种选择。这与@Bergi的第二个例子非常相似,但不是单独等待每个readFile,而是创建一个承诺数组,每个承诺都在最后等待。

import fs from 'fs-promise';
async function printFiles () {
  const files = await getFilePaths();

  const promises = files.map((file) => fs.readFile(file, 'utf8'))

  const contents = await Promise.all(promises)

  contents.forEach(console.log);
}

注意,传递给.map()的函数不需要是异步的,因为fs.readFile无论如何都会返回Promise对象。因此,Promise是Promise对象的数组,可以将其发送到Promise.all()。

在@Bergi的回答中,控制台可以按照文件内容的读取顺序记录文件内容。例如,如果一个非常小的文件在一个非常大的文件之前完成了读取,那么它将首先被记录,即使小文件在文件数组中位于大文件之后。然而,在我上面的方法中,您可以保证控制台将以与提供的阵列相同的顺序记录文件。

OP的原始问题

在forEach循环中使用async/await有什么问题吗。。。

在@Bergi选择的答案中,它展示了如何串行和并行处理。然而,并行性还存在其他问题-

订单--@chharvey注意到-

例如,如果一个非常小的文件在一个非常大的文件之前完成了读取,那么它将首先被记录,即使小文件在文件数组中位于大文件之后。

可能一次打开太多文件--Bergi在另一个答案下的评论

同时打开数千个文件以同时读取它们也是不好的。人们总是要评估顺序、并行或混合方法是否更好。

因此,让我们来解决这些问题,展示实际的代码,简洁明了,不使用第三方库。易于剪切、粘贴和修改的东西。

并行读取(一次读取),串行打印(每个文件尽可能早)。

最简单的改进是像@Bergi的回答那样执行完全并行,但做了一个小改动,以便在保持顺序的同时尽快打印每个文件。

async function printFiles2() {
  const readProms = (await getFilePaths()).map((file) =>
    fs.readFile(file, "utf8")
  );
  await Promise.all([
    await Promise.all(readProms),                      // branch 1
    (async () => {                                     // branch 2
      for (const p of readProms) console.log(await p);
    })(),
  ]);
}

上面,两个单独的分支同时运行。

分支1:同时并行读取,分支2:连续读取以强制排序,但等待时间不超过必要

这很容易。

在并发限制下并行读取,串行打印(每个文件尽可能早)。

“并发限制”意味着同时读取的文件不超过N个。就像一家一次只允许这么多顾客进入的商店(至少在新冠疫情期间)。

首先引入了一个helper函数-

function bootablePromise(kickMe: () => Promise<any>) {
  let resolve: (value: unknown) => void = () => {};
  const promise = new Promise((res) => { resolve = res; });
  const boot = () => { resolve(kickMe()); };
  return { promise, boot };
}

函数bootablePromise(kickMe:()=>Promise<any>)需要函数kickMe作为启动任务的参数(在本例中为readFile),但不会立即启动。

bootablePromise返回几个财产

承诺类型承诺引导类型函数()=>void

承诺有两个阶段

承诺开始一项任务作为一个承诺,完成一项已经开始的任务。

当调用boot()时,promise从第一状态转换到第二状态。

bootablePromise用于printFiles--

async function printFiles4() {
  const files = await getFilePaths();
  const boots: (() => void)[] = [];
  const set: Set<Promise<{ pidx: number }>> = new Set<Promise<any>>();
  const bootableProms = files.map((file,pidx) => {
    const { promise, boot } = bootablePromise(() => fs.readFile(file, "utf8"));
    boots.push(boot);
    set.add(promise.then(() => ({ pidx })));
    return promise;
  });
  const concurLimit = 2;
  await Promise.all([
    (async () => {                                       // branch 1
      let idx = 0;
      boots.slice(0, concurLimit).forEach((b) => { b(); idx++; });
      while (idx<boots.length) {
        const { pidx } = await Promise.race([...set]);
        set.delete([...set][pidx]);
        boots[idx++]();
      }
    })(),
    (async () => {                                       // branch 2
      for (const p of bootableProms) console.log(await p);
    })(),
  ]);
}

和以前一样,有两个分支

分支1:用于运行和处理并发。分支2:用于打印

现在的区别是不允许并发运行超过concurrentLimit Promise。

重要的变量是

boots:要调用以强制其相应Promise转换的函数数组。它仅在分支1中使用。set:在随机访问容器中有Promise,这样一旦实现,就可以很容易地删除它们。此容器仅在分支1中使用。bootableProms:这些是与最初在集合中的Promise相同的Promise,但它是一个数组而不是集合,并且该数组从未更改。它仅在分支2中使用。

使用模拟fs.readFile运行,所需时间如下(文件名与时间(毫秒))。

const timeTable = {
  "1": 600,
  "2": 500,
  "3": 400,
  "4": 300,
  "5": 200,
  "6": 100,
};

可以看到这样的测试运行时间,显示并发正在运行--

[1]0--0.601
[2]0--0.502
[3]0.503--0.904
[4]0.608--0.908
[5]0.905--1.105
[6]0.905--1.005

可在typescript游乐场沙盒中执行

对于TypeScript用户,使用工作类型的Promise.all(array.map(迭代器))包装器

使用Promise.all(array.map(迭代器))具有正确的类型,因为TypeScript的stdlib支持已经处理了泛型。然而,每次需要异步映射时复制粘贴Promise.all(array.map(迭代器))显然不是最佳的,Promise.all(array.ma(迭代))并不能很好地传达代码的意图,因此大多数开发人员都会将其包装成一个asyncMap()包装函数。然而,要做到这一点,需要使用泛型来确保使用const value=await asyncMap()设置的值具有正确的类型。

export const asyncMap = async <ArrayItemType, IteratorReturnType>(
  array: Array<ArrayItemType>,
  iterator: (
    value: ArrayItemType,
    index?: number
  ) => Promise<IteratorReturnType>
): Promise<Array<IteratorReturnType>> => {
  return Promise.all(array.map(iterator));
};

快速测试:

it(`runs 3 items in parallel and returns results`, async () => {
  const result = await asyncMap([1, 2, 3], async (item: number) => {
    await sleep(item * 100);
    return `Finished ${item}`;
  });
  expect(result.length).toEqual(3);
  // Each item takes 100, 200 and 300ms
  // So restricting this test to 300ms plus some leeway
}, 320);

sleep()只是:

const sleep = async (timeInMs: number): Promise<void> => {
  return new Promise((resolve) => setTimeout(resolve, timeInMs));
};

若要查看如何出错,请在方法末尾打印console.log。

一般情况下可能出错的事情:

任意顺序。printFiles可以在打印文件之前完成运行。性能差。

这些并不总是错误的,但通常在标准用例中。

通常,使用forEach将导致除最后一个之外的所有结果。它将在不等待函数的情况下调用每个函数,这意味着它将告诉所有函数开始,然后完成,而不等待函数完成。

import fs from 'fs-promise'

async function printFiles () {
  const files = (await getFilePaths()).map(file => fs.readFile(file, 'utf8'))

  for(const file of files)
    console.log(await file)
}

printFiles()

这是本机JS中的一个示例,它将保持顺序,防止函数过早返回,并在理论上保持最佳性能。

这将:

启动所有并行文件读取。通过使用映射将文件名映射到要等待的承诺来保持顺序。按照数组定义的顺序等待每个承诺。

使用此解决方案,第一个文件将在其可用时立即显示,而无需等待其他文件首先可用。

它还将同时加载所有文件,而不必等待第一个文件完成后才能开始第二次文件读取。

这和原始版本的唯一缺点是,如果一次启动多个读取,则由于一次可能发生更多错误,因此处理错误更困难。

对于一次读取一个文件的版本,则会在出现故障时停止,而不会浪费时间尝试读取更多文件。即使有一个精心设计的取消系统,也很难避免它在第一个文件上失败,但也很难读取大部分其他文件。

性能并不总是可预测的。虽然许多系统的并行文件读取速度会更快,但有些系统更倾向于顺序读取。有些是动态的,可能会在负载下发生变化,提供延迟的优化在激烈竞争下并不总能产生良好的吞吐量。

该示例中也没有错误处理。如果有什么东西要求他们要么全部成功展示,要么根本不展示,那它就做不到。

建议在每个阶段使用console.log进行深入实验,并使用假文件读取解决方案(随机延迟)。尽管许多解决方案在简单的情况下似乎都是一样的,但它们都有细微的差异,需要额外的仔细检查才能挤出。

使用此模拟来帮助区分解决方案之间的差异:

(async () => {
  const start = +new Date();
  const mock = () => {
    return {
      fs: {readFile: file => new Promise((resolve, reject) => {
        // Instead of this just make three files and try each timing arrangement.
        // IE, all same, [100, 200, 300], [300, 200, 100], [100, 300, 200], etc.
        const time = Math.round(100 + Math.random() * 4900);
        console.log(`Read of ${file} started at ${new Date() - start} and will take ${time}ms.`)
        setTimeout(() => {
          // Bonus material here if random reject instead.
          console.log(`Read of ${file} finished, resolving promise at ${new Date() - start}.`);
          resolve(file);
        }, time);
      })},
      console: {log: file => console.log(`Console Log of ${file} finished at ${new Date() - start}.`)},
      getFilePaths: () => ['A', 'B', 'C', 'D', 'E']
    };
  };

  const printFiles = (({fs, console, getFilePaths}) => {
    return async function() {
      const files = (await getFilePaths()).map(file => fs.readFile(file, 'utf8'));

      for(const file of files)
        console.log(await file);
    };
  })(mock());

  console.log(`Running at ${new Date() - start}`);
  await printFiles();
  console.log(`Finished running at ${new Date() - start}`);
})();

替换forEach()等待循环的一个简单的解决方案是用map替换forEach,并在开头添加Promise.all()。

例如:

await y.forEach(异步(x)=>{

to

await Promise.all(y.map(异步(x)=>{

结尾处需要一个额外的)。