RxJava(一)——基础

背景:

RxJava是目前最火的响应式编程框架之一。目前已经更新到RxJava2.0版本,我们平时在开发过程中遇到最多和使用最多的一种第三方框架,因为它使用起来方便快捷、成本低。而且现在有很多其他第三方框架都是基于RxJava来实现的,如Android 6.0之后的动态权限申请框架RxPermission。所以学习使用RxJava是迫在眉睫的事情,但是光会使用还不行,因为面试官要的永远不是你会使用某个框架,而是要你理解这些框架的实现原理。最好是能够自己去实现一个类似的框架的地步。但是很多朋友和我一样拿到这些框架,一通看,想学但是又不知道从何学起。然后就开始百度,然后都是一堆别人的学习心得,一股脑看几十篇,电脑一关又全忘了,我在哪?我在干嘛?我刚刚学了什么东西?那到底怎么学习呢?答案是带着问题去学习,去看源码。有人又说源码太复杂我看不懂,那就从最基本的开始,所有的框架都会基于某种设计模式以及Java基础知识(如:注解、反射等)来实现的。那我们就需要先去弄明白这些基础知识再来学习。

一、响应式编程

(1)、响应式编程思想

学习一个东西,我们首先要知道我们为什么要学习,学习它用来干嘛。RxJava是目前最火的响应式编程,那么什么是响应式编程呢?响应式编程其实是一种编程思想,它的目的是想通过某种方式能够建立事务于事务之间的关系。而在我们的Android开发中每一个界面其实都包裹着大量的业务逻辑,产品的每一个需求都对应着一个个业务逻辑的相互作用。每一个业务逻辑就可以看作是一个事务,业务逻辑与业务逻辑之间的关系就是事务与事务之间的关系。但是在Android开发前期,由于技术和环境的限制,这种编程思想还停留在想的阶段,直到Rx和其他一些支持这种思想的框架出现,才真正将这种思想应用到代码中。

响应式编程在代码中的体现就是“一种通过异步和数据流来构建事务关系的编程模型”,其中“事务的关系”是它的核心思想,“异步”和“数据流”是实现这种编程思想的关键。

(2)、核心与实现:事务的关系、异步与数据流

在App中事务的关系就是业务逻辑的关系。就拿一个应用的初始化来说,它需要完成登陆接口调用、SDK初始化、数据库初始化、服务初始化、心跳初始化等之后才能够进入首页。而这些初始化操作很可能都是些耗时且并行的操作。那么我们需要通过异步和操作符来完成这些前期操作,等这些前期操作(Rx中称作上游业务)完成之后,再通知主线程执行跳转首页操作(Rx中称作下游业务)。通知下游的方式有很多种,其中最棒的的方式就是通过数据(事件)流。每一个业务完成后,都会有一条数据(一个事件)流向下游,下游的业务收到这条数据(这个事件),才会开始自己的工作。在前面说多线程的时候其实我们已经有了解过异步编程的好处了,但是异步编程除了之前了解的能够帮我们提高效率、降低阻塞等等好处之外,它还能够帮助我们构建事务的关系。在APP初始化中,我们能发现SDK初始化,数据库初始化,登陆接口这三个业务本身相互独立,应当在不同的线程环境中执行,以保证他们不会相互阻塞。而假如没有异步编程,我们可能只能在一个线程中顺序调用这三个相对耗时较多的业务,最终再去做首页跳转,这样做就使得这些初始化操作不能够相互独立了,不仅没有完美体现出业务本来的关系,反而会让你的程序“反应”更慢(因为三个任务不是并行执行的,而是顺序执行的)。

二、RxJava基础

(1)、RxJava的基本使用

对于像我一样的菜鸟程序员,碰到一种被大量应用到的新技术,很多时候完全做不到先去弄明白它的原理,并不仅仅是因为技术达不到,而是工作根本不给你时间去研究。第一要务是先不管三七二十一,先学会怎么用这个东西,并用它完成开发中的功能。只有活干完了才有时间来仔细的研究这些新知识的实现原理。RxJava的使用其实狠简单,主要分为一下几步:

a)、在项目的Gradle中添加如下依赖

implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'io.reactivex.rxjava2:rxjava:2.1.0'

b)、弄明白RxJava的三大主角,观察者、被观察者以及订阅是怎么个回事,以及他们之间是如何交互的。

<1>被观察者:Observable

它是我们真正处理耗时的业务逻辑的地方(一般称为下游),当逻辑处理完后产生一条或多条数据流,这些数据流再通过订阅者流向观察者。

<2>观察者:Observer

它是接收被观察者处理业务逻辑后产生的数据流的(称为上游)。当下游通过订阅者传递一条数据流到上游时,会在Observer的onNext方法中接收到下游上来的数据,我们就可以用这些数据进行一些如界面展示的操作了。

<3>订阅:Subscribe

订阅Subscribe 观察者通过订阅(subscribe)被观察者,把他们连接到一起。建立起一条下游到上游的数据流通道

废话那么多,到底在代码中怎么用这个RxJava框架呢。还是直接上代码比较直观。当然了,这里只是为了简单的演示一下怎么使用,并不是现实开发中就这样来写。要是这样简单的打个日志就这样去用,太大材小用了些。现实中使用常配合RxJava的操作符根据使用场景进行组合使用,让我们在完成业务逻辑的同时能够写出漂亮易懂的代码。

//1、创建被观察者 Observable,又叫可被观察的
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {


    //在复写的subscribe()里定义需要发送的事件
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        //通过 ObservableEmitter类对象(发射器) 产生 & 发送事件
        // ObservableEmitter类介绍
        // a. 定义:事件发射器
        // b. 作用:定义需要发送的事件 & 向观察者发送事件
        // 注:建议发送事件前检查观察者的isDisposed状态,以便在没有观察者时,让Observable停止发射数据
        if(!emitter.isDisposed()){
            /**
             * 处理复杂的业务逻辑
             * ............
             */
            //产生一条或多条数据流
            emitter.onNext("Hello RxJava2");
            emitter.onNext("Hello RxJava3");
            emitter.onNext("Hello RxJava4");
        }
    }
});
//2、创建观察者
Observer observer = new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {
        //接收下游的数据流(事件流)
        System.out.println(s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
};
//3、订阅Subscribe  观察者通过订阅(subscribe)被观察者,把他们连接到一起。建立起一条下游到上游的数据流通道
//这里先不要纠结为什么不是观察者(observer)订阅被观察者(observable)。开发这个框架的人时为了让我们写起代码来更加漂亮、更加流式。
observable.subscribe(observer);

(2)观察者模式

什么是观察者模式?观察者模式的定义是:定义对象之间一对多的依赖关系,使得每当一个对象改变了状态,则所有依赖于它的对象都会得到通知,并被动自动更新。最形象的描述就是现在我们微信关注的公众号(被观察者),一个公众号被很多人订阅,一旦公众号有文章推出时,就会通知所有订阅了该公众号的用户(观察者)。那么这个订阅和通知是怎么实现的呢?我们先来看一张观察者模式的UML类图。然后我们通过代码来自己模拟一个简单公众号实现。

那么如何实现一个简单的观察者模式呢?

/**
 * 创建时间:2019/7/8
 * 创建人:singleCode
 * 功能描述:被观察者的抽象
 **/
public abstract class IObservable {
//被观察者会存放一个观察者的清单
    public List<IObserver> observers = new ArrayList<>();
    public abstract void registerObserver(IObserver observer);
    public abstract void unregisterObserver(IObserver observer);
    public abstract void notifyObserver(Object o);
}
/**
 * 创建时间:2019/7/9
 * 创建人:singleCode
 * 功能描述:自定义微信公众号服务(被观察者的具体实现)
 **/
public class WechatService extends IObservable {
    @Override
    public void registerObserver(IObserver observer) {
        this.observers.add(observer);//添加观察者,可以发现这里我们都是面向抽象编程,这也是设计模式解耦的一大魅力
    }

    @Override
    public void unregisterObserver(IObserver observer) {
        if(this.observers.contains(observer)){
            this.observers.remove(observer);
        }
    }

    @Override
    public void notifyObserver(Object o) {
        System.out.println("notify :"+o);//发布新消息,通知所有观察者
        Iterator<IObserver> observerIterator = observers.iterator();
        while (observerIterator.hasNext()){
            IObserver observer = observerIterator.next();
            observer.update(o);
        }
    }
}
/**
 * 创建时间:2019/7/8
 * 创建人:singleCode
 * 功能描述:观察者的抽象
 **/
public interface IObserver {
    void update(Object o);
}
/**
 * 创建时间:2019/7/9
 * 创建人:singleCode
 * 功能描述:用户(观察者的具体实现)
 **/
public class User implements IObserver {
    private String name;
    public User(String name){
        this.name = name;
    }
    @Override
    public void update(Object o) {
        System.out.println(name+" update:"+o);
    }
}

执行结果如下:

(3)RxJava中的五种观察者模式

在RxJava中观察者模式主要由下表种五种组成,那么这五种观察者模式在实现和功能上有什么区别呢?

类型

描述

Observable<T>

能够发射0或n个数据,并以成功或错误事件终止。

Flowable<T>

能够发射0或n个数据,并以成功或错误事件终止。 支持Backpressure(背压),可以控制数据源发射的速度。

Single<T>

只发射单个数据或错误事件。

Completable

它从来不发射数据,只处理 onComplete 和 onError 事件。可以看成是Rx的Runnable。

Maybe<T>

能够发射0或者1个数据,要么成功,要么失败。有点类似于Optional

(4)、热Observable和冷Observable概念

冷Observable:当有观察者订阅时,才会开始执行发射数据流(事件),针对每个新加入的观察者所发射的数据流是独立的,一对一关系(即每新加一个观察者都会对该观察者重新发射数据流,而之前加入的观察者依旧保持原本的发射状态继续发射)。这句话啥意思呢?还是上代码吧。

public class ColdObservableDemo {
    public static void main(String[] args){
        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
                Observable.interval(10,TimeUnit.MILLISECONDS,Schedulers.computation())
                        .take(Integer.MAX_VALUE)
                        .subscribe(emitter::onNext);
            }
        });
        observable.subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("Consumer1:"+aLong);
            }
        });
  try {
            Thread.sleep(30)//阻塞30毫秒后再添加观察者2
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        observable.subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("-----Consumer2:"+aLong);
            }
        });
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 执行结果如下:

 Observable的create、just、range、fromxxx等操作符生成的Observable都是冷的。

热Observable:无论是否有观察者订阅,数据流(事件)始终都会发射。当有多个观察者订阅时,发射的数据流是共享的,一对多关系。(即观察者只能接收到从订阅那一刻发射出来的数据流,之前的数据流就再也接收不到了。),使用热Observable时需要注意内存泄漏,要及时关闭热Observable,避免内存泄漏发生。

Subject是一种热Observable,同时它既是被观察者也是观察者。下面我们就来看一下Subject的分类和各自的特点。

Subject

发生行为

AsyncSubject

无论订阅发生在什么时候,只发射最后一个数据

BehaviorSubject

发送订阅之前的一个数据和订阅之后的全部数据

ReplaySubject

无论订阅发生在什么时候,都发射全部数据

PublishSubject

发送订阅之后全部数据

AsyncSubject: 

 BehaviorSubject:

ReplaySubject: 

PublishSubject:

冷Observable转换成热Observable:冷Observable可以通过调用publish()操作符转换成为热Observable。然后再调用热Observable的connect操作符启动被观察者。

Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
    @Override
    public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
        Observable.interval(10,TimeUnit.MILLISECONDS,Schedulers.computation())
                .take(Integer.MAX_VALUE)
                .subscribe(emitter::onNext);
    }
}).publish();//调用Publish操作符将冷Observable转换成热Observable

((ConnectableObservable<Long>) observable).connect();//启动热Observable

observable.subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        System.out.println("Consumer1:"+aLong);
    }
});
observable.subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        System.out.println("-----Consumer2:"+aLong);
    }
});
try {
    Thread.sleep(30);//这里阻塞30毫秒后再添加一个观察者3
} catch (InterruptedException e) {
    e.printStackTrace();
}
observable.subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        System.out.println("----------Consumer3:"+aLong);
    }
});

冷Observable转换成热Observable后执行结果如下:

发布了29 篇原创文章 · 获赞 3 · 访问量 899

猜你喜欢

转载自blog.csdn.net/LVEfrist/article/details/95918944