<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.30.Final</version> </dependency>
链接:https://pan.baidu.com/s/1g64RpTcPwIgVFybcOFEJVA
提取码:ncz9
public class NettyClientUtil { static String[] ips = {"127.0.0.1"}; static Integer port = 8886; static Bootstrap bootstrap; // 客户端就只需要创建一个 线程组了 static EventLoopGroup loopGroup = new NioEventLoopGroup(); public static void main(String[] args) { for (String ip : ips) { start(ip, port); } } public static void start(String ip, Integer port) { // 创建 启动器 bootstrap = new Bootstrap(); try { bootstrap.group(loopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)) // .addLast(new FixedLengthFrameDecoder(22)) //数据包的长度 .addLast(new NettyClientHandler()); } }); // 连接服务 ChannelFuture future = connect(ip, port); // 对服务关闭 监听 // future.channel().closeFuture().sync(); } catch (Exception e) { loopGroup.shutdownGracefully(); System.out.println(e.getMessage()); } /*finally { }*/ } public static ChannelFuture connect(String ip, Integer port) { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); ChannelFuture channelFuture = bootstrap.connect(ip, port).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { System.out.println(Thread.currentThread().getName() + ">>>>>连接" + ip + ":" + port + "服务端超时,1秒后重试……"); Thread.sleep(1000); connect(ip, port); } } }); return channelFuture; } }
public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 连接上服务的回调方法 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); // 发送数据 System.out.println(Thread.currentThread().getName() + ">>>>>连接上了" + inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort() + "服务端...."); } /** * 读取服务端返回的信息 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(Thread.currentThread().getName() + ">>>>>" + buf.toString(CharsetUtil.UTF_8)); } /** * 异常断开回调方法 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = inetSocketAddress.getAddress().getHostAddress(); int port = inetSocketAddress.getPort(); System.out.println(Thread.currentThread().getName() + ">>>>>与服务端" + ip + ":" + port + "断开连接,3秒后尝试重试连接……"); Thread.sleep(3000); NettyClientUtil.connect(ip, port); super.channelInactive(ctx); } }
连接服务端
服务端推送消息
关闭服务端
再次打开服务端