CompletableFuture由Java 8提供,是完结异步化的东西类,上手难度较低,且功用强大,支撑通过函数式编程的办法对各类操作进行组合编排。 CompletableFuture完结了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多使命的协同工作时更加顺畅便利。

1.背景

跟着事务项目数量的增大,体系服务面临的压力也越来越大,这时候体系吞吐量会下降,可是一些中心功用的接口有必要确保高吞吐量,低延迟。这时候咱们就需求对接口进行优化,进步功能,然后确保高吞吐量。这时候CompletableFuture就用很大的用武之地了,咱们一些中心接口处理数据都是串行履行的,可是其实接口的某些数据获取、处理封装并没有前后依靠关系,咱们大可并行处理,这样就能够充分利用cpu资源。

一般咱们的接口调用履行分为同步或许异步:

1.1 同步履行

通常咱们的接口数据查询屡次数据库获取数据然后进行处理,封装回来,或许是屡次rpc调用其他服务获取数据,可是无论什么获取数据的操作,都是串行履行的,也便是操作2有必要要等操作1完结之后在履行,即使操作1和操作2之间没有任何联系

Java新特性:异步编排CompletableFuture

在同步调用的场景下,接口耗时长、功能差,接口响应时长T = T1+T2+T3+……+Tn,这时为了缩短接口的响应时间,一般会运用线程池的办法并行获取数据

1.2 异步履行

Java新特性:异步编排CompletableFuture

运用并行获取数据,大大降低了接口对数据获取,处理的时间

2.CompletableFuture运用

下面咱们通过一个例子来解说CompletableFuture如何运用,商品概况接口回来数据运用CompletableFuture进行数据封装使命进行异步编排:

  private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
       .setNameFormat("product-pool-%d").build();
​
  private static ExecutorService fixedThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
      Runtime.getRuntime().availableProcessors() * 40,
      0L,
      TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors() * 20),
      namedThreadFactory);
​
/**
   * 运用completableFuture履行多线程使命安排,进步速度,completableFuture能够让某些异步线程使命串行化次序履行
   * 假如不要求某些异步使命串行化次序履行,那么也能够JUC里边另一个countDownLatch完结
   *
   * @param skuId
   * @return
   */
  @Override
  public SkuInfo getSkuDetail(Long skuId) {
    SkuInfo skuInfo = new SkuInfo();
    // 获取sku信息
    CompletableFuture<ProductSku> skuFuture = CompletableFuture.supplyAsync(() -> {
      ProductSku sku = productSkuDAO.selectById(skuId);
      skuInfo.setSku(sku);
      return sku;
     }, fixedThreadPool);
    // 异步获取spu信息
    CompletableFuture<ProductSpu> spuFuture = skuFuture.thenApplyAsync(sku -> {
      ProductSpu spu = productSpuDAO.selectById(sku.getSpuId());
      skuInfo.setSpu(spu);
      return spu;
     }, fixedThreadPool);
    // 异步获取品牌信息
    CompletableFuture<BrandDTO> brandFuture = skuFuture.thenApplyAsync(sku -> {
      BrandDTO brandDTO = brandService.getBrandDetail(sku.getBrandId());
      skuInfo.setBrand(brandDTO);
      return brandDTO;
     }, fixedThreadPool);
    // 异步获取分类信息
    CompletableFuture<CategoryDTO> categoryFuture = skuFuture.thenApplyAsync(sku -> {
      CategoryDTO categoryDTO = categoryService.getCategoryDetail(sku.getCategoryId());
      skuInfo.setCategory(categoryDTO);
      return categoryDTO;
     }, fixedThreadPool);
    try {
      // 终究等待一切异步使命履行完结回来封装成果
      CompletableFuture.allOf(skuFuture, spuFuture, brandFuture, categoryFuture).get();
     } catch (Exception e) {
      log.error("<=======等候一切使命履行进程报错:======>", e);
     }
    return skuInfo;
   }
      
      

2.1 supplyAsync / runAsync

supplyAsync表明创立带回来值的异步使命的,相当于ExecutorService submit(Callable task) 办法,runAsync表明创立无回来值的异步使命,相当于ExecutorService submit(Runnable task)办法,这两办法的效果跟submit是一样的,测验用例如下:

  /**
   * 测验办法CompletableFuture.runAsync:无回来值,
   */
  private static void testRunAsync() {
    CompletableFuture.runAsync(() ->{
      System.out.println("<======当时线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
      System.out.println("supplyAsync 是否为看护线程 " + Thread.currentThread().isDaemon());
      int result = 10/2;
      System.out.println("核算成果为:"+ result);
     }, fixedThreadPool);
   }
​
  /**
   * 测验办法CompletableFuture.supplyAsync:有回来值
   * @throws ExecutionException
   * @throws InterruptedException
   */
  private static void testSupplyAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
      System.out.println("<======当时线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
      int result = 10 / 2;
      return result;
     }, fixedThreadPool);
    Integer res = future.get();
    System.out.println("回来成果值为:"+res);
   }

这两办法各有一个重载版本,能够指定履行异步使命的Executor完结,假如不指定,默许运用ForkJoinPool.commonPool(),假如机器是单核的,则默许运用ThreadPerTaskExecutor,该类是一个内部类,每次履行execute都会创立一个新线程,具体能够看CompletableFuture源码。

2.2 thenApply / thenApplyAsync

thenApply 表明某个使命履行完结后履行的动作,即回调办法,会将该使命的履行成果即办法回来值作为入参传递到回调办法中,

 /**
   * 线程串行化
   * 1、thenRun:不能获取上一步的履行成果
   * 2、thenAcceptAsync:能承受上一步成果,可是无回来值
   * 3、thenApplyAsync:能承受上一步成果,有回来值
   *
   */
  private static void testThenApplyAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
      System.out.println("<======当时线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
      int i = 10 / 2;
      System.out.println("运转成果:" + i);
      try {
        TimeUnit.SECONDS.sleep(5);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      return i;
     }, executor);
    CompletableFuture<String> future2 = future1.thenApplyAsync(res -> {
      System.out.println("======使命2启动了..." + res*20);
      return "Hello" + res;
     }, executor);
​
    CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
      System.out.println("======使命3履行了");
     }, executor);
​
    CompletableFuture.allOf(future1, future2, future3).get();
    System.out.println("=======测验结束");
​
   }

thenApplyAsync与thenApply的差异在于,前者是将job2提交到线程池中异步履行,实践履行future2的线程可能是别的一个线程,后者是由履行future1的线程当即履行future2,即两个future都是同一个线程履行的

2.3 exceptionally/whenComplete/handle

exceptionally办法指定某个使命履行反常时履行的回调办法,会将抛出反常作为参数传递到回调办法中,假如该使命正常履行则会exceptionally办法回来的CompletionStage的result便是该使命正常履行的成果;whenComplete是当某个使命履行完结后履行的回调办法,会将履行成果或许履行期间抛出的反常传递给回调办法,假如是正常履行则反常为null,回调办法对应的CompletableFuture的result和该使命共同,假如该使命正常履行,则get办法回来履行成果,假如是履行反常,则get办法抛出反常

 /**
   * 测验whenComplete和exceptionally: 异步办法履行完的处理
   */
  private static void testWhenCompleteAndExceptionally() throws ExecutionException, InterruptedException {
     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
       System.out.println("<======当时线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
       Integer num = 10;
       int i = num / 2;
       String s = String.valueOf(null);
       System.out.println("运转成果:" + i);
       return i;
     }, executor).whenComplete((res,exception) -> {
       //虽然能得到反常信息,可是没法修正回来数据
       System.out.println("<=====异步使命成功完结了=====成果是:" + res + "=======反常是:" + exception);
     }).exceptionally(throwable -> {
       //能够感知反常,一起回来默许值
       System.out.println("<=====异步使命成功产生反常了======"+throwable);
       return 10;
     });
    Integer result = future.get();
    System.out.println("<=====终究回来成果result=" + result + "======>");
​
   }
​
  /**
   * 测验handle办法:它是whenComplete和exceptionally的结合
   */
  private static void testHandle() {
     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
       System.out.println("<======当时线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
       int i = 10 / 2;
       System.out.println("运转成果:" + i);
       return i;
     }, executor).handle((result,thr) -> {
       if (result != null) {
         return result * 2;
       }
       if (thr != null) {
         System.out.println("异步使命成功完结了...成果是:" + result + "反常是:" + thr);
         return 0;
       }
       return 0;
     });
   }

2.4 组合处理 thenCombine / thenAcceptBoth / runAfterBoth

这三个办法都是将两个CompletableFuture组合起来,只需这两个都正常履行完了才会履行某个使命,差异在于,thenCombine会将两个使命的履行成果作为办法入参传递到指定办法中,且该办法有回来值;thenAcceptBoth相同将两个使命的履行成果作为办法入参,可是无回来值;runAfterBoth没有入参,也没有回来值。留意两个使命中只需有一个履行反常,则将该反常信息作为指定使命的履行成果

  private static void thenCombine() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello1", fixedThreadPool);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello2", fixedThreadPool);
    CompletableFuture<String> result = future1.thenCombine(future2, (t, u) -> t+" "+u);
    System.out.println(result.get());
   }
  
    private static void thenAcceptBoth() throws Exception {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
      int t = new Random().nextInt(3);
      try {
        TimeUnit.SECONDS.sleep(t);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      System.out.println("f1="+t);
      return t;
     },fixedThreadPool);
​
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
      int t = new Random().nextInt(3);
      try {
        TimeUnit.SECONDS.sleep(t);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      System.out.println("f2="+t);
      return t;
     },fixedThreadPool);
   }

2.5 applyToEither / acceptEither / runAfterEither

这三个办法都是将两个CompletableFuture组合起来,只需其间一个履行完了就会履行某个使命,其差异在于applyToEither会将现已履行完结的使命的履行成果作为办法入参,并有回来值;acceptEither相同将现已履行完结的使命的履行成果作为办法入参,可是没有回来值;runAfterEither没有办法入参,也没有回来值。留意两个使命中只需有一个履行反常,则将该反常信息作为指定使命的履行成果

  private static void applyToEither() throws Exception {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
      int t = new Random().nextInt(3);
      try {
        TimeUnit.SECONDS.sleep(t);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      System.out.println("f1="+t);
      return t;
     },fixedThreadPool);
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
      int t = new Random().nextInt(3);
      try {
        TimeUnit.SECONDS.sleep(t);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      System.out.println("f2="+t);
      return t;
     },fixedThreadPool);
​
    CompletableFuture<Integer> result = f1.applyToEither(f2, t -> {
      System.out.println("applyEither:"+t);
      return t * 2;
     });
​
   }
​
  private static void acceptEither() throws Exception {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
      int t = new Random().nextInt(3);
      try {
        TimeUnit.SECONDS.sleep(t);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      System.out.println("f1="+t);
      return t;
     },fixedThreadPool);
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
      int t = new Random().nextInt(3);
      try {
        TimeUnit.SECONDS.sleep(t);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      System.out.println("f2="+t);
      return t;
     },fixedThreadPool);
​
    CompletableFuture<Void> result = f1.acceptEither(f2, t -> {
      System.out.println("acceptEither:"+t);
     });
​
   }

2.6 allOf / anyOf

allOf回来的CompletableFuture是多个使命都履行完结后才会履行,只需有一个使命履行反常,则回来的CompletableFuture履行get办法时会抛出反常,假如都是正常履行,则get回来null。

  private static void testThenApplyAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
      System.out.println("<======当时线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
      int i = 10 / 2;
      System.out.println("运转成果:" + i);
      try {
        TimeUnit.SECONDS.sleep(5);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      return i;
     }, executor);
    CompletableFuture<String> future2 = future1.thenApplyAsync(res -> {
      System.out.println("======使命2启动了..." + res*20);
      return "Hello" + res;
     }, executor);
​
    CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
      System.out.println("======使命3履行了");
     }, executor);
​
    CompletableFuture.allOf(future1, future2, future3).get();
    System.out.println("=======测验结束");
​
   }

留意,运用CompletableFuture可能有某些异步使命不履行,示例如下:

  private static void testNotExecute() {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
      System.out.println("<======当时线程:" + Thread.currentThread().getName() + "=====线程id: " + Thread.currentThread().getId());
      System.out.println("supplyAsync 是否为看护线程 " + Thread.currentThread().isDaemon());
      int i = 10 / 2;
      System.out.println("运转成果:" + i);
      try {
        TimeUnit.SECONDS.sleep(2);
       } catch (InterruptedException e) {
        e.printStackTrace();
       }
      // 下面不打印
      System.out.println("return之前的打印");
      return i;
     });
   }

形成这个原因是因为Daemon。因为completableFuture这套运用异步使命的操作都是创立成了看护线程。那么咱们没有调用get办法不阻塞这个主线程的时候。主线程履行结束。一切线程履行结束就会导致一个问题,便是看护线程退出。那么咱们没有履行的代码便是因为主线程不再跑使命而关闭导致的。

3.CompletableFuture的完结原理

CompletableFuture源码可知,CompletableFuture中包含两个字段:resultstack。result用于存储当时CF的成果,stack(Completion)表明当时CF完结后需求触发的依靠动作(Dependency Actions),去触发依靠它的CF的核算,依靠动作能够有多个(表明有多个依靠它的CF),以栈(Treiber stack)的方式存储,stack表明栈顶元素。具体原理完结细节,请参考美团技能团队的CompletableFuture原理与实践