我眼中的并发编程——Fork/Join模型

简介

Fork/Join模型是ExecutorService的接口实现,可以帮助你利用多个处理器。它被设计用可以递归地分解成更小的任务,目的是所有可用的处理能力来提高应用程序性能,与分而治之思路类似。

与任何一个ExecutorService实现一样,Fork/Join模型将任务分配到线程池中的工作线程中。但Fork/Join框架与其他的区别是采用了工作窃取算法,工作线程任务完成后可能会从仍然忙碌的其他线程窃取任务。

Fork/Join模型的核心是ForkJoinPool,该类的扩展AbstractExecutorServiceForkJoinPool实现核心工作窃取算法,可以执行ForkJoinTask任务。

基本使用

使用Fork/Join模型第一步应该编写核心任务代码。大题逻辑如下:

if(我的任务足够小){
     直接工作
}else{
     任务划分成两份,
      执行并等待结果。
}

把这段代码封装到一个ForkJoinTask的子类中。通常做法是继承 RecursiveTask或 RecursiveAction

RecursiveTask:有返回值。
RecursiveAction:无返回值。

代码样例

 

  1. package com.itunic.concurrent;
  2. import java.util.concurrent.ExecutionException;
  3. import java.util.concurrent.ForkJoinPool;
  4. import java.util.concurrent.ForkJoinTask;
  5. import java.util.concurrent.RecursiveTask;
  6. public class CountTask extends RecursiveTask<Long> {
  7.     private static final long serialVersionUID = 1L;
  8.     private long start;
  9.     private long end;
  10.     // 区分任务颗粒度
  11.     private static final int THRESHOLD = 2;
  12.     public CountTask(long start, long end) {
  13.         this.start = start;
  14.         this.end = end;
  15.     }
  16.     @Override
  17.     protected Long compute() {
  18.         long sum = 0;
  19.         boolean canCompute = (end - start) <= THRESHOLD;
  20.         // 判断任务的颗粒度是否足够小
  21.         if (canCompute) {
  22.             for (long i = start; i < end; i++) {
  23.                 sum += i;
  24.             }
  25.         } else {
  26.             // 将数据切分
  27.             long middle = (start + end) / 2;
  28.             CountTask task1 = new CountTask(start, middle);
  29.             CountTask task2 = new CountTask(middle, end);
  30.             // 发起两个线程任务
  31.             invokeAll(task1, task2);
  32.             // 等待线程返回结果
  33.             long result1 = task1.join();
  34.             long result2 = task2.join();
  35.             sum = result1 + result2;
  36.         }
  37.         return sum;
  38.     }
  39.     public static void main(String[] args) throws InterruptedException, ExecutionException {
  40.         ForkJoinPool pool = new ForkJoinPool();
  41.         ForkJoinTask<Long> future = pool.submit(new CountTask(110));
  42.         System.out.printf("统计结果为:%s",future.get());
  43.     }
  44. }

我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=2head3ycz2qss

  • 我眼中的并发编程——Fork/Join模型已关闭评论
  • 222 views
  • A+
所属分类:Java
avatar