这次我们聊聊observeOn线程切换的实现.阅读之前请先阅读之前的几篇博客

目录

Observable.just(1)
    .map(new Func1<Integer, Long>() {
        @Override
        public Long call(Integer integer) {
            System.out.println(integer+""+Thread.currentThread().getName());
            return integer+1L;
        }
    })
    .observeOn(Schedulers.io())
    .subscribe(integer -> System.out.println("action:" + Thread.currentThread().getName()));

运行结果:

1main
action:RxIoScheduler-2

可以看到observeOn只影响数据下游.

还是直接进入observeOn。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize{
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

重点在于lift()方法.所以我们先看这个方法:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

lift中会使用OnSubscribeLift将onSubscribe, operator包装起来.而operator则是lift中传入的OperatorObserveOn实例.onSubscribe在demo中是OnSubscribeMap.

OnSubscribeLift

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}

它将上一个操作符(map)返回的Observable(OnSubscribeMap)作为源Observable保存在parent中.并将OperatorObserveOn保存在operator中.当调用了subscribe()时会调到它的call方法.从call里面可以看到它会先调用OperatorObserveOn的call,然后再调用OnSubscribeMap的call.简单的画下流程:

subscribe()->OnSubscribeLift.call()->OperatorObserveOn.call()->OnSubscribeMap.call();

我们再看下OperatorObserveOn做了什么:

OperatorObserveOn

内容较多,只提取了关键代码.

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
    if (scheduler instanceof ImmediateScheduler) {
        // avoid overhead, execute directly
        return child;
    } else if (scheduler instanceof TrampolineScheduler) {
        // avoid overhead, execute directly
        return child;
    } else {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
        parent.init();
        return parent;
    }
}

scheduler变量是从observeOn中传入的Schedulers.io(),child是从OnSubscribeLift的call传递过来的subscribe()中创建的匿名函数(即你demo中的扩展方法)。我们再看下ObserveOnSubscriber:

ObserveOnSubscriber

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
    this.child = child;
    this.recursiveScheduler = scheduler.createWorker();
    this.delayError = delayError;
    int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
    // this formula calculates the 75% of the bufferSize, rounded up to the next integer
    this.limit = calculatedSize - (calculatedSize >> 2);
    if (UnsafeAccess.isUnsafeAvailable()) {
        queue = new SpscArrayQueue<Object>(calculatedSize);
    } else {
        queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
    }
    // signal that this is an async operator capable of receiving this many
    request(calculatedSize);
}

void init() {
    // don't want this code in the constructor because `this` can escape through the
    // setProducer call
    Subscriber<? super T> localChild = child;

    localChild.setProducer(new Producer() {

        @Override
        public void request(long n) {
            if (n > 0L) {
                BackpressureUtils.getAndAddRequest(requested, n);
                schedule();
            }
        }

    });
    localChild.add(recursiveScheduler);
    localChild.add(this);
}
protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(this);
    }
}

从他的构造函数可以看到他使用了Schduler.io()的EventLoopWorker。在init方法中通过child的setProducer方法调用到schedule.schedule方法会使用Schduler.io()的EventLoopWorker来调度(recursiveScheduler.schedule(this)).

recursiveScheduler.schedule(this)从这代码可以看到从这里开始会将调用ObserveOnSubscriber的任务提交到一个线程池中进行操作.

我再看下ObserveOnSubscriber的call方法的关键代码:

final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
Object v = q.poll();
localChild.onNext(NotificationLite.<T>getValue(v));

而this.child是demo中的拓展匿名函数也是数据的下游.所以下游的数据处理是异步的。

我们再回到OnSubscribeLift的call的两行关键代码:

Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o); //1
parent.call(st); //2

我们可以得知进入这个call方法时的线程是主线程.然后通过第一行代码将ObserveOnSubscriber的执行放入到了一个线程池.所以从主线程将不会再处理ObserveOnSubscriber的任务.但是主线程还会继续执行第二行代码parent.call(st); 从上面可以得知parent是map操作符返回的Observable(OnSubscribeMap)。所以observeOn并不会影响调用它之前的数据.只会影响调用之后的数据. 所以observeOn可以进行多次调用进行线程切换