详读RxJava(Operator的实现)
在RxJava中有非常多的Operator方法,比如map,flatMap,lift等。在这些Operatior中比较重要的是lift方法,因为很多操作都是基于lift来实现的,扩展lift还可以实现自定义Operator. 这篇内容主要是介绍lift,map,flatMap的实现(关于这些操作的作用请查阅官网,这里不做过多介绍)。阅读之前请先看本系列文章的第一篇
目录
lift
先看个例子:
Operator<Long, Long> myOperator = new Operator<Long, Long>() {
@Override
public Subscriber<? super Long> call(Subscriber<? super Long> subscriber) {
return new Subscriber<Long>() {
@Override
public void onCompleted() {
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onError(e);
}
@Override
public void onNext(Long aLong) {
if (subscriber.isUnsubscribed()) {
return;
}
subscriber.onNext(aLong + 1);
}
};
}
};
Observable.just(System.currentTimeMillis())
.lift(myOperator)
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
然后是lift的实现
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
unsafeCreate的实现
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
所以新建的Observable的onSubscribe变量保存的是OnSubscribeLift。注意unsafeCreate(new OnSubscribeLift
OnSubscribeLift
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator){
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}
OnSubscribeLift实现了OnSubscribe接口。所以它会被订阅者调用其call方法. 在call里面他会调用operator(即我们自定义的myOperator)的call并返回Subscriber st. 然后又会通过源Obervable调用该st的onNext方法。 结合demo我总结下流程:
- 调用lift方法时会将just时创建的Obervable保存在OnSubscribeLift的parent变量中.而lift传入的Operator会保存在operator变量.最后返回一个新的Obervable
- 调用subscribe()方法时,会调用到lift创建的Obervable的变量obSubscribe(即OnSubscribeLift)的call方法。
- OnSubscribeLift的call返回会先调用自定义的myOperator的call方法,并将返回的Subscriber传入源Obervable(即parent变量)。
- 源Obervable会通过其onSubscriber(即JustOnSubscribe)最终调用到myOperator的匿名内部类Subscriber的onNext中(这一步骤的细节请结合上一篇文章看)
整个流程跟上一篇的内容相比,就是在订阅数据时,在源Obervable传送数据的过程中新增了一个自定义的myOperator操作,从而实现对数据的调整.调整后的数据最终被订阅者消费.
map
Observable.just(System.currentTimeMillis())
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong + 1;
}
})
.subscribe(myAction);
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}
这里跟lift基本一样.唯一的区别就是unsafeCreate创建的Obervable的局部变量onSubcribe保存的函数不一样。所以这里我们只需要了解函数OnSubscribeMap的细节就弄明白map的实现了.
OnSubscribeMap
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
结合lift的思路来看,map会在订阅数据之后通过其内部类MapSubscriber来实现对数据的修改.最后通过source.unsafeSubscribe(parent)将数据传送到订阅者.
大致的流程和lift基本一致.
flatMap
flatMap跟map从参数来说有很大区别.如下:
Observable.just(System.currentTimeMillis())
.flatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong + 1);
}
})
.subscribe(myAction);
其call方法的返回是一个Obervable.
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}
这里会根据源Obervable的类型来进行区分.这里我们不看更为复杂的merge实现(主要是结合我们的demo来看源码).
scalarFlatMap:
public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
return unsafeCreate(new OnSubscribe<R>() {
@Override
public void call(final Subscriber<? super R> child) {
Observable<? extends R> o = func.call(t);
if (o instanceof ScalarSynchronousObservable) {
child.setProducer(createProducer(child, ((ScalarSynchronousObservable<? extends R>)o).t));
} else {
o.unsafeSubscribe(Subscribers.wrap(child));
}
}
});
}
如上可以看到这里unsafeCreate创建的Obervable的onSubscribe函数是一个匿名类OnSubscribe
小结
RxJava的操作(Operator)符的实现都遵循了开闭原则(对扩展开放,对修改关闭). 通过Obervable的onSubscibe以及OnSubscribe接口来实现数据的传递与接收,并在传递的过程中插入一个Operator来实现对源数据的修改.
虽然每个操作符都会对数据修改,但是从实现来看,并没有对原始数据进行修改.这也符合函数式编程的
数据不可变
原则