前言

“我正在参与「启航计划」”

作者简介: 不愿过江东丶,一个来自二线城市的程序员,致力于用“猥琐”办法解决繁琐问题,让杂乱的问题变得通俗易懂。

支撑作者: 点赞、重视、留言~

最近大聪明一直在开发项目中的新需求,其中有一个需求是“解析文件(.txt文件,一行便是一条数据)中的数据并进行入库操作”。其实这个需求也很简略,无非便是将文件中每一行数据转换为一个目标,将每一个目标都存储到 list 集合中,终究履行批量入库的操作。但便是这么一个简略的需求却让我踩了一个大坑….

踩坑日记

大聪明教你学Java | parallelStream().forEach() 的踩坑日记
各位小伙伴先看一下上图中的代码,不知道各位小伙伴有没有看出什么问题呢? 或许这么看起来有些不好了解,咱们再简化一下图中的代码,如下所示:

public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            list.add(i);
        }
        System.out.println("a:"+list.size());
        List<Integer> streamList = new ArrayList<>();
        list.parallelStream().forEach(streamList::add);
        System.out.println("b:"+streamList.size());
    }
}

各位小伙伴看看简化后的代码,猜想一下 a 和 b 输出的值分别是多少呢?这儿就不卖关子了,咱们直接揭晓答案

大聪明教你学Java | parallelStream().forEach() 的踩坑日记
成果或许和大多数小伙伴猜想的都不太相同,a 和 b 的值居然不相等,且 b 的值 永远都会小于 a,一起在多次履行之后或许会出现数组下标越界异常,明显这儿的代码是不符合逻辑的

这也是我在项目中遇倒的问题所在,解析完文件后,经过 parallelStream().forEach() 遍历成果进行处理,可是终究入库的数据条数总是小于文件中的数据条数。

这种状况大聪明仍是第一次见到,不过却又一次激起了大聪明的求知欲,开启了寻根究底模式~

寻根究底

经过大聪明的一番探究,也是总算找到了问题答案…

Stream(流)是 JDK8 中引入的一种相似与迭代器(Iterator)的单向迭代访问数据的东西。ParallelStream 则是并行的流,它经过 Fork/Join 结构来拆分使命,加速流的处理过程。Fork/Join 的结构是经过把一个大使命不断 fork 成许多子使命,然后多线程履行这些子使命,最终再 Join 这些子使命得到终究成果。咱们回到实例代码中来解释一下,便是先将 list 集合 fork 成多段,然后多线程增加到 streamList 的结合中,而 streamList 是ArrayList 类型,ArrayList 的 add() 办法并不能确保原子性。

咱们先看一下 ArrayList 中 add() 办法的源码

大聪明教你学Java | parallelStream().forEach() 的踩坑日记
众所周知,ArrayList 作为 Collection 中极重要的一员,对错线程安全的,所以 ArrayList 并不合适多线程高并发的状况,在多线程高并发时会出现内部某些方位为 null 的状况。核心原因是,ArrayList 的add() 的办法不是线程安全的,对错原子性的,add操作能够简略了解为两个过程:

  • ensureCapacityInternal(size + 1) :确认当时 ArrayList 中的数组是否还能够加入新的元素。假如不可,就会再申请一个:int newCapacity = oldCapacity + (oldCapacity >> 1) 大小的数组(即容量变为原来的 1.5 倍),然后将数据复制过去。
  • elementData[size++] = e:将元素增加到 elementData 数组中。

那么在多线程高并发状况下,假如有A、B两个线程一起履行 add() 办法,在第一步校验数组容量时,A、B线程都发现当时无需扩容,还能够持续增加一个元素;因此A、B线程都进入了第二步,此刻,A线程先履行完,数组容量已满,然后B线程再对 elementData 赋值时,就会出现咱们上面提到的状况,要么是数据丢掉,要么是抛出数组下标越界的异常。

解决计划

问题原因咱们现已找到了,那么问题的解决计划也就呼之欲出了~

计划一:将 parallelStream 改成 stream,或者直接运用 foreach 遍历处理。也便是放弃多线程的写法,改为传统的单线程处理。

计划二:运用 list = new CopyOnWriteArrayList<>(); 这是个线程安全的类。从源码上看,CopyOnWriteArrayList 在 add 操作时,经过 ReentrantLock 进行加锁,避免并发写。可是每次 add 操作都是把原数组中的元素拷贝一份到新数组中,然后在新数组中增加新元素,最终再把引用指向新数组,这也就会频繁的创立数组(千万别忘了数组需求一块连续的内存空间)。所以当实际事务逻辑中存在很多 add 操作时,要慎重运用 CopyOnWriteArrayList 。

计划三:运用包装类 list = Collections.synchronizedList(Arrays.asList());

咱们在运用 parallelStream 之前,一定要仔细思考一下自己的事务逻辑是否真的需求多线程并发处理。其实在实际运用场景中,并不是一切的问题都合适运用并发来解决,比方当数据量不大时,次序履行往往比并行履行更快,毕竟预备线程池和其它相关资源也是需求时间的。可是,当使命涉及到 I/O 操作而且使命之间不互相依赖时,那么并行化便是一个不错的挑选。

小结

本人经验有限,有些地方或许讲的没有特别到位,假如您在阅览的时候想到了什么问题,欢迎在评论区留言,咱们后续再一一探讨‍

期望各位小伙伴动动自己可爱的小手,来一波点赞+重视 (✿◡‿◡) 让更多小伙伴看到这篇文章~ 蟹蟹呦(●’◡’●)

假如文章中有过错,欢迎大家留言纠正;若您有更好、更独到的了解,欢迎您在留言区留下您的名贵主意。

爱你所爱 行你所行 遵从你心 无问东西