工作中并没有使用到,而且我觉得像vertx等优秀的框架已经提供了非常完善的异步事件驱动的功能,Spring也有ApplicationExentPublisher来做类似的事。今天就是写个入门例子,看看。emmmm
当然这里只是简单例子,实际使用中应该进一步封装,Spring中的ApplicationExentPublisher就是很好的运行方式(封装思路)的参照物。
public class RxJava {
public static void main(String[] args) {
/**
* 上游消息的产生,这里的String在实际中就是我们业务上要传输的对象
*/
Observable<String> obserbable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("First Msg!");
emitter.onNext("Sceond Msg!");
}
});
/**
* 简单的事件处理方式
*/
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(Thread.currentThread().getName() + " ----consumer---:" + s);
}
};
/**
* 更丰富的事件处理方式,RxJava2.0是这个 1.0中是另一个类
*/
Observer<String> observer = new Observer<String>() {
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName()+ " ---Observer.onComplete:");
}
@Override
public void onError(Throwable arg0) {
System.out.println(Thread.currentThread().getName()+ " ---Observer.onError:" + arg0);
}
@Override
public void onNext(String arg0) {
System.out.println(Thread.currentThread().getName()+ " ---Observer.onNext:" + arg0);
}
@Override
public void onSubscribe(Disposable arg0) {
System.out.println(Thread.currentThread().getName()+ " ---Observer.onSubscribe:" + arg0);
}
};
/**
* observeOn是异步处理的关键。这里使用Schedulers.newThread()代表事件处理是新开启的线程中。
* 实际应用应设置为线程池,尽量用自定义的,比较可控
*
*/
obserbable.observeOn(Schedulers.newThread()).subscribe(consumer);
obserbable.observeOn(Schedulers.newThread()).subscribe(observer);
/**
* 阻塞住为了观察上面两个事件的执行结果
*/
for(;;);
}
}