1. CompletableFuture 简介

1.1 概述

CompletableFuture是 Java 8 中引入的一个类,它完结了CompletionStage接口,供给了一组丰厚的办法来处理异步操作和多个使命的成果。它支撑链式操作,能够方便地处理使命的依靠联系和成果转化。相比于传统的Future接口,CompletableFuture更加灵活和强大。

1.2 优势与特色

CompletableFuture的运用具有以下优势和特色:

  • 异步履行:CompletableFuture答应使命在后台线程中异步履行,不会堵塞主线程,进步了应用程序的呼应性和功用。
  • 链式操作:经过CompletableFuture供给的办法,能够方便地对使命进行链式操作,构建杂乱的使命依靠联系,完结高效的使命调度和履行。
  • 反常处理:CompletableFuture供给了丰厚的反常处理办法,能够处理使命履行过程中或许发生的反常,并完结灵活的错误处理和回退机制。
  • 多使命组合:CompletableFuture支撑多个使命的并发履行和成果组合。能够轻松地完结多使命并发处理的场景,进步应用程序的功率和并发性。

2. CompletableFuture 的根本用法

2.1 创立 CompletableFuture 目标

运用CompletableFuture创立异步使命十分简单。能够运用CompletableFuture.supplyAsync()CompletableFuture.runAsync() 办法来创立CompletableFuture目标。

2.1.1 运用CompletableFuture.supplyAsync() 办法

运用CompletableFuture.supplyAsync() 办法来创立 CompletableFuture 目标的示例。该办法用于履行具有回来值的使命,并在使命完结时回来成果。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
   // 履行具有回来值的使命
   return "使命成果";
});

在上述示例中,咱们运用CompletableFuture.supplyAsync() 办法创立一个具有回来值的 CompletableFuture 目标,使命会在默许的 ForkJoinPool 中异步履行。

2.1.2 运用CompletableFuture.runAsync() 办法

除了CompletableFuture.supplyAsync() 办法,CompletableFuture 还供给了CompletableFuture.runAsync() 办法用于履行没有回来值的使命。

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
   // 履行没有回来值的使命
});

在上述示例中,咱们运用CompletableFuture.runAsync() 办法创立一个没有回来值的 CompletableFuture 目标,使命会在默许的 ForkJoinPool 中异步履行。

2.1.3 指定自定义线程池

咱们还能够经过指定自定义线程池来创立 CompletableFuture 目标,以满意特定的并发需求。

ExecutorService customExecutor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
   // 履行使命的代码
}, customExecutor);

在上述示例中,咱们经过Executors.newFixedThreadPool(10) 创立了一个固定巨细为 10 的自定义线程池,并将其传递给CompletableFuture.supplyAsync() 办法来履行异步使命。

2.2 获取使命成果

获取CompletableFuture使命的成果有多种办法。最常用的办法是运用join() 办法堵塞当时线程,直到使命完结并回来成果。

2.2.1 运用join() 办法

join() 办法是 CompletableFuture 类供给的一种获取使命成果的办法,它会堵塞当时线程,直到使命完结并回来成果。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
   // 履行使命的代码
   return "使命成果";
});
​
String result = future.join();

在上述示例中,咱们运用join() 办法获取使命的成果,并将成果赋值result变量。假如使命还未完结,join() 办法会堵塞当时线程,直到使命完结。

join() 办法和get() 办法十分相似,但join() 办法不会抛出InterruptedExceptionExecutionException反常,而是将反常包装在CompletionException中抛出。因而,它更适合在 Lambda 表达式或流式操作中运用。

2.2.2 运用get() 办法

get() 办法也是 CompletableFuture 类供给的一种获取使命成果的办法,它会堵塞当时线程,直到使命完结并回来成果。与join() 办法不同的是,get() 办法会抛出InterruptedExceptionExecutionException反常,需求进行反常处理。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
   // 履行使命的代码
   return "使命成果";
});
​
try {
   String result = future.get();
} catch (InterruptedException | ExecutionException e) {
   // 反常处理逻辑
}

在上述示例中,咱们运用get() 办法获取使命的成果,并在或许抛出反常的情况下进行反常处理。假如使命还未完结,get() 办法会堵塞当时线程,直到使命完结。

get() 办法的反常处理较为繁琐,需求捕获InterruptedExceptionExecutionException反常,并进行相应的处理。因而,在 Lambda 表达式或流式操作中,推荐运用join() 办法。

2.3 异步回调办法

CompletableFuture 供给了一系列办法来处理使命的完结事情,完结异步回调。咱们将逐个介绍这些办法的差异和用法。

thenApply()

办法签名:thenApply(Function<? super T, ? extends U> fn)

  • 输入参数:上一阶段的使命成果类型为 T。
  • 回来值:新阶段的使命成果类型为 U。
  • 功用:对上一阶段的使命成果进行转化操作,并回来一个新的 CompletableFuture 目标。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
   .thenApply(result -> result * 2)
   .thenApply(result -> result + 1);

在上述示例中,咱们运用 thenApply()办法对上一阶段的成果进行转化,将成果乘以 2,并将转化后的成果加 1。每个 thenApply()办法都回来一个新的 CompletableFuture 目标,能够持续链式调用。

thenAccept()

办法签名:thenAccept(Consumer<? super T> action)

  • 输入参数:上一阶段的使命成果类型为 T。
  • 回来值:CompletableFuture,没有回来值。
  • 功用:对上一阶段的使命成果进行消费操作,没有回来值。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
   .thenAccept(result -> System.out.println("使命成果:" + result));

在上述示例中,咱们运用 thenAccept()办法对上一阶段的成果进行消费,将成果打印输出。thenAccept()办法没有回来值,仅用于消费使命成果。

thenRun()

办法签名:thenRun(Runnable action)

  • 输入参数:无。
  • 回来值:CompletableFuture,没有回来值。
  • 功用:在上一阶段使命完结后履行给定的 Runnable 使命,没有输入参数和回来值。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42)
   .thenRun(() -> System.out.println("使命履行结束"));

在上述示例中,咱们运用 thenRun()办法在上一阶段使命完结后履行一个 Runnable 使命,输出一条使命履行结束的音讯。

2.4 多使命组合回调

CompletableFuture 还供给了一些办法来组合多个使命的成果,完结更杂乱的异步处理逻辑。

thenCombine()

办法签名:thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)

  • 输入参数:另一个 CompletionStage 目标和一个 BiFunction 函数,函数的输入参数分别为上一阶段的使命成果类型 T 和另一个 CompletionStage 目标的使命成果类型 U,函数的回来值类型为 V。
  • 回来值:新阶段的使命成果类型为 V。
  • 功用:当两个 CompletionStage 目标都完结时,将它们的使命成果传递给给定的 BiFunction 函数进行组合处理,并回来一个新的 CompletableFuture 目标。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);

在上述示例中,咱们运用 thenCombine()办法将两个使命的成果进行组合,将它们的成果相加并回来新的 CompletableFuture 目标。

thenCompose()

办法签名:thenCompose(Function<? super T, ? extends CompletionStage> fn)

  • 输入参数:一个 Function 函数,函数的输入参数为上一阶段的使命成果类型 T,函数的回来值为另一个 CompletionStage 目标。
  • 回来值:新阶段的使命成果类型为 U。
  • 功用:当上一阶段的使命完结后,将成果传递给给定的 Function 函数,该函数回来一个新的 CompletionStage 目标,新阶段的使命成果类型为 U。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = future1.thenCompose(result -> CompletableFuture.supplyAsync(() -> result * 2));

在上述示例中,咱们运用 thenCompose()办法将上一阶段的成果传递给一个 Function 函数,该函数回来一个新的 CompletionStage 目标。新阶段的使命成果为上一阶段成果的两倍。

allOf()

办法签名:allOf(CompletableFuture<?>… cfs)

  • 输入参数:多个 CompletableFuture 目标。
  • 回来值:CompletableFuture,没有回来值。
  • 功用:等候一切给定的 CompletableFuture 目标都完结,回来一个新的 CompletableFuture 目标。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

在上述示例中,咱们运用 allOf()办法等候一切的 CompletableFuture 目标都完结,回来一个新的 CompletableFuture 目标。这样咱们就能够在该目标上进跋涉一步的处理,例如获取各个 CompletableFuture 的成果。

3. 反常处理与错误处理

3.1 反常处理办法

CompletableFuture 供给了多种办法来处理异步使命履行中或许发生的反常。常用的办法有:

  • exceptionally(Function<Throwable, ? extends T> fn) :当 CompletableFuture 履行过程中发生反常时,运用指定的函数进行反常处理,并回来一个新的 CompletableFuture 目标,其间包含处理成果或默许值。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
   throw new RuntimeException("使命履行反常");
});
CompletableFuture<Integer> handledFuture = future.exceptionally(ex -> {
   System.out.println("反常处理:" + ex.getMessage());
   return 0; // 默许值
});
  • handle(BiFunction<? super T, Throwable, ? extends U> fn) :当 CompletableFuture 履行完结时,运用指定的函数处理成果或反常,并回来一个新的 CompletableFuture 目标。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
   if (ex != null) {
     System.out.println("反常处理:" + ex.getMessage());
     return "默许值";
   } else {
     return "成果:" + result;
   }
});

3.2 错误处理与反常链

CompletableFuture 还支撑反常链,能够将多个 CompletableFuture 的反常连接起来,形成反常链。能够运用exceptionally()handle() 办法来完结反常链的处理。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
   throw new RuntimeException("使命履行反常");
});
CompletableFuture<Integer> handledFuture = future.exceptionally(ex -> {
   System.out.println("反常处理:" + ex.getMessage());
   throw new CustomException("自定义反常", ex);
});

在上述示例中,咱们经过exceptionally() 办法处理使命的反常,并抛出一个自定义反常,并将原始反常作为反常链的一部分传递下去。

4. 自定义线程池与资源管理

4.1 默许线程池与 ForkJoinPool

CompletableFuture 默许运用 ForkJoinPool 线程池来履行异步使命。ForkJoinPool 是一种根据工作盗取算法的线程池,适用于使命分解和并行计算。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
   // 异步使命的代码
});

在上述代码中,CompletableFuture 会在默许的 ForkJoinPool 中异步履行使命。

4.2 自定义线程池

除了运用默许线程池,咱们还能够自定义线程池来满意特定的需求。自定义线程池能够经过Executors类来创立。

ExecutorService customExecutor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
   // 异步使命的代码
}, customExecutor);

在上述代码中,咱们创立了一个固定巨细为 10 的自定义线程池,并将其传递给 CompletableFuture 来履行异步使命。

4.3 资源管理与关闭线程池

在运用自定义线程池时,需求留意及时关闭线程池以释放资源。能够运用ExecutorServiceshutdown()shutdownNow() 办法来关闭线程池。

ExecutorService customExecutor = Executors.newFixedThreadPool(10);
// 异步使命代码
customExecutor.shutdown();

在上述代码中,咱们在使命完结后调用了shutdown() 办法来关闭线程池。

5. 并发使命的调度与操控

5.1 异步使命的并发度操控

CompletableFuture 答应咱们操控并发使命的履行数量。能够经过自定义线程池的巨细来限制并发度。

ExecutorService customExecutor = Executors.newFixedThreadPool(5);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
   // 异步使命1的代码
}, customExecutor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
   // 异步使命2的代码
}, customExecutor);
// ...

在上述代码中,咱们经过自定义线程池的巨细为 5 来限制并发使命的数量。

5.2 使命的超时处理

CompletableFuture 还供给了超时处理的功用,能够操控使命的最大履行时间。能够运用completeOnTimeout() 办法来完结超时处理。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
   // 异步使命的代码
}).completeOnTimeout("默许值", 5, TimeUnit.SECONDS);

在上述代码中,咱们指定使命的最大履行时间为 5 秒,假如使命在规定时间内没有完结,将回来默许值。

5.3 中止与撤销使命

在某些情况下,咱们或许需求中止或撤销正在履行的使命。CompletableFuture 供给了cancel() 办法来撤销使命的履行。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
   // 异步使命的代码
});
boolean canceled = future.cancel(true);

在上述代码中,咱们调用cancel() 办法来撤销使命的履行,并传递一个布尔值表明是否中止正在履行的使命。

6. CompletableFuture 的进阶应用

6.1 CompletableFuture 与 IO 操作

CompletableFuture 在处理 IO 操作时十分有用。能够将 IO 操作封装为 CompletableFuture 使命,运用 CompletableFuture 的异步特性进步 IO 操作的功率。

CompletableFuture<String> readData = CompletableFuture.supplyAsync(() -> {
  // 履行读取数据的IO操作
  return "读取的数据";
});
CompletableFuture<Void> processData = readData.thenAccept(data -> {
  // 处理读取到的数据
  System.out.println("读取到的数据:" + data);
  // 履行处理数据的操作
});
CompletableFuture<Void> writeData = processData.thenRun(() -> {
  // 履行写入数据的IO操作
  System.out.println("数据写入完结");
});
​
writeData.join();

在上述代码中,咱们运用 CompletableFuture 处理了一个包含读取数据、处理数据和写入数据的 IO 操作流程。经过异步履行和链式操作,能够有效地运用 CPU 和 IO 资源,进步程序的呼应性和吞吐量。

6.2 CompletableFuture 与网络恳求

CompletableFuture 也能够很好地与网络恳求结合运用。咱们能够运用 CompletableFuture 建议多个网络恳求,并在一切恳求完结后处理成果。

CompletableFuture<String> request1 = CompletableFuture.supplyAsync(() -> {
  // 建议网络恳求1
  return "恳求1成果";
});
CompletableFuture<String> request2 = CompletableFuture.supplyAsync(() -> {
  // 建议网络恳求2
  return "恳求2成果";
});
CompletableFuture<String> request3 = CompletableFuture.supplyAsync(() -> {
  // 建议网络恳求3
  return "恳求3成果";
});
CompletableFuture<Void> allRequests = CompletableFuture.allOf(request1, request2, request3);
allRequests.thenRun(() -> {
  // 一切恳求完结后的处理逻辑
  String result1 = request1.join();
  String result2 = request2.join();
  String result3 = request3.join();
  // 对恳求成果进行处理
});

在上述代码中,咱们运用 CompletableFuture 建议了三个网络恳求,并经过allOf() 办法等候一切恳求完结。在一切恳求完结后,咱们能够运用join() 办法获取各个恳求的成果,并进行后续处理。

7. 实战事例

事务背景: 在电商项目的售后事务中,当客服接收到用户的售后申请时,需求进行一系列操作,包含查询订单信息、查询 ERP 中的商品信息、查询用户信息,以及创立售后工单。

代码完结:

public CompletableFuture<Void> processAfterSalesRequest(String orderId, String customerId) {
   CompletableFuture<Order> orderFuture = CompletableFuture.supplyAsync(() -> getOrderInfo(orderId));
   CompletableFuture<Inventory> inventoryFuture = CompletableFuture.supplyAsync(() -> getInventoryInfo(orderId));
   CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> getUserInfo(customerId));
   return CompletableFuture.allOf(orderFuture, inventoryFuture, userFuture)
     .thenApplyAsync(ignored -> {
       Order order = orderFuture.join();
       Inventory inventory = inventoryFuture.join();
       User user = userFuture.join();
       // 创立售后工单
       createAfterSalesTicket(order, inventory, user);
       return null;
     });
}
private Order getOrderInfo(String orderId) {
   // 查询订单信息的逻辑
   // ...
   return order;
}
​
private Inventory getInventoryInfo(String orderId) {
   // 查询ERP中商品信息的逻辑
   // ...
   return inventory;
}
private User getUserInfo(String customerId) {
   // 查询用户信息的逻辑
   // ...
   return user;
}
private void createAfterSalesTicket(Order order, Inventory inventory, User user) {
   // 创立售后工单的逻辑
   // ...
}

在上述代码中,咱们运用CompletableFuture.supplyAsync() 办法分别查询订单信息、ERP 中的商品信息和用户信息,然后运用CompletableFuture.allOf() 办法等候一切查询使命完结。完结后,咱们能够经过join() 办法获取各个查询使命的成果,并将成果传递给createAfterSalesTicket() 办法来创立售后工单。

8. 总结

CompletableFuture 是供给了丰厚的功用和办法。它能简化并发使命处理,进步系统功用和呼应性。经过了解其根本用法、进阶应用和最佳实践,咱们能够灵活处理异步回调、使命组合、反常处理和资源管理。

欢迎点赞收藏加重视