流经网络的数据总是具有相同的类型:字节,这些字节如何传输主要取决于我们所说的网络传输。用户并不关心传输的细节,只在乎字节是否被可靠地发送和接收
如果使用 Java 网络编程,你会发现,某些时候当你需要支持高并发连接,随后你尝试将阻塞传输切换为非阻塞传输,那么你会因为这两种 API 的截然不同而遇到问题。Netty 提供了一个通用的 API,这使得转换更加简单。
这里介绍仅使用 JDK API 来实现应用程序的阻塞(OIO)和非阻塞版本(NIO)
阻塞网络编程如下:
public class PlainOioServer { public void server(int port) throws IOException { // 将服务器绑定到指定端口 final ServerSocket socket = new ServerSocket(port); try { while (true) { // 接收连接 final Socket clientSocket = socket.accept(); System.out.println("Accepted connection from " + clientSocket); // 创建一个新的线程来处理连接 new Thread(() -> { OutputStream out; try { out = clientSocket.getOutputStream(); // 将消息写给已连接的客户端 out.write("Hi\r\n".getBytes(StandardCharsets.UTF_8)); out.flush(); // 关闭连接x clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } catch (IOException e) { e.printStackTrace(); } } }
这段代码可以处理中等数量的并发客户端,但随着并发连接的增多,你决定改用异步网络编程,但异步的 API 是完全不同的
非阻塞版本如下:
public class PlainNioServer { public void server(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ssocket = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); // 将服务器绑定到选定的端口 ssocket.bind(address); // 打开 Selector 来处理 Channel Selector selector = Selector.open(); // 将 ServerSocket 注册到 Selector 以接受连接 serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi\r\n".getBytes()); while (true) { try { // 等待需要处理的新事件,阻塞将一直持续到下一个传入事件 selector.select(); } catch (IOException e) { e.printStackTrace(); break; } Set<SelectionKey> readKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { // 检查事件是否是一个新的已经就绪可以被接受的连接 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); // 接受客户端,并将它注册到选择器 client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); System.out.println("Accepted connection from " + client); } // 检查套接字是否已经准备好写数据 if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); while (buffer.hasRemaining()) { // 将数据写到已连接的客户端 if (client.write(buffer) == 0) { break; } } client.close(); } } catch (IOException exception) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { cex.printStackTrace(); } } } } } }
可以看到,阻塞和非阻塞的代码是截然不同的。如果为了实现非阻塞而完全重写程序,无疑十分困难
使用 Netty 的阻塞网络处理如下:
public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer( Unpooled.copiedBuffer("Hi\n\r", StandardCharsets.UTF_8)); EventLoopGroup group = new OioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用阻塞模式 .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new SimpleChannelInboundHandler<>() { @Override protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } }
而非阻塞版本和阻塞版本几乎一模一样,只需要改动两处地方
EventLoopGroup group = new NioEventLoopGroup(); b.group(group).channel(NioServerSocketChannel.class);
传输 API 的核心是 interface Channel,它被用于所有的 IO 操作。每个 Channel 都将被分配一个 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了该 Channel 的所有配置设置,ChannelPipeline 持有所有将应用于入站和出站数据以及事件的 ChannelHandler 实例
除了访问所分配的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel 的其他方法
方法名 | 描述 |
---|---|
eventLoop | 返回分配给 Channel 的 EventLoop |
pipeline | 返回分配给 Channel 的 ChannelPipeline |
isActive | 如果 Channel 活动的,返回 true |
localAddress | 返回本地的 SocketAddress |
remoteAddress | 返回远程的 SocketAddress |
write | 将数据写到远程节点 |
flush | 将之前已写的数据冲刷到底层传输 |
writeAndFlush | 等同于调用 write() 并接着调用 flush() |
Netty 内置了一些可开箱即用的传输,但它们所支持的协议不尽相同,因此你必须选择一个和你的应用程序所使用协议相容的传输
名称 | 包 | 描述 |
---|---|---|
NIO | io.netty.channel.socket.nio | 使用 java.nio.channels 包作为基础 |
Epoll | io.netty.channel.epoll | 由 JNI 驱动的 epoll() 和非阻塞 IO,可支持只有在 Linux 上可用的多种特性,比 NIO 传输更快,且完全非阻塞 |
OIO | io.netty.channel.socket.oio | 使用 java.net 包作为基础 |
Local | io.netty.channel.local | 可以在 VM 内部通过管道进行通信的本地传输 |
Embedded | io.netty.channel.embedded | Embedded 传输,允许使用 ChannelHandler 而不需要一个真正的基于网络的传输,主要用于测试 |