client 发送消息到服务器端,服务器端回复消息也就是回送消息。
如上图,数据在传输的时候,需要在尾部追加换行符,也就是说原来5个字节的数据,在实际传输时,是有6个字节长度的。
在客户端有多个情况下,客户端都会向服务器端进行发送消息;想要在PC发送消息给服务器端时,也让安卓、平板等终端都能收到,其操作应该是,当PC端发送一条消息到服务器端之后,服务器端得到该数据后,它会把这条数据发送(回送)给当前连接的客户端。而这些当前连接的客户端收到这条消息后,就实现了把PC消息发送到手机的过程。
代码分为四个module,分别为lib-clink、sample-client、sample-foo、sample-server。
(1)lib-clink
该module为提供工具类进行校验与流处理。
(2)sample-client ,客户端代码,需要依赖 lib-clink、sample-foo两个module
(3)sample-foo ,基础的共用类代码
(4)sample-server,服务端代码,需要依赖 lib-clink、sampl-foo两个module
(5)lib-clink、sample-foo的工具类、基础数据类参考前面 TCP点对点传输的代码逻辑
初版代码和TCP点对点传输的基本一致,聊天室主要在TCPServer端进行转发,所以Client不需要代码重构。
初版代码和TCP点对点传输的基本一致,要实现聊天室消息接收则需要进行重构。主要重构 TCPServer.java 、ClientHandler.java类。
(1)ClientHandler.java - 消息转发
原有的消息在收到后就只是打印到控制台
// 打印到屏幕 System.out.println(str);
而实现聊天室功能需要将收到的消息进行通知出去。这里可以通过 CloseNotify() 接口进行实现。这里对该接口进行改造,并新增转发的接口方法来将消息通知回去。
/** * 消息回调 */ public interface ClientHandlerCallback { // 自身不安比通知 void onSelfClosed(ClientHandler handler); // 收到消息时通知 void onNewMessageArrived(ClientHandler handler,String msg); }
在将消息打印到屏幕的同时,将消息通知出去:
// 打印到屏幕 System.out.println(str); clientHandlerCallback.onNewMessageArrived(ClientHandler.this,str);
调用onNewMessageArrived()方法从而进行转发。这里主要是把当前收到的消息传递回去,同时也要把自身传递回去。
(2)自身描述信息的构建
新增clientInfo类变量:
private final String clientInfo;
自身描述信息初始化:
public ClientHandler(Socket socket, ClientHandlerCallback clientHandlerCallback) throws IOException { this.socket = socket; this.readHandler = new ClientReadHandler(socket.getInputStream()); this.writeHandler = new ClientWriteHandler(socket.getOutputStream()); this.clientHandlerCallback = clientHandlerCallback; // 新增自身描述信息 this.clientInfo = "A[" + socket.getInetAddress().getHostAddress() + "] P[" + socket.getPort() + "]"; System.out.println("新客户端连接:" + clientInfo); } public String getClientInfo() { return clientInfo; }
(3)重构TCPServer.java
重构 clientHandler.ClientHandlerCallback的两个回调方法,这里要将之提到TCPServer.java类上。
让TCPServer.java 实现 clientHandler.ClientHandlerCallback接口。并实现两个方法:
@Override public synchronized void onSelfClosed(ClientHandler handler) { } @Override public void onNewMessageArrived(ClientHandler handler, String msg) { }
并将 客户端构建溢出线程的remove操作迁移到 onSelfClosed() 方法实现内:
@Override public synchronized void onSelfClosed(ClientHandler handler) { clientHandlerList.remove(handler); }
原有的ClientHandler异步线程处理逻辑如下:
// 客户端构建异步线程 ClientHandler clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler));
重构后,如下:
// 客户端构建异步线程 ClientHandler clientHandler = new ClientHandler(client,TCPServer.this);
消息转发:
/** * 转发消息给其他客户端 * @param handler * @param msg */ @Override public void onNewMessageArrived(ClientHandler handler, String msg) { // 打印到屏幕 System.out.println("Received-" + handler.getClientInfo() + ":" + msg); // 转发 forwardingThreadPoolExecutor.execute(()->{ for (ClientHandler clientHandler : clientHandlerList){ if(clientHandler.equals(handler)){ // 跳过自己 continue; } // 向其他客户端投递消息 clientHandler.send(msg); } }); }
(4)基于synchronized 解决多线程操作的安全问题:
由于这里有对 clientHandlerList集合的删除、添加、遍历等操作,这涉及到对所有客户端的操作,在多线程的环境下,默认的List不是线程安全的,所以存在多线程的安全问题。
public void stop() { if (mListener != null) { mListener.exit(); } synchronized (TCPServer.this){ for (ClientHandler clientHandler : clientHandlerList) { clientHandler.exit(); } clientHandlerList.clear(); } // 停止线程池 forwardingThreadPoolExecutor.shutdownNow(); } public synchronized void broadcast(String str) { for (ClientHandler clientHandler : clientHandlerList) { clientHandler.send(str); } } /** * 删除当前消息 * @param handler */ @Override public synchronized void onSelfClosed(ClientHandler handler) { clientHandlerList.remove(handler); } /** * 转发消息给其他客户端 * @param handler * @param msg */ @Override public void onNewMessageArrived(ClientHandler handler, String msg) { // 打印到屏幕 System.out.println("Received-" + handler.getClientInfo() + ":" + msg); // 转发 }
这里加类锁来保证删除操作的线程安全。
关于添加操作的线程安全问题解决如下:
try { // 客户端构建异步线程 ClientHandler clientHandler = new ClientHandler(client,TCPServer.this); // 读取数据并打印 clientHandler.readToPrint(); // 添加同步处理 synchronized (TCPServer.this) { clientHandlerList.add(clientHandler); } } catch (IOException e) { e.printStackTrace(); System.out.println("客户端连接异常:" + e.getMessage()); }
(5)异步转发
// 转发 clientHandlerCallback.onNewMessageArrived(ClientHandler.this,str);
在ClientHandler.java中,上述代码所在的线程是主要线程,会一直有消息进来,所以不能做同步处理,那样会导致当前线程阻塞,从而导致后面进来的消息无法及时处理。
所以当 onNewMessageArrived()将消息抛出去之后,TCPServer.java的实现要采取异步转发的方式退给其他客户端。创建一个新的单例线程池来做转发的操作:
新增转发线程池:
// 转发线程池 private final ExecutorService forwardingThreadPoolExecutor; public TCPServer(int port) { this.port = port; this.forwardingThreadPoolExecutor = Executors.newSingleThreadExecutor(); }
转发投递消息给其他客户端:
/** * 转发消息给其他客户端 * @param handler * @param msg */ @Override public void onNewMessageArrived(ClientHandler handler, String msg) { // 打印到屏幕 System.out.println("Received-" + handler.getClientInfo() + ":" + msg); // 转发 forwardingThreadPoolExecutor.execute(()->{ synchronized (TCPServer.this){ for (ClientHandler clientHandler : clientHandlerList){ if(clientHandler.equals(handler)){ // 跳过自己 continue; } // 向其他客户端投递消息 clientHandler.send(msg); } } }); }
防止客户端下线后,依旧重复发送的问题:
ClientHandler.java - ClientWriteHandler
/** * 发送到客户端 * @param str */ void send(String str) { // 如果已经发送完成,就返回 if(done){ return; } executorService.execute(new WriteRunnable(str)); }
启动一个Server和三个Client:
如上图,Server启动后,每启动一个Client都会将Client信息加入到Server的list列表,每次由Server进行转发或发送消息给各个终端。
如上图,是Server和3个Client之间的模拟聊天。此时在Server中,当user1发送消息时,Server将消息本地读取之后,再转发给其他的两个非当前client对象。如果是由Server回复的,则当前client为空,会给所有的Client都发送消息。
综上,根据前面的思路、逻辑,实现了一个简单的聊天室消息转发的功能。