本文正在参与「金石方案」

什么是Fork/Join

Fork/Join结构是一个完结了ExecutorService接口的多线程处理器,它专为那些能够经过递归分解成更细小的使命而规划,最大化的运用多核处理器来进步应用程序的功能。

与其他ExecutorService相关的完结相同的是,Fork/Join结构会将使命分配给线程池中的线程。而与之不同的是,Fork/Join结构在履行使命时运用了作业盗取算法

fork在英文里有分叉的意思,join在英文里连接、结合的意思。望文生义,fork便是要使一个大使命分解成若干个小使命,而join便是最后将各个小使命的成果结合起来得到大使命的成果。

Fork/Join的运转流程大致如下所示:

聊聊Fork/Join并发框架

需求留意的是,图里的次级子使命能够一向分下去,一向分到子使命满足小停止。用伪代码来表示如下:

solve(使命):
  if(使命现已划分到满足小):
    顺序履行使命
  else:
    for(划分使命得到子使命)
      solve(子使命)
    结合一切子使命的成果到上一层循环
    return 终究结合的成果

经过上面伪代码能够看出,咱们经过递归嵌套的核算得到终究成果,这儿有体现分而治之(divide and conquer) 的算法思想。

作业盗取算法

作业盗取算法指的是在多线程履行不同使命行列的过程中,某个线程履行完自己行列的使命后从其他线程的使命行列里盗取使命来履行。

作业盗取流程如下图所示:

聊聊Fork/Join并发框架

值得留意的是,当一个线程盗取另一个线程的时分,为了削减两个使命线程之间的竞争,咱们一般运用双端行列来存储使命。被盗取的使命线程都从双端行列的头部拿使命履行,而盗取其他使命的线程从双端行列的尾部履行使命。

另外,当一个线程在盗取使命时要是没有其他可用的使命了,这个线程会进入阻塞状况以等候再次“作业”。

Fork/Join的具体完结

Fork/Join结构简单来讲便是对使命的分割与子使命的合并,所以要完结这个结构,先得有使命

在Fork/Join结构里供给了抽象类ForkJoinTask来完结使命。

ForkJoinTask

ForkJoinTask是一个相似一般线程的实体,可是比一般线程轻量得多。

fork()办法

其实fork()只做了一件事,那便是把使命推入当时作业线程的作业行列里

来看下fork()源码

public final ForkJoinTask<V> fork() {
  Thread t;
  // ForkJoinWorkerThread是履行ForkJoinTask的专有线程,由ForkJoinPool办理
  // 先判别当时线程是否是ForkJoin专有线程,假如是,则将使命push到当时线程所负责的行列里去
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
     ((ForkJoinWorkerThread)t).workQueue.push(this);
  else
     // 假如不是则将线程加入行列
    // 没有显式创建ForkJoinPool的时分走这儿,提交使命到默许的common线程池中
    ForkJoinPool.common.externalPush(this);
  return this;
}

join()办法

Join() 的主要作用是阻塞当时线程并等候获取成果。

来看下join()的源码:

public final V join() {
  int s;
  // doJoin()办法来获取当时使命的履行状况
  if ((s = doJoin() & DONE_MASK) != NORMAL)
    // 使命反常,抛出反常
    reportException(s);
  // 使命正常完结,获取回来值
  return getRawResult();
}
/**
 * doJoin()办法用来回来当时使命的履行状况
 **/
private int doJoin() {
  int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
  // 先判别使命是否履行结束,履行结束直接回来成果(履行状况)
  return (s = status) < 0 ? s :
  // 假如没有履行结束,先判别是否是ForkJoinWorkThread线程
   ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
    // 假如是,先判别使命是否处于作业行列顶端(意味着下一个就履行它)
    // tryUnpush()办法判别使命是否处于当时作业行列顶端,是回来true
    // doExec()办法履行使命
     (w = (wt = (ForkJoinWorkerThread)t).workQueue).
    // 假如是处于顶端而且使命履行结束,回来成果
    tryUnpush(this) && (s = doExec()) < 0 ? s :
    // 假如不在顶端或许在顶端却没未履行结束,那就调用awitJoin()履行使命
    // awaitJoin():运用自旋使使命履行完结,回来成果
    wt.pool.awaitJoin(w, this, 0L) :
  // 假如不是ForkJoinWorkThread线程,履行externalAwaitDone()回来使命成果
  externalAwaitDone();
}

下面是ForkJoinPool.join()的流程图:

聊聊Fork/Join并发框架

一般情况下咱们不需求直接继承 ForkJoinTask 类,而只需求继承它的子类,Fork/Join 结构供给了以下两个子类:

  • RecursiveAction:用于没有回来成果的使命。
  • RecursiveTask :用于有回来成果的使命。

ForkJoinPool

ForkJoinPool是用于履行ForkJoinTask使命的履行(线程)池。

ForkJoinPool办理着履行池中的线程和使命行列,此外,履行池是否还接受使命,显现线程的运转状况也是在这儿处理。

ForkJoinPool的源码如下:

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
  // 使命行列
  volatile WorkQueue[] workQueues;  
​
  // 线程的运转状况
  volatile int runState; 
​
  // 创建ForkJoinWorkerThread的默许工厂,能够经过结构函数重写
  public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
​
  // 公用的线程池,其运转状况不受shutdown()和shutdownNow()的影响
  static final ForkJoinPool common;
​
  // 私有结构办法,没有任何安全查看和参数校验,由makeCommonPool直接调用
  // 其他结构办法都是源自于此办法
  // parallelism: 并行度,
  // 默许调用java.lang.Runtime.availableProcessors() 办法回来可用处理器的数量
  private ForkJoinPool(int parallelism,
             ForkJoinWorkerThreadFactory factory, // 作业线程工厂
             UncaughtExceptionHandler handler, // 回绝使命的handler
             int mode, // 同步模式
             String workerNamePrefix) { // 线程名prefix
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
   }
}

WorkQueue

双端行列,ForkJoinTask寄存在这儿。

当作业线程在处理自己的作业行列时,会从行列首取使命来履行(FIFO);假如是盗取其他行列的使命时,盗取的使命坐落所属使命行列的队尾(LIFO)。

ForkJoinPool与传统线程池最明显的区别便是它保护了一个作业行列数组

runState

ForkJoinPool的运转状况。

Fork/Join的反常处理

ForkJoinTask 在履行的时分可能会抛出反常,可是咱们没办法在主线程里直接捕获反常,所以 ForkJoinTask 供给了 isCompletedAbnormally() 办法来查看使命是否现已抛出反常或现已被取消了,而且能够经过 ForkJoinTask 的 getException 办法获取反常。运用如下代码:

if(task.isCompletedAbnormally()){
   System.out.println(task.getException());
}

getException 办法回来 Throwable 对象,假如使命被取消了则回来 CancellationException。假如使命没有完结或许没有抛出反常则回来 null。

Fork/Join的运用

上面咱们说ForkJoinPool负责办理线程和使命,ForkJoinTask完结fork和join操作,所以要运用Fork/Join结构就离不开这两个类了,只是在实际开发中咱们常用ForkJoinTask的子类RecursiveTask 和RecursiveAction来代替ForkJoinTask。

下面咱们用一个核算斐波那契数列第n项的比方来看一下Fork/Join的运用:

斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:

1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89

假如设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。

public class FibonacciTest {
​
 static class Fibonacci extends RecursiveTask<Integer> {
​
    int n;
​
    public Fibonacci(int n) {
      this.n = n;
     }
​
    // 主要的完结逻辑都在compute()里
    @Override
    protected Integer compute() {
      // 这儿先假定 n >= 0
      if (n <= 1) {
        return n;
       } else {
        // f(n-1)
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        // f(n-2)
        Fibonacci f2 = new Fibonacci(n - 2);
        f2.fork();
        // f(n) = f(n-1) + f(n-2)
        return f1.join() + f2.join();
       }
     }
   }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
        long start = System.currentTimeMillis();
        Fibonacci fibonacci = new Fibonacci(40);
        Future<Integer> future = forkJoinPool.submit(fibonacci);
        System.out.println(future.get());
        long end = System.currentTimeMillis();
        System.out.println(String.format("耗时:%d millis", end - start));
    }
}

上面比方在本机的输出:

CPU核数:6
102334155
耗时:5222 millis

需求留意的是,上述核算时刻复杂度为O(2^n),跟着n的增加核算功率会越来越低,这也是上面的比方中n不敢取太大的原因。

此外,也并不是一切的使命都适合Fork/Join结构,比方上面的比方使命划分过于细小反而体现不出功率,下面咱们试试用一般的递归来求f(n)的值,看看是不是要比运用Fork/Join快:

// 一般递归,复杂度为O(2^n)
public int plainRecursion(int n) {
  if (n == 1 || n == 2) {
    return 1;
   } else {
    return plainRecursion(n -1) + plainRecursion(n - 2);
   }
}
​
@Test
public void testPlain() {
  long start = System.currentTimeMillis();
  int result = plainRecursion(40);
  long end = System.currentTimeMillis();
  System.out.println("核算成果:" + result);
  System.out.println(String.format("耗时:%d millis", end -start));
}

一般递归的比方输出:

核算成果:102334155
耗时:436 millis

经过输出能够很明显的看出来,运用一般递归的功率都要比运用Fork/Join结构要高许多。

这儿咱们再用另一种思路来核算:

// 经过循环来核算,复杂度为O(n)
private static int computeFibonacci(int n) {
    // 假定n >= 0
    if (n <= 1) {
        return n;
    } else {
        int first = 1;
        int second = 1;
        int third = 0;
        for (int i = 3; i <= n; i ++) {
            // 第三个数是前两个数之和
            third = first + second;
            // 前两个数右移
            first = second;
            second = third;
        }
        return third;
    }
}
public static void main(String[] args) {
    long start = System.currentTimeMillis();
    int result = computeFibonacci(40);
    long end = System.currentTimeMillis();
    System.out.println("核算成果:" + result);
    System.out.println(String.format("耗时:%d millis",  end -start));
}

上面比方在笔者所用电脑的输出为:

核算成果:102334155
耗时:0 millis

这儿耗时为0不代表没有耗时,是标明这儿核算的耗时简直能够忽略不计,咱们能够在自己的电脑试试,即使是n取大许多量级的数据(留意int溢出的问题)耗时也是很短的。

为什么在这儿一般的递归或循环功率更快呢?因为Fork/Join是运用多个线程协作来核算的,所以会有线程通讯和线程切换的开支。

假如要核算的使命比较简单(比方咱们事例中的斐波那契数列),那当然是直接运用单线程会更快一些。但假如要核算的东西比较复杂,核算机又是多核的情况下,就能够充分运用多核CPU来进步核算速度。