18年我就再尝试使用rxJava来解决网络IO问题.但是一直使用得不是很得心应手,原因是不太知道它的底层原理.这次春节正好有时间可以把RxJava的原理看一遍。

目录

这里不介绍什么是RxJava。以及它的优缺点,有兴趣的同学可以去它的官网看看。

由于我所在的公司使用的基础框架是Spring Cloud 1.5.x。而其底层所使用的RxJava是1.x版本.所以我所看的RxJava也是使用的1.x版本。

这一系列我只会读RxJava的关键流程。因为时间有限,而我只想弄清楚它的核心原理是什么。

首先我们要了解它是怎么传递数据的,操作符怎么实现的。然后是怎么订阅到这些数据,最后弄明白它的线程调度是怎么回事。

建议初学者先去熟悉下RxJava的api

数据传递

首先我们看一个简单的demo:

Observable.just(System.currentTimeMillis());

just声明一个Observable.

just

public static <T> Observable<T> just(final T value) {
    return ScalarSynchronousObservable.create(value);
}

跳入just方法看到的就是上面的代码。从代码来看就是将value值包装到一个Obervable里面(当然这里使用的是ScalarSynchronousObservable).

我们先看下ScalarSynchronousObservable类的定义

public final class ScalarSynchronousObservable<T> extends Observable<T> 

它继承自Observable。

Obervable

public class Observable<T> {

    final OnSubscribe<T> onSubscribe;

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
    
    @Deprecated
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
}

只列出来了我们需要的方法

它的构造函数会将一个OnSubscribe存储在变量onSubscribe中.而OnSubscribe是一个函数接口Action1的子类

public interface Action1<T> extends Action {
    void call(T t);
}
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // cover for generics insanity
}

create(OnSubscribe f)方法中使用了RxJavaHooks.onCreate(f)来包装函数f.但是这一操作只是为监控rxjava留的口子。如果RxJava并没有启用,只是简单的将f返回而已.如下:

 public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe){
     Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
     if (f != null) {
         return f.call(onSubscribe);
     }
     return onSubscribe;
 }

请在脑海里留下印象之后很多地方都有类似代码。

然后我们再回到ScalarSynchronousObservable.create(value)方法。

public static <T> ScalarSynchronousObservable<T> create(T t) {
    return new ScalarSynchronousObservable<T>(t);
}

protected ScalarSynchronousObservable(final T t) {
    super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t)));
    this.t = t;
}

注意super(RxJavaHooks.onCreate(new JustOnSubscribe(t)))这里.它会调用父类(也就是Obersvable)的构造函数。但是在此之前会使用一个JustOnSubscribe(类似的如果demo中使用的是defer,则会使用OnSubscribeDefer)将值t封装成一个函数OnSubscribe.然后将该函数保存在父类的obSubscribe变量中(这里请记住,在之后的订阅中会用上).

我们在细看ScalarSynchronousObservable

ScalarSynchronousObservable

JustOnSubscribe

它是ScalarSynchronousObservable的静态内部类

static final class JustOnSubscribe<T> implements OnSubscribe<T> {
        final T value;

        JustOnSubscribe(T value) {
            this.value = value;
        }

        @Override
        public void call(Subscriber<? super T> s) {
            s.setProducer(createProducer(s, value));
        }
    }
static <T> Producer createProducer(Subscriber<? super T> s, T v) {
        if (STRONG_MODE) {
            return new SingleProducer<T>(s, v);
        }
        return new WeakSingleProducer<T>(s, v);
    }

WeakSingleProducer

static final class WeakSingleProducer<T> implements Producer {
        final Subscriber<? super T> actual;
        final T value;
        boolean once;

        public WeakSingleProducer(Subscriber<? super T> actual, T value) {
            this.actual = actual;
            this.value = value;
        }

        @Override
        public void request(long n) {
            if (once) {
                return;
            }
            if (n < 0L) {
                throw new IllegalStateException("n >= required but it was " + n);
            }
            if (n == 0L) {
                return;
            }
            once = true;
            Subscriber<? super T> a = actual;
            if (a.isUnsubscribed()) {
                return;
            }
            T v = value;
            try {
                a.onNext(v);//将值v传递给onNext。
            } catch (Throwable e) {
                Exceptions.throwOrReport(e, a, v);
                return;
            }

            if (a.isUnsubscribed()) {
                return;
            }
            a.onCompleted();
        }
    }

JustOnSubscribe函数的call方法会创建一个Producer.而Producer会有一个request(long n)方法.最终在改方法里面会调用Subscriber的onNext()以及onCompleted接口。 这个request方法会在之后的订阅中被调用.请继续往下看

Subscriber

Subscriber实现了Observer,Subscription两个接口.我们简单列一下这两个接口的定义.

public interface Observer<T> {

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onCompleted();

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onCompleted}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(Throwable e);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(T t);
}

public interface Subscription {

    /**
     * Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
     * was received.
     * <p>
     * This allows deregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
     * onCompleted is called).
     */
    void unsubscribe();

    /**
     * Indicates whether this {@code Subscription} is currently unsubscribed.
     *
     * @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
     */
    boolean isUnsubscribed();

}

我们目前需要关注的是他的另外一个方法:

public void setProducer(Producer p) {
    //省略了次要逻辑
    if (toRequest == NOT_SET) {
        producer.request(Long.MAX_VALUE);
    } else {
        producer.request(toRequest);
    }
}

该方法在JustOnSubscribe的call方法中调用.而该方法最终又会调用producer的request. 所以JustOnSubscribe的call方法最终会调用到Subscriber的onNext方法。而onNext方法就是传递数据用的。

现在知道数据是通过JustOnSubscribe的call来传递数据的了.但是这个call是谁来调用呢?很明显Subscriber就是订阅者.如果后续有人订阅了这个Observable的话.我想Subscriber的call方法就会被调用到.那么实际上真会如此吗?我们继续

订阅

我们将上面的demo扩展一下,订阅just的数据。

Observable.just(System.currentTimeMillis())
                .subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                System.out.println(aLong);
            }
        });

我们可以看到subscribe中声明了一个匿名类Action1.我们将其称为订阅者函数action。

public final Subscription subscribe(final Action1<? super T> onNext) {
        if (onNext == null) {
            throw new IllegalArgumentException("onNext can not be null");
        }

        Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
        Action0 onCompleted = Actions.empty();
        return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
    }

ActionSubscriber只是将订阅者函数action包装了下.这里我们直接跳过.看重点:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    subscriber.onStart();
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    try {
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        if (subscriber.isUnsubscribed()) {
            RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
        } else {
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                throw r; // NOPMD
            }
        }
        return Subscriptions.unsubscribed();
    }
}

由于内容较多,我只列出了核心代码.

首先回顾下just方法.它会返回一个Observable.所以Observable.subscribe(subscriber, this)中的this。就是just中声明的Observable. 在just声明Observable时Observable的onSubscribe变量保存的是JustOnSubscribe.所以RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);这一步就是调用了JustOnSubscribe的call方法.参数subscriber就是demo代码中声明的订阅者函数action. 结合上面所述.最终会将Observable中的数据value值。通过JustOnSubscribe的call方法传递到订阅者函数action的call方法中。这样就实现了数据的订阅。我总结下流程(一些其他的细节等下再解释):

Observable声明(值得定义):

  1. Obervable调用just.
  2. just中创建ScalarSynchronousObservable(ScalarSynchronousObservable.create(value))
  3. new JustOnSubscribe(value).
  4. value会保存在justOnSubscribe的局部变量中
  5. 将justObSubscribe保存在ScalarSynchronousObservable的局部变量onSubscribe中

订阅:

  1. 声明一个匿名函数Action1且将其传入subscribe方法.
  2. Observable.subscribe(subscriber, this); this是just返回的Observable.
  3. RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber) observable.onSubscribe就是just方法中声明的JustOnSubscribe。subscriber即匿名函数Action1的包装类
  4. 调用JustOnSubscribe.call(subscriber).
  5. createProducer(subscriber, value) 创建producer,并将subscriber和value保存在producer的局部变量中
  6. 调用producer.request();
  7. 在producer的request中调用subscriber.onNext(value); subscriber就是匿名函数Action1.所以到此就可以在值value传递到匿名函数Action1中了。

其他细节

SafeSubscriber

代码subscriber = new SafeSubscriber(subscriber)。是将函数subscriber进行了包装。包装类SafeSubscriber会保证subscriber的onNext,onCompleted,onError方法的正确使用(即onNext可以调用1到多次,而onCompleted和onError只能调用一次,且三者是互斥关系).

下篇我们看下操作符是如何实现的.