流(Stream)主要用于顺序的数据处理,比如文件读写,从第一行开始,直到最后一行,按照文件的书写顺序依次读写,和我们读书时,用手指指着字读书是一个道理。先用手指指第一个字,读完第一个字,再把手指指第二个字,再读第二字,手指按照书本的书写顺序,依次向后,我们依次读取,直到最后一个字,读取完毕。再比如网络数据传输, 或任何端到端的数据交换,都是有先后顺序的。
流在处理数据时,又与传统方式有所不同,它不是把数据作为一个整体进行处理(传统方式),而是把数据分割成一块一块的进行处理。文件读取时,流并不是把文件的所有内容都读取到内存中,而是只读取一块数据,等待这块数据处理完毕,比如把这块数据写入到另外一个文件中,再读取另一块数据,循环往复,直到文件读取完毕。读取一块数据,处理一块数据,流不会让数据一直在内存中,因此使用流处理数据,可以高效的使用内存,更有可能来处理大文件。再以网络数据传输(网上看视频)为例,并不是把整个电影都从服务器上下载下来才开始播放,而是一块一块地下载,下载一块,播放一块。服务器一块一块地写数据,浏览器一块一块的读数据。用流处理数据,时间上也比较高效。在Node.js中,有以下4种流:
可读流(readable stream): 可以从里面读取数据的流。它负责从数据源里读取数据(到内存),我们负责从它里面读取数据,可以把它看作数据源的抽象。
可写流(writable stream): 可以向其里面写入数据的流,我们向可写流里写入数据(把内存中的数据写入到可写流中),它负责向目的地写入数据,它是目的地的抽象。
双向流或双工流(duplex stream): 既可以从它里面读取数据,也可以向它里面写入数据。
转换流(transform stream):它接受一个流,把流里面的内容进行转换,然后再把流输出,流的性质没变,只是流的内容发生了变化,通常转化的是可读流。
使用流的方式有两种,事件(event) 和管道(pipeline)。
流是一个事件触发器(EventEmitter),完成某项任务后,它就会发出事件,比如,可读流读取到数据后,就会发出data事件,我们只需监听事件,然后在事件处理函数中,做我们想做的事情。流具体发出哪些事件,都是事先实现好的,我们只能监听这些特定的事件。
先说可读流,它比较复杂,因为有两种模式:pause和flow,并且在每种模式下发出的事件也不相同。pause模式是默认模式,就是创建可读流后,可读流并不会自动地从数据源中读取数据,需要我们手动触发它的读取操作。可读流从数据源中读取数据,完全是由程序(消费数据的一端)驱动的。每一次的读取操作都是由消费端发起,数据的读取速度绝对不会比消费数据速度快,用另一句话说,数据是消费端从可读流中拉取出来的。
这会有一个问题,可读流从文件中读取数据的时候,程序就要等待,程序在处理数据的时候,可读流不会读取数据。为了提高效率,可读流内部使用了缓冲技术和提前读取方式。可读流不是一次读取一块数据,而是一次读取多块数据,程序每次从可读流中拉取一块数据,可读流就会再进行读取操作,填充空余的缓冲区。因此程序需要触发第一次读取操作,以后每一次的拉取(消费)数据都会触发一次读取操作。
监听可读流的readable事件,可以触发初始读取操作,并且在数据可读时,可读流发出readable事件。read()方法拉取或消费数据,从而触发另一次读取操作,所以要在readable事件处理函数中,调用read()方法,read() 方法,如果读取不到数据,就会返回null,
const fs = require('fs'); const stream = fs.createReadStream('/path/to/file', { highWaterMark: 64 * 1024, // 内部缓冲大小 }); stream.once('readable', consume); // 触发第一次可读流读取操作 async function consume() { let chunk; // 消费数据,从而触发另一次可读流的读取操作 while ((chunk = stream.read()) !== null) { await asyncHandle(chunk); } stream.once('readable', consume); } async function asyncHandle(chunk) { console.log(chunk); }
但这种处理方法有点麻烦,Node.js10 以后,可以使用异步迭代器(for await ... of )来监听readable 事件,读取数据,因为输入流是异步可迭代对象
const fs = require('fs'); const readableStream = fs.createReadStream('./data.txt'); async function logChunks(readable) { for await (const chunk of readable) { console.log(chunk.toString()); } } logChunks(readableStream);
除了在异步迭代器中,直接处理数据,也可以把流中的数据暂时存储起来,以便日后消费,如果要处理异常,可以用try/catch 把for await 的处理包起来
const fs = require('fs'); const readableStream = fs.createReadStream('./data.txt'); readableStream.setEncoding('utf-8'); async function readableToString(readable) { let result = ''; try { for await (const chunk of readable) { result += chunk; } } catch (error) { } return result; } readableToString(readableStream).then(console.log);
说完pause模式,再说flow模式。flow模式,就是可读流自动从数据源中读取数据,读到数据后,推送数据到程序中(消费端),消费端只管处理数据。怎么从pause模式切换到flow模式呢?监听可读流的data事件,流就自动转化成flow模式,监听可读流的data事件后,可读流就会被驱动,到数据源中读取数据,当读取到一块数据后,可读流就会把数据推送到程序中。推送完成,它就再读取另一块数据。在flow模式下,数据的读取是可读流自己控制的,程序只管处理就可以。
看起来,使用flow模式非常简单,监听data事件,并处理数据就可以了
const fs = require('fs'); const readable = fs.createReadStream('./data.txt'); readable.on('data', data => { console.log(data); })
但有一个问题,消费端并没有方法来控制数据产生的速度(可读流推送数据的速度),如果消费端消费数据很慢,那就要更多消费端,或者在消费端把没有消费的数据进行缓存,
这就导致无限缓存和无法控制内存使用。在flow模式下,可读流控制着读数据的过程,一有数据,就立即推送到消费端(data listener),没有缓存,霜要用backpressure mechanism
const fs = require('fs'); const stream = fs.createReadStream('/path/to/file'); // subscribing to `data` event moves stream to flowing mode stream.on('data', chunk => { queue.push(chunk); if (queue.length > MAX_INFLIGH_CHUNKS) { stream.pause(); // poor man's backpressure queue.on('drain', () => stream.resume()); } }); // ... separate consuming process while (queue.size()) { await process(queue.deque()); } queue.emit('drain');
说完了可读流,再说可写流,我们向可写流里面写数据,可写流再向目的地写数据。
const fs = require('fs'); const readableStream = fs.createReadStream('./data.txt'); const writeStream = fs.createWriteStream('./result.txt'); readableStream.on('data', data => { writeStream.write(data); // 调用write方法向可写流里面写数据 })
这里面存在一个问题,程序向可写流里写入数据的速度和可写流向目的地的写入数据的速度不一致?写入数据称为生产者,可写流称为消费者。产生数据太快,来不及消费?产生数据太慢,又无法消费?那就在生产者和消费者之间创建一个buffer(缓冲区)。生产者写入数据到缓冲区,消费者从缓冲区读取数据,写入到目的地。如果程序写入还是太快,缓冲区就会溢出,缓冲区满了,就要告诉生产都,不能再写数据了。如果写入的速度慢,缓冲区是空的,那么可读流就要停止读取。
可读流,就是内置了buffer(缓冲区),它是一个FIFO队列,程序向可写流里写入数据,就是写到buffer中,可写流向目的地写入数据,就是从队列(缓冲区)中读取数据。在写的过程中,可写流还会告诉你,它内部的buffer状况,buffer是否满了,还是buffer中仍有空间。buffer 的大小是由highWaterMark 决定的。当调用write()向可写流写入数据的进候,它会返回true or false 来表示内部buffer的状态,当写入成功后,buffer仍有空闲,它就会返回true,表示仍然可以写入。如果写入后,buffer满了,返回false,表示需要backpressure。然而,这个返回值只是一个建议,如果向写入流写入数据时不遵循这个建议,可写流会继续缓冲数据,从而导致过多的内存消耗。如果write()返回false,就要停止向可写流写入数据。然而什么候才能继续写入呢?只要可写流内置的buffer有空闲,或每一次可写流从满的buffer中取了数据,它就会发出“emit”事件,收到drain事件,就可以继续向可写流中写入数据了。
const fs = require('fs'); const util = require('util'); const stream = require('stream'); const { once } = require('events'); const readableStream = fs.createReadStream('./data.txt'); const writeStream = fs.createWriteStream('./result.txt'); const finished = util.promisify(stream.finished); async function writeIterableToFile(readable, writable) { for await (const chunk of readable) { if (!writable.write(chunk)) { await once(writable, 'drain'); } } writable.end(); // 程序不会再向可写流中写数据了 // finished:可写流把把所有的数据都写入到的目的地中。 await finished(writable); } writeIterableToFile(readableStream, writeStream)
当可读流的目的地是可写流时,以上处理方式就有点麻烦了,有一种更好的方式,那就是pipe管道。所有流都实现了pipeline模式,pipeline模式就是描述了数据流过不同的阶段,像下图所示
源数据经过可读流,最终流经可写流,到达目的地,中间可能经过0个,1个或多个转换流,对数据进行转换。数据被可读流读取到后,可以流向(piped to)其它流, 可读流读取数据,由可写流消费时,就是把可读流导入到可写流,
const fs = require('fs'); const readableStream = fs.createReadStream('./data.txt'); const writeStream = fs.createWriteStream('./result.txt'); readableStream.pipe(writeStream);
pipe() 操作,前一个的输出变成后面一个的输入。readableStream可读流的输出,就是读取到的数据,它正好是可写流的输入,因此,就可以用pipe把这两个链接起来。pipe()的操作就相当于以下几个步骤
可读流监听data事件,驱使可读流去读取数据,同时在事件监听函数中,调用可写流的write()方法写入数据。如果write()返 回false, 可读流就就要暂停读取。同时,可写流要监听'drain'事件,在事件中调用可读流的resume()方法。一旦在可读流的buffer中有空间,可写流发出'drain'事件,继续可读流的读取数据。
const fs = require('fs'); const readable = fs.createReadStream('./biji.txt'); const writeable = fs.createWriteStream('anotehr.txt'); readable.on('data', (data) => { if (!writeable.write(data)) { readable.pause(); writeable.on('drain', () => { readable.resume(); }) } })
pipe() 把可读流的模式转换成了flow模式,并且也处理了backpressure,隐藏了内部的detail,使用简单。但怎么知道文件都写完了呢?使用pipe()方法,并不会关闭流发出事件。事件依然有效,它会告诉你流发生了什么事情。在pipe()方法之前,监听可写流的end事件。
const fs = require('fs'); const readStream = fs.createReadStream('test.txt'); const writeStream = fs.createWriteStream('output.txt'); writeStream.on('end', () => { console.log('Done'); }); readStream.pipe(writeStream);