简介
Fork/Join
模型是ExecutorService
的接口实现,可以帮助你利用多个处理器。它被设计用可以递归地分解成更小的任务,目的是所有可用的处理能力来提高应用程序性能,与分而治之思路类似。
与任何一个ExecutorService
实现一样,Fork/Join
模型将任务分配到线程池中的工作线程中。但Fork/Join
框架与其他的区别是采用了工作窃取算法,工作线程任务完成后可能会从仍然忙碌的其他线程窃取任务。
Fork/Join
模型的核心是ForkJoinPool
,该类的扩展AbstractExecutorService
。ForkJoinPool
实现核心工作窃取算法,可以执行ForkJoinTask
任务。
基本使用
使用Fork/Join模型第一步应该编写核心任务代码。大题逻辑如下:
if(我的任务足够小){ 直接工作 }else{ 任务划分成两份, 执行并等待结果。 }
把这段代码封装到一个ForkJoinTask
的子类中。通常做法是继承 RecursiveTask
或 RecursiveAction
。
RecursiveTask:有返回值。
RecursiveAction:无返回值。
代码样例
- package com.itunic.concurrent;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.ForkJoinTask;
- import java.util.concurrent.RecursiveTask;
- public class CountTask extends RecursiveTask<Long> {
- private static final long serialVersionUID = 1L;
- private long start;
- private long end;
- // 区分任务颗粒度
- private static final int THRESHOLD = 2;
- public CountTask(long start, long end) {
- this.start = start;
- this.end = end;
- }
- @Override
- protected Long compute() {
- long sum = 0;
- boolean canCompute = (end - start) <= THRESHOLD;
- // 判断任务的颗粒度是否足够小
- if (canCompute) {
- for (long i = start; i < end; i++) {
- sum += i;
- }
- } else {
- // 将数据切分
- long middle = (start + end) / 2;
- CountTask task1 = new CountTask(start, middle);
- CountTask task2 = new CountTask(middle, end);
- // 发起两个线程任务
- invokeAll(task1, task2);
- // 等待线程返回结果
- long result1 = task1.join();
- long result2 = task2.join();
- sum = result1 + result2;
- }
- return sum;
- }
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- ForkJoinPool pool = new ForkJoinPool();
- ForkJoinTask<Long> future = pool.submit(new CountTask(1, 10));
- System.out.printf("统计结果为:%s",future.get());
- }
- }
我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=2head3ycz2qss