在RxJava中有非常多的Operator方法,比如map,flatMap,lift等。在这些Operatior中比较重要的是lift方法,因为很多操作都是基于lift来实现的,扩展lift还可以实现自定义Operator. 这篇内容主要是介绍lift,map,flatMap的实现(关于这些操作的作用请查阅官网,这里不做过多介绍)。阅读之前请先看本系列文章的第一篇

目录

lift

先看个例子:

Operator<Long, Long> myOperator = new Operator<Long, Long>() {
    @Override
    public Subscriber<? super Long> call(Subscriber<? super Long> subscriber) {
        return new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                if (subscriber.isUnsubscribed()) {
                    return;
                }

                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onError(e);
            }

            @Override
            public void onNext(Long aLong) {
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onNext(aLong + 1);
            }
        };
    }
};

Observable.just(System.currentTimeMillis())
    .lift(myOperator)
    .subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println(aLong);
        }
    });
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

然后是lift的实现

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

unsafeCreate的实现

public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

所以新建的Observable的onSubscribe变量保存的是OnSubscribeLift。注意unsafeCreate(new OnSubscribeLift(onSubscribe, operator))这里会将onSubscribe作为参数传入,这个onSubscribe和unsafeCreate创建出来的Obervable的onSubscribe并不一样.作为参数的onSubscribe这个是上一个Obervable(即源Obervable)的函数。 接下来我们看看OnSubscribeLift的细节

OnSubscribeLift

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator){
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}

OnSubscribeLift实现了OnSubscribe接口。所以它会被订阅者调用其call方法. 在call里面他会调用operator(即我们自定义的myOperator)的call并返回Subscriber st. 然后又会通过源Obervable调用该st的onNext方法。 结合demo我总结下流程:

  1. 调用lift方法时会将just时创建的Obervable保存在OnSubscribeLift的parent变量中.而lift传入的Operator会保存在operator变量.最后返回一个新的Obervable
  2. 调用subscribe()方法时,会调用到lift创建的Obervable的变量obSubscribe(即OnSubscribeLift)的call方法。
  3. OnSubscribeLift的call返回会先调用自定义的myOperator的call方法,并将返回的Subscriber传入源Obervable(即parent变量)。
  4. 源Obervable会通过其onSubscriber(即JustOnSubscribe)最终调用到myOperator的匿名内部类Subscriber的onNext中(这一步骤的细节请结合上一篇文章看)

整个流程跟上一篇的内容相比,就是在订阅数据时,在源Obervable传送数据的过程中新增了一个自定义的myOperator操作,从而实现对数据的调整.调整后的数据最终被订阅者消费.

map

Observable.just(System.currentTimeMillis())
    .map(new Func1<Long, Long>() {
        @Override
        public Long call(Long aLong) {
            return aLong + 1;
        }
    })
    .subscribe(myAction);
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}

这里跟lift基本一样.唯一的区别就是unsafeCreate创建的Obervable的局部变量onSubcribe保存的函数不一样。所以这里我们只需要了解函数OnSubscribeMap的细节就弄明白map的实现了.

OnSubscribeMap

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;

    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {
        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }
        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

结合lift的思路来看,map会在订阅数据之后通过其内部类MapSubscriber来实现对数据的修改.最后通过source.unsafeSubscribe(parent)将数据传送到订阅者.

大致的流程和lift基本一致.

flatMap

flatMap跟map从参数来说有很大区别.如下:

Observable.just(System.currentTimeMillis())
    .flatMap(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long aLong) {
            return Observable.just(aLong + 1);
        }
    })
    .subscribe(myAction);

其call方法的返回是一个Obervable.

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

这里会根据源Obervable的类型来进行区分.这里我们不看更为复杂的merge实现(主要是结合我们的demo来看源码).

scalarFlatMap:

public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
        return unsafeCreate(new OnSubscribe<R>() {
            @Override
            public void call(final Subscriber<? super R> child) {
                Observable<? extends R> o = func.call(t);
                if (o instanceof ScalarSynchronousObservable) {
                    child.setProducer(createProducer(child, ((ScalarSynchronousObservable<? extends R>)o).t));
                } else {
                    o.unsafeSubscribe(Subscribers.wrap(child));
                }
            }
        });
    }

如上可以看到这里unsafeCreate创建的Obervable的onSubscribe函数是一个匿名类OnSubscribe(). 在其Obervable被订阅时会通过child.setProducer(createProducer(child, ((ScalarSynchronousObservable<? extends R>)o).t))最终调用到flathMap的匿名函数的call方法.从而实现数据的修改.并最终将修改的数据传到订阅者.

小结

RxJava的操作(Operator)符的实现都遵循了开闭原则(对扩展开放,对修改关闭). 通过Obervable的onSubscibe以及OnSubscribe接口来实现数据的传递与接收,并在传递的过程中插入一个Operator来实现对源数据的修改.

虽然每个操作符都会对数据修改,但是从实现来看,并没有对原始数据进行修改.这也符合函数式编程的数据不可变原则