文章目录
1. Strem 组件结构
1.1 操作分类
根据Java 8 Stream(1)-流的使用可以知道,Stream
的操作可以分为两大类:中间操作与终结操作。中间操作只是对操作进行了记录,终结操作才会实际触发计算逻辑(即惰性求值),源码中也把 Stream 的一个操作称为一个 stage
- 中间操作
又可以分为无状态(Stateless)操作
与有状态(Stateful)操作
,前者是指元素的处理不受之前元素的影响,可以一个一个即时处理;后者是指该操作只有拿到所有元素之后才能继续下去,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果- 终结操作
又可以分为短路操作
与非短路操作
,前者是指遇到某些符合条件的元素就可以得到最终结果;而后者是指必须处理所有元素才能得到最终结果
以下继承结构以
ReferencePipeline
为例,其对应的Stream
类型为StreamShape.REFERENCE
,也就是处理引用类型数据的流
1.2 操作对象的结构
target.stream()
.filter(fund -> fund.getConfirm()> 2)
.map(fund->String.valueOf(fund.getMode()))
.forEach(System.out::println);
以上代码其对应的操作执行如下图所示,其主要分为两个步骤:
Stream
被构造处理的时候首先获得一个Head
对象,这是整个流操作执行链的头节点。它也是一个操作行为的封装,只不过比较特殊,深度depth = 0
,且没有对数据的操作逻辑,其主要的作用是串起整个流处理流程。当中间操作都执行完,则获得了一条对每一步操作都进行了描述的 Stream 中间操作双向链表,头指针指到了stage2
- 当终结操作触发时,以终结操作本身的数据处理逻辑的封装对象
Sink0
为起点,从操作链表尾部stage2
逆向遍历,将操作动作中封装的数据处理逻辑封装成ChaineReference
对象,并将传入的上一个Sink 引用
赋值给新建 Sink 的 downStream 变量,从而形成单向的调用链,头指针指到了Sink2
1.2.1 流中间操作链表头对象 Head
以 ReferencePipeline.Head
内部对象为例,追溯其构造方法,最终抵达 AbstractPipeline
的构造方法。这其实就是 Stream 中间操作
对应的抽象节点,每一个中间操作都被封装成这样一个节点,然后通过前后指针连接起来,形成了一条双向链表。 从代码看其比较重要的属性如下:
previousStage
当前中间操作节点的上一个节点,因为Head
为整个双向链表最上游,故其前一个节点为 nullsourceSpliterator
数据源的可分解迭代器,并行流中分解任务所需sourceStage
保存的Head
头节点引用,用于获取保存在头节点关于整个 Stream 处理流程中的关键信息,如是否是并行模式depth
当前节点的深度,Head
头节点深度为 0,该值在并行流大任务fork()分解子任务时可用于维护任务层级parallel
是否是并行模式,决定了是否启用ForkJoinPool
用于并行执行任务
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
1.2.2 流中间操作的其他对象
以 ReferencePipeline#map()
对应的中间操作为例,其代码如下。可以看到其主要逻辑是生成一个新的 StatelessOp 对象,并且重写了opWrapSink()
方法
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
StatelessOp
的构造方法也会追溯到 AbstractPipeline
的构造方法,简单地说其主要完成以下几件事:
- 将调用方对象自身作为参数传入,成为这个新的操作对象的
previousStage
,执行previousStage.nextStage = this
将前一个操作对象的后指针指向这个新创建的操作对象,形成双向链表结构- 如果 Stream 中存在有状态的中间操作,
sourceStage.sourceAnyStateful = true
将其保存在 Head 头节点中this.depth = previousStage.depth + 1
新创建的操作对象深度为 previousStage 的 depth 加 1
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
1.2.3 数据处理逻辑的封装对象 Sink
以ReferencePipeline#map()
为例,新的 StatelessOp
对象生成时重写了opWrapSink()
方法,该方法会返回一个 Sink
对象。我们已经知道了 Stream 操作是如何记录下来形成双向链表的,但是这些操作中封装的回调方法(也就是我们写的对数据源的处理逻辑)的真正执行却需要借助 Sink
。Sink
接口为各个操作的数据处理逻辑的调用提供了规范,其包含的方法如下所示:
// 开始遍历元素之前调用该方法,通知Sink做好准备
default void begin(long size)
// 所有元素遍历完成之后调用,通知Sink没有更多的元素了
default void end()
// 是否可以结束操作,可以让短路操作尽早结束
default boolean cancellationRequested()
// 遍历元素时调用,接受一个待处理元素,并对元素进行处理
// Stage 把自己包含的回调方法封装到该方法里,前一个Stage只需要调用当前 Stage.accept(T t)方法就行了
default void accept(int value)
有了规范,每个
操作对象
都将自己的数据处理逻辑封装到一个Sink
中,前一个操作只需调用后一个操作的Sink
接口方法即可,不需要知道其内部是如何处理的。事实上Stream
各个操作内部实现的本质,就是如何重载Sink
的四个接口方法,各操作实现自己的逻辑,处理数据时只需要从Head
开始对数据源依次调用每个操作对象对应的Sink.{begin(),accept(),cancellationRequested(),end()}
方法就可以了
- 对于有状态的操作,
Sink
的begin()
和end()
方法是必须实现的。例如Stream.sorted()
,其操作对象封装的数据处理逻辑 RefSortingSink 对象中,begin()
方法创建了一个存储元素的容器,accept()
方法负责将元素添加到该容器,最后end()
实现对容器中的元素进行排序,并决定了下游 Sink 如何执行- 对于短路操作,Sink 的
cancellationRequested()
方法也是必须实现的,比如Stream.findFirst()
是短路操作,只要找到一个元素cancellationRequested()
就要返回true
,以便尽快结束查找
Sink
的继承结构如下,其中 BooleanTerminalSink
为 Match 操作的数据处理逻辑抽象,AccumulationgSink
为 Reduce 操作的数据处理逻辑抽象,ChainedReference
为所有中间操作的数据处理逻辑抽象
2. Stream 实现原理
2.1 Spliterator 可分割迭代器
Spliterator
可以看作一个splittable Iterator, 是 Java 8 中新增的一个迭代器。其缺省方法forEachRemaining(Consumer<? super E> action)
内部是一个循环,tryAdvance()
方法对下一个未处理的元素执行 action 并返回 true
, 如果没有下一个元素返回 false
。另外它也提供了trySplit()
,实现数据源的拆解,为多线程并行处理提供最小任务
/**
* Performs the given action for each remaining element, sequentially in
* the current thread, until all elements have been processed or the action
* throws an exception. If this Spliterator is {@link #ORDERED}, actions
* are performed in encounter order. Exceptions thrown by the action
* are relayed to the caller.
*
* @implSpec
* The default implementation repeatedly invokes {@link #tryAdvance} until
* it returns {@code false}. It should be overridden whenever possible.
*
* @param action The action
* @throws NullPointerException if the specified action is null
*/
default void forEachRemaining(Consumer<? super T> action) {
do {
} while (tryAdvance(action));
}
2.2 Stream 串行处理流程
-
以
ArrayList#stream()
方法作为流处理的入口,从代码看其实是调用了StreamSupport#stream()
方法,从而生成了操作记录链表的头节点ReferencePipeline.Head
,并将其引用返回public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
-
外部持有了
ReferencePipeline.Head
引用,再调用 filter() 方法其实是调用到了ReferencePipeline#filter()
,此时会新建一个无状态的中间操作,其重写的opWrapSink()
规定了该操作的下游操作的Sink
是如何组织数据处理逻辑的。完成这些后将新建的中间操作引用返回,之后再调用 map() 方法流程与此类似@Override public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; }
-
当终结方法 forEach() 被调用时,首先调用
ForEachOps#makeRef()
新建最终操作ForEachOp
对象,之后调用了AbstractPipeline#evaluate()
开始执行操作中定义的数据处理逻辑@Override public void forEach(Consumer<? super P_OUT> action) { evaluate(ForEachOps.makeRef(action, false)); }
-
AbstractPipeline#evaluate()
需要判断当前流是否是并行流,此处以串行流分析,则调用了terminalOp.evaluateSequential()
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
-
terminalOp.evaluateSequential()
方法调用到重写方法ForEachOp#evaluateSequential()
,可以看到其内部逻辑很清晰,其实就是调用到了AbstractPipeline#wrapAndCopyInto()
@Override public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) { return helper.wrapAndCopyInto(this, spliterator).get(); }
-
AbstractPipeline#wrapAndCopyInto()
中包含了整个流中最重要的逻辑,其主要是分为了两个步骤:wrapSink()
从操作链表的尾部开始,调用操作对象自身重写的 opWrapSink()方法将每一个操作对象中的数据处理逻辑封装成 Sink.ChainedReference,并将传入的 Sink 作为新建 Sink 的 downStream,从而形成单向调用链copyInto()
从调用链头部开始执行中间操作数据处理逻辑封装成的Sink
对象的方法,完成对数据源的处理
@Override final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) { copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink; }
-
AbstractPipeline#wrapSink()
是数据处理逻辑从中间操作中抽取出来封装成Sink
对象的关键步骤,其主要逻辑是回调中间操作对象的opWrapSink()
方法,将其中包含的数据处理逻辑封装为新的Sink
,并将当前的Sink
作为新建Sink
的 downStream,促使单向调用链成型final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
-
AbstractPipeline#copyInto()
方法主要负责调用Sink
链,spliterator.forEachRemaining()
借助迭代器对每一个未被遍历的元素应用对数据的处理逻辑final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
-
ArrayListSpliterator#forEachRemaining()
中,action.accept()
在当前Sink#accept()
方法被调用后,会调用下一个Sink
的相关方法完成对数据的流处理public void forEachRemaining(Consumer<? super E> action) { int i, hi, mc; // hoist accesses and checks from loop ArrayList<E> lst; Object[] a; if (action == null) throw new NullPointerException(); if ((lst = list) != null && (a = lst.elementData) != null) { if ((hi = fence) < 0) { mc = lst.modCount; hi = lst.size; } else mc = expectedModCount; if ((i = index) >= 0 && (index = hi) <= a.length) { for (; i < hi; ++i) { @SuppressWarnings("unchecked") E e = (E) a[i]; action.accept(e); } if (lst.modCount == mc) return; } } throw new ConcurrentModificationException(); }
2.3 Stream 并行处理流程
-
流的并行处理与串行处理差别不是很大,主要的差异在于当终结操作被触发时,其调用的是
terminalOp.evaluateParallel()
,最终调用到其重写方法ForEachOp#evaluateParallel()
,然后生成了ForEachTask
这个ForkJoinTask
的子类,并调用其invoke()
提交到线程池执行需注意该步骤中的
helper.wrapSink(this)
会在生成线程池任务的时候生成 Sink 调用链@Override public <S> Void evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) { if (ordered) new ForEachOrderedTask<>(helper, spliterator, this).invoke(); else new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke(); return null; }
-
ForkJoinPool
相关流程请参考 Java 线程池源码详解(2)-ForkJoinPool 源码解析,可知最后会调用到ForEachTask#compute()
方法。这个方法内部会开启while
循环对任务进行估算,然后fork()
分解大任务,直到任务足够小才执行task.helper.copyInto(taskSink, rightSplit)
,之后的数据处理流程与串行流完全一致// Similar to AbstractTask but doesn't need to track child tasks public void compute() { Spliterator<S> rightSplit = spliterator, leftSplit; long sizeEstimate = rightSplit.estimateSize(), sizeThreshold; if ((sizeThreshold = targetSize) == 0L) targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate); boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags()); boolean forkRight = false; Sink<S> taskSink = sink; ForEachTask<S, T> task = this; while (!isShortCircuit || !taskSink.cancellationRequested()) { if (sizeEstimate <= sizeThreshold || (leftSplit = rightSplit.trySplit()) == null) { task.helper.copyInto(taskSink, rightSplit); break; } ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit); task.addToPendingCount(1); ForEachTask<S, T> taskToFork; if (forkRight) { forkRight = false; rightSplit = leftSplit; taskToFork = task; task = leftTask; } else { forkRight = true; taskToFork = leftTask; } taskToFork.fork(); sizeEstimate = rightSplit.estimateSize(); } task.spliterator = null; task.propagateCompletion(); }