这篇文章主要介绍在RxJava中的Scheduler实现。阅读之前请先看数据的发送与接收

目录

先看个demo

Observable.create((Observable.OnSubscribe<Integer>) subscriber -> {
            System.out.println("start:" + Thread.currentThread().getName());
            subscriber.onNext(1);
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.computation()).map(integer -> {
            System.out.println(integer + ":" + Thread.currentThread().getName());
            return integer + 1;
        }).observeOn(Schedulers.io())
                .map(integer -> {
                    System.out.println(integer + ":" + Thread.currentThread().getName());
                    return integer + 1;
                }).observeOn(Schedulers.newThread())
                .subscribe(integer -> System.out.println("action:" + Thread.currentThread().getName()));

demo中使用了subscribeOn和observeOn来进行线程切换.其中observeOn每次切换线程都会生效,但是subscribeOn只会在第一次调用时生效.至于原因我们在下篇文章中揭晓. 这篇文章我们主要介绍的是常用的Schedulers.io()和Schedulers.computation(). 这两者的应用场景跟其名字一样,一个是在io比较多的场景使用,一个是cpu计算密集场景使用.

Schedulers.computation()

public static Scheduler computation() {
    return RxJavaHooks.onComputationScheduler(getInstance().computationScheduler);
}
private static Schedulers getInstance() {
        for (;;) {
            Schedulers current = INSTANCE.get();
            if (current != null) {
                return current;
            }
            current = new Schedulers();
            if (INSTANCE.compareAndSet(null, current)) {
                return current;
            } else {
                current.shutdownInstance();
            }
        }
    }

    private Schedulers() {
        @SuppressWarnings("deprecation")
        RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();

        Scheduler c = hook.getComputationScheduler();
        if (c != null) {
            computationScheduler = c;
        } else {
            computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
        }

        Scheduler io = hook.getIOScheduler();
        if (io != null) {
            ioScheduler = io;
        } else {
            ioScheduler = RxJavaSchedulersHook.createIoScheduler();
        }

        Scheduler nt = hook.getNewThreadScheduler();
        if (nt != null) {
            newThreadScheduler = nt;
        } else {
            newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
        }
    }

如上代码. Schedulers.computation()会调用Schedulers的单例方法getInstance().并在第一次调用时初始化computationScheduler,ioScheduler以及newThreadScheduler.

我们先看RxJavaSchedulersHook.createComputationScheduler()

RxJavaSchedulersHook.createComputationScheduler()

public static Scheduler createComputationScheduler() {
    return createComputationScheduler(new RxThreadFactory("RxComputationScheduler-"));
}
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
    private static final long serialVersionUID = -8841098858898482335L;

    public static final ThreadFactory NONE = new ThreadFactory() {
        @Override public Thread newThread(Runnable r) {
            throw new AssertionError("No threads allowed.");
        }
    };

    final String prefix;

    public RxThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + incrementAndGet());
        t.setDaemon(true);
        return t;
    }
}

rxJava提供了一个线程工厂类.在创建新线程时会使用固定的前缀以及原子自增数作为线程名.

public static Scheduler createComputationScheduler(ThreadFactory threadFactory) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory == null");
    }
    return new EventLoopsScheduler(threadFactory);
}

public final class EventLoopsScheduler extends Scheduler implements SchedulerLifecycle
public EventLoopsScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
    start();
}

@Override
public void start() {
    FixedSchedulerPool update = new FixedSchedulerPool(threadFactory, MAX_THREADS);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

createComputationScheduler会传入线程工厂RxThreadFactory,并返回一个EventLoopsScheduler.

EventLoopsScheduler的内部我们可以看到它创建了一个固定大小的线程池(FixedSchedulerPool). 该大小为MAX_THREADS。

static final int MAX_THREADS;
static {
    int maxThreads = Integer.getInteger(KEY_MAX_THREADS, 0);
    int cpuCount = Runtime.getRuntime().availableProcessors();
    int max;
    if (maxThreads <= 0 || maxThreads > cpuCount) {
        max = cpuCount;
    } else {
        max = maxThreads;
    }
    MAX_THREADS = max;
}

MAX_THREADS等于cpu核心数.所以computation scheduler创建的是一个大小为cpu核心数的线程池.这样每个线程会占用一个cpu,减少了线程切换的耗时从而提高效率,但是他不适合有阻塞的场景.因为一旦产生阻塞就会导致cpu被浪费. 因为cpu数量是有限的,如果有超过cpu数量的任务就必须排队,所以它也不适合大量数据的场景.最好是小于或等cpu数量。

FixedSchedulerPool

他是EventLoopsScheduler的内部类.

static final class FixedSchedulerPool {
    final int cores;

    final PoolWorker[] eventLoops;
    long n;

    FixedSchedulerPool(ThreadFactory threadFactory, int maxThreads) {
        // initialize event loops
        this.cores = maxThreads;
        this.eventLoops = new PoolWorker[maxThreads];
        for (int i = 0; i < maxThreads; i++) {
            this.eventLoops[i] = new PoolWorker(threadFactory);
        }
    }
}

PoolWorker

static final class PoolWorker extends NewThreadWorker {
    PoolWorker(ThreadFactory threadFactory) {
        super(threadFactory);
    }
}
public NewThreadWorker(ThreadFactory threadFactory) {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
        // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
        boolean cancelSupported = tryEnableCancelPolicy(exec);
        if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
            registerExecutor((ScheduledThreadPoolExecutor)exec);
        }
        executor = exec;
    }

从上面可以看到PoolWorker继承了NewThreadWorker,而NewThreadWorker在构造方法中通过Executors创建了一个大小为1的ScheduledExecutorService. 所以PoolWorker和ScheduledExecutorService的关系是1:1,FixedSchedulerPool和PoolWorker的关系是1:cpu核心数。线程池ScheduledExecutorService保存在NewThreadWorker的局部变量executor中.

EvetLoopWorker

这个类会在下一篇文章用上.我们先看下他的方法实现.至于他用来干什么的,在看了下篇博客之后就明白了.

static final class EventLoopWorker extends Scheduler.Worker {
        private final SubscriptionList serial = new SubscriptionList();
        private final CompositeSubscription timed = new CompositeSubscription();
        private final SubscriptionList both = new SubscriptionList(serial, timed);
        private final PoolWorker poolWorker;

        EventLoopWorker(PoolWorker poolWorker) {
            this.poolWorker = poolWorker;

        }

        @Override
        public void unsubscribe() {
            both.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return both.isUnsubscribed();
        }

        @Override
        public Subscription schedule(final Action0 action) {
            if (isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }

            return poolWorker.scheduleActual(new Action0() {
                @Override
                public void call() {
                    if (isUnsubscribed()) {
                        return;
                    }
                    action.call();
                }
            }, 0, null, serial);
        }

        @Override
        public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
            if (isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }

            return poolWorker.scheduleActual(new Action0() {
                @Override
                public void call() {
                    if (isUnsubscribed()) {
                        return;
                    }
                    action.call();
                }
            }, delayTime, unit, timed);
        }
    }

可以看到他的schedule方法都会使用poolWorker的scheduleActual.

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
        Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
        ScheduledAction run = new ScheduledAction(decoratedAction, parent);
        parent.add(run);

        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }

上面就是poolWorker的实现. 可以看到函数action会被封装成ScheduledAction.而ScheduledAction实现了Runnable接口.所以scheduleActual方法会讲函数ScheduledAction提交到线程池中.并且设置调度的时间(delayTime)。等到线程池executor调度到时就会调用到action的call方法:

@Override
public void run() {
    try {
        lazySet(Thread.currentThread());
        action.call();
    } catch (OnErrorNotImplementedException e) {
        signalError(new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e));
    } catch (Throwable e) {
        signalError(new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e));
    } finally {
        unsubscribe();
    }
}

上面的代码就是ScheduledAction的run()方法内容.

RxJavaSchedulersHook.createIoScheduler()

public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory == null");
    }
    return new CachedThreadScheduler(threadFactory);
}

Io scheduler使用的是CachedThreadScheduler。

CachedThreadScheduler

public CachedThreadScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<CachedWorkerPool>(NONE);
    start();
}

@Override
public void start() {
    CachedWorkerPool update =
        new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
    if (!pool.compareAndSet(NONE, update)) {
        update.shutdown();
    }
}

KEEP_ALIVE_TIME默认是60秒.这个线程池会在线程空闲60秒之后回收线程.这个线程池会创建一个调度线程evictor.每隔KEEP_ALIVE_TIME时间就检查一遍线程ThreadWorker是否过期.如果过期了的话就将其从线程队列中移除:

 static final class CachedWorkerPool {
        private final ThreadFactory threadFactory;
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        private final CompositeSubscription allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;

        CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) {
            this.threadFactory = threadFactory;
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeSubscription();

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                    @Override public Thread newThread(Runnable r) {
                        Thread thread = threadFactory.newThread(r);
                        thread.setName(thread.getName() + " (Evictor)");
                        return thread;
                    }
                });
                NewThreadWorker.tryEnableCancelPolicy(evictor);
                task = evictor.scheduleWithFixedDelay(
                        new Runnable() {
                            @Override
                            public void run() {
                                evictExpiredWorkers();
                            }
                        }, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
                );
            }
            evictorService = evictor;
            evictorTask = task;
        }

        void evictExpiredWorkers() {
            if (!expiringWorkerQueue.isEmpty()) {
                long currentTimestamp = now();

                for (ThreadWorker threadWorker : expiringWorkerQueue) {
                    if (threadWorker.getExpirationTime() <= currentTimestamp) {
                        if (expiringWorkerQueue.remove(threadWorker)) {
                            allWorkers.remove(threadWorker);
                        }
                    } else {
                        // Queue is ordered with the worker that will expire first in the beginning, so when we
                        // find a non-expired worker we can stop evicting.
                        break;
                    }
                }
            }
        }
 }

该线程池会在执行任务时优先使用空闲线程,如果没有空闲线程则创建一个新线程:

ThreadWorker get() {
    if (allWorkers.isUnsubscribed()) {
        return SHUTDOWN_THREADWORKER;
    }
    while (!expiringWorkerQueue.isEmpty()) {
        ThreadWorker threadWorker = expiringWorkerQueue.poll();
        if (threadWorker != null) {
            return threadWorker;
        }
    }

    // No cached worker found, so create a new one.
    ThreadWorker w = new ThreadWorker(threadFactory);
    allWorkers.add(w);
    return w;
}

从代码可以看出每次调用get()方法时如果没有空闲线程就会新建一个线程.所以这里如果任务非常多则会造成OOM.所以使用时需要注意.

EventLoopWorker

static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
        private final CompositeSubscription innerSubscription = new CompositeSubscription();
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;
        final AtomicBoolean once;

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.once = new AtomicBoolean();
            this.threadWorker = pool.get();
        }

        @Override
        public void call() {
            pool.release(threadWorker);
        }
}

可以看到每次创建EventLoopWorker都会将一个线程threadWorker标记好过期时间并放到线程队列expiringWorkerQueue的队尾:

void release(ThreadWorker threadWorker) {
    threadWorker.setExpirationTime(now() + keepAliveTime);
    expiringWorkerQueue.offer(threadWorker);
}

提示

Scheduler的主要内容就是这些. 当然这篇博客看上去没有将各个方法连起来.那是因为这一篇也没法把他们连起来,流程是怎样的要在下篇博客中才讲到.看官可以先将内容在脑海中留下印象.以备看接下来的文章。