首先我提出来两个问题:
1.Rxjava是如何做到线程切换的
2.线程切换我多次调用subscribeOn和多次调用observeOn,对数据流有什么影响
对Rxjava的数据处理流程不是很清楚的,可以看我上一篇文章
数据处理及订阅流程分析
下面这段代码相信很多人都写过
upstream.subscribeOn(SchedulerProvider.net())
.observeOn(AndroidSchedulers.mainThread());
上游的Observable进行线程切换在子线程发射,然后订阅者代码运行到主线程。
接下来我们看一下subscribeOn的源码
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
创建了一个ObservableSubscribeOn并返回,把自身与scheduler通过构造方法传入到ObservableSubscribeOn。显然ObservableSubscribeOn一定继承了Observable.这里我们发现,向下游传递的Observable已经被替换成了ObservableSubscribeOn.上游的Observable被ObservableSubscribeOn接管了,又是代理模式的应用。分析ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
构造方法里把上游的observable赋值到自己的source成员变量,同时把Scheduler保存到自己的成员变量。然后唯一的功能方法subscribeActual(observer),非常得简洁。把下游的observer通过SubscribeOnObserver代理,然后调用parent的setDisposable方法,参数为scheduler.scheduleDirect(new SubscribeTask(parent))。
我们一看SubscribeOnObserver这个observer的源码便能清楚是什么鬼了
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
没几行代码,也很简洁,关注一下setDisposable(Disposable )方法,其实就是把scheduler返回的Disposable赋值给自己。也就是说ObservableSubscribeOn可以dispose,scheduler返回的那个disposable.通过我的上篇文章数据处理及订阅流程分析我们知道ObservableSubscribeOn的subscribeActual方法执行发生在数据源被订阅的时候。也就是说订阅的时候把下游的observer包装为一个SubscribeTask,在scheduler的scheduleDirect方法中执行。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
这里的source就是上游的Observable,上游的observable的subscribe方法在run方法中运行。scheduler作为后续文章讲,它的scheduleDirect方法就是把SubscribeTask运行在子线程。
再梳理一遍:当最下游的observable发生订阅的时候 -->会调用ObservableSubscribeOn的subscribeActual(observer)方法,这里的参数就是我们下游的observer,这个observer被包装后作为一个task,上游的source.subscribe(parent)方法运行在子线程,parent实例为SubscribeOnObserver,,上游source.subscribe方法内部实现,都会想方设法执行parent的onNext,onComplete等方法,就达成了包装类SubscribeOnObserver(即parent)这些方法在子线程运行,同时返回一个disposable,这个disposable给Observer用来取消订阅。又有些饶了,其实一个时序图就能说明一切。点击查看大图
建议读者跟着源码一起看,思考一下,就明白了。
回答第二个问题,多次到用subscribeOn会怎样?subscribeOn会把下游的Observer方法的onNext,onComplete,onError方法包装在SubscribeOnObserver方法中,这些方法会在订阅时运行在子线程的task中,多次调用会多次包装下游的Observer,进行线程切换到子线程。那多次切换最终会切换成啥样呢?其实仔细想一下,答案就在这段代码中
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
下游的subscrbeActual先执行,当然最靠近上游的,是程序最后一次执行subscrbeActual方法啦,就是第一次调用subscribeOn在哪个线程中执行,最终结果会在哪个线程中执行。这个要牢记下游的subscribeActual最先执行就能知道结果了。因为每次subscribeActual都会切换线程(仅针对ObservableSubscribeOn里的方法)。
接下来分析observerOn方法
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
创建一个 ObservableObserveOn返回,老样子创建了一个新的Observable向下传递,看ObservableSubscribeOn源码
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
构造方法与上面分析相似 ,直接看subscribeActual方法,暂时不关心scheduler instanceof TrampolineScheduler的情况,接下来就是上游Observable source订阅ObserveOnObserver,看ObserveOnObserver
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
Throwable error;
volatile boolean done;
volatile boolean disposed;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
upstream.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return disposed;
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
....省略一些
}
代码稍微有些多,看schedule()方法,worker.schedule(this);拿worker执行自己的runable方法,runable方法
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainFused();
}
}
有兴趣的可以分析drainFused,drainFused,我在以后的文章中会单独分析这drain相关方法。其实它就是调用了下游对应的onNext,onCompete,onError等方法,
梳理一下:也就是说我们每次调用observeOn(scheduler)方法,都会包装一个新的Observable向下传递,同时把下游的Observer包装成ObserveOnObserver,又是代理,通过它里面的worker在上游的Observer执行onNext等方法,在ObserveOnObserver的onNext做线程切换,调用下游的observer的onNext方法,从而实现线程切换。
那么多次调用observeOn方法时,会怎样呢?思考一下,其实很容易了,Observer的发射方法调用顺序是从上游,一直传递到包装下游,也就是说最后一次执行线程切换有效,当然中间的线程切换也有效果,只是最后一次Observer为我们传递的Observer而已,所以我们的Observer方法的onNext相关方法在哪个线程,就取决于最后一次调用了。
备注:Observer的onSubscribe方法,是从上游向下游调用的,这一点恰好跟subscribeOnActual相反。仔细想一下就知道了。其实说一千道一万,也没有自己梳理一遍强。