Future获取异步履行成果

Java线程详解
万字探究线程池优化并发使命的利器

之前咱们详细探究了线程池,在上一篇文章中,咱们只是介绍了 ThreadPoolExecutor 的 void execute(Runnable command) 办法,运用这个办法尽管能够提交使命,可是却没有办法获取使命的履行成果(execute() 办法没有回来值)。而很多场景下,咱们又都是需求获取使命的履行成果的。

Future介绍

Java 经过 ThreadPoolExecutor 供给的 3 个 submit() 办法和 1 个 FutureTask 东西类来支撑获得使命履行成果的需求。下面咱们先来介绍这 3 个 submit() 办法,这 3 个办法的办法签名如下。


// 提交Runnable使命
Future<?> submit(Runnable task);
// 提交Callable使命
<T> Future<T> submit(Callable<T> task);
// 提交Runnable使命及成果引用  
<T> Future<T> submit(Runnable task, T result);

咱们发现它们的回来值都是 Future 接口

Java 异步编程的完美利器:CompletableFuture 指南
Future 接口有 5 个办法:

  • 撤销使命的办法 cancel()
  • 判别使命是否已撤销的办法 isCancelled()
  • 判别使命是否已结束的办法 isDone()
  • 获得使命履行成果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支撑超时机制。

下面咱们简略看下Future的比如:

public class FutureExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future<Integer> future = executor.submit(() -> {
            // 模仿耗时核算
            Thread.sleep(2000);
            return 2 + 3;
        });
        System.out.println("异步核算中...");
        // 堵塞等候核算成果
        int result = future.get();
        System.out.println("核算成果: " + result);
        executor.shutdown();
    }
}

FutureTask

FutureTask是一个完结了RunnableFuture接口的类,它既能够作为Runnable目标传递给线程履行,也能够作为Future目标获取使命的成果。因而,咱们能够经过FutureTaskCallable使命转化为可履行的异步使命,并在需求时获取使命的成果。

Java 异步编程的完美利器:CompletableFuture 指南

下面是一个FutureTask的比如:

public class FutureTaskExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Callable<Integer> callable = () -> {
            // 模仿耗时核算
            Thread.sleep(2000);
            return 2 + 3;
        };
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        // 创立线程履行使命
        Thread thread = new Thread(futureTask);
        thread.start();
        System.out.println("异步核算中...");
        // 堵塞等候核算成果
        int result = futureTask.get();
        System.out.println("核算成果: " + result);
    }
}

FutureTask也能够直接作为ExecutorService的参数进行提交,以便履行使命,而无需手动创立线程。这样能够更方便地办理线程池和异步使命。

Future与FutureTask的不足

尽管Future与FutureTask在Java中供给了一种根本的异步编程办法,但它也存在一些不足之处:

  1. 缺少异步回调机制:FutureTaskFuture接口都没有直接供给异步回调的机制。在某些场景下,咱们或许期望在使命完结后当即履行一些操作,而不是堵塞等候成果。需求手动编写额外的代码来完结异步回调逻辑,增加了代码的复杂性。
  2. 无法手动完结或撤销使命:FutureTaskFuture都没有供给自动完结或撤销使命的办法。一旦使命提交,就无法在外部控制其履行状况。这或许会导致无法高雅地办理使命的生命周期和资源。
  3. 堵塞式获取成果:在运用get()办法获取成果时,假如使命还未完结,调用线程会被堵塞,无法进行其他操作。这种堵塞式获取成果的办法或许导致整体功能下降,特别是在多个异步使命一同履行时。
  4. 缺少反常处理灵敏性:FutureFutureTask在处理使命履行过程中的反常时,比较简略且不够灵敏。经过捕获ExecutionException来获取反常信息,或许需求额外的处理逻辑来处理不同类型的反常情况。

为了处理这些问题,Guava供给了ListenableFuture,Java 8引入了CompletableFuture,它们都供给了更丰厚的功能和灵敏性,如异步回调、反常处理、使命组合等。

CompletableFuture运用

CompletableFutur介绍

CompletableFuture供给下面几种办法创立使命,它们之间的差异是:Runnable 接口的 run() 办法没有回来值,而 Supplier 接口的 get() 办法是有回来值的。

//运用内置线程ForkJoinPool.commonPool(),依据supplier构建履行使命
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {  
    return asyncSupplyStage(asyncPool, supplier);  
}  
//指定自定义线程,依据supplier构建履行使命
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,  
Executor executor) {  
    return asyncSupplyStage(screenExecutor(executor), supplier);  
}  
//运用内置线程ForkJoinPool.commonPool(),依据runnable构建履行使命
public static CompletableFuture<Void> runAsync(Runnable runnable) {  
    return asyncRunStage(asyncPool, runnable);  
} 
//指定自定义线程,依据runnable构建履行使命
public static CompletableFuture<Void> runAsync(Runnable runnable,  
Executor executor) {  
    return asyncRunStage(screenExecutor(executor), runnable); 
}

默认情况下 CompletableFuture 会运用公共的 ForkJoinPool 线程池,这个线程池默认创立的线程数是 CPU 的核数(也能够经过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。

假如一切 CompletableFuture 同享一个线程池,那么一旦有使命履行一些很慢的 I/O 操作,就会导致线程池中一切线程都堵塞在 I/O 操作上,然后造成线程饥饿,进而影响整个系统的功能。所以咱们要依据不同的事务类型创立不同的线程池,以防止互相搅扰。

下面是一个CompletableFuture获取异步成果的比如:

public class CompletableFutureExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            // 模仿耗时核算
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 2 + 3;
        });
        System.out.println("异步核算中...");
        future.thenAccept(result -> System.out.println("核算成果: " + result));
        // 堵塞等候使命完结
        future.get();
    }
}

时序依靠联系

通常使命的依靠分为以下几种:

  1. 串行履行
graph TD
    Start --> Future1
    Future1 --> Future2
    Future2 --> Future3
    Future3 --> End
  1. 一切都履行完:
graph TD
    Start --> Future1
    Start --> Future2
    Start --> Future3
    Future1 --> Converge
    Future2 --> Converge
    Future3 --> Converge
    Converge --> End
  1. 恣意完结:
graph TD
    Start --> Future1
    Start --> Future2
    Start --> Future3
    Future1 -.-> Converge
    Future2 -.-> Converge
    Future3 -.-> Converge
    Converge --> End

CompletableFuture的CompletionStage 接口能够清晰地描绘使命之间的这种时序依靠联系。下面咱们看下CompletionStage 接口怎么描绘串行联系、AND 聚合联系、OR 聚合联系以及反常处理。

CompletionStage编排

串行履行

描绘串行联系CompletionStage 接口里边描绘串行联系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四种类型的接口。

  • thenApply 系列函数里参数 fn 的类型是接口 Function,这个接口里与 CompletionStage 相关的办法是 R apply(T t),这个办法既能接纳参数也支撑回来值,所以 thenApply 系列办法回来的是CompletionStage。
  • thenAccept 系列办法里参数 consumer 的类型是接口Consumer,这个接口里与 CompletionStage 相关的办法是 void accept(T t),这个办法尽管支撑参数,但却不支撑回值,所以 thenAccept 系列办法回来的是CompletionStage。
  • thenRun 系列办法里 action 的参数是 Runnable,所以 action 既不能接纳参数也不支撑回来值,所以 thenRun 系列办法回来的也是CompletionStage。这些办法里边 Async 代表的是异步履行 fn、consumer 或许 action。
  • thenCompose 办法,这个系列的办法会新创立出一个子流程,最终成果和 thenApply 系列是相同的。
CompletionStagethenApply(fn);
CompletionStagethenApplyAsync(fn);
CompletionStagethenAccept(consumer);
CompletionStagethenAcceptAsync(consumer);
CompletionStagethenRun(action);
CompletionStagethenRunAsync(action);
CompletionStagethenCompose(fn);
CompletionStagethenComposeAsync(fn);

经过下面的示例代码,咱们能够看一下 thenApply() 办法是怎么运用的。首先经过 supplyAsync() 启动一个异步流程,之后是两个串行操作,尽管这是一个异步流程,但使命①②③却是串行履行的,②依靠①的履行成果,③依靠②的履行成果。


CompletableFuture<String> f0 = 
  CompletableFuture.supplyAsync(
    () -> "Hello World")      //①
  .thenApply(s -> s + " 你好!")  //②
  .thenApply(String::toUpperCase);//③
System.out.println(f0.join());

AND会聚联系

CompletionStage 接口里边描绘 AND 会聚联系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的差异也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

OR 会聚联系

CompletionStage 接口里边描绘 OR 会聚联系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的差异也是源自 fn、consumer、action 这三个核心参数不同。

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

下面的示例代码展现了怎么运用 applyToEither() 办法来描绘一个 OR 会聚联系。


CompletableFuture<String> f1 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    System.sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});
CompletableFuture<String> f2 = 
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    System.sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});
CompletableFuture<String> f3 = 
  f1.applyToEither(f2,s -> s);
System.out.println(f3.join());

反常处理

反常处理尽管上面咱们说到的 fn、consumer、action 它们的核心办法都不答应抛出可检查反常,可是却无法约束它们抛出运行时反常。正常事务代码中,咱们能够运用 try{}catch{}来捕获并处理反常,那在异步编程里边,反常该怎么处理呢?

CompletableFuturef0 = CompletableFuture.supplyAsync(()->(3/0))
    .thenApply(r->r*10);
System.out.println(f0.join());

CompletionStage 接口给咱们供给的计划十分简略,比 try{}catch{}还要简略,下面是相关的办法,运用这些办法进行反常处理和串行操作是一样的,都支撑链式编程办法。

CompletionStage exceptionally(fn);
CompletionStagewhenComplete(consumer);
CompletionStagewhenCompleteAsync(consumer);
CompletionStagehandle(fn);
CompletionStagehandleAsync(fn);

下面的示例代码展现了怎么运用 exceptionally() 办法来处理反常,exceptionally() 的运用十分相似于 try{}catch{}中的 catch{},可是因为支撑链式编程办法,所以相对更简略。已然有 try{}catch{},那就必定还有 try{}finally{},whenComplete() 和 handle() 系列办法就相似于 try{}finally{}中的 finally{},无论是否产生反常都会履行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的差异在于 whenComplete() 不支撑回来成果,而 handle() 是支撑回来成果的。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    // 模仿一个或许抛出反常的异步操作
    if (Math.random() < 0.5) {
        throw new RuntimeException("Something went wrong");
    }
    return 10;
});
CompletableFuture<Integer> handleException = future.exceptionally(ex -> {
    // 反常处理逻辑
    System.out.println("Exception occurred: " + ex.getMessage());
    return 0; // 回来默认值或处理后的成果
});
handleException.thenAccept(result -> {
    // 在最终成果完结后进行处理
    System.out.println("Final result: " + result);
});

获取成果

CompletableFuture供给了下面几种常用的办法获取成果:

public  T   get()
public  T   get(long timeout, TimeUnit unit)
public  T   getNow(T valueIfAbsent)
public  T   join()
public CompletableFuture<T> allOf()
public CompletableFuture<T> anyOf()
public CompletableFuture<T> whenComplete()
public <U> CompletableFuture<U> handle()
  1. get()get()办法是最常用的获取CompletableFuture成果的办法之一。它会堵塞当前线程,直到异步操作完结并回来成果,或许抛出反常。假如异步操作抛出反常,get()办法会将反常包装在ExecutionException中抛出。这个办法能够用于同步地获取成果。
  2. join()join()办法与get()办法相似,也会堵塞当前线程,直到异步操作完结并回来成果,或许抛出反常。不同之处在于,join()办法不会抛出ExecutionException,而是直接将反常抛出。这个办法能够用于同步地获取成果。
  3. whenComplete(BiConsumer<? super T,? super Throwable> action)whenComplete()办法答应注册一个回调函数,在异步操作完结后履行该函数。回调函数接纳异步操作的成果(假如成功完结)或反常(假如产生反常),并能够对成果进行进一步处理或履行其他操作。这个办法不会堵塞线程,异步操作完结后当即履行回调函数。
  4. handle(BiFunction<? super T, Throwable, ? extends U> fn)handle()办法相似于whenComplete(),也答应注册一个回调函数,在异步操作完结后履行该函数。不同之处在于,回调函数的回来值会被包装在新的CompletableFuture中回来,而不是疏忽回来值。这个办法能够用于对成果进行处理或转化,并回来包装后的新CompletableFuture。
  5. allOf():就是一切使命都完结时触发。allOf()能够配合get()一同运用。
  6. anyOf():等候恣意一个完结。anyOf()办法回来一个新的CompletableFuture<Object>目标,该目标在恣意一个输入的CompletableFuture完结后完结,并持有该完结的成果。

下面是allOf()的一个比如

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Double> future3 = CompletableFuture.supplyAsync(() -> 3.14);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
allFutures.thenRun(() -> {
    System.out.println("All futures completed");
    Integer result1 = future1.join();
    String result2 = future2.join();
    Double result3 = future3.join();
    System.out.println("Result 1: " + result1);
    System.out.println("Result 2: " + result2);
    System.out.println("Result 3: " + result3);
});