这次聊的是RxJava的线程切换流程,先看一张图:
这个时序图对应的RxJava代码:
public void demo2(){
createObservable()
.subscribeOn(Schedulers.newThread())//被观察者在新线程执行subscribe
.observeOn(AndroidSchedulers.mainThread())//观察者在主线程执行onNext onComplete
.subscribe(createObserver());
}
//创建观察者
private Observer createObserver() {
return new Observer() {
@Override
public void onSubscribe(Disposable d) {
printThreadId("onSubscribe");
}
@Override
public void onNext(Object o) {
printThreadId("onNext" + o);
}
@Override
public void onError(Throwable e) {
printThreadId("onError" + e.getMessage());
}
@Override
public void onComplete() {
printThreadId("onComplete");
}
};
}
//创建被观察者
private Observable createObservable() {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
printThreadId("subscribe");
e.onNext(1);
e.onComplete();
}
});
}
执行结果:
/**
12-06 15:34:35.577 27544-27544/com.study.rxjavademo I/ThreadDemos: onSubscribe thread_name: main
12-06 15:34:35.587 27544-27608/com.study.rxjavademo I/ThreadDemos: subscribe thread_name: RxNewThreadScheduler-1
12-06 15:34:35.627 27544-27544/com.study.rxjavademo I/ThreadDemos: onNext1 thread_name: main
12-06 15:34:35.627 27544-27544/com.study.rxjavademo I/ThreadDemos: onComplete thread_name: main
*/
上面的代码什么意思呢?
我们可以把上述代码模拟一个网络请求:
1.被观察者是网络请求
//创建被观察者
private Observable createObservable() {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
printThreadId("subscribe");
e.onNext(1);
e.onComplete();
}
});
}
subscribe方法执行的是耗时操作,所以我们应该让它在非主线程执行。
2.观察者相当于等待网络返回的回调
//创建观察者
private Observer createObserver() {
return new Observer() {
@Override
public void onSubscribe(Disposable d) {
printThreadId("onSubscribe");
}
@Override
public void onNext(Object o) {
printThreadId("onNext" + o);
}
@Override
public void onError(Throwable e) {
printThreadId("onError" + e.getMessage());
}
@Override
public void onComplete() {
printThreadId("onComplete");
}
};
}
我们希望在onNext返回结果进行一些ui调整,所以应该设置在主线程。
3.如何设置在哪个线程呢?
.subscribeOn(Schedulers.newThread())//被观察者在新线程执行subscribe
.observeOn(AndroidSchedulers.mainThread())//观察者在主线程执行onNext onComplete
现在场景有了,结果也有了。那这些在RxJava中是如何实现的呢?
接下来分析整个过程的源码。
首先创建被观察者
//创建被观察者
private Observable createObservable() {
return Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
printThreadId("subscribe");
e.onNext(1);
e.onComplete();
}
});
}
根据RxJava源码分析(一),上段代码最终生成的被观察者是ObservableCreate。
过程:通过create方法将参数ObservableOnSubscribe传递给ObservableCreate,ObservableOnSubscribe保存为source字段。
切换被观察者的线程
subscribeOn(Schedulers.newThread())
这句代码,将被观察者的public void subscribe(ObservableEmitter e)方法执行在新的线程。
new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
//新线程执行
printThreadId("subscribe");
e.onNext(1);
e.onComplete();
}
}
接下来我们看一下这段代码的源码实现
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
重点关注:
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
可以理解为new了一个ObservableSubscribeOn被观察者并返回,this是上面的ObservableCreate
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
...
}
这段代码重点在于:ObservableCreate保存为source字段,后面调用会用到。
切换观察者线程
.observeOn(AndroidSchedulers.mainThread())
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
返回了observeOn(scheduler, false, bufferSize());
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
这段代码表明,真正返回的被观察者是ObservableObserveOn,并且this表示ObservableSubscribeOn。还是同样的套路,将ObservableSubscribeOn保存为source字段
订阅观察者
.subscribe(createObserver());
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
这里只需要关注:subscribeActual(observer);
这段代码实际的执行者是:ObservableObserveOn
所以接下来的重点就转移到了ObservableObserveOn的subscribeActual()方法
ObservableObserveOn.subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
代码很简单,就是将source(实际是:ObservableSubscribeOn)订阅了ObserveOnObserver(对observer进行了封装)
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
...
}
接下来既然ObservableSubscribeOn订阅了ObserveOnObserver,那么肯定就是执行ObservableSubscribeOn的subscribeActual方法了
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
上面的代码
1.首先将传入的观察者ObserveOnObserver进行了一层装饰,转换成了SubscribeOnObserver,
2.然后通过parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))实现了线程的切换。
首先看SubscribeTask
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
它其实是一个Runnable
run方法将source(ObservableCreate)和parent(SubscribeOnObserver)联系了起来
即:ObservableCreate订阅了SubscribeOnObserver
这里插一段代码,即线程是如何切换的呢?
看下面源码
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
这里先留一个疑问,最后再统一解释。
我们继续:
即:ObservableCreate订阅了SubscribeOnObserver
订阅以后,那么按照套路肯定就是就需要执行ObservableCreate的subscribeActual方法了
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这一段代码,依然延续了之前的套路,即:
1.将SubscribeOnObserver传入CreateEmitter,并保存起来
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
...
}
上面方法可以看到ObservableEmitter的onNext方法最终是执行了observer.onNext(t)方法
2.source.subscribe(parent)
这里的source肯定就是ObservableOnSubscribe了,也就是调用了
ObservableOnSubscribe的subscribe()方法了,即:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
printThreadId("subscribe");
e.onNext(1);
e.onComplete();
}
});
所以就可以解释为什么:
代码中的e.onNext(1)调用会反馈给观察者的onNext方法了
至此整个流程分析完了,现在还留下一个问题,
线程是怎么切换的?
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
由于我们创建的Scheduler是Schedulers.newThread(),这里的Scheduler其实是NewThreadScheduler
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
所以final Worker w = createWorker()创建的的worker是NewThreadWorker
结下来看NewTheadWorker的schedule源码,发现最终执行的是如下代码:
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
f = executor.submit((Callable)sr);这句话创建了一个新的线程,执行了run方法
因此:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
这里面的run方法就在新线程中执行了,即source(实际为:ObservableCreate)的actual方法在子线程执行,即ObservableOnSubscribe的subscribe方法在子线程执行。
对于
todo
当ObservableEmitter的onNext执行时,最终会调用ObserveOnObserver的onNext
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
最终会调用worker.schedule(this);
这里的worker是HandlerScheduler的HandlerWorker,下面是HanderWorker的shedule方法
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
可以看到最终实现是:
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
通过handler发送消息来执行,而这个handler是在主线程创建的,所以最终onNext是在主线程执行。
为什么handler是主线程创建的呢?
答案在AndroidSchedulers的源码中:
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));这句话说明了一切。