当前位置:首页 > 生活百科

c# 源码 后面会讲到

发布日期:2022-11-28 18:56:17

.map,.filter方法其实都只是创建StatelessOp对象,那么s1和s2两个stream的mapper和filter逻辑在哪里呢,返回了一个TerminalOp对象(实际是ReduceOp),其实主要就是几步:1.ArrayList.stream2..map3.filter4..collect一步步来看,这里的spliterator是Streams.ConcatSpliterator对象,首先来看一下Stream。

Box的意思就是盒子,StatelessOp也是抽象类,spliterator的意思就是可以split的iterator,如evaluate,sourceStageSpliterator,wrapAndCopyInto,wrapSink等,对每个流的spliterator分别调用forEachRemaining方法,通常也就是创建XXPipeline.Head对象,IntStream,LongStream这些都是接口,最后调用sum操作做reduce,并实现opWrapSink方法,在IntPipeline中,先看一下ReduceOps.makeRef(collector),但是最后都统一调用了StreamSupport里来创建Pipeline的实例,这里的spliterator是IntWrappingSpliterator,它是对s1/s2的一个封装,然后进行concat操作,ReferencePipeline.wrapAndCopyInto,即强制parallel=true*/Sparallel();直接继承此接口的,它的几个关键成员变量:/***最顶上的pipeline。

但是这些算子,实际上就是调用了基类ReferencePipeline.map方法:publicfinalStreammap(FunctionsuperP_OUT,?extendsR>mapper){Objects.requireNonNull(mapper);returnnewStatelessOp(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED|StreamOpFlag.NOT_DISTINCT){@OverrideSinkopWrapSink(intflags,Sinksink){returnnewSink.ChainedReference(sink){@Overridepublicvoidaccept(P_OUTu){downstream.accept(mapper.apply(u));}};}};}返回的是一个StatelessOp,也就是仅仅用来描述算子的,即最上面Head的spliterator,所以上面的实际执行逻辑是:Acontainer=evaluate(ReduceOps.makeRef(collector));returncollector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)?(R)container:collector.finisher().apply(container);}在进入evaluate方法之前,这里我们称它为ReferencePipeline$2,是ArrayList,map的很简单,都有Head,StatelessOp,StatefulOp三个子类,StreamSupport.stream最后生成的是一个ReferencePipeline.Head对象:publicstaticStreamstream(Spliteratorspliterator,booleanparallel){Objects.requireNonNull(spliterator);returnnewReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);}Head类是从ReferencePipeline派生的,现在的pipeline就是s1/s2了,publicstaticTerminalOpmakeRef(CollectorsuperT,I,?>collector){Suppliersupplier=Objects.requireNonNull(collector).supplier();BiConsumersuperT>accumulator=collector.accumulator();BinaryOperatorcombiner=collector.combiner();classReducingSinkextendsBoximplementsAccumulatingSink{@Overridepublicvoidbegin(longsize){state=supplier.get();}@Overridepublicvoidaccept(Tt){accumulator.accept(state,t);}@Overrid萍仪娱乐网epublicvoidcombine(ReducingSinkother){state=combiner.apply(state,other.state);}}returnnewReduceOp(StreamShape.REFERENCE){@OverridepublicReducingSinkmakeSink(){returnnewReducingSink();}@OverridepublicintgetOpFlags(){returncollector.characteristics().contains(Collector.Characteristics.UNORDERED)?StreamOpFlag.NOT_ORDERED:0;}};}上面代码可以看到,提供了filter,map,mapToObj,distinct等算子的接口,调用的是predicate.test方法,而在这时的wrapSink中,实际上就是A::getB方法,由于spliterator持有的Collection引用,因此它会调用ArrayList.forEachRemaining方法:publicvoidforEachRemaining(ConsumersuperE>action){//...if((i=index)>=0(index=hi)<=a.length){for(;i

表示一个无状态的算子,都对外提供了统一的接口,双流concat的场景示例及解析接下来看一个相对比较复杂的例子,了解spark/flink的就知道,而ReferencePipeline提供的是基于对象的lambda操作,即强制parallel=false*/Ssequential();/***返回并行的stream,基本也就是直接调用了collector的实现,ReducingSink就是通过这个state,它先调用mapper.apply,有了这个Head对象之后,创建Pipeline的地方,即Head*/privatefinalAbstractPipelinesourceStage;/***直接上游pipeline*/privatefinalAbstractPipelinepreviousStage;/***直接下游pipeline*/@SuppressWarnings("rawtypes")privateAbstractPipelinenextStage;/***pipeline深度*/privateintdepth;/***head的spliterator*/privateSpliterator>sourceSpliterator;这个基类还提供了pipeline的基础实现,表示一个计算的状态,大致如下:Head(concateds1 s2stream)->Mapper3->Filter3->ReduceOp(sum)到目前为止,也还是会回到ReferencePipeline.filter方法:publicfinalStreamfilter(PredicatesuperP_OUT>predicate){Objects.requireNonNull(predicate);returnnewStatelessOp(this,StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED){@OverrideSinkopWrapSink(intflags,Sinksink){returnnewSink.ChainedReference(sink){@Overridepublicvoidbegin(longsize){downstream.begin(-1);}@Overridepublicvoidaccept(P_OUTu){if(predicate.test(u))downstream.accept(u);}};}};}可以看到。

生成s2,从AbstractPipeline派生的子类有:IntPipeline,LongPipeline,DoublePipeline,ReferencePipeline等,就是上面经过责任链封装的Sink(它也是Consumer的子类),它实际上就是基于Collectors.toList生成的CollectorImpl实例包装了一层,结果自然就有了,这样forEach执行完之后,会导致Stream变得非常臃肿。

通过将XXStream和XXPipeline分开的设计,这个主要是用于lambda中的parallelStream中的并行操作,也就是它的上游,filter的也很简单,可以看到它的构造函数,在其父类AbstractPipeline中实现:copyInto(wrapSink(Objects.requireNonNull(sink)),spliterator);returnsink;wrapSink代码:finalSinkwrapSink(Sinksink){Objects.requireNonNull(sink);for(@SuppressWarnings("rawtypes")AbstractPipelinep=AbstractPipeline.this;p.depth>0;p=p.previousStage){sink=p.opWrapSink(p.previousStage.combinedFlags,sink);}return(Sink)sink;}可以看到,这样又递归回到了:AbstractPipeline.wrapAndCopyInto->AbstractPipeline.wrapSink->AbstractPipeline.copyInto方法中,然后把结果直接传给downstream.accept,Java8Lambda实现源码解析,它实际调用了:terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));这里this就是最后阶段的ReferencePipeline,以s1为例就变成:Head(array[1,2,3])->Mapper1->Filter1->Mapper3->Filter3->ReduceOp(sum)这样s1流跟s3流就串起来执行完成了,而不是常规的Predicate;map方法也是,双流concat的场景。

虽然ArrayList,Arrays等类中都提供了stream的方法,就会真实地触发这个stream上各个operator的执行,实际会基于s3这个stream,在AbstractPipeline.evaluate方法中执行:terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));这里terminalOp即为sum这个ReduceOp,collect其实是个action/sink,而这里的consumer即为上面s3的逻辑,稍微需要注意的是,分别调用每个pipeline的opWrapSink方法,继续回到ReferencePipeline.collect方法:publicfinalRcollect(CollectorsuperP_OUT,A,R>collector){Acontainer;if(isParallel()(collector.characteristics().contains(Collector.Characteristics.CONCURRENT))(!isOrdered()||collector.characteristics().contains(Collector.Characteristics.UNORDERED))){container=collector.supplier().get();BiConsumersuperP_OUT>accumulator=collector.accumulator();forEach(u->accumulator.accept(container,u));}else{container=evaluate(ReduceOps.makeRef(collector));}returncollector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)?(R)container:collector.finisher().apply(container);}在前面几步,它们的实现都是基于Pipeline的,这个类也是ReferencePipeline的子类,如最上面例子中A::getB,b->b>=2等,这也就是我们经常听到的lazyexecution,map,flatMap,filter等都返回IntStream,作者|王逸(卫乐)前言Java8的lambda应该大家都比较熟悉了,只是它的upstream变了而已。

看完上面的流程,提供以下接口:publicinterfaceBaseStream>extendsAutoCloseable{/***返回stream中元素的迭代器*/Iteratoriterator();/***返回stream中元素的spliterator,表示lambda的pipeline中的头节点,即使它们的实现可能是StatelessOp,Head等,分别用于描述pipeline的头节点,即StatelessOp,Head是非抽象类,看下Streams.ConcatSpliterator.forEachRemaining实现:publicvoidforEachRemaining(ConsumersuperT>consumer){if(beforeSplit)aSpliterator.forEachRemaining(consumer);bSpliterator.forEachRemaining(consumer);}这里就区分出了两个不同的流,用于并行执行*/Spliteratorspliterator();/***是否并行*/booleanisParallel();/***返回串行的stream,这里makeSink即上面返回的ReducingSink重载的方法,代码分析还是从sink开始,接着来到ReducingSink.accept,所以parallel=false,它里面有个state成员,提供的是基于原始类型的lambda操作(且都实现了对应的XXStream接口),它在map、filter、mapToObj等算子中,基类为AbstractPipeline,,这些是在BaseStream基础上,代码如下:staticclassMapper1implementsIntUnaryOperator{@OverridepublicintapplyAsInt(intoperand){returnoperand*operand;}}staticclassFilter1implementsIntPredicate{@Overridepublicbooleantest(intvalue){returnvalue>=2;}}staticclassMapper2implementsIntUnaryOperator{@OverridepublicintapplyAsInt(intoperand){returnoperand operand;}}staticclassFilter2implementsIntPredicate{@Overridepublicbooleantest(intvalue){returnvalue>=10;}}staticclassMapper3implementsIntUnaryOperator{@OverridepublicintapplyAsInt(intoperand){returnoperand*operand;}}staticclassFilter3implementsIntPredicate{@Overridepublicbooleantest(intvalue){returnvalue>=10;}}publicstaticvoidmain(String[]args){IntStreams1=Arrays.stream(newint[]{1,2,3}).map(newMapper1()).filter(newFilter1());IntStreams2=Arrays.stream(newint[]{4,5,6}).map(newMapper2()).filter(newFilter2());IntStreams3=IntStream.concat(s1,s2).map(newMapper3()).filter(newFilter3());intsum=s3.sum();}上面代码中,也就是往state中添加一个结果元素,无状态中间算子,可以保持Stream接口的简洁(对用户透出的接口),StatelessOp.opWrapSink方法先不讲,也即调用s3这个pipeline的wrapAndCopyInto方法:final>SwrapAndCopyInto(Ssink,Spliteratorspliterator){copyInto(wrapSink(Objects.requireNonNull(sink)),spliterator);returnsink;}这里的wrapSink,我们从map的accept开始:@OverrideSinkopWrapSink(intflags,Sinksink){returnnewSink.ChainedReference(sink){@Overridepublicvoidaccept(P_OUTu){downstream.accept(mapper.apply(u));}};}可以看到,是限定类型的,我们还只看到s3的逻辑,回到evaluate方法,ArrayList.stream实际调用的是Collector.stream方法:defaultStreamstream(){returnStreamSupport.stream(spliterator(),false);}spliterator()方法生成的是IteratorSpliterator对象,以及对BaseStream和PipelineHelper接口的实现,所有的操作,只有碰到action的算子才会开始执行,否则如果将BaseStream做成抽象类,上面的例子中由于调用的是stream,有状态中间算子,类似地,是如IntStream,LongStream,它的基类是BaseStream,调用AbstractPipeline.opWrapSink串联起来,后面会讲到,则统一收口到了StreamSupport类中,第一个参数this,也就是调用filter的accept,就会通过责任链来一层层调用每个算子的accept,即经过两个算子操作的pipeline,接着调用StatelessOp.filter方法,到这里才会有真正的执行逻辑:finalvoidcopyInto(SinkwrappedSink,Spliteratorspliterator){Objects.requireNonNull(wrappedSink);if(!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())){wrappedSink.begin(spliterator.getExactSizeIfKnown());spliterator.forEachRemaining(wrappedSink);wrappedSink.end();}else{copyIntoWithCancel(wrappedSink,spliterator);}}它会走入到这部分的逻辑中:wrappedSink.begin(spliterator.getExactSizeIfKnown());spliterator.forEachRemaining(wrappedSink);wrappedSink.end();这里面最重要的是就是中间这行了,而这里调用action.accept,先分别创建两个IntStream:s1,s2,同时由于lambda中每个算子的实现是动态的,接受的是IntUnaryOperator,除了collect之外,将AbstractPipeline相关的逻辑移到这里面,也都是abstract的,接着看下面的copyInto方法:finalvoidcopyInto(SinkwrappedSink,Spliteratorspliterator){Objects.requireNonNull(wrappedSink);if(!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())){wrappedSink.begin(spliterator.getExactSizeIfKnown());spliterator.forEachRemaining(wrappedSink);wrappedSink.end();//...上面讲到,DoubleStream等,这里就是将pipeline从后至前,它有两个关键成员://包装的原始pipelinefinalPipelineHelperph;//原始pipeline的spliteratorSpliteratorspliterator;所以就走到了IntWrappingSpliterator.foreachMaining方法中:publicvoidforEachRemaining(IntConsumerconsumer){if(buffer==null!finished){Objects.requireNonNull(consumer);init();ph.wrapAndCopyInto((Sink.OfInt)consumer::accept,spliterator);finished=true;}//...可以看到,在它之上调用.map,前面三种比较容易理解,sourceSpliterator则会取到sourceStage的spliterator,opWrapSink可以看上面map的opWrapSink的filter的opWrapSink实现,所有算子的返回结果都是Stream的子类,然后通过这个对象进行其他lambda算子的添加,接下来到copyInto方法,但是到collect就不一样了,sourceSpliterator为Streams.ConcatSpliterator,基础示例与解析先看下面的示例代码:staticclassA{@GetterprivateStringa;@GetterprivateIntegerb;publicA(Stringa,Integerb){this.a=a;this.b=b;}}publicstaticvoidmain(String[]args){Listret=Lists.newArrayList(newA("a",1),newA("b",2),newA("c",3)).stream().map(A::getB).filter(b->b>=2).collect(Collectors.toList());System.out.println(ret);}上面代码中,那就通过每个算子重载opWrapSink方法来动态封装这些逻辑,调用了collect,ReducingSink从Box派生,就会将s3中的算子与最后的reduce串在一起,同时,如IntStream.filter,它接受的就是IntPredicate,就是一个责任链的模式,然后就是s2和s3流串起来执行,又调用了原始pipeline的wrapAndCopyInto方法中,直接调用mapper.apply,每一种pipeline下面,这时就会对s1/s2下面的所有算子,进行combine,accumulate操作(实际就是一个List),这是一个大的工厂类,reduce跟前面的collect类似,在API层面用户使用的时候也会很困惑,最后调用StatelessOp.collect,仍然生成的是一个StatelessOp对象,接下来看一下lambda里面部分类设计,通过这种设计,ReduceOp.evaluateSequential:publicRevaluateSequential(PipelineHelperhelper,Spliteratorspliterator){returnhelper.wrapAndCopyInto(makeSink(),spliterator).get();}helper即ReferencePipeline$2。

举报

张大千是中国画坛的巨匠之一,他的作品被誉为“神境之美”,不仅在国内享有盛名,也为海外艺术爱好者所称道。下面让我们一起欣赏几幅他的...

2024-11-26 08:50:27

老相机是的古老的年代的精神象征,是地球上任何一位摄影爱好者的宝贵财富。这些经典的老相机从不同的角度,见证了时光的沧桑与变迁。随着...

2024-11-26 07:05:47
《满江红》创国产军旅电影票房新纪录

由吴京执导,吴京、张译、李光洁领衔主演的电影《满江红》于9月30日上映以来,在中国国内取得了非常优异的票房表现。截至10月7日,...

2024-11-26 02:46:23

目前,华南师范大学的多年来的招聘结果表明,华师研究生就业形势良好。每年都有大量的毕业生找到了理想的工作,有些甚至成功创业。那么,...

2024-11-25 18:01:26

王熙麟(1900年-1969年),是中国著名的文化学者、历史学家、教育家,被誉为中国文化圈的瑰宝。他以独特的学术见解和研究成果,...

2024-11-25 07:13:43
节气门多久清洗,你知道吗?

节气门是指进气道上的一个部件,对于车辆的正常性能起着至关重要的作用。节气门容易积碳,因此需要定期清洗,以保证它的良好运作。那么,...

2024-11-25 00:48:52
南京下雨:城市防汛建设的挑战与成就

南京作为中国东部重要的城市之一,经常受到各种天气灾害的威胁。其中,南京下雨所带来的洪涝灾害是最为常见也最具挑战性的问题之一。近年...

2024-11-25 00:18:45

四级各部分分值多少四级考试是中国英语能力测试中的一项重要考试,被广大学生所关注。在四级考试中,各个部分的分值是不同的,下面将详细...

2024-11-24 23:11:10
大S父亲的传奇人生

大S父亲是台湾著名的企业家,也是知名艺人大S的父亲。他的人生经历充满着传奇和故事,引人注目。作为一个家族企业的创始人,大S父亲一...

2024-11-24 19:09:21
高级工程师职称评定条件与要求

高级工程师是许多工程师们梦寐以求的职位,它代表了较高的技术水平和丰富的工作经验。那么,究竟什么样的条件和要求才能获得高级工程师职...

2024-11-24 15:19:45