主要实现:
原有的main逻辑如下:
重构后如下:
/** * @ClassName Server * @Description TODO * @Author wushaopei * @Date 2022/2/27 13:00 * @Version 1.0 */ public class Server { public static void main(String[] args) throws IOException { TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER); boolean isSucceed = tcpServer.start(); if(!isSucceed){ System.out.println("Start TCP server failed."); } UDPProvider.start(TCPConstants.PORT_SERVER); // 键盘输入: BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); String str; do { str = bufferedReader.readLine(); tcpServer.broadcast(str); } while (!"00bye00".equalsIgnoreCase(str)); UDPProvider.stop(); tcpServer.stop(); } }
重构后,从while循环不断读取键盘输入信息,当输入“00bye00” 时退出读取。此处只读取键盘输入数据,客户端发送的数据在会重新拆分出来新的线程单独处理。
创建 ClientHandler.java 重构收发消息操作:
/** * @ClassName ClientHandler * @Description TODO * @Author wushaopei * @Date 2022/2/27 16:44 * @Version 1.0 */ public class ClientHandler { private final Socket socket; private final ClientReadHandler readHandler; private final ClientWriteHandler writeHandler; private final CloseNotiry closeNotiry; public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException { this.socket = socket; this.readHandler = new ClientReadHandler(socket.getInputStream()); this.writeHandler = new ClientWriteHandler(socket.getOutputStream()); this.closeNotiry = closeNotiry; System.out.println("新客户链接: " + socket.getInetAddress() + "\tP:" + socket.getPort()); } }
重构接收消息的操作:
/** * 接收数据 */ class ClientReadHandler extends Thread { private boolean done = false; private final InputStream inputStream; ClientReadHandler(InputStream inputStream){ this.inputStream = inputStream; } @Override public void run(){ super.run(); try { // 得到输入流,用于接收数据 BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream)); do { // 客户端拿到一条数据 String str = socketInput.readLine(); if(str == null){ System.out.println("客户端已无法读取数据!"); // 退出当前客户端 ClientHandler.this.exitBySelf(); break; } // 打印到屏幕 System.out.println(str); }while (!done); socketInput.close(); }catch (IOException e){ if(!done){ System.out.println("连接异常断开"); ClientHandler.this.exitBySelf(); } }finally { // 连接关闭 CloseUtils.close(inputStream); } } void exit(){ done = true; CloseUtils.close(inputStream); } }
创建一个单独的线程进行接收消息,该线程不需要关闭。
重构发送消息:
/** * 发送数据 */ class ClientWriteHandler { private boolean done = false; private final PrintStream printStream; private final ExecutorService executorService; ClientWriteHandler(OutputStream outputStream) { this.printStream = new PrintStream(outputStream); // 发送消息使用线程池来实现 this.executorService = Executors.newSingleThreadExecutor(); } void exit(){ done = true; CloseUtils.close(printStream); executorService.shutdown(); } void send(String str) { executorService.execute(new WriteRunnable(str)); } class WriteRunnable implements Runnable{ private final String msg; WriteRunnable(String msg){ this.msg = msg; } @Override public void run(){ if(ClientWriteHandler.this.done){ return; } try { ClientWriteHandler.this.printStream.println(msg); }catch (Exception e){ e.printStackTrace(); } } } }
TCPServer调用发送消息的逻辑:
public void broadcast(String str) { for (ClientHandler client : clientHandlerList){ // 发送消息 client.send(str); } }
private List<ClientHandler> clientHandlerList = new ArrayList<>(); /** * 监听客户端链接 */ private class ClientListener extends Thread { private ServerSocket server; private boolean done = false; private ClientListener(int port) throws IOException { server = new ServerSocket(port); System.out.println("服务器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort()); } @Override public void run(){ super.run(); System.out.println("服务器准备就绪~"); // 等待客户端连接 do{ // 得到客户端 Socket client; try { client = server.accept(); }catch (Exception e){ continue; } try { // 客户端构建异步线程 ClientHandler clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler)); // 启动线程 clientHandler.readToPrint(); clientHandlerList.add(clientHandler); } catch (IOException e) { e.printStackTrace(); System.out.println("客户端连接异常: " + e.getMessage()); } }while (!done); System.out.println("服务器已关闭!"); } void exit(){ done = true; try { server.close(); }catch (IOException e){ e.printStackTrace(); } } }
clientHandlerList作为类变量,用于管理当前用户的信息。接收与发送都使用该变量。
1.4 Socket、流的退出与关闭:
/** * 退出、关闭流 */ public void exit(){ readHandler.exit(); writeHandler.exit(); CloseUtils.close(socket); System.out.println("客户端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort()); } /** * 发送消息 * @param str */ public void send(String str){ writeHandler.send(str); } /** * 接收消息 */ public void readToPrint() { readHandler.exit(); } /** * 接收、发送消息异常,自动关闭 */ private void exitBySelf() { exit(); closeNotiry.onSelfClosed(this); } /** * 关闭流 */ public interface CloseNotiry{ void onSelfClosed(ClientHandler handler); }
public static void main(String[] args) { // 定义10秒的搜索时间,如果超过10秒未搜索到,就认为服务器端没有开机 ServerInfo info = UDPSearcher.searchServer(10000); System.out.println("Server:" + info); if( info != null){ try { TCPClient.linkWith(info); }catch (IOException e){ e.printStackTrace(); } } }
static class ReadHandler extends Thread{ private boolean done = false; private final InputStream inputStream; ReadHandler(InputStream inputStream){ this.inputStream = inputStream; } @Override public void run(){ try { // 得到输入流,用于接收数据 BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream)); do { // 客户端拿到一条数据 String str = null; try { str = socketInput.readLine(); }catch (SocketTimeoutException e){ } if(str == null){ System.out.println("连接已关闭,无法读取数据!"); break; } // 打印到屏幕 System.out.println(str); }while (!done); socketInput.close(); }catch (IOException e){ if(!done){ System.out.println("连接异常断开:" + e.getMessage()); } }finally { // 连接关闭 CloseUtils.close(inputStream); } } void exit(){ done = true; CloseUtils.close(inputStream); } }
创建ReadHandler用单独的线程去接收服务端的消息。连接关闭则exit() 关闭客户端。
private static void write(Socket client) throws IOException { // 构建键盘输入流 InputStream in = System.in; BufferedReader input = new BufferedReader(new InputStreamReader(in)); // 得到Socket输出流,并转换为打印流 OutputStream outputStream = client.getOutputStream(); PrintStream socketPrintStream = new PrintStream(outputStream); boolean flag = true; do { // 键盘读取一行 String str = input.readLine(); // 发送到服务器 socketPrintStream.println(str); // 从服务器读取一行 if("00bye00".equalsIgnoreCase(str)){ break; } }while(flag); // 资源释放 socketPrintStream.close(); }
在linkWith() 中调用write() 发送方法,由 do-while 循环读取本地键盘输入信息进行发送操作。当满足 “00bye00” 时,关闭循环,关闭socket连接,结束该线程。
public static void linkWith(ServerInfo info) throws IOException { Socket socket = new Socket(); // 超时时间 socket.setSoTimeout(3000); // 端口2000;超时时间300ms socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));// System.out.println("已发起服务器连接,并进入后续流程~"); System.out.println("客户端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort()); System.out.println("服务器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort()); try { ReadHandler readHandler = new ReadHandler(socket.getInputStream()); readHandler.start(); // 发送接收数据 write(socket); }catch (Exception e){ System.out.println("异常关闭"); } // 释放资源 socket.close(); System.out.println("客户端已退出~"); }
原有的逻辑里,是调用 todo() 方法,在todo() 方法里同时进行收发操作。现在是进行读写分离。
服务端重构后日志:
J:\folder\JDK1.8\bin\java -ja…… - Channel\out\production\classes" server.Server 服务器信息: 0.0.0.0/0.0.0.0 P:30401 服务器准备就绪~ UDDProvider Started. ServerProvider receive from ip:169.254.178.74 port:169.254.178.74 port:49878 dataValid:true ServerProvider response to:169.254.178.74 port:30202 dataLen: 50 新客户链接: /169.254.178.74 P:51094 ping pong 00bye0000bye00 客户端已无法读取数据! 客户端已退出:/169.254.178.74 P:51094
客户端重构后执行日志:
J:\folder\JDK1.8\bin\java -javaagent:J:\……- Channel\out\production\classes" client.Client UDPSearcher Started. UDPSearcher start listen. UDPSearcher sendBroadcast started. UDPSearcher sendBroadcast finished. UDPSearch receive form ip:169.254.178.74 port:30201 dataValid:true UDPSearcher Finished. UDPSearcher listener finished. Server:ServerInfo{sn='4cd6143b-205f-4e80-9b22-a6eae8e85cdd', port=30401, address='169.254.178.74'} 已发起服务器连接,并进入后续流程~ 客户端信息: /169.254.178.74 P:51094 服务器信息:/169.254.178.74 P:30401 ping pong 00bye00 客户端已退出~ Process finished with exit code 0