远程调用(简称 RPC)主要是指服务器或集群之间对处理过程的调用。通过远程调用可以打通不同系统之间的数据与功能,或是抽离与建立公共的逻辑统一提供服务。远程调用的两端也称为远程调用的客户端与服务端,一般是多对多的关系,需要引入注册与发现机制进行治理,下图为最常见实践:
治理机制通常是因地制宜的,可以基于 ZooKeeper 建设,本章不再展开。
gPRC 是谷歌开源的一款跨语言高性能的 RPC 框架,底层使用 protobuf 进行数据交换,已在谷歌、奈非、思科等企业大规模应用。以下为 gRPC 的调用过程,客户端先进行 DNS 解析,再根据解析结果直连服务端或向负载均衡/治理服务获取服务端信息再连接:
本章将基于上一章已完成的工程 host1-tech/nodejs-server-examples - 11-schedule 加入远程调用功能实现一个简单消息的输入与输出,现在到工程根目录安装 grpc 相关模块:
$ yarn add @grpc/grpc-js @grpc/proto-loader # 本地安装 @grpc/grpc-js、@grpc/proto-loader # ... info Direct dependencies ├─ @grpc/grpc-js@1.1.3 └─ @grpc/proto-loader@0.5.5 # ...
通过 .proto
文件定义远程接口并写入基于该定义的 gRPC 客户端与服务端:
$ mkdir src/rpc # 新建 src/rpc 存放远程调用逻辑 $ mkdir src/rpc/echo # 新建 src/rpc/echo $ tree src -L 1 # 展示 src 目录内容结构 src ├── config ├── controllers ├── middlewares ├── models ├── moulds ├── rpc ├── schedules ├── server.js ├── services └── utils
// src/rpc/echo/def.proto syntax = "proto3"; service Echo { rpc Get(EchoRequest) returns (EchoResponse) {} } message EchoRequest { string message = 1; } message EchoResponse { string message = 1; }
// src/rpc/echo/client.js const { resolve } = require('path'); const { promisify } = require('util'); const protoLoader = require('@grpc/proto-loader'); const grpc = require('@grpc/grpc-js'); const { rpc } = require('../../config'); class EchoClient { grpcClient; async init() { const grpcObject = grpc.loadPackageDefinition( await protoLoader.load(resolve(__dirname, 'def.proto')) ); this.grpcClient = new grpcObject.Echo( `${rpc.domain}:${rpc.port}`, grpc.credentials.createInsecure() ); } get = async ({ s, logger }) => { const { grpcClient } = this; const { message } = await promisify( grpcClient.get.bind(grpcClient, { message: s }) )(); logger.info('Echo/Get Invoked'); return { message }; }; } let client; module.exports = async () => { if (!client) { client = new EchoClient(); await client.init(); } return client; };
// src/rpc/echo/server.js const { resolve } = require('path'); const { callbackify } = require('util'); const protoLoader = require('@grpc/proto-loader'); const grpc = require('@grpc/grpc-js'); class EchoServer { grpcServer; async init() { const grpcObject = grpc.loadPackageDefinition( await protoLoader.load(resolve(__dirname, 'def.proto')) ); this.grpcServer.addService(grpcObject.Echo.service, this); } get = callbackify(async (call) => { const { message } = call.request; return { message }; }); } let server; module.exports = async (grpcServer) => { if (!server) { server = new EchoServer(); Object.assign(server, { grpcServer }); await server.init(); } return server; };
// src/config/index.js // ... const config = { // 默认配置 default: { // ... + + rpc: { + domain: 'localhost', + port: process.env.PORT_RPC || 9001, + }, }, // ... }; // ...
开启 gRPC 日志输出并初始化:
# .env LOG_LEVEL='debug' + +GRPC_TRACE='all' +GRPC_VERBOSITY='DEBUG'
// src/utils/logger.js // ... +const GRPC_LOGGER_REGEXP = /^.+Z\s+\|\s+/; + +function grpcLogger(logger, level = 'debug') { + const verbosities = ['debug', 'info', 'error']; + + return { + error(severity, message) { + if (typeof severity != 'number') { + message = severity; + severity = 0; + } + + if (typeof message != 'string') { + message = String(message || ''); + } + + logger[verbosities[severity] || level]( + message.replace(GRPC_LOGGER_REGEXP, '') + ); + }, + }; +} + module.exports = logger; -Object.assign(module.exports, { logging }); +Object.assign(module.exports, { logging, grpcLogger });
// src/rpc/index.js const { promisify } = require('util'); const grpc = require('@grpc/grpc-js'); const { rpc } = require('../config'); const logger = require('../utils/logger'); const echoClient = require('./echo/client'); const echoServer = require('./echo/server'); const { grpcLogger } = logger; module.exports = async function initRpc() { grpc.setLogger(grpcLogger(logger.child({ type: 'rpc' }), 'debug')); // init rpc servers const grpcServer = new grpc.Server(); await echoServer(grpcServer); await promisify(grpcServer.bindAsync.bind(grpcServer))( `0.0.0.0:${rpc.port}`, grpc.ServerCredentials.createInsecure() ); grpcServer.start(); // init rpc clients await echoClient(); };
// src/server.js const express = require('express'); const { resolve } = require('path'); const { promisify } = require('util'); const initMiddlewares = require('./middlewares'); const initControllers = require('./controllers'); const initSchedules = require('./schedules'); +const initRpc = require('./rpc'); const logger = require('./utils/logger'); const server = express(); const port = parseInt(process.env.PORT || '9000'); const publicDir = resolve('public'); const mouldsDir = resolve('src/moulds'); async function bootstrap() { + await initRpc(); server.use(await initMiddlewares()); server.use(express.static(publicDir)); server.use('/moulds', express.static(mouldsDir)); server.use(await initControllers()); server.use(errorHandler); await initSchedules(); await promisify(server.listen.bind(server, port))(); logger.info(`> Started on port ${port}`); } // ...
添加 gRPC 客户端 logger 与控制层入口:
// src/middlewares/trace.js const { v4: uuid } = require('uuid'); const morgan = require('morgan'); const onFinished = require('on-finished'); const logger = require('../utils/logger'); const { logging } = logger; module.exports = function traceMiddleware() { return [ morgan('common', { skip: () => true }), (req, res, next) => { req.uuid = uuid(); req.logger = logger.child({ uuid: req.uuid }); req.loggerSql = req.logger.child({ type: 'sql' }); req.logging = logging(req.loggerSql, 'info'); + req.loggerRpc = req.logger.child({ type: 'rpc' }); onFinished(res, () => { // ... }); next(); }, ]; };
// src/controllers/echo.js const { Router } = require('express'); const cc = require('../utils/cc'); const rpcEchoClient = require('../rpc/echo/client'); class EchoController { rpcEchoClient; async init() { this.rpcEchoClient = await rpcEchoClient(); const router = Router(); router.get('/', this.get); return router; } get = cc(async (req, res) => { const { s = '' } = req.query; const message = await this.rpcEchoClient.get({ s, logger: req.loggerRpc }); res.send({ success: true, message }); }); } module.exports = async () => { const c = new EchoController(); return await c.init(); };
// src/controllers/index.js const { Router } = require('express'); const shopController = require('./shop'); const chaosController = require('./chaos'); const healthController = require('./health'); const loginController = require('./login'); const csrfController = require('./csrf'); +const echoController = require('./echo'); module.exports = async function initControllers() { const router = Router(); router.use('/api/shop', await shopController()); router.use('/api/chaos', await chaosController()); router.use('/api/health', await healthController()); router.use('/api/login', await loginController()); router.use('/api/csrf', await csrfController()); + router.use('/api/echo', await echoController()); return router; };
访问 http://localhost:9000/api/echo?s=Hello%20RPC 即可看到效果:
同时在命令行能够看到充分的 gRPC 日志:
# ... 08:20:52.320Z DEBUG 12-rpc: dns_resolver | Resolver constructed for target dns:0.0.0.0:9001 (type=rpc) 08:20:52.321Z DEBUG 12-rpc: dns_resolver | Resolution update requested for target dns:0.0.0.0:9001 (type=rpc) 08:20:52.321Z DEBUG 12-rpc: dns_resolver | Returning IP address for target dns:0.0.0.0:9001 (type=rpc) 08:20:52.322Z DEBUG 12-rpc: server | Attempting to bind 0.0.0.0:9001 (type=rpc) 08:20:52.324Z DEBUG 12-rpc: server | Successfully bound 0.0.0.0:9001 (type=rpc) 08:20:52.327Z DEBUG 12-rpc: resolving_load_balancer | dns:localhost:9001 IDLE -> IDLE (type=rpc) 08:20:52.327Z DEBUG 12-rpc: connectivity_state | dns:localhost:9001 IDLE -> IDLE (type=rpc) 08:20:52.327Z DEBUG 12-rpc: dns_resolver | Resolver constructed for target dns:localhost:9001 (type=rpc) # ...
host1-tech/nodejs-server-examples - 12-rpc
从零搭建 Node.js 企业级 Web 服务器(零):静态服务
从零搭建 Node.js 企业级 Web 服务器(一):接口与分层
从零搭建 Node.js 企业级 Web 服务器(二):校验
从零搭建 Node.js 企业级 Web 服务器(三):中间件
从零搭建 Node.js 企业级 Web 服务器(四):异常处理
从零搭建 Node.js 企业级 Web 服务器(五):数据库访问
从零搭建 Node.js 企业级 Web 服务器(六):会话
从零搭建 Node.js 企业级 Web 服务器(七):认证登录
从零搭建 Node.js 企业级 Web 服务器(八):网络安全
从零搭建 Node.js 企业级 Web 服务器(九):配置项
从零搭建 Node.js 企业级 Web 服务器(十):日志
从零搭建 Node.js 企业级 Web 服务器(十一):定时任务
从零搭建 Node.js 企业级 Web 服务器(十二):远程调用