为什么要运用Future
线程获取到运转成果有几种方式
public class Sum {
private Sum(){}
public static int sum(int n){
int sum = 0;
for (int i = 0; i < n; i++) {
sum += n;
}
return sum;
}
}
Thread.sleep()
private static int sum_sleep = 0;
Thread thread = new Thread(() -> sum_sleep = Sum.sum(100));
thread.start();
TimeUnit.SECONDS.sleep(1);
System.out.printf("get result by thread.sleep: %d\n", sum_sleep);
运用sleep()
办法获取,这种办法,有不可控性,也许sleep
1秒钟,可是线程还没有履行完结,可能会导致获取到的成果不准确。
Thread.join()
private static int sum_join = 0;
Thread thread = new Thread(() -> sum_join = Sum.sum(100));
thread.start();
thread.join();
System.out.printf("get result by thread.join: %d\n", sum_join);
循环
private static int sum_loop = 0;
private static volatile boolean flag;
Thread thread = new Thread(() -> {
sum_loop = Sum.sum(100);
flag = true;
});
thread.start();
int i = 0;
while (!flag) {
i++;
}
System.out.printf("get result by loopLock: %d\n", sum_loop);
notifyAll() / wait()
private static class NotifyAndWaitTest {
private Integer sum = null;
private synchronized void sum_wait_notify() {
sum = Sum.sum(100);
notifyAll();
}
private synchronized Integer getSum() {
while (sum == null) {
try {
wait();
} catch (Exception e) {
e.printStackTrace();
}
}
return sum;
}
}
private static void getResultByNotifyAndWait() throws Exception {
NotifyAndWaitTest test = new NotifyAndWaitTest();
new Thread(test::sum_wait_notify).start();
System.out.printf("get result by NotifyAndWait: %d\n", test.getSum());
}
Lock & Condition
private static class LockAndConditionTest {
private Integer sum = null;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public void sum() {
try {
lock.lock();
sum = Sum.sum(100);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public Integer getSum() {
try {
lock.lock();
while (Objects.isNull(sum)) {
try {
condition.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return sum;
}
}
private static void getResultByLockAndCondition() throws Exception {
LockAndConditionTest test = new LockAndConditionTest();
new Thread(test::sum).start();
System.out.printf("get result by lock and condition: %d\n", test.getSum());
}
BlockingQueue
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
new Thread(() -> queue.offer(Sum.sum(100))).start();
System.out.printf("get result by blocking queue: %d\n", queue.take());
CountDownLatch
private static int sum_countDownLatch = 0;
private static void getResultByCountDownLatch() {
CountDownLatch latch = new CountDownLatch(1);
new Thread(
() -> {
sum_countDownLatch = Sum.sum(100);
latch.countDown();
})
.start();
try {
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.printf("get result by countDownLatch: %d\n", sum_countDownLatch);
}
CyclicBarrier
private static int sum_cyclicBarrier = 0;
private static void getResultByCycleBarrier() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
new Thread(
() -> {
sum_cyclicBarrier = Sum.sum(100);
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
})
.start();
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.printf("get result by cyclicBarrier: %d\n", sum_cyclicBarrier);
}
Semaphore
private static int sum_semaphore = 0;
private static void getResultBySemaphore() {
Semaphore semaphore = new Semaphore(0);
new Thread(
() -> {
sum_semaphore = Sum.sum(100);
semaphore.release();
})
.start();
try {
semaphore.acquire();
System.out.printf("get result by semaphore: %d\n", sum_semaphore);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
上面提到的获取线程履行成果的办法,暂时根据之前学到的内容,我只能想到这些。这些完结方式也不是很高雅,不是最佳实践。
线程池,利用ThreadPoolExecutor
的execute(Runnable command)
办法,利用这个办法虽然能够提交使命,可是却没有办法获取使命履行成果。
那么咱们假如需求获取使命的履行成果并且高雅的完结,能够经过Future
接口和Callable
接口配合完结, 本文将会经过详细的例子解说怎么运用Future
。
Future
最主要的作用是,比方当做比较耗时运算的时候,假如咱们一直在原地等候办法回来,显然是不明智的,全体程序的运转效率会大大下降。咱们能够把运算的进程放到子线程去履行,再经过Future
去控制子线程履行的核算进程,终究获取到核算成果。这样一来就能够把整个程序的运转效率进步,是一种异步的思想。
怎么运用Future
要想运用Future
首先得先了解一下Callable
。Callable
接口比较于 Runnable
的一大优势是能够有回来成果,那这个回来成果怎么获取呢?就能够用 Future
类的 get 办法来获取 。因此,Future
相当于一个存储器,它存储了 Callable
的call
办法的使命成果。
一般情况下,Future,Callable,ExecutorService
是一起运用的,ExecutorService
里相关的代码如下:
// 提交 Runnable 使命
// 由于Runnable接口的run办法没有回来值,所以,Future仅仅是用来断言使命现已完毕,有点相似join();
Future<?> submit(Runnable task);
// 提交 Callable 使命
// Callable里的call办法是有回来值的,所以这个办法回来的Future对象能够经过调用其get()办法来获取使命的执
//行成果。
<T> Future<T> submit(Callable<T> task);
// 提交 Runnable 使命及成果引证
// Future的回来值便是传给submit()办法的参数result。
<T> Future<T> submit(Runnable task, T result);
详细运用办法如下:
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(() -> Sum.sum(100));
System.out.printf("get result by Callable + Future: %d\n", future.get());
executor.shutdown();
Future完结原理
Future根本概述
Future
接口5个办法:
// 撤销使命
boolean cancel(boolean mayInterruptIfRunning);
// 判别使命是否已撤销
boolean isCancelled();
// 判别使命是否已完毕
boolean isDone();
// 获得使命履行成果 堵塞,被调用时,假如使命还没有履行完,那么调用get()办法的线程会堵塞。直到使命履行完
// 才会被唤醒
get();
// 获得使命履行成果,支撑超时
get(long timeout, TimeUnit unit);
-
cancel(boolean mayInterruptIfRunning)
:- 用来撤销异步使命的履行。
- 假如异步使命现已完结或许现已被撤销,或许由于某些原因不能撤销,则会回来
false
。 - 假如使命还没有被履行,则会回来
true
并且异步使命不会被履行。 - 假如使命现已开端履行了可是还没有履行完结,若
mayInterruptIfRunning
为true
,则会立即中止履行使命的线程并回来true
,若mayInterruptIfRunning
为false
,则会回来true
且不会中止使命履行线程。
-
isCanceled()
:- 判别使命是否被撤销。
- 假如使命在完毕(正常履行完毕或许履行反常完毕)前被撤销则回来
true
,不然回来false
。
-
isDone()
:- 判别使命是否现已完结,假如完结则回来
true
,不然回来false
。 - 使命履行进程中发生反常、使命被撤销也归于使命已完结,也会回来
true
。
- 判别使命是否现已完结,假如完结则回来
-
get()
:- 获取使命履行成果,假如使命还没完结则会堵塞等候直到使命履行完结。
- 假如使命被撤销则会抛出
CancellationException
反常。 - 假如使命履行进程发生反常则会抛出
ExecutionException
反常。 - 假如堵塞等候进程中被中止则会抛出
InterruptedException
反常。
-
get(long timeout,Timeunit unit)
:- 带超时时刻的
get()
版别,上面叙述的get()
办法,相同适用这里。 - 假如堵塞等候进程中超时则会抛出
TimeoutException
反常。
- 带超时时刻的
运用IDEA,查看Future
的完结类其实有很多,比方FutureTask,ForkJoinTask,CompletableFuture
等,其他根本是继承了ForkJoinTask
完结的内部类。
本篇文章主要解说FutureTask
的完结原理
FutureTask根本概述
FutureTask
为 Future
供给了根底完结,如获取使命履行成果(get)
和撤销使命(cancel)
等。假如使命没有完结,获取使命履行成果时将会堵塞。一旦履行完毕,使命就不能被重启或撤销(除非运用runAndReset
履行核算)。FutureTask
常用来封装 Callable
和 Runnable
,也能够作为一个使命提交到线程池中履行。除了作为一个独立的类之外,此类也供给了一些功能性函数供咱们创立自定义 task
类运用。FutureTask 的线程安全由CAS
来保证。
// 创立 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 创立线程池
ExecutorService es = Executors.newCachedThreadPool();
// 提交 FutureTask
es.submit(futureTask);
// 获取核算成果
Integer result = futureTask.get();
// 创立 FutureTask
FutureTask<Integer> futureTask
= new FutureTask<>(()-> 1+2);
// 创立并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取核算成果
Integer result = futureTask.get();
FutureTask
能够很简单获取子线程的履行成果。
FutureTask完结原理
构造函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
FutureTask
供给了两个构造器
-
Callable
接口有回来,将callable
赋值给this.callable
。
-
Runnable
接口无回来,假如想要获取到履行成果,需求传V result
给FutureTask
,FutureTask
将Runnable
和result
封装成Callable
,再将callable
赋值给this.callable
。 - 状况初始化状况为
NEW
FutureTask
内置状况有:
private volatile int state; // 可见性
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
- NEW 初始状况
- COMPLETING 使命现已履行完(正常或许反常),准备赋值成果,可是这个状况会时刻会比较短,归于中间状况。
- NORMAL 使命现已正常履行完,并已将使命回来值赋值到成果
- EXCEPTIONAL 使命履行失利,并将反常赋值到成果
- CANCELLED 撤销
- INTERRUPTING 准备测验中止履行使命的线程
- INTERRUPTED 对履行使命的线程进行中止(未必中止到)
状况转化:
run()履行流程
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
set()
protected void set(V v) {
// state变量,经过CAS操作,将NEW->COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 将成果赋值给outcome特点
outcome = v;
// state状况直接赋值为NORMAL,不需求CAS
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
setException()
protected void setException(Throwable t) {
// state变量,经过CAS操作,将NEW->COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 将反常赋值给outcome特点
outcome = t;
// state状况直接赋值为EXCEPTIONAL,不需求CAS
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
finishCompletion()
set()
和setException()
两个办法终究都调用了finishCompletion()
办法,完结一些善后工作,详细流程如下:
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// 移除等候线程
if (WAITERS.weakCompareAndSet(this, q, null)) {
// 自旋遍历等候线程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒等候线程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 使命完结后调用函数,自定义扩展
done();
callable = null; // to reduce footprint
}
handlePossibleCancellationInterrupt()
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
// 在中止者中止线程之前可能会延迟,所以咱们只需求让出CPU时刻片自旋等候
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
get()履行流程
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// awaitDone用于等候使命完结,或使命由于中止或超时而停止。回来使命的完结状况。
s = awaitDone(false, 0L);
return report(s);
}
详细流程:
awaitDone()
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
// 获取到当时状况
int s = state;
// 假如当时状况不为NEW或许COMPLETING
if (s > COMPLETING) {
if (q != null)
q.thread = null;
// 直接回来state
return s;
}
// COMPLETING是一个很时刻短的状况,调用Thread.yield希望让出时刻片,之后重试循环。
else if (s == COMPLETING)
Thread.yield();
// 假如堵塞线程被中止则将当时线程从堵塞行列中移除
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 新进来的线程增加等候节点
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
/*
* 这是Treiber Stack算法入栈的逻辑。
* Treiber Stack是一个根据CAS的无锁并发栈完结,
* 更多能够参考https://en.wikipedia.org/wiki/Treiber_Stack
*/
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
// 超时,移除栈中节点。
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
// 未超市并且状况为NEW,堵塞当时线程
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
removeWaiter()
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// 假如当时节点仍有效,则置pred为当时节点,继续遍历。
if (q.thread != null)
pred = q;
/*
* 当时节点已无效且有前驱,则将前驱的后继置为当时节点的后继完结删去节点。
* 假如前驱节点已无效,则从头遍历waiters栈。
*/
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
/*
* 当时节点已无效,且当时节点没有前驱,则将栈顶置为当时节点的后继。
* 失利的话从头遍历waiters栈。
*/
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
break;
}
}
}
report()
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
cancel()履行流程
public boolean cancel(boolean mayInterruptIfRunning) {
// 状况机不是NEW 或CAS更新状况 流转到INTERRUPTING或许CANCELLED失利,不允许cancel
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 假如要求中止履行中的使命,则直接中止使命履行线程,并更新状况机为终究状况INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
经典事例
引证极客时刻-java并发编程课程的事例烧水泡茶:
并发编程能够总结为三个核心问题:分工,同步和互斥。编写并发程序,首先要做分工。
- T1负责洗水壶,烧开水,泡茶这三道工序
- T2负责洗茶壶,洗茶杯,拿茶叶三道工序。
- T1在履行泡茶这道工序需求等到T2完结拿茶叶的工作。(join,countDownLatch,堵塞行列都能够完结)
// 创立使命 T2 的 FutureTask
FutureTask<String> ft2
= new FutureTask<>(new T2Task());
// 创立使命 T1 的 FutureTask
FutureTask<String> ft1
= new FutureTask<>(new T1Task(ft2));
// 线程 T1 履行使命 ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程 T2 履行使命 ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等候线程 T1 履行成果
System.out.println(ft1.get());
// T1Task 需求履行的使命:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{
FutureTask<String> ft2;
// T1 使命需求 T2 使命的 FutureTask
T1Task(FutureTask<String> ft2){
this.ft2 = ft2;
}
@Override
String call() throws Exception {
System.out.println("T1: 洗水壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T1: 烧开水...");
TimeUnit.SECONDS.sleep(15);
// 获取 T2 线程的茶叶
String tf = ft2.get();
System.out.println("T1: 拿到茶叶:"+tf);
System.out.println("T1: 泡茶...");
return " 上茶:" + tf;
}
}
// T2Task 需求履行的使命:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
@Override
String call() throws Exception {
System.out.println("T2: 洗茶壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T2: 洗茶杯...");
TimeUnit.SECONDS.sleep(2);
System.out.println("T2: 拿茶叶...");
TimeUnit.SECONDS.sleep(1);
return " 龙井 ";
}
}
// 一次履行成果:
//T1: 洗水壶...
//T2: 洗茶壶...
//T1: 烧开水...
//T2: 洗茶杯...
//T2: 拿茶叶...
//T1: 拿到茶叶: 龙井
//T1: 泡茶...
//上茶: 龙井