第十八章 Fork/Join框架

    与其他ExecutorService相关的实现相同的是,Fork/Join框架会将任务分配给线程池中的线程。而与之不同的是,Fork/Join框架在执行任务时使用了工作窃取算法

    fork在英文里有分叉的意思,join在英文里连接、结合的意思。顾名思义,fork就是要使一个大任务分解成若干个小任务,而join就是最后将各个小任务的结果结合起来得到大任务的结果。

    Fork/Join的运行流程大致如下所示:

    需要注意的是,图里的次级子任务可以一直分下去,一直分到子任务足够小为止。用伪代码来表示如下:

    通过上面伪代码可以看出,我们通过递归嵌套的计算得到最终结果,这里有体现分而治之(divide and conquer) 的算法思想。

    工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。

    工作窃取流程如下图所示:

    工作窃取算法流程

    值得注意的是,当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。

    另外,当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。

    前面我们说Fork/Join框架简单来讲就是对任务的分割与子任务的合并,所以要实现这个框架,先得有任务。在Fork/Join框架里提供了抽象类来实现任务。

    ForkJoinTask是一个类似普通线程的实体,但是比普通线程轻量得多。

    fork()方法:使用线程池中的空闲线程异步提交任务

    1. // 本文所有代码都引自Java 8
    2. public final ForkJoinTask<V> fork() {
    3. Thread t;
    4. // ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool管理
    5. // 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去
    6. if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
    7. ((ForkJoinWorkerThread)t).workQueue.push(this);
    8. else
    9. // 如果不是则将线程加入队列
    10. // 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中
    11. ForkJoinPool.common.externalPush(this);
    12. return this;
    13. }

    其实fork()只做了一件事,那就是把任务推入当前工作线程的工作队列里

    join()方法:等待处理任务的线程处理完毕,获得返回值。

    来看下join()的源码:

    1. public final V join() {
    2. int s;
    3. // doJoin()方法来获取当前任务的执行状态
    4. if ((s = doJoin() & DONE_MASK) != NORMAL)
    5. // 任务异常,抛出异常
    6. reportException(s);
    7. // 任务正常完成,获取返回值
    8. return getRawResult();
    9. }
    10. /**
    11. * doJoin()方法用来返回当前任务的执行状态
    12. **/
    13. private int doJoin() {
    14. int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    15. // 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)
    16. return (s = status) < 0 ? s :
    17. // 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程
    18. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
    19. // 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)
    20. // tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true
    21. // doExec()方法执行任务
    22. (w = (wt = (ForkJoinWorkerThread)t).workQueue).
    23. tryUnpush(this) && (s = doExec()) < 0 ? s :
    24. // 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务
    25. wt.pool.awaitJoin(w, this, 0L) :
    26. // 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果
    27. externalAwaitDone();
    28. }

    RecursiveAction和RecursiveTask

    通常情况下,在创建任务的时候我们一般不直接继承ForkJoinTask,而是继承它的子类RecursiveActionRecursiveTask

    两个都是ForkJoinTask的子类,RecursiveAction可以看做是无返回值的ForkJoinTask,RecursiveTask是有返回值的ForkJoinTask

    此外,两个子类都有执行主要计算的方法compute(),当然,RecursiveAction的compute()返回void,RecursiveTask的compute()有具体的返回值。

    18.3.2 ForkJoinPool

    ForkJoinPool是用于执行ForkJoinTask任务的执行(线程)池。

    ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。

    我们来大致看下ForkJoinPool的源码:

    WorkQueue

    双端队列,ForkJoinTask存放在这里。

    当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。

    ForkJoinPool与传统线程池最显著的区别就是它维护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都维护着一个工作队列)。

    runState

    ForkJoinPool的运行状态。SHUTDOWN状态用负数表示,其他用2的幂次表示。

    上面我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作,所以要使用Fork/Join框架就离不开这两个类了,只是在实际开发中我们常用ForkJoinTask的子类RecursiveTask 和RecursiveAction来替代ForkJoinTask。

    下面我们用一个计算斐波那契数列第n项的例子来看一下Fork/Join的使用:

    1. public class FibonacciTest {
    2. class Fibonacci extends RecursiveTask<Integer> {
    3. int n;
    4. public Fibonacci(int n) {
    5. this.n = n;
    6. }
    7. // 主要的实现逻辑都在compute()里
    8. @Override
    9. protected Integer compute() {
    10. // 这里先假设 n >= 0
    11. if (n <= 1) {
    12. return n;
    13. } else {
    14. // f(n-1)
    15. Fibonacci f1 = new Fibonacci(n - 1);
    16. f1.fork();
    17. // f(n-2)
    18. Fibonacci f2 = new Fibonacci(n - 2);
    19. f2.fork();
    20. // f(n) = f(n-1) + f(n-2)
    21. return f1.join() + f2.join();
    22. }
    23. }
    24. }
    25. @Test
    26. public void testFib() throws ExecutionException, InterruptedException {
    27. ForkJoinPool forkJoinPool = new ForkJoinPool();
    28. System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
    29. long start = System.currentTimeMillis();
    30. Fibonacci fibonacci = new Fibonacci(40);
    31. System.out.println(future.get());
    32. System.out.println(String.format("耗时:%d millis", end - start));
    33. }
    34. }

    上面例子在本机的输出:

    1. CPU核数:4
    2. 计算结果:102334155
    3. 耗时:9490 millis

    此外,也并不是所有的任务都适合Fork/Join框架,比如上面的例子任务划分过于细小反而体现不出效率,下面我们试试用普通的递归来求f(n)的值,看看是不是要比使用Fork/Join快:

    普通递归的例子输出:

    1. 计算结果:102334155
    2. 耗时:436 millis

    通过输出可以很明显的看出来,使用普通递归的效率都要比使用Fork/Join框架要高很多。

    这里我们再用另一种思路来计算:

    1. // 通过循环来计算,复杂度为O(n)
    2. private int computeFibonacci(int n) {
    3. // 假设n >= 0
    4. if (n <= 1) {
    5. return n;
    6. } else {
    7. int first = 1;
    8. int second = 1;
    9. int third = 0;
    10. for (int i = 3; i <= n; i ++) {
    11. // 第三个数是前两个数之和
    12. third = first + second;
    13. // 前两个数右移
    14. first = second;
    15. second = third;
    16. }
    17. return third;
    18. }
    19. }
    20. @Test
    21. public void testComputeFibonacci() {
    22. long start = System.currentTimeMillis();
    23. int result = computeFibonacci(40);
    24. long end = System.currentTimeMillis();
    25. System.out.println("计算结果:" + result);
    26. }

    上面例子在笔者所用电脑的输出为:

    这里耗时为0不代表没有耗时,是表明这里计算的耗时几乎可以忽略不计,大家可以在自己的电脑试试,即使是n取大很多量级的数据(注意int溢出的问题)耗时也是很短的,或者可以用System.nanoTime()统计纳秒的时间。

    为什么在这里普通的递归或循环效率更快呢?因为Fork/Join是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。

    如果要计算的任务比较简单(比如我们案例中的斐波那契数列),那当然是直接使用单线程会更快一些。但如果要计算的东西比较复杂,计算机又是多核的情况下,就可以充分利用多核CPU来提高计算速度。

    另外,Java 8 Stream的并行操作底层就是用到了Fork/Join框架,下一章我们将从源码及案例两方面介绍Java 8 Stream的并行操作。


    参考资料