如何解决Node 中的简单缓冲双工流 - 如何?
我正在尝试实现具有缓冲功能的双工流。
它应该积累数据块,直到有足够的数据,然后再进一步发送。
例如,在播放流式音频/视频数据时可以使用它:不会简单地及时获取帧,对吗?
下面是我创建这样一个缓冲双工流的愚蠢尝试。有一个源流,它将 x\n
个字符发送到缓冲流,而缓冲流又应该将数据进一步发送到 process.stdout
。
唉,它不起作用。具体来说,read()
函数似乎没有任何暂停或停止的方法,例如:
“嘿,我现在没有你的数据,稍后回来”。
不,一旦我返回 undefined
或 null
,故事就结束了,标准输出没有任何内容。
var {Readable,Duplex} = require('stream');
// Source stream,seeds: x\n,x\n,...
let c = 10;
var rs = new Readable({
read () {
if (c > 0) {
c--;
console.log('rs reading:','x');
this.push('x\n');
}
else {
this.push(null)
}
},});
// Buffering duplex stream
// I want it to cache 3 items and only then to proceed
const queue = [];
const limit = 3;
var ds = new Duplex({
writableHighWaterMark: 0,write (chunk,encoding,callback) {
console.log('ds writing:',chunk,'paused: ',ds.isPaused());
queue.push(chunk);
callback();
},readableHighWaterMark: 0,read () {
// We don't want to output anything
// until there's enough elements in the `queue`.
if (queue.length >= limit) {
const chunk = queue.shift();
console.log('ds reading:',chunk);
this.push(chunk);
}
else {
// So how to wait here?
this.push(undefined)
}
},});
// PROBLEM: nothing is coming out of the "ds" and printed on the stdout
rs.pipe(ds).pipe(process.stdout);
这是我的回复:https://repl.it/@OnkelTem/BufferingStream1-1#index.js
我检查了双工的状态,它甚至没有处于暂停状态。所以它没有暂停,它在流动,但 - 什么都不返回。
我还花了几个小时重新阅读有关 Node 流的文档,但实际上并不觉得它是为理解而创建的。
解决方法
缓冲流只是一种转换流。如果我理解您要正确执行的操作,那么实现应该不会比这更复杂:
const { Transform } = require('stream');
const DEFAULT_CAPACITY = 10;
class BufferingTransform extends Transform {
constructor(options = {}) {
super(options);
this.capacity = options.capacity || DEFAULT_CAPACITY ;
this.pending = [] ;
return;
}
get atCapacity() {
return this.pending.length >= this.capacity;
}
_transform(chunk,encoding,cb) {
if ( this.atCapacity ) {
this.push( ...this.pending.shift() );
}
this.pending.push( [chunk,encoding] );
cb();
}
_flush(cb) {
while (this.pending.length > 0) {
this.push( ...this.pending.shift() );
}
cb();
}
}
一旦你有了它,它应该只是通过 BufferingStream and reading from the
BufferingStream` 管道你的源的问题:
async function readFromSource() {
const source = openSourceForReading();
const buffer = new BufferingStream();
source.pipe(buffer);
for await (const chunk of buffer) {
console.log(chunk);
}
}
,
这是一个使用异步迭代的实现:
function bufferStream(stream,bufferCount){
stream = normalizeAsyncIterable(stream);
const iterator = stream[Symbol.asyncIterator]();
const queue = []
while(queue.length < bufferCount)
queue.push(iterator.next());
return normalizeAsyncIterable({
[Symbol.asyncIterator]: () => ({
next: async () => {
const promise = queue.shift() ?? iterator.next();
while(queue.length < bufferCount)
queue.push(iterator.next());
return promise;
}
})
});
}
// Ensures that calls to .next() while the generator is paused are handled correctly
async function* normalizeAsyncIterable(iterable){
for await(const value of iterable)
yield value;
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。