您的位置:首页 > 编程语言 > Java开发

用 Java 实现 Stream 高效混排与 Spliterator

小路路 2021-01-13 20:55 48 查看 https://blog.51cto.com/1508239

对 Stream 执行排序操作只要调用排序 API 就好了,要实现相反的效果(混排)却并不简单。


本文介绍了如何使用 Java Stream `Collectors` 工厂方法与自定义 `Spliterator` 对 Stream 进行 Shuffle(混排),支持 Eager 与 Lazy 两种模式。


1. Eager Shuffle Collector


Heinz [在这篇文章][1]中给出了一种解决方案:将整个 Stream 转换为 list,对 list 执行 `Collections#shuffle`,再转为 Stream。像下面这样封装成一个复合操作:


[1]:https://www.javaspecialists.eu/archive/Issue258.html


```java
public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
   return Collectors.collectingAndThen(
     toList(),
     list -> {
         Collections.shuffle(list);
         return list.stream();
     });
}
```


这种方法适用于对 Steam 中所有元素进行混排。由于会提前对集合中所有元素进行 Shuffle,如果只处理其中一部分则效果不佳,极端情况比如 Stream 只包含1个元素。


让我们来看看一个简单基准测试的运行结果:


```java
@State(Scope.Benchmark)
public class RandomSpliteratorBenchmark {
   private List<String> source;

   @Param({"1", "10", "100", "1000", "10000", "10000"})
   public int limit;

   @Param({"100000"})
   public int size;

   @Setup(Level.Iteration)
   public void setUp() {
       source = IntStream.range(0, size)
         .boxed()
         .map(Object::toString)
         .collect(Collectors.toList());
   }

   @Benchmark
   public List<String> eager() {
       return source.stream()
         .collect(toEagerShuffledStream())
         .limit(limit)
         .collect(Collectors.toList());
   }
```


```shell
           (limit)   Mode  Cnt     Score     Error  Units
eager             1  thrpt    5   467.796 ±   9.074  ops/s
eager            10  thrpt    5   467.694 ±  17.166  ops/s
eager           100  thrpt    5   459.765 ±   8.048  ops/s
eager          1000  thrpt    5   467.934 ±  43.095  ops/s
eager         10000  thrpt    5   449.471 ±   5.549  ops/s
eager        100000  thrpt    5   331.111 ±   5.626  ops/s
```


从上面的数据可以看出,尽管运行结果 Stream 中元素不断增加,运行效果还是相当不错。因此,对整个集合提前混排太浪费了,尤其是元素较少的时候得分很差。


让我们看看来有什么好办法。


2. Lazy Shuffle Collector


为了节省 CPU 资源,与其对集合中所有元素预处理,不如根据需要只处理其中一部分。


为了达到这个效果,需要自定义一个 Spliterator 对所有对元素随机遍历,然后通过 `StreamSupport.stream` 构造创建一个 Stream 对象:


```java
public class RandomSpliterator<T> implements Spliterator<T> {
   // ...
   public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
       return Collectors.collectingAndThen(
         toList(),
         list -> StreamSupport.stream(
           new ShuffledSpliterator<>(list), false));
   }
}
```


3. 实现细节


即使只取出一个随机元素,也不能避免计算整个 Steam 中的元素(这意味着不支持无限序列)。因此,可以用 `List<T>` 初始化 `RandomSpliterator<T>`。“注意,这里有一个陷阱”。


如果给定 `List` 不支持在常量时间内完成随机访问,这种方案要比 Eager 方案慢得多。为了避免这种情况,可以在实例化 `Spliterator` 的时候进行简单检查:


```java
private RandomSpliterator(
 List<T> source, Supplier<? extends Random> random) {
   if (source.isEmpty()) { ... } // throw
   this.source = source instanceof RandomAccess
     ? source
     : new ArrayList<>(source);
   this.random = random.get();
}
```


相比随机访问时间复杂度不是 O(1) 的实现,创建 `ArrayList` 的成本可以忽略不计。


现在重写最重要的 `tryAdvance()` 方法。实现很简单,每次迭代都从 `source` 集合中随机挑选并删除一个元素。


不必担心 `source` 发生改变。这里不发布 `RandomSpliterator`,只返回基于它的一个 `Collector`:


```java
@Override
public boolean tryAdvance(Consumer<? super T> action) {
   int remaining = source.size();
   if (remaining > 0 ) {
       action.accept(source.remove(random.nextInt(remaining)));
       return true;
   } else {
       return false;
   }
}
```


除此之外,还需要实现其它3个方法:


```java
@Override
public Spliterator<T> trySplit() {
   return null; // 表示 split 可不行
}

@Override
public long estimateSize() {
   return source.size();
}

@Override
public int characteristics() {
   return SIZED;
}
```


现在检查一下是否有效果:


```java
IntStream.range(0, 10).boxed()
 .collect(toLazyShuffledStream())
 .forEach(System.out::println);
```


结果如下:


```shell
3
4
8
1
7
6
5
0
2
9
```


4. 性能考虑


在这个实现中,我们把大小为 N 的数组换成 M 查找或删除:


  • N:集合大小

  • M:挑选元素的数量


从 `ArrayList` 中查找或删除单个元素通常比交换开销大,因此方案的可扩展性不够好。但是对于 M 值较小的时候性能会好很多。


现在对比 Eager 方案(都包含100000个对象):


```shell
           (limit)   Mode  Cnt     Score     Error  Units
eager             1  thrpt    5   467.796 ±   9.074  ops/s
eager            10  thrpt    5   467.694 ±  17.166  ops/s
eager           100  thrpt    5   459.765 ±   8.048  ops/s
eager          1000  thrpt    5   467.934 ±  43.095  ops/s
eager         10000  thrpt    5   449.471 ±   5.549  ops/s
eager        100000  thrpt    5   331.111 ±   5.626  ops/s
lazy              1  thrpt    5  1530.763 ±  72.096  ops/s
lazy             10  thrpt    5  1462.305 ±  23.860  ops/s
lazy            100  thrpt    5   823.212 ± 119.771  ops/s
lazy           1000  thrpt    5   166.786 ±  16.306  ops/s
lazy          10000  thrpt    5    19.475 ±   4.052  ops/s
lazy         100000  thrpt    5     4.097 ±   0.416  ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart.png)


可以明显看到,如果数据流元素较少,新方案的性能优于前者。但随着“处理数量/集合大小”增加,吞吐量急剧下降。


这是因为从 `ArrayList` 中移除元素会带来额外开销,每次移除都会调用 `System#arraycopy` 对内部数组执行移位操作,开销较大。


对于较大的集合(1000000个元素)可以看到类似的模式:


```shell
     (limit)    (size)   Mode  Cnt  Score   Err  Units
eager       1  10000000  thrpt    5  0.915        ops/s
eager      10  10000000  thrpt    5  0.783        ops/s
eager     100  10000000  thrpt    5  0.965        ops/s
eager    1000  10000000  thrpt    5  0.936        ops/s
eager   10000  10000000  thrpt    5  0.860        ops/s
lazy        1  10000000  thrpt    5  4.338        ops/s
lazy       10  10000000  thrpt    5  3.149        ops/s
lazy      100  10000000  thrpt    5  2.060        ops/s
lazy     1000  10000000  thrpt    5  0.370        ops/s
lazy    10000  10000000  thrpt    5  0.05         ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-2.png)


在更小集合(128个元素)上的表现:


```shell
      (limit)    (size)   Mode  Cnt       Score   Error  Units
eager        2     128    thrpt    5  246439.459          ops/s
eager        4     128    thrpt    5  333866.936          ops/s
eager        8     128    thrpt    5  340296.188          ops/s
eager       16     128    thrpt    5  345533.673          ops/s
eager       32     128    thrpt    5  231725.156          ops/s
eager       64     128    thrpt    5  314324.265          ops/s
eager      128     128    thrpt    5  270451.992          ops/s
lazy         2     128    thrpt    5  765989.718          ops/s
lazy         4     128    thrpt    5  659421.041          ops/s
lazy         8     128    thrpt    5  652685.515          ops/s
lazy        16     128    thrpt    5  470346.570          ops/s
lazy        32     128    thrpt    5  324174.691          ops/s
lazy        64     128    thrpt    5  186472.090          ops/s
lazy       128     128    thrpt    5  108105.699          ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-3.png)


能不能进一步优化?


5. 进一步提高性能


不幸的是,现有的解决方案扩展性不尽如人意,让我们试着改进。但在此之前,先对现有操作进行测评:


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.36.58.png)


不出意外,`Arraylist#remove` 是开销最大的操作之一。换句话说,从 `ArrayList` 中删除元素耗费了大量 CPU 资源。


为什么呢?从 `ArrayList` 中删除元素会对底层实现的数组执行移除操作。问题是,Java 数组不会自动调整大小,每次移除都会创建一个更小的新数组:


```java
private void fastRemove(Object[] es, int i) {
   modCount++;
   final int newSize;
   if ((newSize = size - 1) > i)
       System.arraycopy(es, i + 1, es, i, newSize - i);
   es[size = newSize] = null;
}
```


接下来该怎么办?避免从 `ArrayList` 中移除元素。


为了达到这个效果,可以用一个数组存储剩余的元素并记录它的大小:


```java
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
   private final Random random;
   private final T[] source;
   private int size;
   private ImprovedRandomSpliterator(
     List<T> source, Supplier<? extends Random> random) {
       if (source.isEmpty()) {
           throw new IllegalArgumentException(...);
       }
       this.source = (T[]) source.toArray();
       this.random = random.get();
       this.size = this.source.length;
   }
}
```


幸运的是,由于 `Spliterator` 的实例不会在线程之间共享,因此不会遇到并发问题。


现在尝试移除元素时,实际上不需要创建缩小后的新数组。相反,只要减小 `size` 并忽略数组的其余部分即可。


在此之前,把最后一个元素与返回的元素交换:


```java
@Override
public boolean tryAdvance(Consumer<? super T> action) {
   if (size > 0) {
       int nextIdx = random.nextInt(size);
       int lastIdx = size - 1;
       action.accept(source[nextIdx]);
       source[nextIdx] = source[lastIdx];
       source[lastIdx] = null; // let object be GCed
       size--;
       return true;
   } else {
       return false;
   }
}
```


对改进后的方案进行评测,可以看到开销最大的调用已经消失了:


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/Screenshot-2019-01-03-at-16.38.47.png)


准备在此运行基准测试进行比较:


```shell
              (limit)  (size)   Mode  Cnt     Score     Error  Units
eager                1  100000  thrpt    3   456.811 ±  20.585  ops/s
eager               10  100000  thrpt    3   469.635 ±  23.281  ops/s
eager              100  100000  thrpt    3   466.486 ±  68.820  ops/s
eager             1000  100000  thrpt    3   454.459 ±  13.103  ops/s
eager            10000  100000  thrpt    3   443.640 ±  96.929  ops/s
eager           100000  100000  thrpt    3   335.134 ±  21.944  ops/s
lazy                 1  100000  thrpt    3  1587.536 ± 389.128  ops/s
lazy                10  100000  thrpt    3  1452.855 ± 406.879  ops/s
lazy               100  100000  thrpt    3   814.978 ± 242.077  ops/s
lazy              1000  100000  thrpt    3   167.825 ± 129.559  ops/s
lazy             10000  100000  thrpt    3    19.782 ±   8.596  ops/s
lazy            100000  100000  thrpt    3     3.970 ±   0.408  ops/s
lazy_improved        1  100000  thrpt    3  1509.264 ± 170.423  ops/s
lazy_improved       10  100000  thrpt    3  1512.150 ± 143.927  ops/s
lazy_improved      100  100000  thrpt    3  1463.093 ± 593.370  ops/s
lazy_improved     1000  100000  thrpt    3  1451.007 ±  58.948  ops/s
lazy_improved    10000  100000  thrpt    3  1148.581 ± 232.218  ops/s
lazy_improved   100000  100000  thrpt    3   383.022 ±  97.082  ops/s
```


(https://e2p3q8w7.stackpathcdn.com/wp-content/uploads/2018/12/chart-5.png)


从上面的结果可以看出,改进后的方案性能受元素数量变化影响显著减小。


实际上,即使遇到最差情况,改进方案的性能也比基于 `Collections#shuffle` 的方案略好一些。


6. 完整示例


完整示例可以在 [GitHub][2] 上找到。


[2]:https://github.com/pivovarit/articles/tree/master/java-random-stream


```java
package com.pivovarit.stream;
import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ImprovedRandomSpliterator<T> implements Spliterator<T> {
   private final Random random;
   private final T[] source;
   private int size;
   ImprovedRandomSpliterator(List<T> source, Supplier<? extends Random> random) {
       if (source.isEmpty()) {
           throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
       }
       this.source = (T[]) source.toArray();
       this.random = random.get();
       this.size = this.source.length;
   }
    @Override
   public boolean tryAdvance(Consumer<? super T> action) {
       if (size > 0) {
           int nextIdx = random.nextInt(size);
           int lastIdx = size - 1;
           action.accept(source[nextIdx]);
           source[nextIdx] = source[lastIdx];
           source[lastIdx] = null; // let object be GCed
           size--;
           return true;
       } else {
           return false;
       }
   }
   @Override
   public Spliterator<T> trySplit() {
       return null;
   }
   @Override
   public long estimateSize() {
       return source.length;
   }
   @Override
   public int characteristics() {
       return SIZED;
   }
}
```


```java
package com.pivovarit.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.toCollection;
public final class RandomCollectors {

   private RandomCollectors() {
   }

   public static <T> Collector<T, ?, Stream<T>> toImprovedLazyShuffledStream() {
       return Collectors.collectingAndThen(
         toCollection(ArrayList::new),
         list -> !list.isEmpty()
           ? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
           : Stream.empty());
   }

   public static <T> Collector<T, ?, Stream<T>> toLazyShuffledStream() {
       return Collectors.collectingAndThen(
         toCollection(ArrayList::new),
         list -> !list.isEmpty()
           ? StreamSupport.stream(new RandomSpliterator<>(list, Random::new), false)
           : Stream.empty());
   }

   public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
       return Collectors.collectingAndThen(
         toCollection(ArrayList::new),
         list -> {
             Collections.shuffle(list);
             return list.stream();
         });
   }
}
```


标签: