为什么要运用Future

线程获取到运转成果有几种方式

public class Sum {
 private Sum(){}
 public static int sum(int n){
  int sum = 0;
  for (int i = 0; i < n; i++) {
   sum += n;
   }
  return sum;
  }
}

Thread.sleep()

private static int sum_sleep = 0;
Thread thread = new Thread(() -> sum_sleep = Sum.sum(100));
thread.start();
TimeUnit.SECONDS.sleep(1);
System.out.printf("get result by thread.sleep: %d\n", sum_sleep);

运用sleep()办法获取,这种办法,有不可控性,也许sleep1秒钟,可是线程还没有履行完结,可能会导致获取到的成果不准确。

Thread.join()

private static int sum_join = 0;
Thread thread = new Thread(() -> sum_join = Sum.sum(100));
thread.start();
thread.join();
System.out.printf("get result by thread.join: %d\n", sum_join);

循环

private static int sum_loop = 0;
private static volatile boolean flag;
​
Thread thread = new Thread(() -> {
 sum_loop = Sum.sum(100);
 flag = true;
});
thread.start();
int i = 0;
while (!flag) {
 i++;
}
System.out.printf("get result by loopLock: %d\n", sum_loop);

notifyAll() / wait()

private static class NotifyAndWaitTest {
​
  private Integer sum = null;
​
  private synchronized void sum_wait_notify() {
   sum = Sum.sum(100);
   notifyAll();
   }
​
  private synchronized Integer getSum() {
   while (sum == null) {
    try {
     wait();
     } catch (Exception e) {
     e.printStackTrace();
     }
    }
   return sum;
   }
}
private static void getResultByNotifyAndWait() throws Exception {
  NotifyAndWaitTest test = new NotifyAndWaitTest();
  new Thread(test::sum_wait_notify).start();
  System.out.printf("get result by NotifyAndWait: %d\n", test.getSum());
}

Lock & Condition

private static class LockAndConditionTest {
​
  private Integer sum = null;
  private final Lock lock = new ReentrantLock();
  private final Condition condition = lock.newCondition();
​
  public void sum() {
   try {
    lock.lock();
    sum = Sum.sum(100);
    condition.signalAll();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    lock.unlock();
    }
   }
​
  public Integer getSum() {
   try {
    lock.lock();
    while (Objects.isNull(sum)) {
     try {
      condition.await();
      } catch (Exception e) {
      throw new RuntimeException(e);
      }
     }
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    lock.unlock();
    }
   return sum;
   }
}
​
private static void getResultByLockAndCondition() throws Exception {
 LockAndConditionTest test = new LockAndConditionTest();
 new Thread(test::sum).start();
 System.out.printf("get result by lock and condition: %d\n", test.getSum());
}

BlockingQueue

BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
new Thread(() -> queue.offer(Sum.sum(100))).start();
System.out.printf("get result by blocking queue: %d\n", queue.take());

CountDownLatch

private static int sum_countDownLatch = 0;
​
private static void getResultByCountDownLatch() {
 CountDownLatch latch = new CountDownLatch(1);
​
 new Thread(
      () -> {
      sum_countDownLatch = Sum.sum(100);
      latch.countDown();
      })
    .start();
 try {
  latch.await();
  } catch (Exception e) {
  e.printStackTrace();
  }
 System.out.printf("get result by countDownLatch: %d\n", sum_countDownLatch);
}

CyclicBarrier

private static int sum_cyclicBarrier = 0;
​
private static void getResultByCycleBarrier() {
 CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
 new Thread(
   () -> {
   sum_cyclicBarrier = Sum.sum(100);
   try {
    cyclicBarrier.await();
    } catch (Exception e) {
    e.printStackTrace();
    }
   })
   .start();
 try {
  cyclicBarrier.await();
  } catch (Exception e) {
  e.printStackTrace();
  }
 System.out.printf("get result by cyclicBarrier: %d\n", sum_cyclicBarrier);
}

Semaphore

private static int sum_semaphore = 0;
private static void getResultBySemaphore() {
 Semaphore semaphore = new Semaphore(0);
 new Thread(
   () -> {
   sum_semaphore = Sum.sum(100);
   semaphore.release();
   })
   .start();
​
 try {
  semaphore.acquire();
  System.out.printf("get result by semaphore: %d\n", sum_semaphore);
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
}

上面提到的获取线程履行成果的办法,暂时根据之前学到的内容,我只能想到这些。这些完结方式也不是很高雅,不是最佳实践。

线程池,利用ThreadPoolExecutorexecute(Runnable command)办法,利用这个办法虽然能够提交使命,可是却没有办法获取使命履行成果。

那么咱们假如需求获取使命的履行成果并且高雅的完结,能够经过Future接口和Callable接口配合完结, 本文将会经过详细的例子解说怎么运用Future

Future最主要的作用是,比方当做比较耗时运算的时候,假如咱们一直在原地等候办法回来,显然是不明智的,全体程序的运转效率会大大下降。咱们能够把运算的进程放到子线程去履行,再经过Future去控制子线程履行的核算进程,终究获取到核算成果。这样一来就能够把整个程序的运转效率进步,是一种异步的思想。

怎么运用Future

要想运用Future首先得先了解一下CallableCallable 接口比较于 Runnable 的一大优势是能够有回来成果,那这个回来成果怎么获取呢?就能够用 Future 类的 get 办法来获取 。因此,Future 相当于一个存储器,它存储了 Callablecall办法的使命成果。

一般情况下,Future,Callable,ExecutorService是一起运用的,ExecutorService里相关的代码如下:

// 提交 Runnable 使命
// 由于Runnable接口的run办法没有回来值,所以,Future仅仅是用来断言使命现已完毕,有点相似join();
Future<?> submit(Runnable task);
// 提交 Callable 使命
// Callable里的call办法是有回来值的,所以这个办法回来的Future对象能够经过调用其get()办法来获取使命的执
//行成果。
<T> Future<T> submit(Callable<T> task);
// 提交 Runnable 使命及成果引证 
// Future的回来值便是传给submit()办法的参数result。
<T> Future<T> submit(Runnable task, T result);

详细运用办法如下:

ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(() -> Sum.sum(100));
​
System.out.printf("get result by Callable + Future: %d\n", future.get());
executor.shutdown();

Future完结原理

Future根本概述

Future接口5个办法:

// 撤销使命
boolean cancel(boolean mayInterruptIfRunning);
// 判别使命是否已撤销 
boolean isCancelled();
// 判别使命是否已完毕
boolean isDone();
// 获得使命履行成果 堵塞,被调用时,假如使命还没有履行完,那么调用get()办法的线程会堵塞。直到使命履行完
// 才会被唤醒
get();
// 获得使命履行成果,支撑超时
get(long timeout, TimeUnit unit);
​
  • cancel(boolean mayInterruptIfRunning)

    • 用来撤销异步使命的履行。
    • 假如异步使命现已完结或许现已被撤销,或许由于某些原因不能撤销,则会回来false
    • 假如使命还没有被履行,则会回来true并且异步使命不会被履行。
    • 假如使命现已开端履行了可是还没有履行完结,若mayInterruptIfRunningtrue,则会立即中止履行使命的线程并回来true,若mayInterruptIfRunningfalse,则会回来true且不会中止使命履行线程。
  • isCanceled()

    • 判别使命是否被撤销。
    • 假如使命在完毕(正常履行完毕或许履行反常完毕)前被撤销则回来true,不然回来false
  • isDone()

    • 判别使命是否现已完结,假如完结则回来true,不然回来false
    • 使命履行进程中发生反常、使命被撤销也归于使命已完结,也会回来true
  • get()

    • 获取使命履行成果,假如使命还没完结则会堵塞等候直到使命履行完结。
    • 假如使命被撤销则会抛出CancellationException反常。
    • 假如使命履行进程发生反常则会抛出ExecutionException反常。
    • 假如堵塞等候进程中被中止则会抛出InterruptedException反常。
  • get(long timeout,Timeunit unit):

    • 带超时时刻的get()版别,上面叙述的get()办法,相同适用这里。
    • 假如堵塞等候进程中超时则会抛出TimeoutException反常。

运用IDEA,查看Future的完结类其实有很多,比方FutureTask,ForkJoinTask,CompletableFuture等,其他根本是继承了ForkJoinTask完结的内部类。

java线程-如何优雅地获取线程的执行结果

本篇文章主要解说FutureTask的完结原理

FutureTask根本概述

FutureTaskFuture 供给了根底完结,如获取使命履行成果(get)和撤销使命(cancel)等。假如使命没有完结,获取使命履行成果时将会堵塞。一旦履行完毕,使命就不能被重启或撤销(除非运用runAndReset履行核算)。FutureTask 常用来封装 CallableRunnable,也能够作为一个使命提交到线程池中履行。除了作为一个独立的类之外,此类也供给了一些功能性函数供咱们创立自定义 task 类运用。FutureTask 的线程安全CAS来保证。

// 创立 FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
// 创立线程池
ExecutorService es = Executors.newCachedThreadPool();
// 提交 FutureTask 
es.submit(futureTask);
// 获取核算成果
Integer result = futureTask.get();
​
// 创立 FutureTask
FutureTask<Integer> futureTask
 = new FutureTask<>(()-> 1+2);
// 创立并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
// 获取核算成果
Integer result = futureTask.get();
​

FutureTask能够很简单获取子线程的履行成果。

FutureTask完结原理

构造函数

public FutureTask(Callable<V> callable) {
 if (callable == null)
  throw new NullPointerException();
 this.callable = callable;
 this.state = NEW;    // ensure visibility of callable
}
​
public FutureTask(Runnable runnable, V result) {
 this.callable = Executors.callable(runnable, result);
 this.state = NEW;    // ensure visibility of callable
}

FutureTask供给了两个构造器

  • Callable接口有回来,将callable赋值给this.callable
  • Runnable接口无回来,假如想要获取到履行成果,需求传V resultFutureTaskFutureTaskRunnableresult封装成Callable,再将callable赋值给this.callable
  • 状况初始化状况为NEW

FutureTask内置状况有:

private volatile int state; // 可见性
private static final int NEW     = 0;
private static final int COMPLETING  = 1;
private static final int NORMAL    = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED  = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
  • NEW 初始状况
  • COMPLETING 使命现已履行完(正常或许反常),准备赋值成果,可是这个状况会时刻会比较短,归于中间状况。
  • NORMAL 使命现已正常履行完,并已将使命回来值赋值到成果
  • EXCEPTIONAL 使命履行失利,并将反常赋值到成果
  • CANCELLED 撤销
  • INTERRUPTING 准备测验中止履行使命的线程
  • INTERRUPTED 对履行使命的线程进行中止(未必中止到)

状况转化

java线程-如何优雅地获取线程的执行结果

run()履行流程

public void run() {
  if (state != NEW ||
    !RUNNER.compareAndSet(this, null, Thread.currentThread()))
    return;
  try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
      V result;
      boolean ran;
      try {
        result = c.call();
        ran = true;
       } catch (Throwable ex) {
        result = null;
        ran = false;
        setException(ex);
       }
      if (ran)
        set(result);
     }
   } finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)
      handlePossibleCancellationInterrupt(s);
   }
}

java线程-如何优雅地获取线程的执行结果

set()

protected void set(V v) {
    // state变量,经过CAS操作,将NEW->COMPLETING
  if (STATE.compareAndSet(this, NEW, COMPLETING)) {
    // 将成果赋值给outcome特点
    outcome = v;
    // state状况直接赋值为NORMAL,不需求CAS
    STATE.setRelease(this, NORMAL); // final state
    finishCompletion();
   }
}

setException()

protected void setException(Throwable t) {
  // state变量,经过CAS操作,将NEW->COMPLETING
  if (STATE.compareAndSet(this, NEW, COMPLETING)) {
    // 将反常赋值给outcome特点
    outcome = t;
    // state状况直接赋值为EXCEPTIONAL,不需求CAS
    STATE.setRelease(this, EXCEPTIONAL); // final state
    finishCompletion();
   }
}

finishCompletion()

set()setException()两个办法终究都调用了finishCompletion()办法,完结一些善后工作,详细流程如下:

private void finishCompletion() {
  // assert state > COMPLETING;
  for (WaitNode q; (q = waiters) != null;) {
    // 移除等候线程
    if (WAITERS.weakCompareAndSet(this, q, null)) {
      // 自旋遍历等候线程
      for (;;) {
        Thread t = q.thread;
        if (t != null) {
          q.thread = null;
          // 唤醒等候线程
          LockSupport.unpark(t);
         }
        WaitNode next = q.next;
        if (next == null)
          break;
        q.next = null; // unlink to help gc
        q = next;
       }
      break;
     }
   }
  // 使命完结后调用函数,自定义扩展
  done();
​
  callable = null;    // to reduce footprint
}

handlePossibleCancellationInterrupt()

private void handlePossibleCancellationInterrupt(int s) {
  if (s == INTERRUPTING)
    // 在中止者中止线程之前可能会延迟,所以咱们只需求让出CPU时刻片自旋等候
    while (state == INTERRUPTING)
      Thread.yield(); // wait out pending interrupt
}

get()履行流程

public V get() throws InterruptedException, ExecutionException {
  int s = state;
  if (s <= COMPLETING)
    // awaitDone用于等候使命完结,或使命由于中止或超时而停止。回来使命的完结状况。
    s = awaitDone(false, 0L);
  return report(s);
}

详细流程:

java线程-如何优雅地获取线程的执行结果

awaitDone()

private int awaitDone(boolean timed, long nanos)
  throws InterruptedException {
  long startTime = 0L;  // Special value 0L means not yet parked
  WaitNode q = null;
  boolean queued = false;
  for (;;) {
    // 获取到当时状况
    int s = state;
    // 假如当时状况不为NEW或许COMPLETING
    if (s > COMPLETING) {
      if (q != null)
        q.thread = null;
      // 直接回来state
      return s;
     }
    // COMPLETING是一个很时刻短的状况,调用Thread.yield希望让出时刻片,之后重试循环。
    else if (s == COMPLETING)
      Thread.yield();
    // 假如堵塞线程被中止则将当时线程从堵塞行列中移除
    else if (Thread.interrupted()) {
      removeWaiter(q);
      throw new InterruptedException();
     }
    
    //  新进来的线程增加等候节点
    else if (q == null) {
      if (timed && nanos <= 0L)
        return s;
      q = new WaitNode();
     }
    else if (!queued)
      /*
       *  这是Treiber Stack算法入栈的逻辑。
       *  Treiber Stack是一个根据CAS的无锁并发栈完结,
       *  更多能够参考https://en.wikipedia.org/wiki/Treiber_Stack
       */
      queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
    else if (timed) {
      final long parkNanos;
      if (startTime == 0L) { // first time
        startTime = System.nanoTime();
        if (startTime == 0L)
          startTime = 1L;
        parkNanos = nanos;
       } else {
        long elapsed = System.nanoTime() - startTime;
        if (elapsed >= nanos) {
          // 超时,移除栈中节点。
          removeWaiter(q);
          return state;
         }
        parkNanos = nanos - elapsed;
       }
      // nanoTime may be slow; recheck before parking
      // 未超市并且状况为NEW,堵塞当时线程
      if (state < COMPLETING)
        LockSupport.parkNanos(this, parkNanos);
     }
    else
      LockSupport.park(this);
   }
}

removeWaiter()

private void removeWaiter(WaitNode node) {
 if (node != null) {
  node.thread = null;
  retry:
  for (;;) {     // restart on removeWaiter race
   for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
    s = q.next;
    // 假如当时节点仍有效,则置pred为当时节点,继续遍历。
    if (q.thread != null)
     pred = q;
    /*
    * 当时节点已无效且有前驱,则将前驱的后继置为当时节点的后继完结删去节点。
    * 假如前驱节点已无效,则从头遍历waiters栈。
    */
    else if (pred != null) {
     pred.next = s;
     if (pred.thread == null) // check for race
      continue retry;
     }
    /*
    * 当时节点已无效,且当时节点没有前驱,则将栈顶置为当时节点的后继。
    * 失利的话从头遍历waiters栈。
    */
    else if (!WAITERS.compareAndSet(this, q, s))
     continue retry;
    }
   break;
   }
  }
}

report()

private V report(int s) throws ExecutionException {
  Object x = outcome;
  if (s == NORMAL)
    return (V)x;
  if (s >= CANCELLED)
    throw new CancellationException();
  throw new ExecutionException((Throwable)x);
}

cancel()履行流程

public boolean cancel(boolean mayInterruptIfRunning) {
  // 状况机不是NEW 或CAS更新状况 流转到INTERRUPTING或许CANCELLED失利,不允许cancel
  if (!(state == NEW && STATE.compareAndSet
      (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
    return false;
  try {  // in case call to interrupt throws exception
    // 假如要求中止履行中的使命,则直接中止使命履行线程,并更新状况机为终究状况INTERRUPTED
    if (mayInterruptIfRunning) {
      try {
        Thread t = runner;
        if (t != null)
          t.interrupt();
       } finally { // final state
        STATE.setRelease(this, INTERRUPTED);
       }
     }
   } finally {
    finishCompletion();
   }
  return true;
}

经典事例

引证极客时刻-java并发编程课程的事例烧水泡茶:

java线程-如何优雅地获取线程的执行结果

并发编程能够总结为三个核心问题:分工,同步和互斥。编写并发程序,首先要做分工。

  1. T1负责洗水壶,烧开水,泡茶这三道工序
  2. T2负责洗茶壶,洗茶杯,拿茶叶三道工序。
  3. T1在履行泡茶这道工序需求等到T2完结拿茶叶的工作。(join,countDownLatch,堵塞行列都能够完结)

java线程-如何优雅地获取线程的执行结果

// 创立使命 T2 的 FutureTask
FutureTask<String> ft2
 = new FutureTask<>(new T2Task());
// 创立使命 T1 的 FutureTask
FutureTask<String> ft1
 = new FutureTask<>(new T1Task(ft2));
// 线程 T1 履行使命 ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程 T2 履行使命 ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等候线程 T1 履行成果
System.out.println(ft1.get());
​
// T1Task 需求履行的使命:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String>{
 FutureTask<String> ft2;
 // T1 使命需求 T2 使命的 FutureTask
 T1Task(FutureTask<String> ft2){
  this.ft2 = ft2;
  }
 @Override
 String call() throws Exception {
  System.out.println("T1: 洗水壶...");
  TimeUnit.SECONDS.sleep(1);
  
  System.out.println("T1: 烧开水...");
  TimeUnit.SECONDS.sleep(15);
  // 获取 T2 线程的茶叶 
  String tf = ft2.get();
  System.out.println("T1: 拿到茶叶:"+tf);
​
  System.out.println("T1: 泡茶...");
  return " 上茶:" + tf;
  }
}
// T2Task 需求履行的使命:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
 @Override
 String call() throws Exception {
  System.out.println("T2: 洗茶壶...");
  TimeUnit.SECONDS.sleep(1);
​
  System.out.println("T2: 洗茶杯...");
  TimeUnit.SECONDS.sleep(2);
​
  System.out.println("T2: 拿茶叶...");
  TimeUnit.SECONDS.sleep(1);
  return " 龙井 ";
  }
}
// 一次履行成果:
//T1: 洗水壶...
//T2: 洗茶壶...
//T1: 烧开水...
//T2: 洗茶杯...
//T2: 拿茶叶...
//T1: 拿到茶叶: 龙井
//T1: 泡茶...
//上茶: 龙井