7章 RxJava高级用法(一)

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_30379689/article/details/85019432

CSDN学院课程地址

7. RxJava高级用法(一)

7.1 自定义Operator

自定义Operator属于RxJava的高级用法,可以自己自定义一些适用于常见应用场景的操作符。实现自定义Operator很简单,只需要实现RxJava提供的ObservableOperator接口,实现对应的功能即可,同时,使用lift操作符将自定义操作符应用到我们的程序中。下面我们使用自定义Operator,该操作符的作用是将List集合转换成String类型的输出

1、实现ObservableOperator,创建自定义Operator

public class CustomOperator implements ObservableOperator<String, List<String>> {

    @Override
    public Observer<? super List<String>> apply(final Observer<? super String> observer) throws Exception {
        return new Observer<List<String>>() {
            @Override
            public void onSubscribe(Disposable d) {
                observer.onSubscribe(d);
            }

            @Override
            public void onNext(List<String> strings) {
                observer.onNext(strings.toString());
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onComplete() {
                observer.onComplete();
            }
        };
    }
}

2、使用lift操作符添加自定义Operator

public class Main {
    public static void main(String[] args) {
        //创建被观察者
        Observable.create(new ObservableOnSubscribe<List<String>>() {
            @Override
            //默认在主线程里执行该方法
            public void subscribe(@NonNull ObservableEmitter<List<String>> e) throws Exception {
                ArrayList<String> list = new ArrayList<>();
                list.add("1");
                list.add("2");
                list.add("3");
                list.add("4");
                e.onNext(list);
                e.onComplete();
            }
        })
                .lift(new CustomOperator())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println("onNext=" + s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
    }
}

3、输出结果

onNext=[1, 2, 3, 4]
onComplete

7.2 自定义Transformer

自定义Transformer表示一个批量操作符的变换器,如果你在很多Observable中使用相同的一系列操作符,比如每次都要使用到map+take+doOnNext等操作,那么就可以定义一个通用的Transformer对象,里面可以将需要重复用到的操作符打包成Transformer对象,使用compose操作符将Transformer对象应用到我们的Observable上即可

实现自定义Transformer很简单,只需要实现RxJava提供的ObservableTransformer接口,实现对应的功能即可,同时,使用compose操作符将自定义Transformer应用到我们的程序中。下面我们使用自定义Transformer,该Transformer的作用是将发射的数据从Integer转换成String,并取2个数据项,同时在发射的时候监听发射事件,进行输出的打印

1、实现ObservableTransformer,创建自定义Transformer

public class CustomTransformer implements ObservableTransformer<Integer, String> {
    @Override
    public ObservableSource<String> apply(io.reactivex.Observable<Integer> upstream) {
        return upstream.take(2).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "序号:" + integer + "发射成功";
            }
        }).doOnNext(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s + ",准备发射");
            }
        });
    }
}

2、使用compose操作符添加自定义Transformer

public class Main {

    public static void main(String[] args) {
        //创建被观察者
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        })
                .compose(new CustomTransformer())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }
}

3、输出结果

序号:1发射成功,准备发射
序号:1发射成功
序号:2发射成功,准备发射
序号:2发射成功

在安卓开发中,通常我们也会自定义Transformer来实现我们常用的线程切场景,具体如下

扫描二维码关注公众号,回复: 4523175 查看本文章
public static <T> ObservableTransformer<T, T> schedulersTransformer() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
            return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
        }
    };
}

public static <T> FlowableTransformer<T, T> schedulersTransformerForFlowable() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
        }
    };
}

7.3 自定义Plugin

自定义Plugin表示自定义插件,自定义插件可以在RxJavaPlugins中提供的接口中去插入自己的一段代码操作,类似于面向切面编程,或者理解成Android的Hook。如果你需要在所有的订阅事件中去插入一段统一的操作,或者是监听所有订阅事件发生异常时的回调,都可以使用自定义插件。在实际应用中,目前并未发现有什么作用

实现自定义Plugin只需要调用RxJavaPlugins提供的set方法即可,下面我们通过例子输出Observable和Observer的地址信息,来验证每次订阅的时候,回调自定义Plugin的方法中,插件对象和源对象是否为同一个对象

1、通过设置ObservableSubscribe,每次对Observable操作的时候回调

public class Main {

    public static void main(String[] args) {

        RxJavaPlugins.setOnObservableAssembly(new CustomObservableAssembly());//任意操作符都有回调
        RxJavaPlugins.setOnObservableSubscribe(new CustomObservableSubscribe());//每次subscribe时候有回调

        Observable observable = getObservable();
        Observer<Integer> observer = getObserver();

        System.out.println("main observable.toString:" + observable.toString());
        System.out.println("main observer.toString:" + observer.toString());

        observable.subscribe(observer);
    }

    public static Observable getObservable() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(5);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        });
    }

    public static Observer<Integer> getObserver() {
        return new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("onNext=" + integer);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        };
    }
}

2、CustomObservableAssembly

public class CustomObservableAssembly implements Function<Observable, Observable> {
    @Override
    public Observable apply(Observable observable) throws Exception {
        System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
        observable.take(2);
        return observable;
    }
}

3、CustomObservableSubscribe

public class CustomObservableSubscribe implements BiFunction<Observable, Observer, Observer> {
    @Override
    public Observer apply(Observable observable, Observer observer) throws Exception {
        System.out.println("CustomObservableSubscribe observable.toString:" + observable.toString() + ",observer.toString:" + observer.toString());
        return observer;
    }
}

4、输出结果

地址相同说明是同个对象,自定义插件Hook成功

CustomObservableAssembly observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca
main observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca
main observer.toString:com.hensen.rxjavalearning.Chapter7.Chapter7o3.Main$2@3d82c5f3
CustomObservableSubscribe observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca,observer.toString:com.hensen.rxjavalearning.Chapter7.Chapter7o3.Main$2@3d82c5f3
onNext=5
onNext=2
onNext=3

补充:

可以通过设置ErrorHandler,发生异常时会回调

RxJavaPlugins.setErrorHandler();    

可以通过设置SchedulerHandler来Hook到对应的schedule

RxJavaPlugins.setIoSchedulerHandler();
RxJavaPlugins.setNewThreadSchedulerHandler();
RxJavaPlugins.setComputationSchedulerHandler();
RxJavaPlugins.setSingleSchedulerHandler();

错误演示:

由于CustomObservableAssembly是在任意操作符操作的时候都会回调,所以在回调里面是不可以对observable再进行操作符的操作,否则回调里面observable的操作符还是会回调CustomObservableAssembly自身,导致死循环,发生StackOverflowError

public class CustomObservableAssembly implements Function<Observable, Observable> {
    @Override
    public Observable apply(Observable observable) throws Exception {
        System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
        observable.take(2);
        return observable;
    }
}

由于CustomObservableSubscribe是在subscribe之后进行的回调,如果在回调里面对observable进行操作符的操作,这个时候是不会生效的,因为在subscribe之后onNext的函数是不会再处理后面新添的操作符,原理与源码有关

public class CustomObservableSubscribe implements BiFunction<Observable, Observer, Observer> {
    @Override
    public Observer apply(Observable observable, Observer observer) throws Exception {
        System.out.println("CustomObservableSubscribe observable.toString:" + observable.toString() + ",observer.toString:" + observer.toString());
        observable.take(2);
        return observer;
    }
}

猜你喜欢

转载自blog.csdn.net/qq_30379689/article/details/85019432