Schedulers
observeOn和subscribeOn方法将Scheduler作为参数。顾名思义,Scheduler是一种用来安排执行各个操作的工具。将如何调用操作的细节取决于所使用的Scheduler的实现。您可以创建自己的Scheduler实现,但大多数时候您会发现RxJava已经为您提供了一组针对常见情况的Scheduler。您可以从Scheduler上的工厂方法获取现有实现。
现有的Scheduler如下:
- immediate 同步执行Scheduler操作。
- trampoline 队列在当前线程完成之后执行。
- newThread 为每个计划的工作单元创建一个新线程。
- computation 用于cpu工作
- io 用于io工作
- test 用于测试和调试
在当前实现中,computation和io调度器实际上不是单一实现。这种分离的关键是要有不同的实例,同时也记录你的意图。
许多Rx operators都在内部使用schedules。您到目前为止看到的Observable运算符,所有异步运算符都有重载调度程序。您可以指定每个operator使用的schedules。
Advanced features of schedulers
用于Rx schedule的方法和实现不是特定于Rx的。实际上,它们比较标准,可以在没有任何Rx代码的情况下使用。除了设计自定义异步操作符之外,通常不必直接使用schedule。在自定义运算符中使用schedule不仅方便,而且还允许异步运算符变得可测试。
createWorker()部分有点趣的,它返回一个Scheduler.Worker。worker接受操作并在单个线程上按顺序执行它们。在某种程度上,worker本身就是一个schedule,但我们不会将其称为schedule以避免混淆。
Scheduling an action
然后,该操作将排队等待在分配了该worker的线程上执行。
正如您对schedule所期望的那样,您还可以使用以下方法安排一次或重复执行的操作:
我们可以在这里看到,执行的延迟是从调度时刻开始计算的。指定的时间不是任务之间的强制睡眠时间。如果有工作准备好执行,worker人可以同时工作。
Canceling work
Scheduler.Worker继承啦Subscription。在worker上调用unsubscribe方法将导致队列被清空并且所有待处理的任务都被取消。我们可以通过修改前面的例子来看到。
第二个任务永远不会被执行,因为它之前的任务取消了一切。正在执行的操作将被中断。在下一个示例中,我们将创建一个睡眠时间为2000毫秒的任务。在它开始执行后500ms我们取消了对工人的所有工作。这会导致InterruptedException
正如看到的那样,schedule返回Subscription。您可以通过在安排时创建的Subscription取消单个任务。
ImmediateScheduler
ImmediateScheduler根本不做任何调度。scheduler只是同步执行操作,并在操作完成时返回。嵌套的调度请求将导致以递归方式执行操作。
输出:
TrampolineScheduler
TrampolineScheduler的worker也是同步的,但不会嵌套任务。相反,它从初始任务开始,执行时调度的任何任务将在当前任务完成后排队等候。
输出:
NewThreadScheduler
NewThreadScheduler创建每个都有自己的线程的工作者。每个计划任务都将在与该特定工作者相对应的线程上执行。
输出:
下届再续!
原文:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%204%20-%20Concurrency/1.%20Scheduling%20and%20threading.md