Skip to content

Commit 8b238a6

Browse files
xiajiafuCarpenterLee
authored andcommitted
并行流介绍 (#6)
1 parent 2fc61a4 commit 8b238a6

7 files changed

+258
-0
lines changed

.gitignore

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Created by .ignore support plugin (hsz.mobi)
2+
### Java template
3+
# Compiled class file
4+
*.class
5+
6+
# Log file
7+
*.log
8+
9+
# BlueJ files
10+
*.ctxt
11+
12+
# Mobile Tools for Java (J2ME)
13+
.mtj.tmp/
14+
15+
# Package Files #
16+
*.jar
17+
*.war
18+
*.nar
19+
*.ear
20+
*.zip
21+
*.tar.gz
22+
*.rar
23+
*.idea
24+
25+
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
26+
hs_err_pid*
27+

6-Stream Pipelines.md

+48
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,54 @@ int longestStringLengthStartingWithA
3232
<table width="600"><tr><td colspan="3" align="center" border="0">Stream操作分类</td></tr><tr><td rowspan="2" border="1">中间操作(Intermediate operations)</td><td>无状态(Stateless)</td><td>unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()</td></tr><tr><td>有状态(Stateful)</td><td>distinct() sorted() sorted() limit() skip() </td></tr><tr><td rowspan="2" border="1">结束操作(Terminal operations)</td><td>非短路操作</td><td>forEach() forEachOrdered() toArray() reduce() collect() max() min() count()</td></tr><tr><td>短路操作(short-circuiting)</td><td>anyMatch() allMatch() noneMatch() findFirst() findAny()</td></tr></table>
3333

3434
Stream上的所有操作分为两类:中间操作和结束操作,中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(*Stateless*)和有状态的(*Stateful*),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如*找到第一个满足条件的元素*。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。
35+
为了更好的理解流的中间操作和终端操作,可以通过下面的两段代码来看他们的执行过程。
36+
```Java
37+
IntStream.range(1, 10)
38+
.peek(x -> System.out.print("\nA" + x))
39+
.limit(3)
40+
.peek(x -> System.out.print("B" + x))
41+
.forEach(x -> System.out.print("C" + x));
42+
```
43+
输出为:
44+
A1B1C1
45+
A2B2C2
46+
A3B3C3
47+
中间操作是懒惰的,也就是中间操作不会对数据做任何操作,直到遇到了最终操作。而最终操作,都是比较热情的。他们会往前回溯所有的中间操作。也就是当执行到最后的forEach操作的时候,它会回溯到它的上一步中间操作,上一步中间操作,又会回溯到上上一步的中间操作,...,直到最初的第一步。
48+
第一次forEach执行的时候,会回溯peek 操作,然后peek会回溯更上一步的limit操作,然后limit会回溯更上一步的peek操作,顶层没有操作了,开始自上向下开始执行,输出:A1B1C1
49+
第二次forEach执行的时候,然后会回溯peek 操作,然后peek会回溯更上一步的limit操作,然后limit会回溯更上一步的peek操作,顶层没有操作了,开始自上向下开始执行,输出:A2B2C2
50+
51+
...
52+
当第四次forEach执行的时候,然后会回溯peek 操作,然后peek会回溯更上一步的limit操作,到limit的时候,发现limit(3)这个job已经完成,这里就相当于循环里面的break操作,跳出来终止循环。
53+
54+
再来看第二段代码:
55+
56+
```Java
57+
IntStream.range(1, 10)
58+
.peek(x -> System.out.print("\nA" + x))
59+
.skip(6)
60+
.peek(x -> System.out.print("B" + x))
61+
.forEach(x -> System.out.print("C" + x));
62+
```
63+
输出为:
64+
A1
65+
A2
66+
A3
67+
A4
68+
A5
69+
A6
70+
A7B7C7
71+
A8B8C8
72+
A9B9C9
73+
第一次forEach执行的时候,会回溯peek操作,然后peek会回溯更上一步的skip操作,skip回溯到上一步的peek操作,顶层没有操作了,开始自上向下开始执行,执行到skip的时候,因为执行到skip,这个操作的意思就是跳过,下面的都不要执行了,也就是就相当于循环里面的continue,结束本次循环。输出:A1
74+
75+
第二次forEach执行的时候,会回溯peek操作,然后peek会回溯更上一步的skip操作,skip回溯到上一步的peek操作,顶层没有操作了,开始自上向下开始执行,执行到skip的时候,发现这是第二次skip,结束本次循环。输出:A2
76+
77+
...
78+
79+
第七次forEach执行的时候,会回溯peek操作,然后peek会回溯更上一步的skip操作,skip回溯到上一步的peek操作,顶层没有操作了,开始自上向下开始执行,执行到skip的时候,发现这是第七次skip,已经大于6了,它已经执行完了skip(6)的job了。这次skip就直接跳过,继续执行下面的操作。输出:A7B7C7
80+
81+
...直到循环结束。
82+
3583

3684
## 一种直白的实现方式
3785

7-ParallelStream.md

+183
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
# parallelStream 介绍
2+
3+
## 引言
4+
大家应该已经对Stream有过很多的了解,对其原理及常见使用方法已经也有了一定的认识。流在处理数据进行一些迭代操作的时候确认很方便,但是在执行一些耗时或是占用资源很高的任务时候,串行化的流无法带来速度/性能上的提升,并不能满足我们的需要,通常我们会使用多线程来并行或是分片分解执行任务,而在Stream中也提供了这样的并行方法,那就是使用parallelStream()方法或者是使用stream().parallel()来转化为并行流。开箱即用的并行流的使用看起来如此简单,然后我们就可能会忍不住思考,并行流的实现原理是怎样的?它的使用会给我们带来多大的性能提升?我们可以在什么场景下使用以及使用时应该注意些什么?
5+
6+
首先我们看一下Java 的并行 API 演变历程基本如下:
7+
- 1.0-1.4 中的 java.lang.Thread
8+
- 5.0 中的 java.util.concurrent
9+
- 6.0 中的 Phasers 等
10+
- 7.0 中的 Fork/Join 框架
11+
- 8.0 中的 Lambda
12+
13+
## parallelStream是什么?
14+
先看一下`Collection`接口提供的并行流方法
15+
```java
16+
/**
17+
* Returns a possibly parallel {@code Stream} with this collection as its
18+
* source. It is allowable for this method to return a sequential stream.
19+
*
20+
* <p>This method should be overridden when the {@link #spliterator()}
21+
* method cannot return a spliterator that is {@code IMMUTABLE},
22+
* {@code CONCURRENT}, or <em>late-binding</em>. (See {@link #spliterator()}
23+
* for details.)
24+
*
25+
* @implSpec
26+
* The default implementation creates a parallel {@code Stream} from the
27+
* collection's {@code Spliterator}.
28+
*
29+
* @return a possibly parallel {@code Stream} over the elements in this
30+
* collection
31+
* @since 1.8
32+
*/
33+
default Stream<E> parallelStream() {
34+
return StreamSupport.stream(spliterator(), true);
35+
}
36+
```
37+
注意其中的代码注释的返回值 `@return a possibly parallel` 一句说明调用了这个方法,只是可能会返回一个并行的流,流是否能并行执行还受到其他一些条件的约束。
38+
parallelStream其实就是一个并行执行的流,它通过默认的`ForkJoinPool`**可能**提高你的多线程任务的速度。
39+
引用[Custom thread pool in Java 8 parallel stream](https://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream)上面的两段话:
40+
> The parallel streams use the default `ForkJoinPool.commonPool` which [by default has one less threads as you have processors](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html), as returned by `Runtime.getRuntime().availableProcessors()` (This means that parallel streams use all your processors because they also use the main thread)。
41+
42+
做个实验来证明上面这句话的真实性:
43+
```java
44+
public static void main(String[] args) {
45+
IntStream list = IntStream.range(0, 10);
46+
Set<Thread> threadSet = new HashSet<>();
47+
//开始并行执行
48+
list.parallel().forEach(i -> {
49+
Thread thread = Thread.currentThread();
50+
System.err.println("integer:" + i + "" + "currentThread:" + thread.getName());
51+
threadSet.add(thread);
52+
});
53+
System.out.println("all threads:" + Joiner.on("").join(threadSet.stream().map(Thread::getName).collect(Collectors.toList())));
54+
}
55+
```
56+
<img src="./Figures/13932958-263c866e35df81e5.png">
57+
58+
从运行结果里面我们可以很清楚的看到parallelStream同时使用了主线程和`ForkJoinPool.commonPool`创建的线程。
59+
值得说明的是这个运行结果并不是唯一的,实际运行的时候可能会得到多个结果,比如:
60+
61+
<img src="./Figures/13932958-e1836ce1a66f41ec.png">
62+
63+
甚至你的运行结果里面只有主线程。
64+
65+
来源于java 8 实战的书籍的一段话:
66+
> 并行流内部使用了默认的`ForkJoinPool`(7.2节会进一步讲到分支/合并框架),它默认的线程数量就是你的处理器数量,这个值是由`Runtime.getRuntime().available- Processors()`得到的。 但是你可以通过系统属性`java.util.concurrent.ForkJoinPool.common. parallelism`来改变线程池大小,如下所示: `System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");` 这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个 并行流指定这个值。一般而言,让`ForkJoinPool`的大小等于处理器数量是个不错的默认值, 除非你有很好的理由,否则我们强烈建议你不要修改它。
67+
68+
```java
69+
// 设置全局并行流并发线程数
70+
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
71+
System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 12
72+
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
73+
System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 12
74+
```
75+
为什么两次的运行结果是一样的呢?上面刚刚说过了这是一个全局设置,`java.util.concurrent.ForkJoinPool.common.parallelism`是final类型的,整个JVM中只允许设置一次。既然默认的并发线程数不能反复修改,那怎么进行不同线程数量的并发测试呢?答案是:`引入ForkJoinPool`
76+
```java
77+
IntStream range = IntStream.range(1, 100000);
78+
// 传入parallelism
79+
new ForkJoinPool(parallelism).submit(() -> range.parallel().forEach(System.out::println)).get();
80+
```
81+
因此,使用parallelStream时需要注意的一点是,**多个parallelStream之间默认使用的是同一个线程池**,所以IO操作尽量不要放进parallelStream中,否则会阻塞其他parallelStream。
82+
> Using a ForkJoinPool and submit for a parallel stream does not reliably use all threads. If you look at this ( [Parallel stream from a HashSet doesn't run in parallel](https://stackoverflow.com/questions/28985704/parallel-stream-from-a-hashset-doesnt-run-in-parallel) ) and this ( [Why does the parallel stream not use all the threads of the ForkJoinPool?](https://stackoverflow.com/questions/36947336/why-does-the-parallel-stream-not-use-all-the-threads-of-the-forkjoinpool) ), you'll see the reasoning.
83+
84+
```java
85+
// 获取当前机器CPU处理器的数量
86+
System.out.println(Runtime.getRuntime().availableProcessors());// 输出 4
87+
// parallelStream默认的并发线程数
88+
System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 3
89+
```
90+
为什么parallelStream默认的并发线程数要比CPU处理器的数量少1个?文章的开始已经提过了。因为最优的策略是每个CPU处理器分配一个线程,然而主线程也算一个线程,所以要占一个名额。
91+
这一点可以从源码中看出来:
92+
```java
93+
static final int MAX_CAP = 0x7fff; // max #workers - 1
94+
// 无参构造函数
95+
public ForkJoinPool() {
96+
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
97+
defaultForkJoinWorkerThreadFactory, null, false);
98+
}bs-channel
99+
```
100+
101+
## 从parallelStream认识[Fork/Join 框架](https://www.infoq.cn/article/fork-join-introduction/)
102+
Fork/Join 框架的核心是采用分治法的思想,将一个大任务拆分为若干互不依赖的子任务,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务。同时,为了最大限度地提高并行处理能力,采用了工作窃取算法来运行任务,也就是说当某个线程处理完自己工作队列中的任务后,尝试当其他线程的工作队列中窃取一个任务来执行,直到所有任务处理完毕。所以为了减少线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
103+
- Fork/Join 的运行流程图
104+
<img src="./Figures/13932958-dbceae46ea7c15c3.png">
105+
106+
简单地说就是大任务拆分成小任务,分别用不同线程去完成,然后把结果合并后返回。所以第一步是拆分,第二步是分开运算,第三步是合并。这三个步骤分别对应的就是Collector的*supplier*,*accumulator**combiner*
107+
- 工作窃取算法
108+
Fork/Join最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的CPU,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个Fork/Join框架的核心理念,工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
109+
<img src="./Figures/13932958-ffe0d5ddd7101bbc.png">
110+
111+
## 使用parallelStream的利弊
112+
使用parallelStream的几个好处:
113+
1) 代码优雅,可以使用lambda表达式,原本几句代码现在一句可以搞定;
114+
2) 运用多核特性(forkAndJoin)并行处理,大幅提高效率。
115+
关于并行流和多线程的性能测试可以看一下下面的几篇博客:
116+
[并行流适用场景-CPU密集型](https://blog.csdn.net/larva_s/article/details/90403578)
117+
[提交订单性能优化系列之006-普通的Thread多线程改为Java8的parallelStream并发流](https://blog.csdn.net/blueskybluesoul/article/details/82817007)
118+
119+
然而,任何事物都不是完美的,并行流也不例外,其中最明显的就是使用(parallel)Stream极其不便于代码的跟踪调试,此外并行流带来的不确定性也使得我们对它的使用变得格外谨慎。我们得去了解更多的并行流的相关知识来保证自己能够正确的使用这把双刃剑。
120+
121+
parallelStream使用时需要注意的点:
122+
1) **parallelStream是线程不安全的;**
123+
```java
124+
List<Integer> values = new ArrayList<>();
125+
IntStream.range(1, 10000).parallel().forEach(values::add);
126+
System.out.println(values.size());
127+
```
128+
values集合大小可能不是10000。集合里面可能会存在null元素或者抛出下标越界的异常信息。
129+
原因:List不是线程安全的集合,add方法在多线程环境下会存在并发问题。
130+
当执行add方法时,会先将此容器的大小增加。。即size++,然后将传进的元素赋值给新增的`elementData[size++]`,即新的内存空间。但是此时如果在size++后直接来取这个List,而没有让add完成赋值操作,则会导致此List的长度加一,,但是最后一个元素是空(null),所以在获取它进行计算的时候报了空指针异常。而下标越界还不能仅仅依靠这个来解释,如果你观察发生越界时的数组下标,分别为10、15、22、33、49和73。结合前面讲的数组自动机制,数组初始长度为10,第一次扩容为15=10+10/2,第二次扩容22=15+15/2,第三次扩容33=22+22/2...以此类推,我们不难发现,越界异常都发生在数组扩容之时。
131+
`grow()`方法解释了基于数组的ArrayList是如何扩容的。数组进行扩容时,会将老数组中的元素重新拷贝一份到新的数组中,通过`oldCapacity + (oldCapacity >> 1)`运算,每次数组容量的增长大约是其原容量的1.5倍。
132+
```java
133+
/**
134+
* Increases the capacity to ensure that it can hold at least the
135+
* number of elements specified by the minimum capacity argument.
136+
*
137+
* @param minCapacity the desired minimum capacity
138+
*/
139+
private void grow(int minCapacity) {
140+
// overflow-conscious code
141+
int oldCapacity = elementData.length;
142+
int newCapacity = oldCapacity + (oldCapacity >> 1);// 1.5倍扩容
143+
if (newCapacity - minCapacity < 0)
144+
newCapacity = minCapacity;
145+
if (newCapacity - MAX_ARRAY_SIZE > 0)
146+
newCapacity = hugeCapacity(minCapacity);
147+
// minCapacity is usually close to size, so this is a win:
148+
elementData = Arrays.copyOf(elementData, newCapacity);// 拷贝旧的数组到新的数组中
149+
}
150+
151+
152+
/**
153+
* Appends the specified element to the end of this list.
154+
*
155+
* @param e element to be appended to this list
156+
* @return <tt>true</tt> (as specified by {@link Collection#add})
157+
*/
158+
public boolean add(E e) {
159+
ensureCapacityInternal(size + 1); // Increments modCount!! 检查array容量
160+
elementData[size++] = e;// 赋值,增大Size的值
161+
return true;
162+
}
163+
```
164+
解决方法:
165+
加锁、使用线程安全的集合或者采用`collect()`或者`reduce()`操作就是满足线程安全的了。
166+
```java
167+
List<Integer> values = new ArrayList<>();
168+
for (int i = 0; i < 10000; i++) {
169+
values.add(i);
170+
}
171+
List<Integer> collect = values.stream().parallel().collect(Collectors.toList());
172+
System.out.println(collect.size());
173+
```
174+
2) parallelStream 适用的场景是CPU密集型的,只是做到别浪费CPU,假如本身电脑CPU的负载很大,那还到处用并行流,那并不能起到作用;
175+
- I/O密集型 磁盘I/O、网络I/O都属于I/O操作,这部分操作是较少消耗CPU资源,一般并行流中不适用于I/O密集型的操作,就比如使用并流行进行大批量的消息推送,涉及到了大量I/O,使用并行流反而慢了很多
176+
- CPU密集型 计算类型就属于CPU密集型了,这种操作并行流就能提高运行效率。
177+
178+
3) 不要在多线程中使用parallelStream,原因同上类似,大家都抢着CPU是没有提升效果,反而还会加大线程切换开销;
179+
4) 会带来不确定性,请确保每条处理无状态且没有关联;
180+
5) 考虑NQ模型:N可用的数据量,Q针对每个数据元素执行的计算量,乘积 N * Q 越大,就越有可能获得并行提速。N * Q>10000(大概是集合大小超过1000) 就会获得有效提升;
181+
6) parallelStream是创建一个并行的Stream,而且它的并行操作是*不具备线程传播性*的,所以是无法获取ThreadLocal创建的线程变量的值;
182+
7) **在使用并行流的时候是无法保证元素的顺序的,也就是即使你用了同步集合也只能保证元素都正确但无法保证其中的顺序**
183+
8) lambda的执行并不是瞬间完成的,所有使用parallel stream的程序都有可能成为阻塞程序的源头,并且在执行过程中程序中的其他部分将无法访问这些workers,这意味着任何依赖parallel streams的程序在什么别的东西占用着common ForkJoinPool时将会变得不可预知并且暗藏危机。

Figures/13932958-263c866e35df81e5.png

31.9 KB
Loading

Figures/13932958-dbceae46ea7c15c3.png

117 KB
Loading

Figures/13932958-e1836ce1a66f41ec.png

43.8 KB
Loading

Figures/13932958-ffe0d5ddd7101bbc.png

67.4 KB
Loading

0 commit comments

Comments
 (0)