Fork-Join框架

在JDK1.7引入了一种新的并行编程模式“fork-join”,它是实现了“分而治之”思想的Java并发编程框架。网上关于此框架的各种介绍很多,本文从框架特点出发,通过几个例子来进行实用性的介绍。

1 fork-join框架的特点

fork-join框架对新手来说很难理解,因此先从它的特点说起,它有几个特点:

  • 它对问题的解决思路是分而治之,先将一个问题fork(分为)几个子问题,然后子问题又分为孙子问题,直至细分为一个容易计算的问题,然后再将结果依次join(结合)为最终的答案.是不是感觉和云计算中的Map-reduce计算模型很像?思路是一样的,只不过fork-join运行在一个JVM中的多个线程内,而map-reduce运行在分布式计算节点上
  • 在运行线程时,它使用“work-steal”(任务偷取)算法.一般来说,fork-join会启动多个线程(由参数指定,若不指定则默认为CPU核心数量),每个线程负责一个任务队列,并依次从队列头部获得任务并执行.当某个线程空闲时,它会从其他线程的任务队列尾部偷取一个任务来执行,这样就保证了线程的运行效率达到最高。
  • 它面向的问题域是可以大量并行执行的计算任务,例如计算某个大型数组中每个元素的平方(当然这个有些无趣),其计算对象最好是一些独立的元素,不会被其他线程访问,也没有同步、互斥要求,更不要涉及IO或者无限循环.当然此框架也可以执行普通的并发编程任务,但是这时就失去了性能优势
  • 细分的计算任务有一个粗略的优化标准,即可以在100~10000条指令中执行完毕

了解以上思路后,来看看fork-join框架提供的几个工具类:

  • ForkJoinPool:支持fork-join框架的线程池,所有ForkJoinTask任务都必须在其中运行,线程池主要使用invoke()、invokeAll()等方法来执行任务,当然也可以使用原有的execute()和submit()方法;
  • ForkJoinTask:支持fork-join框架的任务抽象类,它是Future接口,它代表一个支持fork()和join()方法的任务;
  • RecursiveAction:ForkJoinTask的两个具体子类之一,代表没有返回值的ForkJoinTask任务; RecursiveTask:ForkJoinTask的两个具体子类之一,代表有返回值的ForkJoinTask任务。

2 RecursiveAction

先来看一个使用RecursiveAction的例子,这段代码的目的是计算一个大型数组中每个元素x的一个公式的值,这个公式是sin(x)+cos(x)+tan(x):

public class RecursiveActionExam {
    private final static int NUMBER = 10000000;

    public static void main(String[] args) {
        double[] array = new double[NUMBER];
        for (int i = 0; i < NUMBER; i++) {
            array[i] = i;
        }
        long startTime = System.currentTimeMillis();
        System.out.println(Runtime.getRuntime().availableProcessors());
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        forkJoinPool.invoke(new ComputeTask(array, 0, array.length));
        long endTime = System.currentTimeMillis();
        System.out.println("Time span = " + (endTime - startTime));
    }
}

class ComputeTask extends RecursiveAction {
    final double[] array;
    final int lo, hi;

    ComputeTask(double[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    protected void compute() {
        if (hi - lo < 2) {
            for (int i = lo; i < hi; ++i)
                array[i] = Math.sin(array[i]) + Math.cos(array[i]) + Math.tan(array[i]);
        } else {
            int mid = (lo + hi) >>> 1;
            invokeAll(new ComputeTask(array, lo, mid),
                    new ComputeTask(array, mid, hi));
        }
    }
}

再看看单线程的情况:

public class RecursiveSequenceExam {
    private final static int NUMBER = 10000000;

    public static void main(String[] args) {
        double[] array = new double[NUMBER];
        for (int i = 0; i < NUMBER; i++) {
            array[i] = i;
        }

        long startTime = System.currentTimeMillis();
        for (int i = 0; i < NUMBER; i++) {
            array[i] = Math.sin(array[i]) + Math.cos(array[i]) + Math.tan(array[i]);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Time span = " + (endTime - startTime));
    }
}

运行结果是Time span = 12030。

由于我的CPU是4核的,再看看4线程的情况:

public class Recusive4ThreadExam {
    private final static int NUMBER = 10000000;

    public static void main(String[] args) throws InterruptedException {
        double[] array = new double[NUMBER];
        for (int i = 0; i < NUMBER; i++) {
            array[i] = i;
        }

        long startTime = System.currentTimeMillis();
        ExecutorService service = Executors.newFixedThreadPool(4);
        service.execute(new ArrayTask(array, 0, NUMBER / 4));
        service.execute(new ArrayTask(array, NUMBER / 4, NUMBER / 2));
        service.execute(new ArrayTask(array, NUMBER / 2, NUMBER*3 / 4));
        service.execute(new ArrayTask(array, NUMBER*3 / 4, NUMBER ));
        service.shutdown();
        service.awaitTermination(1,TimeUnit.DAYS);
        long endTime = System.currentTimeMillis();
        System.out.println("Time span = " + (endTime - startTime));

    }
}

class ArrayTask implements Runnable {
    final double[] array;
    final int lo, hi;

    ArrayTask(double[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    public void run() {
        for (int i = lo; i < hi; ++i)
            array[i] = Math.sin(array[i]) + Math.cos(array[i]) + Math.tan(array[i]);
    }
}

运行结果是Time span = 4064。可以看出由于fork-join框架采用了任务偷取算法,比普通4线程快了一点点。

3 RecursiveTask

下面来看一个更有意义的场景,寻找一个大型数组的最小值,这里我使用RecursiveTask(其实使用RecursiveAction也行,在它内部用一个成员变量保存结果即可)。代码如下:

public class RecursiveFindMax {
    private static Random rand = new Random(47);
    private static final int NUMBER = 1000000;

    public static void main(String[] args) {
        double[] array = new double[NUMBER];
        for (int i = 0; i < NUMBER; i++) {
            array[i] = rand.nextDouble();
        }

        ForkJoinPool pool = new ForkJoinPool();
        TaskFindMax task = new TaskFindMax(0, array.length - 1, array);
        pool.invoke(task);
        System.out.println("MaxValue = " + task.join());
    }
}

class TaskFindMax extends RecursiveTask<Double> {
    private final int lo;
    private final int hi;
    private final double[] array;
    //you can change THRESHOLD to get better efficiency
    private final static int THRESHOLD = 10;

    TaskFindMax(int lo, int hi, double[] array) {
        this.lo = lo;
        this.hi = hi;
        this.array = array;
    }

    @Override
    protected Double compute() {
        if ((hi - lo) < THRESHOLD) {
            double max = array[lo];
            for (int i = lo; i < hi; i++) {
                max = Math.max(max, array[i + 1]);
            }
            return max;
        } else {
            int mid = (lo + hi) >>> 1;
            TaskFindMax lhs = new TaskFindMax(lo, mid, array);
            TaskFindMax rhs = new TaskFindMax(mid, hi, array);
            invokeAll(lhs, rhs);
            return Math.max(lhs.join(), rhs.join());
        }
    }
}

pool.invoke(task)将一个最初的任务扔进了线程池执行,这个任务将会执行它的compute()方法。在此方法中,若满足某个条件(例如数组上界和下界只差小于阈值THRESHOLD)则直接在这一段数组中查找最大值;若不满足条件,则找出中值mid,然后new出两个子任务lhs(left hand side)和rhs(right hand side),并调用invokeAll(lhs, rhs)将这两个子任务都扔进线程池执行。任务的join()方法会得到返回值,若任务尚未执行完毕则会在此阻塞。 通过这种编程模式,很好的将递归思想用到了多线程领域。值得注意的是,通过调整THRESHOLD可以增加或减少任务的个数,从而极大的影响线程的执行。在很多情况下,使用fork-join框架并不会比普通的多线程效率更高,甚至比单线程运行效率更低。因此,必须找到适合的场景,然后进行多次调优,才能获得性能的改进。

小结

执行者与线程池的引入是因为Concurrency包的设计者想将线程的创建、执行和调度分离,从而使得用户能够更加专注于业务逻辑;Callable接口和Future接口使得异步执行结果的获取更加简单;ScheduledExecutorService取代Timer成为了线程重复和延迟执行的新标准;TimeUnit类的引入简化了时间段的表达工作;包中提供的五种线程池可以极大的满足程序员的各种需求,极端情况下还可以利用ThreadPoolExecutor类自己定制线程池。最后,从JDK1.7后引入的Fork-Join框架将“分而治之”的递归思想实现到线程池中,并应用“work-steal”算法实现了任务执行效率的提升

原文地址:https://cloud.tencent.com/developer/article/2178543

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340