关于 Websocket 传输的消息类型, 允许的参数包括以下三类
因此对于不同的消息类型, 可以有不同参数类型的 onMessage() 方法, 分别用于处理不同格式的内容, 对于传输文件, 需要使用 ByteBuffer 类型的参数
void onMessage(ByteBuffer byteBuffer, Session session)
在处理过程中和普通的文件传输是一样的, 需要将文件分片传输, 并约定合适的消息头用于判断文件传输的阶段, 在服务端根据不同的阶段进行文件创建, 写入和结束.
与前一篇项目结构相同, 只需要修改 SocketServer 和 SocketClient
完整示例代码: https://github.com/MiltonLai/websocket-demos/tree/main/ws-demo02
增加了 onMessage(ByteBuffer byteBuffer, Session session)
方法用于处理二进制消息, 在方法中
@Component @ServerEndpoint("/websocket/server/{sessionId}") public class SocketServer { //... @OnMessage public void onMessage(ByteBuffer byteBuffer, Session session) throws IOException { if (byteBuffer.limit() == 0) { return; } byte mark = byteBuffer.get(0); if (mark == 1) { log.info("mark 1"); byteBuffer.get(); String info = new String( byteBuffer.array(), byteBuffer.position(), byteBuffer.limit() - byteBuffer.position()); FileInfo fileInfo = new JsonMapper().readValue(info, FileInfo.class); byteChannel = Files.newByteChannel( Path.of("D:/data/" + fileInfo.getFileName()), new StandardOpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE}); //ack ByteBuffer buffer = ByteBuffer.allocate(4096); buffer.put((byte) 2); buffer.put("receive fileinfo".getBytes(StandardCharsets.UTF_8)); buffer.flip(); session.getBasicRemote().sendBinary(buffer); } else if (mark == 3) { log.info("mark 3"); byteBuffer.get(); byteChannel.write(byteBuffer); } else if (mark == 5) { log.info("mark 5"); //ack ByteBuffer buffer = ByteBuffer.allocate(4096); buffer.clear(); buffer.put((byte) 6); buffer.put("receive end".getBytes(StandardCharsets.UTF_8)); buffer.flip(); session.getBasicRemote().sendBinary(buffer); byteChannel.close(); byteChannel = null; } } //... public static class FileInfo implements Serializable { private String fileName; private long fileSize; public String getFileName() {return fileName;} public void setFileName(String fileName) {this.fileName = fileName;} public long getFileSize() {return fileSize;} public void setFileSize(long fileSize) {this.fileSize = fileSize;} } }
client 测试类, 连接后可以在命令行向 server 发送消息
首先是消息处理中增加了 void onMessage(ByteBuffer bytes)
, 这个是用来接收服务端回传的ACK的, 根据第一个字节, 判断服务端的处理结果. 这里使用了一个 condition.notify()
用来通知发送线程继续发送
其次是消息发送中, 用输入的1
触发文件发送. 文件发送在 void sendFile(WebSocketClient webSocketClient, Object condition)
方法中进行, 通过一个 condition 对象, 在文件开始传输和结束传输时控制线程的暂停和继续. byteBuffer.flip()
用于控制 byteBuffer 从写状态变为读状态, 用于发送. flip is used to flip the ByteBuffer from "reading from I/O" (putting) to "writing to I/O" (getting).
public class SocketClient { private static final Logger log = LoggerFactory.getLogger(SocketClient.class); public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException { Object condition = new Object(); WebSocketClient wsClient = new WebSocketClient(new URI("ws://127.0.0.1:8763/websocket/server/10001")) { //... @Override public void onMessage(ByteBuffer bytes) { //To overwrite byte mark = bytes.get(0); if (mark == 2) { synchronized (condition) { condition.notify(); } log.info("receive ack for file info"); } else if (mark == 6){ synchronized (condition) { condition.notify(); } log.info("receive ack for file end"); } } @Override public void onClose(int i, String s, boolean b) { log.info("On close: {}, {}, {}", i, s, b); } @Override public void onError(Exception e) { log.error("On error: {}", e.getMessage()); } }; wsClient.connect(); log.info("Connecting ..."); while (!ReadyState.OPEN.equals(wsClient.getReadyState())) { } log.info("Connected"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String line = scanner.next(); if ("1".equals(line)) sendFile(wsClient, condition); else wsClient.send(line); } } public static void sendFile(WebSocketClient webSocketClient, Object condition){ new Thread(() -> { try { SeekableByteChannel byteChannel = Files.newByteChannel( Path.of("/home/milton/Backup/linux/apache-tomcat-8.5.58.tar.gz"), new StandardOpenOption[]{StandardOpenOption.READ}); ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024); byteBuffer.put((byte)1); String info = "{\"fileName\": \"greproto.tar.gz\", \"fileSize\":"+byteChannel.size()+"}"; byteBuffer.put(info.getBytes(StandardCharsets.UTF_8)); byteBuffer.flip(); webSocketClient.send(byteBuffer); synchronized (condition) { condition.wait(); } byteBuffer.clear(); byteBuffer.put((byte)3); while (byteChannel.read(byteBuffer) > 0) { byteBuffer.flip(); webSocketClient.send(byteBuffer); byteBuffer.clear(); byteBuffer.put((byte)3); } byteBuffer.clear(); byteBuffer.put((byte)5); byteBuffer.put("end".getBytes(StandardCharsets.UTF_8)); byteBuffer.flip(); webSocketClient.send(byteBuffer); synchronized (condition) { condition.wait(); } byteChannel.close(); } catch (InterruptedException|IOException e) { log.error(e.getMessage(), e); } }).start(); } }
示例是一个普通的 Spring Boot jar项目, 可以通过mvn clean package进行编译, 再通过java -jar ws-demo01.jar运行, 启动后工作在8763端口
将 SocketClient.java 中的文件路径 D:/WorkJava/tmp/greproto.tar.gz
换成自己本地的文件路径, 运行 SocketClient, 可以观察到服务端接收到的消息. 如果输入1
并回车, 就会触发客户端往服务端传输文件