title: netty长连接,短连接,心跳检测
date: 2018/4/23 11:12:55
tags: [netty,长连接,短连接,心跳检测]
categories:
https://www.cnblogs.com/superfj/p/9153776.html
client与server通过三次握手建立连接,client发送请求消息,server返回响应,一次连接就完成了。
这时候双方任意都可以发起close操作,不过一般都是client先发起close操作。上述可知,短连接一般只会在 client/server间传递一次请求操作。
管理起来比较简单,存在的连接都是有用的连接,不需要额外的控制手段。
通常浏览器访问服务器的时候就是短连接。
对于服务端来说,长连接会耗费服务端的资源,而且用户用浏览器访问服务端相对而言不是很频繁的
如果有几十万,上百万的连接,服务端的压力会非常大,甚至会崩溃。
所以对于并发量大,请求频率低的,建议使用短连接。
client向server发起连接,server接受client连接,双方建立连接。
Client与server完成一次读写之后,它们之间的连接并不会主动关闭,后续的读写操作会继续使用这个连接。
正常情况下,一条TCP长连接建立后,只要双不提出关闭请求&不出现异常情况,这条连接是一直存在.
操作系统不会自动去关闭它,甚至经过物理网络拓扑的改变之后仍然可以使用。
所以一条连接保持几天、几个月、几年或者更长时间都有可能,只要不出现异常情况或由用户(应用层)主动关闭。客户端和服务单可一直使用该连接进行数据通信。
长连接可以省去较多的TCP建立和关闭的操作,减少网络阻塞的影响,
当发生错误时,可以在不关闭连接的情况下进行提示,
减少CPU及内存的使用,因为不需要经常的建立及关闭连接。
一种比较特殊的长连接模式,在一段时间内,“数据中心”与设备之间数据交互频繁,但是过了这段时间,很长一段时间内,都没有任何数据需要交互的情况下,最好的办法就是:在交互频繁的那段时间,保持长连接 ,一旦过了那段时间,立马断开连接,下次需要交互时,再获取连接即可,这种方式,主要可以为“数据中心”服务器节省资源的浪费。
第三种情况的使用,需要考虑两个问题:
长连接模式下,维护连接的方案:
如果是长连接的情况下,一般我们都需要做连接的维护工作,方案主要有以下两种:
1、客户端间隔5分钟,向服务器发起一次“心跳”报文,如果服务器正常回应,那就无所谓,如果不回应,一般就直接断开“连接”,然后重新向服务器再次申请新的连接即可。
2、客户端或者服务端开启一个定时任务,间隔5分钟,判断在这5分钟内,是否有向服务器交互数据,如果有交互,那么就继续维护这个连接,如果没有交互,那么就直接断开连接即可,下次再需要交互时,再向服务器申请新的连接即可。这样做的好处,是给服务器减压
客户端如果主动自己断开,这个一般不需要做特殊处理,直接在下次连接时,申请新的连接即可。
服务器如果自己因为什么原因,断开了,那么客户端,需要定义一个定时任务,间隔10分钟,或者多少时间,去不断的尝试服务器是否恢复。
应用层协议大多都有HeartBeat机制,通常是客户端每隔一小段时间向服务器发送一个数据包,通知服务器自己仍然在线。并传输一些可能必要的数据。使用心跳包的典型协议是IM,比如QQ/MSN/飞信等协议。
TCP . SO_KEEPALIVE。系统默认是设置的2小时的心跳频率。但是它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。一般,如果只是用于保活还是可以的。
默认KeepAlive状态是不打开的。
需要将setsockopt将SOL_SOCKET.SO_KEEPALIVE设置为1才是打开KeepAlive状态,
并且可以设置三个参数:
tcp_keepalive_time ,tcp_keepalive_probes , tcp_keepalive_intvl,
分别表示:连接闲置多久开始发keepalive的ack包、发几个ack包不回复才当对方已断线、两个ack包之间的间隔。
很多网络设备,尤其是NAT路由器,由于其硬件的限制(例如内存、CPU处理能力),无法保持其上的所有连接,因此在必要的时候,会在连接池中选择一些不活跃的连接踢掉。
典型做法是LRU,把最久没有数据的连接给T掉。
通过使用TCP的KeepAlive机制(修改那个time参数),可以让连接每隔一小段时间就产生一些ack包,以降低被踢掉的风险,当然,这样的代价是额外的网络和CPU负担。
两种方式实现心跳机制:
虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:
使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量,
https://github.com/JayTange/LearnTCP
//构造函数 public IdleStateHandler( long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); } //HeartBeatServer.java(服务端) serverBootstrap.childHandler(new HeartBeatServerInitializer()); private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 监听读操作,读超时时间为5秒,超过5秒关闭channel; pipeline.addLast("ping", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0, TimeUnit.SECONDS)); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new HeartbeatServerHandler()); } }
// HeartBeatServerHandler.java extends SimpleChannelInboundHandler<T> @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state()==IdleState.READER_IDLE){ // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 if (unRecPingTimes >= MAX_UN_REC_PING_TIMES) { System.out.println("===>服务端===(读超时,关闭chanel)"); // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 ctx.close(); } else { // 失败计数器加1 unRecPingTimes++; } }else { super.userEventTriggered(ctx,evt); } } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); System.out.println("一个客户端已连接"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); System.out.println("一个客户端已断开连接"); }
// 客户端 public class HeartBeatClient { private Random random = new Random(); public Channel channel; public Bootstrap bootstrap; protected String host = "127.0.0.1"; protected int port = 9817; public static void main(String args[]) throws Exception { HeartBeatClient client = new HeartBeatClient(); client.run(); client.sendData(); } public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new SimpleClientInitializer(HeartBeatClient.this)); doConncet(); } catch (Exception e) { e.printStackTrace(); } } /** * 发送数据 * @throws Exception */ public void sendData() throws Exception { BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); while (true){ String cmd = in.readLine(); switch (cmd){ case "close" : channel.close(); break; default: channel.writeAndFlush(in.readLine()); break; } } } /** * 连接服务端 */ public void doConncet() { if (channel != null && channel.isActive()) { return; } ChannelFuture channelFuture = bootstrap.connect(host, port); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture futureListener) throws Exception { if (channelFuture.isSuccess()) { channel = futureListener.channel(); System.out.println("connect server successfully"); } else { System.out.println("Failed to connect to server, try connect after 10s"); futureListener.channel().eventLoop().schedule(new Runnable() { @Override public void run() { doConncet(); } }, 10, TimeUnit.SECONDS); } } }); } private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> { private HeartBeatClient client; public SimpleClientInitializer(HeartBeatClient client) { this.client = client; } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new IdleStateHandler(0, 5, 0)); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("handler", new HeartBeatClientHandler(client)); } } }
客户端handler
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{ private HeartBeatClient client; private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd"); private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat", CharsetUtil.UTF_8)); public HeartBeatClientHandler(HeartBeatClient client) { this.client = client; } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("收到服务端回复:"+msg); if (msg.equals("Heartbeat")) { ctx.write("has read message from server"); ctx.flush(); } ReferenceCountUtil.release(msg); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.WRITER_IDLE) { ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()); } } else { super.userEventTriggered(ctx, evt); } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); System.err.println("客户端与服务端断开连接,断开的时间为:"+format.format(new Date())); // 定时线程 断线重连 final EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(new Runnable() { @Override public void run() { client.doConncet(); } }, 10, TimeUnit.SECONDS); } }
https://www.jianshu.com/p/54f9bfcd054b
internet of things 飞速发展,对服务端的协议接入和处理能力要求极高,而且IoT设备接入它有以下特点
以下这些调优,都属于小方法,小技巧,如果系统对性能要求很高,最优的还是采用分布式集群的方式来提升整个服务端的处理能力。
要想实现海量设备的长连接接入,需要对
在 /etc/sysctl.conf 插入 fs.file-max = 1000000
设置单进程打开的最大句柄数
心跳检测周期通常不要超过60s,心跳检测超时通常为心跳检测周期的2倍
.childOption(channelOption.SO_RCVBUF, 8*1024)
java对于缓冲区Buffer的分配和回收是一个耗时的工作(特别是堆外直接内存)。
为了尽量重用缓冲区,netty提供了基于内存池的缓冲区重用机制。
内存池实现上可以分为两类:堆外直接内存和堆内存。
DirectBypeBuf的创建成本高,因为需要配合内存池使用
netty默认的io读写操作采用的都是内存池的堆外直接内存模式,如果用户需要额外使用ByteBuf,建议也采用内存池方式 ; 如果不涉及网络io操作(单纯的内存操作)可以使用堆内存池
// void initchannel(SocketChannel channel) channel.config().setAllocator(UnpooledByteBufAllocator.DEAFAULT);
吞吐量,延迟,内存占用
Minor GC (每次新生代回收尽可能多的内存,以减小FULL GC)
GC内存最大化原则:GC使用的内存越大,效率越高
大多数IoT应用,吞吐量优先
-XX printGC
visual GC工具