本文基于OkHttp 4.3.1源码分析 OkHttp - 官方地址 OkHttp - GitHub代码地址
本篇主要从OkHttp的两个请求示例开始,对Okhttp的初始化工作,和请求从构造、分发到执行的流程进行源码分析介绍
OkHttp整体流程(本文覆盖红色部分)
本文覆盖代码流程图
使用OkHttp一般流程,初始化一个共享OkHttpClient,构建Request,然后OkHttpClient根据Request构建Call,接着执行call,最后进行Response处理
public class GetExample { OkHttpClient client = new OkHttpClient(); // 构建共享的Client String run(String url) throws IOException { Request request = new Request.Builder() // 构建request .url(url) .build(); // 构建Call,执行 try (Response response = client.newCall(request).execute()) { return response.body().string(); } } public static void main(String[] args) throws IOException { GetExample example = new GetExample(); String response = example.run("https://raw.github.com/square/okhttp/master/README.md"); System.out.println(response); } } 复制代码
public final class AsynchronousGet { private final OkHttpClient client = new OkHttpClient();// 构建共享Client public void run() throws Exception { // 构建Request Request request = new Request.Builder() .url("http://publicobject.com/helloworld.txt") .build(); // 构建Call,执行,回调接受处理 client.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { e.printStackTrace(); } @Override public void onResponse(Call call, Response response) throws IOException { try (ResponseBody responseBody = response.body()) { if (!response.isSuccessful()) throw new IOException("Unexpected code " + response); Headers responseHeaders = response.headers(); for (int i = 0, size = responseHeaders.size(); i < size; i++) { System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i)); } System.out.println(responseBody.string()); } } }); } public static void main(String... args) throws Exception { new AsynchronousGet().run(); } } 复制代码
/* * Factory for [calls][Call], which can be used to send HTTP requests and read their responses. * ## OkHttpClients Should Be Shared * ## Customize Your Client With newBuilder() * ## Shutdown Isn't Necessary * / open class OkHttpClient internal constructor( builder: Builder ) : Cloneable, Call.Factory, WebSocket.Factory { // 3. 成员变量初始化 ... // 1. 内部无参数构造函数 constructor() : this(Builder()) // 4. OkHttpClient函数初始化 init { // 初始化证书和拦截器等判断逻辑 ... } // 2. Builder构造函数 class Builder constructor() { ... } } 复制代码
Builder模式,提供自定义配置化能力,同时有一份无需关心的默认配置
class Builder constructor() { internal var dispatcher: Dispatcher = Dispatcher() internal var connectionPool: ConnectionPool = ConnectionPool() internal val interceptors: MutableList<Interceptor> = mutableListOf() internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory() internal var retryOnConnectionFailure = true internal var authenticator: Authenticator = Authenticator.NONE internal var followRedirects = true internal var followSslRedirects = true internal var cookieJar: CookieJar = CookieJar.NO_COOKIES internal var cache: Cache? = null internal var dns: Dns = Dns.SYSTEM internal var proxy: Proxy? = null internal var proxySelector: ProxySelector? = null internal var proxyAuthenticator: Authenticator = Authenticator.NONE internal var socketFactory: SocketFactory = SocketFactory.getDefault() internal var sslSocketFactoryOrNull: SSLSocketFactory? = null internal var x509TrustManagerOrNull: X509TrustManager? = null internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT internal var certificateChainCleaner: CertificateChainCleaner? = null internal var callTimeout = 0 internal var connectTimeout = 10_000 internal var readTimeout = 10_000 internal var writeTimeout = 10_000 internal var pingInterval = 0 } 复制代码
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher @get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool @get:JvmName("interceptors") val interceptors: List<Interceptor> = builder.interceptors.toImmutableList() @get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> = builder.networkInterceptors.toImmutableList() @get:JvmName("eventListenerFactory") val eventListenerFactory: EventListener.Factory = builder.eventListenerFactory @get:JvmName("retryOnConnectionFailure") val retryOnConnectionFailure: Boolean = builder.retryOnConnectionFailure @get:JvmName("cookieJar") val cookieJar: CookieJar = builder.cookieJar @get:JvmName("cache") val cache: Cache? = builder.cache .... 复制代码
Request 对应HTTP请求中的Request,OkHttp依旧是Builder构建模式构建Request
支持url、method、headers、body的配置
open class Builder { internal var url: HttpUrl? = null internal var method: String internal var headers: Headers.Builder internal var body: RequestBody? = null // 构造Request open fun build(): Request { return Request( checkNotNull(url) { "url == null" }, method, headers.build(), body, tags.toImmutableMap() ) } } 复制代码
通过Request.Builder构造Request实例
class Request internal constructor( @get:JvmName("url") val url: HttpUrl, @get:JvmName("method") val method: String, @get:JvmName("headers") val headers: Headers, @get:JvmName("body") val body: RequestBody?, internal val tags: Map<Class<*>, Any> ) { ... } 复制代码
OkHttpClient 实现了Call.Factory,作为Call的构造工厂类
override fun newCall(request: Request): Call { // 执行 RealCall的构造call方法 return RealCall.newRealCall(this, request, forWebSocket = false) } 复制代码
构造Call真正方法,另外创建了一个 发射器,接下来先了解下Call
companion object { fun newRealCall( client: OkHttpClient, originalRequest: Request, forWebSocket: Boolean ): RealCall { return RealCall(client, originalRequest, forWebSocket).apply { // 构造了一个 发射器,它是应用层和网络层交互的桥梁,后面会着重介绍 transmitter = Transmitter(client, this) } } } 复制代码
Call定义为一个准备好执行的请求,它是能被取消的,且它只能被执行一次(http请求也是一次执行) 包括一个核心成员变量 Transmitter ,两个重要方法 execute(同步) 和 enqueue(异步)
internal class RealCall private constructor( val client: OkHttpClient, /** The application's original request unadulterated by redirects or auth headers. */ val originalRequest: Request, val forWebSocket: Boolean ) : Call { // 发射机 private lateinit var transmitter: Transmitter // 同步请求执行方法 override fun execute(): Response { ... } // 异步请求执行方法 override fun enqueue(responseCallback: Callback) { ... }} // 取消请求 override fun cancel() { transmitter.cancel() } 复制代码
override fun execute(): Response { // 检查是否已经执行 synchronized(this) { check(!executed) { "Already Executed" } executed = true } // 过期时间逻辑,如果配置了会有WatchDog线程进行Watch然后执行退出逻辑 transmitter.timeoutEnter() // 通知start,最后会通过 EventListener 发出时间,主要目的是收集 metrics events transmitter.callStart() try { // 调用client的dispatcher分发器执行call(将call加入同步call队列) client.dispatcher.executed(this) // 通过拦截器责任链模式进行请求和返回处理等一系类逻辑 return getResponseWithInterceptorChain() } finally { client.dispatcher.finished(this) } } 复制代码
fun getResponseWithInterceptorChain(): Response { // 构建所有的拦截器 val interceptors = mutableListOf<Interceptor>() interceptors += client.interceptors // client配置的拦截器 interceptors += RetryAndFollowUpInterceptor(client) // 重试机制拦截器 interceptors += BridgeInterceptor(client.cookieJar) // 请求和返回桥(http信息配置和解析)拦截器 interceptors += CacheInterceptor(client.cache) // 缓存拦截 interceptors += ConnectInterceptor // 连接拦截器 if (!forWebSocket) { interceptors += client.networkInterceptors } interceptors += CallServerInterceptor(forWebSocket) // 执行拦截器 // 拦截器核心处理类 val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this, client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis) var calledNoMoreExchanges = false try { // 链式调用 val response = chain.proceed(originalRequest) if (transmitter.isCanceled) { // 处理取消 response.closeQuietly() throw IOException("Canceled") } return response //返回请求结果 } catch (e: IOException) { calledNoMoreExchanges = true throw transmitter.noMoreExchanges(e) as Throwable } finally { if (!calledNoMoreExchanges) { transmitter.noMoreExchanges(null) } } } 复制代码
override fun enqueue(responseCallback: Callback) { synchronized(this) { check(!executed) { "Already Executed" } executed = true } transmitter.callStart() // 构造 AsyncCall ,接着分发器入队操作 client.dispatcher.enqueue(AsyncCall(responseCallback)) } 复制代码
class Dispatcher constructor() { // 默认最大请求数 64 @get:Synchronized var maxRequests = 64 // 默认最大并发Host 5 @get:Synchronized var maxRequestsPerHost = 5 // 线程池执行器 默认创建可缓存线程池 @get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("OkHttp Dispatcher", false)) } return executorServiceOrNull!! } /** 准备好的异步Call对列 */ private val readyAsyncCalls = ArrayDeque<AsyncCall>() /** 执行中异步call对列 */ private val runningAsyncCalls = ArrayDeque<AsyncCall>() /** 同步call对列 */ private val runningSyncCalls = ArrayDeque<RealCall>() // 异步Call 入队操作 internal fun enqueue(call: AsyncCall) { synchronized(this) { readyAsyncCalls.add(call) // same host情况下的复用逻辑处理 if (!call.get().forWebSocket) { val existingCall = findExistingCallWithHost(call.host()) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } promoteAndExecute() // 准备线程池执行器,及执行 } // 执行Call private fun promoteAndExecute(): Boolean { this.assertThreadDoesntHoldLock() val executableCalls = mutableListOf<AsyncCall>() val isRunning: Boolean synchronized(this) { // 遍历所有的ready 异步 call val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity. i.remove() asyncCall.callsPerHost().incrementAndGet() executableCalls.add(asyncCall) // 加入此次执行队列缓存 runningAsyncCalls.add(asyncCall) // 加入正在执行队列 } isRunning = runningCallsCount() > 0 } for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) // 将线程执行器传入call,在Call中进行执行 } return isRunning } /** 同步call,添加到队列 */ @Synchronized internal fun executed(call: RealCall) { runningSyncCalls.add(call) } } 复制代码
fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock() var success = false try { // 线程池 执行器执行 executorService.execute(this) success = true } } // 执行方法 override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false transmitter.timeoutEnter() try { // 同“同步请求” 最终执行拦截器链式调用 val response = getResponseWithInterceptorChain() signalledCallback = true // 响应 回调 responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { ... } finally { client.dispatcher.finished(this) } } } 复制代码