前语

  当咱们面临需求一起履行多个使命的状况时,往往需求消耗很多的时刻和精力来编写杂乱的并发代码。但是有一种技能能够帮助咱们轻松地处理这种状况。经过forkJoin,咱们能够简单地完成多个使命的并行履行,然后进步运用程序的功能和呼应才能。在本文中,咱们将深入探讨forkJoin的作业原理和运用办法。

分治

  fork-join形式是根据分治思维的并行计算形式之一。该形式将一个大的使命分割成多个小的子使命,然后并行履行这些子使命,最后将它们的成果兼并起来得到最终的成果。在这个过程中,每个子使命的履行能够进一步分解为更小的子使命,直到达到某个阈值,这时候使命将被串行履行。这种递归的分治思维使得fork-join形式能够有效地运用计算机的多核处理才能,然后进步程序的功能和效率。

分治思维算法

归并排序

归并排序是一种根据分治思维的排序算法。其中心思维是将一个数组分红两个子数组,别离对其进行排序后再将其兼并起来。具体完成过程如下:

  1. 分解:将一个数组分红两个子数组,别离对其进行排序。能够运用递归来完成这一过程。
  2. 兼并:将排序后的两个子数组兼并成一个有序的数组。
  3. 递归:对两个子数组递归进行分解和排序,直到每个子数组的长度为1。

时刻杂乱度为O(nlogn)。

分治思想之ForkJoin

public class Merge {
    public static void main(String[] args) {
        int[] arr = { 5, 2, 8, 4, 7, 1, 3, 6 };
        mergeSort(arr, 0, arr.length - 1);
        System.out.println(Arrays.toString(arr));
    }
    /**
     * 拆分
     * @param arr 数组
     * @param left 数组第一个元素
     * @param right 数组最后一个元素
     */
    public static void mergeSort(int[] arr,int left,int right){
        if (left < right) {
            int mid = (left + right) / 2;
            mergeSort(arr, left, mid);
            mergeSort(arr, mid + 1, right);
            merge(arr, left, mid, right);
        }
    }
    /**
     * 兼并
     * @param arr 数组
     * @param left 数组第一个元素
     * @param mid 数组分割的元素
     * @param right 数组最后一个元素
     */
    public static void merge(int[] arr,int left,int mid,int right){
        //创立临时数组,用于存放兼并后的数组
        int[] temp = new int[right - left + 1];
        //左面数组的开始指针
        int i = left;
        //右面数组的开始指针
        int j = mid + 1;
        //临时数组的下标
        int k = 0;
        //数组左面和数组右面都还有值就去遍历比较赋值
        while (i <= mid && j <= right) {
            //数组左面的值小于或者等于数组右面的值就把左面的值赋值到临时数组中
            //而且把左面的指针和临时数组的指针+1
            //否则相反
            if (arr[i] <= arr[j]) {
                temp[k] = arr[i];
                k++;
                i++;
            } else {
                temp[k] = arr[j];
                k++;
                j++;
            }
        }
        //把剩下数组塞进去
        while (i <= mid) {
            temp[k] = arr[i];
            k++;
            i++;
        }
        while (j <= right) {
            temp[k] = arr[j];
            k++;
            j++;
        }
        //讲临时数组中的元素复制会回原数组中
        for (int p = 0; p < temp.length; p++) {
            arr[left + p] = temp[p];
        }
    }
}

快速排序

快速排序(Quick Sort)是一种根据分治思维的排序算法,它采用了递归的方式将一个大的数组分解成多个子数组,别离进行排序后再将它们兼并起来。其基本思维是选取一个基准元素,将数组中小于该元素的值放在左面,大于该元素的值放在右边,然后递归地对左右两个子数组进行排序。具体的过程如下:

  1. 选取一个基准元素(通常是数组中的第一个元素)。
  2. 将数组中小于该元素的值放在左面,大于该元素的值放在右边。
  3. 对左右两个子数组别离递归进行快速排序。
  4. 兼并左右两个已排序的子数组。

快速排序的时刻杂乱度为O(nlogn),它是一种原地排序算法,不需求额外的存储空间,因此空间杂乱度为O(1)。尽管快速排序的最坏时刻杂乱度为O(n^2),但是在实际运用中,快速排序的平均时刻杂乱度和最好时刻杂乱度均为O(nlogn),因此是一种非常高效的排序算法

分治思想之ForkJoin

public class QuickSort {
    public static void main(String[] args) {
        int[] arr = { 5, 2, 8, 4, 7, 1, 3, 6 };
        quickSort(arr, 0, arr.length - 1);
        System.out.println(Arrays.toString(arr));
    }
    public static void quickSort(int[] arr,int left,int right){
        if(left<right){
            int pivotIndex  = partition(arr, left, right);
            quickSort(arr,left,pivotIndex-1);
            quickSort(arr,pivotIndex+1,right);
        }
    }
    public static int partition(int[] arr,int left,int right){
        //取第一个元素为基准元素
        int pivot = arr[left];
        int i = left+1;
        int j = right;
        while (i<=j){
            //当时指针方位数小于基准元素就持续移动指针直到遇到大的
            while (i<=j && arr[i] < pivot){
                i++;
            }
            //当时指针方位数大于基准元素就持续移动指针直到遇到小的
            while (i<=j && arr[j] > pivot){
                j--;
            }
            if(i<j){
                //交流元素方位
                swap(arr,i,j);
                i++;
                j--;
            }
        }
        //跳出循环说明i和j相遇,j的值一定是大于基准元素,要和基准元素进行交流
        swap(arr,left,j);
        //返回基准元素方位
        return j;
    }
    public static void swap(int[] array, int i, int j) {
        int temp = array[i];
        array[i] = array[j];
        array[j] = temp;
    }
}

Fork/Join

  Fork/Join框架的主要组成部分是ForkJoinPool、ForkJoinTask。ForkJoinPool是一个线程池,它用于办理ForkJoin使命的履行。ForkJoinTask是一个抽象类,用于表示能够被分割成更小部分的使命。

ForkJoinPool

ForkJoinPool是Fork/Join框架中的线程池类,它用于办理Fork/Join使命的线程。ForkJoinPool类包含一些重要的办法,例如submit()、invoke()、shutdown()、awaitTermination()等,用于提交使命、履行使命、关闭线程池和等候使命的履行成果。ForkJoinPool类中还包含一些参数,例如线程池的巨细、作业线程的优先级、使命行列的容量等,能够根据具体的运用场景进行设置。

结构器

ForkJoinPool中有四个中心参数,用于控制线程池的并行数、作业线程的创立、反常处理和形式指定等。

  • int parallelism:指定并行等级(parallelism level)。ForkJoinPool将根据这个设定,决定作业线程的数量。如果未设置的话,将运用Runtime.getRuntime().availableProcessors()来设置并行等级;
  • ForkJoinWorkerThreadFactory factory:ForkJoinPool在创立线程时,会经过factory来创立。注意,这里需求完成的是ForkJoinWorkerThreadFactory,而不是ThreadFactory。如果你不指定factory,那么将由默许的 DefaultForkJoinWorkerThreadFactory负责线程的创立作业;
  • UncaughtExceptionHandler handler:指定反常处理器,当使命在运行中犯错时,将由设定的处理器处理;
  • boolean asyncMode:设置行列的作业形式。当asyncMode为true时,将运用先进先出行列,而为false时则运用后进先出的形式。

作业窃取法

在Fork-Join形式中,使命被分配给一个线程池中的作业线程来履行。当一个作业线程履行完自己分配的使命后,它能够从其他作业线程的使命行列中偷取(Steal)使命来履行,这便是所谓的作业窃取(Work Stealing)。

运用

public class SumTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1000;
    private int[] array;
    private int start;
    private int end;
    public SumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            int mid = (start + end) / 2;
            SumTask leftTask = new SumTask(array, start, mid);
            SumTask rightTask = new SumTask(array, mid, end);
            leftTask.fork();
            rightTask.fork();
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            return leftResult + rightResult;
        }
    }
    public static void main(String[] args) {
        int[] array = new int[10000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1;
        }
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        SumTask task = new SumTask(array, 0, array.length);
        int result = forkJoinPool.invoke(task);
        System.out.println(result);
    }
}