RxJava Basics

Introduction

  • RxJava is a library for composing asynchronous operations on top of event streams

  • RxJava and RxAndroid: RxAndroid is an extension of RxJava for the Android platform, used mainly in Android development

Installation (Gradle dependencies):

implementation 'io.reactivex.rxjava2:rxjava:2.2.2'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'


  1. Create the Observable
  2. Create the Observer
  3. Connect them with subscribe

Operators

Official documentation

create

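  • create is probably the most commonly used operator. It builds an Observable (the emitter, i.e. the observed object); an Observer (the receiver) is then connected to it through subscribe
  • A Disposable can cut the connection so that the Observer no longer receives upstream events
  • Once the emitter calls emitter.onComplete(), the Observable is finished and the Observer receives no further events, although the remaining code in subscribe() still runs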
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.e(TAG, "emitter: 1");
        emitter.onNext(1);
        Log.e(TAG, "emitter: 2");
        emitter.onNext(2);
        Log.e(TAG, "emitter: 3");
        emitter.onNext(3);
        Log.e(TAG, "emitter: onComplete");
        emitter.onComplete();
        Log.e(TAG, "emitter: 4");
        emitter.onNext(4);
    }
}).subscribe(new Observer<Integer>() {
    private Disposable mDisposable;

    @Override
    public void onSubscribe(Disposable d) {
        mDisposable = d;
    }

    @Override
    public void onNext(Integer integer) {
        Log.e(TAG, "onNext:" + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError");
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

emitter: 1
onNext:1
emitter: 2
onNext:2
emitter: 3
onNext:3
emitter: onComplete
onComplete
emitter: 4
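
The log above contains `emitter: 4` but no matching `onNext:4`: the code after `onComplete()` still executes, yet the event is not delivered. The following is a minimal plain-Java sketch of that rule with no RxJava dependency. `TerminalGuardSketch` and its members are hypothetical names used only for illustration, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an emitter/observer pair; not RxJava's implementation.
final class TerminalGuardSketch {
    private final List<String> received = new ArrayList<>();
    private boolean terminated = false;

    // Delivers a value unless the stream has already terminated.
    void onNext(int value) {
        if (terminated) {
            return; // dropped: the observer is already done
        }
        received.add("onNext:" + value);
    }

    // Delivers the completion signal once, then blocks everything after it.
    void onComplete() {
        if (terminated) {
            return;
        }
        terminated = true;
        received.add("onComplete");
    }

    // Replays the emission sequence from the example above.
    static List<String> demo() {
        TerminalGuardSketch emitter = new TerminalGuardSketch();
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
        emitter.onNext(4); // still executes, but nothing is delivered
        return emitter.received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [onNext:1, onNext:2, onNext:3, onComplete]
    }
}
```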

map

map transforms every event emitted by the Observable by applying the given function

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return "convert " + integer;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.e(TAG, "accept:" + s);
    }
});

Output:

accept:convert 1
accept:convert 2
accept:convert 3

zip

zip combines events by pairing them one to one, so the resulting Observable emits only as many events as the shorter of the two sources

Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
    @Override
    public String apply(String s, Integer integer) throws Exception {
        return s + integer;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.e(TAG, "accept:" + s);
    }
});

private Observable<String> getStringObservable() {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.e(TAG, "emitter String:A");
            emitter.onNext("A");
            Log.e(TAG, "emitter String:B");
            emitter.onNext("B");
            Log.e(TAG, "emitter String:C");
            emitter.onNext("C");
        }
    });
}

private Observable<Integer> getIntegerObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.e(TAG, "emitter Integer:1");
            emitter.onNext(1);
            Log.e(TAG, "emitter Integer:2");
            emitter.onNext(2);
            Log.e(TAG, "emitter Integer:3");
            emitter.onNext(3);
            Log.e(TAG, "emitter Integer:4");
            emitter.onNext(4);
        }
    });
}

Output:

emitter String:A
emitter String:B
emitter String:C
emitter Integer:1
accept:A1
emitter Integer:2
accept:B2
emitter Integer:3
accept:C3
emitter Integer:4
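
In the output above the fourth integer is emitted but never paired, because the String source has only three items. The pairing rule can be modelled in plain Java as follows. This is a rough sketch with no RxJava dependency, and `ZipSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of pairwise zipping; not RxJava's implementation.
final class ZipSketch {
    // Pairs items by index and stops at the end of the shorter list.
    static List<String> zip(List<String> left, List<Integer> right) {
        int pairs = Math.min(left.size(), right.size());
        List<String> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(left.get(i) + right.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // The fourth integer has no partner, so it is never combined.
        System.out.println(zip(Arrays.asList("A", "B", "C"), Arrays.asList(1, 2, 3, 4))); // [A1, B2, C3]
    }
}
```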

concat

concat joins two emitters into one: the second is subscribed only after the first completes

Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept :" + integer);
            }
        });

Output:

accept :1
accept :2
accept :3
accept :4
accept :5
accept :6

flatMap

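  • flatMap turns each item from the source Observable into its own Observable, then flattens the items of all those Observables into a single Observable
  • flatMap does not guarantee the order of events; the inner Observables below use random delays, so the ordered output shown is only one possible result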
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add("I am value " + integer);
        }
        int delayTime = (int) (1 + Math.random() * 10);
        return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
    }
}).subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e(TAG, "flatMap : accept : " + s + "\n");
        }
    });

Output:

flatMap : accept : I am value 1
flatMap : accept : I am value 1
flatMap : accept : I am value 1
flatMap : accept : I am value 2
flatMap : accept : I am value 2
flatMap : accept : I am value 2
flatMap : accept : I am value 3
flatMap : accept : I am value 3
flatMap : accept : I am value 3

concatMap

The only difference between concatMap and flatMap is that concatMap preserves order

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }
}).concatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            list.add("I am value " + integer);
        }
        int delayTime = (int) (1 + Math.random() * 10);
        return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
    }
}).subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e(TAG, "accept:" + s);
        }
    });

Output:

accept:I am value 1
accept:I am value 1
accept:I am value 1
accept:I am value 2
accept:I am value 2
accept:I am value 2
accept:I am value 3
accept:I am value 3
accept:I am value 3

distinct

distinct drops items that have already been emitted

Observable.just(1, 1, 2, 2, 3, 4)
        .distinct()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e("TAG", "accept:" + integer);
            }
        });

Output:

accept:1
accept:2
accept:3
accept:4

filter

filter drops items that do not satisfy the predicate

Observable.just(10, 20, 30, 40, 50)
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer >= 30;
            }
        }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.e(TAG, "accept:" + integer);
    }
});

Output:

accept:30
accept:40
accept:50

buffer

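buffer takes two parameters: count is the size of each buffer and skip is the step between the starts of consecutive buffers

Emitting 1 2 3 4 5 with count 3 and skip 2 therefore delivers [1, 2, 3], [3, 4, 5] and [5]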

Observable.just(1, 2, 3, 4, 5)
    .buffer(3, 2)
    .subscribe(new Consumer<List<Integer>>() {
        @Override
        public void accept(List<Integer> integers) throws Exception {
            Log.e(TAG, "buffer size :" + integers.size());
            for (Integer i : integers) {
                Log.e(TAG, "accept:" + i);
            }
        }
    });

Output:

buffer size :3
accept:1
accept:2
accept:3
buffer size :3
accept:3
accept:4
accept:5
buffer size :1
accept:5
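
To see how count and skip interact, the windowing can be reproduced over a plain list. This is only a sketch of the semantics, not how RxJava implements buffer; `BufferSketch` is a hypothetical name and the code assumes skip is greater than zero.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of buffer(count, skip); assumes skip > 0.
final class BufferSketch {
    // Starts a new buffer every `skip` items; each buffer holds up to `count` items.
    static List<List<Integer>> buffer(List<Integer> source, int count, int skip) {
        List<List<Integer>> out = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            out.add(new ArrayList<>(source.subList(start, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(buffer(Arrays.asList(1, 2, 3, 4, 5), 3, 2)); // [[1, 2, 3], [3, 4, 5], [5]]
    }
}
```

When skip is smaller than count the buffers overlap, as here; when skip is larger than count, the items between buffers are dropped.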

timer

timer emits a single item after the given delay

Log.e(TAG, "timer start:" + getNowStrTime());
Observable.timer(2, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.e(TAG, "timer end:" + getNowStrTime());
        }
    });

Output:

timer start:2018-09-23 18:14:01
timer end:2018-09-23 18:14:03

interval

interval emits at a fixed rate: initialDelay is the delay before the first emission and period is the time between later emissions

Log.e(TAG, "timer start:" + getNowStrTime());
Observable.interval(3, 2, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.e(TAG, "accept:" + getNowStrTime());
        }
    });

Output:

timer start:2018-09-23 18:15:42
accept:2018-09-23 18:15:45
accept:2018-09-23 18:15:47
accept:2018-09-23 18:15:49
accept:2018-09-23 18:15:51
accept:2018-09-23 18:15:53
accept:2018-09-23 18:15:55
...

doOnNext

doOnNext lets you perform a side action just before the subscriber receives each item

Observable.just(1, 2, 3, 4)
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "doOnNext:" + integer);
        }
    })
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "accept:" + integer);
        }
    });

Output:

doOnNext:1
accept:1
doOnNext:2
accept:2
doOnNext:3
accept:3
doOnNext:4
accept:4

skip

skip discards the first count items and delivers the rest

Observable.just(1, 2, 3, 4, 5)
    .skip(2)
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "accept:" + integer);
        }
    });

Output:

accept:3
accept:4
accept:5

take

take delivers at most count items

Observable.just(1, 2, 3, 4, 5)
    .take(2)
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "accept:" + integer);
        }
    });

Output:

accept:1
accept:2

Single

Single emits exactly one value or an error, so after onSubscribe() a SingleObserver gets either onSuccess() or onError(), never onNext() or onComplete()

Single.just(new Random().nextInt())
        .subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onSuccess(Integer integer) {
                Log.e(TAG, "onSuccess:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError:" + e.getMessage());
            }
        });

Output:

onSuccess:-788710247

debounce

debounce drops items that are followed too quickly by another item

// Drop any item that is followed by another item less than 500 ms later
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        Thread.sleep(400);
        emitter.onNext(2);
        Thread.sleep(500);
        emitter.onNext(3);
        Thread.sleep(100);
        emitter.onNext(4);
        Thread.sleep(700);
        emitter.onNext(5);
        emitter.onComplete();
    }
}).debounce(500, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept:" + integer);
            }
        });

Output:

accept:2
accept:4
accept:5
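
Why 2, 4 and 5 survive becomes clearer when the emission times are written out: 1 at 0 ms, 2 at 400 ms, 3 at 900 ms, 4 at 1000 ms and 5 at 1700 ms, followed immediately by completion. The sketch below replays that timeline in plain Java. It is a simplified model rather than RxJava's timer-based implementation, `DebounceSketch` is a hypothetical name, and it assumes that a gap exactly equal to the timeout counts as quiet enough (in a real run the 500 ms gap after item 2 is a race that the sleep usually loses by a small margin).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of debounce over a fixed timeline; not RxJava's implementation.
final class DebounceSketch {
    // values[i] is emitted at timesMs[i]; the source completes right after the last item.
    static List<Integer> debounce(int[] values, long[] timesMs, long timeoutMs) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            boolean isLast = (i == values.length - 1);
            // Assumption: a gap exactly equal to the timeout counts as quiet enough.
            // The last item is flushed when the source completes.
            if (isLast || timesMs[i + 1] - timesMs[i] >= timeoutMs) {
                out.add(values[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4, 5};
        long[] timesMs = {0, 400, 900, 1000, 1700};
        System.out.println(debounce(values, timesMs, 500)); // [2, 4, 5]
    }
}
```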

defer

defer creates a new Observable for every subscription; if nobody subscribes, no Observable is created

Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
    @Override
    public ObservableSource<Integer> call() throws Exception {
        return Observable.just(1, 2, 3);
    }
});

observable.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe( Disposable d) {
    }

    @Override
    public void onNext( Integer integer) {
        Log.e(TAG, "defer : " + integer );
    }

    @Override
    public void onError( Throwable e) {
        Log.e(TAG, " onError : " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

Output:

defer : 1
defer : 2
defer : 3
onComplete
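
The laziness of defer can be pictured with a plain factory that is invoked once per subscription and never otherwise. The sketch below only counts factory calls; it does not use RxJava, and `DeferSketch` and `factoryCallsAfter` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java model of defer's laziness; not RxJava's implementation.
final class DeferSketch {
    // Returns how many times the factory ran after the given number of subscriptions.
    static int factoryCallsAfter(int subscriptions) {
        AtomicInteger calls = new AtomicInteger();
        // The factory stands in for the Callable passed to defer().
        Supplier<int[]> factory = () -> {
            calls.incrementAndGet();
            return new int[] {1, 2, 3};
        };
        for (int i = 0; i < subscriptions; i++) {
            factory.get(); // each "subscription" asks the factory for a fresh source
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(factoryCallsAfter(0)); // 0: no subscriber, no source created
        System.out.println(factoryCallsAfter(2)); // 2: one fresh source per subscriber
    }
}
```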

last

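last delivers only the final item emitted by the Observable. In RxJava 2 its argument is a default item that is delivered when the source is empty, so last(4) below still yields 3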

Observable.just(1, 2, 3)
        .last(4)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept:" + integer);
            }
        });

Output:

accept:3
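
The argument 4 never appears in the output because the source is not empty. A small plain-Java sketch of "last item, or a default when there is none" makes both cases visible; `LastSketch` is a hypothetical helper, not part of RxJava.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of last(defaultItem); not RxJava's implementation.
final class LastSketch {
    // Returns the final item, or defaultItem when the source is empty.
    static int last(List<Integer> source, int defaultItem) {
        return source.isEmpty() ? defaultItem : source.get(source.size() - 1);
    }

    public static void main(String[] args) {
        System.out.println(last(Arrays.asList(1, 2, 3), 4));           // 3
        System.out.println(last(Collections.<Integer>emptyList(), 4)); // 4
    }
}
```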

merge

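merge combines several Observables into one; it accepts varargs as well as an Iterable of sources. Unlike concat, it does not wait for emitter A to finish before emitter B starts emitting. The two just() sources below emit synchronously, so their output still appears in order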

Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "accept:" + integer);
        }
    });

Output:

accept:1
accept:2
accept:3
accept:4
accept:5
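
Because both sources above emit synchronously, the output looks the same as it would with concat. The difference only shows up when the sources emit over time. The sketch below models that with made-up timestamps in plain Java. It describes the resulting order only and is not RxJava's implementation; `MergeConcatSketch`, `Event` and the two sample sources are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of emission order; not RxJava's implementation.
final class MergeConcatSketch {
    // One emission: a label plus the time (ms) at which its source produces it.
    static final class Event {
        final String label;
        final long timeMs;

        Event(String label, long timeMs) {
            this.label = label;
            this.timeMs = timeMs;
        }
    }

    // Hypothetical sources: A emits at 0 and 200 ms, B at 100 and 300 ms.
    static final List<Event> A = Arrays.asList(new Event("A1", 0), new Event("A2", 200));
    static final List<Event> B = Arrays.asList(new Event("B1", 100), new Event("B2", 300));

    // concat: every item of the first source, then every item of the second.
    static List<String> concat(List<Event> first, List<Event> second) {
        List<String> out = new ArrayList<>();
        for (Event e : first) out.add(e.label);
        for (Event e : second) out.add(e.label);
        return out;
    }

    // merge: both sources are live at once, so items arrive in time order.
    static List<String> merge(List<Event> first, List<Event> second) {
        List<Event> all = new ArrayList<>(first);
        all.addAll(second);
        all.sort(Comparator.comparingLong((Event e) -> e.timeMs));
        List<String> out = new ArrayList<>();
        for (Event e : all) out.add(e.label);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(concat(A, B)); // [A1, A2, B1, B2]
        System.out.println(merge(A, B));  // [A1, B1, A2, B2]
    }
}
```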

reduce

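reduce folds the items into a single value by applying a function to the running result and each item; an optional seed can serve as the initial value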

Observable.just(1, 2, 3)
        .reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept:" + integer);
            }
        });

Output:

accept:6

scan

scan works much like reduce, except that scan emits the result of every step

Observable.just(1, 2, 3)
        .scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept:" + integer);
            }
        });

Output:

accept:1
accept:3
accept:6
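
The relationship between the two operators can be stated compactly: reduce is the last value that scan would produce. A plain-Java sketch of that idea follows, without a seed and without RxJava; `ScanReduceSketch` is a hypothetical name, and Optional stands in for the "maybe empty" result of reducing an empty source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Plain-Java model of scan and reduce without a seed; not RxJava's implementation.
final class ScanReduceSketch {
    // scan: keeps every intermediate result of the accumulation.
    static List<Integer> scan(List<Integer> source, BinaryOperator<Integer> f) {
        List<Integer> steps = new ArrayList<>();
        Integer acc = null;
        for (Integer item : source) {
            // The first item is passed through unchanged; later items are combined.
            acc = steps.isEmpty() ? item : f.apply(acc, item);
            steps.add(acc);
        }
        return steps;
    }

    // reduce: only the final accumulated value, or empty for an empty source.
    static Optional<Integer> reduce(List<Integer> source, BinaryOperator<Integer> f) {
        List<Integer> steps = scan(source, f);
        return steps.isEmpty() ? Optional.empty() : Optional.of(steps.get(steps.size() - 1));
    }

    public static void main(String[] args) {
        System.out.println(scan(Arrays.asList(1, 2, 3), Integer::sum));   // [1, 3, 6]
        System.out.println(reduce(Arrays.asList(1, 2, 3), Integer::sum)); // Optional[6]
    }
}
```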

window

window splits the stream by time and delivers each slice as a separate Observable

// Open a new Observable every 3 seconds
Observable.interval(1, TimeUnit.SECONDS)
    .take(15)
    .window(3, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Observable<Long>>() {
        @Override
        public void accept(Observable<Long> longObservable) throws Exception {
            Log.e(TAG, "---");
            longObservable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "accept:" + aLong);
                    }
                });
        }
    });

Output:

---
accept:0
accept:1
---
accept:2
accept:3
accept:4
---
accept:5
accept:6
accept:7
---
accept:8
accept:9
accept:10
---
accept:11
accept:12
accept:13
---
accept:14

Thread scheduling

subscribeOn

subscribeOn specifies the thread on which subscribe() runs

observeOn

observeOn specifies the thread on which the downstream Observer callbacks run

Switching threads

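  • Put simply, subscribeOn sets the thread that emits events and observeOn sets the thread on which the subscriber receives them
  • If the emitting thread is specified several times, only the first one counts: of multiple subscribeOn() calls only the first takes effect and the rest are ignored
  • The receiving thread, however, can be set repeatedly: every call to observeOn() switches the downstream thread again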
Observable.just(1, 2, 3, 4) // emitted on an IO thread, as set by subscribeOn()
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .map(mapOperator) // runs on a new thread, as set by observeOn()
        .observeOn(Schedulers.io())
        .map(mapOperator2) // runs on an IO thread, as set by observeOn()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber); // runs on the Android main thread, as set by observeOn()

Demo:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
        e.onNext(1);
        e.onComplete();
    }
})
    .subscribeOn(Schedulers.newThread())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
        }
    })
    .observeOn(Schedulers.io())
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
        }
    });

Output:

Observable thread is : RxNewThreadScheduler-1
After observeOn(mainThread),Current thread is main
After observeOn(io),Current thread is RxCachedThreadScheduler-2

Notes:

In this code subscribeOn is called twice, with Schedulers.newThread() and then Schedulers.io(), but only Schedulers.newThread() takes effect. Each observeOn call, by contrast, switches the thread once more
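
For readers who want to try the "produce on one thread, consume on another" idea without an Android project, here is a loose analogy built on the JDK's CompletableFuture. It is not RxJava and does not reproduce the subscribeOn or observeOn rules; it only shows one stage running on a named producer thread and the next stage hopping to a named consumer thread. `ThreadHopSketch` and the thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy for switching threads between stages; not RxJava.
final class ThreadHopSketch {
    static String run() {
        // Single-thread executors with fixed names make the thread of each stage visible.
        ExecutorService producer = Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
        ExecutorService consumer = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        try {
            return CompletableFuture
                    // Roughly the role of subscribeOn: where the value is produced.
                    .supplyAsync(() -> "emit on " + Thread.currentThread().getName(), producer)
                    // Roughly the role of observeOn: where the next stage runs.
                    .thenApplyAsync(s -> s + ", receive on " + Thread.currentThread().getName(), consumer)
                    .join();
        } finally {
            producer.shutdown();
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // emit on producer, receive on consumer
    }
}
```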

Built-in RxJava schedulers

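  • Schedulers.io(): threads for IO work, typically IO-bound operations such as network requests and file reads and writes

  • Schedulers.computation(): threads for CPU-bound work, such as heavy calculations

  • Schedulers.newThread(): a plain new thread

  • AndroidSchedulers.mainThread(): the Android main thread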


Reposted from blog.csdn.net/qq_14876133/article/details/82827917