异步编列

事务开发的过程中,咱们为了下降接口耗时,经常会用到线程池,书写多线程数据获取、同步堵塞获取成果的事务逻辑。

常见的使用方法如下:

Future

@Slf4j
@SpringBootTest
public class OtherTest {
​
  public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
     new LinkedBlockingQueue<>(100));
​
  public static void main(String[] args) {
​
​
   Future<Integer> submit1 = executor.submit(() -> {
     // 事务耗时逻辑1
     return 1;
    });
​
   Future<Integer> submit2 = executor.submit(() -> {
     // 事务耗时逻辑2
     return 2;
    });
​
   Future<Integer> submit3 = executor.submit(() -> {
     // 事务耗时逻辑3
     return 3;
    });
​
   try {
     Integer integer1 = submit1.get();
     Integer integer2 = submit2.get();
     Integer integer3 = submit3.get();
​
     System.out.println(integer1);
     System.out.println(integer2);
     System.out.println(integer3);
    } catch (Exception e) {
     e.printStackTrace();
    }
​
  }
​
}

假定一个接口涉及到3个事务逻辑,如下:

  • 事务逻辑1耗时: 50ms
  • 事务逻辑2耗时: 30ms
  • 事务逻辑3耗时: 70ms

那么假如是传统的串行调用,接口总耗时:150ms

但假如是上面的利用线程池的方法进行调用,那么该接口耗时取决于耗时最长的那个事务逻辑,即该接口耗时为: 70ms

能够看到,接口耗时是有明显下降的~


CompletableFuture

当然,上面尽管对接口进行异步编列后,接口耗时有着下降,但是假如说咱们的耗时事务逻辑有着十几二十个?且事务逻辑之间存在依靠联系?那么咱们怎么办?

很显然,上面的Future就不能满足咱们的需求了,所以从JDK8开端,JDK供给了CompletableFuture东西类,为咱们异步编列供给了很大的便利~

@Slf4j
@SpringBootTest
public class OtherTest {
​
  public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,
     new LinkedBlockingQueue<>(100));
​
  public static void main(String[] args) {
​
​
   CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
     // 事务耗时逻辑1
     return 1;
    }, executor);
​
   CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
     // 事务耗时逻辑2
     return 2;
    }, executor);
​
   CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {
     // 事务耗时逻辑3
     return 3;
    }, executor);
​
   try {
     // 等候使命全部履行结束
     CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).get();
​
     System.out.println(completableFuture1.get());
     System.out.println(completableFuture2.get());
     System.out.println(completableFuture3.get());
​
    } catch (Exception e) {
     e.printStackTrace();
    }
​
  }
​
}

由于案例比较简单,无法突出CompletableFuture编列能力比较于Future的优势地点,这个在今后的文章里专门会为咱们讲解这不是本文的要点。


超时中止

在上面的案例中,细心的小伙伴能够发现,不管是CompletableFuture仍是Future,我都是进行堵塞等候使命结束。

这,其实是一个十分风险的行为,假如下游rpc接口呈现波动,那么接口耗时会明显提升,而咱们却进行堵塞获取,线程会被一向堵塞无法及时开释,那么随着不断的请求进来,线程池线程、行列很快都会被打满,新使命都会被拒绝掉,然后影响用户体会,然后影响你的工资,然后影响你的作业。

异步超时中断,知其然,也要知其所以然~

所以,为了杜绝这种情况呈现,咱们在获取使命成果的时分需求设置等候时刻~

FutureCompletableFutureget方法都支撑传入等候时刻~

异步超时中断,知其然,也要知其所以然~


Future超时中止机制

Future供给了get方法来供咱们堵塞获取使命成果,也支撑传入超时时刻,下面来了解下源码

public V get(long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException {
 // 参数校验
 if (unit == null)
  throw new NullPointerException();
 
 int s = state;
 
 // 堵塞等候,假如超越超时时刻使命还未完结,那么抛出超时反常
 if (s <= COMPLETING &&
    (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
  throw new TimeoutException();
 return report(s);
}

堵塞等候,timedtrue代表存在超时时刻

private int awaitDone(boolean timed, long nanos)
  throws InterruptedException {
 
  long startTime = 0L;
  WaitNode q = null;
  boolean queued = false;
  for (;;) {
    int s = state;
    // 使命状况 > COMPLETING阐明现已履行结束
    if (s > COMPLETING) {
      // 当时线程不必等候了,将等候节点里的Thread设置为null
      if (q != null)
        q.thread = null;
      return s;
     }
    else if (s == COMPLETING)
      // COMPLETING是使命履行结束到真实将使命设置为完结态的一个中间状况
      // 当使命的处于COMPLETING时,阐明使命现已履行完了,但此刻cpu时刻不行没有持续履行
      // 此刻需求yield一下,让其他线程履行,然后将使命正确设置为完结状况
      Thread.yield();
    else if (Thread.interrupted()) {
      // 假如当时线程被打断了,则把当时线程从等候该使命完结的堵塞线程链表中删除
      removeWaiter(q);
      // 抛出打断反常
      throw new InterruptedException();
     }
    else if (q == null) {
      // 假如是超时等候,且等候时刻<=0,则直接返回当时使命状况
      if (timed && nanos <= 0L)
        return s;
      // 初始化一个等候当时使命履行完的节点,内部包括
      q = new WaitNode();
     }
    else if (!queued)
      // 将WaitNode排队到线程等候链表中
      queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
    else if (timed) {
      // 堵塞等候,存在超时时刻
      final long parkNanos;
      if (startTime == 0L) { // first time
        startTime = System.nanoTime();
        if (startTime == 0L)
          startTime = 1L;
        parkNanos = nanos;
       } else {
        long elapsed = System.nanoTime() - startTime;
        if (elapsed >= nanos) {
          removeWaiter(q);
          return state;
         }
        parkNanos = nanos - elapsed;
       }
      if (state < COMPLETING)
        LockSupport.parkNanos(this, parkNanos);
     }
    else
      // 堵塞等候,没有超时时刻
      LockSupport.park(this);
   }
}

上面源码注释现已比较完善了,但咱们仍是要总结一下

  • 使命COMPLETING状况,是使命履行结束到真实将使命设置为完结态的一个中间状况(见FutureTask的run方法
  • get方法不管是否存在超时时刻,底层都是经过LockSupportpark、unpark方法来达到堵塞的目的
  • 对于每个使命,其内部会维护一个等候当时使命完结的线程链表waiters

CompletableFuture超时中止机制

而从JDK 9开端,CompletableFuture 也供给了 orTimeoutcompleteTimeout 方法,来进行异步超时控制。

CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).orTimeout(1, TimeUnit.SECONDS).get();

依据上面代码,咱们能够了解到,会等候completableFuture1, completableFuture2, completableFuture3三个使命履行1秒钟

假如超越1秒,则会抛出java.util.concurrent.TimeoutException

源码如下:

public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
  if (unit == null)
    throw new NullPointerException();
  if (result == null)
    whenComplete(new Canceller(Delayer.delay(new Timeout(this),
                         timeout, unit)));
  return this;
}
​
public CompletableFuture<T> completeOnTimeout(T value, long timeout,
                       TimeUnit unit) {
 if (unit == null)
  throw new NullPointerException();
 if (result == null)
  whenComplete(new Canceller(Delayer.delay(
   new DelayedCompleter<T>(this, value),
   timeout, unit)));
 return this;
}
static final class Delayer {
 static ScheduledFuture<?> delay(Runnable command, long delay,
                 TimeUnit unit) {
  // 延时使命
  return delayer.schedule(command, delay, unit);
  }
 
 static final ScheduledThreadPoolExecutor delayer;
 static {
  // 单线程
   (delayer = new ScheduledThreadPoolExecutor(
   1, new DaemonThreadFactory())).
   setRemoveOnCancelPolicy(true);
  }
}
​
​
static final class Timeout implements Runnable {
 final CompletableFuture<?> f;
 Timeout(CompletableFuture<?> f) { this.f = f; }
 public void run() {
  // 假如CompletableFuture不为null,且守时使命没有被撤销
  if (f != null && !f.isDone())
   // 设置超时反常
   f.completeExceptionally(new TimeoutException());
  }
}
​
static final class DelayedCompleter<U> implements Runnable {
 final CompletableFuture<U> f;
 final U u;
 DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }
 public void run() {
  if (f != null)
   // 将使命成果设置为咱们给定的value
   f.complete(u);
  }
}
static final class Canceller implements BiConsumer<Object, Throwable> {
  final Future<?> f;
  Canceller(Future<?> f) { this.f = f; }
  public void accept(Object ignore, Throwable ex) {
    // 假如没有反常,且超时使命存在且没有被撤销,那么则撤销超时使命
    // 由于此刻阐明,CompletableFuture的使命在超时时刻内完结了,则不需求在监控超时
    if (ex == null && f != null && !f.isDone())
      f.cancel(false);
   }
}

经过对上面源码的了解,咱们能够知道

CompletableFutureorTimeoutcompleteOnTimeout底层其实都是经过ScheduledThreadPoolExecutor来完结的

当咱们对一个CompletableFuture设置了超时时刻后,底层其实会经过ScheduledThreadPoolExecutor发动一个延时使命,延时时刻便是咱们设置的超时时刻,此刻有分为两种情况

  1. 使命在超时时刻之内完结,那么在使命完结之后,会去经过cancel(false)撤销延时使命
  2. 使命履行时刻超越设定的超时时刻,则为该使命设置TimeoutException,让主线程感知~

Future cancel原理

另外,咱们还能看到,CompletableFuture 的延时使命并没有进行try-catch,此处能够了解下->ScheduledThreadPoolExecutor有坑嗷~

orTimeoutcompleteOnTimeout的区别就在于

  • 假如是orTimeout,那么超时后会抛出超时反常
  • 假如是completeOnTimeout不会抛出反常,则是将使命成果设置为咱们传入的value

扩展知识点

在上面了解CompletableFutureorTimeoutcompleteOnTimeout时,咱们知道了其底层是经过ScheduledThreadPoolExecutor来完结的,但经过源码发现,ScheduledThreadPoolExecutor只要一个线程去处理

static final ScheduledThreadPoolExecutor delayer;
static {
   (delayer = new ScheduledThreadPoolExecutor(
    1, new DaemonThreadFactory())).
    setRemoveOnCancelPolicy(true);
}

那么,当呈现很多设置了超时时刻且时刻个不一致的CompletableFuture时,由所以单线程处理,或许咱们给使命设置的超时时刻是1000ms,但实际或许由于行列排队,真实处理超时的超时时刻会 > 1000ms

也便是说orTimeoutcompleteOnTimeout设置的超时时刻并不会那么精确


结束

我是 皮皮虾 ,会在今后的日子里跟咱们一同学习,一同前进!

觉得文章不错的话,能够在 关注我,或者是我的公众号——JavaCodes