敞开成长之旅!这是我参加「日新方案 12 月更文挑战」的第37天,点击检查活动概况

文章中源码的OkHttp版别为4.10.0

implementation 'com.squareup.okhttp3:okhttp:4.10.0'

1.简略运用

  • okHttp的简略运用代码如下:
//创立OkHttpClient目标
val client = OkHttpClient().newBuilder().build()
//创立Request目标
val request = Request.Builder()
    .url("https://wanandroid.com/wxarticle/list/408/1/json")    //添加恳求的url地址
    .build()                                                    //回来一个Request目标
//建议恳求
fun request() {
    val response = client
        .newCall(request)                                       	//创立一个Call目标
        .enqueue(object : Callback {                            	//调用enqueue办法履行异步恳求
            override fun onFailure(call: Call, e: IOException) {
                TODO("Not yet implemented")
            }
            override fun onResponse(call: Call, response: Response) {
                TODO("Not yet implemented")
            }
        })
    }
  • 工作流程有四步:
    • 创立OkHttpClient目标
    • 创立Request目标
    • 创立Call目标
    • 开端建议恳求,enqueue为异步恳求,execute为同步恳求

2.OkHttpClient目标是怎么创立的

val client = OkHttpClient().newBuilder().build()

OkHttpClient目标的创立是一个典型的建造者形式,先看一下newBuilder办法做了什么,源码如下:

//创立了一个Builder目标
open fun newBuilder(): Builder = Builder(this)
class Builder constructor() {
    //调度器
    internal var dispatcher: Dispatcher = Dispatcher()
    //拦截器调集
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    //网络拦截器调集
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
	...
}

newBuilder中创立了一个Builder目标,Builder目标的结构函数中界说了许多的变量,这儿只保留了3个重要的。

下面看一下build办法做了什么

//这个this就是上面创立的Builder目标
fun build(): OkHttpClient = OkHttpClient(this)

okHttpClient源码如下

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  @get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
  @get:JvmName("interceptors") val interceptors: List<Interceptor> =
      builder.interceptors.toImmutableList()
  @get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =
      builder.networkInterceptors.toImmutableList()
	...
}

经过OkHttpClient目标的源码能够得知,Builder创立的调度器、拦截器终究都会交给OkHttpClient,这是建造者形式的特定。

3.Request目标是怎么创立的

val request = Request.Builder()
    .url("https://wanandroid.com/wxarticle/list/408/1/json")    //添加恳求的url地址
    .build()                                                    //回来一个Request目标
open class Builder {
    //恳求地址
    internal var url: HttpUrl? = null
    //恳求办法
    internal var method: String
    //恳求头
    internal var headers: Headers.Builder
    //恳求体
    internal var body: RequestBody? = null
	...
}

4.创立Call目标

val call = client.newCall(request)
override fun newCall(request: Request): Call {
    //newRealCall中传递了三个参数,第一个参数是OkHttpClient自身,第二个参数request,
    //第三个不用重视
    return RealCall.newRealCall(this, request, forWebSocket = false)
}

接着咱们先来看一下RealCall是什么

class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
    ...
}

从源码可知RealCallCall的子类,那么Call又是什么呢,往下看

//调用是预备好要履行的恳求。也能够撤销调用。
//由于该目标表明单个恳求/呼应对(流),因而不能履行两次。
interface Call : Cloneable {
    //回来建议此调用的原始恳求。
    fun request(): Request
    @Throws(IOException::class)
    //同步恳求,当即调用恳求,并阻塞,直到呼应能够处理或呈现错误。
    fun execute(): Response
    //异步恳求,承受回调参数
    fun enqueue(responseCallback: Callback)
    //撤销恳求
    fun cancel()
    //假如此调用已被履行或进入行列,则回来true。多次履行调用是错误的。
    fun isExecuted(): Boolean
    //是否是撤销状况
    fun isCanceled(): Boolean
    //超时时间,
    fun timeout(): Timeout
    //创立一个与此调用相同的新调用,即便该调用现已进入行列或履行,该调用也能够被加入行列或履行。
    public override fun clone(): Call
    fun interface Factory {
        fun newCall(request: Request): Call
    }
}

5.建议恳求

  • 以异步恳求为例进行分析
call.enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        println("onFailure:$e")
    }
    override fun onResponse(call: Call, response: Response) {
        println("onResponse:${response.body.toString()}")
    }
})

RealCallCall的子类所以enqueue的详细实现是在RealCall

override fun enqueue(responseCallback: Callback) {
     //检查是否进行了二次恳求
     check(executed.compareAndSet(false, true)) { "Already Executed" }
    //恳求后当即调用,相当于监听恳求的开端事情
    callStart()
    //将恳求交给调度器来决定什么时候开端恳求
    client.dispatcher.enqueue(AsyncCall(responseCallback))
}
private fun callStart() {
    this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
    eventListener.callStart(this)
}
  • 疑问:client.dispatcher.enqueue是怎么决定什么时候开端恳求的
    • 已知client就是OkHttpClient
    • dispatcher是调度器,先来看一下它的源码
class Dispatcher constructor() {
  	//并发履行的最大恳求数。上面的恳求行列在内存中,等候正在运转的调用完结。
	//假如在调用这个函数时有超越maxRequests的恳求在运转,那么这些恳求将保持在运转状况。
    @get:Synchronized var maxRequests = 64
    set(maxRequests) {
        require(maxRequests >= 1) { "max < 1: $maxRequests" }
        synchronized(this) {
            field = maxRequests
        }
        promoteAndExecute()
    }
   	//每台主机可并发履行的最大恳求数。这约束了URL主机名的恳求。
    //留意,对单个IP地址的并发恳求依然可能超越此约束:
    //多个主机名可能同享一个IP地址或经过同一个HTTP代理路由。
    @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
        require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
        synchronized(this) {
            field = maxRequestsPerHost
        }
        promoteAndExecute()
    }
    //线程安全的单例形式,线程池的获取用于线程调度。
    @get:Synchronized
    @get:JvmName("executorService") val executorService: ExecutorService
    get() {
        if (executorServiceOrNull == null) {
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                                                       SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
        }
        return executorServiceOrNull!!
    }
    //界说预备发送的行列
    private val readyAsyncCalls = ArrayDeque<AsyncCall>()
    //界说异步发送行列
    private val runningAsyncCalls = ArrayDeque<AsyncCall>()
    //界说同步发送行列
    private val runningSyncCalls = ArrayDeque<RealCall>()
    ...
}
    • 再来看一下dispatcher.enqueue的源码
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
        //将call目标添加到预备发送的行列,这个call目标来自AsyncCall,稍后再讲
        readyAsyncCalls.add(call)
        //修正AsyncCall,使其同享对同一主机的现有运转调用的AtomicInteger。
        if (!call.get().forWebSocket) {
            //同享一个现已存在的正在运转调用的AtomicInteger
            val existingCall = findExistingCallWithHost(call.host())
            //统计发送数量
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
        }
    }
    //预备发送恳求
    promoteAndExecute()
}
//将契合条件的call从readyAsyncCalls(预备发送的行列)添加到runningAsyncCalls(异步发送行列)中
//并在服务器上履行它们
//不能在同步时调用,由于履行调用能够调用用户代码
//假如调度程序当前正在运转,则为true。
private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))
    //搜集所有需求履行的恳求
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
        val i = readyAsyncCalls.iterator()
        //遍历预备发送的行列
        while (i.hasNext()) {
            val asyncCall = i.next()
            //判别现已发送的恳求是大于等于最大恳求个数64,是则跳出循环
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            //判别并发恳求个数是否大于等于最大并发个数5,假如是则跳出循环
            if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
            //从预备行列中删除
            i.remove()
            //计数+1
            asyncCall.callsPerHost().incrementAndGet()
            executableCalls.add(asyncCall)
            //将目标添加到异步发送行列中
            runningAsyncCalls.add(asyncCall)
        }
        isRunning = runningCallsCount() > 0
    }
    for (i in 0 until executableCalls.size) {
        val asyncCall = executableCalls[i]
        //提交任务到线程池
        asyncCall.executeOn(executorService)
    }
    return isRunning
}
    • 终究恳求是经过AsyncCallexecuteOn发送出去的,AsyncCall是什么
internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
	...
}
    • 接收了一个回调,承继了Runnable
fun executeOn(executorService: ExecutorService) {
    client.dispatcher.assertThreadDoesntHoldLock()
    //暂时界说为未履行成功
    var success = false
    try {
        //运用线程履行自身
        executorService.execute(this)
        //履行成功
        success = true
    } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        //发送失利
        responseCallback.onFailure(this@RealCall, ioException)
    } finally {
        if (!success) {
            //发送结束
            client.dispatcher.finished(this) // 中止运转
        }
    }
}
    • AsyncCall将自己加入到线程池,然后线程池敞开线程履行自己的run办法,那么AsyncCall加入了一个怎样的线程池呢?
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
    if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                                                   SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
}
    • 这儿界说了一个缓存线程池,具有缓存功用且假如一个恳求履行结束后在60s内再次建议就会复用方才那个线程,提高了功能。
    • 交给线程池后,线程池会敞开线程履行AsyncCall的run办法
override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
        //界说呼应标志位,用于表明恳求是否成功
        var signalledCallback = false
        timeout.enter()
        try {
            //发送恳求并得到结果
            val response = getResponseWithInterceptorChain()
            //过程未出错
            signalledCallback = true
            //回调结果
            responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
            if (signalledCallback) {
                // 不要两次发出回调信号!
                Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
            } else {
                //失利回调
                responseCallback.onFailure(this@RealCall, e)
            }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
            client.dispatcher.finished(this)
        }
    }
}

在恳求得到结果后最后会调用finish表明完结,这儿的finish又做了什么呢?

/**
 * 由[AsyncCall.run]用于表明完结。
 */
internal fun finished(call: AsyncCall) {
	call.callsPerHost.decrementAndGet()
	finished(runningAsyncCalls, call)
}
/** 
 * 由[Call.execute]用于表明完结。
 */
internal fun finished(call: RealCall) {
	finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
    	//将完结的任务从行列中删除
        if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
        	idleCallback = this.idleCallback
    }
    //用于将等候行列中的恳求移入异步行列,并交由线程池履行
    val isRunning = promoteAndExecute()
    //假如没有恳求需求履行,回调搁置callback
    if (!isRunning && idleCallback != null) {
    	idleCallback.run()
    }
}