今天我们聊聊subscribeOn线程切换的实现.阅读之前请先阅读之前的几篇博客

目录

第一种情况

造例,还是根据demo来看源码

Observable.just(1)
    .subscribeOn(Schedulers.computation())
    .subscribeOn(Schedulers.io())
    .subscribe(integer -> System.out.println("action:" + Thread.currentThread().getName()));

输出的结果是:

action:RxComputationScheduler-1

虽然我们使用了两个subscribeOn,但是跟上篇博客所述的一样subscribeOn只有第一次(computation线程池)生效了。这是为何呢?这个我们最后再解释.先看下subscribeOn的内部.

public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}

可以看到它会判断当前Obervable是否是ScalarSynchronousObservable.从demo以及前面几篇博客可以知道just返回的Observable就是属于ScalarSynchronousObservable。但是subscribeOn返回的Observable不属于.所以第一次调用subscribeOn会进入scalarScheduleOn.第二次会进入unsafeCreate(……)。我们先看第一次调用.

public Observable<T> scalarScheduleOn(final Scheduler scheduler) {
    Func1<Action0, Subscription> onSchedule;
    if (scheduler instanceof EventLoopsScheduler) {
        final EventLoopsScheduler els = (EventLoopsScheduler) scheduler;
        onSchedule = new Func1<Action0, Subscription>() {
            @Override
            public Subscription call(Action0 a) {
                return els.scheduleDirect(a);
            }
        };
    } else {
        onSchedule = new Func1<Action0, Subscription>() {
            @Override
            public Subscription call(final Action0 a) {
                final Scheduler.Worker w = scheduler.createWorker();
                w.schedule(new Action0() {
                    @Override
                    public void call() {
                        try {
                            a.call();
                        } finally {
                            w.unsubscribe();
                        }
                    }
                });
                return w;
            }
        };
    }

    return unsafeCreate(new ScalarAsyncOnSubscribe<T>(t, onSchedule));
}

这里会再用一个onSchedule函数将当前的scheduler包起来.当被调用到onSchedule的call方法时,线程就会被切换回当前onSchedule的线程池中.最后使用ScalarAsyncOnSubscribe将值T和onSchedule包装成一个OnSubscribe.

由于subscribeOn(Scheduler scheduler, boolean requestOn) 这一步只会在第一次使用onSchedule时会进入到scalarScheduleOn.而这里会将请求线程切换到第一次使用onSchedule传入的线程池.所以多次调用onSchedule的话最终工作的线程是第一次onSchedule传入的线程池.(第二次subscribeOn的OperatorSubscribeOn是怎么调用到ScalarAsyncOnSubscribe请看下面的第二种情况即可明白).

@Override
public void call(Subscriber<? super T> s) {
    s.setProducer(new ScalarAsyncProducer<T>(s, value, onSchedule));
}

这是ScalarAsyncOnSubscribe的call实现.

ScalarAsyncProducer

@Override
public void request(long n) {
    if (n < 0L) {
        throw new IllegalArgumentException("n >= 0 required but it was " + n);
    }
    if (n != 0 && compareAndSet(false, true)) {
        actual.add(onSchedule.call(this));
    }
}

@Override
public void call() {
    Subscriber<? super T> a = actual;
    if (a.isUnsubscribed()) {
        return;
    }
    T v = value;
    try {
        a.onNext(v);
    } catch (Throwable e) {
        Exceptions.throwOrReport(e, a, v);
        return;
    }
    if (a.isUnsubscribed()) {
        return;
    }
    a.onCompleted();
}

最终call被调用时完成了数据的传递a.onNext(v).

OperatorSubscribeOn

public final class OperatorSubscribeOn<T> implements OnSubscribe<T>

它实现了OnSubscribe.所以将会在订阅(subscribe)之后被调用它的call方法.

@Override
public void call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();

    SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
    subscriber.add(parent);
    subscriber.add(inner);

    inner.schedule(parent);
}

scheduler就是从subscribeOn传入的Scheduler.io().注意在这里都是io线程池,还没切换到computation池.在call中它会将源Observable(变量source,这个source就是第一次调用subscribeOn返回的Observable)和subscriber用SubscribeOnSubscriber进行包装. 然后使用将parent放入到Worker线程池中(上一篇博客中所提到EventLoopWorker都是实现了Worker接口,所以这里的Inner就是CachedThreadScheduler的内部类EventLoopWorker.)中进行调度.调度的结果就是调用到parent的call方法(从这一步开始数据的传输就开始在一个线程池中进行,即开始了线程切换.进行异步操作).

SubscribeOnSubscriber

static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0

它实现了Subscriber接口,所有有onNext,onCompleted,onError方法,先看它的call方法.

@Override
public void call() {
    Observable<T> src = source;
    source = null;
    t = Thread.currentThread();
    src.unsafeSubscribe(this);
}

这一步很关键. src即第一次调用subscribeOn返回的Observable.所以src.unsafeSubscribe(this),然后就会调用到ScalarAsyncOnSubscribe的call方法.而ScalarAsyncOnSubscribe最终会调用它的局部变量onSchedule的call.所以到这一步就会将io线程切换回onSchedule的线程(即computation线程).

第二种情况

Observable.just(1)
    .map(new Func1<Integer, Long>() {
        @Override
        public Long call(Integer integer) {
            System.out.println(integer+""+Thread.currentThread().getName());
            return integer+1L;
        }
    })
    .subscribeOn(Schedulers.computation())
    .subscribeOn(Schedulers.io())
    .subscribe(integer -> System.out.println("action:" + Thread.currentThread().getName()));

运行的结果是:

1RxComputationScheduler-1
action:RxComputationScheduler-1

可以看到subscribeOn可以影响到数据的上下游.且任务在第一次subscribeOn传入的线程池中工作.那么它又是如何生效的呢?

还是从subscribeOn入手

public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}

此时不管是第一次还是第二次调用subscribeOn,当前的Observable都不可能是ScalarSynchronousObservable.所以都会进入到OperatorSubscribeOn。那么只是使用OperatorSubscribeOn,怎么达到只作用在第一次调用subscribeOn的呢?我们直接看关键步骤.

OperatorSubscribeOn:

@Override
public void call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();

    SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
    subscriber.add(parent);
    subscriber.add(inner);

    inner.schedule(parent);
}

第一次调用时会将第一个scheduler(computation)作为局部变量scheduler保存下来.并传入SubscribeOnSubscriber.也就是说OperatorSubscribeOn,SubscribeOnSubscriber都和线程池scheduler是强绑定.并且SubscribeOnSubscriber会记录源Observable.

SubscribeOnSubscriber:

@Override
public void call() {
    Observable<T> src = source;
    source = null;
    t = Thread.currentThread();
    src.unsafeSubscribe(this);
}

结合代码.如果此时订阅已经开始,那么肯定是会创建两个OperatorSubscribeOn,SubscribeOnSubscriber.但是最外层的SubscribeOnSubscriber会通过source变量调用到最底层的SubscribeOnSubscriber.从而完成线程切换回第一个线程池的目标.

我们将视角切回调用完map之后。此时第一次调用subscribeOn.所以OperatorSubscribeOn中的source是保存着对map返回的Observable的引用. 所以computation的OperatorSubscribeOn被调用call时,会调用到map的Observable.从而影响上流数据.

小结

subscribeOn的特征是通过将上一个操作符返回的Observable作为source.一层层向后传递,订阅后类似递归一样一层层向前调用实现的. observeOn与它最大的区别也是在此了.

虽然最终作用在第一个线程池中,但是之后的线程池也是会生效的.虽然只是个短暂的线程切换.但是还是会有比较大的性能损耗.所以onSchedule尽量只使用一次。

请求流程如图: