目前做了一个接口:邀请用户成为某课程的管理员,于是我感觉有能在用户被邀请之后能有个立马通知他本人的机(类似微博、朋友圈被点赞后就有立马能收到通知一样),于是就闲来没事搞了一套。
⭐推荐阅读:Websocket 协议简介
WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
为什么使用Websocket?
因为普通的http协议一个最大的问题就是:通信只能由客户端发起,服务器响应(半双工)****,而我们希望可以全双工通信。
因此一句话总结就是:建立websocket(以下简称为ws)连接是为了让服务器主动向前端发消息,而无需等待前端的发起请求调用接口。
我们现在有:
用户A
用户B
Springboot
服务器用户A调用接口邀请用户B成为课程成员
MySQL
的数据表:
course_member_invitation
,记录课程邀请记录,其形式如下(忽略时间等列):id | course_id | account_id | admin_id | is_accepted | bind_message_id |
---|---|---|---|---|---|
邀请id | 课程id | 受邀用户id | 邀请人id(因其本身为课程管理员) | 受邀用户是否接受了邀请 | 绑定的消息id |
course_message
,记录消息记录,其形式如下(忽略时间等列):id | type | account_id | source_id | is_read | is_ignored |
---|---|---|---|---|---|
消息id | 消息类型 | 收信人用户id | 发信人用户id | 是否已读 | 收信人是否忽略 |
course_message_type
,记录消息类型,其形式如下id | name | description |
---|---|---|
消息类型id | 消息类型名称 | 描述 |
RabbitMQ
(因不是重点,所以此处暂不讨论,最后一章叙述)
业务步骤主要涉及两个方法addCourseMemberInvitation
与sendMessage
和一个组件CourseMemberInvitationListener
,分别做:
addCourseMemberInvitation
:
用户A
调用接口,邀请用户B
成为某门课程的管理员Springboot
服务器收到请求,将这一请求生成邀请记录、消息记录,写入下表:
course_member_invitation
course_message
sendMessage
处理发送消息的业务。用户A
sendMessage
:
RabbitMQ
中对应的消息队列。CourseMemberInvitationListener
:
用户B
在线,则可发送。在Springboot中配置Websocket
pom.xml
文件<!-- WebSocket相关 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
Websocket Server
组件配置初步:com.xxxxx.course.webSocket.WebSocketServer
/** * 进行前后端即时通信 * https://blog.csdn.net/qq_33833327/article/details/105415393 * session: https://www.codeleading.com/article/6950456772/ * @author jojo */ @ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //响应路径为 /ws/{uid} 的连接请求 @Component public class WebSocketServer { /** * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的 */ private static int onlineCount = 0; /** * concurrent 包的线程安全Set,用来存放每个客户端对应的 myWebSocket对象 * 根据 用户id 来获取对应的 WebSocketServer 示例 */ private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 通信令牌 */ private String token = ""; /** * 用户id */ private String accountId =""; /** * logger */ private static Logger LOGGER = LoggerUtil.getLogger(); /** * 连接建立成功调用的方法 * * @param session * @param token 用户令牌 */ @OnOpen public void onOpen(Session session, @PathParam("uid") String uid) { this.session = session; this.token = token; //设置超时,同httpSession session.setMaxIdleTimeout(3600000); this.accountId = uid; //存储websocket连接,存在内存中,若有同一个用户同时在线,也会存,不会覆盖原有记录 webSocketMap.put(accountId, this); LOGGER.info("webSocketMap -> " + JSON.toJSONString(webSocketMap.toString())); addOnlineCount(); // 在线数 +1 LOGGER.info("有新窗口开始监听:" + accountId + ",当前在线人数为" + getOnlineCount()); try { sendMessage(JSON.toJSONString("连接成功")); } catch (IOException e) { e.printStackTrace(); throw new ApiException("websocket IO异常!!!!"); } } /** * 关闭连接 */ @OnClose public void onClose() { if (webSocketMap.get(this.token) != null) { webSocketMap.remove(this.token); subOnlineCount(); // 人数 -1 LOGGER.info("有一连接关闭,当前在线人数为:" + getOnlineCount()); } } /** * 收到客户端消息后调用的方法 * 这段代码尚未有在使用,可以先不看,在哪天有需求时再改写启用 * @param message 客户端发送过来的消息 * @param session */ @OnMessage public void onMessage(String message, Session session) { LOGGER.info("收到来自用户 [" + this.accountId + "] 的信息:" + message); if (!StringTools.isNullOrEmpty(message)) { try { // 解析发送的报文 JSONObject jsonObject = JSON.parseObject(message); // 追加发送人(防窜改) jsonObject.put("fromUserId", this.accountId); String toUserId = jsonObject.getString("toUserId"); // 传送给对应 toUserId 用户的 WebSocket if (!StringTools.isNullOrEmpty(toUserId) && webSocketMap.containsKey(toUserId)) { webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString()); } else { // 否则不在这个服务器上,发送到 MySQL 或者 Redis LOGGER.info("请求的userId:" + toUserId + "不在该服务器上"); } } catch (Exception e) { e.printStackTrace(); } } } /** * @param session * @param error */ @OnError public void one rror(Session session, Throwable error) { LOGGER.error("用户错误:" + this.accountId + ",原因:" + error); } /** * 实现服务器主动推送 * * @param message 消息字符串 * @throws IOException */ public void sendMessage(String message) throws IOException { //需要使用同步机制,否则多并发时会因阻塞而报错 synchronized(this.session) { try { this.session.getBasicRemote().sendText(message); } catch (IOException e) { LOGGER.error("发送给用户 ["+this.accountId +"] 的消息出现错误",e.getMessage()); throw e; } } } /** * 点对点发送 * 指定用户id * @param message 消息字符串 * @param userId 目标用户id * @throws IOException */ public static void sendInfo(String message, String userId) throws Exception { Iterator entrys = webSocketMap.entrySet().iterator(); while (entrys.hasNext()) { Map.Entry entry = (Map.Entry) entrys.next(); if (entry.getKey().toString().equals(userId)) { webSocketMap.get(entry.getKey()).sendMessage(message); LOGGER.info("发送消息到用户id为 [" + userId + "] ,消息:" + message); return; } } //错误说明用户没有在线,不用记录log throw new Exception("用户没有在线"); } private static synchronized int getOnlineCount() { return onlineCount; } private static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } private static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
几点说明:
// WebSocket 通知前端 try { //调用WebsocketServer向目标用户推送消息 WebSocketServer.sendInfo(JSON.toJSONString(courseMemberInvitation),courseMemberInvitation.getAccountId().toString()); LOGGER.info("send to "+courseMemberInvitation.getAccountId().toString()); }
@ServerEndpoint
注解:@ServerEndpoint(value = "/ws/{uid}",configurator = WebSocketConfig.class) //响应路径为 /ws/{uid} 的连接请求
这么注解之后,前端只用发起 `ws://xxx.xxx:xxxx/ws/{uid}`即可开启ws连接(或者`wss`协议,增加TLS), 比如前端js代码这么写:
<script> var socket; /* 启动ws连接 */ function openSocket() { if(typeof(WebSocket) == "undefined") { console.log("您的浏览器不支持WebSocket"); }else{ console.log("您的浏览器支持WebSocket"); //实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接 var socketUrl="http://xxx.xxx.xxx:xxxx/ws/"+$("#uid").val(); socketUrl=socketUrl.replace("https","ws").replace("http","ws"); //转换成ws协议 console.log("正在连接:"+socketUrl); if(socket!=null){ socket.close(); socket=null; } socket = new WebSocket(socketUrl); /* websocket 基本方法 */ //打开事件 socket.onopen = function() { console.log(new Date()+"websocket已打开,正在连接..."); //socket.send("这是来自客户端的消息" + location.href + new Date()); }; //获得消息事件 socket.onmessage = function(msg) { console.log(msg.data); //发现消息进入 开始处理前端触发逻辑 }; //关闭事件 socket.onclose = function() { console.log(new Date()+"websocket已关闭,连接失败..."); //重新请求token }; //发生了错误事件 socket.onerror = function() { console.log("websocket连接发生发生了错误"); } } } /* 发送消息 */ function sendMessage() { if(typeof(WebSocket) == "undefined") { console.log("您的浏览器不支持WebSocket"); }else { console.log("您的浏览器支持WebSocket"); console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}'); socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}'); } } </script>
一切看起来很顺利,我只要放个用户id进去,就可以想跟谁通讯就跟谁通讯咯!
但设想一个场景, 我是小明,uid为250,我想找uid为520的小花聊天,理论上我只要发起ws://xxx.xxx:xxxx/ws/250
请求与服务器连接,小花也发起ws://xxx.xxx:xxxx/ws/520
与服务器建立ws连接,我们就能互发消息了吧!
这时候出现了uid为1的小黄,他竟然想挖墙脚!?他竟然学过js,自己发了ws://xxx.xxx:xxxx/ws/520
跟服务器建立ws连接,而小花根本不想和我发消息,所以实际上是小黄冒充了小花,把小花NTR了(实际上人家并不在乎