版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/xiogjie_67/article/details/78362586
package com.gdc.rxjava;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.functions.Func1;
import rx.Subscriber;
public class TestFilting {
public static void main(String[] args) {
// testDebounce();
// testDistinct();
// testElementAt();
// testFilter();
// testFirst();
// testIgnoreElements();
// testLast();
// testSample();
// testSkip();
// testTake();
testTakeLast();
}
private static void testDebounce() {
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> arg0) {
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
arg0.onNext(i);
}
arg0.onCompleted();
} catch (InterruptedException e) {
arg0.onError(e);
}
}
}).debounce(2, TimeUnit.SECONDS).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
private static void testDistinct() {
Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
private static void testElementAt() {
Observable.just(1, 2, 3, 2, 3).elementAt(3).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
private static void testFilter() {
Observable.just(1, 2, 3, 2, 3).distinct().filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer arg0) {
// 在此指定过滤规则
return arg0 > 2;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
private static void testFirst() {
Observable.just(9, 2, 3, 2, 3).distinct().first().subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
private static void testIgnoreElements() {
Observable.just(123).ignoreElements().subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
private static void testLast() {
Observable.just(9, 2, 3, 2, 3).distinct().last().subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
private static void testSample() {
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> arg0) {
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
arg0.onNext(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).sample(4, TimeUnit.SECONDS).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
private static void testSkip() {
Observable.just(1, 2, 3, 4, 5).skip(2).skipLast(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
private static void testTake() {
Observable.just(1, 2, 3, 4, 5).take(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
private static void testTakeLast() {
Observable.just(1, 2, 3, 4, 5).takeLast(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
System.out.println("onError():" + arg0);
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():" + arg0);
}
});
}
}