详读RxJava(Scheduler实现)
这篇文章主要介绍在RxJava中的Scheduler实现。阅读之前请先看数据的发送与接收
目录
先看个demo
Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
System.out.println("start:" + Thread.currentThread().getName());
subscriber.onNext(1);
subscriber.onCompleted();
}).subscribeOn(Schedulers.computation()).map(integer -> {
System.out.println(integer + ":" + Thread.currentThread().getName());
return integer + 1;
}).observeOn(Schedulers.io())
.map(integer -> {
System.out.println(integer + ":" + Thread.currentThread().getName());
return integer + 1;
}).observeOn(Schedulers.newThread())
.subscribe(integer -> System.out.println("action:" + Thread.currentThread().getName()));
demo中使用了subscribeOn和observeOn来进行线程切换.其中observeOn每次切换线程都会生效,但是subscribeOn只会在第一次调用时生效.至于原因我们在下篇文章中揭晓. 这篇文章我们主要介绍的是常用的Schedulers.io()和Schedulers.computation(). 这两者的应用场景跟其名字一样,一个是在io比较多的场景使用,一个是cpu计算密集场景使用.
Schedulers.computation()
public static Scheduler computation() {
return RxJavaHooks.onComputationScheduler(getInstance().computationScheduler);
}
private static Schedulers getInstance() {
for (;;) {
Schedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new Schedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
} else {
current.shutdownInstance();
}
}
}
private Schedulers() {
@SuppressWarnings("deprecation")
RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
Scheduler c = hook.getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
}
Scheduler io = hook.getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = RxJavaSchedulersHook.createIoScheduler();
}
Scheduler nt = hook.getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
}
}
如上代码. Schedulers.computation()会调用Schedulers的单例方法getInstance().并在第一次调用时初始化computationScheduler,ioScheduler以及newThreadScheduler.
我们先看RxJavaSchedulersHook.createComputationScheduler()
RxJavaSchedulersHook.createComputationScheduler()
public static Scheduler createComputationScheduler() {
return createComputationScheduler(new RxThreadFactory("RxComputationScheduler-"));
}
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
private static final long serialVersionUID = -8841098858898482335L;
public static final ThreadFactory NONE = new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
throw new AssertionError("No threads allowed.");
}
};
final String prefix;
public RxThreadFactory(String prefix) {
this.prefix = prefix;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + incrementAndGet());
t.setDaemon(true);
return t;
}
}
rxJava提供了一个线程工厂类.在创建新线程时会使用固定的前缀以及原子自增数作为线程名.
public static Scheduler createComputationScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory == null");
}
return new EventLoopsScheduler(threadFactory);
}
public final class EventLoopsScheduler extends Scheduler implements SchedulerLifecycle
public EventLoopsScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
start();
}
@Override
public void start() {
FixedSchedulerPool update = new FixedSchedulerPool(threadFactory, MAX_THREADS);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
createComputationScheduler会传入线程工厂RxThreadFactory,并返回一个EventLoopsScheduler.
EventLoopsScheduler的内部我们可以看到它创建了一个固定大小的线程池(FixedSchedulerPool). 该大小为MAX_THREADS。
static final int MAX_THREADS;
static {
int maxThreads = Integer.getInteger(KEY_MAX_THREADS, 0);
int cpuCount = Runtime.getRuntime().availableProcessors();
int max;
if (maxThreads <= 0 || maxThreads > cpuCount) {
max = cpuCount;
} else {
max = maxThreads;
}
MAX_THREADS = max;
}
MAX_THREADS等于cpu核心数.所以computation scheduler创建的是一个大小为cpu核心数的线程池.这样每个线程会占用一个cpu,减少了线程切换的耗时从而提高效率,但是他不适合有阻塞的场景.因为一旦产生阻塞就会导致cpu被浪费. 因为cpu数量是有限的,如果有超过cpu数量的任务就必须排队,所以它也不适合大量数据的场景.最好是小于或等cpu数量。
FixedSchedulerPool
他是EventLoopsScheduler的内部类.
static final class FixedSchedulerPool {
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(ThreadFactory threadFactory, int maxThreads) {
// initialize event loops
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}
}
PoolWorker
static final class PoolWorker extends NewThreadWorker {
PoolWorker(ThreadFactory threadFactory) {
super(threadFactory);
}
}
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
executor = exec;
}
从上面可以看到PoolWorker继承了NewThreadWorker,而NewThreadWorker在构造方法中通过Executors创建了一个大小为1的ScheduledExecutorService. 所以PoolWorker和ScheduledExecutorService的关系是1:1,FixedSchedulerPool和PoolWorker的关系是1:cpu核心数。线程池ScheduledExecutorService保存在NewThreadWorker的局部变量executor中.
EvetLoopWorker
这个类会在下一篇文章用上.我们先看下他的方法实现.至于他用来干什么的,在看了下篇博客之后就明白了.
static final class EventLoopWorker extends Scheduler.Worker {
private final SubscriptionList serial = new SubscriptionList();
private final CompositeSubscription timed = new CompositeSubscription();
private final SubscriptionList both = new SubscriptionList(serial, timed);
private final PoolWorker poolWorker;
EventLoopWorker(PoolWorker poolWorker) {
this.poolWorker = poolWorker;
}
@Override
public void unsubscribe() {
both.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
return both.isUnsubscribed();
}
@Override
public Subscription schedule(final Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
return poolWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
action.call();
}
}, 0, null, serial);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
return poolWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
action.call();
}
}, delayTime, unit, timed);
}
}
可以看到他的schedule方法都会使用poolWorker的scheduleActual.
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
上面就是poolWorker的实现. 可以看到函数action会被封装成ScheduledAction.而ScheduledAction实现了Runnable接口.所以scheduleActual方法会讲函数ScheduledAction提交到线程池中.并且设置调度的时间(delayTime)。等到线程池executor调度到时就会调用到action的call方法:
@Override
public void run() {
try {
lazySet(Thread.currentThread());
action.call();
} catch (OnErrorNotImplementedException e) {
signalError(new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e));
} catch (Throwable e) {
signalError(new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e));
} finally {
unsubscribe();
}
}
上面的代码就是ScheduledAction的run()方法内容.
RxJavaSchedulersHook.createIoScheduler()
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory == null");
}
return new CachedThreadScheduler(threadFactory);
}
Io scheduler使用的是CachedThreadScheduler。
CachedThreadScheduler
public CachedThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update =
new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
KEEP_ALIVE_TIME默认是60秒.这个线程池会在线程空闲60秒之后回收线程.这个线程池会创建一个调度线程evictor.每隔KEEP_ALIVE_TIME时间就检查一遍线程ThreadWorker是否过期.如果过期了的话就将其从线程队列中移除:
static final class CachedWorkerPool {
private final ThreadFactory threadFactory;
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
private final CompositeSubscription allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) {
this.threadFactory = threadFactory;
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeSubscription();
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
Thread thread = threadFactory.newThread(r);
thread.setName(thread.getName() + " (Evictor)");
return thread;
}
});
NewThreadWorker.tryEnableCancelPolicy(evictor);
task = evictor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
evictExpiredWorkers();
}
}, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
);
}
evictorService = evictor;
evictorTask = task;
}
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
}
该线程池会在执行任务时优先使用空闲线程,如果没有空闲线程则创建一个新线程:
ThreadWorker get() {
if (allWorkers.isUnsubscribed()) {
return SHUTDOWN_THREADWORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
从代码可以看出每次调用get()方法时如果没有空闲线程就会新建一个线程.所以这里如果任务非常多则会造成OOM.所以使用时需要注意.
EventLoopWorker
static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once;
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.once = new AtomicBoolean();
this.threadWorker = pool.get();
}
@Override
public void call() {
pool.release(threadWorker);
}
}
可以看到每次创建EventLoopWorker都会将一个线程threadWorker标记好过期时间并放到线程队列expiringWorkerQueue的队尾:
void release(ThreadWorker threadWorker) {
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
提示
Scheduler的主要内容就是这些. 当然这篇博客看上去没有将各个方法连起来.那是因为这一篇也没法把他们连起来,流程是怎样的要在下篇博客中才讲到.看官可以先将内容在脑海中留下印象.以备看接下来的文章。