根据上一篇 核心流程 ,我们大致知道了okhttp的内部运转,但是对于网络请求的缓存、连接复用以及网络监控的功能的实现,我们是只知其然,而不知其所以然,我们只知道是负责各个功能的拦截器帮助我们完成了底层的任务,但是却不知道原理是什么。今天跟着笔者继续学习Okhttp各个拦截器的原理实现。限于篇幅,在文章中,笔者只对相关拦截器的intercept方法进行了分析,但是并没有展开,有兴趣的同学可以自己去看。
我们知道Okhttp的核心功能的实现就是一套由拦截器组成的责任链机制,而这个责任链的入口就是getResponseWithInterceptorChain方法,我们再次贴出它的源码:
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
...
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
...
} finally {
...
}
}
上面拦截器的添加顺序实际上就是后面它们的执行顺序。
可以看到第一个添加的是应用拦截器,这个是在我们创建request的时候添加的,这个过程一般用于添加一些自定义的header、参数、网关接入等信息。所以这边我们就简单介绍一下
我说的所谓的工作原理其实就是拦截器的intercept方法是怎么运行的,当然在最后的请求拦截器(CallServerInterceptor)会有所不同,因为不需要向下递归了,而是开始回溯。
(实际上整个拦截链再笔者看来就是一个递归的过程。)
给大家贴一个interceptor的源码,它实际上是一个接口,然后内部有一个intercept方法还有一个接口Chain。这个Chain的实现类是RealInterceptorChain.java 这个源码就不贴了,没记错上篇文章给了。
fun interface Interceptor {
@Throws(IOException::class)
fun intercept(chain: Chain): Response
companion object {
inline operator fun invoke(crossinline block: (chain: Chain) -> Response): Interceptor =
Interceptor { block(it) }
}
interface Chain {
fun request(): Request
@Throws(IOException::class)
fun proceed(request: Request): Response
fun connection(): Connection?
fun call(): Call
fun connectTimeoutMillis(): Int
fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
fun readTimeoutMillis(): Int
fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
fun writeTimeoutMillis(): Int
fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
}
}
实际上在拦截器内部,我们执行intercept方法,内部的逻辑大概分为 request部分的逻辑、调用Chain的proceed方法进入下一个拦截器,然后根据回溯得到的reponse 做处理。最后再返回response。
override fun intercept(chain : inteceptor.Chain ) :Response{
val request = chain.request();
// Request阶段,该拦截器在Request阶段负责做的事情
// 调用RealInterceptorChain.proceed(),其实是在递归调用下一个拦截器的intercept()方法
response = chain.proceed(request, streamAllocation, null, null);
// Response阶段,完成了该拦截器在Response阶段负责做的事情,然后返回到上一层的拦截器。
return response;
}
应用拦截器最先被触发,并且在一次请求中只会被触发一次。
应用拦截器的作用通常由程序员设置,我们继承Interceptor类重写它的intercept方法,当我们在构建request时将该拦截器加入,那么在后续的拦截链中就会执行 我们在内部intercept方法中写的逻辑了,这里我就不给大家示例了,想要了解的可以自行查阅。
如果我们在构建request时没有加入应用拦截器,那么实际上第一次执行的就是RetryAndFollowUpInterceptor了,它的作用看名称也能明白 重试与重定向,重定向是什么意思呢?我们学过计算机网络知道,当我们访问一个站点时,该站点可能不能处理,需要浏览器重新向新的站点发起请求,并且返回3** 状态码 以及在返回的数据中的location字段中放入需要重定向的url。所以当本次请求返回数据时回溯到该层时,就进行一个判断如果location的字段值不为空,那么就会进行一次重定向请求。具体的话我们来看源码:
#RetryAndFollowUpInterceptor
//拦截器的intercept方法
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newRoutePlanner = true
var recoveredFailures = listOf<IOException>()
//是个死循环,不过没关系,内部有限制机制
while (true) {
//在ConnectionInterceptor中会用到
//第一次进入 循环时 newRoutePlanner为true,此时会创建一个ExchangeFinder对象,负责获取一条安全可靠的链接来携带请求
call.enterNetworkInterceptorExchange(request, newRoutePlanner, chain)
var response: Response
var closeActiveExchange = true
//如果本次请求被取消,那么抛出错误,跳出循环
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
//在这里我们调用了责任链的proceed方法
response = realChain.proceed(request)
newRoutePlanner = true
} catch (e: IOException) {
// An attempt to communicate with a server failed. The request may have been sent.
//进入recover方法,在该方法中如果错误是可以恢复的,那么将返回true进行重试,否则返回false,直接抛出错误。
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
//由于是重试,将该参数设置为false。
newRoutePlanner = false
continue
}
//在这里清空response的部分附加是因为本次可能是重试或者重定向,后续由下游的拦截器重新配置。
// Clear out downstream interceptor's additional request headers, cookies, etc.
response = response.newBuilder()
.request(request)
.priorResponse(priorResponse?.stripBody())
.build()
//该对象否则编码和解码
val exchange = call.interceptorScopedExchange
//在这个方法中,会根据response中的字段,判断是否进行重定向,若重定向则会返回新的request,否则返回null
val followUp = followUpRequest(response, exchange)
//不重定向直接返回
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
//一次性请求,不再重试或者重定向
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body.closeQuietly()
//重定向次数超过一定次数就会抛出错误
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
至于我为什么这么称呼它,我们先来看看官方给的注解
从应用程序代码到网络代码的桥梁。首先,它根据用户请求构建网络请求。然后它继续调用网络。最后,它从网络响应构建用户响应。
洋文版咱也贴一下 :
Bridges from application code to network code. First it builds a network request from a user request. Then it proceeds to call the network. Finally it builds a user response from the network response.
有没有对于这个解释的结构很熟悉,没错,在开头,笔者就描述了大多数拦截器在其intercept方法中的逻辑结构,就是: 第一步处理request,第二步进入下一个拦截器,第三步根据第二步获得的response进行处理,最后返回response给上一层。
话不多说,上源码:
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
//首先获得request
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
//在请求头部添加字段 Content-Type 也就是body的编码类型
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
//如果出现contentLength不为 -1 则需要设置content-length字段,界定是否传输完成
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
//分块传输,所以我们不需要Content-Length来界定是否传输完成。
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
//添加Host头部字段
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
//添加连接类型
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
//对非分片传输且未设置压缩方式的请求,这里默认使用gzip压缩。
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
//添加cookie 用来跟踪用户身份
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
//用户代理
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
val networkRequest = requestBuilder.build()
//进入下一个拦截器,并获取响应数据
val networkResponse = chain.proceed(networkRequest)
//cookie管理
cookieJar.receiveHeaders(networkRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(networkRequest)
//解压缩、去头部字段
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
看完源码之后,我们可以清晰的知道,所谓的桥梁就是在请求发送之前,对请求进行加头部字段,发送之后获得response对响应进行拆头部字段,cookie管理以及解压缩。
利用好缓存其实就可以为我们节约大量的资源消耗,也就是说可以避免发送大量的重复请求,如果击中缓存,我们就可以直接使用本地缓存进行加载,而不去进行网络请求,这也是为什么Okhttp会在这里设置缓存拦截器的一个重要原因。现在我们来看看源码,看缓存拦截器到底是怎么工作的吧:
源码 :
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
//获得缓存策略,并判断是否使用网络。
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest // null 表示不允许使用网络
val cacheResponse = strategy.cacheResponse // null表示未击中缓存
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
//缓存候选人不适用,则关闭它。
cacheCandidate.body.closeQuietly()
}
// If we're forbidden from using the network and the cache is insufficient, fail.
// 如果我们被禁止使用网络并且缓存不足,则失败。
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
// If we don't need the network, we're done.
//如果我们不需要网络,并且缓存可用,那么我们就返回缓存。
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(cacheResponse.stripBody())
.build().also {
listener.cacheHit(call, it)
}
}
//到这一步了,其实缓存可能命中(协商缓存)也可能未命中,但是都得通过网络去判断。
if (cacheResponse != null) {
//缓存命中
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
//缓存未命中
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
//进入下一层 并获取网络响应
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
//关闭缓存,防止泄露
cacheCandidate.body.closeQuietly()
}
}
// If we have a cache response too, then we're doing a conditional get.
//上面命中缓存
if (cacheResponse != null) {
//协商缓存成功,服务端返回的是304重定向,所以在这里利用缓存构建响应数据
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(cacheResponse.stripBody())
.networkResponse(networkResponse.stripBody())
.build()
networkResponse.body.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
//在剥离Content-Enconding 字段前更新缓存
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body.closeQuietly()
}
}
//利用网络返回数据,构建响应数据
val response = networkResponse!!.newBuilder()
.cacheResponse(cacheResponse?.stripBody())
.networkResponse(networkResponse.stripBody())
.build()
//如果缓存对象不为空,则设置缓存
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
// This will log a conditional cache miss only.
//提交数据给缓存,同时记录缓存未命中
listener.cacheMiss(call)
}
}
}
//只缓存GET请求的数据
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
其实,okhttp就是将整个网络请求过程进行了分层,所以对于计算机基础学好是真的很重要,上层永远在变,但是底层基础想要改变则需要长时间的演变。所以请注重基础!
这一层是完成网络请求中,连接的建立,比如什么TCP握手,TLS/SSL握手这些操作的。该层的源码看起来是很少的,但是实际上完成的的很多,也比较难分析,所以笔者不会像上面的拦截器一样只是对其Intercept方法进行简单分析就ok了,笔者会适当展开,当然限于水平,也只能适当!
咱们看源码(很少):
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
// 别看下面这一行代码,分析起来,可以重新写一篇了!!!
val exchange = realChain.call.initExchange(realChain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
进入到initExchange方法内部看看:
在这之前,笔者介绍一下 Exchange 和 ExchangeFinder以及codec
Exchange :一次数据交换、可以理解为一条连接客户端和服务端的连接。
ExchangeFinder 可以理解为寻找这条连接的工具,寻找一条可用的连接以及相应的一些信息。
codec:编解码器。
internal fun initExchange(chain: RealInterceptorChain): Exchange {
...
val exchangeFinder = this.exchangeFinder!!
//寻找可用的连接
val connection = exchangeFinder.find()
//获取格式兼容的编码器 http1.1/2.0
val codec = connection.newCodec(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
if (canceled) throw IOException("Canceled")
return result
}
那么实际上val connection = exchangeFinder.find() 这行代码 才是重点!!!!它要寻找一条可用的连接。我们知道okhttp是有连接池的,所以在这一步,我们会优先利用可复用的连接,实在不行才会创建。还记得在RetryAndFollowInterceptor笔者说过 exchangeFinder的初始化吗?我们在这就用到了。实际上它按照客户端是否支持快速回退有两种不同的实现,感兴趣的可以去看下。那么这里笔者就只能写到这个层面了。外链。
这个拦截器也是由我们自己添加的,主要的作用是用于监测网络。具体就不展开写了。自行了解。
在ConnectionInterceptor完成TCP、SSL/TLS、获取编码解码器 等操作之后,就会交由最后一个拦截器CallServerInterceptor去发送请求与服务器交互数据。
override fun intercept(chain: Interceptor.Chain): Response {
...
try {
//使用对应的编码器进行http请求头的构建
exchange.writeRequestHeaders(request)
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
} catch (e: IOException) {
if (e is ConnectionShutdownException) {
throw e // No request was sent so there's no response to read.
}
if (!exchange.hasFailure) {
throw e // Don't attempt to read the response; we failed to send the request.
}
sendRequestException = e
}
try {
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the
// actual response status.
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.stripBody()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body.contentLength() > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body.contentLength()}")
}
return response
} catch (e: IOException) {
if (sendRequestException != null) {
sendRequestException.addSuppressed(e)
throw sendRequestException
}
throw e
}
}