我希望用 Node.js 处理一个文本文件,并通过如下命令行调用:
node app.js < input.txt
文件的每一行都需要单独处理,但某一行处理完之后就可以丢弃,不必保留。
使用 stdin 的 'data' 事件监听器时,输入流是按字节大小分块到达的(而不是按行),所以我写了下面这段代码。
// Reads stdin as UTF-8 text and hands every complete line to processLine,
// one argument per call. Lines are separated by "\n" only.
process.stdin.resume();
process.stdin.setEncoding('utf8');

// Text received after the most recent "\n"; the next chunk may complete it.
var lingeringLine = "";

process.stdin.on('data', function(chunk) {
  // Declared locally: assigning to an undeclared `lines` creates an implicit
  // global and throws a ReferenceError in strict mode / ES modules.
  const lines = chunk.split("\n");

  // The first piece continues whatever was left over from the previous chunk.
  lines[0] = lingeringLine + lines[0];

  // The last piece has no terminating "\n" yet, so hold it back.
  lingeringLine = lines.pop();

  // Call processLine with the line only, exactly like the 'end' handler does
  // (forEach would also pass the index and the array).
  for (const line of lines) {
    processLine(line);
  }
});

process.stdin.on('end', function() {
  // Nothing is left over when the input was empty or ended with "\n";
  // skip the call so no phantom empty line is reported.
  if (lingeringLine !== "") {
    processLine(lingeringLine);
  }
});
但这看起来太粗糙了:我不得不对行数组的第一项和最后一项做特殊处理。难道没有更优雅的方式吗?
下面是我的版本:逐行读取流,应该也适用于通过管道传入 stdin 的大文件:
// Number of lines handled so far; printed in front of every line.
var n = 0;

/**
 * Handles a single line: logs the running counter, the label "line " and the
 * line itself, then signals completion through the callback.
 *
 * @param {string} line - The line to handle.
 * @param {Function} cb - Called with no arguments once the line is handled.
 * @returns {*} Whatever `cb` returns.
 */
function on_line(line, cb) {
  // ---- per-line work starts here ----
  const lineNumber = n;
  n += 1;
  console.log(lineNumber, "line ", line);
  // ---- per-line work ends here ----
  return cb();
}
var fs = require('fs');
var readStream = fs.createReadStream('all_titles.txt');
//var readStream = process.stdin;

// Keep the stream paused until the listeners below are attached; it is
// started by the resume() call at the bottom.
readStream.pause();
// Deliver chunks as UTF-8 strings instead of Buffers.
readStream.setEncoding('utf8');

// Pieces of the current, not yet terminated line, carried over between chunks.
var buffer = [];

/**
 * Passes every entry of `lines` to on_line in order, waiting for on_line's
 * callback before moving to the next entry, then calls `done`.
 *
 * Written as a loop rather than as callback recursion: when on_line invokes
 * its callback synchronously, recursion would add stack frames for every line
 * of the chunk and could exhaust the call stack on chunks with many short
 * lines. Callbacks that arrive later (asynchronously) restart the loop.
 *
 * @param {string[]} lines - Complete lines to hand over.
 * @param {Function} done - Called with no arguments after the last line.
 */
function feed_lines(lines, done) {
  let index = 0;
  let looping = false;        // true while the do/while below is on the stack
  let completedSync = false;  // set when on_line called back during the loop

  function next() {
    if (looping) {
      // Synchronous callback: let the running loop pick up the next line.
      completedSync = true;
      return;
    }
    looping = true;
    do {
      completedSync = false;
      if (index >= lines.length) {
        looping = false;
        return done();
      }
      on_line(lines[index++], next);
    } while (completedSync);
    looping = false;
  }

  next();
}

readStream.on('data', (chunk) => {
  // Any run of CR/LF characters counts as one separator, so blank lines are
  // not reported.
  const newlines = /[\r\n]+/;
  const lines = chunk.split(newlines);

  // The last piece is not terminated yet; it always goes back to the buffer.
  const tail = lines.pop();
  if (lines.length === 0) {
    // No separator in this chunk: keep accumulating.
    buffer.push(tail);
    return;
  }

  // The first piece completes whatever was carried over from earlier chunks.
  lines[0] = buffer.join('') + lines[0];
  buffer.length = 0;
  buffer.push(tail);

  // An empty first line only appears when a separator run straddles a chunk
  // boundary (or opens the input). Drop it so the reported lines do not depend
  // on where chunk boundaries happen to fall.
  if (lines[0] === '') {
    lines.shift();
  }
  if (lines.length === 0) {
    return;
  }

  // Block the input until every line of this chunk has been consumed.
  readStream.pause();
  feed_lines(lines, () => readStream.resume());
}).on('end', () => {
  const rest = buffer.join('');
  buffer.length = 0;

  const finish = () => {
    ////after end
    console.error('done')
    ////end after end
  };

  // Only report a final line when there is leftover text. The previous code
  // passed undefined to on_line when no data had been read, and an empty
  // string when the input ended with a newline.
  if (rest === '') {
    return finish();
  }
  on_line(rest, finish);
});
readStream.resume();
解释:
To split the input on UTF-8 character boundaries rather than in the middle of a multibyte sequence, set the encoding to utf8; this ensures that every chunk contains only complete multibyte characters.
When data is received, the input is paused. This blocks the input until all lines of the chunk have been consumed, and it prevents the buffer from overflowing when the line-processing function is slower than the input.
If a chunk contains no newline at all, it is accumulated in the buffer and the handler simply returns; this may happen for several chunks in a row. Once a chunk contains more than one line, its first piece is appended to the accumulated buffer and the joined result is processed as one line.
After all the split lines have been consumed, the last (possibly incomplete) line is pushed into the buffer and the paused stream is resumed.
使用 async/await 的版本(ES2017):
// Number of lines handled so far; printed in front of every line.
var n = 0;

/**
 * Handles a single line by logging the running counter, the label "line "
 * and the line itself.
 *
 * @param {string} line - The line to handle.
 * @returns {Promise<undefined>} Settles once the line has been handled.
 */
async function on_line(line) {
  // ---- per-line work starts here ----
  const lineNumber = n;
  n += 1;
  console.log(lineNumber, "line ", line);
  // ---- per-line work ends here ----
}
var fs = require('fs');
var readStream = fs.createReadStream('all_titles.txt');
//var readStream = process.stdin;

// Keep the stream paused until the listeners below are attached; it is
// started by the resume() call at the bottom.
readStream.pause();
// Deliver chunks as UTF-8 strings instead of Buffers.
readStream.setEncoding('utf8');

// Pieces of the current, not yet terminated line, carried over between chunks.
var buffer = [];

readStream.on('data', async (chunk) => {
  // Any run of CR/LF characters counts as one separator, so blank lines are
  // not reported.
  const newlines = /[\r\n]+/;
  const lines = chunk.split(newlines);

  // The last piece is not terminated yet; it always goes back to the buffer.
  const tail = lines.pop();
  if (lines.length === 0) {
    // No separator in this chunk: keep accumulating.
    buffer.push(tail);
    return;
  }

  // The first piece completes whatever was carried over from earlier chunks.
  lines[0] = buffer.join('') + lines[0];
  buffer.length = 0; // clear array, because consumed
  buffer.push(tail);

  // Block the input until every line of this chunk has been awaited.
  readStream.pause();
  try {
    for (const line of lines) {
      // Only the first entry can be empty, and only when a separator run
      // straddles a chunk boundary (or opens the input). Skipping it keeps
      // the reported lines independent of where chunks are cut.
      if (line !== '') {
        await on_line(line);
      }
    }
  } catch (err) {
    // Without this the listener's rejection is unhandled and the stream stays
    // paused; destroy(err) reports the failure as the stream's 'error' event.
    readStream.destroy(err);
    return;
  }
  readStream.resume();
}).on('end', async () => {
  const rest = buffer.join('');
  buffer.length = 0;

  // Only report a final line when there is leftover text. The previous code
  // passed undefined to on_line when no data had been read, and an empty
  // string when the input ended with a newline.
  if (rest !== '') {
    await on_line(rest);
  }
});
readStream.resume();
(我没有测试过上面这段代码。)
如果你想先询问用户要输入多少行:
// Collects every line read through getInput, in arrival order.
let xInputs = [];

/**
 * Reads one line from stdin through a fresh readline interface, appends it
 * to xInputs and reports it through `resolve`.
 *
 * @param {Function} resolve - Called with the line that was read.
 * @returns {Promise<undefined>} Settles as soon as the listener is installed,
 *     not when the line arrives; use `resolve` to wait for the line.
 */
const getInput = async (resolve) => {
  const rl = require('readline').createInterface({
    input: process.stdin,
    output: process.stdout,
  });
  // once(): readline can emit 'line' again for further lines that arrived in
  // the same chunk even after close(); with on() each of those would be pushed
  // to xInputs as well. Exactly one line is recorded per call.
  rl.once('line', (line) => {
    rl.close();
    xInputs.push(line);
    resolve(line);
  });
};
/**
 * Reads `numberOfInputLines` lines one after another through getInput and
 * then invokes `callback` with no arguments.
 *
 * Each read starts only after the previous one has reported its line. The
 * count is compared with `<`, so a numeric string is accepted as well; a
 * value that is not greater than zero (or not numeric) results in no reads.
 *
 * @param {number|string} numberOfInputLines - How many lines to read.
 * @param {Function} callback - Called after the last line has been read.
 */
const getMultiInput = (numberOfInputLines, callback) => {
  // Wraps a single getInput call in a promise that settles with its line.
  const readOneLine = () => new Promise((resolve) => getInput(resolve));

  let chain = Promise.resolve();
  let scheduled = 0;
  while (scheduled < numberOfInputLines) {
    chain = chain.then(readOneLine);
    scheduled += 1;
  }

  chain.then(() => {
    callback();
  });
};
// Read the first line of stdin as the number of lines that follow, then
// collect that many lines through getMultiInput.
const readline = require('readline').createInterface({
  input: process.stdin,
  output: process.stdout,
  terminal: false
});
// once(): only the first line is the count. With on(), further lines that
// arrived in the same chunk could each trigger another getMultiInput call.
readline.once('line', (line) => {
  // Release stdin before getMultiInput opens its own readline interfaces.
  readline.close();
  getMultiInput(line, () => {
    // All requested lines are available in the xInputs array at this point.
  });
});