鸿 网 互 联 www.68idc.cn

当前位置 : 服务器租用 > 编程语言开发 > erlang > >

RxJava1.0 flatMap方法的源码分析

来源:互联网 作者:佚名 时间:2016-07-17 21:15
RxJava1.0 flatMap方法的源码分析 如果大家想看map的请转到RxJava map转换方法的源码分析 package com.yue.test;import java.awt.Cursor;import java.util.ArrayList;import java.util.List;import com.yue.bean.Course;import com.yue.bean.Student;import

RxJava1.0 flatMap方法的源码分析

  • 如果大家想看map的请转到RxJava map转换方法的源码分析
  • package com.yue.test;
    
    import java.awt.Cursor;
    import java.util.ArrayList;
    import java.util.List;
    
    import com.yue.bean.Course;
    import com.yue.bean.Student;
    
    import rx.Observable;
    import rx.Subscription;
    import rx.Observable.OnSubscribe;
    import rx.Observable.Operator;
    import rx.exceptions.Exceptions;
    import rx.exceptions.OnErrorFailedException;
    import rx.exceptions.OnErrorThrowable;
    import rx.functions.Func1;
    import rx.internal.operators.OnSubscribeLift;
    import rx.internal.operators.OperatorMerge;
    import rx.internal.operators.OnSubscribeMap.MapSubscriber;
    import rx.internal.operators.OperatorMerge.InnerSubscriber;
    import rx.internal.util.ScalarSynchronousObservable;
    import rx.internal.util.UtilityFunctions;
    import rx.plugins.RxJavaHooks;
    import rx.plugins.RxJavaPlugins;
    import rx.subscriptions.Subscriptions;
    import rx.Subscriber;
    
    /**
     * flatmap 一对多的转换
     * 
     * RxJava的其他方式的观察者模式的实现都和普通的一样,所以楼主用最普通的方式实现实例 ,而没有用just、from 或ActionX等方式
     * 
     * @ClassName: RxText3
     * @Description: TODO
     * @author shimingyue
     * @date 2016-7-14 上午10:25:57
     * 
     */
    public class RxText3 {
    
    	public static void main(String[] args) {
    		RxText3 rxText3 = new RxText3();
    		rxText3.method3();
    	}
    
    	/**
    	 * method1
    	 */
    	private void method1() {
    		final Integer[] student = { 110, 120, 119 };
    		/**
    		 * 泛型为String的观察者
    		 */
    		Subscriber<String> subscriber = new Subscriber<String>() {
    
    			@Override
    			public void onNext(String t) {
    				System.out.println(t);
    			}
    
    			@Override
    			public void onError(Throwable e) {
    			}
    
    			@Override
    			public void onCompleted() {
    			}
    		};
    
    		/**
    		 * 泛型为Integer的源被观察者
    		 */
    		Observable<Integer> observable = Observable
    				.create(new OnSubscribe<Integer>() {
    
    					@Override
    					public void call(Subscriber<? super Integer> subscriber) {
    						// 这里和from的效果是一样的 Observable.from()
    						subscriber.onNext(student[0]);
    						subscriber.onNext(student[1]);
    						subscriber.onNext(student[2]);
    						subscriber.onCompleted();
    					}
    				});
    		/**
    		 * 转换后的被观察者observable2 泛型为String ,我们要在这里进行类型
    		 */
    		Observable<String> observable2 = observable
    				.map(new Func1<Integer, String>() {
    
    					@Override
    					public String call(Integer t) {
    						System.out.println("funcx 源被传的值:" + t);
    						return "转换后:" + t;
    					}
    				});
    		observable2.subscribe(subscriber);
    	}
    
    	/**
    	 * 使用map和for循环 一对一 打印
    	 */
    	private void method2() {
    		final Student[] students = getParams();
    		/**
    		 * 定义一个观察者
    		 */
    		Subscriber<Student> subscriber = new Subscriber<Student>() {
    
    			@Override
    			public void onNext(Student student) {
    				// 接收被观察者的消息通知,被观察者会传过来student这个值
    				System.out.println("--------------------");
    				System.out.println(student.getName());
    				List<Course> courses = student.getCourses();
    				for (Course course : courses) {
    					System.out.println(course.getName());
    				}
    			}
    
    			@Override
    			public void onError(Throwable e) {
    			}
    
    			@Override
    			public void onCompleted() {
    			}
    		};
    
    		/**
    		 * 定义一个被观察者
    		 */
    		Observable<Student> observable = Observable
    				.create(new OnSubscribe<Student>() {
    
    					/**
    					 * 这个方法将在订阅时被调用,
    					 */
    					@Override
    					public void call(Subscriber<? super Student> subscriber) {
    						subscriber.onNext(students[0]);
    						subscriber.onNext(students[1]);
    						subscriber.onNext(students[2]);
    						subscriber.onCompleted();
    					}
    				});
    
    		// 观察者订阅被观察者
    		observable.subscribe(subscriber);
    	}
    
    	/**
    	 * 上面的方法(method1)是用for循环逐个打印每个学生对应的书籍的
    	 */
    
    	private void method3() {
    		final Student[] students = getParams();
    		/**
    		 * 创建一个观察者 泛型类型为Course
    		 */
    		Subscriber<Course> subscriber = new Subscriber<Course>() {
    
    			@Override
    			public void onNext(Course course) {
    				System.out.println(course.getName());
    			}
    
    			@Override
    			public void onError(Throwable e) {
    			}
    
    			@Override
    			public void onCompleted() {
    			}
    		};
    
    		/**
    		 * 创建一个观察者 泛型类型为Student
    		 */
    		Observable<Student> observable = Observable
    				.create(new OnSubscribe<Student>() {
    					//自己写的
    					@Override
    					public void call(Subscriber<? super Student> subscriber) {
    						subscriber.onNext(students[0]);
    						subscriber.onNext(students[1]);
    						subscriber.onNext(students[2]);
    						subscriber.onCompleted();
    					}
    				});
    
    		/**
    		 * 使用flatmap进行类型转换 将泛型为Student的被观察者转换为泛型为Course
    		 */
    		Observable<Course> observable2 = observable
    				.flatMap(new Func1<Student, Observable<? extends Course>>() {
    
    					@Override
    					public Observable<? extends Course> call(Student student) {
    						System.out.println("------------------");
    						System.out.println(student.getName());
    						return Observable.from(student.getCourses());
    					}
    				});
    		// 两个不同类型的观察者和被观察者定义好啦
    		observable2.subscribe(subscriber);
    	}
    
    	/**
    	 * flatMap源码分析
    	 * 
    	 * 首先打开flatMap 方法
    	 *  public final <R> Observable<R> flatMap(Func1<? super T, ?
    	 * extends Observable<? extends R>> func) { if (getClass() ==
    	 * ScalarSynchronousObservable.class) { return
    	 * ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); } return
    	 * merge(map(func)); }
    	 * 
    	 * 可以看到一个merge方法里有一个map方法的执行,map方法我们前面分析过,但是里面的FuncX有所不同,
    	 * 它的call方法里返回的是一个Observable被观察者对象,所以我们要重新审视一下map这个方法。
    	 * 
    	 * 原来正常的map 如果观察者订阅被观察者导致执行的话
    	 * 我们知道一起方法的执行都是起源于subscribe订阅方法的
    	 * 而subscribe最后又会执行被观察者的OnSubscribe里的call(Subscriber)方法,
    	 * call里又执行观察者的onNext方法进行通知,而在map转换里面,问题就出在OnSubscribe.call
    	 * 里啦。因为经过map转换后会产生一个新的被观察者,而这个新的被观察者会持有一个新的OnSubscribe
    	 * 监听实例OnSubscribeMap[继承于OnSubscribe],
    	 * 所以在我们的观察者订阅被观察者的时候,在被观察者的订阅方法subscribe里执行的OnSubscribe.call
    	 * 实际上是OnSubscribeMap的call方法,在这个call方法里会初始化一个观察者的实例MapSubscriber,
    	 * 它持有源被观察者和funcx,然后兜兜圈圈又神奇的执行到源Observable的OnSubscribe实例的call方法
    	 * ,然而此时这个源call方法里subscriber对象俨然变成了MapSubscriber,所以后续执行的是MapSubscriber
    	 * 的onNext方法,进行转换
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 创建
    	 * 
    	 * flatMap方法,上面的map是经过了执行的,不执行其实就是上面的新观察者的创建过程
    	 * 
    	 * 我们知道map返回了一个上述的被观察者,如果不使用flatMap的话会按照套路走,那使用flatmap后呢,
    	 * 我们看到它吧map生成的被观察者对象放入了一个叫merge的方法,进去看看
    	 * 
    	 *     @SuppressWarnings({"unchecked", "rawtypes"})
        public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
            if (source.getClass() == ScalarSynchronousObservable.class) {
                return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
            }
            /**
             * 含有使用源被观察者和FuncX实例的新被观察者source
             * OperatorMerge:Operator:经营者,操作者 Merge:合并
             * 他创建了一个合并的类,但这个类合并的是什么暂时不知道
             * 但可以到我们新的被观察者的lift里去看看
            return source.lift(OperatorMerge.<T>instance(false));
        }
    	 * 
    	 * 我们看到执行lift后又反悔了一个Observable,说明在经过map转换后又进行了一次调整
    	 * 
    	 * public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
        }
    	 * 在看看create方法 OnSubscribeLift,这是什么鬼,
    	 * 可以看出这也是一个OnSubscribe的子类,和OnSubscribeMap一样,看看声明时里面传的参数
    	 * onSubscribe:因为传入lift的是一个经过map转换的新被观察者对象[提示merge],所以这个参数是OnSubscribeMap监听实例
         * operator:暂时不知道什么作用的一个对象
         * 到这里生成了一个持有新被观察者OnSubscribeMap和一个operator参数的OnSubscribeLift监听实例
         * 
         * 继续向下,
         *     public static <T> Observable<T> create(OnSubscribe<T> f) {
            return new Observable<T>(RxJavaHooks.onCreate(f));
        }
    	 *
    	 * 果然,OnSubscribeLift被用来创建一个2级的新的被观察者,这样源被观察者被动了两次手术,但有一个手术刀使我们不知道怎么用的,就是operator
    	 * 
    	 * 这个参数: 没关系我们可以百度:
    	 * http://www.tuicool.com/articles/VrQRvur
    	 * Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber
    	 * 因为subscriber通常在主线程中执行,因此设计上要求其代码尽可能简单,只对事件进行响应,而修改事件的工作全部由operator执行。
    	 * Observable生产事件,Operator修改事件,最后由Subscriber去及时响应事件,现在我们知道这个参数是干啥的啦,就可以继续往下看啦
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 
    	 * 执行
    	 * 
    	 * 
    	 * 创建完啦,转换也完啦,就剩执行啦,拉莫还是回头看subscribe()订阅方法
    	 * 我们知道在里面执行了 OnSubscribe.call方法,那这个OnSubscribe是啥呢。
    	 * 没有错,他就是我们flatMap转换后返回的Observable实例,经过上面的分析,我们知道flatMap返回的实例是一个
    	 * 含有OnSubscribeLift监听实例的3手的被观察者,
    	 * 所以会执行OnSubscribeLift.call(),进去看看做了什么
    	 * 
    	 *     @Override
        public void call(Subscriber<? super R> o) {
            try {
            	创建一个新的观察者,还记得map的MapSubscriber吗,它也是个新的观察者,基于源观察者o的,在map里会兜兜圈圈执行这个新观察者
            	onNext,MapSubscriber持有FuncX对象和源观察者。
                Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
                try {
                    // 看这里
                    st.onStart();
                    parent.call(st);
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    st.onError(e);
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                o.onError(e);
            }
        }
    	 * 
    	 * 其实走到这里我们应该捋一捋
    	 * 1.我们首先使用map创建了一个新的被观察者,他持有OnSubscribeMap:他持有源被观察者和FuncX
    	 * 2.然后我们用又使用这个新的被观察者创建了一个3手的被观察者,而它持有一个OnSubscribeLift:它持有OnSubscribeMap和一个operator[事件操作者]
    	 * 4.然后我们调用3手被观察者的OnSubscribeLift.call()方法,到这里OnSubscribeLift又持有一个观察者对象
    	 * 
    	 * 好啦,到了关键时候啦,我们看到OnSubscribeLift只有几行重要的代码
    	 * 先看新观察者的创建过程,回顾一下前面的operator:在merge中  return source.lift(OperatorMerge.<T>instance(false));
    	 * instance 里的返回 T是源被观察者的泛型
    	 * static final OperatorMerge<Object> INSTANCE = new OperatorMerge<Object>(false, Integer.MAX_VALUE);
    	 * operator原来是这个样子的,都快忘啦。
    	 * 
    	 * 好,回顾完啦。我们看RxJavaHooks.onObservableLift(operator);这一句做了什么
    	 * 它传进去了一个operator,那是我们的operator,它用它做了什么呢,其实很前面一样,啥都没做,
    	 * 就是判断了一下,然后就将这个operator又原路返回啦,源码如下:
    	 * @SuppressWarnings({ "rawtypes", "unchecked" })
        public static <T, R> Operator<R, T> onObservableLift(Operator<R, T> operator) {
            Func1<Operator, Operator> f = onObservableLift;
            if (f != null) {
                return f.call(operator);
            }
            return operator;
        }
    	 *
    	 * 
    	 * onObservableLift = new Func1<Operator, Operator>() {
                @Override
                public Operator call(Operator t) {
                    return RxJavaPlugins.getInstance().getObservableExecutionHook().onLift(t);
                }
            };
            
            所以Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            这儿调用的call方法是我们前面OperatorMerge的call方法,进去看看
    	 * 
    	 * 
    	 * maxConcurrent:Integer.MAX_VALUE 65535
    	 * delayErrors:延迟错误,传过来的是false OperatorMerge.<T>instance(false)
    	 * child:源观察者
    	 *    @Override
        public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
            MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
            MergeProducer<T> producer = new MergeProducer<T>(subscriber);
            subscriber.producer = producer;
            
            //这两句楼主也没明白啥意思,有知道的请告知哦
            child.add(subscriber);
            child.setProducer(producer);
            
            return subscriber;
        }
    
    	 *
    	 *可以看出创建了一个MergeSubscriber新观察者,它持有上面三个参数,是不是和OnSubscribeMap里的内部类MapSubscriber差不多,
    	 *没错,他也是一个内部类
    	 *可以看到除了一个MergeSubscriber对象外,还有一个MergeProducer对象,他把MergeSubscriber传进了构造方法,到这里,我们的
    	 *源观察者放进了MergeSubscriber,而MergeSubscriber放进了MergeProducer,仅仅是放了进去,啥也没有做呢,再向下看
    	 *subscriber.producer = producer;将新创建的MergeProducer赋值。再往下看。又有add方法,与上面一样将我们的新观察者放进
    	 *了list里面,还不知道有什么卵用,那就看下一句,仅仅设置了一下setProducer,最后返回了这个新创建的观察者,
    	 *
    	 *ok,获得了观察者,在回头看OnSubscribeLift的call方法,它执行了新观察者的MergeSubscriber的onStart方法,其实MergeSubscriber
    	 *是没有onStart方法的,所以,执行的是父类的,然而并没有什么卵用,父类啥也没做
    	 *
    	 *
    	 *
    	 *
    	 *继续 parent.call(st);
    	 *parent是我们2手观察者的OnSubscribe,也就是OnSubscribeMap,终于到了关键的时候啦,
    	 *它将我们新创建的观察者放入了OnSubscribeMap的call方法内
    	 *回顾一下这个方法:
    	 * @Override
        public void call(final Subscriber<? super R> o) {
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            o.add(parent);
            这里source还是源被观察者
            source.unsafeSubscribe(parent);
        }
    	 *在这里,它使用MergeSubscriber又创建了一个MapSubscriber,transformer是使我们自己定义的FuncX,
    	 *他在前面merge(map(funcx))里已经传入,现在终于排上用场啦
    	 *add方法不用管,lz也不知道有何意义,直接看source.unsafeSubscribe(parent);,前面map我们分析过这个东西,
    	 *在执行一遍流程。
    	 *MapSubscriber 3手观察者:持有一个MergeSubscriber观察者,MergeSubscriber持有源观察者和其他两个参数maxConcurrent,delayErrors
    	 *
    	 *
    	 *    public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
            try {
                // new Subscriber so onStart it
                subscriber.onStart();
                // allow the hook to intercept and/or decorate
                RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
                return RxJavaHooks.onObservableReturn(subscriber);
            } catch (Throwable e) {
                // special handling for certain Throwable/Error/Exception types
                Exceptions.throwIfFatal(e);
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD 
                }
                return Subscriptions.unsubscribed();
            }
        }
    
    	 *看下面这一句
    	 *RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
    	 *我们在上一篇map的时候已经知道执行的是源被观察的onSubscribe的call方法,
    	 *subscriber就是上面持有我们MergeSubscriber观察者的MapSubscriber观察者,
    	 *既然执行我们源观察者的onSubscribe的call方法,那就看我们自己写的吧
    	 *
    		 * 创建一个观察者 泛型类型为Student
    		Observable<Student> observable = Observable
    				.create(new OnSubscribe<Student>() {
    					//自己写的这里面的subscriber是MapSubscriber包含MergeSubscriber的复合观察者,我们这里执行了三次
    					@Override
    					public void call(Subscriber<? super Student> subscriber) {
    						subscriber.onNext(students[0]);
    						subscriber.onNext(students[1]);
    						subscriber.onNext(students[2]);
    						subscriber.onCompleted();
    					}
    				});
    	 *转了一圈又一圈,终于有回到了原点,只不过比map又多了一个MergeSubscriber,他是OperatorMerge的内部类
    	 *下面我们看是怎么执行的
    	 *我们知道subscriber是MapSubscriber,所以首先看MapSubscriber的onNext方法
    	 *
    	 *@Override
            public void onNext(T t) {
                R result;
                
                try {
                mapper:我们的funcx
                    result = mapper.call(t);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    unsubscribe();
                    onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                    return;
                }
                actual:这里的就是我们调用OnSubscribeMap.call传过来的观察者,这里是MergeSubscriber
                actual.onNext(result);
            }
    	 * 还是和以前一样貌美如花, result = mapper.call(t);这里的t是源观察者调用onNext传的参数,
    	 * 我们使用funcx的call方法将这个参数转换(至于转换成什么,看你喽),这里得转换成一个被观察者对象
    	 * ,得到返回结果后执行传过来的观察者MergeSubscriber的
    	 * onNext方法,并传入我们自己生成的被观察者,这才是关键,so,看看MergeSubscriber里的onNext是怎么执行的,
    	 * 
    	 *  @Override
            public void onNext(Observable<? extends T> t) {
                if (t == null) {
                    return;
                }
                if (t == Observable.empty()) {
                    emitEmpty();
                } else
                if (t instanceof ScalarSynchronousObservable) {
                要是你声明的被观察者是ScalarSynchronousObservable就看着而吧
                    tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
                } else {
                //主要看这儿
                    InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
                    addInner(inner);
                    //这里的t就是我们funcx返回的被观察者Observable
                    t.unsafeSubscribe(inner);
                    emit();
                }
            }
         *
         *可以看出InnerSubscriber内部类,窝草,又一个观察者,fuck,
         *可以看出这个内部的观察者持有一个MergeSubscriber的实例,然后又有一个uniqueId:应该是一个记录的参数,先不管他
         *下面到了addInner(inner);先不管
         *再往下t.unsafeSubscribe(inner);
         *t.unsafeSubscribe(inner)这一句代码是不是有点熟悉,我们在分析map的时候,回顾一下吧
         *在OnSubscribeMap里
         *  @Override
        public void call(final Subscriber<? super R> o) {
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            o.add(parent);
            source.unsafeSubscribe(parent);
        }
         *好啦回顾完毕,看看这个鬼执行了神马,和map的一样,走进去看看,老样子
         *
         *   public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
            try {
                // new Subscriber so onStart it
                subscriber.onStart();
                // allow the hook to intercept and/or decorate
                RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
                return RxJavaHooks.onObservableReturn(subscriber);
            } catch (Throwable e) {
                // special handling for certain Throwable/Error/Exception types
                Exceptions.throwIfFatal(e);
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD 
                }
                return Subscriptions.unsubscribed();
            }
        }
         *
         *RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);主要看这句
         *我们是用funcx返回的Obsevable来调用的unsafeSubscribe方法,而RxJavaHooks.onObservableStart(this, onSubscribe)
         *这一段就是进行了一下安全判断在返回这个Obserable的onSubscribe,所以我们使用Funcx的call方法获得的Obserable的
         *onSubscribe执行了call方法,具体执行了什么,看你返回的Obserable喽,到此分析完毕
         *
         *
         *好啦 到这里该总结一下流程了
         *
         *1.源观察者执行 subscriber.onNext方法将参数传入
         *2.上面的subscriber其实已经被转义为MapSubscriber的对象了,他持有。。。。
         *3.在MapSubscriber的onNext方法里进行了转换并获得了一个Obserable
         *4.Obserable是我们的FuncX的call方法转换返回的call里面的值是我们源观察者的onNext的传入参数,不懂看1和2
         *5.得到这个结果后,onNext方法里执行了actual.onNext(result);这一句actual是一个MergeSubscriber的实例
         *6.所以actual.onNext(result);执行的是MergeSubscriber里的onNext方法,其实MergeSubscriber还会被包装一下
         *成为一个InnerSubscriber楼主不做介绍
         *7。t.unsafeSubscribe(inner);然后到了这里,
         *8.最后到了RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);这里执行了返回的被观察者的onSubscribe的call方法,
    	 */
    
    	/**
    	 * 设置参数
    	 */
    	private Student[] getParams() {
    		Course course1 = new Course();
    		course1.setName("红楼梦");
    		Course course2 = new Course();
    		course2.setName("水浒传");
    		Course course3 = new Course();
    		course3.setName("西游记");
    		Course course4 = new Course();
    		course4.setName("三国演义");
    		List<Course> courses = new ArrayList<Course>();
    		courses.add(course1);
    		courses.add(course2);
    		courses.add(course3);
    		courses.add(course4);
    
    		Student student1 = new Student();
    		student1.setName("小明");
    		student1.setCourses(courses);
    
    		Student student2 = new Student();
    		student2.setName("小牧");
    		student2.setCourses(courses);
    
    		Student student3 = new Student();
    		student3.setName("小段");
    		student3.setCourses(courses);
    
    		Student[] students = { student1, student2, student3 };
    		return students;
    	}
    
    }
    

总结:

  1. 我们首先创建了一个观察者和一个被观察者,他们两个的泛型类型是不同的
  2. 我们对被观察者使用flatMap方法进行了类型转换,最后得到一个新的被观察者实例
  3. 我们使用观察者对这个新被观察者进行了订阅,也就是执行了新被观察者的observable2.subscribe(subscriber)方法,可以看出,我们将观察者传了过去
  4. 我们都知道,执行订阅方法后,再订阅方法内部又会执行onSubscribe.call方法,这个onSubscribe是我们被观察者持有的一个监听实例
  5. 我们的被观察者是一个基于源被观察者的被flatMap方法进行包装过的新被观察者,而我们观察者订阅的是这个新被观察者
  6. so,执行的也是我们新被观察者的onSubscribe.call方法,好这就是神奇的开始
  7. 通过上面的源码,我们知道我们通过flatMap得到的其实是一个3手被观察这,因为flatMap内部又进行了一次map转换,map使用的funcX是我们自己定义的,
  8. flatMap内部使用merge方法对map产生的2手被观察者进行了转换,由此可见,我们第6部的onSubscribe.call()执行的是flatMap的3手观察者的onSubscribe回调实例
  9. 终于到了关键,3手观察者持有的监听实例为OnSubscribeLift,所以执行的是OnSubscribeLift的call方法
  10. 在OnSubscribeLift.call方法里,和map一样,果然在这个特殊的监听实例里又产生了一个新的观察者,大家以为这个观察者是基于源观察者的二手观察者吧
  11. 对,这个观察者是二手的,我们在第一步使用map对源被观察者进行转换会产生一个2手被观察者,这个二手被观察者持有一个新的监听实例OnSubscribeMap,
    而这个OnSubscribeMap又持有源被观察者和一个FuncX,我们使用这个二手被观察者再次转换,也就是lift方法,
    转换出来的新的被观察者持有一个OnSubscribeLift监听实例,OnSubscribeLift又持有一个2手观察者的OnSubscribe监听实例,也就是OnSubscribeMap这个东西,
    除此之外,还有一个operator实例(他是用来操控事件处理分发的)
    • http://www.tuicool.com/articles/VrQRvur
      	 * Observable -> Operator 1 -> Operator 2 -> Operator 3 -> Subscriber
      	 * 因为subscriber通常在主线程中执行,因此设计上要求其代码尽可能简单,只对事件进行响应,而修改事件的工作全部由operator执行。
      	 * Observable生产事件,Operator修改事件,最后由Subscriber去及时响应事件,现在我们知道这个参数是干啥的啦,就可以继续往下看啦



  12. so,当调用OnSubscribeLift.call内创建新观察者源代码:Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);o是我们传过去的源观察者
    这个call其实执行的是3手被观察者持有的operator实例的call方法,这个operator是OperatorMerge这个东西,他是Operator的子类,并且将我们的源观察者传了
    过去,
  13. ok,看OperatorMerge的call最后返回了什么,返回了一个MergeSubscriber实例,也是一个观察者,它持有我们的源观察者,delayErrors, maxConcurrent【integer.max65535】最大并发数
  14. ok,拿到了我们的新观察者,在OnSubscribeLift的call方法里执行了parent.call(st);
    parent:我们的二手被观察者的OnSubscribeMap实例,第一次转换时得到,不懂看11,
    st:我们新得到的二手观察者
  15. so,现在又到了OnSubscribeMap的call方法内,它现在除了持有我们的源被观察者和FuncX,还有一个新的二手观察者MergeSubscriber
  16. 我们知道OnSubscribeMap的call方法内会对传进来的观察者在进行一次包装产生一个MapSubscriber三手观察者,它持有一个我们的FuncX:第一次map是传入,和传入的观察者二手MergeSubscriber
  17. 紧接着,剧情又发生了一次转变,又跑到source.unsafeSubscribe(parent);这个方法里来了
    source:源被观察者,map转换时传入,
    parent:3手观察者MapSubscriber
  18. 好,去看看unsafeSubscribe方法,里面有这一句 RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
    RxJavaHooks.onObservableStart(this, onSubscribe)这一段会返回一个onSubscribe,它是源被观察者的,因为17的source,
    所以这里执行了源被观察者的onSubscribe实例的call方法并传入了我们转换了两次的三手观察者MapSubscriber
  19. ok,到这里就快结束了,我们可以看到自己的代码里被观察者的onSubscribe实例的call方法【也就是我们上面的源被观察者】
  20. 哇哦,他调用了观察者的onNext方法,而这个观察者就是我们18里的三手观察者MapSubscriber,并向其传入了源被观察者的泛型类型参数
  21. ok,可以发现传入的参数和我们自己写的观察者的泛型参数不同,没关系,有转换。result = mapper.call(t);方法 mapper:funcx t:源被观察者调用onNext是传入的参数
  22. 我们看源观察者的泛型参数到了MapSubscriber的onNext方法内,在这个onNext方法内又使用了我们自己的Funcx的call方法进行类型转换
    与map不同,map的方式是直接返回转换后的参数结果result,
    而flatMap的方式下,Funcx的call方法返回result的是基于源被观察者回调里调用观察者onNext的泛型参数的被观察者
  23. 拿到了转换结果:actual.onNext(result); actual:二手观察者MergeSubscriber,
  24. 看MergeSubscriber的onNext方法
    InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
    t.unsafeSubscribe(inner);
  25. 看,又出来一个观察者InnerSubscriber t:我们funcx的call方法返回的观察者。
    unsafeSubscribe方法内 RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
  26. 我们知道RxJavaHooks.onObservableStart(this, onSubscribe)其实就做了一下判断,不出大问题就会返回一个被观察者的onSubscrie,
    那个被观察者调用的unsafeSubscribe就是那个的onSubscrie,这里是25里面的t,
  27. 然后执行call方法,这里也就是我们funcx的call方法返回的被观察者的回调call方法,
  28. 但这里传入的是个inner,而不是我们的源观察者:没关系,他持有一个二手观察者MergeSubscriber【持有源观察者】,还有一个id(int型),应该是记录作用
  29. 继续,上面悲观擦者t会执行回调方法call,并用传入的inner调用onNext(参数是funcx返回的被观察者的泛型参数,目标参数),而inner里的onNext,
    又会调用二手观察者MergeSubscriber的parent.tryEmit(this, t);.
  30. queueScalar(subscriber, value);又到了这里subscriber:InnerSubscriber,value:funcx返回的被观察者的泛型参数
  31. q.onNext(nl.next(value));.又到了这里,终于看到我们的value值执行啦,到这里就完啦,是不是还有些疑惑,楼主解决不了了,反正inner间接含有我们的源观察者,
    会执行到我们源观察者的onNext方法并将值传过去。
  32. ok完成
  • 好,来个大总结吧,从订阅开始observable2.subscribe(subscriber);使用转换后的三手被观察者进行订阅,然后执行onSubscriber.call方法,
  • 第一次会将调用源被观察者的call回调方法,再在这个回调方法里调用观察者的onNext方法,这是第一步,生产事件
  • 第一步的onNext方法传的源被观察者的泛型参数并不适用于我们的源观察者,也就是说泛型参数不同,
  • 所以这个参数又到了我们源被观察者的Funcx的call方法里,并且call方法用这个参数制造了一个新的被观察者并返回,这是第二步,使用参数生成被观察者
  • 我们第二步的FuncX 的call方法返回的新的被观察者符合我们观察者的泛型,
  • 所以我们新的被观察者的call方法会调用我们目标观察者【我们自己定义的观察者】的onNext方法并将参数传入。
  • 可以看到我们源被观察者的call方法内执行了几次观察者的onNext方法 就转换参数执行几次,直到执行完毕。
  • 这也是flat平铺的含义 生成若干个被观察者(funcx.,call的返回),在交给flatMap生成的被观察者统一管理他们的事件,然后执行这些事件
  • 结束







网友评论
<