RPC通信的学问很深,目前了解一下即可,下面的代码很有意思,最好能静下心去看。
建立个模拟客户端行为的文件client.js
const net = require('net') const socket = new net.Socket({}) // 和http模块完全不一样 socket.connect({ host: '127.0.0.1', port: 4000 }) socket.write("我只会心疼giegie") // 向服务端发送
再模拟服务端行为server.js:
const net = require('net') const server = net.createServer((socket) => { // 创建服务 socket.on('data', function (buffer) { // 接收数据 console.log(buffer, buffer.toString()); // <Buffer e6 88 91 e5 8f aa e4 bc 9a e5 bf 83 e7 96 bc 67 69 65 67 69 65> 我只会心疼giegie }) }) server.listen(4000)
然后先启动服务node server.js
然后启动客户端node client.js
,服务端就接到buffer数据:
client.js:
const net = require('net'); // 创建socket const socket = new net.Socket({}); // 连接服务器 socket.connect({ host: '127.0.0.1', port: 4000 }); // 课程id const lessonids = [ "136797", "136798", "136799", "136800", "136801", "136803", "136804", "136806", "136807", "136808", "136809", "141994", "143517", "143557", "143564", "143644", "146470", "146569", "146582" ] let id = Math.floor(Math.random() * lessonids.length); // 随机出一个课程id // 把编码请求包的逻辑封装为一个函数 function encode(index) { buffer = Buffer.alloc(4); // 4位 buffer.writeInt32BE( // 把课程id写入 lessonids[index] ); return buffer; } // 往服务器传数据 socket.write(encode(id)); // encode把id编码成类似<Buffer 00 02 2a aa> // 监听返回的数据 socket.on('data', (buffer) => { console.log(buffer.toString()) // 解码二进制数据 // 接收到数据之后,按照半双工通信的逻辑,马上开始下一次请求 id = Math.floor(Math.random() * lessonids.length); socket.write(encode(id)); })
server.js:
const net = require('net'); // 创建tcp服务器 const server = net.createServer((socket) => { // 接收客户端发来的数据 socket.on('data', function(buffer) { // 从传来的buffer里读出一个int32 const lessonid = buffer.readInt32BE(); // 50毫秒后回写数据 setTimeout(()=> { socket.write( Buffer.from(data[lessonid]) ); }, 50) }) }); // 监听端口启动服务 server.listen(4000); const data = { 136797: "01 | 课程介绍", 136798: "02 | 内容综述", 136799: "03 | Node.js是什么?", 136800: "04 | Node.js可以用来做什么?", 136801: "05 | 课程实战项目介绍", 136803: "06 | 什么是技术预研?", 136804: "07 | Node.js开发环境安装", 136806: "08 | 第一个Node.js程序:石头剪刀布游戏", 136807: "09 | 模块:CommonJS规范", 136808: "10 | 模块:使用模块规范改造石头剪刀布游戏", 136809: "11 | 模块:npm", 141994: "12 | 模块:Node.js内置模块", 143517: "13 | 异步:非阻塞I/O", 143557: "14 | 异步:异步编程之callback", 143564: "15 | 异步:事件循环", 143644: "16 | 异步:异步编程之Promise", 146470: "17 | 异步:异步编程之async/await", 146569: "18 | HTTP:什么是HTTP服务器?", 146582: "19 | HTTP:简单实现一个HTTP服务器" }
这样客户端每发一个包后,服务端就会返回,客户端接收到后会立刻继续请求。
当需要全双工通信时,因为客户端的发送和服务端的发送可以同时进行,容易出现发包顺序错乱的问题,所以需要有标记包号的字段。
当同时发送多个包的时候,TCP底层会触发“粘包”优化,自动将这些包拼接起来。
以下代码可能看着有些乱,跟着标记【】的顺序看即可:
client.js:
const net = require('net'); const socket = new net.Socket({}); const LESSON_IDS = [ // 课程id "136797", "136798", "136799", "136800", "136801", "136803", "136804", "136806", "136807", "136808", "136809", "141994", "143517", "143557", "143564", "143644", "146470", "146569", "146582" ] socket.connect({ host: '127.0.0.1', port: 4000 }); let id = Math.floor(Math.random() * LESSON_IDS.length); // 随机一个课程id // 【*******************************逻辑3***********************】 let oldBuffer = null; /* 监听服务端传来的数据 */ socket.on('data', (buffer) => { // 把上一次data事件使用残余的buffer接上来(这应该是模拟TCP的粘包机制) if (oldBuffer) { buffer = Buffer.concat([oldBuffer, buffer]); } let completeLength = 0; // 只要还存在可以解成完整包的包长 while (completeLength = checkComplete(buffer)) { const package = buffer.slice(0, completeLength); buffer = buffer.slice(completeLength); const result = decode(package); // 把这个包解成数据和seq console.log(`包${result.seq},返回值是${result.data}`); } // 把残余的buffer记下来 oldBuffer = buffer; }) /* 【*******************************逻辑2***********************】 */ let seq = 0; // 记录包的顺序 /** * 二进制包编码函数 * 在一段rpc调用里,客户端需要经常编码rpc调用时,业务数据的请求包 */ function encode(data) { // 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包。为了不要混淆重点,这个例子比较简单,就直接把课程id转buffer发送 const body = Buffer.alloc(4); // body创建一个4位的 body.writeInt32BE(LESSON_IDS[data.id]); // 写入4位的课程id // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分。包头的作用就是用来记载包的序号和包的长度,以实现全双工通信 const header = Buffer.alloc(6); // 包头用6位 header.writeInt16BE(seq) // 记录2位的顺序 header.writeInt32BE(body.length, 2); // 紧接记录4位的body长度 // 包头和包体拼起来发送 const buffer = Buffer.concat([header, body]) // 此时一个buffer内容构成就为 { 2位的seq + 4位的body长度 }+ { 4位的课程id } console.log(`包${seq}传输的课程id为${LESSON_IDS[data.id]}`); seq++; // 每发送完一个包,下个包的序号就加1 return buffer; } /** * 二进制包解码函数 * 在一段rpc调用里,客户端需要经常解码rpc调用时,业务数据的返回包 */ function decode(buffer) { const header = buffer.slice(0, 6); const seq = header.readInt16BE(); const body = buffer.slice(6) return { seq, data: body.toString() } } /** * 检查一段回来的buffer是不是一个完整的数据包。 * 具体逻辑是:判断header的bodyLength字段,看看这段buffer是不是长于header和body的总长 * 如果是,则返回这个包长,意味着这个请求包是完整的。 * 如果不是,则返回0,意味着包还没接收完 * @param {} buffer */ function checkComplete(buffer) { if (buffer.length < 6) { return 0; } const bodyLength = buffer.readInt32BE(2); return 6 + bodyLength // 返回粘包后的长度 } // 【**********************************逻辑1*****************************************】 /* 这里试试并发100次请求,可以换成个位数来理解一下逻辑 */ for (let k = 0; k < 100; k++) { id = Math.floor(Math.random() * LESSON_IDS.length); socket.write(encode({ id })); }
server.js也是同理的,就不写顺序提示了:
const net = require('net'); // 假数据 const LESSON_DATA = { 136797: "01 | 课程介绍", 136798: "02 | 内容综述", 136799: "03 | Node.js是什么?", 136800: "04 | Node.js可以用来做什么?", 136801: "05 | 课程实战项目介绍", 136803: "06 | 什么是技术预研?", 136804: "07 | Node.js开发环境安装", 136806: "08 | 第一个Node.js程序:石头剪刀布游戏", 136807: "09 | 模块:CommonJS规范", 136808: "10 | 模块:使用模块规范改造石头剪刀布游戏", 136809: "11 | 模块:npm", 141994: "12 | 模块:Node.js内置模块", 143517: "13 | 异步:非阻塞I/O", 143557: "14 | 异步:异步编程之callback", 143564: "15 | 异步:事件循环", 143644: "16 | 异步:异步编程之Promise", 146470: "17 | 异步:异步编程之async/await", 146569: "18 | HTTP:什么是HTTP服务器?", 146582: "19 | HTTP:简单实现一个HTTP服务器" } const server = net.createServer((socket) => { let oldBuffer = null; socket.on('data', function (buffer) { // 把上一次data事件使用残余的buffer接上来 if (oldBuffer) { buffer = Buffer.concat([oldBuffer, buffer]); } let packageLength = 0; // 只要还存在可以解成完整包的包长 while (packageLength = checkComplete(buffer)) { const package = buffer.slice(0, packageLength); buffer = buffer.slice(packageLength); // 把这个包解成数据和seq const result = decode(package); // 计算得到要返回的结果,并write返回 socket.write( encode(LESSON_DATA[result.data], result.seq) ); } // 把残余的buffer记下来 oldBuffer = buffer; }) }); server.listen(4000); /** * 二进制包编码函数 * 在一段rpc调用里,服务端需要经常编码rpc调用时,业务数据的返回包 */ function encode(data, seq) { // 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包 // 为了不要混淆重点,这个例子比较简单,就直接把课程标题转buffer返回 const body = Buffer.from(data) // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分 // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信 const header = Buffer.alloc(6); header.writeInt16BE(seq) header.writeInt32BE(body.length, 2); const buffer = Buffer.concat([header, body]) return buffer; } /** * 二进制包解码函数 * 在一段rpc调用里,服务端需要经常解码rpc调用时,业务数据的请求包 */ function decode(buffer) { const header = buffer.slice(0, 6); const seq = header.readInt16BE(); // 正常情况下,这里应该是使用 protobuf 来decode一段代表业务数据的数据包 // 为了不要混淆重点,这个例子比较简单,就直接读一个Int32即可 const body = buffer.slice(6).readInt32BE() // 这里把seq和数据返回出去 return { seq, data: body } } /** * 检查一段buffer是不是一个完整的数据包。 * 具体逻辑是:判断header的bodyLength字段,看看这段buffer是不是长于header和body的总长 * 如果是,则返回这个包长,意味着这个请求包是完整的。 * 如果不是,则返回0,意味着包还没接收完 * @param {} buffer */ function checkComplete(buffer) { if (buffer.length < 6) { return 0; } const bodyLength = buffer.readInt32BE(2); return 6 + bodyLength }