Java~ForkJoinPool + parallelStream实现并行快速处理数据流_Listen-Y(学习&踩坑笔记本)的博客-程序员秘密_forkjoinpool parallelstream

技术标签: Java  算法  java  多线程  队列  并发编程  

ForkJoinPool

说起ForkJoinPool先说Fork/Join框架

我们通过Fork和Join这两个单词来理解一下Fork/Join框架。Fork就是把一个大任务切分 为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结 果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和, 最终汇总这10个子任务的结果
在这里插入图片描述
每个线程在fork分割的子任务的时候就会把子任务放在自己的线程安全的阻塞双端队列里,然后线程分别从双端队列的队头里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

把任务放到双端队列中就是为了实现工作窃取算法是, 指某个线程从其他队列里窃取任务来执行。那么,为什么 需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干 互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个 队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。比如A线程负责处理A 队列里的任务。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有 任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列 里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被 窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿 任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

这个算法的优点就是提高了效率
缺点就是有了多余的消耗, 比如只有一个任务的时候, 一个线程本来可以结束了但是还得去额外的去其他线程看一看去问候一下

ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。

ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。

ps:ForkJoinPool在执行过程中,会创建大量的子任务,导致GC进行垃圾回收,这些是需要注意的, 所以在使用的时候还是要注意一下。

构造函数

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
    
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
  • parallelism:并行度,默认为CPU数,最小为1
  • factory:工作线程工厂;
  • handler:处理工作线程运行任务时的异常情况类,默认为null;
  • asyncMode:是否为异步模式,默认为 false。如果为true,表示子任务的执行遵循 FIFO 顺序并且任务不能被合并(join),这种模式适用于工作线程只运行事件类型的异步任务。

在多数场景使用时,如果没有太强的业务需求,我们一般直接使用 ForkJoinPool 中的common池,在JDK1.8之后提供了ForkJoinPool.commonPool()方法可以直接使用common池,来看一下它的构造:

commonPool()

private static ForkJoinPool makeCommonPool() {
    
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {
      // ignore exceptions in accessing/parsing
        String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");//并行度
        String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");//线程工厂
        String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");//异常处理类
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory) ClassLoader.
                    getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler) ClassLoader.
                    getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    
    }
    if (factory == null) {
    
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;//默认并行度为1
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
            "ForkJoinPool.commonPool-worker-");
}

使用common pool的优点就是我们可以通过指定系统参数的方式定义“并行度、线程工厂和异常处理类”;并且它使用的是同步模式,也就是说可以支持任务合并(join)。

invoke、execute和submit区别

使用ForkJoinPool的时候发现执行任务的方法有:

invoke(ForkJoinTask task)
execute(ForkJoinTask<?> task)
submit(ForkJoinTask task)

submit 和 execute 跟invoke的区别是 externalPush(task);以后没有task.join

这个join方法调用的作用是使主线程挂起等候task结果。

execute(ForkJoinTask) 异步执行tasks,无返回值
invoke(ForkJoinTask) 有Join会使主线程挂起等待task的结果, tasks会被同步到主进程
submit(ForkJoinTask) 异步执行,直接返回task对象,可通过task.get/join 阻塞主线程然后将结果同步到主线程

parallelStream

java8除了新增stream,还提供了parallel stream-多线程版的stream,parallel stream的优势是:充分利用多线程,提高程序运行效率,但是正确的使用并不简单,盲目使用可能导致以下后果

  1. 效率不增反降
  2. 增加额外的复杂度,程序更易出错

效率不增反降
parallel stream是基于fork/join框架的,简单点说就是使用多线程来完成的,使用parallel stream时要考虑初始化fork/join框架的时间,也就是要有初始化线程的时间,如果要执行的任务很简单,那么初始化fork/join框架的时间会远多于执行任务所需时间,也就导致了效率的降低. 根据附录doug Lee的说明,任务数量*执行方法的行数>=10000或者执行的是消耗大量时间操作(如io/数据库)才有必要使用

增加额外的复杂度,程序更易出错
会有多线程安全问题

实现快速处理数据流

    public static void main(String[] args) throws InterruptedException {
    
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
    
            ids.add(i);
        }

        ForkJoinPool pool = new ForkJoinPool(10);
        List<String> result = new ArrayList<>();
        pool.submit(() -> ids.parallelStream().forEach(id -> {
    
            id += 1;
            result.add(String.valueOf(id));
        })).join();

        Thread.sleep(1000);
        pool.shutdown();
        System.out.println(result.size());
    }

但是上面的代码是有问题的, 也就是多线程问题

因为我使用的是一个普通的arrayList, 如果在多线程下会有线程安全问题, 也就是数据丢失问题
在这里插入图片描述

解决办法

  1. 使用安全的ArrayList, 比如下面这三个
        List<String> list = Collections.synchronizedList(new ArrayList<>());
        List<String> list1 = new CopyOnWriteArrayList<>();
        List<String> list2 = new Vector<>();
  1. 将写入链表的操作变成同步代码块
    public static void main(String[] args) throws InterruptedException {
    

        List<String> list = Collections.synchronizedList(new ArrayList<>());
        List<String> list1 = new CopyOnWriteArrayList<>();
        List<String> list2 = new Vector<>();

        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
    
            ids.add(i);
        }

        ForkJoinPool pool = new ForkJoinPool(10);
        List<String> result = new ArrayList<>();
        pool.submit(() -> ids.parallelStream().forEach(id -> {
    
            id += 1;
            synchronized (pool) {
    
                result.add(String.valueOf(id));
            }
        })).join();

        Thread.sleep(1000);
        pool.shutdown();
        System.out.println(result.size());
    }
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Shangxingya/article/details/114682297

智能推荐

Linux查询内存使用情况,查询磁盘使用情况,查看文件及文件夹大小_linux查询文件夹可用内存大小_独孤飞磊的博客-程序员秘密

查询磁盘使用情况,查看文件及文件夹大小命令内容df -h查看当前系统磁盘使用空间du -sh *查看当前目录文件占用空间大小du -h [目录名]返回该目录所有文件的大小du -h ./test/a.txt查看某文件大小du -sh [目录名]返回该目录的大小查询内存使用情况命令内容free -h查看内存使用情况...

把数据转换成json格式_dinner1984的博客-程序员秘密

@RequestMapping(value="/getTree", method=RequestMethod.POST)    public void getTree(HttpServletRequest request,HttpServletResponse response) throws IOException{          String s1 = "{id:1, pId:0,

【学习笔记】opencv之canny算子_canny算子生成白底图像_tyu5658的博客-程序员秘密

canny是边缘检测的一个算法,最重要的特点就是其将独立边的候选像素拼接成轮廓。轮廓的行程是对这些像素运用滞后性阈值,分为上下阈值,大于上限阈值则被认为是边界,小于下线阈值则被抛弃,介于两者之间时,只有当其与高于上限阈值的像素连接时才会被接受。利用opencv时,cvCanny的形式如下:void cvCanny( const CvArr*img,CvArr*edges,doubl

jmeter新手入门(附jdk安装教程)_jmeter需要安装jdk吗_慧乐课堂软件测试的博客-程序员秘密

一.前言这里总结了jmeter完整的安装教程,附上新手使用教程。二.Jmeter安装jdk安装(jmeter运行所需环境)jmeter运行需要java环境,如果没事先安装jdk,启动jmeter会出现“Not able to find Java executable or version. Please check your Java installation.”的错误。Jdk下载地址:...

python版本学生成绩管理系统_python学生成绩管理系统pandas查询学生_staryer-chen的博客-程序员秘密

#放在外面,用来存储学生账号,学号,学生姓名和成绩等#学生端:用于存储学生的账号和密码,只有匹配正确才能进入学生端xueshengzhanghao = ['12345']xueshengmima = ['12345']c = ['陈思宇'] # 用于存储学生信息d = ['100']#用处存储学生成绩#教师端:用于存储教师的账号和密码,只有匹配正确才能进入教师端jiaoshizh...

学习英语的一些网站_jason_hisoft的博客-程序员秘密

梁睿英语http://www.lrenglish.com/梁睿英语论坛http://www.lrenglish.com/bbs雅思网站专题http://211.147.1.40雅思的官方网站http://www.ielts.org/雅思考试网东西不多http://www.ieltsnet.net/index.htm关于雅思的一些资料http://www.rotolife.com/cgi-bin/n

随便推点

Glide 3.7.0 使用随笔_glide3.7.0使用_元古宙新手的博客-程序员秘密

使用Glide的时候,可以搭配jp.wasabeef:glide-transformations:2.0.2 伴侣口感会更好----------------------------------------------------------------------------------------------------------------------------------

mongodb查询某个字段数据_牛奔的博客-程序员秘密

如下db.集合名.find( {}, {需要查询的字段:1, _id:0} )例如db.userInfo.find({}, {'created_at':1, _id: 0})默认会显示 _id1 表示显示此字段0 表示不显示此字段

layui使用tips_layer ui插件显示tips时,修改字体颜色的实现方法_舞蝶迷香径的博客-程序员秘密

今天做调查问卷,又遇到一个蛋疼小问题,记录下。调查问卷有很多选项是要求必填的,如果不填的话,需要给出友好的提示。用的如下组件:http://layer.layui.com/1、之前一直默认用的:function showMessage(msg, domObj) {layer.tips(msg, domObj,{tips:3});//弹出框加回调函数}showMessage("选项不能为空", $(...

八小球中一个偏重问题的讨论_8个小球 一个偏重_叫我靖迪的博客-程序员秘密

问题描述:8个球7个一样重的,有一个偏重,一个天平,如何两次找出偏重的小球问题分析:如果最后是两个小球比较,那么天平最重的那个是重的小球如果最后是三个小球比较(假设两个一样的为a1,a2,比较重的是b)             2.1当b小球在天平两端时候,重的能被比较出来.          2.2当b小球没在天平上的时候,那么a1==a2,所以剩下的是最重的问题解决方法:   1.天平左右分别...

RadioButton的字体和位置的改变_dingcheng998的博客-程序员秘密

<RadioButton android:text="其他" android:button="@null" //取消原来图标 android:layout_width="match_parent" android:layout_height="48dp" android:drawableRight="@and

省级面板数据(2000-2019)四:固定资产投资+房地产(stata版)_理想主义的百年孤独的博客-程序员秘密

省级面板数据(2000-2019)四:固定资产投资+房地产(stata版)下载链接:省级面板数据(2000-2019)四:固定资产+房地产一、全社会固定资产投资全社会固定资产投资(亿元)城镇固定资产投资(亿元)房地产开发投资(亿元)二、按登记注册类型分全社会固定资产投资内资企业全社会固定资产投资(亿元)国有全社会固定资产投资(亿元)集体全社会固定资产投资(亿元)股份合作全社会固定资产投资(亿元)港、澳、台商投资全社会固定资产投资(亿元)外商投资全社会固定资产投资(亿元)三、按资

推荐文章

热门文章

相关标签