一尘不染

并行流,收集器和线程安全

java

请参阅下面的简单示例,该示例计算列表中每个单词的出现次数:

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

最后wordsCount{a=2, b=1, c=1}

但是我的数据流很大,我想并行化作业,所以我写:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

但是我注意到这wordsCount很简单,HashMap所以我想知道是否需要显式请求并发映射以确保线程安全:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));

非并行收集器可以安全地与并行流一起使用吗?从并行流中收集时,我是否应该仅使用并发版本?


阅读 321

收藏
2020-09-08

共1个答案

一尘不染

非并行收集器可以安全地与并行流一起使用吗?从并行流中收集时,我是否应该仅使用并发版本?

collect并行流的操作中使用非并行收集器是安全的。

在接口的规范Collector,有六个要点的部分是:

对于非并发收集器,必须将结果提供者,累加器或组合器函数返回的所有结果串行地限制在线程中。这使收集可以并行发生,而收集器无需实现任何其他同步。简化实现必须管理输入已正确分区,分区被隔离处理以及仅在累加完成后才进行合并。

这意味着Collectors该类提供的各种实现可以与并行流一起使用,即使其中一些实现可能不是并发收集器也是如此。这也适用于您可能实现的任何您自己的非并行收集器。只要您的收集器不会干扰流源,无副作用,与订单无关等,它们就可以安全地与并行流一起使用。

我还建议阅读java.util.stream软件包文档中的Mutable
Reduction
一节。在本节的中间,有一个示例,该示例被声明为可并行化的,但是将结果收集到一个ArrayList不是线程安全的内。

这种工作方式是:以非并行收集器结尾的并行流可确保不同的线程始终在中间结果收集的不同实例上运行。这就是为什么收集器具有一个Supplier函数的功能,该函数用于创建与线程一样多的中间集合,因此每个线程可以累积到自己的线程中。当要合并中间结果时,它们将在线程之间安全地移交,并且在任何给定时间,只有一个线程将合并任何一对中间结果。

2020-09-08