RxJava2的三要素:
观察者模式思想,函数响应式编程
1. Observable 被观察者,它决定什么时候触发时间,在上游发送命令,决定异步操作模块的顺序和异步操作的次数。
2. Observer 观察者,它可以在不同的线程中执行任务,在下游待命状态的接受事件,响应被观察者的通知。
3. subscribe订阅事件 创建好了Observable和Observer ,需要将他们关联起来,才能实现链式调用。
subscribe的使用
先写一个简单的例子
Observable.just("Hello World")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.print("s");
}
});
just:是事件的创建
Consumer是消费者,用于接受单个值。
用于简单的消费事件类型。而subscribe有多个重载方法,rxjava2中Observable不再支持订阅Subscribler,而是需要使用Obserer作为观察者
subscribe(onNext) 接受的事件
subscribe(onNext,onError) 接受的事件,错误
subscribe(onNext,onError,onComplete) 接受的事件,错误,事件接受完成
subscribe(onNext,onError,onComplete,onSubscribe) 接受的事件,错误,,事件接受完成,开始订阅
Observable.just("Hellow World")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("subscrible");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
上述代码会依次输出:
subscrible
Hellow World
onComplete
RxJava2中五种观察者模式:
Hot Observable和Cold Observable
Hot Observable 无论有没有观察者进行订阅,事件始终会发生。当有多个订阅者时候,其关系是一对多的关系,共享订阅者的共享事件。
Cold Observable 只有观察者订阅了,才开始执行发射数据流的代码。并且与观察者Ibserver只能是一对一的关系。当有多个观察者订阅时候,各自的事件是独立的,消息流是重新完整发送的。
Cold Observable 例子如下:可以由just、creat、from等操作符生成。
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(Long mlong) throws Exception {
System.out.println("观察者1号输出:"+ mlong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(Long mlong) throws Exception {
System.out.println("观察者2号输出:"+ mlong);
}
};
Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
/**
* interval(周期次数,周期时间单位,调度计算);---->返回一个Long泛型的被观察者
* take 返回一个被观察者对象,发送由原被观察者(Long泛型的被观察者)的事件
*/
Observable.interval(10, TimeUnit.MILLISECONDS,
Schedulers.computation()).take(Long.MAX_VALUE).subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread());//指定下游在一个新的线程接受事件
observable.subscribe(subscriber1);//订阅观察者1号
observable.subscribe(subscriber2);//订阅观察者2号
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
输出以下:
观察者1号输出:0
观察者2号输出:0
观察者1号输出:1
观察者2号输出:1
观察者2号输出:2
观察者1号输出:2
观察者1号输出:3
观察者2号输出:3
观察者2号输出:4
观察者1号输出:4
观察者2号输出:5
观察者1号输出:5
观察者1号输出:6
观察者2号输出:6
观察者1号输出:7
观察者2号输出:7
观察者1号输出:8
观察者2号输出:8
观察者1号输出:9
观察者2号输出:9
输出结果并不是一致的,可以看出二者是相互独立的,符合一般的响应式思想!但是对于某些事件不确定何时发生及不确定Observable发射的元素数量变化的情况,需要使用到Hot Observable,比如UI交互的事件、网络环境变化,地理位置变化等情况。
转换成Hot Observable
使用publish,生成ConnectableObservable ,用上面例子更换如下:
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(Long mlong) throws Exception {
System.out.println("观察者1号输出:"+ mlong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(Long mlong) throws Exception {
System.out.println("观察者2号输出:"+ mlong);
}
};
Consumer<Long> subscriber3 = new Consumer<Long>() {
@Override
public void accept(Long mlong) throws Exception {
System.out.println("观察者3号输出:"+ mlong);
}
};
ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
/**
* interval(周期次数,周期时间单位,调度计算);---->返回一个Long泛型的被观察者
* take 返回一个被观察者对象,发送由原被观察者(Long泛型的被观察者)的事件
*/
Observable.interval(10, TimeUnit.MILLISECONDS,
Schedulers.computation()).take(Long.MAX_VALUE).subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread())
.publish();
//生成的ConnectableObservable必须使用connect才能执行
observable.connect();
observable.subscribe(subscriber1);
observable.subscribe(subscriber2);
observable.subscribe(subscriber3);
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
观察者1号输出:0
观察者2号输出:0
观察者3号输出:0
观察者1号输出:1
观察者2号输出:1
观察者3号输出:1
观察者1号输出:2
观察者2号输出:2
观察者3号输出:2
观察者1号输出:3
观察者2号输出:3
观察者3号输出:3
观察者1号输出:4
观察者2号输出:4
观察者3号输出:4
观察者1号输出:5
观察者2号输出:5
观察者3号输出:5
观察者1号输出:6
观察者2号输出:6
观察者3号输出:6
观察者1号输出:7
观察者2号输出:7
观察者3号输出:7
观察者1号输出:8
观察者2号输出:8
观察者3号输出:8
观察者1号输出:9
观察者2号输出:9
观察者3号输出:9
可以看出多个订阅者(观察者)共享同一个事件,而且这里的ConnectableObservable是线程安全的。
使用Subject/Processor :
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(Long mlong) throws Exception {
System.out.println("观察者1号输出:" + mlong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(Long mlong) throws Exception {
System.out.println("观察者2号输出:" + mlong);
}
};
Consumer<Long> subscriber3 = new Consumer<Long>() {
@Override
public void accept(Long mlong) throws Exception {
System.out.println("观察者3号输出:" + mlong);
}
};
Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
/**
* interval(周期次数,周期时间单位,调度计算);---->返回一个Long泛型的被观察者
* take 返回一个被观察者对象,发送由原被观察者(Long泛型的被观察者)的事件
*/
Observable.interval(10, TimeUnit.MILLISECONDS,
Schedulers.computation()).take(Long.MAX_VALUE).subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread());
PublishSubject<Long> subject = PublishSubject.create();
observable.subscribe(subject);
observable.subscribe(subscriber1);
observable.subscribe(subscriber2);
observable.subscribe(subscriber3);
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
输出结果同上!
这里讲了Hot和Cold的区别!后续会讲到五种观察者模式。