您的位置:首页 > 编程语言 > Java开发

java8 ParallelStream 的并行体验,利用多核机器加快计算速度

2018-03-08 22:00 369 查看
我们现在cup 进入多核时代, 所以我们测试java8对Stream的并行处理: 
对Stream进行并发,并行执行,怎么做呢?我们用Stream来对它进行加法的运算我们用Stream获取到一个大值,用来比较运算时间
private static long iterateStream(long limit) {//定义一个上限
return Stream.iterate(1L, i -> i + 1)//每次都+1 它进行循环
.limit(limit).reduce(0L, Long::sum);//对它进行相加
}下面是我们最普通的代码:
private static long normalAdd(long limit) {
long result = 0L;
for (long i = 1L; i < limit; i++) {
result += i;
}
return result;
}上面两个片段,作用都一样, 现在我们对它俩运行相同的次数,当然,先要写一个测量的方法:
private static long measureSumPerformance(Function<Long, Long> adder, long limit) {
//给一个最快的速度long fastest = Long.MAX_VALUE; //上面我们用到了Function 给了一个参数,又出来一个参数,
for (int i = 0; i < 10; i++) {
long startTimestamp = System.currentTimeMillis();//开始时间
long result = adder.apply(limit);//结果,
long duration = System.currentTimeMillis() - startTimestamp;//耗时
  
       
    System.out.println("The result of sum=>" + result);//也就是计算结果输出
    if (duration < fastest) fastest = duration;//计算了n次,拿到最优的时间(也就是最小的)
      }      
    return fastest;
}
给一个输出并调用
public static void main(String[] args) {
System.out.println("The best process time(normalAdd)=>"+measureSumPerformance(ParallelProcessing::normalAdd, 100_000_000) + " MS");The best process time(normalAdd)=>40 MS
我们看到最普通的就是40MS 
两个一起调用:
public static void main(String[] args) {
System.out.println("The best process time(normalAdd)=>" + measureSumPerformance(ParallelProcessing::normalAdd, 100_000_000) + " MS");
System.out.println("The best process time(iterateStream)=>" + measureSumPerformance(ParallelProcessing::iterateStream, 10_000_000) + " MS");The best process time(normalAdd)=>42 MS
The best process time(iterateStream)=>99 MS
我们看到第一个用了Stream的耗时99  而最普通的是42 
这里你可能会说Stream不是有并行嘛?当然我们把Stream用并行也就是parallel
private static long parallelStream(long limit) {
return Stream.iterate(1L, i -> i + 1).parallel()
.limit(limit).reduce(0L, Long::sum);
}结果输出还是比不并行还惨:
The best process time(normalAdd)=>39 MS
The best process time(iterateStream)=>93 MS

The best process time(parallelStream)=>116 MS
我们看到最后加了个并行还是慢了
当然我们现在的并行是比较吃亏的:
Stream.iterate(1L, i -> i + 1).因为我们拿出来的东西是一个Object的,每次都要进行拆箱成long类型的
所以我们变成Long类型的:(更上面操作一样但是我们多了个拆箱)
private static long parallelStream2(long limit) {
return Stream.iterate(1L, i -> i + 1).mapToLong(Long::longValue).parallel()
.limit(limit).reduce(0L, Long::sum);
}在运行一遍:
The best process time(normalAdd)=>40 MS
The best process time(iterateStream)=>94 MS
The best process time(parallelStream)=>122 MS

The best process time(parallelStream2)=>177 MS
我们看到的结果分别是,最普通的40m,
加了用Stream.iter的是94,
 然后我们给加了一个并行 结果122
,最后我们加了个拆箱结果是177
我们最后当然要放大了,用人家给的LongStream进行运算并行,
private static long parallelStream3(long limit) {
return LongStream.rangeClosed(1, limit).parallel().reduce(0L, Long::sum);
}The best process time(normalAdd)=>36 MS
The best process time(iterateStream)=>97 MS
The best process time(parallelStream)=>120 MS
The best process time(parallelStream2)=>162 MS
The best process time(parallelStream3)=>24 MS
现在我们放掉中间的,让两个最小的比较:
The best process time(normalAdd)=>42 MS

The best process time(parallelStream3)=>25 MS
我们可以看到,并行Stream的速度还是挺慢的
当然jDK8中也给了一种方式叫做
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");当然他会随着硬件的改变而改变,他会影响整个ForkJoin,不断的设置参数,(不建议改变)
说一下为什么我们拆箱分箱了还是这么慢?其实不是所有Stream都可以达到并行的 (对应着Java8 inaction的7.2章节)
有些方法是对并行讨喜的有些是不讨喜的,比方说ArrayList对并行是非常好的,书中章节都可以讲到,
过程当中去权衡对数据源产生的Stream比如我们上面的例子,从Stream.iterate 参生的就是不好的而我们用原生的也就是rangeClosed就是最好的,而我们最快的也就是有rangeclosed产生的.
 我贴出整块代码:
public static void main(String[] args) {
System.out.println("The best process time(normalAdd)=>" + measureSumPerformance(ParallelProcessing::normalAdd, 100_000_000) + " MS");
// System.out.println("The best process time(iterateStream)=>" + measureSumPerformance(ParallelProcessing::iterateStream, 10_000_000) + " MS");
// System.out.println("The best process time(parallelStream)=>" + measureSumPerformance(ParallelProcessing::parallelStream, 10_000_000) + " MS");
// System.out.println("The best process time(parallelStream2)=>" + measureSumPerformance(ParallelProcessing::parallelStream2, 10_000_000) + " MS");
System.out.println("The best process time(parallelStream3)=>" + measureSumPerformance(ParallelProcessing::parallelStream3, 100_000_000) + " MS");
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");
}

private static long measureSumPerformance(Function<Long, Long> adder, long limit) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long startTimestamp = System.currentTimeMillis();
long result = adder.apply(limit);
long duration = System.currentTimeMillis() - startTimestamp;
// System.out.println("The result of sum=>" + result);
if (duration < fastest) fastest = duration;
}

return fastest;
}

private static long iterateStream(long limit) {//定义一个上限
return Stream.iterate(1L, i -> i + 1)//每次都+1 它进行循环
.limit(limit).reduce(0L, Long::sum);//对它进行相加
}

private static long parallelStream(long limit) {
return Stream.iterate(1L, i -> i + 1).parallel()

a2a1
.limit(limit).reduce(0L, Long::sum);
}

private static long parallelStream2(long limit) {
return Stream.iterate(1L, i -> i + 1).mapToLong(Long::longValue).parallel()
.limit(limit).reduce(0L, Long::sum);
}

private static long parallelStream3(long limit) {
return LongStream.rangeClosed(1, limit).parallel().reduce(0L, Long::sum);
}

private static long normalAdd(long limit) {
long result = 0L;
for (long i = 1L; i < limit; i++) {
result += i;
}
return result;
}
}方便大家测试........................
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: