以前异步处理提高并发的思路是用消息队列实现异步模型,需要多一个中间件,多一层维护,这个比较适合大型应用,中小型应用完全没必要这么做,所以但我看到了RxJava提供的基于JVM的数据流异步模型时,我就用了几天的时间查阅了一些资料做了一个demo,很简单的使用案例;
1:什么是RxJava
RxJava - JVM的反应式扩展 - 用于使用Java VM的可观察序列组合异步和基于事件的程序的库。
简单而言就是一个异步模型。
2:为什么要使用RxJava
异步模型,java本身已经提供了很多框架,以及很多第三方的框架,都能实现异步处理和异步信息通信。不过个人感觉对于实际的业务逻辑实现起来还是略微有些复杂,而且有时还得自己写中间件或者中间变量,才能实现完全的异步操作。
就我本人而言,现在还并没有深入理解RxJava的实现原理,我把它理解为内部集成一个类消息队列机制,基于消息机制和java8流的特性,可以实现代码逻辑简洁明了,修改方便,能满足大部分的场景并且基本上不用书写额外的连接代码。
3:RxJava的工作机制
RxJava2.x中出现了2中观察者模式:
- Observable ( 被观察者 ) / Observer ( 观察者 )
Flowable (被观察者)/ Subscriber (观察者)
最新的接口默认使用Flowable/Subscriber方法
基本实现流程实现是:
- 通过
Flowable.fromCallable
方法传入解析数据和定义回调 - 调度线程执行处理任务
- 解析数据,并进行数据存储或返回
- 通过订阅
subscribe
,处理或返回请求结果
4:RxJava的hello world
maven中引入依赖:
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.12</version> </dependency>
书写HelloWorld.class
import io.reactivex.*; public class HelloWorld { public static void main(String[] args) { Flowable.just("Hello world").subscribe(System.out::println); } }
just方法实现数据输入和数据发布,subscribe订阅这个发布
5:简单的demo实现后台应用计算
package com.demo.examples; import io.reactivex.Flowable; import io.reactivex.schedulers.Schedulers; public class Work { private void Background_computation() throws InterruptedException { // method 1 Flowable.fromCallable(() -> { Thread.sleep(1000); // imitate expensive computation return "do over"; }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.single()) .subscribe(System.out::println, Throwable::printStackTrace); // 由于处理异步,需要等待异步执行完成再结束主线程 Thread.sleep(2000); // <--- wait for the flow to finish } }
6: demo路径
https://github.com/ailice001/RxJava_simple_demo
------------------可能遇到的问题java版本不一致,最低jdk1.8