使用RxJava的subscribeOn
和observeOn
可以方便地进行线程切换,但我发现很多人由于对subscribeOn
的理解不到位,在使用中会发生意想不到的bug。
subscribeOn
提起subscribeOn
,很多人都知道它可以用来切换上游线程,且只有第一次生效。这种理解明显是带有错误的,看一段代码
val observable = Observable.create<Int> { emitter ->
log("Subscribing")
thread(name = "Main thread", isDaemon = false) {
log("1 - emitting"); emitter.onNext(1)
log("2 - emitting"); emitter.onNext(2)
log("3 - emitting"); emitter.onNext(3)
emitter.onComplete()
}
}
observable
.subscribeOn(Schedulers.computation())
.map {
log("$it - after subscribeOn")
it * 10
}.test()
.awaitTerminalEvent() // Wait until observable completes
上面的输出结果是什么呢?
[RxComputationThreadPool-1] Subscribing
[Main thread] 1 - emitting
[Main thread] 1 - after subscribeOn
[Main thread] 2 - emitting
[Main thread] 2 - after subscribeOn
[Main thread] 3 - emitting
[Main thread] 3 - after subscribeOn
虽然subscribeOn
切换了线程,但是after subscribeOn
仍然打印在main thread。
subscribeOn
只是用来决定在哪个线程订阅,如果订阅之后没有切县城操作,数据会在当前线程(订阅时的线程)发射,也就是很多人理解的切换“上游线程”。但是像上面例子中,根据Subscribing
的打印得知虽然订阅发生在RxComputationThreadPool
,但是切换到Main thread发射数据,后续数据一直在主线程流动,不会因为subscribeOn再次切换线程
所以在开发中务必要注意:上游数据不一定来自subscribeOn
的线程。也许有人会说上面的例子太极端,一般不会在订阅后乱切线程的,但实际业务场景远比想象的复杂,看下面一段实际业务代码
val observable = Observable.create<Location> { emitter ->
// Library callback to receive location updates
val locationCallback = object : LocationCallback() {
override fun onLocationResult(locationResult: LocationResult?) {
locationResult?.lastLocation?.let { emitter.onNext(it) }
}
}
// Stop listening to updates when the stream is disposed
emitter.setCancellable { locationClient.removeLocationUpdates(locationCallback) }
// Request location updates to the created callback
locationClient.requestLocationUpdates(
locationRequest,
locationCallback,
Looper.getMainLooper() // make sure callback run in mainthread
)
}
observable
.subscribeOn(Schedulers.io)
.map { location -> fetchDataForLocation(location) } // network call
.subscribe { locationData -> showData(locationData) }
逻辑很清晰,通过LocationClient
获取最新的位置,然后通过fetchDataForLocation
将位置上报获取需要显示的数据。requestLocationUpdates
传入了一个Looper.getMainLooper()
,因为这个三方库的API要求callback必须在主线程。
因此,虽然subscribeOn
切到了io线程,但是fetchDataForLocation
还是发生在ui线程,造成Crash。因此正确的做法是在fetchDataForLocation
之前通过observeOn
切换到io线程。
PublishSubject
其实最容易混淆订阅线程与发射线程的场景当属Subject了。当订阅一个PublishSubject
时,数据的来源只取决于Subject.onNext
的线程,与订阅所在的线程完全无关。很多PublishSubject的使用场景中subscribeOn
的使用是无意义的
val subject = PublishSubject.create<Int>()
val observer1 = subject
.subscribeOn(Schedulers.io())
.doOnNext { log("$it - I want this happen on an IO thread") }
.test()
val observer2 = subject
.subscribeOn(Schedulers.newThread())
.doOnNext { log("$it - I want this happen on a new thread") }
.test()
Thread.sleep(10); subject.onNext(1)
Thread.sleep(10); subject.onNext(2)
Thread.sleep(10); subject.onNext(3)
Thread.sleep(10); subject.onComplete()
observer1.awaitTerminalEvent()
observer2.awaitTerminalEvent()
结果如下,subscribeOn
的指定完全没有意义
[Test worker] 1 - I want this happen on an IO thread
[Test worker] 1 - I want this happen on a new thread
[Test worker] 2 - I want this happen on an IO thread
[Test worker] 2 - I want this happen on a new thread
[Test worker] 3 - I want this happen on an IO thread
[Test worker] 3 - I want this happen on a new thread
只有通过observeOn
显示地切换线程,才能达到预期效果
val subject = PublishSubject.create<Int>()
val observer1 = subject
.observeOn(Schedulers.io())
.doOnNext { log("$it - I want this happen on an IO thread") }
.test()
val observer2 = subject
.observeOn(Schedulers.newThread())
.doOnNext { log("$it - I want this happen on a new thread") }
.test()
Thread.sleep(10); subject.onNext(1)
Thread.sleep(10); subject.onNext(2)
Thread.sleep(10); subject.onNext(3)
Thread.sleep(10); subject.onComplete()
observer1.awaitTerminalEvent()
observer2.awaitTerminalEvent()
[RxNewThreadScheduler-1] 1 - I want this happen on a new thread
[RxCachedThreadScheduler-1] 1 - I want this happen on an IO thread
[RxNewThreadScheduler-1] 2 - I want this happen on a new thread
[RxCachedThreadScheduler-1] 2 - I want this happen on an IO thread
[RxNewThreadScheduler-1] 3 - I want this happen on a new thread
[RxCachedThreadScheduler-1] 3 - I want this happen on an IO thread
BehaviorSubject
需要特别注意BehaviorSubject等有sticky效果的Subject,数据发射除了来自onNext
,还可能来自于订阅,此时的发射数据的线程有可能来自subscribeOn
val subject = BehaviorSubject.create<Int>()
val observer1 = subject
.subscribeOn(Schedulers.io())
.doOnNext { log("$it - First observer") }
.test()
subject.onNext(1) // Will be emitted as part of the subscription
val observer2 = subject
.subscribeOn(Schedulers.io())
.doOnNext { log("$it - Second observer") }
.test()
val observer3 = subject
.subscribeOn(Schedulers.newThread())
.doOnNext { log("$it - Third observer") }
.test()
Thread.sleep(10); subject.onNext(2)
Thread.sleep(10); subject.onNext(3)
Thread.sleep(10); subject.onComplete()
observer1.awaitTerminalEvent()
observer2.awaitTerminalEvent()
observer3.awaitTerminalEvent()
[Test worker] 1 - First observer
[RxCachedThreadScheduler-1] 1 - Second observer
[RxNewThreadScheduler-2] 1 - Third observer
[Test worker] 2 - First observer
[Test worker] 2 - Second observer
[Test worker] 2 - Third observer
[Test worker] 3 - First observer
[Test worker] 3 - Second observer
[Test worker] 3 - Third observer
第一条日志数据来自observer1
订阅之后的onNext
,此时相当于PublishSubject
,打印在onNext
线程;紧接着第二三条日志的数据分别来自observer2
和observer3
的订阅时的BehaviorSubject
的缓存,所以发生在订阅线程;后续日志的数据均来自onNext。
总结
RxJava可以方便地帮助开发者进行线程切换,但前提是必须弄清楚subsribeOn
等线程相关操作符的准确含义,特别是在PublishSubject
的使用中,不要被subsribeOn
的出现而误判了当前线程。