示例
一个简单的HelloWorld:
Flowable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { Log.e(TAG, "Start:" + Thread.currentThread().getName()); Thread.sleep(1000); return "Done"; } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(TAG, s + ": " + Thread.currentThread().getName()); } });
结果:
没有使用任何subscribeOn
和observeOn
时开始和结束都运行在默认的main线程中。添加一个
subscribeOn
:Flowable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { Log.e(TAG, "Start:" + Thread.currentThread().getName()); Thread.sleep(1000); return "Done"; } }) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(TAG, s + ": " + Thread.currentThread().getName()); } });
结果:
很显然subscribeOn
指定了任务开始执行时的线程,如果中途没有再切换线程,结果也会在该线程中接收。在2的基础上再添加一个
observeOn
:Flowable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { Log.e(TAG, "Start:" + Thread.currentThread().getName()); Thread.sleep(1000); return "Done"; } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(TAG, s + ": " + Thread.currentThread().getName()); } });
结果:
使用observeOn
后,结果在新线程中接收,如果再切换一次呢?在3的基础上再添加一个
observeOn
,并在两个observeOn之间添加一个map操作,打印线程名:Flowable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { Log.e(TAG, "Start:" + Thread.currentThread().getName()); Thread.sleep(1000); return "Done"; } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { Log.e(TAG, "Map:" + Thread.currentThread().getName()); return "Complete"; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(TAG, s + ": " + Thread.currentThread().getName()); } });
结果:
observeOn
改变了map操作所在的线程,并在随后的observeOn
再次切换线程。在4的基础上再添加一个
subscribeOn
:Flowable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { Log.e(TAG, "Start:" + Thread.currentThread().getName()); Thread.sleep(1000); return "Done"; } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { Log.e(TAG, "Map:" + Thread.currentThread().getName()); return "Complete"; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(TAG, s + ": " + Thread.currentThread().getName()); } });
结果:
新添加的subscribeOn
没有效果,结果与4的结果相同。调换5中的两个
subscribeOn
:Flowable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { Log.e(TAG, "Start:" + Thread.currentThread().getName()); Thread.sleep(1000); return "Done"; } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.computation()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { Log.e(TAG, "Map:" + Thread.currentThread().getName()); return "Complete"; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e(TAG, s + ": " + Thread.currentThread().getName()); } });
结果:
结果发生改变,开始的操作在main线程中。
结论
- 默认情况下,
Observable
及其所有链式操作都在当前线程下工作; subscribeOn
可以指定Observable
开始操作的线程,如果之后没有切换线程,后续所有操作及结果都在该线程下运行和获取;subscribeOn
调用的位置任意,多次调用仅第一次有效;observeOn
可以切换线程,改变后续链式操作所在的线程,可以多次调用,每次调用都切换一次线程。