我们知道,OkHttp是通过Socket连接的,但是到现在为止,我们还没看到和Socket连接相关的操作,本节内容主要就是为了看OkHttp是如何通过ConnectInterceptor做Socket连接的
目录
一、主流程
二、核心代码
三、流程梳理
四、总结
object ConnectInterceptor : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { //第一行 val realChain = chain as RealInterceptorChain //第二行 val exchange = realChain.call.initExchange(chain) //第三行 val connectedChain = realChain.copy(exchange = exchange) //第四行 return connectedChain.proceed(realChain.request) } }
看到这个ConnectInterceptor的代码,是不是很简洁,但是并不简单
第一行:获得责任链,这个操作在其他的拦截器中也是有的
第二行:将责任链作为参数,通过RealCall的initExchange获得Exchange对象,这个Exchange对象是干嘛的?好神秘,字面意思也不好理解,我们看下Exchange的注释
/** * Transmits a single HTTP request and a response pair. This layers connection management and events * on [ExchangeCodec], which handles the actual I/O. */
大致翻译:传输单个 HTTP 请求和响应对。这层的连接管理和事件都在ExchangeCodec,它是处理实际 I/O 的
我们可以知道Exchange是通过ExchangeCodec负责管理连接的
第三行:将Exchange对象作为参数传入责任链中,其他参数拷贝,获取新的责任链
第四行:通过新的责任链返回响应数据
通过上面代码,我们知道,核心步骤是第二行的Exchange,接下来,我们以Exchange为入口
先来看initExchange这里,这里负责创建或者在连接池里面找到一个可用的连接
internal fun initExchange(chain: RealInterceptorChain): Exchange { synchronized(this) { check(expectMoreExchanges) { "released" } check(!responseBodyOpen) check(!requestBodyOpen) } //在RetryAndFollowUpInterceptor中创建的,前面章节有提到,这里在RealCall中用到 val exchangeFinder = this.exchangeFinder!! //核心代码在这里 //find方法,返回ExchangeCodec,负责请求和响应的编解码 //有两个具体的实现,分别是Http2ExchangeCodec和Http1ExchangeCodec //Http2ExchangeCodec是支持HTTP/2的socket连接,Http1ExchangeCodec是支持HTTP/1.1的 //soceket连接 val codec = exchangeFinder.find(client, chain) //创建Exchange,Exchange是管理连接的角色,通过ExchangeCodec处理I/O操作 val result = Exchange(this, eventListener, exchangeFinder, codec) this.interceptorScopedExchange = result this.exchange = result synchronized(this) { this.requestBodyOpen = true this.responseBodyOpen = true } if (canceled) throw IOException("Canceled") return result }
上面关键代码已经注释,我主要关注socket连接部分,看下find方法
fun find( client: OkHttpClient, chain: RealInterceptorChain ): ExchangeCodec { try { val resultConnection = findHealthyConnection( connectTimeout = chain.connectTimeoutMillis, readTimeout = chain.readTimeoutMillis, writeTimeout = chain.writeTimeoutMillis, pingIntervalMillis = client.pingIntervalMillis, connectionRetryEnabled = client.retryOnConnectionFailure, doExtensiveHealthChecks = chain.request.method != "GET" ) return resultConnection.newCodec(client, chain) } catch (e: RouteException) { trackFailure(e.lastConnectException) throw e } catch (e: IOException) { trackFailure(e) throw RouteException(e) } }
通过findHealthyConnection方法,创建RealConnection,在RealConnection中完成socket连接,tls连接等操作,我们先来看findHealthyConnection方法
findHealthyConnection方法为了找到一个可用的连接,是一个死循环,有两种结果:要么找到一个可用的连接被找到,要么就抛出一个异常
private fun findHealthyConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean, doExtensiveHealthChecks: Boolean ): RealConnection { while (true) { val candidate = findConnection( connectTimeout = connectTimeout, readTimeout = readTimeout, writeTimeout = writeTimeout, pingIntervalMillis = pingIntervalMillis, connectionRetryEnabled = connectionRetryEnabled ) // Confirm that the connection is good. if (candidate.isHealthy(doExtensiveHealthChecks)) { return candidate } // If it isn't, take it out of the pool. candidate.noNewExchanges() // Make sure we have some routes left to try. One example where we may exhaust all the routes // would happen if we made a new connection and it immediately is detected as unhealthy. if (nextRouteToTry != null) continue val routesLeft = routeSelection?.hasNext() ?: true if (routesLeft) continue val routesSelectionLeft = routeSelector?.hasNext() ?: true if (routesSelectionLeft) continue throw IOException("exhausted all routes") } }
重点看下里面的findConnection方法
private fun findConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean ): RealConnection { if (call.isCanceled()) throw IOException("Canceled") // Attempt to reuse the connection from the call. //1 val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()! if (callConnection != null) { var toClose: Socket? = null synchronized(callConnection) { if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) { toClose = call.releaseConnectionNoEvents() } } // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here // because we already acquired it. if (call.connection != null) { check(toClose == null) return callConnection } // The call's connection was released. toClose?.closeQuietly() eventListener.connectionReleased(call, callConnection) } // We need a new connection. Give it fresh stats. refusedStreamCount = 0 connectionShutdownCount = 0 otherFailureCount = 0 // Attempt to get a connection from the pool. //2 if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } // Nothing in the pool. Figure out what route we'll try next. val routes: List<Route>? val route: Route if (nextRouteToTry != null) { // Use a route from a preceding coalesced connection. //3 routes = null route = nextRouteToTry!! nextRouteToTry = null } else if (routeSelection != null && routeSelection!!.hasNext()) { // Use a route from an existing route selection. //4 routes = null route = routeSelection!!.next() } else { // Compute a new route selection. This is a blocking operation! //5 第一次执行走这个分支 var localRouteSelector = routeSelector if (localRouteSelector == null) { localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener) this.routeSelector = localRouteSelector } val localRouteSelection = localRouteSelector.next() routeSelection = localRouteSelection routes = localRouteSelection.routes if (call.isCanceled()) throw IOException("Canceled") // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. We have a better chance of matching thanks to connection coalescing. if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } route = localRouteSelection.next() } // Connect. Tell the call about the connecting call so async cancels work. //6 val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null } call.client.routeDatabase.connected(newConnection.route()) // If we raced another call connecting to this host, coalesce the connections. This makes for 3 // different lookups in the connection pool! if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) { val result = call.connection!! nextRouteToTry = route newConnection.socket().closeQuietly() eventListener.connectionAcquired(call, result) return result } synchronized(newConnection) { connectionPool.put(newConnection) call.acquireConnectionNoEvents(newConnection) } eventListener.connectionAcquired(call, newConnection) return newConnection }
ConnectionPool
中查找,看是否有可用连接ConnectionPool
中没有, 使用来自先前合并的连接中查找一个路由这个过程涉及到了RouteSelector,Selection,Route这三个类,我们先看下他们之间的关系
先看下RouteSelector的类注释
/** * Selects routes to connect to an origin server. Each connection requires a choice of proxy server, * IP address, and TLS mode. Connections may also be recycled. */
翻译:选择连接到源服务器的路由。每个连接都需要选择代理服务器, IP 地址和 TLS 模式。连接也可以被回收。
那就很清晰了,是负责选择路由的
Selection可以简单理解一个router路由的集合,上面代码:
val localRouteSelection = localRouteSelector.next()
返回的就是RouteSelection,那接下来就是核心的Route类了,我们继续看一下
class Route( @get:JvmName("address") val address: Address, /** * Returns the [Proxy] of this route. * * **Warning:** This may disagree with [Address.proxy] when it is null. When * the address's proxy is null, the proxy selector is used. */ @get:JvmName("proxy") val proxy: Proxy, @get:JvmName("socketAddress") val socketAddress: InetSocketAddress ) { //省略 }
可以看出,Route是把address,proxy,soketAddress的封装,Selection是List<Route>的一个迭代器,负责遍历route。
RouteSelector的调用路径:
RouteSelector-->>nextProxy-->>resetNextInetSocketAddres
在resetNextInetSocketAddres中,最终获取的ip地址,是通过调用address.dns.lookup,dns是client.dns,是OkHttpclient的构建过程中传递进来Dns.System,在lookup方法里面,调用InetAddress.getAllByName 方法获取到对应域名的IP,也就是默认的Dns实现
val addresses = address.dns.lookup(socketHost)
到这里,完成了DNS解析,也找到了RealConnection,接下来就是socket连接了
直接看RealConnection的connect方法
省略前面 while (true) { try { //如果此路由通过 HTTP 代理隧道传输 HTTPS,则返回 true if (route.requiresTunnel()) { connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener) if (rawSocket == null) { // We were unable to connect the tunnel but properly closed down our resources. break } } else { //socket连接 connectSocket(connectTimeout, readTimeout, call, eventListener) } //各种协议连接在这一步完成;https和非https //https会走TLS协议连接,握手成功后,如果支持HTTP/2,优先走HTTP2; //非Https优先HTTP/2 ,不支持的话,走HTTP/1 establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener) eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol) break } catch (e: IOException) { //省略 } }
这里涉及到一个隧道建立的问题,可以看到在connectTunnel方法,这里先判断是否走隧道连接,如果不需要,就走socket连接,并不是上来就是socket连接;隧道连接的意义就是代理服务器只能转发,不能篡改数据,反之,则相反;
基本流程就到这里了,下面通过一个图来梳理下
以上就是ConnectInterceptor的时序图,涉及到的一些核心类 ;在RealConnection通过newCodec返回ExchangeCodec对象,这个类有两个实现,一个是Http1ExchangeCodec,即HTTP/1.1协议,一个是Http2Exchangecodec,即HTTP/2协议
RealConnection.connectTls是TLS连接,涉及到握手、证书校验等