Java 8 Stream Internals
Anatomy of a Stream
A Stream consists of a data source (Source), a chain of zero or more intermediate operations, and a single terminal operation.
Intermediate operations: distinct, filter, limit, map, skip, sorted, etc.
Terminal operations: count, forEach, collect, etc.
How Stream works
Take the following code as an example:
List<String> list1 = Arrays.asList("1", "2", "3", "4", "5", "6", "7");
List<String> collect = list1.stream().skip(2).map(s -> s + "sss").map(s -> s + "sss2")
        .collect(Collectors.toList());
1. Collection.stream() is the entry point:
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
2. This calls spliterator() on the list returned by Arrays.asList:
public Spliterator<E> spliterator() {
return Spliterators.spliterator(a, Spliterator.ORDERED);
}
3. That produces an ArraySpliterator, which holds the original array and its bounds:
public static <T> Spliterator<T> spliterator(Object[] array,
int additionalCharacteristics) {
return new ArraySpliterator<>(Objects.requireNonNull(array),
additionalCharacteristics);
}
public ArraySpliterator(Object[] array, int origin, int fence, int additionalCharacteristics) {
this.array = array;
this.index = origin;
this.fence = fence;
this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
}
4. StreamSupport then builds the stream:
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
5. This creates a Head stage; the actual initialization happens in AbstractPipeline:
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
Creating the stream
1. Spliterator, the splittable iterator
A Spliterator provides the ability to traverse and partition a source (such as an array): it walks the source's elements (one at a time or in bulk), and it can also partition the data so the work can be parallelized. The name says it all: split + iterator.
Its methods:
tryAdvance(): traverse a single element
forEachRemaining(): bulk sequential traversal
trySplit(): split off elements for parallel processing
The object produced by spliterator() is the ArraySpliterator shown above.
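The three methods can be seen in action on an array-backed list like the one in the example. This is a minimal sketch (the class name is mine, and the exact split point is an ArraySpliterator implementation detail, not something the Spliterator contract guarantees):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SpliteratorDemo {
    // Drives the three core Spliterator methods over a fixed array-backed list.
    static List<String> run() {
        List<String> out = new ArrayList<>();
        Spliterator<String> sp = Arrays.asList("1", "2", "3", "4").spliterator();

        // tryAdvance(): consume exactly one element
        sp.tryAdvance(s -> out.add("single:" + s));

        // trySplit(): hand off a prefix of the remaining elements (for parallelism)
        Spliterator<String> prefix = sp.trySplit();
        prefix.forEachRemaining(s -> out.add("prefix:" + s));

        // forEachRemaining(): bulk sequential traversal of whatever is left
        sp.forEachRemaining(s -> out.add("rest:" + s));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

On OpenJDK's ArraySpliterator, which splits the remaining range in half, this logs the single element first, then the split-off prefix, then the rest.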
2. The source stage: Head
Its sourceStage field points at itself, and both previousStage and nextStage are null. Stages are linked to one another like the nodes of a doubly linked list.
The stream's operations
1. skip(long n)
1. The entry point:
public final Stream<P_OUT> skip(long n) {
if (n < 0)
throw new IllegalArgumentException(Long.toString(n));
if (n == 0)
return this;
else
return SliceOps.makeRef(this, n, -1);
}
2. This creates a stateful intermediate stage (itself also a stream); the core of the operation is its opWrapSink() implementation:
new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
flags(limit)) {
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
    // ... returns the counting sink that drops the first n elements (body elided)
}
}
3. The stage itself is constructed in AbstractPipeline:
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
The net result: skip() produced a stateful intermediate stage whose sourceStage and previousStage both point at the Head, and whose nextStage is null.
3. map(Function mapper)
1. This creates a stateless intermediate stage:
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
4. collect()
1. The collect method itself:
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
2. This defines a TerminalOp. A terminal operation is not a stream: it takes a stream as input and produces a result, and it defines the input type, the stream shape, the result type, and the operation flags.
For Collectors.toList(), the TerminalOp is built by ReduceOps.makeRef(collector).
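The branch taken inside collect() above is driven by the collector's characteristics. As a quick check (the demo class name is mine), Collectors.toList() advertises IDENTITY_FINISH but not CONCURRENT, so a sequential toList always goes through the evaluate(ReduceOps.makeRef(...)) path and skips the finisher:

```java
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CollectorCharacteristicsDemo {
    public static void main(String[] args) {
        // toList() is IDENTITY_FINISH: the accumulation container (an ArrayList)
        // is itself the result, so collect() can return it without a finisher.
        Set<Collector.Characteristics> cs = Collectors.toList().characteristics();
        System.out.println(cs.contains(Collector.Characteristics.IDENTITY_FINISH)); // true
        System.out.println(cs.contains(Collector.Characteristics.CONCURRENT));      // false
    }
}
```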
3. For a sequential stream this ends up calling terminalOp.evaluateSequential:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
4. evaluateSequential wraps a sink and copies the source into it:
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
makeSink() ultimately produces a new ReduceSink, which is essentially a consumer.
5. Wrap the sink and copy the data into it:
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
6. wrapSink builds the chain: it walks the pipeline from the last stage back towards the source, calling each stage's opWrapSink() so that every sink wraps its downstream sink:
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
7. Taking map as an example, the opWrapSink(int flags, Sink<R> sink) that gets called is:
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
8. The result is a chain of sinks. Each sink stores its downstream sink in the downstream field; the input is a Sink and the output is a Sink that wraps it with one more layer. The chain is wrapped from the last operation to the first, from the inside out.
static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
protected final Sink<? super E_OUT> downstream;
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
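The wrapping idea can be sketched with plain Consumers (all names here are hypothetical, not JDK code): each stage holds a reference to its downstream consumer, and the chain is assembled from the last stage backwards, just like wrapSink's loop:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class SinkChainSketch {
    // A minimal stand-in for Sink.ChainedReference: a stage that maps its
    // input and hands the result to its downstream consumer.
    static <T, R> Consumer<T> mapStage(Function<T, R> mapper, Consumer<R> downstream) {
        return t -> downstream.accept(mapper.apply(t)); // same shape as map's accept()
    }

    static List<String> run() {
        List<String> result = new ArrayList<>();
        Consumer<String> terminal = result::add;              // the ReduceSink analogue
        // Wrap from the last stage backwards, like wrapSink's loop:
        Consumer<String> second = mapStage(s -> s + "sss2", terminal);
        Consumer<String> first  = mapStage(s -> s + "sss",  second);
        first.accept("1");                                    // "1" -> "1sss" -> "1ssssss2"
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```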
9. copyInto then drives this sink chain: it first calls begin() (which propagates down the whole chain), then feeds every element to accept() via forEachRemaining, and finally calls end() (which also propagates down the whole chain):
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
The spliterator (which holds the original array) is then traversed; the action is the sink chain, so each element is consumed by calling accept() on the chain:
public void forEachRemaining(Consumer<? super T> action) {
Object[] a; int i, hi; // hoist accesses and checks from loop
if (action == null)
throw new NullPointerException();
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
do {
    action.accept((T)a[i]);
} while (++i < hi);
}
}
10. accept() consumes each element
skip: while n > 0, elements are dropped; once n reaches 0 (and the limit counter m, if any, allows it), elements are passed downstream:
public void accept(T t) {
if (n == 0) {
if (m > 0) {
m--;
downstream.accept(t);
}
}
else {
n--;
}
}
map: applies the mapper and passes the result straight downstream; no intermediate collection is needed:
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
In this way the chain executes from head to tail, one element at a time.
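This element-at-a-time ("vertical") flow can be observed with peek (the demo names are mine): each element travels through the whole sink chain before the next element is read from the source:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class VerticalExecutionDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Arrays.asList("a", "b").stream()
              .peek(s -> log.add("stage1:" + s))
              .map(s -> s + "!")
              .peek(s -> log.add("stage2:" + s))
              .forEach(s -> log.add("terminal:" + s));
        // "a" passes through every stage before "b" is touched at all.
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```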
11. Finally the terminal operation's accept() is called. Here state is the accumulation container (an ArrayList for Collectors.toList()), and accumulator.accept() invokes its add method, adding each element to state:
public void accept(T t) {
accumulator.accept(state, t);
}
12. Finally Sink.get() is called to obtain the result:
public U get() {
return state;
}
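The begin/accept/get sequence of ReduceSink can be replayed by driving a Collector's component functions by hand (drive and the class name are mine; this is a sketch of the mechanism, not JDK code):

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class ManualCollectDemo {
    // Replays what ReduceSink does: supplier in begin(), accumulator in
    // accept() per element, finisher (identity for toList) in get().
    static <A> List<String> drive(Collector<String, A, List<String>> c) {
        A state = c.supplier().get();        // begin(): create the container
        c.accumulator().accept(state, "a");  // accept(): add each element
        c.accumulator().accept(state, "b");
        return c.finisher().apply(state);    // get(): identity finisher here
    }

    public static void main(String[] args) {
        System.out.println(drive(Collectors.toList())); // [a, b]
    }
}
```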
5. sorted
sorted is also an intermediate operation, but a stateful one: it cannot do its work until every element has arrived, and only after sorting can the rest of the pipeline continue.
begin() creates the buffer list
accept() adds each stream element to the list
end() sorts the list and then replays it downstream, calling the downstream sink's begin(), accept(), and end() in turn:
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
}
@Override
public void end() {
list.sort(comparator);
downstream.begin(list.size());
if (!cancellationWasRequested) {
list.forEach(downstream::accept);
}
else {
for (T t : list) {
if (downstream.cancellationRequested()) break;
downstream.accept(t);
}
}
downstream.end();
list = null;
}
@Override
public void accept(T t) {
list.add(t);
}
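The buffering is easy to observe with a peek on each side of sorted (the demo names are mine): the upstream peek fires for every element before the downstream peek fires at all, because sorted only releases elements in its end():

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SortedBufferingDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Arrays.asList("3", "1", "2").stream()
              .peek(s -> log.add("before:" + s))  // fires per element, in source order
              .sorted()                           // buffers everything until end()
              .peek(s -> log.add("after:" + s))   // fires only after the sort
              .collect(Collectors.toList());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```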
6. distinct
distinct keeps a seen set: begin() creates it, accept() forwards only elements that have not been seen before, and end() releases it:
return new Sink.ChainedReference<T, T>(sink) {
Set<T> seen;
@Override
public void begin(long size) {
seen = new HashSet<>();
downstream.begin(-1);
}
@Override
public void end() {
seen = null;
downstream.end();
}
@Override
public void accept(T t) {
if (!seen.contains(t)) {
seen.add(t);
downstream.accept(t);
}
}
};
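The effect of that seen set, observed from the outside (the demo class name is mine): only the first occurrence of each equal element survives, in encounter order, and equality is decided by equals/hashCode since the set is a HashSet:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DistinctDemo {
    static List<String> run() {
        // Duplicates "2" and "1" are filtered out by the seen set;
        // the first occurrence of each element is kept.
        return Stream.of("1", "2", "2", "3", "1")
                     .distinct()
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3]
    }
}
```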
The final stream
The fully wrapped sink chain
The final chain topology:
How the chain executes depends on what the TerminalOp is, which is why a Stream is lazy: once the pipeline is built, it is only a chain of sinks; the actual traversal and execution must be triggered by a terminal operation.
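This laziness can be demonstrated with a counter (the demo names are mine): the map lambda never runs until a terminal operation arrives:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyDemo {
    static String run() {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> s = Arrays.asList("a", "b").stream()
                                 .map(x -> { calls.incrementAndGet(); return x; });
        int before = calls.get();  // still 0: only the stage chain has been built
        s.forEach(x -> { });       // the terminal op triggers wrapAndCopyInto
        return before + "," + calls.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 0,2
    }
}
```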
The resulting Sink is: