Java 8 Lambda Stream: a source-code walkthrough
Author | 王逸 (卫乐)

Preface

Most of us use Java 8 lambdas and streams every day, but how does a stream pipeline actually execute? Let's start from a basic example and walk through the JDK source.

Basic example and analysis

First look at the following code:

```java
static class A {
    @Getter
    private String a;
    @Getter
    private Integer b;

    public A(String a, Integer b) {
        this.a = a;
        this.b = b;
    }
}

public static void main(String[] args) {
    List<Integer> ret = Lists.newArrayList(new A("a", 1), new A("b", 2), new A("c", 3))
            .stream()
            .map(A::getB)
            .filter(b -> b >= 2)
            .collect(Collectors.toList());
    System.out.println(ret);
}
```

The whole call chain boils down to four steps:

1. ArrayList.stream
2. .map
3. .filter
4. .collect

Let's go through them one by one.
Step 1: ArrayList.stream

Although ArrayList, Arrays and other classes all expose stream() methods, stream creation is funneled through a single place, StreamSupport. Collection.stream is just a default method:

```java
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}
```

spliterator() here produces an IteratorSpliterator. A spliterator is literally a splittable iterator, used to partition the source for parallel execution; because we called stream() rather than parallelStream(), parallel is false.

StreamSupport.stream then builds a ReferencePipeline.Head (for other stream kinds it is the corresponding XXPipeline.Head):

```java
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}
```

Head extends ReferencePipeline and represents the head node of the lambda pipeline. The spliterator it holds keeps a reference back to the source collection, and every later operator is appended onto this Head object.
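As a quick illustration of the funneling described above, StreamSupport.stream can be called directly with any spliterator; Collection.stream() is just sugar over it. A minimal sketch (the class name StreamSupportDemo is mine):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamSupportDemo {
    static List<String> run() {
        List<String> list = Arrays.asList("a", "b", "c");
        // Same call Collection.stream() makes: the spliterator carries the source,
        // and parallel=false marks the resulting Head as sequential
        Stream<String> s = StreamSupport.stream(list.spliterator(), false);
        return s.map(String::toUpperCase).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints [A, B, C]
    }
}
```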
Step 2: .map

map simply delegates to the base-class method ReferencePipeline.map:

```java
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}
```

It returns a StatelessOp, which merely describes the operator; nothing is computed here. The first constructor argument, this, is the upstream stage, i.e. the Head. The actual logic lives in opWrapSink: when the pipeline is eventually driven, accept first calls mapper.apply (in our example the mapper is the A::getB method reference) and hands the result straight to downstream.accept.
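The sink-wrapping trick above can be boiled down to a tiny hand-rolled model. This is a sketch, not JDK code: MiniSink, mapSink and filterSink are my own stand-ins for Sink, the map StatelessOp and the filter StatelessOp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class MiniPipeline {
    // Stripped-down stand-in for java.util.stream.Sink (hypothetical, for illustration)
    interface MiniSink<T> {
        void accept(T t);
    }

    // Mirrors the map StatelessOp: apply the mapper, forward to downstream
    static <T, R> MiniSink<T> mapSink(Function<T, R> mapper, MiniSink<R> downstream) {
        return t -> downstream.accept(mapper.apply(t));
    }

    // Mirrors the filter StatelessOp: forward only elements that pass the predicate
    static <T> MiniSink<T> filterSink(Predicate<T> predicate, MiniSink<T> downstream) {
        return t -> { if (predicate.test(t)) downstream.accept(t); };
    }

    static List<Integer> run() {
        List<Integer> out = new ArrayList<>();
        MiniSink<Integer> terminal = out::add;             // plays the ReducingSink role
        // wrapSink walks back-to-front: filter wraps the terminal, map wraps the filter
        MiniSink<Integer> chain = mapSink((Integer x) -> x * x,
                                  filterSink((Integer x) -> x >= 4, terminal));
        for (int i : new int[]{1, 2, 3}) chain.accept(i);  // plays spliterator.forEachRemaining
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints [4, 9]
    }
}
```

Each element is pushed through the whole chain before the next one starts, which is exactly how the real sink chain behaves.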
Step 3: .filter

filter likewise comes back to ReferencePipeline.filter:

```java
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }
                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}
```

Again a StatelessOp is created, with the map stage as its upstream. Its accept calls predicate.test (in the example, b -> b >= 2) and only forwards elements that pass to downstream.accept. After these three steps we have a pipeline of three linked stages, Head -> map -> filter, and still nothing has executed.
Step 4: .collect

map and filter only describe computation; collect is an action (a sink) and is what actually triggers execution of every operator on the stream. This is the "lazy execution" you often hear about. Look at ReferencePipeline.collect:

```java
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
            && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
           ? (R) container
           : collector.finisher().apply(container);
}
```

The first branch is for parallel streams; ours is sequential, so we go through evaluate(ReduceOps.makeRef(collector)). Before entering evaluate, look at ReduceOps.makeRef, which returns a TerminalOp (concretely a ReduceOp):

```java
public static <T, I> TerminalOp<T, I> makeRef(Collector<? super T, I, ?> collector) {
    Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
    BiConsumer<I, ? super T> accumulator = collector.accumulator();
    BinaryOperator<I> combiner = collector.combiner();
    class ReducingSink extends Box<I>
            implements AccumulatingSink<T, I, ReducingSink> {
        @Override
        public void begin(long size) {
            state = supplier.get();
        }

        @Override
        public void accept(T t) {
            accumulator.accept(state, t);
        }

        @Override
        public void combine(ReducingSink other) {
            state = combiner.apply(state, other.state);
        }
    }
    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                   ? StreamOpFlag.NOT_ORDERED
                   : 0;
        }
    };
}
```

Box means exactly what it says, a box: ReducingSink inherits a state member from it that represents the state of the computation, and begin/accept/combine basically delegate to the collector's supplier, accumulator and combiner. Our collector is the CollectorImpl produced by Collectors.toList, so state is simply a List and accept appends one result element to it.

evaluate then calls:

```java
terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
```

Here this is the last stage of the pipeline (the filter stage; we will call its anonymous class ReferencePipeline$2), and sourceSpliterator fetches the Head's spliterator. ReduceOp.evaluateSequential:

```java
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
```

makeSink is the overridden method returning the ReducingSink above. wrapAndCopyInto is implemented in the parent class AbstractPipeline:

```java
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}
```

wrapSink:

```java
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);
    for (@SuppressWarnings("rawtypes") AbstractPipeline p = AbstractPipeline.this;
            p.depth > 0; p = p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
```

This walks the pipeline from back to front, calling each stage's opWrapSink (the map and filter implementations shown earlier) to wrap the ReducingSink layer by layer: a classic chain-of-responsibility pattern. copyInto is where real execution finally happens:

```java
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}
```

The key line is the middle one, spliterator.forEachRemaining(wrappedSink). The wrapped sink is also a Consumer, and since the spliterator holds the reference to the source Collection, this ends up in ArrayList's spliterator:

```java
public void forEachRemaining(Consumer<? super E> action) {
    // ...
    if ((i = index) >= 0 && (index = hi) <= a.length) {
        for (; i < hi; ++i) {
            @SuppressWarnings("unchecked") E e = (E) a[i];
            action.accept(e);
        }
        // ...
    }
}
```

Each action.accept call pushes one element through the chained sinks layer by layer: the map sink applies A::getB and forwards to the filter sink; the filter sink tests b >= 2 and forwards survivors to ReducingSink.accept, which adds the element to state. When forEachRemaining completes, the result is simply sitting there, and collect returns it.
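The lazy-execution point is easy to verify empirically: intermediate operators only build StatelessOp nodes, and nothing runs until the terminal op. A small sketch (the mapCalls counter and class name are mine, not part of the JDK):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyDemo {
    static final AtomicInteger mapCalls = new AtomicInteger();

    static List<Integer> run() {
        Stream<Integer> s = Arrays.asList(1, 2, 3).stream()
                .map(x -> { mapCalls.incrementAndGet(); return x * 10; });
        // No terminal op yet: the mapper has not run a single time
        if (mapCalls.get() != 0) throw new AssertionError("map ran eagerly");
        // collect is the action: only now is the sink chain built and driven
        return s.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run() + " mapCalls=" + mapCalls.get());
    }
}
```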
A look at the class design

Having walked through the flow, let's look at the main classes behind lambda streams. The base interface is BaseStream:

```java
public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {
    /** Returns an iterator over the elements of the stream */
    Iterator<T> iterator();

    /** Returns a spliterator over the elements, used for parallel execution */
    Spliterator<T> spliterator();

    /** Whether this stream is parallel */
    boolean isParallel();

    /** Returns a sequential stream, i.e. forces parallel = false */
    S sequential();

    /** Returns a parallel stream, i.e. forces parallel = true */
    S parallel();
    // ...
}
```

Directly extending it are the interfaces Stream, IntStream, LongStream and DoubleStream, which on top of BaseStream add the operator methods: filter, map, mapToObj, distinct and so on. These interfaces are type-restricted: IntStream.filter takes an IntPredicate rather than a regular Predicate, its map takes an IntUnaryOperator, and map, flatMap and filter all return IntStream again.

The implementations are all pipeline-based. The base class is AbstractPipeline, with subclasses IntPipeline, LongPipeline, DoublePipeline and ReferencePipeline: the first three provide primitive-typed lambda operations (each also implementing the matching XXStream interface), while ReferencePipeline provides the object-based ones. Under each pipeline class there are three subclasses, Head, StatelessOp and StatefulOp, describing respectively the head node, stateless intermediate operators and stateful intermediate operators; Head is concrete while StatelessOp and StatefulOp are abstract.

AbstractPipeline's key members:

```java
/** The top of the pipeline, i.e. the Head */
private final AbstractPipeline sourceStage;

/** The immediate upstream pipeline stage */
private final AbstractPipeline previousStage;

/** The immediate downstream pipeline stage */
@SuppressWarnings("rawtypes")
private AbstractPipeline nextStage;

/** Depth of this stage in the pipeline */
private int depth;

/** The Head's spliterator */
private Spliterator<?> sourceSpliterator;
```

Besides these, AbstractPipeline provides the core pipeline machinery, such as evaluate, sourceStageSpliterator, wrapAndCopyInto and wrapSink, along with the implementations of the BaseStream and PipelineHelper interfaces.

Why split the XXStream interfaces from the XXPipeline implementations? Because every operator's behavior in a lambda pipeline is supplied dynamically, each operator wraps its logic by overriding opWrapSink; keeping that machinery in the pipeline classes keeps the user-facing Stream interfaces lean. If BaseStream were instead an abstract class with AbstractPipeline's logic folded into it, Stream would become very bloated, and the API would be confusing for users. Note that at the API level every operator's return type is a Stream subtype, even though the runtime objects are StatelessOp, Head and friends.
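Since spliterators drive everything above, here is what one looks like when driven by hand, using only public JDK API (the class name is mine). tryAdvance consumes one element, trySplit carves off a chunk for another thread in the parallel case, and forEachRemaining, which copyInto relies on, drains whatever is left:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SpliteratorDemo {
    static List<Integer> run() {
        List<Integer> src = Arrays.asList(1, 2, 3, 4);
        List<Integer> seen = new ArrayList<>();
        Spliterator<Integer> sp = src.spliterator();
        // Consume a single element, like a one-step iterator
        sp.tryAdvance(seen::add);
        // Split off a prefix of the remaining elements
        Spliterator<Integer> prefix = sp.trySplit();
        if (prefix != null) prefix.forEachRemaining(seen::add);
        // Drain whatever is left
        sp.forEachRemaining(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints [1, 2, 3, 4]
    }
}
```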
The two-stream concat scenario

Now for a more complex example. Anyone familiar with spark/flink will recognize the shape: two streams are built, concatenated, transformed again, and finally reduced.

```java
static class Mapper1 implements IntUnaryOperator {
    @Override
    public int applyAsInt(int operand) {
        return operand * operand;
    }
}
static class Filter1 implements IntPredicate {
    @Override
    public boolean test(int value) {
        return value >= 2;
    }
}
static class Mapper2 implements IntUnaryOperator {
    @Override
    public int applyAsInt(int operand) {
        return operand * operand;
    }
}
static class Filter2 implements IntPredicate {
    @Override
    public boolean test(int value) {
        return value >= 10;
    }
}
static class Mapper3 implements IntUnaryOperator {
    @Override
    public int applyAsInt(int operand) {
        return operand * operand;
    }
}
static class Filter3 implements IntPredicate {
    @Override
    public boolean test(int value) {
        return value >= 10;
    }
}

public static void main(String[] args) {
    IntStream s1 = Arrays.stream(new int[]{1, 2, 3}).map(new Mapper1()).filter(new Filter1());
    IntStream s2 = Arrays.stream(new int[]{4, 5, 6}).map(new Mapper2()).filter(new Filter2());
    IntStream s3 = IntStream.concat(s1, s2).map(new Mapper3()).filter(new Filter3());
    int sum = s3.sum();
}
```

We first create two IntStreams, s1 and s2, each a pipeline of two operators; we then concat them and apply another map and filter to produce s3; finally we call sum, which performs the reduce.
The construction of s1 and s2 is exactly as in the basic example: each is Head(array) -> Mapper -> Filter. IntStream.concat again goes through StreamSupport, but here the new Head's spliterator is a Streams.ConcatSpliterator wrapping s1's and s2's spliterators. The map and filter on top of it then build s3's pipeline:

Head(concatenated s1+s2 stream) -> Mapper3 -> Filter3 -> ReduceOp(sum)

sum is a reduce, analogous to the collect above. So far we have only described s3's own operators; execution again starts in AbstractPipeline.evaluate:

```java
terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
```

terminalOp is the sum ReduceOp, and sourceSpliterator resolves to the sourceStage's spliterator, i.e. the ConcatSpliterator. evaluateSequential calls s3's wrapAndCopyInto, which wraps Mapper3 and Filter3 around the reducing sink and hands the whole chain, as a consumer, to the spliterator. Look at Streams.ConcatSpliterator.forEachRemaining:

```java
public void forEachRemaining(Consumer<? super T> consumer) {
    if (beforeSplit)
        aSpliterator.forEachRemaining(consumer);
    bSpliterator.forEachRemaining(consumer);
}
```

Here the two underlying streams are separated: forEachRemaining is called on each stream's spliterator in turn, with the consumer being s3's wrapped sink chain. Each of aSpliterator and bSpliterator is an IntWrappingSpliterator, a wrapper around the s1/s2 pipeline with two key members:

```java
// the wrapped original pipeline
final PipelineHelper<Integer> ph;
// the original pipeline's spliterator
Spliterator<P_IN> spliterator;
```

So we land in IntWrappingSpliterator.forEachRemaining:

```java
public void forEachRemaining(IntConsumer consumer) {
    if (buffer == null && !finished) {
        Objects.requireNonNull(consumer);
        init();
        ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
        finished = true;
    }
    // ...
}
```

It calls the original pipeline's wrapAndCopyInto, recursing right back into AbstractPipeline.wrapAndCopyInto -> AbstractPipeline.wrapSink -> AbstractPipeline.copyInto, only now the current pipeline is s1 (or s2) and the downstream sink is s3's already-wrapped chain. For s1 the effective pipeline becomes:

Head(array[1,2,3]) -> Mapper1 -> Filter1 -> Mapper3 -> Filter3 -> ReduceOp(sum)

so the s1 stream and the s3 stream are stitched together and run end to end; it is the same wrapAndCopyInto as before, just with a different upstream. Then s2 and s3 are stitched and run the same way:

Head(array[4,5,6]) -> Mapper2 -> Filter2 -> Mapper3 -> Filter3 -> ReduceOp(sum)

After both spliterators are drained, the reducing sink's state holds the final sum.
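To convince ourselves that the flattened chains really compute the same thing, we can compare the stream result against straight loops over the same operators. This sketch inlines the example's operators as lambdas (Mapper2 is taken to square its input, like the others, as reconstructed above; class and method names are mine):

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class ConcatDemo {
    static int viaStreams() {
        IntStream s1 = Arrays.stream(new int[]{1, 2, 3}).map(x -> x * x).filter(x -> x >= 2);
        IntStream s2 = Arrays.stream(new int[]{4, 5, 6}).map(x -> x * x).filter(x -> x >= 10);
        return IntStream.concat(s1, s2).map(x -> x * x).filter(x -> x >= 10).sum();
    }

    // The flattened pipelines the analysis derives, written as plain loops
    static int viaLoops() {
        int sum = 0;
        // Head(array[1,2,3]) -> Mapper1 -> Filter1 -> Mapper3 -> Filter3
        for (int v : new int[]{1, 2, 3}) {
            int m1 = v * v;
            if (m1 < 2) continue;
            int m3 = m1 * m1;
            if (m3 >= 10) sum += m3;
        }
        // Head(array[4,5,6]) -> Mapper2 -> Filter2 -> Mapper3 -> Filter3
        for (int v : new int[]{4, 5, 6}) {
            int m2 = v * v;
            if (m2 < 10) continue;
            int m3 = m2 * m2;
            if (m3 >= 10) sum += m3;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(viaStreams() + " == " + viaLoops());  // 2274 == 2274
    }
}
```

Both paths visit elements in the same order, one element at a time, which is exactly what the recursive wrapAndCopyInto call guarantees.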