OkHttp分为异步恳求和同步恳求两部分

同步恳求

call.execute()

execute代码如下:

#okhttp3.internal.connection.RealCall

override fun execute(): Response {
  check(executed.compareAndSet(false, true)) { "Already Executed" }
   // 1.敞开超时检测
timeout.enter()
  callStart()
  try {
      // 2. 将恳求增加在同步恳求行列
    client.dispatcher.executed(this)
    //3. 调用getResponseWithInterceptorChain获取Response
    return getResponseWithInterceptorChain()
  } finally {
    client.dispatcher.finished(this)
  }
}
  1. 敞开超时检测

  2. 将恳求增加在同步恳求行列

  3. 调用getResponseWithInterceptorChain获取Response

异步恳求

  1. okHttpClient.newCall回来值

首要,咱们看一下okHttpClient.newCall回来的是什么

newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

newCall回来一个RealCall目标

  1. call.enqueue的源码

override fun enqueue(responseCallback: Callback) {
  check(executed.compareAndSet(false, true)) { "Already Executed" }
   //1.  
callStart()
  //2.
  client.dispatcher.enqueue(AsyncCall(responseCallback))
}
  1. callStart()办法,内部调用的EventListener.callStart,是一个全局监听恳求进程的办法
  2. 调用dispatcherenqueue办法,从字面理解为,恳求需要排队履行,这里的恳求是AsyncCall目标,AsyncCall中包括回来结果的监听responseCallback,也便是咱们在调用call.enqueue时传入的callBack

AsyncCall下面会进行详细介绍

  1. Dispatcher的enqueue办法

internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
      //1. 
readyAsyncCalls.add(call)
    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.call.forWebSocket) {
      val existingCall = findExistingCallWithHost(call.host)
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    }
  }
  //2. 
promoteAndExecute()
}

运用同步锁*synchronized*防止线程同步问题:

  1. AsyncCall目标参加readyAsyncCalls这个行列中

  2. promoteAndExecute(),推动并履行,咱们看一下详细干了什么

  3. Dispatcher的promoteAndExecute办法

经过注释,咱们能够很清楚的知道该函数的作用:

将合法的恳求从readyAsyncCalls 行列里移到runningAsyncCalls行列里,而且用ExecutorService履行他们

 /**
* Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
* executor service. Must not be called with synchronization because executing calls can call
* into user code.
*
*  @return  true if the dispatcher is currently running calls.
*/
private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()
   //1. 
  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {
      val asyncCall = i.next()
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
       // 从readyAsyncCalls中移除
      i.remove()
      asyncCall.callsPerHost.incrementAndGet()
      //2.  增加到executableCalls/runningAsyncCalls 行列中
      executableCalls.add(asyncCall)
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }
// Avoid resubmitting if we can't logically progress
  // particularly because RealCall handles a RejectedExecutionException
  // by executing on the same thread.
  if (executorService.isShutdown) {
      ...
  } else {
    for (i in 0 until executableCalls.size) {
    // 3. 
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }
  }
  return isRunning
}
  1. 声明一个AsyncCall的集合
  2. 将每个AsynCallreadyAsyncCalls中移除,增加到executableCalls/runningAsyncCalls 行列中
  3. executableCalls中取出AsynCall,调用AsyncCall.executeOn办法,并传入executorService

executorServiceExecutorService目标:

val executorService: ExecutorService
  get() {
    if (executorServiceOrNull == null) {
      executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
          SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
  }

ExecutorService是一个没有核心线程,非核心线程数为Int最大值,线程闲置60s会被回收的线程池

  1. AsyncCall.executeOn

fun executeOn(executorService: ExecutorService) {
  client.dispatcher.assertThreadDoesntHoldLock()
  var success = false
  try {
      //1. 
    executorService.execute(this)
    success = true
  } catch (e: RejectedExecutionException) {
    failRejected(e)
  } finally {
    if (!success) {
      client.dispatcher.finished(this) // This call is no longer running!
    }
  }
}

运用传入的executorService线程池履行AsyncCall使命,而AsyncCall是一个Runnable目标,直接查看其run办法

  1. AsyncCall.run

override fun run() {
  threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
    // 1. 
    timeout.enter()
    try {
    // 2.
      val response = getResponseWithInterceptorChain()
      signalledCallback = true
      // 3. 
      responseCallback.onResponse(this@RealCall, response)
    } catch (e: IOException) {
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
      } else {
      //4. 
        responseCallback.onFailure(this@RealCall, e)
      }
    } catch (t: Throwable) {
      cancel()
      if (!signalledCallback) {
        val canceledException = IOException("canceled due to $t")
        canceledException.addSuppressed(t)
        // 5. 
        responseCallback.onFailure(this@RealCall, canceledException)
      }
      throw t
    } finally {
      client.dispatcher.finished(this)
    }
  }
}
  1. 调用timeout.enter()敞开超时检测

  2. 调用getResponseWithInterceptorChain获取呼应结果

  3. 获取到response后调用responseCallback.response办法

  4. 调用responseCallback.onFailure,失告诉败结果

getResponseWithInterceptorChain源码分析

blog.csdn.net/xuwb123xuwb…

internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors.
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors
  interceptors += RetryAndFollowUpInterceptor(client)
  interceptors += BridgeInterceptor(client.cookieJar)
  interceptors += CacheInterceptor(client.cache)
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
    interceptors += client.networkInterceptors
  }
  interceptors += CallServerInterceptor(forWebSocket)
  val chain = RealInterceptorChain(
    call = this,
    interceptors = interceptors,
    index = 0,
    exchange = null,
    request = originalRequest,
    connectTimeoutMillis = client.connectTimeoutMillis,
    readTimeoutMillis = client.readTimeoutMillis,
    writeTimeoutMillis = client.writeTimeoutMillis
  )
  var calledNoMoreExchanges = false
  try {
    val response = chain.proceed(originalRequest)
    if (isCanceled()) {
      response.closeQuietly()
      throw IOException("Canceled")
    }
    return response
  } catch (e: IOException) {
    calledNoMoreExchanges = true
    throw noMoreExchanges(e) as Throwable
  } finally {
    if (!calledNoMoreExchanges) {
      noMoreExchanges(null)
    }
  }
}

简单来说,getResponseWithInterceptorChain办法中,便是构建一个链式拦截器,然后建议恳求,整个进程是经过拦截器的链式调用完结的,即先从左到右调用,再从右到左回来结果

暂时无法在飞书文档外展现此内容

  • RetryAndFollowUpInterceptor:担任失利重试和重定向

  • BridgeInterceptor:担任把用户恳求转换为发送到服务器的恳求,并把服务器的呼应转化为用户需要的呼应

  • CacheInterceptor:担任读取缓存、更新缓存

  • ConnectInterceptor:担任和服务器树立衔接

  • CallServerInterceptor:担任向服务器发送数据,从服务器读取呼应数据

addInterceptor 和 addNetworkInterceptor区别

getResponseWithInterceptorChain办法中能够看到:拦截器是经过职责链模式调用的,

addInterceptor增加的Interceptor会被增加到职责链的初步,因此:

  • 任何情况下都会首要履行

  • 不受重定向和重试机制的影响,即自定义拦截器只会被履行一次

addNetworkInterceptor增加的拦截器处在ConnectInterceptor之后,因此:

  • 如果无网衔接失利,该拦截器不会被调用
  • 重定向和重试几回,该拦截器就会被调用几回
  • 答应操作中心呼应,比方当恳求操作发生重定向(拦截被履行了两次)或许重试等。
  • 不答应调用缓存来short-circuit (短路)这个恳求。(意思便是说不能从缓存池中获取缓存目标回来给客户端,必须经过恳求服务的方法获取呼应,也便是Chain.proceed())
  • 能够监听数据的传输