详读RxJava(observeOn实现)
这次我们聊聊observeOn线程切换的实现.阅读之前请先阅读之前的几篇博客
目录
Observable.just(1)
.map(new Func1<Integer, Long>() {
@Override
public Long call(Integer integer) {
System.out.println(integer+""+Thread.currentThread().getName());
return integer+1L;
}
})
.observeOn(Schedulers.io())
.subscribe(integer -> System.out.println("action:" + Thread.currentThread().getName()));
运行结果:
1main
action:RxIoScheduler-2
可以看到observeOn只影响数据下游.
还是直接进入observeOn。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize{
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
重点在于lift()方法.所以我们先看这个方法:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
lift中会使用OnSubscribeLift将onSubscribe, operator包装起来.而operator则是lift中传入的OperatorObserveOn实例.onSubscribe在demo中是OnSubscribeMap.
OnSubscribeLift
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}
它将上一个操作符(map)返回的Observable(OnSubscribeMap)作为源Observable保存在parent中.并将OperatorObserveOn保存在operator中.当调用了subscribe()时会调到它的call方法.从call里面可以看到它会先调用OperatorObserveOn的call,然后再调用OnSubscribeMap的call.简单的画下流程:
subscribe()->OnSubscribeLift.call()->OperatorObserveOn.call()->OnSubscribeMap.call();
我们再看下OperatorObserveOn做了什么:
OperatorObserveOn
内容较多,只提取了关键代码.
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
scheduler变量是从observeOn中传入的Schedulers.io(),child是从OnSubscribeLift的call传递过来的subscribe()中创建的匿名函数(即你demo中的扩展方法)。我们再看下ObserveOnSubscriber:
ObserveOnSubscriber
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
// this formula calculates the 75% of the bufferSize, rounded up to the next integer
this.limit = calculatedSize - (calculatedSize >> 2);
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
// signal that this is an async operator capable of receiving this many
request(calculatedSize);
}
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
从他的构造函数可以看到他使用了Schduler.io()的EventLoopWorker。在init方法中通过child的setProducer方法调用到schedule.schedule方法会使用Schduler.io()的EventLoopWorker来调度(recursiveScheduler.schedule(this)).
recursiveScheduler.schedule(this)从这代码可以看到从这里开始会将调用ObserveOnSubscriber的任务提交到一个线程池中进行操作.
我再看下ObserveOnSubscriber的call方法的关键代码:
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
Object v = q.poll();
localChild.onNext(NotificationLite.<T>getValue(v));
而this.child是demo中的拓展匿名函数也是数据的下游.所以下游的数据处理是异步的。
我们再回到OnSubscribeLift的call的两行关键代码:
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o); //1
parent.call(st); //2
我们可以得知进入这个call方法时的线程是主线程.然后通过第一行代码将ObserveOnSubscriber的执行放入到了一个线程池.所以从主线程将不会再处理ObserveOnSubscriber的任务.但是主线程还会继续执行第二行代码parent.call(st); 从上面可以得知parent是map操作符返回的Observable(OnSubscribeMap)。所以observeOn并不会影响调用它之前的数据.只会影响调用之后的数据. 所以observeOn可以进行多次调用进行线程切换