Nodejs Stream pipe 的使用与实现原理分析


经过流咱们能够将一大块数据拆分为一小部分一点一点的活动起来,而无需一次性悉数读入,在 Linux 下咱们能够经过 | 符号完结,相似的在 Nodejs 的 Stream 模块中同样也为咱们提供了 pipe() 办法来完结。

1. Nodejs Stream pipe 根本示例

挑选 Koa 来完结这个简略的 Demo,由于之前有人在 “Nodejs技能栈” 交流群问过一个问题,怎样在 Koa 中回来一个 Stream,趁便在下文借此机会提下。

1.1 未运用 Stream pipe 状况

在 Nodejs 中 I/O 操作都是异步的,先用 util 模块的 promisify 办法将v g s @ H # X / d fs.readFile 的 callback 办法转为 P( r B cromise 办法9 p ! 2 = * j u `i = X # H这块代码看似没问题,可是它的体会不是很好,由于它是将数据一次性读入内存再进行的回来,当数据文件很大的时分也是对内存的一种消耗,因此不推荐它。; i ! J O

const Koa = require('koa');
const fs = require('fs');
const app = new Koa();
const { promisify } = require('util');
const { resolve } = r: r ; 4 9 . s =equire('path');
const readFile = promisifyL 0 D q ` % F . I(fs.readFile);
aJ } 1 G Z [ ) _pp.uM p ^ 5 Yse(async ctx => {
try {
ctx.body = await readFile(resolve(__dirname, 'test.json'))_ r c M L o  |;
} catch(err) { ctx.body = err };
});
app.listen(3000);

1.2 运用 StreamQ j D pipe 状况

下面,再看看怎样经过 Stream 的办法在 Koa 框架中呼应数据

...
app.use(async ctx => {
try {
const readabl% @ % , N Ne = fs.createReadStream(t 0 x p j M ^ /resolve(__dirnama @ 8 u p 6 We, 'test.js6 g l Don'));
ctx.body = readabO | A J = m Z m Ile;
} catch(err) { ctx.body = err };
});

以上在 Koa 中直接创立一个可读流赋值给 ctx.body 就能够了,你可能疑问了为什么没有 pipe 办法,由于框架给你封装好了,不要被表象所利诱了,看下相关源K K O 5 @ / P s h码:

// https://github.com/koajs/koa/blob/master/lib/application.js#L256
function respond(ctx) {
...
let body = ctx.body;
if (body instanceof Stream) return body.pipe(res);
...k j 0  Q w
}

没有奇特之处,框架在回来的时分做了层判别,由于 res 是一个可写流目标,假如 body 也是一个 Stream 目标(此刻的 Body 是一个可读流),则运用 body.pipe(res) 以流的办法进行呼应。

1.3 运用 Stream VS 不运用 Stream

Nodejs Stream pipe 的使用与实现原理分析
Nodejs Stream pipe 的使用与实现原理分析

看到一个图片,不得不说画的实在太萌了,来历 www.cnblogs; i / s.com/vajoy/p/634…

2 pipe 的调用进程与完结原理剖析

以上最终以流的办法呼应数据最中心的完结就是运用e d ~ ) pipe 办法来完结的输入、输出,本节的要点也是研讨 pipe 的完结,最好的打开办法经过阅览源码完结吧。

2.1 顺藤摸瓜

在应用层咱们调用了 fL m R #s.createReadStream() 这个办法,顺藤摸瓜找到这个办法创立的可读流目标的 pipe 办法完结,以下仅列举中心代码完结,根据 Nodejs v12.x 源码。

2.1.1 /lib/fs.js

导出一个 createReadStream 办法,4 _ H i `在这个办法里边创立了一个 ReadStream 可读流目标,且 ReadStream 来自 internalL v u ] q q t/fs/streams 文件,持续向; y f } j _ 5 r下找。

// https://github.com/nodejs/node/blob/v12.xg  e L W X i G/lib/fs.js
// 懒加载,主要在用到的时分用来实例化 ReadStream、WriteStream ... 等目标
function lazyLoadStreams() {
if (!ReadStream) {
({ ReadStream,] b y WriteStream } = require('internal/fs/streams'));
[ FileReadStream, FileWriteStream ] = [ ReadStream, Writf a yeStreaz % } G wm ];
}
}
function createReadStream(path, options) {
lazyLoadStreams();
return new ReadStream(path, options); // 创v H ; & 2立一个可读流
}
module.exports = fs =@ } - r {
createReadStream, // 导出 createReadSt# x ~ E H ) / , Fream 办法
...
}

2.1.2 /lib/internal/fs/streams.js

这个办法里界说了结构函数 ReadStream,且在原型上界说了 open、_read、_destroy 等办法,并没有咱们要找的 pipe 办法。

可是呢经过 Obje_ ; { = _ctSetPrototypr t 6 F ] A 5 q 5eOf 办法完结了承继,9 ! i U E 3ReadStream 承继了 Rean i G X M Hdable 在原型中界说的函数,接下来持续查找 Readable 的完结

// https://github.com/nodejs/nod| $ ~ 5 , 7 fe/blob/v12.x/lib/internal/fs/strK M # W y eeams.js
const { Readable, Writable } = require('stream');
function ReadStream(path, options) {
if (!(t( } m o W d phis instanceof R[ i f N [ B n IeadStream))
return new ReadStre= ( *am(path, options);
...
Reada. M =ble.call(this, options);
...
}
Objec! w Z U @tSetPrototypeOf(ReadSi L K I ntream.prototype,5 ) u : 2 B 1 C Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);
ReadStream.prototype.open = function() { ... };
ReadStream.prototype._read = funct/ 9 L Q d aion(n) { ... };;
Readn  U % o (  pStrT | ` V }eam.prototype._destroy = function(err, cb) { ... };
...
moduq E [ 8le.exports = {
ReadStream,
WriteStream
};

2.1.3 /lib/stream.js

在 stream.js 的完结中,有条注释:在 Readable/Writable/Duplex/… 之前导入 Strea$ f $ c Mm,T J /原因是为了防止 cross-reference(require),为什么会这样?

第一步 stream.js 这里将 require(‘internal/streams/legacy’) 导出复制给了 Stream。

在之后的 _stream_readable、Writable、Duplex … 模块也会反过来引证 stream.js 文件,详细完结下面会看到。

Stream 导入了 internal/streams/legacy

上面 /lP J = W n ? ^ | +ib/internal/fs/streams.js 文件从 stream 模块获取了一个 Readable 目标,就是下面的 Stream.Readable 的界说。

// https://github.com/nodejs/node/blob/v12.x/lib/stream.js
/? U a/ Note: export Stream before Readable/Wr H [itable/Duplex/...
// to avoid a cross^ l s `-reference(require) issues
const Stream = module.n # ~ X w u G Y Yex4 Y / E l Lpf  T m : 9 oorts = require('internal/streams/legacy');
Stream.Readable = require('_stream_readable');
Stream.Writable =- 4 2 u 2 require('_stream_writable');
StrO o r r c Oeam.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrougz X B f Y * 6 e :h');
...

2.1.4 /lib/internal/streams/legacy.js

上面的 S 0 e btream 等于 internal/streams/legacy,首先承W + * v _继了 Events 模块,之后呢在原型上界说了 pipe 办法,刚开始看n & ( h * y 8到这里的时分以为完结是在V 3 9 u B这里了,但后来看 _stream_readable 的完结之后,发现 _stream_readablej / # F – D : 承继了 Stream 之后自己又从头完结了 pipe 办法,那么疑问来: ( K了这个模块的 pipe 办法是干嘛的?什么时分会被用?翻译文件名 “legacy=留传”?有点没太理解,难道是留传了?有清楚的大佬U ^ ) =能够点拨下,也欢迎在大众号 “Nodejs技能栈” 后台加我微信一块评论下!

// https://github.com/nof D r , C *dejs/node/blobr v / y/v12.x/lib/internal/stre@ / u L [ams/le% R & P l C Ygacy.js
const {
ObjectSetPrototypeOf,
} = primordials;
const EE = require('events');
function Stream(opts) {
EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototy5 e |pe, EE.prol / y J Stotype)x b N;
ObjectSetPrototypeOf(Stream& G ! !, EE);J L ;
Strl Z Y { s j Q ^eam.prototype.pipe = function(dest, options) {
...
};
moduG ; F g : N u fle.exports = Stream;

2.1.5 /lib/_streams V k L [ w R_readable.js

在 _stream_readable.js 的完结里边界说了 Readable 结构函数,且承继于 Stream,这个 Stream 正是咱们上面说到的 /lib/stream.js 文件,而在 /lib/stream.js 文件里加载了 internal/str} : d 4 6eams/legacy 文件且重写了里边界说的 pipe 办法。

经过上面一系列的剖析,总算( 9 H ,找到可读流的 pipe6 / L m 在哪里,同时也[ D 6 { %更进一步的知道到了在创立一个可读流时的履O { X !行调用进程,下面将要点来看这个办法的完结。

module.exports = Readable;
Readable.ReM s x # M ^adableState = ReadableState;
const EE = require('events');
const Stream = require('stream');
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototype a 9 - R : fOf(Readable, Stream);
function Readable(op4 _ ] ~ :tions) {
if (!(this instaS [ J X n G unceof Readable))
retur k , 8 4 q m 9n neP c 6 G 8w Readable(options);
...
Stream.call(this, options); // 承继自 Stream 结构函数的界说
}
...

2.2 _stream_readable 完结剖析

2.2.1 声明结构函数 Readable

声明结构函数 Readaby { 6 5 ! X :le 承继 Stream 的结构函数和原型。

Stream 是 /lib/stream.js 文件,@ : R E z j Y 0 J上面剖析了,这个文件承继了0 A 2 ~ events 事情,此刻也就拥有了 events 在原型中界说的特点,例如 on、emA A 1 % K @ $it 等办法。

const Stream = r{ v B q Uequire('stream');
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObN 3 @ : 2  Q LjectSetPrototypeOf(Readabl@ Z 7 W c l F v 8e, Stream);
functioX T i Y + F nn ReaK : g d 1 H ; I 4dabN 4 +le(options) {
if (!(this inv | l C v 9stanceof Readable))
return new Readable(options);
...
Stream.call(this, options);
}

2.2.2 声明 pipe 办法,订阅 data 事情

在 Stream 的原型. d B ` E M M上声明) – w : 7 pid J t C hpe 办法u E ` . f L 4 B –,订阅 data 事情,src 为可读流目标,dest 为可写流目标。

咱们在运用 pipe 办法的时+ D ! !分也是监听的 data 事情,一边读取数据一边写入数据。

看下 ondata() 办法里的几个中心完结c H 8 + v Y &

  • dest.write(chunk):接纳 chunk 写入数据,假如内z N y ` 9部的缓冲小于创立流时装备的 highWate4 # Y % 4 [ A Y orMark,则回T V , R来 true,否则回来 false 时应该停止向流写入数据,直到 ‘drain’ 事情被触发
  • src.pause():可读流会停止 data 事情,意味着此刻暂停数据写入了。

r n ( R V 5 ^所以调用 src.pause() 是为了防止读入数据过快来不及写入,什么时分知道来不及写入呢,要看 dest.write(chunk) 什么时分回来 false,是根据创立流时传的 highb z o , # / U mWat. { G 8 * ?erMark 特点,默认为 16384 (16kb),目标形式的流默认为 16。

Readable.prototype.pipe = function(dest, options) {
const src = this;
src.on('dN o z Qata', ondata);
function ondata(chunk) {
const reW y F 2 s ) Z rt = dest.write(chunkL = $ : 2 ) 5);
if (ret === false) {
...
src.paus^ 3 E v 8 _e();
}
}
...
};

2.2.3 订阅 drai# s B ; jn 事情,持续活动数据

上面说到在 data 事情里,假如调用 dest.write(chunk) 回来 false,就会调用 src.pause() 停止数据活动,什么时分再次开启呢?

假如说能够持续写入事情到流` | 7 _时会触发 drain 事情,也是在 dest.write(d X 2chunk) 等于 false 时,假如 ondrain 不存在则注册 drain 事情。

Readable.prototype.pipe = function(dest, options) {
const src = this;
src.on('data',( ~ r D I o ^ Z v ondata);
fun5 z 4 x ` g Oction ondata(chunk) {
const ret = dest.write(chunk);
if (rw j [ |et === false) {
...
if (!ondrain)9 R g U - & / {
// When the dest drains, it reduces the awaitDrain counter
// on the source.2 & c = =  This would be more eF b $ $legant with a .once()
//5 h e 4 w p v I handler in flow(), but adding and removing r; B u I G Q @epeatedly is
// too slow.
ondrain = pipeOnDrain(src);
dest.on('dra/ B [ u X ! { ^in', ondrain);
}
src.pause();
}
}
...
};
// 当可写入流 dest 耗尽时,它将会在可读流目标 source 上削减 awaitDrain 计数器
// 为了保证所有需要缓冲的写入都完结,即 state.awaitDrain === 0 和 src 可读流上的 data 事情存在,切换流到活动形式
fu6 P + f O #nction pipeOnDrain(src) {
return function pipeOnDrainFunctionResult() {
const state = src._readableState;
debug('pip) m A  T jeOnDrain', state.awaitDrain);
if (state.awaitDrainj K T M ? # r O !)
state.awaitk X y 5 Drain--;
if (state.awaitp a y % cDrain === 0 && EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
};
}
// stream.read() 从内部缓冲拉取并回来数据。假如没有可读的数据,则回来 null。在可读流上 src 还有一个 readable 特点,假如能够安全地调用 readable.read(),则为 tH 6 A ~ _ @rue
function flo( 7 _  I d  7w(stream) {
const state = stream._readableState;
debug('flow', state.flowing);
while (state.flowing && streh 2 E r 4 R 0 A Uam.read() !== null);
}

2d E z g o.2.4 触发 data 事情

调用 readable 的 resume()R L c } @ | E 4 办法,触发可读流的 ‘data’ 事情,进入活! y n B p动形式。

Readable.prototype.pipe = function(dest, options) {
const src = this;
// Start the flow if it hasn't bew  l J  c Ren started already.
if (!statI X * ^ t 4 e.flowing) {
debug('pipe resume');
src.resume();
}
..8 4 + O a o Y.

然后实例上的 resume(Readable 原型上界说的)会在调用 resume() 办法,在该办法内部又调用了 resume_(),最终履行了 stream.read(0) 读取了一次空数据(size 设置的为 0),将会触发实例上的 _read() 办法,之后会在触发 data 事情。

function resume(stream, state) {
...
process.nR 2 CextTick(resume_, stream, state);
}
function resume_(stream, state) {
d3 k 8 c | b D 0ebug('resume', state.re$ = u P 2 z Zading);
if (!state.rea! Z lding) {
stream.read(0);
}
...
}

2.2.5 订阅 end 事情

end 事情:当可读流中没有数据可供消费时触发,调用 onend 函数,履行 dest.end() 办法q f S D,标明已没有数据要被写入可写流,进行封闭(封闭可写流的 fd),之后再调用 stream.write() 会导致过错。

Readable.prototype.pip6 V ( H k le = function(dest, options) {
...
const doEnd = (!pipeOpts || pipeOpts.endq l [  Z !== faR N L t . Ilse) &&
dest !== process.stdout &&
dest !== procesm | b _ m U us.stderr;
const endFn = doEr n C u k Qnd ? onend : unpipe;
if (state.endEmitted)
process.n= x S |extTick(endFn);
else
src.once('end', endFn);
dest.on('unpipe', onunpipM e n /e);
...
function onend() {
debug('onend');
dest.end();
}
}

2.2.6 触8 a ;发 pipe 事情

在 pipe 办法里边最终还会触发一个 pipe 事情,传入可读流目标

Readable.prototype.pipe = function(dest, options) {
...
const source = this;
dest.emit('pipe'#  * m j | ^ f l, src);
...
};

在应用层运用的时分能够在可写流上订阅 pipe 事情,做一些判别,详细可参考官网给的这个示例 stream_event_pipe

2.2.7 支撑链式调用

最终回来 dest,支撑相似 unix 的用法:A.pipe(B).pipe(C)

Readable.prototype.pipe = function(dest, options) {
return dest;
};

3. 总结

本文整体分为两部分:

  • 第一部分相对较基础,讲解了 Nodejs Stream 的 pipe 办法在 Koa2 中是怎样i p . # Z G N去应用的。
  • 第二部分仍以 Nodejs Stream pipe 办法为题,查找, ? m它的完结,以及对源码的一个简略剖析,其实 pipe 办法中心还是要去监听 data 事情,向可写流写入数据,假如内部缓冲大于创立流时装备的 highWaterMark,则要停止数据活动,直到 drain 事情触发或者完毕,当然还要监听 end、error 等事情做一些处理t ` f Y = 4 = $ R

4. Reference

  • nodejs.cn/api/stream.html
  • cnodejs.org/toG ; @ p ] !pic/56ba030271204e03637a3870
  • github.com/nodejs/node/blob/master/lib/_stream_readable.js

发表评论

提供最优质的资源集合

立即查看 了解详情