// Minimal RxJava 2 chain: a source created by hand, a map step that turns each Integer
// into a String, and a Consumer that receives the mapped value.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        // Log before every emission so the ordering of emit / map / consume is visible in logcat.
        Log.i("xiang", "发射.1");
        emitter.onNext(1);

        Log.i("xiang", "发射.2");
        emitter.onNext(2);

        Log.i("xiang", "发射.3");
        emitter.onNext(3);
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer value) throws Exception {
        // Transformation step: Integer -> String.
        Log.i("xiang", "变换...");
        return "This is result " + value;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String text) throws Exception {
        // Terminal consumer: only logs that a value arrived; the value itself is not printed.
        Log.i("xiang", "subscribe...");
    }
});
以上为一个基本的RxJava建立订阅的方式
执行结果为:
11-28 16:52:43.173 9547-9547/com.xlz.rxjavatest I/xiang: 发射.1
11-28 16:52:43.173 9547-9547/com.xlz.rxjavatest I/xiang: 变换...
11-28 16:52:43.173 9547-9547/com.xlz.rxjavatest I/xiang: subscribe...
11-28 16:52:43.173 9547-9547/com.xlz.rxjavatest I/xiang: 发射.2
11-28 16:52:43.173 9547-9547/com.xlz.rxjavatest I/xiang: 变换...
11-28 16:52:43.173 9547-9547/com.xlz.rxjavatest I/xiang: subscribe...
11-28 16:52:43.173 9547-9547/com.xlz.rxjavatest I/xiang: 发射.3
11-28 16:52:43.173 9547-9547/com.xlz.rxjavatest I/xiang: 变换...
11-28 16:52:43.173 9547-9547/com.xlz.rxjavatest I/xiang: subscribe...
观察运行结果可以知道,emitter执行一次onNext,则整个流程执行一遍, 就是发射--->变换---->accept
从subscribe方法看起
/**
 * Subscribes with only an {@code onNext} callback.
 * <p>
 * Delegates to the four-argument {@code subscribe} overload, supplying
 * {@code Functions.ERROR_CONSUMER}, {@code Functions.EMPTY_ACTION} and
 * {@code Functions.emptyConsumer()} for the callbacks the caller did not provide.
 *
 * @param onNext the callback forwarded as the first argument of the four-argument overload
 * @return whatever the four-argument overload returns
 */
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
点进去
/**
 * Subscribes with one callback per signal type.
 * <p>
 * All four callbacks are null-checked, wrapped into a single {@code LambdaObserver},
 * and that observer is passed to {@code subscribe(Observer)}.
 *
 * @param onNext      callback stored in the {@code LambdaObserver} for values; must not be null
 * @param onError     callback stored for errors; must not be null
 * @param onComplete  action stored for completion; must not be null
 * @param onSubscribe callback stored for the subscription signal; must not be null
 * @return the {@code LambdaObserver} that was subscribed (it implements {@code Disposable})
 */
public final Disposable subscribe(
        Consumer<? super T> onNext,
        Consumer<? super Throwable> onError,
        Action onComplete,
        Consumer<? super Disposable> onSubscribe) {
    // Validate every callback before creating anything.
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    // Bundle the four callbacks into one Observer and subscribe it.
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);

    return ls;
}
这里可以看到,将onNext,onError,onComplete,onSubscribe封装成一个LambdaObserver对象
点进去
/**
 * An {@code Observer} that forwards each signal to the matching callback supplied at
 * construction time, and that is itself a {@code Disposable}.
 * <p>
 * The inherited {@code AtomicReference<Disposable>} is the disposal state:
 * {@link #onSubscribe(Disposable)} installs the upstream Disposable through
 * {@code DisposableHelper.setOnce}, and {@link #isDisposed()} compares the current
 * reference against {@code DisposableHelper.DISPOSED}.
 *
 * @param <T> the value type received in {@code onNext}
 */
public final class LambdaObserver<T> extends AtomicReference<Disposable>
        implements Observer<T>, Disposable {

    private static final long serialVersionUID = -7251123623727029452L;

    /** Invoked from {@link #onNext(Object)}. */
    final Consumer<? super T> onNext;

    /** Invoked from {@link #onError(Throwable)}. */
    final Consumer<? super Throwable> onError;

    /** Invoked from {@link #onComplete()}. */
    final Action onComplete;

    /** Invoked from {@link #onSubscribe(Disposable)}. */
    final Consumer<? super Disposable> onSubscribe;

    /**
     * Stores the four callbacks; no validation is performed here.
     *
     * @param onNext      callback for values
     * @param onError     callback for errors
     * @param onComplete  action for completion
     * @param onSubscribe callback for the subscription signal
     */
    public LambdaObserver(
            Consumer<? super T> onNext,
            Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }

    /**
     * Installs {@code s} via {@code DisposableHelper.setOnce}; if that succeeds, calls the
     * {@code onSubscribe} callback with this observer (not with {@code s}).
     * A non-fatal exception from the callback is routed to {@link #onError(Throwable)}.
     */
    @Override
    public void onSubscribe(Disposable s) {
        if (!DisposableHelper.setOnce(this, s)) {
            return;
        }
        try {
            onSubscribe.accept(this);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            onError(ex);
        }
    }

    /**
     * Passes {@code t} to the {@code onNext} callback unless this observer is disposed.
     * A non-fatal exception from the callback is routed to {@link #onError(Throwable)}.
     */
    @Override
    public void onNext(T t) {
        if (isDisposed()) {
            return;
        }
        try {
            onNext.accept(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            onError(e);
        }
    }

    /**
     * Unless already disposed: disposes this observer, then passes {@code t} to the
     * {@code onError} callback. If the callback itself throws a non-fatal exception, both
     * throwables are reported together to {@code RxJavaPlugins.onError}.
     */
    @Override
    public void onError(Throwable t) {
        if (isDisposed()) {
            return;
        }
        dispose();
        try {
            onError.accept(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(new CompositeException(t, e));
        }
    }

    /**
     * Unless already disposed: disposes this observer, then runs the {@code onComplete}
     * action. A non-fatal exception from the action is reported to
     * {@code RxJavaPlugins.onError}.
     */
    @Override
    public void onComplete() {
        if (isDisposed()) {
            return;
        }
        dispose();
        try {
            onComplete.run();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
        }
    }

    /** Delegates to {@code DisposableHelper.dispose(this)}. */
    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    /** @return {@code true} when the held reference is {@code DisposableHelper.DISPOSED} */
    @Override
    public boolean isDisposed() {
        return get() == DisposableHelper.DISPOSED;
    }
}
这里没什么好说的, 就是把几个Consumer对象组合在一起了,在调用onSubscribe、onNext、onError、onComplete的时候调用相应的Consumer的方法
继续看subscribe(ls)方法
点进去
/**
 * Subscribes the given observer to this source.
 * <p>
 * The observer is first passed through {@code RxJavaPlugins.onSubscribe}, the result is
 * null-checked, and then handed to {@code subscribeActual}.
 *
 * @param observer the observer to subscribe; must not be null
 * @throws NullPointerException if {@code observer} (or the plugin's result) is null, or as a
 *         wrapper around any other non-fatal failure thrown inside the try block
 */
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException nullPointer) { // NOPMD
        // NullPointerExceptions are rethrown untouched.
        throw nullPointer;
    } catch (Throwable failure) {
        Exceptions.throwIfFatal(failure);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(failure);

        // Report to the plugin handler first, then surface the failure as the cause of an NPE.
        NullPointerException wrapper = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        wrapper.initCause(failure);
        throw wrapper;
    }
}
可以看到,这里是调用了subscribeActual(observer)
subscribeActual是Observable的抽象方法
从Debug得知,
这里实际调用的是子类对象ObservableCreate的subscribeActual方法
如下:
/**
 * Wraps the observer in a {@code CreateEmitter}, signals {@code onSubscribe} with that
 * emitter, and then invokes the user-supplied {@code source} with it.
 * A non-fatal exception thrown by {@code source.subscribe} is delivered through
 * the emitter's {@code onError}.
 *
 * @param observer the downstream observer that the emitter will forward to
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);

    // The observer receives its Disposable (the emitter) before any emission can happen.
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
可以看到,这里用了CreateEmitter,那么看一下CreateEmitter
/**
 * The emitter handed to the user's {@code ObservableOnSubscribe}: it forwards signals to the
 * wrapped {@code Observer} and is itself a {@code Disposable}.
 * <p>
 * The inherited {@code AtomicReference<Disposable>} is the disposal state that
 * {@link #dispose()}, {@link #isDisposed()} and {@link #setDisposable(Disposable)} operate on
 * through {@code DisposableHelper}.
 *
 * @param <T> the value type being emitted
 */
static final class CreateEmitter<T> extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    /** The downstream observer every signal is forwarded to. */
    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    /**
     * Forwards {@code t} to the observer unless this emitter is disposed.
     * A {@code null} value is turned into an {@code onError(NullPointerException)} instead.
     */
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    /**
     * Forwards the error to the observer and then disposes this emitter; if the emitter is
     * already disposed, the error goes to {@code RxJavaPlugins.onError} instead.
     * A {@code null} throwable is replaced by a {@code NullPointerException}.
     */
    @Override
    public void onError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (isDisposed()) {
            RxJavaPlugins.onError(t);
            return;
        }
        try {
            observer.onError(t);
        } finally {
            // Dispose even if the observer's onError throws.
            dispose();
        }
    }

    /** Forwards completion to the observer and then disposes, unless already disposed. */
    @Override
    public void onComplete() {
        if (isDisposed()) {
            return;
        }
        try {
            observer.onComplete();
        } finally {
            // Dispose even if the observer's onComplete throws.
            dispose();
        }
    }

    /** Delegates to {@code DisposableHelper.set(this, d)}. */
    @Override
    public void setDisposable(Disposable d) {
        DisposableHelper.set(this, d);
    }

    /** Wraps {@code c} in a {@code CancellableDisposable} and passes it to {@link #setDisposable(Disposable)}. */
    @Override
    public void setCancellable(Cancellable c) {
        setDisposable(new CancellableDisposable(c));
    }

    /** @return a new {@code SerializedEmitter} wrapping this emitter */
    @Override
    public ObservableEmitter<T> serialize() {
        return new SerializedEmitter<T>(this);
    }

    /** Delegates to {@code DisposableHelper.dispose(this)}. */
    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    /** @return the result of {@code DisposableHelper.isDisposed} for the currently held reference */
    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}
CreateEmitter是ObservableCreate的静态内部类
里面有Observer的一个成员,在构造方法里将Observer对象赋给了这个成员
,那么在调用CreateEmitter的onNext,onComplete,实际就是在调用
Observer的onNext,onComplete方法。这就是观察者模式。
这就可以解释,emitter调用onNext方法后,Consumer的accept方法也会执行了。
下面继续看
observer.onSubscribe(parent)
;
这个observer实际就是LambdaObserver对象(链中没有map等操作符时;加了map之后,这里的observer是包装了LambdaObserver的MapObserver)
那么实际这里是调用了LambdaObserver的onSubscribe方法
如下:
/**
 * Installs {@code s} via {@code DisposableHelper.setOnce}; if that succeeds, calls the
 * {@code onSubscribe} callback with this observer (not with {@code s}).
 * A non-fatal exception from the callback is routed to {@code onError}.
 */
@Override
public void onSubscribe(Disposable s) {
    if (!DisposableHelper.setOnce(this, s)) {
        return;
    }
    try {
        onSubscribe.accept(this);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        onError(ex);
    }
}
首先看DisposableHelper.setOnce(this,s)
/**
 * Atomically stores {@code d} in {@code field}, but only if the field currently holds
 * {@code null}.
 * <p>
 * When the field is already occupied, {@code d} is disposed; additionally, if the occupant
 * is not the {@code DISPOSED} marker, {@code reportDisposableSet()} is called.
 *
 * @param field the reference to update
 * @param d     the Disposable to store; must not be null
 * @return {@code true} if {@code d} was stored, {@code false} if the field was already set
 */
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
    ObjectHelper.requireNonNull(d, "d is null");
    if (!field.compareAndSet(null, d)) {
        // The field was not null: the incoming Disposable is not stored, so dispose it.
        d.dispose();
        if (field.get() != DISPOSED) {
            reportDisposableSet();
        }
        return false;
    }
    return true;
}
这里用到了compareAndSet(expect,update)方法,看注释
Atomically sets the value to the given updated value
if the current value {@code ==} the expected value.
如果当前value和expect相等,则设置为update的新值
成功返回true,失败返回false
那么对于setOnce方法,如果field当前持有的值为null,则compareAndSet成功,if里面不会执行,setOnce直接返回true
否则(field里已经有值)会先dispose传入的d,再返回false
//没分析完,关于Disposable
继续看subscribeActual里的source.subscribe(parent);
这里的source是ObservableOnSubscribe对象
可以看到建立订阅后,就会调用ObservableOnSubscribe的
subscribe(ObservableEmitter<Integer> emitter)方法
即开始发射事件
下面分析map
/**
 * Returns an Observable built from this one and the given mapper function.
 * <p>
 * The mapper is null-checked, a new {@code ObservableMap} is created around {@code this}
 * and the mapper, and the result is passed through {@code RxJavaPlugins.onAssembly}.
 *
 * @param <R>    the value type of the returned Observable
 * @param mapper the function handed to {@code ObservableMap}; must not be null
 * @return the value returned by {@code RxJavaPlugins.onAssembly}
 */
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
这里看到onAssembly方法传入了一个ObservableMap对象
看ObservableMap 构造方法
/**
 * Creates the operator: passes {@code source} to the superclass constructor and keeps
 * {@code function} in the {@code function} field.
 *
 * @param source   the upstream source, forwarded to the superclass
 * @param function the mapping function stored for later use
 */
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
    super(source);
    this.function = function;
}
可以看到,把function对象赋给了ObservableMap的成员变量function
继续看onAssembly方法
/**
 * Passes {@code source} through the function held in {@code onObservableAssembly}, if one
 * is set; otherwise returns {@code source} unchanged.
 *
 * @param <T>    the value type of the Observable
 * @param source the Observable being assembled
 * @return the result of applying the assembly function, or {@code source} itself when the
 *         function is {@code null}
 */
public static <T> Observable<T> onAssembly(Observable<T> source) {
    // Read the static field once into a local before the null check.
    Function<Observable, Observable> assemblyFunction = onObservableAssembly;
    if (assemblyFunction != null) {
        return apply(assemblyFunction, source);
    }
    return source;
}
通过调试得知,onObservableAssembly 是null,那么这里直接返回source
即返回ObservableMap对象
那么原来的Observable对象通过map方法后变成了ObservableMap对象
继续看ObservableMap全部源码
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
那么在执行Observable的subscribe时,实际是在执行ObservableMap的subscribeActual方法
在emitter执行onNext方法就是执行了MapObserver的onNext方法
这样就起到了变换的作用