详读RxJava(数据的发送与接收)
18年我就再尝试使用rxJava来解决网络IO问题.但是一直使用得不是很得心应手,原因是不太知道它的底层原理.这次春节正好有时间可以把RxJava的原理看一遍。
目录
这里不介绍什么是RxJava。以及它的优缺点,有兴趣的同学可以去它的官网看看。
由于我所在的公司使用的基础框架是Spring Cloud 1.5.x。而其底层所使用的RxJava是1.x版本.所以我所看的RxJava也是使用的1.x版本。
这一系列我只会读RxJava的关键流程。因为时间有限,而我只想弄清楚它的核心原理是什么。
首先我们要了解它是怎么传递数据的,操作符怎么实现的。然后是怎么订阅到这些数据,最后弄明白它的线程调度是怎么回事。
建议初学者先去熟悉下RxJava的api
数据传递
首先我们看一个简单的demo:
Observable.just(System.currentTimeMillis());
just声明一个Observable.
just
public static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
跳入just方法看到的就是上面的代码。从代码来看就是将value值包装到一个Obervable里面(当然这里使用的是ScalarSynchronousObservable).
我们先看下ScalarSynchronousObservable类的定义
public final class ScalarSynchronousObservable<T> extends Observable<T>
它继承自Observable。
Obervable
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
@Deprecated
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
}
只列出来了我们需要的方法
它的构造函数会将一个OnSubscribe存储在变量onSubscribe中.而OnSubscribe是一个函数接口Action1的子类
public interface Action1<T> extends Action {
void call(T t);
}
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
create(OnSubscribe
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe){
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
请在脑海里留下印象之后很多地方都有类似代码。
然后我们再回到ScalarSynchronousObservable.create(value)方法。
public static <T> ScalarSynchronousObservable<T> create(T t) {
return new ScalarSynchronousObservable<T>(t);
}
protected ScalarSynchronousObservable(final T t) {
super(RxJavaHooks.onCreate(new JustOnSubscribe<T>(t)));
this.t = t;
}
注意super(RxJavaHooks.onCreate(new JustOnSubscribe
我们在细看ScalarSynchronousObservable
ScalarSynchronousObservable
JustOnSubscribe
它是ScalarSynchronousObservable的静态内部类
static final class JustOnSubscribe<T> implements OnSubscribe<T> {
final T value;
JustOnSubscribe(T value) {
this.value = value;
}
@Override
public void call(Subscriber<? super T> s) {
s.setProducer(createProducer(s, value));
}
}
static <T> Producer createProducer(Subscriber<? super T> s, T v) {
if (STRONG_MODE) {
return new SingleProducer<T>(s, v);
}
return new WeakSingleProducer<T>(s, v);
}
WeakSingleProducer
static final class WeakSingleProducer<T> implements Producer {
final Subscriber<? super T> actual;
final T value;
boolean once;
public WeakSingleProducer(Subscriber<? super T> actual, T value) {
this.actual = actual;
this.value = value;
}
@Override
public void request(long n) {
if (once) {
return;
}
if (n < 0L) {
throw new IllegalStateException("n >= required but it was " + n);
}
if (n == 0L) {
return;
}
once = true;
Subscriber<? super T> a = actual;
if (a.isUnsubscribed()) {
return;
}
T v = value;
try {
a.onNext(v);//将值v传递给onNext。
} catch (Throwable e) {
Exceptions.throwOrReport(e, a, v);
return;
}
if (a.isUnsubscribed()) {
return;
}
a.onCompleted();
}
}
JustOnSubscribe函数的call方法会创建一个Producer.而Producer会有一个request(long n)方法.最终在改方法里面会调用Subscriber的onNext()以及onCompleted接口。 这个request方法会在之后的订阅中被调用.请继续往下看
Subscriber
Subscriber实现了Observer,Subscription两个接口.我们简单列一下这两个接口的定义.
public interface Observer<T> {
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onCompleted();
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onCompleted}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(Throwable e);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(T t);
}
public interface Subscription {
/**
* Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
* was received.
* <p>
* This allows deregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
* onCompleted is called).
*/
void unsubscribe();
/**
* Indicates whether this {@code Subscription} is currently unsubscribed.
*
* @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
*/
boolean isUnsubscribed();
}
我们目前需要关注的是他的另外一个方法:
public void setProducer(Producer p) {
//省略了次要逻辑
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
该方法在JustOnSubscribe的call方法中调用.而该方法最终又会调用producer的request. 所以JustOnSubscribe的call方法最终会调用到Subscriber的onNext方法。而onNext方法就是传递数据用的。
现在知道数据是通过JustOnSubscribe的call来传递数据的了.但是这个call是谁来调用呢?很明显Subscriber就是订阅者.如果后续有人订阅了这个Observable的话.我想Subscriber的call方法就会被调用到.那么实际上真会如此吗?我们继续
订阅
我们将上面的demo扩展一下,订阅just的数据。
Observable.just(System.currentTimeMillis())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});
我们可以看到subscribe中声明了一个匿名类Action1.我们将其称为订阅者函数action。
public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
ActionSubscriber只是将订阅者函数action包装了下.这里我们直接跳过.看重点:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
throw r; // NOPMD
}
}
return Subscriptions.unsubscribed();
}
}
由于内容较多,我只列出了核心代码.
首先回顾下just方法.它会返回一个Observable.所以Observable.subscribe(subscriber, this)中的this。就是just中声明的Observable. 在just声明Observable时Observable的onSubscribe变量保存的是JustOnSubscribe.所以RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);这一步就是调用了JustOnSubscribe的call方法.参数subscriber就是demo代码中声明的订阅者函数action. 结合上面所述.最终会将Observable中的数据value值。通过JustOnSubscribe的call方法传递到订阅者函数action的call方法中。这样就实现了数据的订阅。我总结下流程(一些其他的细节等下再解释):
Observable声明(值得定义):
- Obervable调用just.
- just中创建ScalarSynchronousObservable(ScalarSynchronousObservable.create(value))
- new JustOnSubscribe(value).
- value会保存在justOnSubscribe的局部变量中
- 将justObSubscribe保存在ScalarSynchronousObservable的局部变量onSubscribe中
订阅:
- 声明一个匿名函数Action1且将其传入subscribe方法.
- Observable.subscribe(subscriber, this); this是just返回的Observable.
- RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber) observable.onSubscribe就是just方法中声明的JustOnSubscribe。subscriber即匿名函数Action1的包装类
- 调用JustOnSubscribe.call(subscriber).
- createProducer(subscriber, value) 创建producer,并将subscriber和value保存在producer的局部变量中
- 调用producer.request();
- 在producer的request中调用subscriber.onNext(value); subscriber就是匿名函数Action1.所以到此就可以在值value传递到匿名函数Action1中了。
其他细节
SafeSubscriber
代码subscriber = new SafeSubscriber
下篇我们看下操作符是如何实现的.