首先看使用代码
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//被观察者的create需要一个ObserableSubscribe与之产生关联
e.onNext("");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
以上是示例代码 下面进行源码分析
思路分析 Obserable.create方法 进去源码代码如下:
第一步:看create方法的参数 点进去发现它是一个接口
![7560118-25e1cab22194cafd.png](https://upload-images.jianshu.io/upload_images/7560118-25e1cab22194cafd.png)
Obserable的create方法.png
可以看到create方法 传入了一个ObservableOnSubscribe<T> source的东西,那么进入ObservableOnSubscribe这个源代码
可以看到它是一个接口代码如下:
![7560118-c8d66980a1594050.png](https://upload-images.jianshu.io/upload_images/7560118-c8d66980a1594050.png)
ObservableOnSubscrible接口.png
可以看到接口里面有个ObservableEmitter<T> 这个是一个监听器等下会调用的
第二步再来看RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
这里创建了一个ObservableCreate的实例点进去看下如下:
![7560118-547db6be8d58d3d4.png](https://upload-images.jianshu.io/upload_images/7560118-547db6be8d58d3d4.png)
观察者Obserable的子类.png