可写流的highWaterMark表示期望这个文件接受多少个值。
end不仅会写入,而且会触发close事件。
一个true,一个false,是因为我们的highWaterMark设置了3,希望只用3个内存来写,但是返回的值与我们是否写入无关,返回false也会写入。
但是有个问题,我们写多个wirte的时候,是并发异步操作,所以不能确定哪个快哪个慢。
可以将并发异步操作变为串行异步。
除了第一次的write,接下来的write排队,第一个完成后,将队列中的每一个write拿出来执行。有点像eventsloop,直到清空队列,但是队列缓存可能会过大,所以需要一个预期,也就是highWaterMark来控制,达到预期后,就不要调用write方法。虽然再调用也会写入进去。
结合fs.createStream。
当我们第一次写入的时候,当文件吃不下了,也就是到highWaterMark的值了,就应该停止写入了,等文件吃完了,触发了drain方法,这时候再去恢复rs.resume(),此时后面的写入就是往真实的文件中去写了,而不是一直在排队了。读取默认的highWaterMark是64k,而写入的默认highWaterMark是16k。
这跟第一种并发异步写入有很大区别。
可写流是基于链表实现的,因为可写流涉及队列排序的问题,比较多的使用了头部的增删。而链表在头部和尾部的增删的时候效率比较高。
链表的实现可以观看:单链表,双链表
原生的。
思路:跟可读流一样,将open和wirte操作分离,通过事件发布的模式。然后通过变量判断当前write是否是第一次调用,保证每次只有一个write在执行,其他的全部扔进缓存,当write执行完毕之后,再从缓存中一个一个拿出来执行write。直到缓存清空完毕,触发drain事件,通知用户缓存清完了,变量置为初始化。
这样就能使并发异步操作变成串行异步操作。
初始化变量,这里跟ReadStream有点区别,比如没有end,encoding默认写入是utf8等等。
len用来判断当前缓存的值的长度,needDrain用来判断缓存是否过多是否达到期望值。cahce是缓存队列。writing用来标识是否第一次写入。
通过缓存以及全局变量,和事件系统,只有第一次write才会真正执行fs.write,其他的write方法会放入缓存,只有当fs.write执行完毕之后,才会继续从缓存取出进行fs.wrtie。直到缓存为空,再触发drain事件,重置变量,让用户可以继续调用write方法写入。就好比人吃饭,喂一口大的,它总要先吃一点小的,剩下部分在口腔里嚼,并且返回false告诉你,先别喂,等全部吃完。当它慢慢嚼完并吞下当前口腔有的事物之后,再告诉你,可以继续喂了,依次类推,反复循环,直到这碗饭吃饭(全部写入)
链表的实现:在单链表这里。
实验:
成功删除了第一个3,链表此时是6=>10。
改造完成,
期望是3,所以第一次会调用三次write,两个write存入queue中,如
首先queue中有两个,然后取出49之后,有剩一个50,再取出来之后就为空。完成。
全部代码:
// WriteStream const LinkedList = require("./linklist"); //基于链表实现队列 class Queue { constructor() { this.link = new LinkedList(); } //在最后一个中加入 offer(element) { this.link.append(element); } //移除链表第一位并返回 shift() { return this.link.removeAt(0); } } class WriteStream extends EventMitter { constructor(path, options) { super(); this.path = path; this.flags = options.flags || "w"; this.encoding = options.encoding || "utf8"; this.autoClose = options.autoClose || true; this.mode = options.mode || 0o666; this.start = options.start || 0; this.highWaterMark = options.highWaterMark || 16 * 1024; this.emitClose = options.emitClose || true; this.offset = this.start; // 每次写入文件的位移数 this.fd = undefined; this.len = 0; //判断的缓存 this.needDrain = false; //是否需要触发drain this.cache = []; //第二次开始的写入缓存 this.writing = false; //标识是否正在写入 this.queue = new Queue(); this.open(); } open() { fs.open(this.path, this.flags, this.mode, (err, fd) => { this.fd = fd; this.emit("open", fd); }); } _write(chunk, encoding, cb) { if (typeof this.fd !== "number") { this.once("open", () => { this._write(chunk, encoding, cb); }); return; } fs.write(this.fd, chunk, 0, chunk.length, this.offset, (err, written) => { this.offset += written; //维护偏移量 this.len -= written; //把缓存的个数减少 cb(); //回调 }); } //依次将缓存队列等待任务拿出来一个个写入。 clearBuffer() { const a = JSON.stringify(this.queue); console.log(a); const data = this.queue.shift(); if (data) { this._write(data.chunk, data.encoding, data.cb); } else { //缓存读取完了,需要触发drain事件 this.writing = false; //当前写入已经完成了。 if (this.needDrain) { //如果存入过多导致期望值过大 this.needDrain = false; this.emit("drain"); } } } } //模拟Stream的Writable的实例上的write方法 EventMitter.prototype.write = function ( chunk, encoding = this.encoding, cb = () => {} ) { // 异步方法 // 注意写入的中文跟英文比较。 将数据全部转换为Buffer chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); this.len += chunk.length; //判断到达期望了吗 const returnValue = this.len < this.highWaterMark ? true : false; //当数据写入后,需要触发drain并且将Len减减 this.needDrain = !returnValue; // 清空缓存队列的逻辑 const userCb = cb; cb = () => { userCb(); this.clearBuffer(); }; //判断是否第一次给入的数据,因为后面的write都是直接放入缓存中 if (!this.writing) { //第一次写入,真正执行写入操作 this._write(chunk, encoding, cb); this.writing = true; } else { //第二次写入了,存入缓存期 this.queue.offer({ chunk, encoding, cb, }); } return returnValue; }; const ws = new WriteStream("b.txt", { flags: "w", encoding: null, autoClose: true, start: 0, //没有end属性,只有start highWaterMark: 3, //与可读流不一样,可写流的highWaterMark表示期望这个文件只接受3个内存 }); ws.on("open", () => { console.log("文件打开"); }); let i = 0; function write() { let flag = true; while (i < 4 && flag) { flag = ws.write(i++ + ""); console.log('flag', flag); } } ws.on("drain", () => { //只有当吸入的数据达到了预期,并且数据已经被写入文件之后才会触发drain事件。 console.log("写完了"); write(); }); write();
// 链表 class Node { constructor(data) { this.data = data; this.next = null; } } module.exports = class LinkedList { constructor() { //头指针 this.head = null; //链表的长度 this.length = 0; } append(data) { const element = new Node(data); if (!this.head) { this.head = element; } else { let current = this.head; //遍历找到最后的节点 while (current.next) { current = current.next; } //插入 current.next = element; } this.length++; } // //特定位置插入 insert(position, data) { if ( typeof position !== "number" || position < 0 || position > this.length ) { return false; } const element = new Node(data); //插入首位 if (position === 0) { element.next = this.head; this.head = element; } else { //插入中间的一位 let current = this.head; let index = 1; //采用插入位置前一位进行操作,比如插入到第四个,就将element.next指向第三个的下一个,再将第三个的下一个重新向element while (index++ < position) { // 如position = 4, index = 4的时候, current指向第三个,因为执行了两遍 current = current.next; } // 让element变成第四个 element.next = current.next; current.next = element; // for (let i = 1; i < position; i++) { // if (i === position - 1) { // element.next = current.next // current.next = element // break; // } // current = current.next // } } this.length++; } // //获取对应位置的元素 get(position) { if ( typeof position !== "number" || position < 0 || position >= this.length ) { return undefined; } let current = this.head; let index = 0; while (index++ < position) { current = current.next; } return current.data; } // //返回元素在列表中的索引 indexOf(data) { let current = this.head; let index = 0; while (current) { if (current.data === data) { return index; } current = current.next; index++; } return -1; } // //修改某个位置的元素 update(position, data) { if ( typeof position !== "number" || position < 0 || position >= this.length ) { return false; } let current = this.head; let index = 0; while (index++ < position) { current = current.next; } current.data = data; } // //从列表的特定位置移除一项 removeAt(position) { if ( typeof position !== "number" || position < 0 || position >= this.length ) { return undefined; } let current = this.head; if (position === 0) { const headElData = this.head.data; this.head = this.head.next; this.length--; return headElData; } let index = 0; // 找到应该删除的节点的前一个,比如删除2,此时的current 为1所指向的节点。 while (index++ < position - 1) { current = current.next; } const currentElement = current.next; let element = current.next.next; //保存第三个节点 current.next.next = null; //让第二个节点与第三个节点断掉关系 current.next = element; // 第一个节点指向第三个节点 this.length--; return currentElement.data; } // 从列表移除一项 remove(element) { let current = this.head; let nextElement; let isSuccess; //如果第一个就是 if (current.data === element) { nextElement = this.head.next; this.head = element; element.next = nextElement; return true; } // 找到应该删除的节点的前一个,比如删除2,此时的current 为1所指向的节点。 while (current) { if (current.next.data === element) { nextElement = current.next.next; //存粗下下个节点 current.next.next = null; //断开下个节点与下下个节点的联系 current.next = nextElement; //连接下下个节点 isSuccess = true; break; } else { current = current.next; } } //是否删除成功 if (isSuccess) { this.length--; return true; } return false; } isEmpty() { return !!this.length; } size() { return this.length; } toString() { let current = this.head; let str = ""; while (current) { str += `,${current.data.toString()}`; current = current.next; } return str.slice(1); } };