一点一点啃RxJava(二#线程调度#)

        用RxJava实现线程切换是非常方便的,而且线程切换也是RxJava最大的特点之一。

        依旧是先上一段示例代码和执行结果

Observable
                .create(new ObservableOnSubscribe<Object>() {
                    @Override
                    public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception {
                        Log.d("rxrx", "subscribe:" + Thread.currentThread().getName());
                        int i = 1+2+3+4+5+6+7+8+9+10;
                        observableEmitter.onNext(i);
                        i++;
                        observableEmitter.onNext(i);
                        observableEmitter.onComplete();
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable disposable) {
                        Log.d("rxrx", "onSubscribe:" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(Object s) {
                        Log.d("rxrx", "onNext:" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        Log.d("rxrx", "onError:" + throwable.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("rxrx", "onComplete:" + Thread.currentThread().getName());
                    }
                });
04-23 13:26:23.658 14381-14381/com.newhongbin.lalala D/rxrx: onSubscribe:main
04-23 13:26:23.659 14381-15071/com.newhongbin.lalala D/rxrx: subscribe:RxNewThreadScheduler-1
04-23 13:26:23.660 14381-14381/com.newhongbin.lalala D/rxrx: onNext:main
04-23 13:26:23.660 14381-14381/com.newhongbin.lalala D/rxrx: onNext:main
04-23 13:26:23.660 14381-14381/com.newhongbin.lalala D/rxrx: onComplete:main

        本文的示例代码与前文的示例代码主要差别就是两行:

.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())

        Schedulers.newThread()返回的是从线程池中获取的一个线程,AndroidScheculers.mainThread()就是Android主线程,从执行结果可以看到,被观察者的subscribe方法是在新线程里面执行的,观察者的onSubscribe、onNext、onComplete/onError都是在主线程中执行的,很明显这两行代码的作用就是切换了代码执行的线程。

        关于这段示例代码我画了一张图来说明:


        ObservableCreate、ObservableSubscribeOn、ObservableObserveOn都是Observable的子类,很好的表现了RxJava的链式调用,使得代码更简洁。(一开始看这个源码的时候类名真的看得我快晕了,感觉都是差不多的命名,看久了都要看花眼。。。)在这里我们分别看一下这三个类的构造方法:

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
        可以看到每个类的构造方法都有一个source,而这个source就是链式调用的上游对象,这一点在上图中我表现为不同的颜色代表不同的类对象。
    梳理了大致的流程,然后来看看具体的代码实现,subscribeOn方法的参数是一个调度器,返回对象是ObservableSubscribeOn,调度器的作用简单来说就是指定代码所执行的线程,这个后面再讲。既然ObservableSubscribeOn是Observable的子类,而subscribe方法的真正实现又是在subscribeActual方法中,那么我们来看看ObservableSubscribeOn的subscribeActual方法的具体实现:
public void subscribeActual(Observer<? super T> s) {
        final ObservableSubscribeOn.SubscribeOnObserver parent = new ObservableSubscribeOn.SubscribeOnObserver(s);
        s.onSubscribe(parent);
        parent.setDisposable(this.scheduler.scheduleDirect(new Runnable() {
            public void run() {
                ObservableSubscribeOn.this.source.subscribe(parent);
            }
        }));
    }

        parent是进行封装过的观察者,封装完观察者之后,就执行onSubscribe方法,此时还没有进行线程调度,所以该方法是在主线程执行。继续往下走,调度器开始工作了,它在这里的作用就是将subscribe方法包装成一个任务,将该任务交给调度器的线程执行。执行subscribe方法的对象是ObservableSubscribe中的source,结合图片看,该source是ObservableCreate;ObservableCreate中执行subscribe方法的对象又是ObservableCreate的source,结合图片看,这里的source是ObservableOnSubscribe。到此为止,就完成了被观察者的线程调度,将

public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception {
                        Log.d("rxrx", "subscribe:" + Thread.currentThread().getName());
                        int i = 1+2+3+4+5+6+7+8+9+10;
                        observableEmitter.onNext(i);
                        i++;
                        observableEmitter.onNext(i);
                        observableEmitter.onComplete();
                    }

这些代码全部调度给了调度线程执行。

        如果不调用observeOn指定观察者的调度线程的话,观察者的事件接收处理和被观察者会在同一个线程。接下来看看observeOn方法。与subscribeOn一样,首先用调度器指定观察者的线程,示例代码中的调度器线程即Android中的主线程,该方法的返回对象是ObservableObserveOn,同样的我们也来看看ObservableObserveOn的subscribeActual方法:

protected void subscribeActual(Observer<? super T> observer) {
        if(this.scheduler instanceof TrampolineScheduler) {
            this.source.subscribe(observer);
        } else {
            Worker w = this.scheduler.createWorker();
            this.source.subscribe(new ObservableObserveOn.ObserveOnObserver(observer, w, this.delayError, this.bufferSize));
        }

    }

        首先判断调度器是不是指定的当前线程,如果是,那。。。那就跟没有指定一样啦,执行subscribe方法,这里的source是ObservableSubscribeOn,所以又回到了上文说的流程中,即不指定观察者的调度器。如果不是当前线程,依然是执行subscribe方法,只不过观察者变成了ObserveOnObserver。

        ObserveOnObserver构造方法:

ObserveOnObserver(Observer<? super T> actual, Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        actual是我们传进来的初始Observser,worker可以简单理解为是调度器指定的工作线程。根据前文知道被观察者最后执行的onNext、onError、onComplete都会调用观察者的onNext、onError、onComplete,那么我们看看ObserveOnObserver的这三个方法做了什么:

public void onNext(T t) {
            if(!this.done) {
                if(this.sourceMode != 2) {
                    this.queue.offer(t);
                }

                this.schedule();
            }
        }
public void onError(Throwable t) {
            if(this.done) {
                RxJavaPlugins.onError(t);
            } else {
                this.error = t;
                this.done = true;
                this.schedule();
            }
        }
public void onComplete() {
            if(!this.done) {
                this.done = true;
                this.schedule();
            }
        }

        这三个方法好像都跟actual和worker没什么关系,但是我们捕捉到了两个点:

        1、所有onNext方法的参数都会被放到一个队列里面

        2、这三个方法都会执行schedule方法。

        看来关键的点还在后面啊

void schedule() {
            if(this.getAndIncrement() == 0) {
                this.worker.schedule(this);
            }
        }

        终于看到线程切换了,ObserveOnObserver实现了Runnable接口,schedule方法ObserveOnObserver自身调度进工作线程,再结合onNext的队列,是不是很像Android里面的Thread、Handler、MessageQueue、Message的模式呢。切换到工作线程之后,下一步就是等待执行Runnable的run方法了

public void run() {
            if(this.outputFused) {
                this.drainFused();
            } else {
                this.drainNormal();
            }
        }
        一般情况下,drainFused()方法交由actual执行执行onError和onComplete事件;drainNormal()方法执行onNext事件,循环将队列中的事件取出来交由actual执行,具体代码就不再展示了。因为run()方法被是由指定线程执行的,所以 onNext、onError、onComplete方法都是在指定线程执行,这样观察者的线程调度也就完成了。


schedule调度器

        前面讲了这么多的schedule,现在来讲讲它具体做了什么。通过Schedules可以得到几种RxJava定义好的schedule,比如SINGLE、COMPUTATION、IO、NEW_THREAD等。本文只分析示例中的Schedules.newThread()。

        newThreadSchedules类:

public final class NewThreadScheduler extends Scheduler {
    final ThreadFactory threadFactory;
    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    public Worker createWorker() {
        return new NewThreadWorker(this.threadFactory);
    }

    static {
        int priority = Math.max(1, Math.min(10, Integer.getInteger("rx2.newthread-priority", 5).intValue()));
        THREAD_FACTORY = new RxThreadFactory("RxNewThreadScheduler", priority);
    }
}

        前文分析subscribeOn方法和observeOn方法的时候,涉及到Scheduler的代码分别是

this.scheduler.scheduleDirect(new Runnable() {
            public void run() {
                ···
            }
        })
Worker w = this.scheduler.createWorker();
···
this.worker.schedule(this);

        而scheduler.scheduleDirect方法最后也是执行的worker的schedule方法,所以subscribeOn和observeOn都是把事件装进runnable交给worker处理,那我们只需要关心worker做了什么就可以了。Scheduler是一个抽象类,createWorker()方法返回Worker对象,理所当然不同的调度器返回的Worker也是不一样的。newThreadScheduler的createWorker()返回的是newThreadWorker类。

        newThreadWorker的schedule方法最核心的就是这两句:

if(delayTime <= 0L) {
                    f = this.executor.submit(sr);
                } else {
                    f = this.executor.schedule(sr, delayTime, unit);
                }

立即执行sr或者延迟执行sr,sr是将Runnable再包装后生成的Runnable。这里又出现了一个新东西executor,再回到newThreadWorker的构造方法:

private final ScheduledExecutorService executor;

public NewThreadWorker(ThreadFactory threadFactory) {
    this.executor = SchedulerPoolFactory.create(threadFactory);
}

        这里的executor是ThreadPoolExecutor的子类,实现了ScheduledExecutorService接口,是一个corePoolSize为1的线程池。这下就很明朗了,Worker的工作就是创建对应的线程池,将Runnable交给线程池处理。(在Android中我们可以选择将Runnable交给主线程的Handler达到切换到主线程处理的目的)


注意

        一般来说不太会这么用,但是还是要提醒一下,多次调用subscribeOn或者多次调用observeOn的情况。

       一段代码中,对不同的Observable调用多次subscribeOn是没有影响的,但是对同一个Obsrvable调用多次subscribeOn指定Scheduler,最终生效的是第一个指定的Scheduler;对Observer指定多个Scheduler,最终生效的是最后指定的Scheduler。



猜你喜欢

转载自blog.csdn.net/u013933272/article/details/80049231