Netty 提供了 ServerBootstrap 引导类作为程序启动入口,ServerBootstrap 将 Netty 核心组件像搭积木一样组装在一起,服务端启动过程我们需要完成以下三个基本步骤:
一个完备的网络协议需要具备的基本要素:魔数、协议版本号、序列化算法、报文类型、长度域字段、请求数据、保留字段。在实现协议编解码时经常用到两个重要的抽象类:MessageToByteEncoder 编码器和ByteToMessageDecoder 解码器。Netty 也提供了很多开箱即用的拆包器,推荐最广泛使用的 LengthFieldBasedFrameDecoder,它可以满足实际项目中的大部分场景。
ByteBuf 是必须要掌握的核心工具类,并且能够理解 ByteBuf 的内部构造。ByteBuf 包含三个指针:读指针 readerIndex、写指针 writeIndex、最大容量 maxCapacity,根据指针的位置又可以将 ByteBuf 内部结构可以分为四个部分:废弃字节、可读字节、可写字节和可扩容字节。如下图所示。
ChannelPipeline 和 ChannelHandler 也是我们在平时应用开发的过程中打交道最多的组件,这两个组件为用户提供了 I/O 事件的全部控制权。ChannelPipeline 是双向链表结构,包含 ChannelInboundHandler 和 ChannelOutboundHandler 两种处理器。Inbound 事件和 Outbound 事件的传播方向相反,Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head。
注册中心是 RPC 框架中一个非常重要的组件,主要用于实现服务的注册和发现。目前主流的注册中心有 ZooKeeper、Eureka、Etcd、Consul、Nacos 等,到底选择 CP 还是 AP 类型的注册中心呢?没有最好的选择,需要根据实际的业务场景进行技术选型。对于 RPC 框架而言,应当弱依赖于注册中心,即使注册中心出现问题,也不应该影响服务正常使用。所以建议使用 AP 类型的注册中心,在实现服务发现的场景下相比 CP 类型的注册中心有性能优势,整个集群是不存在 Leader、Flower 概念的,如果其中一个节点挂了,请求会立刻转移到其他节点上,通过牺牲强一致性来保证高可用性。
当服务节点下线时,注册中心需要及时通知服务消费者该节点已经下线了,否则可能会造成部分服务调用出现问题。实现服务优雅下线比较好的方式是采用主动通知 + 心跳检测的方案,心跳检测可以由节点或者注册中心负责,例如注册中心可以向服务节点每 60s 发送一次心跳包,如果 3 次心跳包都没有收到请求结果,可以认为该服务节点已经下线。心跳检测通常也是客户端和服务端之间通知对方存活状态的一种机制。
如果想做到 RPC 底层细节对服务消费者无感知,就无法绕开动态代理。动态代理提供了一种能够在运行时动态构建代理类以及动态调用目标方法的机制,我们必须创建一个接口代理对象,在代理对象中实现编码、请求调用、解码等操作。
常用的动态代理实现有 JDK 动态代理和 Cglib 动态代理,选择哪种动态代理技术需要根据场景有的放矢,需要做好性能压测。JDK 动态代理所代理的对象必须实现一个或者多个接口,生成的代理类也是接口的实现类,然后通过 JDK 动态代理是通过反射调用的方式代理类中的方法,不能代理接口中不存在的方法。Cglib 动态代理相比 JDK 动态代理更加灵活,Cglib 是通过字节码技术对指定类生成一个子类,并重写其中的方法,所以代理类的类型是不受限制的。
服务提供者在接收到 RPC 请求后,需要通过反射机制执行真实的方法调用。为了加速服务接口调用的性能,可以采用 Cglib 提供的 FastClass 机制直接调用方法,相比于反射性能更高。FastClass 机制并没有采用反射的方式调用被代理的方法,而是运行时动态生成一个新的 FastClass 子类,向子类中写入直接调用目标方法的逻辑。同时该子类会为代理类分配一个 int 类型的 index 索引,FastClass 即可通过 index 索引定位到需要调用的方法。生成 FastClass 子类是比较耗时的,可以使用缓存 FastClass 的方式进一步优化 RPC 框架的性能。
如果没有指定 workerGroup 线程组初始化的线程数,那么 Netty 会默认创建 2 倍 CPU 核数作的线程,但这并不一定是一个最佳数量,可以根据实际的压测情况进行适当调整。一般来说,只要服务性能能够满足要求,workerGroup 初始化的线程数应该越少越好,这样可以有效地减少线程上下文切换。
Netty 提供了一个参数 ioRatio,可以调整 I/O 事件处理和任务处理的时间比例,默认值为 50。对于高并发的 RPC 调用场景,ioRatio 可以适当调大,控制 Netty 有更多的时间比例在执行 I/O 任务。
Netty 提供了 ChannelOption 以便于我们优化 TCP 参数配置,为了提高网络通信的吞吐量,一些可选的网络参数我们有必要掌握。
在网络通信过程中,必然涉及序列化和反序列化操作,即将对象编码成字节,再把字节解码成对象的过程。序列化和反序列化属于高频且较笨重的操作,属于 RPC 框架中一个重要的性能优化点。在选择序列化方式时需要综合考虑各方面因素,如高性能、跨语言、可维护性、可扩展性等。
比较常用的序列化算法有 Kryo、Hessian、Protobuf 等,这些第三方序列化算法都比 Java 原生的序列化操作都更加高效。Kryo 序列化后占用字节数较少,网络传输效率更高,但是不支持跨语言。Hessian 是目前业界使用较为广泛的序列化协议,它的兼容性好,支持跨语言,API 方便使用,序列化后的字节数适中。Protobuf 是 gRPC 框架默认使用的序列化协议,属于 Google 出品的序列化框架。Protobuf 支持跨语言、跨平台,具有较好的扩展性,并且性能优于 Hessian。但是 Protobuf 使用时需要编写特定的 prpto 文件,然后进行静态编译成不同语言的程序后拷贝到项目工程中,一定程序增加了开发者的复杂度。综合各方面因素以及实际口碑,个人比较推荐使用 Hessian 和 Protobuf 序列化协议。
关于 RPC 框架序列化进一步的性能优化我们可以采用以下方法:
从内存分配的角度来看,ByteBuf 可以分为堆内存 HeapByteBuf 和堆外内存 DirectByteBuf。Netty 会使用堆外内存 DirectBuffer 进行 Socket 读写,相比使用堆内存减少了一次内存拷贝。然而堆外内存的创建和销毁成本更高,分配和回收的效率较慢,所以通常会使用内存池来提高性能。Netty 提供了池化类型的 PooledDirectByteBuf。Netty 提前申请一块连续内存作为 ByteBuf 内存池,如果有堆外内存申请的需求直接从内存池里获取即可,使用完之后必须重新放回内存池,否则会造成严重的内存泄漏。对于数据量较小的一些场景,可以考虑使用 HeapBuffer,由 JVM 负责内存的分配和回收可能效率更高。Netty 中启用内存池可以在创建客户端或者服务端的时候指定,示例代码如下:
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
对象池与内存池的都是为了提高 Netty 的并发处理能力,通常在项目开发中我们会将一些通用的对象缓存起来,当需要该对象时,优先从对象池中获取对象实例。通过重用对象,不仅避免频繁地创建和销毁所带来的性能损耗,而且对 JVM GC 是友好的。
此外,Netty 还提供了一些技巧来避免内存拷贝:
我们经常使用以下 new HandlerXXX() 的方式进行 Channel 初始化,在每建立一个新连接的时候会初始化新的 HandlerA 和 HandlerB,如果系统承载了 1w 个连接,那么就会初始化 2w 个处理器,造成非常大的内存浪费。
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new HandlerA()) .addLast(new HandlerB()); } });
Netty 提供了 @Sharable 注解用于修饰 ChannelHandler,标识该 ChannelHandler 全局只有一个实例,而且会被多个 ChannelPipeline 共享。所以我们必须要注意的是,@Sharable 修饰的 ChannelHandler 必须都是无状态的,这样才能保证线程安全。
高低水位线 WRITE_BUFFER_HIGH_WATER_MARK 和 WRITE_BUFFER_LOW_WATER_MARK 是两个非常重要的流控参数。Netty 每次添加数据时都会累加数据的字节数,然后判断缓存大小是否超过所设置的高水位线,如果超过了高水位,那么 Channel 会被设置为不可写状态。直到缓存的数据大小低于低水位线以后,Channel 才恢复成可写状态。Netty 默认的高低水位线配置是 32K ~ 64K,可以根据发送端和接收端的实际情况合理设置高低水位线,如果你没有足够的测试数据作为参考依据,建议不要随意更改高低水位线。高低水位线的设置方式如下:
// Server ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024); bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024); // Client Bootstrap bootstrap = new Bootstrap(); bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024); bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
当缓存超过了高水位,Channel 会被设置为不可写状态,调用 isWritable() 方法会返回 false。建议在 Channel 写数据之前,使用 isWritable() 方法来判断缓存水位情况,防止因为接收方处理较慢造成 OOM。推荐的使用方式如下:
if (ctx.channel().isActive() && ctx.channel().isWritable()) { ctx.writeAndFlush(message); } else { // handle message }
对不同场景下的网络应用程序进行 JVM 参数调优,可以取得很好的性能提升,以及避免 OOM 风险。因为不同业务系统的特性是不一样的,只能说一些重要的注意事项。
从 4.0.16 版本起,Netty 提供了用 C++ 编写 JNI 调用的 Socket Transport,相比 JDK NIO 具备更高的性能和更低的 GC 成本,并且支持更多的 TCP 参数。
<dependency> <groupId>io.netty</groupId> <artifactId>netty-transport-native-epoll</artifactId> <version>4.1.42.Final</version> </dependency>
使用 Netty Native 非常简单,只需要替换相应的类即可:
NIO |
Epoll |
NioEventLoopGroup |
EpollEventLoopGroup |
NioEventLoop |
EpollEventLoop |
NioServerSocketChannel |
EpollServerSocketChannel |
NioSocketChannel |
EpollSocketChannel |
如果是经常关注系统性能调优,一定挖掘过 Linux 操作系统 CPU 亲和性的黑科技招数。CPU 亲和性是指在多核 CPU 的机器上线程可以被强制运行在某个 CPU 上,而不会调度到其他 CPU,也被称为绑核。当绑定线程到某个固定的 CPU 后,不仅可以避免 CPU 切换的开销,而且可以提高 CPU Cache 命中率,对系统性能有一定提升。
在 C/C++、Golang 中实现绑核操作是非常容易的事,遗憾的是在 Java 中是比较麻烦的。目前 Java 中有一个开源 affinity 类库,GitHub 地址https://github.com/OpenHFT/Java-Thread-Affinity。如果你的项目想引入使用它,需要先引入 Maven 依赖:
<dependency> <groupId>net.openhft</groupId> <artifactId>affinity</artifactId> <version>3.0.6</version> </dependency>
affinity 类库可以和 Netty 轻松集成,比较常用的方式是创建一个 AffinityThreadFactory,然后传递给 EventLoopGroup,AffinityThreadFactory 负责创建 Worker 线程并完成绑核。代码实现如下所示:
EventLoopGroup bossGroup = new NioEventLoopGroup(1); ThreadFactory threadFactory = new AffinityThreadFactory("worker", AffinityStrategies.DIFFERENT_CORE); EventLoopGroup workerGroup = new NioEventLoopGroup(4, threadFactory); ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup);
连接空闲检测是指每隔一段时间检测连接是否有数据读写,如果服务端一直能收到客户端连接发送过来的数据,说明连接处于活跃状态,对于假死的连接是收不到对端发送的数据的。如果一段时间内没收到客户端发送的数据,并不能说明连接一定处于假死状态,有可能客户端就是长时间没有数据需要发送,但是建立的连接还是健康状态,所以服务端还需要通过心跳检测的机制判断客户端是否存活。客户端可以定时向服务端发送一次心跳包,如果有 N 次没收到心跳数据,可以判断当前客户端已经下线或处于不健康状态。由此可见,连接空闲检测和心跳检测是应对连接假死的一种有效手段,通常空闲检测时间间隔要大于 2 个周期的心跳检测时间间隔,主要是为了排除网络抖动的造成心跳包未能成功收到。
TCP 中已经有 SO_KEEPALIVE 参数,为什么我们还要在应用层加入心跳机制呢?心跳机制不仅能说明应用程序是活跃状态,更重要的是可以判断应用程序是否还在正常工作。然而 TCP KEEPALIVE 是有严重缺陷的,KEEPALIVE 设计初衷是为了清除和回收处于死亡状态的连接,实时性不高。KEEPALIVE 只能检查连接是否活跃,但是不能判断连接是否可用,例如服务端如果处于高负载假死状态,但是连接依然是处于活跃状态的。
Netty 中提供了开箱即用的 IdleStateHandler 实现连接空闲检测,如果我们想把一定时间间隔内没有读到数据的客户端连接进行关闭,可以采取如下的实现方式:
public class RpcIdleStateHandler extends IdleStateHandler { public RpcIdleStateHandler() { super(60, 0, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { ctx.channel().close(); } }
IdleStateHandler 实现心跳检测本质是向任务队列中添加定时任务,判断 channelRead() 或 write() 方法是否发生空闲超时,IdleStateHandler 的构造函数支持设置读空闲时间、写空闲时间、读写空闲时间。super(60, 0, 0, TimeUnit.SECONDS) 表示我们只关注读空闲时间,如果服务端 60s 没未读到数据,就会回调 channelIdle() 方法,此时我们进行连接关闭,避免资源浪费。
心跳检测在 Netty 中并没有现成的实现,但是与空闲检测实现的原理是差不多的,客户端可以采用 EventLoop 提供的 schedule() 方法向任务队列中添加心跳数据上报的定时任务,如下所示:
public class RpcHeartBeatHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); doHeartBeatTask(ctx); } private void doHeartBeatTask(ChannelHandlerContext ctx) { ctx.executor().schedule(() -> { if (ctx.channel().isActive()) { HeartBeatData heartBeatData = buildHeartBeatData(); ctx.writeAndFlush(heartBeatData); doHeartBeatTask(ctx); } }, 10, TimeUnit.SECONDS); } }
客户端向服务端定时发送心跳包,服务端收到后并不回复响应,因为如果同时与服务端建立的客户端连接规模较大,响应心跳数据需要消耗一定的资源。如果想要实现客户端和服务端互相感知存活状态,需要采用双向心跳机制。我们需要根据实际场景选择最合理的心跳检测方式。
Netty 在实现数据解码时,需要等待到缓冲区有足够多的字节才能开始解码。为了避免缓冲区缓存太多数据造成内存耗尽,我们可以在解码器中设置一个最大字节的阈值,然后结合 Netty 提供的 TooLongFrameException 异常通知 ChannelPipeline 中其他 ChannelHandler。示例如下:
public class MyDecoder extends ByteToMessageDecoder { private static final int MAX_FRAME_LIMIT = 1024; @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { int readable = in.readableBytes(); if (readable > MAX_FRAME_LIMIT) { in.skipBytes(readable); throw new TooLongFrameException("too long frame"); } // decode } }
检测缓冲区可读字节是否大于 MAX_FRAME_LIMIT,如果超过忽略这些可读字节,对于应用程序在特定的场景下是一种有效的保护措施。
如果 RPC 服务是基础服务,可能会有非常多的调用方,例如用户接口、订单接口等等。在我们实现的 RPC 框架中,业务线程池是共用的,所有的 RPC 请求都会有该线程池处理。如果有一天其中一个服务调用方的流量激增,导致线程池资源耗尽,那么其他服务调用方都会受到严重的影响。我们可以尝试将不同的服务调用方划分到不同等级的业务线程池中,通过分组的方式对服务调用方的流量进行隔离,从而避免其中一个调用方出现异常状态导致其他所有调用方都不可用,提高服务整体性能和可用率。
流量隔离技术是服务治理中非常重要的一个措施,在很多大规模流量的业务系统中都有所应用,例如秒杀系统,可以根据特殊的请求头识别出是否是秒杀请求,从而跟日常请求的流量隔离开来。那么对于 RPC 框架而言,如何对服务调用方进行合理的分组呢?一般来说,根据应用的重要等级作为分组依据是一个很好的衡量标准,一定要保障核心业务不受影响,例如下单、支付等接口都需要有自己独立的业务线程池,避免受到其他服务调用方的影响。
Netty 是基于 Reactor 线程模型实现的,I/O 线程数量固定且资源珍贵,ChannelPipeline 负责所有事件的传播,如果其中任何一个 ChannelHandler 处理器需要执行耗时的操作,其中那么 I/O 线程就会出现阻塞,甚至整个系统都会被拖垮。所以推荐的做法是在 ChannelHandler 处理器中自定义新的业务线程池,将耗时的操作提交到业务线程池中执行。建议根据业务逻辑的核心等级拆分出多个业务线程池,如果某类业务逻辑出现异常造成线程池资源耗尽,也不会影响到其他业务逻辑,从而提高应用程序整体可用率。以 RPC 框架为例,在服务提供者处理 RPC 请求调用时就是将 RPC 请求提交到自定义的业务线程池中执行,如下所示:
public class RpcRequestHandler extends SimpleChannelInboundHandler<MiniRpcProtocol<MiniRpcRequest>> { @Override protected void channelRead0(ChannelHandlerContext ctx, MiniRpcProtocol<MiniRpcRequest> protocol) { RpcRequestProcessor.submitRequest(() -> { // 处理 RPC 请求 }); } }
而对于 netty I/O线程,每个 EventLoop 可以与某类业务线程池绑定,避免出现多线程锁竞争。
流量整形(Traffic Shaping)是一种主动控制服务流量输出速率的措施,保证下游服务能够平稳处理。流量整形和流控的区别在于,流量整形不会丢弃和拒绝消息,无论流量洪峰有多大,它都会采用令牌桶算法控制流量以恒定的速率输出,如下图所示。
Netty 通过实现流量整形的抽象类 AbstractTrafficShapingHandler,提供了三种类型的流量整形策略:GlobalTrafficShapingHandler、ChannelTrafficShapingHandler 和 GlobalChannelTrafficShapingHandler,它们之间的关系如下:
GlobalTrafficShapingHandler = ChannelTrafficShapingHandler + GlobalChannelTrafficShapingHandler
全局流量整形 GlobalChannelTrafficShapingHandler 作用范围是所有 Channel,用户可以设置全局报文的接收速率、发送速率、整形周期。Channel 级流量整形 ChannelTrafficShapingHandler 作用范围是单个 Channel,可以对不同的 Channel 设置流量整形策略。举个简单的例子,火爆的旅游景区不仅在大门口对游客限流(相当于 GlobalChannelTrafficShapingHandler),而且在景区内部不同的小景点也对游客有限流(相当于 ChannelTrafficShapingHandler),这两个流量整形策略加起来就是 GlobalTrafficShapingHandler。
流量整形并不能保证系统处于安全状态,当流量洪峰过大,数据会一直积压在内存中,所以流量整形和流控应该结合使用才能保证系统的高可用。
为了保障服务的稳定性和容错性,重试机制是一般可以帮助我们解决不少问题,例如网络抖动、请求超时等场景都需要重试机制。
关于 RPC 框架的重试机制有以下几点最佳实践和注意事项:
集群容错是指服务消费者调用服务提供者集群时发生异常时的处理方案。以 Dubbo 框架为例,提供了六种内置的集群容错措施。
以上几种集群容错措施可以根据实际的业务场景进行配置选择,而且 Dubbo 给我们提供了 Cluster 扩展接口,我们可以自己定制集群的容错模式。
jmap -histo:live <pid>
手动触发 FullGC, 观察堆外内存是否被回收,如果正常回收很可能是因为堆外设置太小,可以通过 -XX:MaxDirectMemorySize 调整。
JDK 默认采用 Cleaner 回收释放 DirectByteBuffer,Cleaner 继承于 PhantomReference,因为依赖 GC 进行处理,所以回收的时间是不可控的。对于 hasCleaner 的 DirectByteBuffer,Java 提供了一系列不同类型的 MXBean 用于获取 JVM 进程线程、内存等监控指标,代码实现如下:
BufferPoolMXBean directBufferPoolMXBean = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class).get(0); LOGGER.info("DirectBuffer count: {}, MemoryUsed: {} K", directBufferPoolMXBean.getCount(), directBufferPoolMXBean.getMemoryUsed()/1024);
对于 Netty 中 noCleaner 的 DirectByteBuffer,直接通过 PlatformDependent.usedDirectMemory() 读取即可。
Netty 提供了自带的内存泄漏检测工具,我们可以通过以下命令启用堆外内存泄漏检测工具:
-Dio.netty.leakDetection.level=paranoid
Netty 一共提供了四种检测级别:
Netty 会检测 ByteBuf 是否已经不可达且引用计数大于 0,判定内存泄漏的位置并输出到日志中,你需要关注日志中 LEAK 关键字。
我们可以通过传统 Dump 内存的方法排查堆外内存泄漏问题,运行如下命令:
jmap -dump:format=b,file=heap.dump pid
Dump 完内存堆栈之后,将其导入 MemoryAnalyzer 工具进行分析内存泄漏的可疑点,最终定位到代码源头。
Btrace 是一款通过字节码检测 Java 程序的排障神器,它可以获取程序在运行过程中的一切信息,与 AOP 的使用方式类似。我们可以通过如下方式追踪 DirectByteBuffer 的堆外内存申请的源头:
@BTrace public class TraceDirectAlloc { @OnMethod(clazz = "java.nio.Bits", method = "reserveMemory") public static void printThreadStack() { jstack(); } }