Java8 Stream源码精讲(一):从一个简单的例子入手
上一篇文章,通过分析一个使用Stream操作数据的例子,讲解了构建Stream,经过中间操作map()和filter()方法调用返回一个ReferencePipeline链表,调用终止操作forEach()将声明的函数构造成为一个sink链表,最终每一个元素都会被传入Sink#accept()方法处理。本章将通过重点分析创建Stream的源码,了解Stream的构建过程。
在分析Stream构建之前,需要填一下上一章的坑,还记得吗,在上一章分析Stream流程的时候,构建Stream传入了一个Spliterator对象,当时只是说它是一个类似迭代器一样的东西。
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) { return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false); } 复制代码
现在我们来仔细看看这个接口是做什么的,先来看接口定义,省略了接口中的常量和内部接口,只留下了方法定义:
public interface Spliterator<T> { //用于遍历单个元素,action是Stream调用终止操作之后包装的sink链 boolean tryAdvance(Consumer<? super T> action); //用于批量遍历元素,action是Stream调用终止操作之后包装的sink链 default void forEachRemaining(Consumer<? super T> action) { do { } while (tryAdvance(action)); } //并行计算的时候拆分Spliterator Spliterator<T> trySplit(); //预估元素的大小 long estimateSize(); //精确获取元素的大小 default long getExactSizeIfKnown() { return (characteristics() & SIZED) == 0 ? -1L : estimateSize(); } int characteristics(); default boolean hasCharacteristics(int characteristics) { return (characteristics() & characteristics) == characteristics; } //获取元素比较器 default Comparator<? super T> getComparator() { throw new IllegalStateException(); } } 复制代码
Spliterator翻译成中文是分离器或者拆分器,接口注释大概的意思是:Spliterator是一个用于遍历和划分源元素的对象,源元素可以来自一个数组、Collection集合、IO Channel或者一个生成器函数。可以通过tryAdvance()方法来遍历源中的单个元素,也可以通过forEachRemaining()方法批量遍历源中的元素。在并行计算中,可以通过trySplit()方法将源中的一些元素拆分为另外的Spliterator。
从注释我们知道Spliterator的主要作用:
现在我们主要来看看方法:
对Spliterator有一个详细的认识之后,我们来看一看如何实现一个Spliterator。
哇,好多实现类,第一时间是不是比较懵逼,这么多子类不知道从何下手?不过从图中可以看出,Spliterator的子类基本上都是某一个集合的内部类,所以我打算选择两个常用子类详细讲解,大家也可以根据这种思路分析其它的。
ArraySpliterator是Spliterators的一个内部类,每次通过一个数组构建Stream时,都会创建相应的ArraySpliterator对象。Arrays#stream()方法的调用流程:
public static <T> Stream<T> stream(T[] array) { return stream(array, 0, array.length); } public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) { return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false); } public static <T> Spliterator<T> spliterator(T[] array, int startInclusive, int endExclusive) { return Spliterators.spliterator(array, startInclusive, endExclusive, Spliterator.ORDERED | Spliterator.IMMUTABLE); } 复制代码
Spliterators#spliterator()工厂方法用于创建ArraySpliterator对象:
public static <T> Spliterator<T> spliterator(Object[] array, int fromIndex, int toIndex, int additionalCharacteristics) { checkFromToBounds(Objects.requireNonNull(array).length, fromIndex, toIndex); //返回ArraySpliterator对象 return new ArraySpliterator<>(array, fromIndex, toIndex, additionalCharacteristics); } 复制代码
ArraySpliterator字段分析:
//源数组 private final Object[] array; //当前索引 private int index; // current index, modified on advance/split //终止索引 private final int fence; // one past last index private final int characteristics; 复制代码
ArraySpliterator构造函数
public ArraySpliterator(Object[] array, int additionalCharacteristics) { this(array, 0, array.length, additionalCharacteristics); } public ArraySpliterator(Object[] array, int origin, int fence, int additionalCharacteristics) { this.array = array; this.index = origin; this.fence = fence; //这里要重点关注一下ArraySpliterator源元素大小是确定的 this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED; } 复制代码
通过构造函数可以看出,默认情况下没有指明origin和fence时,就是从0开始,到数组尾部结束,数组中的元素都算作流元素。
ArraySpliterator方法分析
源码比较简单,直接在代码上用注释说明了,不再单独讲解。
public void forEachRemaining(Consumer<? super T> action) { Object[] a; int i, hi; // hoist accesses and checks from loop //判空 if (action == null) throw new NullPointerException(); //数组长度大于等于fence变量 //index变量大于等于0 //修改index变量且当前下标小于hi if ((a = array).length >= (hi = fence) && (i = index) >= 0 && i < (index = hi)) { //循环消费数组元素 do { action.accept((T)a[i]); } while (++i < hi); } } 复制代码
public boolean tryAdvance(Consumer<? super T> action) { //判空 if (action == null) throw new NullPointerException(); //当前下标大于等于0且小于fence //数组中才有剩余可访问的元素 if (index >= 0 && index < fence) { //取元素,index自增 @SuppressWarnings("unchecked") T e = (T) array[index++]; //消费元素 action.accept(e); return true; } //返回false代表没有源元素了 return false; } 复制代码
//就是fence减去index,代表数组中剩余的元素大小 public long estimateSize() { return (long)(fence - index); } 复制代码
ArrayListSpliterator是ArrayList的内部类,调用ArrayList#stream()方法时会创建这样一个对象。
ArrayList继承自Collection的stream()方法:
default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); } 复制代码
重写了spliterator()方法,这个方法会返回一个Spliterator对象,可以看到创建的就是ArrayListSpliterator,同时传入了ArrayList自身:
public Spliterator<E> spliterator() { return new ArrayListSpliterator<>(this, 0, -1, 0); } 复制代码
那我们来着重研究下ArrayListSpliterator的源码。
ArrayListSpliterator字段分析
//源元素集合 private final ArrayList<E> list; //当前下标 private int index; // current index, modified on advance/split //结束下标 private int fence; // -1 until used; then one past last index private int expectedModCount; // initialized when fence set 复制代码
ArrayListSpliterator方法分析
public void forEachRemaining(Consumer<? super E> action) { int i, hi, mc; // hoist accesses and checks from loop ArrayList<E> lst; Object[] a; //判空 if (action == null) throw new NullPointerException(); //ArrayList不为空且其中存放元素的数组不能为空,很容易理解 if ((lst = list) != null && (a = lst.elementData) != null) { //一般来说进入这个分支,因为创建ArrayListSpliterator时,fence是-1 if ((hi = fence) < 0) { mc = lst.modCount; //hi就是ArrayList的元素大小 hi = lst.size; } else mc = expectedModCount; //修改index变量 if ((i = index) >= 0 && (index = hi) <= a.length) { for (; i < hi; ++i) { //遍历list中的元素数组,取依次取上面的元素,调用action消费 @SuppressWarnings("unchecked") E e = (E) a[i]; action.accept(e); } //校验modCount,不允许方法执行时内部结构发生改变 if (lst.modCount == mc) return; } } throw new ConcurrentModificationException(); } 复制代码
public boolean tryAdvance(Consumer<? super E> action) { //判空 if (action == null) throw new NullPointerException(); //这里hi其实就是ArrayList元素大小 int hi = getFence(), i = index; if (i < hi) { index = i + 1; //取数组中index下标的元素 @SuppressWarnings("unchecked") E e = (E)list.elementData[i]; //调用action消费元素 action.accept(e); //并发修改校验 if (list.modCount != expectedModCount) throw new ConcurrentModificationException(); return true; } //没有剩余的元素,返回false return false; } 复制代码
tryAdvance()中是通过将getFence()的返回值赋值给hi的,进入这个方法看下:
private int getFence() { // initialize fence to size on first use int hi; // (a specialized variant appears in method forEach) ArrayList<E> lst; //首次被调用时会进入,将ArrayList.size赋值给fence,ArrayList.modCount赋值给expectedModCount, //其它时候直接返回fence的值 if ((hi = fence) < 0) { if ((lst = list) == null) hi = fence = 0; else { expectedModCount = lst.modCount; hi = fence = lst.size; } } return hi; } 复制代码
//估计值大小,ArrayList也是元素大小确定的,计算逻辑是ArrayList.size减去当前下标,表示还有多少源元素 public long estimateSize() { return (long) (getFence() - index); } 复制代码
通过前面分析我们知道Stream都是通过StreamSupport这个工具类创建的,传入的Spliterator参数就是上面讲解的Spliterator实现类实例:
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); } 复制代码
返回的实际上是一个ReferencePipeline.Head对象,我在上一个章节中有详细的讲解,现在我们再来分析一下加深印象。先看一下它的类继承关系:
Head是ReferencePipeline的内部类,同时又继承了ReferencePipeline,ReferencePipeline是引用类型Stream的抽象,它实现了Stream接口,拥有中间操作和终止操作的能力,ReferencePipeline也继承了AbstractPipeline,这个抽象类上一章节有详细讲解。
Head的构造函数调用ReferencePipeline构造函数:
Head(Spliterator<?> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } 复制代码
最终调用AbstractPipeline构造函数,上一章也有详细讲解,这里再回顾一下:
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { //头结点没有前一个节点了 this.previousStage = null; //代表源元素的Spliterator this.sourceSpliterator = source; //sourceStage变量指向自己 this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE; //深度为0 this.depth = 0; //标识是不是并行流,默认为false this.parallel = parallel; } 复制代码
总结一下,通过StreamSupport#stream()传入Spliterator创建Stream,实际上返回的都是ReferencePipeline.Head对象,它代表源阶段的Stream,也是Pipeline链表的头结点。
前面分析了Spliterator和创建Stream的过程之后,还有一个疑问:Spliterator是在什么地方被使用到的呢?
先说结论:Spliterator是在Stream调用终止操作的时候触发它的方法调用。其实这很容易理解,因为Stream是惰性流,创建和中间操作的时候什么都不会做,只有终止操作时才调用声明的处理数据的lambda表达式。
看过上一章节的小伙伴应该还记得终止操作都会调用AbstractPipeline#evaluate()方法:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); } 复制代码
经过一系列调用,进入AbstractPipeline#copyInto()方法:
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); //非短路操作 if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { //调用spliterato#getExactSizeIfKnown()获取源元素精确大小,然后传递给sink链表的begin()方法 wrappedSink.begin(spliterator.getExactSizeIfKnown()); //调用spliterator#forEachRemaining()批量遍历源元素 spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } //短路操作 else { copyIntoWithCancel(wrappedSink, spliterator); } } 复制代码
终止操作是非短路操作的,在copyInto()中调用Spliterato#getExactSizeIfKnown()方法,会间接调用Spliterato#estimateSize(),然后调用Spliterato#forEachRemaining()批量遍历源元素。
终止操作是短路操作的,会再调用AbstractPipeline#copyIntoWithCancel()方法:
final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { @SuppressWarnings({"rawtypes","unchecked"}) AbstractPipeline p = AbstractPipeline.this; //取Pipeline链表头节点 while (p.depth > 0) { p = p.previousStage; } wrappedSink.begin(spliterator.getExactSizeIfKnown()); p.forEachWithCancel(spliterator, wrappedSink); wrappedSink.end(); } 复制代码
ReferencePipeline#forEachWithCancel()方法:
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) { do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)); } 复制代码
短路操作在do-while循环中调用tryAdvance()方法遍历Spliterator的单个源元素。
本文首先介绍了Spliterator的含义,详细讲解了每一个方法的作用,然后通过ArraySpliterator和ArrayListSpliterator分析如何实现一个Spliterator接口,再次回顾了代表源Stream的Head类,最后分析了Spliterator方法的调用时机。
Stream的数据处理逻辑都是通过lambda表达式定义的,它是一种声名式编程,与命令式编程不同。命令式编程比如传统的集合遍历迭代很好调试,而Stream很难调试,出了问题很多时候无从下手。通过阅读源码,了解原理,可以帮助我们更容易调试代码,定位问题。
本系列文章是以专栏的形式发布的,上下文多有关联,如果跳着阅读,可能会产生不连贯的感觉,所以建议按照顺序阅读。另外由于是源码分析,跟其他类型文章不同,所以建议在阅读的时候跟着思路亲自动手调试源码。
最后,原创不易,如果觉得本系列文章对您有帮助,能够加深您对Stream原理和源码的理解的话,请不要吝啬您手中的赞(✪ω✪)!
来源:https://juejin.cn/post/7101217470542774308