我试图读取一个大文件一行在一次。我在Quora上找到了一个关于这个主题的问题,但我错过了一些联系,把整个事情联系在一起。

 var Lazy=require("lazy");
 new Lazy(process.stdin)
     .lines
     .forEach(
          function(line) { 
              console.log(line.toString()); 
          }
 );
 process.stdin.resume();

我想要弄清楚的是如何一次从文件中读取一行,而不是像本例中那样读取STDIN。

我试着:

 fs.open('./VeryBigFile.csv', 'r', '0666', Process);

 function Process(err, fd) {
    if (err) throw err;
    // DO lazy read 
 }

但这并不奏效。我知道在必要时我可以使用PHP之类的东西,但我想弄清楚这个问题。

我不认为其他答案会起作用,因为文件比我运行它的服务器的内存大得多。


当前回答

带载波模块:

var carrier = require('carrier');

process.stdin.resume();
carrier.carry(process.stdin, function(line) {
    console.log('got one line: ' + line);
});

其他回答

虽然您可能应该像上面的答案所建议的那样使用readline模块,但readline似乎面向命令行接口,而不是行读取。关于缓冲,它也有点不透明。(任何需要流式行阅读器的人可能都想调整缓冲区大小)。readline模块大约有1000行,而这个,加上统计和测试,是34行。

const EventEmitter = require('events').EventEmitter;
class LineReader extends EventEmitter{
    constructor(f, delim='\n'){
        super();
        this.totalChars = 0;
        this.totalLines = 0;
        this.leftover = '';

        f.on('data', (chunk)=>{
            this.totalChars += chunk.length;
            let lines = chunk.split(delim);
            if (lines.length === 1){
                this.leftover += chunk;
                return;
            }
            lines[0] = this.leftover + lines[0];
            this.leftover = lines[lines.length-1];
            if (this.leftover) lines.pop();
            this.totalLines += lines.length;
            for (let l of lines) this.onLine(l);
        });
        // f.on('error', ()=>{});
        f.on('end', ()=>{console.log('chars', this.totalChars, 'lines', this.totalLines)});
    }
    onLine(l){
        this.emit('line', l);
    }
}
//Command line test
const f = require('fs').createReadStream(process.argv[2], 'utf8');
const delim = process.argv[3];
const lineReader = new LineReader(f, delim);
lineReader.on('line', (line)=> console.log(line));

下面是一个更短的版本,没有统计数据,只有19行:

class LineReader extends require('events').EventEmitter{
    constructor(f, delim='\n'){
        super();
        this.leftover = '';
        f.on('data', (chunk)=>{
            let lines = chunk.split(delim);
            if (lines.length === 1){
                this.leftover += chunk;
                return;
            }
            lines[0] = this.leftover + lines[0];
            this.leftover = lines[lines.length-1];
            if (this.leftover) 
                lines.pop();
            for (let l of lines)
                this.emit('line', l);
        });
    }
}

当我试图处理这些行并将它们写入另一个流时,我最终使用Lazy逐行读取大量内存泄漏,这是由于节点工作中的drain/pause/resume方式(参见:http://elegantcode.com/2011/04/06/taking-baby-steps-with-node-js-pumping-data-between-streams/(我喜欢这个家伙顺便说一句))。我还没有仔细研究Lazy,无法确切地理解其中的原因,但是我无法暂停读流以允许在Lazy退出的情况下进行排泄。

我写了代码来处理大量的csv文件到xml文档,你可以在这里看到代码:https://github.com/j03m/node-csv2xml

如果你用Lazy line运行之前的版本,它就会泄露。最新的版本完全没有泄露,你可以把它作为一个阅读器/处理器的基础。虽然我有一些定制的东西在里面。

编辑:我想我还应该指出,我用Lazy编写的代码工作得很好,直到我发现自己编写了足够大的xml片段,因为必要而耗尽/暂停/恢复。对于较小的块,这是可以的。

带载波模块:

var carrier = require('carrier');

process.stdin.resume();
carrier.carry(process.stdin, function(line) {
    console.log('got one line: ' + line);
});

我想解决同样的问题,基本上在Perl中是这样的:

while (<>) {
    process_line($_);
}

我的用例只是一个独立的脚本,而不是服务器,所以同步就可以了。以下是我的标准:

可以在许多项目中重用的最小同步代码。 不限制文件大小或行数。 不限制线的长度。 能够处理UTF-8的完整Unicode,包括BMP以外的字符。 能够处理*nix和Windows行结束符(老式Mac对我来说不需要)。 行中要包含的行结束字符。 能够处理带有或不带有行尾字符的最后一行。 不要使用node.js发行版中不包含的任何外部库。

这是一个让我在node.js中感受低级脚本类型代码的项目,并决定它作为其他脚本语言(如Perl)的替代品的可行性。

经过惊人的努力和一些错误的开始,这是我提出的代码。它非常快,但没有我预期的那么琐碎:(在GitHub上分叉)

var fs            = require('fs'),
    StringDecoder = require('string_decoder').StringDecoder,
    util          = require('util');

function lineByLine(fd) {
  var blob = '';
  var blobStart = 0;
  var blobEnd = 0;

  var decoder = new StringDecoder('utf8');

  var CHUNK_SIZE = 16384;
  var chunk = new Buffer(CHUNK_SIZE);

  var eolPos = -1;
  var lastChunk = false;

  var moreLines = true;
  var readMore = true;

  // each line
  while (moreLines) {

    readMore = true;
    // append more chunks from the file onto the end of our blob of text until we have an EOL or EOF
    while (readMore) {

      // do we have a whole line? (with LF)
      eolPos = blob.indexOf('\n', blobStart);

      if (eolPos !== -1) {
        blobEnd = eolPos;
        readMore = false;

      // do we have the last line? (no LF)
      } else if (lastChunk) {
        blobEnd = blob.length;
        readMore = false;

      // otherwise read more
      } else {
        var bytesRead = fs.readSync(fd, chunk, 0, CHUNK_SIZE, null);

        lastChunk = bytesRead !== CHUNK_SIZE;

        blob += decoder.write(chunk.slice(0, bytesRead));
      }
    }

    if (blobStart < blob.length) {
      processLine(blob.substring(blobStart, blobEnd + 1));

      blobStart = blobEnd + 1;

      if (blobStart >= CHUNK_SIZE) {
        // blobStart is in characters, CHUNK_SIZE is in octets
        var freeable = blobStart / CHUNK_SIZE;

        // keep blob from growing indefinitely, not as deterministic as I'd like
        blob = blob.substring(CHUNK_SIZE);
        blobStart -= CHUNK_SIZE;
        blobEnd -= CHUNK_SIZE;
      }
    } else {
      moreLines = false;
    }
  }
}

它可能会被进一步清理,这是试验和错误的结果。

如果你想逐行读取一个文件,并将其写入另一个文件:

var fs = require('fs');
var readline = require('readline');
var Stream = require('stream');

function readFileLineByLine(inputFile, outputFile) {

   var instream = fs.createReadStream(inputFile);
   var outstream = new Stream();
   outstream.readable = true;
   outstream.writable = true;

   var rl = readline.createInterface({
      input: instream,
      output: outstream,
      terminal: false
   });

   rl.on('line', function (line) {
        fs.appendFileSync(outputFile, line + '\n');
   });
};