1,from将一组数据,按顺序依次发射出去
private void rxFromUse() {
List<String> list = new ArrayList<>();
list.add("from1");
list.add("from2");
list.add("from3");
list.add("from4");
Observable fromObservable = Observable.from(list);
fromObservable.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("1--" + s);
}
});
}
2,interval定时操作,设置每隔段时间发送一个整形的数值
private void rxInterval() {
Observable intervalObservable = Observable.interval(1, TimeUnit.SECONDS);
Subscription subscription = intervalObservable.subscribe(new Action1<Long>() {
@Override
public void call(Long o) {
//System.out.println("1--Interval-----" + o);
}
});
}
3,map转化,将数据按某种函数处理后在发送
private void rxMap() {
Observable.just("abc")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return "2222";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
}
4,flatMap将数据处理后依然返回Observable对象
private void rxFlatMap() {
query("22222")
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(List<String> urls) {
return Observable.from(urls);
}
})
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return !s.equals("1--3333");
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
}
Observable<List<String>> query(String text) {
List<String> l = new ArrayList();
l.add("1--2222");
l.add("1--3333");
l.add("1--4444");
return Observable.just(l);
}
5,线程切换
private void rxScheduler() {
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("number:" + integer);
}
});
}
/**
* 模拟在线程中长时间获取数据,在主线程中显示数据
*/
private void rxObservable() {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("初始化。。。");
SystemClock.sleep(2000);
subscriber.onNext("休眠2秒");
SystemClock.sleep(3000);
subscriber.onNext("休眠3秒");
SystemClock.sleep(5000);
subscriber.onNext("休眠5秒");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("2--onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("2--" + s);
}
});
}
6
/**
* 按顺序连接多个Observables。需要注意的是Observable.concat(a,b)等价于a.concatWith(b)
*/
private void rxConcat() {
Observable<Integer> observable1 = Observable.just(1, 2, 3, 4);
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Observable.concat(observable1, observable2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("1--Concat----" + integer);
}
});
}
7
/**
* 在数据序列的开头增加一项数据。startWith的内部也是调用了concat
*/
private void rxStartWith() {
Observable.just(1, 3, 4, 5)
.startWith(6, 7, 8, 9)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("1--StartWith------" + integer);
}
});
}
/**
* 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。
* 其中mergeDelayError将异常延迟到其它没有错误
* 的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知
*/
private void rxMerge() {
Observable<Integer> observable = Observable.just(1, 2, 3, 4);
Observable<Integer> observable1 = Observable.just(6, 8, 9);
Observable.merge(observable, observable1).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("1--Merge----" + integer);
}
});
}
/**
* zip--使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不
* 一样,则以最少的Observable为标准进行压合。内部通过OperatorZip进行压合
*/
private void rxZip() {
Observable<Integer> observable1 = Observable.just(1, 2, 3, 4);
Observable<Integer> observable2 = Observable.just(5, 6, 7);
Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer integer, Integer integer2) {
return integer + "--and--" + integer2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("1--Zip-----" + s);
}
});
}
/**
*
*/
private void rxCombineLatest() {
Observable<Integer> observable1 = Observable.just(1, 2, 3, 4);
Observable<Integer> observable2 = Observable.just(5, 6, 7);
Observable.combineLatest(observable1, observable2, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer integer, Integer integer2) {
return integer + "--and--" + integer2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("1--CombineLatest-----" + s);
}
});
}
/**
* 过滤数据。内部通过OnSubscribeFilter过滤数据。
*/
private void rxOfType() {
Observable.just(3, 4, 5, 6, "7")
.ofType(Integer.class)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("1--OfType-----" + integer);
}
});
}
/**
* take--只发射开始的N项数据或者一定时间内的数据。内部通过OperatorTake和OperatorTakeTimed过滤数据。
* takeLast--只发射最后的N项数据或者一定时间内的数据]
* takeFirst--提取满足条件的第一项
* first/firstOrDefault--只发射第一项(或者满足某个条件的第一项)数据,可以指定默认值
* last/lastOrDefault--只发射最后一项(或者满足某个条件的最后一项)数据,可以指定默认值
* skip--跳过开始的N项数据或者一定时间内的数据
* skipLast--跳过最后的N项数据或者一定时间内的数据
*/
private void rxTake() {
Observable.just(3, 4, 5, 6, 7)
.take(3)//发射前3个数据项
.take(100, TimeUnit.MILLISECONDS)//发射100毫秒内的数据
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("1--Take-----" + integer);
}
});
}
/**
* Distinct--过滤重复数据
* distinctUntilChanged--过滤掉连续重复的数据
*/
private void rxDistinct() {
List<String> l = new ArrayList();
l.add("1--2222");
l.add("1--2222");
l.add("1--4444");
Observable.from(l)
.distinct()
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("3--Distinct-----" + s);
}
});
}
/**
* 超时
*/
private void rxTimeOut() {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(2);
subscriber.onCompleted();
}
}).timeout(200, TimeUnit.MILLISECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
System.out.println("3--TimeOut---onError--" + e.toString());
}
@Override
public void onNext(Integer integer) {
System.out.println("3--TimeOut---onNext--" + integer);
}
});
}
/**
* all---判断所有的数据项是否满足某个条件
* exists---判断是否存在数据项满足某个条件
* contains---判断在发射的所有数据项中是否包含指定的数据,内部调用的其实是exists
*/
private void rxAll() {
List<Integer> l = new ArrayList();
l.add(2);
l.add(3);
l.add(4);
Observable.from(l).all(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer s) {
return s > 3;
}
}).subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
System.out.println("3--All---Boolean--" + aBoolean);
}
});
}
/**
* sequenceEqual: 用于判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)
*/
private void rxSequenceEqual() {
List<String> list1 = new ArrayList();
list1.add("123");
list1.add("234");
list1.add("456");
List<String> list2 = new ArrayList<>();
list2.add("123");
list2.add("234");
list2.add("456");
Observable.sequenceEqual(Observable.from(list1), Observable.from(list2))
.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean aBoolean) {
System.out.println("3--SequenceEqual---Boolean--" + aBoolean);
}
});
}
/**
* reduce: 对序列使用reduce()函数并发射最终的结果
* collect: 使用collect收集数据到一个可变的数据结构
*/
private void rxReduce() {
Observable.just(2, 3, 4, 5)
.reduce((sum, item) -> sum + item)
.subscribe(integer -> Log.d("1--", integer.toString()));//14
Observable.just(3, 4, 5, 6)
.collect(new Func0<List<Integer>>() { //创建数据结构
@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}
}, new Action2<List<Integer>, Integer>() { //收集器
@Override
public void call(List<Integer> integers, Integer integer) {
integers.add(integer);
}
})
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
}
});
}
/**
* toList: 收集原始Observable发射的所有数据到一个列表
* toSortedList: 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表
* toMap: 将序列数据转换为一个Map。我们可以根据数据项生成key和生成value。
* map: 对Observable发射的每一项数据都应用一个函数来变换
*/
private void rxToList() {
Observable.just(2, 3, 4, 5)
.toList()
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
}
});
Observable.just(6, 2, 3, 4, 5)
.toSortedList(new Func2<Integer, Integer, Integer>() {//自定义排序
@Override
public Integer call(Integer integer, Integer integer2) {
return integer - integer2; //>0 升序 ,<0 降序
}
})
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
Log.d("JG", integers.toString()); // [2, 3, 4, 5, 6]
}
});
Observable.just(6, 2, 3, 4, 5)
.toMap(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "key:" + integer; //根据数据项生成map的key
}
}, new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "value:" + integer; //根据数据项生成map的kvalue
}
}).subscribe(new Action1<Map<String, String>>() {
@Override
public void call(Map<String, String> stringStringMap) {
Log.d("JG", stringStringMap.toString()); // {key:6=value:6, key:5=value:5, key:4=value:4, key:2=value:2, key:3=value:3}
}
});
Observable.just(6, 2, 3, 4, 5)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "item:" + integer;
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("JG", s);
}
});//item:6,item:2....
}
/**
* retry: 当原始Observable在遇到错误时进行重试
* retryWhen: 当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable,内部调用的是retry
*/
private void rxRetry() {
Observable.just(1, "2", 3)
.cast(Integer.class)
.retry(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("4--", integer.toString());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.d("4--", "onError");
}
})
;//1,1,1,1,onError
Observable.just(1, "2", 3)
.cast(Integer.class)
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<Long>>() {
@Override
public Observable<Long> call(Observable<? extends Throwable> observable) {
return Observable.timer(1, TimeUnit.SECONDS);
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("JG", integer.toString());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.d("JG", "onError");
}
});
//1,1
}
/**
* materialize: 将Observable转换成一个通知列表。
*/
private void rxMaterialize() {
Observable.just(1, 2, 3)
.materialize()
.subscribe(new Action1<Notification<Integer>>() {
@Override
public void call(Notification<Integer> integerNotification) {
Log.d("4--", integerNotification.getKind() + " " + integerNotification.getValue());
}
});
}