在 Maven 的 pom.xml 中引入 Netty 依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.51.Final</version> </dependency>
创建NettyServer类
package com.NettyServer.service;

import com.NettyServer.common.ServerChannelInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.beans.factory.annotation.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;

/**
 * Spring-managed Netty TCP server.
 *
 * <p>The server is bound in {@link #start()} after the bean has been constructed and is
 * shut down in {@link #destory()} when the bean is destroyed.
 */
@Component
@Slf4j
public class NettyServer {

    /** Parent ("boss") group passed to the bootstrap; accepts incoming TCP connections. */
    private final EventLoopGroup boss = new NioEventLoopGroup();

    /** Child ("worker") group passed to the bootstrap; performs I/O on accepted channels. */
    private final EventLoopGroup work = new NioEventLoopGroup();

    /** Listening port, injected from the {@code netty.port} property. */
    @Value("${netty.port}")
    private Integer port;

    /**
     * Configures the bootstrap and binds the server socket, blocking until the bind completes.
     *
     * <p>If the bind does not succeed, both event-loop groups are shut down before the failure
     * propagates, so their threads do not outlive the failed startup.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the bind
     */
    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, work)
                // Channel implementation used for the listening socket.
                .channel(NioServerSocketChannel.class)
                // Bind to the configured port on the wildcard address.
                .localAddress(new InetSocketAddress(port))
                // Maximum queue length for pending connections (the backlog argument of listen()).
                .option(ChannelOption.SO_BACKLOG, 1024)
                // Enable TCP keep-alive probes on accepted connections; the idle period before
                // the first probe is decided by the operating system.
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // Disable Nagle's algorithm: small writes are sent immediately instead of being
                // coalesced into larger segments (lower latency, more packets).
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ServerChannelInitializer());

        boolean started = false;
        try {
            ChannelFuture future = bootstrap.bind().sync();
            started = future.isSuccess();
            if (started) {
                log.info("已启动 Netty Server");
            }
        } finally {
            if (!started) {
                // @PreDestroy is not guaranteed to run for a bean whose initialisation failed,
                // so release the event-loop threads here.
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
        }
    }

    /**
     * Shuts down both event-loop groups and waits for them to terminate.
     *
     * <p>Shutdown is requested on both groups before blocking on either, so an interrupt while
     * waiting for the boss group cannot leave the worker group running. The method name keeps
     * its original spelling so existing callers are unaffected.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully();
        work.shutdownGracefully();
        boss.terminationFuture().sync();
        work.terminationFuture().sync();
        log.info("关闭Netty");
    }
}
创建ServerChannelInitializer类
package com.NettyServer.common; import com.NettyServer.service.NettyServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) { //添加编解码 socketChannel.pipeline().addLast(new ObjectDecoder(10 * 1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); socketChannel.pipeline().addLast(new ObjectEncoder()); socketChannel.pipeline().addLast(new NettyServerHandler()); } }
创建NettyServerHandler类
package com.NettyServer.service;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Server-side inbound handler: tracks connected channels in a shared map, logs every received
 * message and echoes it back to the sender.
 */
@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** Set to the Spring-created instance in {@link #init()}. */
    private static NettyServerHandler handler;

    /** Global registry of connected channels, keyed by channel id. */
    private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();

    //@Autowired
    //private IBaseAttachmentService attachmentService;

    /** Stores the Spring-managed instance in the static {@code handler} field. */
    @PostConstruct
    public void init() {
        handler = this;
    }

    /**
     * Called when a client connection becomes active; registers the channel and logs the peer.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = remote.getAddress().getHostAddress();
        int clientPort = remote.getPort();
        ChannelId channelId = ctx.channel().id();

        // putIfAbsent makes the "already registered?" check and the insert one atomic step.
        if (CHANNEL_MAP.putIfAbsent(channelId, ctx) != null) {
            log.info("客户端【{}】是连接状态,连接通道数量: {}", channelId, CHANNEL_MAP.size());
        } else {
            log.info("客户端【{}】连接netty服务器[IP:{}--->PORT:{}]", channelId, clientIp, clientPort);
            log.info("连接通道数量: {}", CHANNEL_MAP.size());
        }
    }

    /**
     * Called when a client connection is closed; removes the channel from the registry.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        ChannelId channelId = ctx.channel().id();

        // Remove first, atomically, so the entry is dropped even if logging below fails.
        if (CHANNEL_MAP.remove(channelId) != null) {
            InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = remote.getAddress().getHostAddress();
            log.info("客户端【{}】退出netty服务器[IP:{}--->PORT:{}]", channelId, clientIp, remote.getPort());
            log.info("连接通道数量: {}", CHANNEL_MAP.size());
        }
    }

    /**
     * Called for every decoded message from a client; logs it and echoes it back.
     *
     * <p>Business logic (parsing, persisting, building a response) belongs here, before the
     * response is written.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded message
     * @throws Exception if writing the response fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // String.valueOf keeps the message a plain argument even if it happens to be a Throwable.
        log.info("【{}】 :{}", ctx.channel().id(), String.valueOf(msg));
        this.channelWrite(ctx.channel().id(), msg);
    }

    /**
     * Sends a message to the client registered under the given channel id.
     *
     * <p>Nothing is sent when the channel is not registered or when the message is
     * {@code null} or an empty string.
     *
     * @param channelId id of the target channel
     * @param msg       message to send
     * @throws Exception declared for compatibility with existing callers
     */
    public void channelWrite(ChannelId channelId, Object msg) throws Exception {
        ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
        if (ctx == null) {
            log.info("通道【{}】不存在", channelId);
            return;
        }
        // equals() instead of ==: the original reference comparison only matched the interned
        // literal, so an empty string received from the network was never detected.
        if (msg == null || "".equals(msg)) {
            log.info("服务端响应空的消息");
            return;
        }
        ctx.writeAndFlush(msg);
    }

    /**
     * Disconnects the client when an idle-state event is received; any other user event is
     * passed on to the next handler.
     *
     * @param ctx context of the channel the event was fired on
     * @param evt the user event
     * @throws Exception if forwarding the event fails
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleState state = ((IdleStateEvent) evt).state();
        String description;
        if (state == IdleState.READER_IDLE) {
            description = "READER_IDLE 读超时";
        } else if (state == IdleState.WRITER_IDLE) {
            description = "WRITER_IDLE 写超时";
        } else if (state == IdleState.ALL_IDLE) {
            description = "ALL_IDLE 总超时";
        } else {
            return;
        }

        // String.valueOf tolerates a null remote address.
        log.info("Client: {} {}", String.valueOf(ctx.channel().remoteAddress()), description);
        ctx.disconnect();
    }

    /**
     * Called when an exception reaches this handler; closes the connection and logs the cause.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        // The cause is passed as the last argument so the stack trace is logged.
        log.error("{} 发生了错误,此连接被关闭此时连通数量: {}", ctx.channel().id(), CHANNEL_MAP.size(), cause);
    }
}
下面贴出来客户端代码
创建NettyClient类
package com.NettyClient.service; import com.NettyClient.common.NettyClientInitializer; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Slf4j @Component public class NettyClient { static final String HOST = "127.0.0.1"; static final int PORT = 8899; private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf; private NettyClientInitializer nettyClientInitializer; public NettyClient() { nettyClientInitializer = new NettyClientInitializer(); group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(nettyClientInitializer); } public void connect() { try { this.cf = b.connect(HOST, PORT).sync(); } catch (InterruptedException e) { log.error("客户端连接服务端异常:" + e); } } public ChannelFuture getChannelFuture() { if (this.cf == null) { this.connect(); } if (!this.cf.channel().isActive()) { this.connect(); } return this.cf; } public void close() { try { this.cf.channel().closeFuture().sync(); this.group.shutdownGracefully(); } catch (InterruptedException e) { e.printStackTrace(); } } public void setMessage(String msg) throws InterruptedException { ChannelFuture cf = this.getChannelFuture(); cf.channel().writeAndFlush(msg); } public static void main(String[] args) { try { } catch (Exception e) { log.error("异常:" + e); } } }
创建NettyClientHandler类
package com.NettyClient.service; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j public class NettyClientHandler extends ChannelInboundHandlerAdapter { private String result; @Override public void channelActive(ChannelHandlerContext ctx) { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { //if (!ObjectUtils.isEmpty(msg)) { result = (String) msg; //} } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
创建NettyClientInitializer类
package com.NettyClient.common; import com.NettyClient.service.NettyClientHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.ReadTimeoutHandler; import org.springframework.beans.factory.annotation.Autowired; public class NettyClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))); socketChannel.pipeline().addLast(new ObjectEncoder()); socketChannel.pipeline().addLast(new NettyClientHandler()); } }
客户端可以直接在业务代码里调用 NettyClient 向服务端发送数据。下面是一个用定时任务测试发送的简单例子:
package com.NettyClient;

import com.NettyClient.service.NettyClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;

/**
 * Demo scheduler that sends a timestamped text message to the server through
 * {@link NettyClient} once per minute from two independent tasks.
 */
@Component
@Slf4j
@EnableAsync
public class TaskManager {

    /** Thread-safe formatter shared by both tasks; same pattern as before. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private NettyClient nettyClient;

    /**
     * Sends the first demo message at second 0 of every minute.
     *
     * @throws InterruptedException declared for compatibility; interruption is handled internally
     */
    @Async
    @Scheduled(cron = "0 */1 * * * ?")
    public void test() throws InterruptedException {
        sendWithTimestamp("客户端发送此数据");
    }

    /**
     * Sends the second demo message at second 0 of every minute.
     *
     * @throws InterruptedException declared for compatibility; interruption is handled internally
     */
    @Async
    @Scheduled(cron = "0 */1 * * * ?")
    public void test2() throws InterruptedException {
        sendWithTimestamp("客户端2发送此数据");
    }

    /** Appends the current local time to {@code prefix} and sends it, logging any failure. */
    private void sendWithTimestamp(String prefix) {
        try {
            nettyClient.setMessage(prefix + LocalDateTime.now().format(TIMESTAMP_FORMAT));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("异常:", e);
        } catch (Exception e) {
            // Passing the exception as the last argument keeps the stack trace in the log.
            log.error("异常:", e);
        }
    }
}