用RxJava实现线程切换是非常方便的,而且线程切换也是RxJava最大的特点之一。
依旧是先上一段示例代码和执行结果
Observable .create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception { Log.d("rxrx", "subscribe:" + Thread.currentThread().getName()); int i = 1+2+3+4+5+6+7+8+9+10; observableEmitter.onNext(i); i++; observableEmitter.onNext(i); observableEmitter.onComplete(); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable disposable) { Log.d("rxrx", "onSubscribe:" + Thread.currentThread().getName()); } @Override public void onNext(Object s) { Log.d("rxrx", "onNext:" + Thread.currentThread().getName()); } @Override public void onError(Throwable throwable) { Log.d("rxrx", "onError:" + throwable.getMessage()); } @Override public void onComplete() { Log.d("rxrx", "onComplete:" + Thread.currentThread().getName()); } });
04-23 13:26:23.658 14381-14381/com.newhongbin.lalala D/rxrx: onSubscribe:main 04-23 13:26:23.659 14381-15071/com.newhongbin.lalala D/rxrx: subscribe:RxNewThreadScheduler-1 04-23 13:26:23.660 14381-14381/com.newhongbin.lalala D/rxrx: onNext:main 04-23 13:26:23.660 14381-14381/com.newhongbin.lalala D/rxrx: onNext:main 04-23 13:26:23.660 14381-14381/com.newhongbin.lalala D/rxrx: onComplete:main
本文的示例代码与前文的示例代码主要差别就是两行:
.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread())
Schedulers.newThread()返回的是从线程池中获取的一个线程,AndroidScheculers.mainThread()就是Android主线程,从执行结果可以看到,被观察者的subscribe方法是在新线程里面执行的,观察者的onSubscribe、onNext、onComplete/onError都是在主线程中执行的,很明显这两行代码的作用就是切换了代码执行的线程。
关于这段示例代码我画了一张图来说明:
ObservableCreate、ObservableSubscribeOn、ObservableObserveOn都是Observable的子类,很好的表现了RxJava的链式调用,使得代码更简洁。(一开始看这个源码的时候类名真的看得我快晕了,感觉都是差不多的命名,看久了都要看花眼。。。)在这里我们分别看一下这三个类的构造方法:
public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; }
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; }可以看到每个类的构造方法都有一个source,而这个source就是链式调用的上游对象,这一点在上图中我表现为不同的颜色代表不同的类对象。
梳理了大致的流程,然后来看看具体的代码实现,subscribeOn方法的参数是一个调度器,返回对象是ObservableSubscribeOn,调度器的作用简单来说就是指定代码所执行的线程,这个后面再讲。既然ObservableSubscribeOn是Observable的子类,而subscribe方法的真正实现又是在subscribeActual方法中,那么我们来看看ObservableSubscribeOn的subscribeActual方法的具体实现:
public void subscribeActual(Observer<? super T> s) { final ObservableSubscribeOn.SubscribeOnObserver parent = new ObservableSubscribeOn.SubscribeOnObserver(s); s.onSubscribe(parent); parent.setDisposable(this.scheduler.scheduleDirect(new Runnable() { public void run() { ObservableSubscribeOn.this.source.subscribe(parent); } })); }
parent是进行封装过的观察者,封装完观察者之后,就执行onSubscribe方法,此时还没有进行线程调度,所以该方法是在主线程执行。继续往下走,调度器开始工作了,它在这里的作用就是将subscribe方法包装成一个任务,将该任务交给调度器的线程执行。执行subscribe方法的对象是ObservableSubscribe中的source,结合图片看,该source是ObservableCreate;ObservableCreate中执行subscribe方法的对象又是ObservableCreate的source,结合图片看,这里的source是ObservableOnSubscribe。到此为止,就完成了被观察者的线程调度,将
public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception { Log.d("rxrx", "subscribe:" + Thread.currentThread().getName()); int i = 1+2+3+4+5+6+7+8+9+10; observableEmitter.onNext(i); i++; observableEmitter.onNext(i); observableEmitter.onComplete(); }
这些代码全部调度给了调度线程执行。
如果不调用observeOn指定观察者的调度线程的话,观察者的事件接收处理和被观察者会在同一个线程。接下来看看observeOn方法。与subscribeOn一样,首先用调度器指定观察者的线程,示例代码中的调度器线程即Android中的主线程,该方法的返回对象是ObservableObserveOn,同样的我们也来看看ObservableObserveOn的subscribeActual方法:
protected void subscribeActual(Observer<? super T> observer) { if(this.scheduler instanceof TrampolineScheduler) { this.source.subscribe(observer); } else { Worker w = this.scheduler.createWorker(); this.source.subscribe(new ObservableObserveOn.ObserveOnObserver(observer, w, this.delayError, this.bufferSize)); } }
首先判断调度器是不是指定的当前线程,如果是,那。。。那就跟没有指定一样啦,执行subscribe方法,这里的source是ObservableSubscribeOn,所以又回到了上文说的流程中,即不指定观察者的调度器。如果不是当前线程,依然是执行subscribe方法,只不过观察者变成了ObserveOnObserver。
ObserveOnObserver构造方法:
ObserveOnObserver(Observer<? super T> actual, Worker worker, boolean delayError, int bufferSize) { this.actual = actual; this.worker = worker; this.delayError = delayError; this.bufferSize = bufferSize; }
actual是我们传进来的初始Observser,worker可以简单理解为是调度器指定的工作线程。根据前文知道被观察者最后执行的onNext、onError、onComplete都会调用观察者的onNext、onError、onComplete,那么我们看看ObserveOnObserver的这三个方法做了什么:
public void onNext(T t) { if(!this.done) { if(this.sourceMode != 2) { this.queue.offer(t); } this.schedule(); } }
public void onError(Throwable t) { if(this.done) { RxJavaPlugins.onError(t); } else { this.error = t; this.done = true; this.schedule(); } }
public void onComplete() { if(!this.done) { this.done = true; this.schedule(); } }
这三个方法好像都跟actual和worker没什么关系,但是我们捕捉到了两个点:
1、所有onNext方法的参数都会被放到一个队列里面
2、这三个方法都会执行schedule方法。
看来关键的点还在后面啊
void schedule() { if(this.getAndIncrement() == 0) { this.worker.schedule(this); } }
终于看到线程切换了,ObserveOnObserver实现了Runnable接口,schedule方法将ObserveOnObserver自身调度进工作线程,再结合onNext的队列,是不是很像Android里面的Thread、Handler、MessageQueue、Message的模式呢。切换到工作线程之后,下一步就是等待执行Runnable的run方法了
public void run() { if(this.outputFused) { this.drainFused(); } else { this.drainNormal(); } }一般情况下,drainFused()方法交由actual执行执行onError和onComplete事件;drainNormal()方法执行onNext事件,循环将队列中的事件取出来交由actual执行,具体代码就不再展示了。因为run()方法被是由指定线程执行的,所以 onNext、onError、onComplete方法都是在指定线程执行,这样观察者的线程调度也就完成了。
schedule调度器
前面讲了这么多的schedule,现在来讲讲它具体做了什么。通过Schedules可以得到几种RxJava定义好的schedule,比如SINGLE、COMPUTATION、IO、NEW_THREAD等。本文只分析示例中的Schedules.newThread()。
newThreadSchedules类:
public final class NewThreadScheduler extends Scheduler { final ThreadFactory threadFactory; private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler"; private static final RxThreadFactory THREAD_FACTORY; private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority"; public NewThreadScheduler() { this(THREAD_FACTORY); } public NewThreadScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; } @NonNull public Worker createWorker() { return new NewThreadWorker(this.threadFactory); } static { int priority = Math.max(1, Math.min(10, Integer.getInteger("rx2.newthread-priority", 5).intValue())); THREAD_FACTORY = new RxThreadFactory("RxNewThreadScheduler", priority); } }
前文分析subscribeOn方法和observeOn方法的时候,涉及到Scheduler的代码分别是
this.scheduler.scheduleDirect(new Runnable() { public void run() { ··· } })
Worker w = this.scheduler.createWorker(); ··· this.worker.schedule(this);
而scheduler.scheduleDirect方法最后也是执行的worker的schedule方法,所以subscribeOn和observeOn都是把事件装进runnable交给worker处理,那我们只需要关心worker做了什么就可以了。Scheduler是一个抽象类,createWorker()方法返回Worker对象,理所当然不同的调度器返回的Worker也是不一样的。newThreadScheduler的createWorker()返回的是newThreadWorker类。
newThreadWorker的schedule方法最核心的就是这两句:
if(delayTime <= 0L) { f = this.executor.submit(sr); } else { f = this.executor.schedule(sr, delayTime, unit); }
立即执行sr或者延迟执行sr,sr是将Runnable再包装后生成的Runnable。这里又出现了一个新东西executor,再回到newThreadWorker的构造方法:
private final ScheduledExecutorService executor; public NewThreadWorker(ThreadFactory threadFactory) { this.executor = SchedulerPoolFactory.create(threadFactory); }
这里的executor是ThreadPoolExecutor的子类,实现了ScheduledExecutorService接口,是一个corePoolSize为1的线程池。这下就很明朗了,Worker的工作就是创建对应的线程池,将Runnable交给线程池处理。(在Android中我们可以选择将Runnable交给主线程的Handler达到切换到主线程处理的目的)
注意
一般来说不太会这么用,但是还是要提醒一下,多次调用subscribeOn或者多次调用observeOn的情况。
一段代码中,对不同的Observable调用多次subscribeOn是没有影响的,但是对同一个Obsrvable调用多次subscribeOn指定Scheduler,最终生效的是第一个指定的Scheduler;对Observer指定多个Scheduler,最终生效的是最后指定的Scheduler。