package chatroom.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; /****** @author 阿昌 @create 2021-09-13 20:23 聊天室服务端 ******* */ public class ChatServer { //服务端启动的方法 public void startServer() throws IOException { //1、创建Selector选择器 Selector selector = Selector.open(); //2、创建ServerSocketChannel通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //3、为channel通道绑定端口号 serverSocketChannel.bind(new InetSocketAddress(9090)); serverSocketChannel.configureBlocking(false);//设置非阻塞模式 //4、把serverSocketChannel绑定到selector上 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务器启动......."); //5、循环监听是否有连接连入 while (true) { int select = selector.select(); //如果为0,则为没连接,没有获取到,就跳出循环 if (select == 0) { continue; } //获取可用channel Set<SelectionKey> selectionKeys = selector.selectedKeys(); //遍历 Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); //移除 set 集合当前 selectionKey iterator.remove(); //6、根据就绪状态,调用对应方法实现具体业务操作 if (selectionKey.isAcceptable()) { //6.1 如果 accept 状态 acceptOperator(serverSocketChannel, selector); } if (selectionKey.isReadable()) { //6.2 如果可读状态 readOperator(selector, selectionKey); } } } } //处理可读状态操作 private void readOperator(Selector selector, SelectionKey selectionKey) throws IOException { //1 从 SelectionKey 获取到已经就绪的通道 SocketChannel channel = (SocketChannel) selectionKey.channel(); //2 创建 buffer ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //3 循环读取客户端消息 int readLength = channel.read(byteBuffer); String message = "";//用于接收解码后的信息 //表示里面有数据 if (readLength > 0) { //切换读模式 byteBuffer.flip(); //读取内容 message += Charset.forName("UTF-8").decode(byteBuffer); } //4 将 channel 再次注册到选择器上,监听可读状态 channel.register(selector, SelectionKey.OP_READ); //5 把客户端发送消息,广播到其他客户端 if (message.length() > 0) { //广播给其他客户端 System.out.println(message); castOtherClient(message, selector, channel); } } //广播到其他客户端 private void castOtherClient(String message, Selector selector, SocketChannel channel) throws IOException { //1 获取所有已经接入 channel Set<SelectionKey> selectionKeySet = selector.keys(); //2 循环想所有 channel 广播消息 for (SelectionKey selectionKey : selectionKeySet) { //获取每个 channel Channel tarChannel = selectionKey.channel(); //不需要给自己发送 if (tarChannel instanceof SocketChannel && tarChannel != channel) {//不向自己广播 ((SocketChannel) tarChannel).write(Charset.forName("UTF-8").encode(message)); } } } //处理接入状态操作 private void acceptOperator(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException { //1 接入状态,创建 socketChannel SocketChannel accept = serverSocketChannel.accept(); //2 把 socketChannel 设置非阻塞模式 accept.configureBlocking(false); //3 把 channel 注册到 selector 选择器上,监听可读状态 accept.register(selector, SelectionKey.OP_READ); //4 客户端回复信息 accept.write(Charset.forName("UTF-8").encode("欢迎进入聊天室,请注意隐私安全")); } public static void main(String[] args) throws IOException { new ChatServer().startServer(); } }
package chatroom.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Scanner; /****** @author 阿昌 @create 2021-09-13 20:45 聊天室客户端 ******* */ public class ChatClient { //启动方法 public void startClient(String name) throws IOException { //连接服务端 SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",9090)); //接收服务端响应数据 Selector selector = Selector.open(); socketChannel.configureBlocking(false);//设置非阻塞连接 socketChannel.register(selector, SelectionKey.OP_READ);//将通道注册到selector上 //创建线程,来接收服务端的响应信息 new Thread(new ClientThread(selector)).start(); //向服务端发送信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String msg = scanner.nextLine(); if (msg.length()>0){ //写入通道消息,让他发送给服务端 socketChannel.write(Charset.forName("UTF-8").encode(name+": "+msg)); } } } }
package chatroom.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; /****** @author 阿昌 @create 2021-09-13 20:57 客户端异步监听服务端响应Runnable类 ******* */ public class ClientThread implements Runnable { private Selector selector; public ClientThread(Selector selector) { this.selector = selector; } @Override public void run() { try { while (true) { //获取 channel 数量 int readChannels = selector.select(); if (readChannels == 0) { continue; } //获取可用的 channel Set<SelectionKey> selectionKeys = selector.selectedKeys(); //遍历集合 Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); //移除 set 集合当前 selectionKey iterator.remove(); //如果可读状态 if (selectionKey.isReadable()) { //处理可读状态操作 readOperator(selector, selectionKey); } } } } catch (IOException e) { e.printStackTrace(); } } //处理可读状态操作 private void readOperator(Selector selector, SelectionKey selectionKey) throws IOException { //1 从 SelectionKey 获取到已经就绪的通道 SocketChannel socketChannel =(SocketChannel) selectionKey.channel(); //2 创建 buffer ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //3 循环读取客户端消息 int readLength = socketChannel.read(byteBuffer); String message = "";//用于存储解码后的消息 if (readLength > 0) { //切换读模式 byteBuffer.flip(); //读取内容 message += Charset.forName("UTF-8").decode(byteBuffer); } //4 将 channel 再次注册到选择器上,监听可读状态 socketChannel.register(selector, SelectionKey.OP_READ); //5 把客户端发送消息,广播到其他客户端 if (message.length() > 0) { //广播给其他客户端 System.out.println(message); } } }
public class AClient { public static void main(String[] args) { try { new ChatClient().startClient("阿昌一号"); } catch (IOException e) { e.printStackTrace(); } } }
public class BClient { public static void main(String[] args) { try { new ChatClient().startClient("阿昌二号"); } catch (IOException e) { e.printStackTrace(); } } }
服务端启动后,AClient和BClient都发送一条消息后的效果