初始化一个 ServerSocketChannel，绑定端口，然后使用 Selector 监听 accept 事件。
当 accept 事件发生时，表示有客户端连接进来，此时获取该客户端的 SocketChannel 并注册其 read 事件，用来接收客户端发送的消息。
package chatroom;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Chat-room server.
 *
 * <p>Opens a non-blocking {@link ServerSocketChannel}, multiplexes it and every accepted client
 * through a single {@link Selector}, and relays each chunk of bytes received from one client to
 * all other connected clients. All work happens on the thread that calls {@link #start()}, so the
 * client list needs no synchronization.
 *
 * <p>There is no message framing: data is relayed in chunks of at most {@value #BUFFER_SIZE}
 * bytes, exactly as read from the socket.
 *
 * @author wenei
 * @date 2021-07-20 20:36
 */
public class Server {

    private static final Logger log = Logger.getLogger(Server.class.getName());

    /** Size in bytes of the per-read receive buffer. */
    private static final int BUFFER_SIZE = 60;

    /** TCP port the server listens on. */
    private final int port;

    /** Channels of all currently connected clients; touched only by the selector thread. */
    private final List<SocketChannel> clientChannelList = new ArrayList<>();

    /**
     * Creates a server that will listen on the given port once {@link #start()} is called.
     *
     * @param port TCP port to bind to
     */
    public Server(int port) {
        this.port = port;
    }

    /**
     * Binds the listening socket and runs the select loop. This method does not return normally;
     * it only exits by throwing.
     *
     * @throws IOException if the server socket or selector cannot be opened or bound, or if
     *     selecting or accepting a connection fails
     */
    public void start() throws IOException {
        // try-with-resources so the listening socket and selector are released if the loop dies.
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            serverSocketChannel.bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                if (selector.select() <= 0) {
                    continue;
                }
                final Set<SelectionKey> selectionKeys = selector.selectedKeys();
                final Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    final SelectionKey key = iterator.next();
                    iterator.remove();
                    // A key may have been cancelled earlier in this pass (its channel was closed
                    // after a failed broadcast); querying readiness on it would throw.
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        acceptClient(selector, key);
                    } else if (key.isReadable()) {
                        handleRead(key);
                    }
                }
            }
        } finally {
            // Iterate over a copy because disconnect() mutates the list.
            for (SocketChannel channel : new ArrayList<>(clientChannelList)) {
                disconnect(channel);
            }
        }
    }

    /** Accepts a pending connection and registers it with the selector for read events. */
    private void acceptClient(Selector selector, SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel accept = serverChannel.accept();
        if (accept == null) {
            // Non-blocking accept returns null when no connection is actually pending.
            return;
        }
        accept.configureBlocking(false);
        accept.register(selector, SelectionKey.OP_READ);
        clientChannelList.add(accept);
        log.log(Level.INFO, "新连接 " + accept);
    }

    /** Reads one chunk from a readable client and relays it, or drops the client on EOF/error. */
    private void handleRead(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        log.log(Level.INFO, "可读连接 " + socketChannel);
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        final int read;
        try {
            read = socketChannel.read(buffer);
        } catch (IOException e) {
            // The peer went away abnormally; read() fails instead of returning -1.
            log.log(Level.INFO, "连接被动关闭:" + socketChannel);
            disconnect(socketChannel);
            return;
        }
        if (read == -1) {
            // Orderly shutdown by the peer is also signalled as a readable event.
            log.log(Level.INFO, "连接主动关闭:" + socketChannel);
            disconnect(socketChannel);
            return;
        }
        if (read == 0) {
            return;
        }
        buffer.flip();
        // Decode straight from the backing array: this cannot overflow when the buffer is
        // completely full and leaves the buffer's position untouched for the broadcast below.
        String text = new String(buffer.array(), 0, buffer.limit(), StandardCharsets.UTF_8).trim();
        log.log(Level.INFO, "接受数据: " + text);
        broadcast(socketChannel, buffer);
    }

    /** Sends the readable content of {@code data} to every client except {@code sender}. */
    private void broadcast(SocketChannel sender, ByteBuffer data) {
        List<SocketChannel> failed = new ArrayList<>();
        for (SocketChannel channel : clientChannelList) {
            if (channel == sender) {
                continue;
            }
            // Each recipient gets its own position/limit so one client's partial write cannot
            // corrupt what the next client receives.
            ByteBuffer out = data.duplicate();
            try {
                while (out.hasRemaining()) {
                    if (channel.write(out) == 0) {
                        // Socket send buffer is full; give up rather than spin the selector thread.
                        log.log(Level.WARNING, "发送缓冲区已满,消息被截断:" + channel);
                        break;
                    }
                }
            } catch (IOException e) {
                log.log(Level.WARNING, "广播失败,关闭连接:" + channel, e);
                failed.add(channel);
            }
        }
        // Removed after the loop to avoid modifying the list while iterating over it.
        for (SocketChannel channel : failed) {
            disconnect(channel);
        }
    }

    /** Forgets the client and closes its channel; a failing close is logged, not propagated. */
    private void disconnect(SocketChannel channel) {
        clientChannelList.remove(channel);
        try {
            channel.close();
        } catch (IOException e) {
            log.log(Level.WARNING, "关闭连接失败:" + channel, e);
        }
    }

    /**
     * Starts the chat server on port 10022.
     *
     * @param args ignored
     * @throws IOException if the server fails to start or its select loop fails
     */
    public static void main(String[] args) throws IOException {
        new Server(10022).start();
    }
}
使用主线程获取键盘输入，然后发送给服务端。
使用子线程接收服务端发送的信息并显示。
package chatroom;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * Chat-room client.
 *
 * <p>The calling thread reads lines from standard input and sends them to the server; a second
 * thread receives whatever the server relays and prints it.
 *
 * @author wenei
 * @date 2021-07-21 9:14
 */
public class Client {

    /** Size in bytes of the receive buffer. */
    private static final int BUFFER_SIZE = 60;

    /**
     * Receive loop of the client: prints every chunk of data that arrives on the socket until the
     * thread is interrupted or the server closes the connection.
     */
    static class ClientReceiveThread implements Runnable {

        /** Client socket; must already be in non-blocking mode so it can be registered. */
        private final SocketChannel socketChannel;

        public ClientReceiveThread(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @Override
        public void run() {
            try (Selector selector = Selector.open()) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                ByteBuffer recvBuffer = ByteBuffer.allocate(BUFFER_SIZE);
                while (true) {
                    // Bounded wait so an interrupt request is noticed even without traffic.
                    final int selectCount = selector.select(100);
                    if (Thread.currentThread().isInterrupted()) {
                        System.out.println("连接关闭");
                        return;
                    }
                    if (selectCount <= 0) {
                        continue;
                    }
                    final Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    final Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        final SelectionKey key = iterator.next();
                        iterator.remove();
                        if (!key.isReadable()) {
                            continue;
                        }
                        recvBuffer.clear();
                        final int read = socketChannel.read(recvBuffer);
                        if (read == -1) {
                            // Server closed the connection. Without this check the channel stays
                            // readable forever and the loop would spin printing empty messages.
                            System.out.println("连接关闭");
                            return;
                        }
                        if (read == 0) {
                            continue;
                        }
                        recvBuffer.flip();
                        // Decode directly from the backing array; safe even when the buffer is full.
                        String text = new String(
                                recvBuffer.array(), 0, recvBuffer.limit(), StandardCharsets.UTF_8).trim();
                        System.out.println("接受数据: " + text);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Always release the socket so the sending side can detect the closed connection.
                try {
                    socketChannel.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done if closing fails during shutdown.
                }
            }
        }
    }

    /** Port of the server to connect to. */
    private final int port;

    /**
     * Creates a client that connects to the given port once {@link #start()} is called.
     *
     * @param port TCP port of the chat server
     */
    public Client(int port) {
        this.port = port;
    }

    /**
     * Connects to the server, starts the receive thread and forwards each line typed on standard
     * input until the user enters {@code exit}, standard input ends, or the connection is closed.
     *
     * @throws IOException if connecting or sending fails
     */
    public void start() throws IOException {
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(port));
        // Must be non-blocking: the receive thread registers this channel with a selector.
        socketChannel.configureBlocking(false);
        Thread thread = new Thread(new ClientReceiveThread(socketChannel));
        thread.start();
        try {
            Scanner scanner = new Scanner(System.in);
            // hasNextLine() ends the loop cleanly at end of input instead of throwing.
            while (scanner.hasNextLine()) {
                String data = scanner.nextLine();
                if (data.equals("exit")) {
                    break;
                }
                if (!socketChannel.isOpen()) {
                    // The receive thread closed the socket (server went away).
                    System.out.println("连接已断开,客户端退出");
                    break;
                }
                System.out.println("输入数据:" + data);
                // Wrap the encoded bytes so input of any length fits.
                ByteBuffer buffer = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
                while (buffer.hasRemaining()) {
                    // A non-blocking write may accept only part of the data; keep going until done.
                    if (socketChannel.write(buffer) == 0) {
                        Thread.yield();
                    }
                }
            }
        } finally {
            // Ask the receive thread to stop; it closes the socket on its way out.
            thread.interrupt();
        }
    }

    /**
     * Starts a chat client connected to port 10022.
     *
     * @param args ignored
     * @throws IOException if connecting or sending fails
     */
    public static void main(String[] args) throws IOException {
        new Client(10022).start();
    }
}