Preface
This article walks through RxJava's Creating operators.
Official ReactiveX documentation: http://reactivex.io/documentation/operators.html
Main content
create
- Conceptually, this is the most basic creating operator: the other creating operators can all be expressed in terms of it.
package zj.com.creating;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class Create {
private static String TAG = "Create";
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("RxJava " + TAG + " 操作符 .... 1");
emitter.onNext("RxJava " + TAG + " 操作符 .... 2");
emitter.onNext("RxJava " + TAG + " 操作符 .... 3");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(String s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
}
}
Output (onComplete is not printed because the emitter never calls onComplete):
RxJava CreateonSubscribe __
RxJava CreateonNext __ RxJava Create 操作符 .... 1
RxJava CreateonNext __ RxJava Create 操作符 .... 2
RxJava CreateonNext __ RxJava Create 操作符 .... 3
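The output above stops after the last onNext because the source never signals completion. The following is a plain-Java sketch of that contract, not RxJava's actual implementation: the Emitter interface and the subscribe helper are hypothetical stand-ins for ObservableEmitter and Observable.create(...).subscribe(...), and the log strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the create contract; not RxJava's actual implementation.
class CreateContractSketch {

    /** Hypothetical stand-in for ObservableEmitter. */
    interface Emitter<T> {
        void onNext(T value);
        void onComplete();
    }

    /** Runs the source once and records the callbacks an observer would see. */
    static <T> List<String> subscribe(Consumer<Emitter<T>> source) {
        final List<String> log = new ArrayList<>();
        log.add("onSubscribe");
        source.accept(new Emitter<T>() {
            private boolean done;

            @Override
            public void onNext(T value) {
                // Items arriving after the terminal event are dropped.
                if (!done) {
                    log.add("onNext " + value);
                }
            }

            @Override
            public void onComplete() {
                if (!done) {
                    done = true;
                    log.add("onComplete");
                }
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // No onComplete call: prints [onSubscribe, onNext 1, onNext 2]
        System.out.println(CreateContractSketch.<String>subscribe(e -> {
            e.onNext("1");
            e.onNext("2");
        }));
        // Explicit onComplete: prints [onSubscribe, onNext 1, onComplete]
        System.out.println(CreateContractSketch.<String>subscribe(e -> {
            e.onNext("1");
            e.onComplete();
            e.onNext("ignored");
        }));
    }
}
```

In the real example, adding emitter.onComplete() as the last statement of subscribe would be the way to get the onComplete line printed.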
just
This operator is effectively a variant of create that is more concise to write. It provides several overloads.
package zj.com.creating;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class Just {
private static String TAG = "Just";
public static void main(String[] args) {
Observable.just(TAG + " 操作符 .... 3",TAG + " 操作符 .... 2",TAG + " 操作符 .... 3").subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(String s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
}
}
Output:
RxJava JustonSubscribe __
RxJava JustonNext __ Just 操作符 .... 3
RxJava JustonNext __ Just 操作符 .... 2
RxJava JustonNext __ Just 操作符 .... 3
RxJava JustonComplete __
from
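Convert various other objects and data types into Observables.
There are quite a few variants. Unlike RxJava 1.x, which exposed them as overloads of from, RxJava 2.x moves the distinction into the method names (fromArray, fromIterable, and so on). This example starts with the simple ones. The variant that takes a Future ties into Runnable, Callable, Future, and FutureTask, which this article does not cover for now; that part will be added after further study.
Example: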
package zj.com.creating;
import java.util.ArrayList;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class From {
private static String TAG = "From";
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 7; i++) {
list.add("星期" + i);
}
Observable.fromIterable(list).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(String s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
}
}
Output:
RxJava FromonSubscribe __
RxJava FromonNext __ 星期0
RxJava FromonNext __ 星期1
RxJava FromonNext __ 星期2
RxJava FromonNext __ 星期3
RxJava FromonNext __ 星期4
RxJava FromonNext __ 星期5
RxJava FromonNext __ 星期6
RxJava FromonComplete __
Defer
- Do not create the Observable until the observer subscribes (that is, until subscribe is called), and create a fresh Observable for each observer.
Let's verify this below.
defer has a single method with no overloads. Its parameter is a generic Callable:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableDefer<T>(supplier));
}
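- Example 1:
// Assumes fields declared elsewhere in the class:
//   static String valueStr = "1111111111111";
//   static Observable<String> deferObservable;
deferObservable = Observable.just(valueStr);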
valueStr = "22222222222";
deferObservable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(TAG + "________" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Output:
Defer________1111111111111
- Example 2:
deferObservable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just(valueStr);
}
});
valueStr = "22222222222";
deferObservable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(TAG + "________" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Output:
Defer________22222222222
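As shown, with defer the Callable is not invoked until subscribe is called, so the inner Observable is only created at that point and reads the reassigned value; the output is therefore the updated 22222222222. With just used directly, the value is captured when the Observable is first created, so reassigning valueStr afterwards has no effect and the output is the initial 1111111111111.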
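The difference between the two examples comes down to when valueStr is read. Below is a minimal plain-Java sketch of that timing, under the assumption that a Supplier can stand in for an Observable and that calling get() stands in for subscribing; the just and defer helpers here are hypothetical models, not the RxJava methods.

```java
import java.util.function.Supplier;

// Plain-Java model of eager capture (just) versus lazy creation (defer).
class DeferSketch {

    static String valueStr;

    /** Models Observable.just(value): the value is captured when the source is built. */
    static Supplier<String> just(String value) {
        return () -> value;
    }

    /** Models Observable.defer(factory): the factory runs on every subscription. */
    static Supplier<String> defer(Supplier<Supplier<String>> factory) {
        return () -> factory.get().get();
    }

    /** Returns what the eager source and the deferred source deliver after reassignment. */
    static String[] demo() {
        valueStr = "1111111111111";
        Supplier<String> eager = just(valueStr);             // reads valueStr now
        Supplier<String> lazy = defer(() -> just(valueStr)); // reads valueStr on get()
        valueStr = "22222222222";
        return new String[] { eager.get(), lazy.get() };
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("just:  " + result[0]); // just:  1111111111111
        System.out.println("defer: " + result[1]); // defer: 22222222222
    }
}
```

Because the factory runs on each get(), two calls made before and after another reassignment would see different values, which mirrors "a fresh Observable for each observer".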
Range
Create an Observable that emits a particular range of sequential integers.
range does not by default operate on any particular Scheduler, but there is a variant that allows you to set the Scheduler by passing one in as a parameter.
There are quite a few range-related methods.
Code example 1:
Observable.range(1, 10).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(Integer s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
Output:
RxJava RangeonSubscribe __
RxJava RangeonNext __ 1
RxJava RangeonNext __ 2
RxJava RangeonNext __ 3
RxJava RangeonNext __ 4
RxJava RangeonNext __ 5
RxJava RangeonNext __ 6
RxJava RangeonNext __ 7
RxJava RangeonNext __ 8
RxJava RangeonNext __ 9
RxJava RangeonNext __ 10
RxJava RangeonComplete __
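Note that the second argument of range is a count, not an end value, so range(1, 10) emits 1 through 10 and range(5, 3) emits 5, 6, 7. The sketch below models only the emitted values in plain Java; the range helper is a hypothetical stand-in, and the exception message is an assumption patterned on the count check in the intervalRange source.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the values emitted by range(start, count).
class RangeSketch {

    /** Returns the values start, start + 1, ..., start + count - 1. */
    static List<Integer> range(int start, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 required but it was " + count);
        }
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            values.add(start + i);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(range(1, 10)); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        System.out.println(range(5, 3));  // [5, 6, 7]
    }
}
```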
Code example 2:
The most complex method:
/**
* Signals a range of long values, the first after some initial delay and the rest periodically after.
* <p>
* The sequence completes immediately after the last value (start + count - 1) has been reached.
* <p>
* <img width="640" height="195" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/intervalRange.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you provide the {@link Scheduler}.</dd>
* </dl>
* @param start that start value of the range
* @param count the number of values to emit in total, if zero, the operator emits an onComplete after the initial delay.
* @param initialDelay the initial delay before signalling the first value (the start)
* @param period the period between subsequent values
* @param unit the unit of measure of the initialDelay and period amounts
* @param scheduler the target scheduler where the values and terminal signals will be emitted
* @return the new Observable instance
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}
if (count == 0L) {
return Observable.<Long>empty().delay(initialDelay, unit, scheduler);
}
long end = start + (count - 1);
if (start > 0 && end < 0) {
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
}
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
Parameters:
* start: the start value of the range
* count: the total number of values to emit; if zero, the operator emits onComplete after the initial delay
* initialDelay: the delay before the first value (start) is emitted
* period: the time between subsequent values
* unit: the time unit of initialDelay and period
* scheduler: the scheduler on which the values and terminal signals are emitted
In other words: after waiting initialDelay (measured in unit), emit count numbers, starting at start and increasing by one, one every period.
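To make the parameters concrete, here is a small plain-Java sketch that computes the nominal emission schedule: the i-th value is start + i and is due at initialDelay + i * period. It models only the arithmetic (including the Math.max(0L, ...) clamping seen in the source above); the schedule helper is hypothetical, and real emission times depend on the scheduler.

```java
// Plain-Java model of the nominal intervalRange schedule; no threads or timers involved.
class IntervalRangeSketch {

    /** Returns {dueTime, value} pairs, with dueTime in the same unit as the arguments. */
    static long[][] schedule(long start, long count, long initialDelay, long period) {
        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 required but it was " + count);
        }
        long delay = Math.max(0L, initialDelay); // negative delays are clamped to zero
        long step = Math.max(0L, period);        // negative periods are clamped to zero
        long[][] result = new long[(int) count][2];
        for (int i = 0; i < count; i++) {
            result[i][0] = delay + i * step;     // when the value is due
            result[i][1] = start + i;            // the value itself
        }
        return result;
    }

    public static void main(String[] args) {
        // Same arguments as intervalRange(0, 11, 0, 1, TimeUnit.SECONDS):
        // 11 values, 0 through 10, due at seconds 0 through 10.
        for (long[] entry : schedule(0, 11, 0, 1)) {
            System.out.println("t=" + entry[0] + "s value=" + entry[1]);
        }
    }
}
```

The last value is always start + count - 1, which is the end computed in the source before the overflow check.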
Code:
// Emits 11 values starting from 0 (0 through 10), with no initial delay, one per second:
Observable.intervalRange(0, 11, 0, 1, TimeUnit.SECONDS/*,Schedulers.newThread()*/).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(Long s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
// The sleep below is only needed when running inside a main method
try {
Thread.sleep(20000); // if this is missing then the JVM exits immediately and the new thread is stopped.
} catch (InterruptedException e) {
e.printStackTrace();
}
Empty/Never/Throw
Empty
This operator creates an Observable that emits no items and calls onComplete directly.
Observable.<String>empty().subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(String s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
Output:
RxJava EmptyonSubscribe __
RxJava EmptyonComplete __
Never
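This operator creates an Observable that does nothing: it emits no items and never terminates. (onSubscribe is still called once it is subscribed to.)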
Observable.<String>never().subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(String s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
Throw
This operator creates an Observable that calls onError directly. In RxJava 2 the method is named error.
Observable.<String>error(new Callable<Throwable>() {
@Override
public Throwable call() throws Exception {
return new IllegalStateException("Throw operator demo"); // supply a real Throwable rather than null
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(String s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
Output:
RxJava ThrowonSubscribe __
RxJava ThrowonError __
Interval
- Create an Observable that emits a sequence of integers spaced by a given time interval.
Remember the intervalRange we wrote a moment ago, the timer-like one? This is much the same.
Observable.interval(1,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(Long s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
This keeps running until the subscription is disposed or the process exits.
Output:
RxJava IntervalonSubscribe __
RxJava IntervalonNext __ 0
RxJava IntervalonNext __ 1
RxJava IntervalonNext __ 2
RxJava IntervalonNext __ 3
RxJava IntervalonNext __ 4
RxJava IntervalonNext __ 5
RxJava IntervalonNext __ 6
RxJava IntervalonNext __ 7
RxJava IntervalonNext __ 8
RxJava IntervalonNext __ 9
RxJava IntervalonNext __ 10
RxJava IntervalonNext __ 11
...
Repeat
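repeat does not create an Observable; it is an auxiliary operator. For example, a just operator creates an Observable that emits its items, and chaining .repeat(3) onto it repeats the whole emission for 3 rounds.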
Observable.just(TAG + " 操作符 .... 3",TAG + " 操作符 .... 2",TAG + " 操作符 .... 1").repeat(3).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(String s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
Output:
RxJava RepeatonSubscribe __
RxJava RepeatonNext __ Repeat 操作符 .... 3
RxJava RepeatonNext __ Repeat 操作符 .... 2
RxJava RepeatonNext __ Repeat 操作符 .... 1
RxJava RepeatonNext __ Repeat 操作符 .... 3
RxJava RepeatonNext __ Repeat 操作符 .... 2
RxJava RepeatonNext __ Repeat 操作符 .... 1
RxJava RepeatonNext __ Repeat 操作符 .... 3
RxJava RepeatonNext __ Repeat 操作符 .... 2
RxJava RepeatonNext __ Repeat 操作符 .... 1
RxJava RepeatonComplete __
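As the output shows, repeat(3) replays the whole sequence three times in total and signals onComplete only once at the end. A minimal plain-Java sketch of the resulting item order follows; the repeat helper is a hypothetical model operating on a List, not the RxJava operator, and the exception message is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the item order produced by source.repeat(times).
class RepeatSketch {

    /** Returns the source items replayed back to back for the given number of rounds. */
    static <T> List<T> repeat(List<T> source, long times) {
        if (times < 0) {
            throw new IllegalArgumentException("times >= 0 required but it was " + times);
        }
        List<T> result = new ArrayList<>();
        for (long round = 0; round < times; round++) {
            result.addAll(source); // one full pass over the source per round
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [3, 2, 1, 3, 2, 1, 3, 2, 1]
        System.out.println(repeat(Arrays.asList("3", "2", "1"), 3));
    }
}
```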
Timer
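- Creates an Observable that emits a single value after a given delay. Note that it emits only once, which sets it apart from a TimerTask. timer() only creates an Observable that emits once after the delay; it does not run periodically.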
Observable.timer(5, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(TAG + "_________" +"onSubscribe");
}
@Override
public void onNext(Long aLong) {
System.out.println(TAG + "_____onNext____" +aLong);
}
@Override
public void onError(Throwable e) {
System.out.println(TAG + "_________" +"onError");
}
@Override
public void onComplete() {
System.out.println(TAG + "_________" +"onComplete");
}
});
Output:
Timer_________onSubscribe
Timer_____onNext____0
Timer_________onComplete
Start
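startWith is not a creating operator either; like repeat, it is an auxiliary operator. Its main job is to insert items we define at the head of the sequence before the source starts emitting.
It comes in several variants; the example below uses the one that takes a single item: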
private static String TAG = "Start";
public static void main(String[] args) {
Observable.just(TAG + " 操作符 .... 3",TAG + " 操作符 .... 2",TAG + " 操作符 .... 3").startWith("乎乎").subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("RxJava " + TAG + "onSubscribe __ ");
}
@Override
public void onNext(String s) {
System.out.println("RxJava " + TAG + "onNext __ " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("RxJava " + TAG + "onError __ ");
}
@Override
public void onComplete() {
System.out.println("RxJava " + TAG + "onComplete __ ");
}
});
}
Output:
RxJava StartonSubscribe __
RxJava StartonNext __ 乎乎
RxJava StartonNext __ Start 操作符 .... 3
RxJava StartonNext __ Start 操作符 .... 2
RxJava StartonNext __ Start 操作符 .... 3
RxJava StartonComplete __
Thanks
https://blog.csdn.net/jdsjlzx/article/details/52912701
https://www.imooc.com/video/15533
Demo
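For those who want the demo:
This link costs 3 CSDN points. If you have points to spare and would like to send some my way, you can download from it. Thanks in advance.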
https://download.csdn.net/download/user11223344abc/10404664
I also provide a free link, so that those who are short on points, like me, can still download the demo. My GitHub:
https://github.com/zj614android/AopDemo