阅读本文需要 Integer.MAX_VALUE 分钟。
笔者所在的公司主营业务是智能家居,笔者在公司负责的Android端App的开发。关于智能家居,估计现在百分之八九十的童鞋都听过,但真正了解或者使用过的估计就不占多数了。本文不谈行业前景,只谈技术。
为了方便大家更加了解故事的背景,顺便科普一下智能家居,想直奔WebSocket主题的童鞋可以直接第二章节。
智能家居,算是物联网的一个典型应用场景。什么是物联网呢,字面意思就是把众多物体连接起来组成一个网络,英文是Internet of things(IoT)。小到一个你的手机跟蓝牙耳机,大到一个城市的各个角落,究极形态就是“万物互联”。(为什么突然想起万佛朝宗 -_-!)
IoT其实不在意网络的协议,也不在意连接的到底是什么东东,就这种形态本身而言,就是IoT。其实这个概念其实很早就有了,在笔者上大学那会,不怕暴露年龄的说是在09到10年左右就听说过物联网这个词。还记得实验课的GPRS智能抄表系统么,你可能没有觉得多么高大上,不就是水表上插个SIM卡,定期把值发到客户端上么,再也不怕被人上门查水表啦~ 没错,这也是IoT的一种体现。
但这么多年,物联网但一直不温不火。至于为什么最近几年又被提出来呢,智能家居在其中扮演了很重要的作用。另外AI这把火也起到了推波助澜的作用,任何产品都要加上智能二字。所以有了AI + IoT,也就是AIOT。具体就不详述了,以后可以再单开一文详细介绍下物联网。
说回智能家居,智能家居是家庭为单位的物联网,简单说就是你可以通过家里中控或者App控制制家里的灯、插座、监控、电器等等。
文字算个球,一图解千愁,上架构图
简单解释一下,网关的作用是负责连接屋内的所有设备,网关和设备之间一般不会采用HTTP(个别单品除外),而会采用比如zigbee、lora等近场通信协议(或者直接用有线方式更稳定),因为功耗低,比较省电,你想家里如果装了几十个开关面板,总归每个月也能节约十几二十块钱的电费吧。当然最重要的原因还是因为方案也比较成熟。所以使用这类设备之前需要有个“组网”的操作,把设备组到网关上。如果断开了,网关就认为设备离线了。
网关会将设备的状态上报给云平台,云平台就将状态下发给客户端了。同样,客户端控制设备就是个反向的过程。当然如果以后5G普及了,设备直接通过5G网络连接云平台,因为5G功耗低、延迟低、速度快,就可以不需要网关了。
整体的架构还算是比较简单的,当然中间也有很多复杂的逻辑,涉及到比如设备状态、人员、权限、房屋的管理等等这里就不关注了。
在服务器下发设备状态的时候就涉及到了推送。因为智能家居有情景模式的概念,比如回家模式,执行一个请求,咔咔打开十几个灯,所以靠请求的返回值判断状态是不太现实的,只能依靠推送。
由于业务的特殊性,智能家居的推送需要有较高的实时性,比如用户打开了灯(无论在app上打开或者直接打开),app上的灯的状态都需要立即变成开。如果等个3、5秒状态再发生改变,这样的用户体验很不好。
我们最早使用的是某光推送,发现延迟严重,有时需要等十几秒才收到推送,毕竟适用场景毕竟不一样。所以决定自建长连接当推送。经过选型,决定采用本文的主角,WebSocket。
有关WebSocket的协议介绍,推荐观看 WebSocket协议:5分钟从入门到精通。市面上有很多现成的WebSocket连接库,比较著名的有Java-WebSocket,OkHttp也自带WebSocket支持。
最初因为项目内已经接入了OkHttp,所以直接使用了OkHttp。使用的方式很简单,熟悉OkHttp的童鞋应该很懂,
OkHttpClient client = OkHttpClient.Builder().build(); Request request = new Request.Builder().build(); client.newWebSocket(request, new WebSocketListener() { @Override public void onOpen(okhttp3.WebSocket webSocket, Response response) {} @Override public void onMessage(okhttp3.WebSocket webSocket, String text) {} @Override public void onClosed(okhttp3.WebSocket webSocket, int code, String reason) {} @Override public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response response) {} }); 复制代码
调用方便,回调状态也很清晰。Java-Websocket也差不多类似,但总体来说有以下几个问题:
无论OkHttp还是Java-WebSocket,都是有心跳机制的。但OkHttp的心跳间隔,也就是pingInterval,在创建client时候就已经固定了,是不支持中途调整的。(Jake大神还在github上回复过相关问题,意思是调用者不需要关注这些。但往往确实有这种需求的啊😢😢😢)
比如应用在前台的时候心跳可能稍微频繁点,但应用出于后台时出于省电的优化,心跳间隔可以设置的稍微长一点。如果需要调整心跳,需要创建新的client,断掉旧的,重新连接。有两种策略:
以上两种策略都需要客户端和服务端先约定好,服务端做额外处理,增加研发成本不说,也有增加出错概率。
通常连接断掉有以下这么几种情况:
关于最后一种情况,可以参见 移动端IM实践:实现Android版微信的智能心跳机制
对于重连,还需要考虑的问题是重连的时间间隔,如果手机网断了,不停地重连同样是失败,所以需要自己制定一个重连次数和时间间隔关系。
总之,重连的逻辑和策略是需要我们自己维护的。
OkHttp中关于状态的回调发生在OkHttp创建的子线程中,根据状态我们需要发起重连。如果这时候我们的应用发生了前后台切换又要创建新连接,各种连接(重连)交织在一起,那真是一团乱麻。需要考虑的问题有:
因为笔者项目内用的是OkHttp,所以最初的解决方案都是针对于OkHttp来做的。大致思路如下:
前面说到过,OkHttp是不支持动态调整心跳的,那OkHttp是怎么维护心跳的呢,我们来分析一下它的源码:
分析源码,我们从调用入口client.newWebSocket开始:
// newWebSocket@OkHttpClient.java /** * 内部使用RealWebSocket类来发起连接 */ @Override public WebSocket newWebSocket(Request request, WebSocketListener listener) { // 注意这里的pingInterval,所以在创建RealWebSocket时就已经固定,没法再修改了。 RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval); webSocket.connect(this); return webSocket; } 复制代码
// RealWebSocket@RealWebSocket.java public RealWebSocket(Request request, WebSocketListener listener, Random random, long pingIntervalMillis) { this.originalRequest = request; this.listener = listener; this.random = random; this.pingIntervalMillis = pingIntervalMillis; // ... // 这里的writerRunnable是用来向服务端发送消息的,在后面详述 this.writerRunnable = new Runnable() { @Override public void run() { try { while (writeOneFrame()) { } } catch (IOException e) { failWebSocket(e, null); } } }; } public void connect(OkHttpClient client) { // WebSocket协议相关的header client = client.newBuilder() .eventListener(EventListener.NONE) .protocols(ONLY_HTTP1) .build(); final Request request = originalRequest.newBuilder() .header("Upgrade", "websocket") .header("Connection", "Upgrade") .header("Sec-WebSocket-Key", key) .header("Sec-WebSocket-Version", "13") .build(); call = Internal.instance.newWebSocketCall(client, request); call.enqueue(new Callback() { @Override public void onResponse(Call call, Response response) { try { checkResponse(response); } catch (ProtocolException e) { failWebSocket(e, response); closeQuietly(response); return; } // Promote the HTTP streams into web socket streams. StreamAllocation streamAllocation = Internal.instance.streamAllocation(call); streamAllocation.noNewStreams(); // Prevent connection pooling! Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation); // Process all web socket messages. try { // 连接成功,调用listern的onOpen listener.onOpen(RealWebSocket.this, response); String name = "OkHttp WebSocket " + request.url().redact(); // 重点1:创建reader和writer,见下文 initReaderAndWriter(name, streams); streamAllocation.connection().socket().setSoTimeout(0); // 重点2:轮询读 loopReader(); } catch (Exception e) { failWebSocket(e, null); } } @Override public void onFailure(Call call, IOException e) { failWebSocket(e, null); } }); } public void initReaderAndWriter(String name, Streams streams) throws IOException { synchronized (this) { this.streams = streams; this.writer = new WebSocketWriter(streams.client, streams.sink, random); // 创建Scheduled线程池 this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false)); if (pingIntervalMillis != 0) { // 利用线程池定期运行PingRunnable,越来越接近真像了 executor.scheduleAtFixedRate( new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS); } if (!messageAndCloseQueue.isEmpty()) { runWriter(); // Send messages that were enqueued before we were connected. } } // 创建reader,用来读取消息 reader = new WebSocketReader(streams.client, streams.source, this); } 复制代码
重点看一下PingRunnable都做了什么
private final class PingRunnable implements Runnable { PingRunnable() { } @Override public void run() { // 发送ping frame writePingFrame(); } } void writePingFrame() { WebSocketWriter writer; int failedPing; synchronized (this) { if (failed) return; writer = this.writer; // 判断是否在等待pong,如果在等待failedPing置为sentPingCount,否则置为-1 failedPing = awaitingPong ? sentPingCount : -1; sentPingCount++; awaitingPong = true; } if (failedPing != -1) { // 运行到此处的条件是上一个pong还没有返回,报错退出 failWebSocket(new SocketTimeoutException("sent ping but didn't receive pong within " + pingIntervalMillis + "ms (after " + (failedPing - 1) + " successful ping/pongs)"), null); return; } try { // 发送一个ping消息 writer.writePing(ByteString.EMPTY); } catch (IOException e) { failWebSocket(e, null); } } 复制代码
那么怎么判断有没有收到pong了,就需要用到reader了。我们来看一下reader里做了什么。
还记得前文重点2标记的loopReader么?
// loopReader@RealWebSocket.java /** Receive frames until there are no more. Invoked only by the reader thread. */ public void loopReader() throws IOException { while (receivedCloseCode == -1) { // This method call results in one or more onRead* methods being called on this thread. // 处理收到的frame reader.processNextFrame(); } } /** * 根据header判断是消息帧还是控制帧,心跳pong属于控制帧 */ void processNextFrame() throws IOException { readHeader(); if (isControlFrame) { readControlFrame(); } else { readMessageFrame(); } } /** * 解析opcode */ private void readControlFrame() throws IOException { // ... switch (opcode) { case OPCODE_CONTROL_PING: frameCallback.onReadPing(controlFrameBuffer.readByteString()); break; case OPCODE_CONTROL_PONG: // 注意:收到pong,触发onReadPing frameCallback.onReadPong(controlFrameBuffer.readByteString()); break; case OPCODE_CONTROL_CLOSE: // ... frameCallback.onReadClose(code, reason); closed = true; break; default: throw new ProtocolException("Unknown control opcode: " + toHexString(opcode)); } } /** * 计数器自增,将awaitingPong状态置为false */ @Override public synchronized void onReadPong(ByteString buffer) { // This API doesn't expose pings. receivedPongCount++; awaitingPong = false; } 复制代码
一路追下来发现,其实收到pong之后,okhttp就是简单地改变awaitingPong标记位的状态,就代表收到了上一个pong消息。
总结一下,okhttp的心跳流程,
我们发现,只要改变线程池执行PingRunable的delayTime,就能达到在不重新创建OkHttpClient的情况下,动态改变ping的速率。
所以可以用反射的方式,shutDown原来的线程池,创建新的线程池,用新的pingInterval执行pingRunnable。
Class clazz = Class.forName("okhttp3.internal.ws.RealWebSocket"); Field field = clazz.getDeclaredField("executor"); field.setAccessible(true); ScheduledExecutorService oldService = (ScheduledExecutorService) field.get(mWebSocket); Class[] innerClasses = Class.forName("okhttp3.internal.ws.RealWebSocket").getDeclaredClasses(); for (Class innerClass : innerClasses) { if ("PingRunnable".equals(innerClass.getSimpleName())) { // 创建新的PingRunnable实例 Constructor constructor = innerClass.getDeclaredConstructor(RealWebSocket.class); constructor.setAccessible(true); Object pingRunnable = constructor.newInstance(mWebSocket); // 创建新的线程池 ScheduledThreadPoolExecutor newService = new ScheduledThreadPoolExecutor(1, Util.threadFactory("ws-ping", false)); newService.scheduleAtFixedRate((Runnable) pingRunnable, interval, interval, unit); field.set(mWebSocket, newService); // shutdown旧的线程池 oldService.shutdown(); } } 复制代码
这样,我们就达成了动态调整心跳的目的。
Java-WebSocket的ping是通过外部调用sendPing()来达到发送ping的目的的,内部没有ping/pong状态机制,所以需要我们自己去维护这个关系。其实可以仿照OkHttp那样,利用定时消息去发送ping,然后解析pong来维护心跳状态。这里就不过多阐述了。
正如前面提过的,断掉重连只要处理好几种断开状态的逻辑即可。好在绝大多数websocket库都分close、error回调,可以区分异常断开和主动断开。
需要自己维护的也就是重连的间隔和重试次数的关系,类似指数退避算法。
针对上文说到的多线程问题,主要的问题点就是发生在同时发生多个连接请求的时候,与其对多线程加各种同步,不如并行该串行。采用类似消息队列的方式,消息的处理放在单独的线程中取做。这样就可以省却了线程的同步,同时也能保证状态的有序性。
其实Android中,利用HandlerThread就可以完美地满足上述两点,最初笔者也确实是利用HandlerThread来做的。后来觉得可以独立于平台,也可以运行在纯Java的平台上,所以还是自己手动实现了。同时自己维护消息队列,也可以更好地自己处理优先级、延迟等。
之前也说过,笔者的项目是基于OkHttp的,但发现状态管理这部分可以单独抽象出来,所以才会有了本文的标题WebSocketGo(以下简称WsGo)。
欢迎star,github.com/Gnepux/WebS…
数据流如下:
Dispatcher就是前文提到的消息队列,就是生产者-消费者模式,维护两个队列,发送command,接收event。
command主要有:CONNECT、DISCONNECT、RECONNECT、CHANGE_PING、SEND
event主要有:OnConnect、onMessage、onSend、onRetry、onDisConnect、onClose
Channel Manager主要负责状态管理。
WebSocket interface可以理解为适配层,负责调用WebSocket库。
Event Listener就是前台的状态回调。
implementation 'com.gnepux:wsgo:1.0.2' // use okhttp implementation 'com.gnepux:wsgo-okwebsocket:1.0.1' // use java websocket implementation 'com.gnepux:wsgo-jwebsocket:1.0.1' 复制代码
WsConfig config = new WsConfig.Builder() .debugMode(true) // true to print log .setUrl(pushUrl) // ws url .setHttpHeaders(headerMap) // http headers .setConnectTimeout(10 * 1000L) // connect timeout .setReadTimeout(10 * 1000L) // read timeout .setWriteTimeout(10 * 1000L) // write timeout .setPingInterval(10 * 1000L) // initial ping interval .setWebSocket(OkWebSocket.create()) // websocket client .setRetryStrategy(retryStrategy) // retry count and delay time strategy .setEventListener(eventListener) // event listener .build(); WsGo.init(config); 复制代码
// 连接 WsGo.getInstance().connect(); // 发送消息 WsGo.getInstance().send("hello from WsGo"); // 断开 WsGo.getInstance().disconnect(1000, "close"); WsGo.getInstance().disconnectNormal("close"); // 改变心跳 WsGo.getInstance().changePingInterval(10, TimeUnit.SECONDS); // 释放 WsGo.getInstance().destroyInstance(); 复制代码
WsGo 已经支持OkHttp and Java WebSocket
// for OkHttp (wsgo-okwebsocket) setWebSocket(OkWebSocket.create()); // for Java WebSocket (wsgo-jwebsocket) setWebSocket(JWebSocket.create()); 复制代码
如果你需要使用其他的WebSocket库或自定义客户端,只需要实现一个WebSocket接口,将对应结果传递给ChannelCallback即可。剩下的连接管理,WsGo会帮你完成。
public interface WebSocket { void connect(WsConfig config, ChannelCallback callback); void reconnect(WsConfig config, ChannelCallback callback); boolean disconnect(int code, String reason); void changePingInterval(long interval, TimeUnit unit); boolean send(String msg); } 复制代码
对于非正常断开,WsGo会自动重连。RetryStrategy指的是重连次数和延时的关系。
WsGo默认有一个DefaultRetryStrategy,如果你需要自己调整,实现RetryStrategy接口里的onRetry方法即可。
public interface RetryStrategy { /** * 重试次数和延迟的关系 * * @param retryCount 已经重试的次数 * @return 延时的时间 */ long onRetry(long retryCount); } 复制代码
添加事件回调。需要注意,回调在WsGo自己创建的一个线程中运行,不在调用线程中。如有必要,需要在会调用手动切换线程。
public interface EventListener { void onConnect(); void onDisConnect(Throwable throwable); void onClose(int code, String reason); void onMessage(String text); void onReconnect(long retryCount, long delayMillSec); void onSend(String text, boolean success); } 复制代码
目前WsGo没有与手机网络的变化通知关联(即断网可以收到回调,网络恢复不会自动重连),因为笔者觉得这部分不属于WebSocket自身的状态管理范畴,而且已经不属于平台无关了。如有有这方便需求,需要自己从外部发起连接。在项目中简单地封装一下就ok啦~
本文稍显长,从最初的智能家居的场景开始,介绍到websocket状态管理现存的问题,然后给出了笔者的解决方案,最后是通用的状态管理框架WebSocketGo。也算是笔者在工作中的一点思考和总结。其实细想,不光WebSocket,所有的长连接应该都会面临相同的问题。但思考的角度和解决的出发点应该都是一样的。