学习小目标
1、了解Schedulers类,以及通过其创建的各种Scheduler的用途
2、结合上一篇RxJava系列1:网络请求中的线程切换理解该类创建的各类型线程作用
讲到了线程的切换,其中使用到了observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
这篇文章主要是学习Schedulers.io()和AndroidSchedulers.mainThread()等一些列相关参数,首先我们需要了解Schedulers类
1、Schedulers类
用于返回标准Scheduler实例的静态工厂方法
该类通过静态内部类的方式返回不同的Scheduler,先看源码,然后对各实例进行解释
Schedulers源码:
public final class Schedulers {
static final Scheduler SINGLE;
static final Scheduler COMPUTATION;
static final Scheduler IO;
static final Scheduler TRAMPOLINE;
static final Scheduler NEW_THREAD;
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return SingleHolder.DEFAULT;
}
});
COMPUTATION = RxJavaPlugins.initComputationScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return ComputationHolder.DEFAULT;
}
});
IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
});
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
});
}
/** Utility class. */
private Schedulers() {
throw new IllegalStateException("No instances!");
}
/**
* Creates and returns a {@link Scheduler} intended for computational work.
* <p>
* This can be used for event-loops, processing callbacks and other computational work.
* <p>
* Do not perform IO-bound work on this scheduler. Use {@link #io()} instead.
* <p>
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
*
* @return a {@link Scheduler} meant for computation-bound work
*/
public static Scheduler computation() {
return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}
/**
* Creates and returns a {@link Scheduler} intended for IO-bound work.
* <p>
* The implementation is backed by an {@link Executor} thread-pool that will grow as needed.
* <p>
* This can be used for asynchronously performing blocking IO.
* <p>
* Do not perform computational work on this scheduler. Use {@link #computation()} instead.
* <p>
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
*
* @return a {@link Scheduler} meant for IO-bound work
*/
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
/**
* Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the
* current work completes.
*
* @return a {@link Scheduler} that queues work on the current thread
*/
public static Scheduler trampoline() {
return TRAMPOLINE;
}
/**
* Creates and returns a {@link Scheduler} that creates a new {@link Thread} for each unit of work.
* <p>
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
*
* @return a {@link Scheduler} that creates new threads
*/
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
/**
* Returns the common, single-thread backed Scheduler instance.
* <p>
* Uses:
* <ul>
* <li>main event loop</li>
* <li>support Schedulers.from(Executor) and from(ExecutorService) with delayed scheduling</li>
* <li>support benchmarks that pipeline data from the main thread to some other thread and
* avoid core-bashing of computation's round-robin nature</li>
* </ul>
* @return a {@link Scheduler} that shares a single backing thread.
* @since 2.0
*/
public static Scheduler single() {
return RxJavaPlugins.onSingleScheduler(SINGLE);
}
/**
* Converts an {@link Executor} into a new Scheduler instance.
*
* @param executor
* the executor to wrap
* @return the new Scheduler wrapping the Executor
*/
public static Scheduler from(Executor executor) {
return new ExecutorScheduler(executor);
}
/**
* Shuts down those standard Schedulers which support the SchedulerLifecycle interface.
* <p>The operation is idempotent and thread-safe.
*/
public static void shutdown() {
computation().shutdown();
io().shutdown();
newThread().shutdown();
single().shutdown();
trampoline().shutdown();
SchedulerPoolFactory.shutdown();
}
/**
* Starts those standard Schedulers which support the SchedulerLifecycle interface.
* <p>The operation is idempotent and thread-safe.
*/
public static void start() {
computation().start();
io().start();
newThread().start();
single().start();
trampoline().start();
SchedulerPoolFactory.start();
}
}
public static Scheduler computation()
创建并返回用于计算工作的Scheduler,可以用于事件循环,处理回调和其他计算工作,不能在此调度程序上执行IO绑定工作
public static Scheduler io()
创建并返回用于IO绑定工作的Scheduler,该实现由Executor线程池支持,该线程池将根据需要增长,这可用于异步执行阻塞IO, 不要在此调度程序上执行计算工作。
在类线程上可执行io操作,如网络访问,数据库操作
public static Scheduler trampoline()
创建并返回一个Scheduler,该Scheduler将在当前工作完成之后在当前线程按照列队执行。
public static Scheduler newThread()
创建并返回Scheduler,该Scheduler会为每个工作单元创建一个新的线程,也就是说每个任务都会在一个新的线程中执行
public static Scheduler single()
创建并返回Scheduler,该Scheduler是一个单独的后台线程实例,将被所有的任务共用
public static Scheduler from(Executor executor)
将传入的Executor转换为新的Scheduler实例
AndroidSchedulers.mainThread()来至另外一个包,需要gradle中导入
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
AndroidSchedulers源码如下:
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
public static Scheduler mainThread()
会返回一个在Android主线程上执行操作的Scheduler;
public static Scheduler from(Looper looper)
返回一个在传入的Looper对应的线程上执行的Scheduler
2、各类型线程的作用
根据源码的解释我们可以自然而然的知道,
subscribeOn(Schedulers.io())的意思是在IO线程上注册对被观察者的监听,也就是让我们的网络操作在io线程上执行,并被观察
observeOn(AndroidSchedulers.mainThread())的意思是在主线程监听数据变化,并可以执行UI相关操作。
就目前我也只是用过以上两种,其他的使用后续再做补充