敞开生长之旅!这是我参加「日新计划 2 月更文挑战」的第 3 天,点击查看活动详情

Okio是一个IO库,底层根据Java原生的输入输出流完结。但原生的输入输出流并没有供给超时的检测机制。而Okio完结了这个功能。建议读者先阅读 Android | 完全了解 Okio 之源码篇 ,然后再阅读本篇内容会更好了解。

Timeout 类的规划

讨论超时机制,首先要了解Timeout这个类。Timeout完结了Okio的同步超时检测,这儿的同步指的是“使命履行”和“超时检测”是同步的,有次序的。同步超时不会直接中断使命履行,它首先会查看是否发生超时,然后决定是否中断使命履行。throwIfReached便是一个同步超时检测的办法。

了解 timeout 与 deadline 的差异

timeout中文意为“超时”,deadline中文意为“最终期限”,它们是有显着差异的。
Timeout类中有一系列的timeoutXxx办法,timeoutXxx是用来设置**一次操作完结的最大等候时刻。若这个操作在等候时刻内没有完毕,则以为超时。
deadlineXxx系列办法则是用来设置
一项使命完结的最大等候时刻。**意味着在未来多长时刻内,需要将这项使命完结,不然以为超时。它可能包括一次或屡次的操作。

读取文件的比如

回忆下之前Okio读取文件比如。

public void readFile() {
    try {
        FileInputStream fis = new FileInputStream("test.txt");
        okio.Source source = Okio.source(fis);
        BufferedSource bs = Okio.buffer(source);
        source.timeout().deadline(1, TimeUnit.MILLISECONDS);
        String res = bs.readUtf8();
        System.out.println(res);
    } catch (Exception e){
        e.printStackTrace();
    }
}

在这个比如中,咱们运用deadline设置了超时时刻为1ms,这意味着从现在开端,读取文件的这项使命,必须在未来的1ms内完结,不然以为超时。而读取文件的这项使命,就包括了屡次的文件读取操作。

摇骰子的比如

咱们再来看下面这个摇骰子的程序。Dice是一个骰子类,roll办法表明摇骰子,摇出来的点数latestTotal不会超越12。rollAtFixedRate会敞开一个线程,每隔一段时刻调用roll办法摇一次骰子。awaitTotal办法会当骰子的点数与咱们传递进去的total值相同或许超时而完毕。

private class Dice {
    Random random = new Random();
    int latestTotal;
    // 摇骰子
    public synchronized void roll() {
        latestTotal = 2 + random.nextInt(6) + random.nextInt(6);
        System.out.println("Rolled " + latestTotal);
        notifyAll();
    }
    // 敞开一个线程,每隔一段时刻履行 roll 办法
    public void rollAtFixedRate(int period, TimeUnit timeUnit) {
      Executors.newScheduledThreadPool(0).scheduleAtFixedRate(new Runnable() {
        public void run() {
          roll();
         }
      }, 0, period, timeUnit);
    }
    // 超时检测
    public synchronized void awaitTotal(Timeout timeout, int total) throws InterruptedIOException {
      while (latestTotal != total) {
        timeout.waitUntilNotified(this);
      }
   }
}

timeout()是一个测验骰子类的办法,在主线程中运转。该程序设置每隔3s摇一次骰子,主线程设置超时时刻为6s,期望摇到的点数是20。因为设置的超时是timeoutXxx系列的办法,所以这儿超时的意思是“只要我摇一次骰子的时刻不超越6s,那么我就不会超时,能够一向摇骰子”。因为摇出骰子的最大点数是12,而期望值是20,永远也摇不出来20这个点数,且摇一次骰子的时刻是3s多,也不满足超时的时刻。所以主线程就会一向处于等候状态。

public void timeout(){
    try {
        Dice dice = new Dice();
        dice.rollAtFixedRate(3, TimeUnit.SECONDS);
        Timeout timeout = new Timeout();
        timeout.timeout(6, TimeUnit.SECONDS);
        dice.awaitTotal(timeout, 20);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

现在将timeout()办法修正一下,将timeout.timeout(6, TimeUnit.SECONDS)改为timeout.deadline(6, TimeUnit.SECONDS),之前咱们说过deadlineXxx设置的超时**意味着在未来多长时刻内,需要将这项使命完结。**在摇骰子这儿的意思便是“从现在开端,我只能够摇6s的骰子。超越这个时刻你还在摇,则以为超时”。它重视的是能够摇多久的骰子,而不是摇一次骰子不能超越多久的时刻。

public void timeout(){
    try {
        Dice dice = new Dice();
        dice.rollAtFixedRate(3, TimeUnit.SECONDS);
        Timeout timeout = new Timeout();
        timeout.deadline(6, TimeUnit.SECONDS);
        dice.awaitTotal(timeout, 20);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

上述程序,主线程会在6s后因超时而中止等候,完毕运转。

等候直到唤醒

前面举了两个比如让咱们了解Okio中timeoutdeadline的差异。在摇骰子的比如顶用到了waitUntilNotified这个办法来检测超时,中文意思为“等候直到唤醒”。也便是Java多线程中经典的“等候-唤醒”机制,该机制常常用于多线程之间的通讯。调用waitUntilNotified办法的线程会一向处于等候状态,除非被唤醒或许因超时而抛出反常。下面是该办法的源码。

public final void waitUntilNotified(Object monitor) throws InterruptedIOException {
    try {
      boolean hasDeadline = hasDeadline();
      long timeoutNanos = timeoutNanos();
      // 若没有设置 deadline && timeout,则一向等候直到唤醒
      if (!hasDeadline && timeoutNanos == 0L) {
        monitor.wait(); // There is no timeout: wait forever.
        return;
      }
      // Compute how long we'll wait.
      // 核算等候的时长,若一起设置了deadline 和 timeout,则 deadline 优先
      long waitNanos;
      long start = System.nanoTime();
      if (hasDeadline && timeoutNanos != 0) {
        long deadlineNanos = deadlineNanoTime() - start;
        waitNanos = Math.min(timeoutNanos, deadlineNanos);
      } else if (hasDeadline) {
        waitNanos = deadlineNanoTime() - start;
      } else {
        waitNanos = timeoutNanos;
      }
      // Attempt to wait that long. This will break out early if the monitor is notified.
      long elapsedNanos = 0L;
      if (waitNanos > 0L) {
        long waitMillis = waitNanos / 1000000L;
        // 等候 waitNanos
        monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));
        // 核算从等候 waitNanos 到唤醒所用时刻
        elapsedNanos = System.nanoTime() - start;
      }
      // Throw if the timeout elapsed before the monitor was notified.
      // 若等候了 waitNanos 还没唤醒,以为超时
      if (elapsedNanos >= waitNanos) {
        throw new InterruptedIOException("timeout");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Retain interrupted status.
      throw new InterruptedIOException("interrupted");
    }
}

查看waitUntilNotified的源码,咱们发现该办法根据“等候-通知”机制,增加了多线程之间的超时检测功能,一个线程用来履行详细的使命,一个线程调用该办法来检测超时。在Okio中的管道就运用了waitUntilNotified这个办法。

AsyncTimeout 类的规划

AsyncTimeout内部保护一个单链表,节点的类型是AsyncTimeout,以到超时之前的剩下时刻升序排序,即超时的剩下时刻越大,节点就在链表越后的位置。对链表的操作,运用了synchronized关键字加类锁,确保在同一时刻,只要一个线程能够对链表进行修正拜访操作。

AsyncTimeout完结了Okio的异步超时检测。这儿的异步指的是“使命履行”和“超时检测”是异步的,在履行使命的一起,也在进行使命的“超时检测”。你会觉得这和上面摇骰子的比如很像,一个线程履行使命,一个线程检测超时。事实上,AsyncTimeout也正是这样完结的,它内部的Watchdog线程便是用来检测超时的。当咱们要对一次操作或一项使命设置超时,运用成对的enter()exit(),模板代码如下。

enter();
// do something
exit();

若上面do something的操作超时,timedOut()办法将会在Watchdog线程被回调。能够看见,这种包裹性的模板代码,灵活性很大,咱们简直能够在其中放置任何想要检测超时的一个或多个操作。

AsyncTimeout 成员变量

下面是AsyncTimeout类主要的成员变量。

private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
static @Nullable AsyncTimeout head;
private boolean inQueue;
private @Nullable AsyncTimeout next;
private long timeoutAt;
  1. IDLE_TIMEOUT_MILLIS,在单链表中没有节点时,Watchdog线程等候的时刻
  2. head,单链表的头结点,是一个虚假节点。当链表中只存在该节点,以为该链表为空。
  3. inQueue,当时节点是否在链表中。
  4. next,当时节点的下一个节点。
  5. timeoutAt,以当时时刻为基准,当时节点在将来何时超时。

AsyncTimeout 成员办法

scheduleTimeout 有序的将超时节点加入到链表中

scheduleTimeout办法能够将一个超时节点按照超时的剩下时刻有序的刺进到链表傍边。注意该办法运用synchronized修饰,是一个同步办法,能够确保对链表的操作是线程安全的。

private static synchronized void scheduleTimeout(AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // Start the watchdog thread and create the head node when the first timeout is scheduled.
    // 若 head 节点为 null, 初始化 head 并启动 Watchdog 线程
    if (head == null) {
      head = new AsyncTimeout();
      new Watchdog().start();
    }
    // 核算 node 节点的 timeoutAt 值
    long now = System.nanoTime();
    if (timeoutNanos != 0 && hasDeadline) {
      // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
      // Math.min() is undefined for absolute values, but meaningful for relative ones.
      node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
    } else if (timeoutNanos != 0) {
      node.timeoutAt = now + timeoutNanos;
    } else if (hasDeadline) {
      node.timeoutAt = node.deadlineNanoTime();
    } else {
      throw new AssertionError();
    }
    // Insert the node in sorted order.
    // 回来 node 节点的超时剩下时刻
    long remainingNanos = node.remainingNanos(now);
    // 从 head 节点开端遍历链表, 将 node 节点刺进到适宜的位置
    for (AsyncTimeout prev = head; true; prev = prev.next) {
      // 若当时遍历的节点下一个节点为 null 或许 node 节点的超时剩下时刻小于下一个节点
      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
        // 将 node 节点刺进到链表
        node.next = prev.next;
        prev.next = node;
        // 若当时遍历的节点是 head, 唤醒 watchdog 线程
        if (prev == head) {
          AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
        }
        break;
      }
    }
}

Watchdog 线程

scheduleTimeout办法中,若headnull,则会初始化head并启动Watchdog线程。Watchdog是一个看护线程,因此它会跟着JVM进程的完毕而完毕。前面咱们说过Watchdog线程是用来检测超时的,它会逐一查看链表中的超时节点是否超时,直到链表中所有节点查看完毕后完毕运转。

private static final class Watchdog extends Thread {
    Watchdog() {
      super("Okio Watchdog");
      setDaemon(true);
    }
    public void run() {
      while (true) {
        try {
          // 超时的节点
          AsyncTimeout timedOut;
          // 加锁,同步代码块
          synchronized (AsyncTimeout.class) {
            // 等候节点超时
            timedOut = awaitTimeout();
            // Didn't find a node to interrupt. Try again.
            // 当时该节点没有超时,持续查看
            if (timedOut == null) continue;
            // The queue is completely empty. Let this thread exit and let another watchdog thread
            // get created on the next call to scheduleTimeout().
            // 链表中已经没有超时节点,完毕运转
            if (timedOut == head) {
              head = null;
              return;
            }
          }
          // Close the timed out node.
          // timedOut 节点超时,回调 timedOut() 办法
          timedOut.timedOut();
        } catch (InterruptedException ignored) {
        }
      }
    }
}

awaitTimeout 等候节点超时

Watchdog线程中会调用awaitTimeout办法来等候检测的节点超时,若检测的节点没有超时,该办法回来null。不然回来超时的节点。

static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
    // Get the next eligible node.
    // 检测的节点
    AsyncTimeout node = head.next;
    // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
    // 若链表为空
    if (node == null) {
      long startNanos = System.nanoTime();
      // Watchdog 线程等候 60s,期间会释放类锁
      AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
      // 等候 60s 后若链表还为空则回来 head,不然回来 null
      return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
              ? head  // The idle timeout elapsed.
              : null; // The situation has changed.
    }
    // node 节点超时剩下的时刻
    long waitNanos = node.remainingNanos(System.nanoTime());
    // The head of the queue hasn't timed out yet. Await that.
    // node 节点超时剩下的时刻 > 0,阐明 node 还未超时,持续等候 waitNanos 后回来 null
    if (waitNanos > 0) {
      // Waiting is made complicated by the fact that we work in nanoseconds,
      // but the API wants (millis, nanos) in two arguments.
      long waitMillis = waitNanos / 1000000L;
      waitNanos -= (waitMillis * 1000000L);
      AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
      return null;
    }
    // The head of the queue has timed out. Remove it.
    // node 节点超时了,将 node 从链表中移除并回来
    head.next = node.next;
    node.next = null;
    return node;
}

enter 进入超时检测

剖析完上面三个办法后再来看enter就十分的简单了,enter内部调用了scheduleTimeout办法来增加一个超时节点到链表傍边,而Watchdog线程随即会开端检测超时。

public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    if (timeoutNanos == 0 && !hasDeadline) {
      return; // No timeout and no deadline? Don't bother with the queue.
    }
    // 更新 inQueue 为 true
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
}

exit 退出超时检测

前面说过,enterexit在检测超时是需要成对出现的。它们之间的代码便是需要检测超时的代码。exit办法的回来值表明enterexit中心检测的代码是否超时。

public final boolean exit() {
    if (!inQueue) return false;
    // 更新 inQueue 为 false
    inQueue = false;
    return cancelScheduledTimeout(this);
}

cancelScheduledTimeout办法会将当时的超时节点从链表中移除。为了确保对链表的操作是线程安全的,该办法也是一个同步办法。咱们知道在awaitTimeout办法中,若某个节点超时了会将它从链表中移除。那么当调用cancelScheduledTimeout发现node不在链表中,则一定表明node超时了。

private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
    // Remove the node from the linked list.
    // 若 node 在链表中,将其移除。
    for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
      if (prev.next == node) {
        prev.next = node.next;
        node.next = null;
        return false;
      }
    }
    // The node wasn't found in the linked list: it must have timed out!
    // node 不在链表中,则 node 一定超时了,回来 true
    return true;
}

总结

本文详细讲解了Okio中超时机制的完结原理,主要是TimeoutAsyncTimeout类的源码剖析与解读。相信咱们已经把握了这部分常识,现总结一下文中要点。

  1. Okio 根据等候-唤醒机制,运用Watchdog线程来检测超时。
  2. 当要对某项操作或使命进行超时检测时,将它们放到enterexit的中心。
  3. Okio 对链表的运用十分频繁,在文件读写和超时检测都运用到了链表这个结构。

写在最终

如果你对我感兴趣,请移步到 blogss.cn ,或重视大众号:程序员小北,进一步了解。

  • 如果本文帮助到了你,欢迎点赞和重视,这是我持续创造的动力 ❤️
  • 因为作者水平有限,文中如果有错误,欢迎在谈论区指正 ✔️
  • 本文首发于,未经许可禁止转载 ️