Pipeline
管道或流水线,设计类似双向链表,ReferencePipeline主要实现了Stream接口,Stream是暴露给用户的接口。AbstractPipeline主要实现了BaseStream接口。它们的继承关系如图:
ReferencePipeline实现类有三种:Head-链表头、StatelessOp-无状态操作、StatefullOp-有状态操作。无状态操作和有状态操作都属于中间操作,Head是stream操作创建的第一个Stream,记录了数据集的Spliterator、stream操作的初始状态及特性、是进行同步操作还是异步操作。这篇文章分享的就是同步操作。中间操作包括:筛选、映射、去重、排序、切片等。
Spliterator
分路器或可分割迭代器(Splitable iterator),主要接口:
1)tryAdvance,执行接口,对元素进行操作或可向下传递
2)forEachRemaining,遍历数据集的剩余元素,通过tryAdvance分发
3)trySplit,并行操作,Fork分割接口,实现数据分割
4)estimateSize,评估执行数量,数据集执行的最终位置减去执行的初始位置,异步操作中配合trySplit使用,同步操作中数值为数据集总量
5)getExactSizeIfKnow 确切数据集数量
6)characteristics 特性,用于初始化Stream的初始状态
TerminalOp
终端操作,终端操作是对中间操作搜集的数据进行消费,终端操作包括遍历、匹配、查找、归约、聚合、统计、分组\分区等,这篇文章分享遍历、匹配、查找、归约。
终端操作继承或者包含了Sink,Sink接口继承Consumer接口,Sink主要包括begin、end、accept等方法,是数据的操作流向。中间操作通过opWrapSink产生Sink,opWrapSink是AbstractPipeline的抽象方法。中间操作或者终端操作具体做啥事情,都是通过实现Sink来实现。
Sink
水槽或操作池,槽就是有进有耗有出,Sink完成三件事情:
1)接收上一个Sink来的数据
2)对数据做中间操作,例如映射、排序等
3)数据流向下一个Sink
Sink的流动策略分为:单数据操作后流(无状态操作,例如:映射)、搜集全数据操作后流(有状态操作,例如:排序)
每个中间操作实现其对应的功能,中间操作是通过双向链表串联起来的ReferencePipeline,其中包括:nextStage,下一个ReferencePipeline;previousPipeline,前一个ReferencePipeline;sourceStage,指向Head ReferencePipeline。
中间操作ReferencePipeline通过evaluate执行终端操作TerminalOp,通过wrapAndCopyInfo完成ReferencePipeline向ChainedReference转换,ChainedReference中包含Sink。第一个Sink就是终端操作本身(ForEachOp)或者其内包含的Sink(MatchOp、FindOp、ReduceOp)。
wrapAndCopyInfo从最后一个中间操作开始,通过depth和previousState向前找,直到Head链接的第一个ReferencePipeline,Head的depth是0,不会产生Sink,不做任何操作。
ChainedReference中包含Sink,即指向下一个的Sink,Spliterator是Head中的sourceState中包含的Spliterator,即创建Stream时,数据集创建的Spliterator。
StatelessOp
无状态中间操作:筛选、映射、切片。
筛选:filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
// Android-changed: Make public, to match the method it's overriding.
public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
通过Predicate断言,判断是否流向下一个Sink,条件成立流向,条件不成立则阻断。
映射:map、flatMap等
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
public Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
通过Function来完成x->y转换,即y=f(x),不做拦截,转换后即流向下一个Sink。
切片:limit、skip
@Override
// Android-changed: Make public, to match the method it's overriding.
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
return new Sink.ChainedReference<T, T>(sink) {
long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE;
@Override
public void begin(long size) {
downstream.begin(calcSize(size, skip, m));
}
@Override
public void accept(T t) {
if (n == 0) {
if (m > 0) {
m--;
downstream.accept(t);
}
}
else {
n--;
}
}
@Override
public boolean cancellationRequested() {
return m == 0 || downstream.cancellationRequested();
}
};
}
跳过:n = skip,先进行n--,直到n == 0则进行向下传递。
限制:m = limit,跳过skip后,对m计数,直到m == 0则不再向下传递。
StatefulOp
有状态中间操作:去重、排序。
去重:distinct
@Override
// Android-changed: Make public, to match the method it's overriding.
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
if (StreamOpFlag.DISTINCT.isKnown(flags)) {
return sink;
} else if (StreamOpFlag.SORTED.isKnown(flags)) {
return new Sink.ChainedReference<T, T>(sink) {
boolean seenNull;
T lastSeen;
@Override
public void begin(long size) {
seenNull = false;
lastSeen = null;
downstream.begin(-1);
}
@Override
public void end() {
seenNull = false;
lastSeen = null;
downstream.end();
}
@Override
public void accept(T t) {
if (t == null) {
if (!seenNull) {
seenNull = true;
downstream.accept(lastSeen = null);
}
} else if (lastSeen == null || !t.equals(lastSeen)) {
downstream.accept(lastSeen = t);
}
}
};
} else {
return new Sink.ChainedReference<T, T>(sink) {
Set<T> seen;
@Override
public void begin(long size) {
seen = new HashSet<>();
downstream.begin(-1);
}
@Override
public void end() {
seen = null;
downstream.end();
}
@Override
public void accept(T t) {
if (!seen.contains(t)) {
seen.add(t);
downstream.accept(t);
}
}
};
}
}
1)如果已去重,则返回上一个Sink,不再生产新的Sink。
2)如果已排序,则记录上个数据与本次数据通过equals做对比,如果不等则流向下一个Sink,如果相等则不再流向。
3)如果是未排序,则使用Set收集所有数据,接收到新的数据则于Set中数据做对比,如果不存在则流向下一个Sink,如果存在则不再流向。
排序:sorted
@Override
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// If the input is already naturally sorted and this operation
// also naturally sorted then this is a no-op
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedRefSortingSink<>(sink, comparator);
else
return new RefSortingSink<>(sink, comparator);
}
1)已排序,则不再生产新的Sink
2)如果size确定,则使用数组接收,SizedRefSortingSink。
private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
private T[] array;
private int offset;
SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink, comparator);
}
@Override
@SuppressWarnings("unchecked")
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
array = (T[]) new Object[(int) size];
}
@Override
public void end() {
Arrays.sort(array, 0, offset, comparator);
downstream.begin(offset);
if (!cancellationWasRequested) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
else {
for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
downstream.accept(array[i]);
}
downstream.end();
array = null;
}
@Override
public void accept(T t) {
array[offset++] = t;
}
}
在begin中创建数组array,在accept中接收所有数据,在end中使用Arrays.sort对数组进行排序,然后再循环流向下一个Sink,循环结束end流向下一个Sink。
3)如果size不确定,则使用ArrayList接收,RefSortingSink。
private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
private ArrayList<T> list;
RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink, comparator);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
}
@Override
public void end() {
list.sort(comparator);
downstream.begin(list.size());
if (!cancellationWasRequested) {
list.forEach(downstream::accept);
}
else {
for (T t : list) {
if (downstream.cancellationRequested()) break;
downstream.accept(t);
}
}
downstream.end();
list = null;
}
@Override
public void accept(T t) {
list.add(t);
}
}
在begin中创建ArrayList list,在accept中接收所有的数据,在end中使用list.sort排序,然后再循环流向下一个Sink,循环结束end流向下一个Sink。
TerminalOp
终端操作:遍历、匹配、查找、归约
遍历:forEach、forEachSorted
static final class OfRef<T> extends ForEachOp<T> {
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
@Override
public void accept(T t) {
consumer.accept(t);
}
}
通过Consumer接收数据进行消费。
匹配:anyMatch、allMatch、noneMatch
public static <T> TerminalOp<T, Boolean> makeRef(Predicate<? super T> predicate,
MatchKind matchKind) {
Objects.requireNonNull(predicate);
Objects.requireNonNull(matchKind);
class MatchSink extends BooleanTerminalSink<T> {
MatchSink() {
super(matchKind);
}
@Override
public void accept(T t) {
if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
stop = true;
value = matchKind.shortCircuitResult;
}
}
}
return new MatchOp<>(StreamShape.REFERENCE, matchKind, MatchSink::new);
}
通过Redicate断言是否匹配,并通过stop状态阻断操作。
查找:findFirst、findAny
private static abstract class FindSink<T, O> implements TerminalSink<T, O> {
boolean hasValue;
T value;
FindSink() {} // Avoid creation of special accessor
@Override
public void accept(T value) {
if (!hasValue) {
hasValue = true;
this.value = value;
}
}
@Override
public boolean cancellationRequested() {
return hasValue;
}
/** Specialization of {@code FindSink} for reference streams */
static final class OfRef<T> extends FindSink<T, Optional<T>> {
@Override
public Optional<T> get() {
return hasValue ? Optional.of(value) : null;
}
}
通过accept接收数据,通过Supplier get返回需要的数据,以Optional呈现。
归约:reduce
public static <T> TerminalOp<T, Optional<T>>
makeRef(BinaryOperator<T> operator) {
Objects.requireNonNull(operator);
class ReducingSink
implements AccumulatingSink<T, Optional<T>, ReducingSink> {
private boolean empty;
private T state;
public void begin(long size) {
empty = true;
state = null;
}
@Override
public void accept(T t) {
if (empty) {
empty = false;
state = t;
} else {
state = operator.apply(state, t);
}
}
@Override
public Optional<T> get() {
return empty ? Optional.empty() : Optional.of(state);
}
@Override
public void combine(ReducingSink other) {
if (!other.empty)
accept(other.state);
}
}
return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
接收到数据后进行归约操作。
总结
Stream流向,中间操作(Stream)以链表形式相互串联,终端操作,反向遍历到Head链接的第一个Stream,并生成每个Steam的Sink,再从第一个Sink开始执行。