我正在入侵一个Node程序,该程序使用SMTP协议捕获SMTP电子邮件并对邮件数据采取行动。库将邮件数据作为流提供,但我不知道如何将其转换为字符串。
我目前正在用stream.pipe(process. pipe)将其写入标准输出。stdout, {end: false}),但正如我所说的,我需要一个字符串中的流数据,一旦流结束,我就可以使用它。
我如何收集所有的数据从一个Node.js流到一个字符串?
我正在入侵一个Node程序,该程序使用SMTP协议捕获SMTP电子邮件并对邮件数据采取行动。库将邮件数据作为流提供,但我不知道如何将其转换为字符串。
我目前正在用stream.pipe(process. pipe)将其写入标准输出。stdout, {end: false}),但正如我所说的,我需要一个字符串中的流数据,一旦流结束,我就可以使用它。
我如何收集所有的数据从一个Node.js流到一个字符串?
当前回答
我通常使用这个简单的函数将流转换为字符串:
function streamToString(stream, cb) {
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk.toString());
});
stream.on('end', () => {
cb(chunks.join(''));
});
}
使用的例子:
let stream = fs.createReadStream('./myFile.foo');
streamToString(stream, (data) => {
console.log(data); // data is now my string variable
});
其他回答
像减流器这样的东西怎么样?
下面是一个使用ES6类的例子如何使用一个。
var stream = require('stream')
class StreamReducer extends stream.Writable {
constructor(chunkReducer, initialvalue, cb) {
super();
this.reducer = chunkReducer;
this.accumulator = initialvalue;
this.cb = cb;
}
_write(chunk, enc, next) {
this.accumulator = this.reducer(this.accumulator, chunk);
next();
}
end() {
this.cb(null, this.accumulator)
}
}
// just a test stream
class EmitterStream extends stream.Readable {
constructor(chunks) {
super();
this.chunks = chunks;
}
_read() {
this.chunks.forEach(function (chunk) {
this.push(chunk);
}.bind(this));
this.push(null);
}
}
// just transform the strings into buffer as we would get from fs stream or http request stream
(new EmitterStream(
["hello ", "world !"]
.map(function(str) {
return Buffer.from(str, 'utf8');
})
)).pipe(new StreamReducer(
function (acc, v) {
acc.push(v);
return acc;
},
[],
function(err, chunks) {
console.log(Buffer.concat(chunks).toString('utf8'));
})
);
这对我来说是有效的,并且基于Node v6.7.0文档:
let output = '';
stream.on('readable', function() {
let read = stream.read();
if (read !== null) {
// New stream data is available
output += read.toString();
} else {
// Stream is now finished when read is null.
// You can callback here e.g.:
callback(null, output);
}
});
stream.on('error', function(err) {
callback(err, null);
})
我通常使用这个简单的函数将流转换为字符串:
function streamToString(stream, cb) {
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk.toString());
});
stream.on('end', () => {
cb(chunks.join(''));
});
}
使用的例子:
let stream = fs.createReadStream('./myFile.foo');
streamToString(stream, (data) => {
console.log(data); // data is now my string variable
});
另一种方法是将流转换为promise(参考下面的示例),然后使用then(或await)将解析值分配给变量。
function streamToString (stream) {
const chunks = [];
return new Promise((resolve, reject) => {
stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
stream.on('error', (err) => reject(err));
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
})
}
const result = await streamToString(stream)
所有列出的答案似乎都以流动模式打开可读流,这不是NodeJS的默认模式,并且可能有局限性,因为它缺乏NodeJS在暂停可读流模式中提供的反压力支持。 这里是一个使用Just Buffers、本机流和本机流转换并支持对象模式的实现
import {Transform} from 'stream';
let buffer =null;
function objectifyStream() {
return new Transform({
objectMode: true,
transform: function(chunk, encoding, next) {
if (!buffer) {
buffer = Buffer.from([...chunk]);
} else {
buffer = Buffer.from([...buffer, ...chunk]);
}
next(null, buffer);
}
});
}
process.stdin.pipe(objectifyStream()).process.stdout