RxJava2.0的基本使用
初次接触RxJava:
1.导入依赖:
implementation 'io.reactivex.rxjava2:rxjava:2.0.1'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
2.大概意思:
rxjava就是一种观察者模式下的编程,由被观察者,观察者和订阅关系组成。
3.简单示例(同步操作,暂时还不会异步)
package com.example.play;
import androidx.appcompat.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class MainActivity extends AppCompatActivity {
private Disposable mDisposable;
private static final String TAG="JavaTag";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//被观察者
Observable novel=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//计划表
emitter.onNext("连载1");
emitter.onNext("连载2");
emitter.onNext("连载3");
emitter.onComplete();
}
});
//观察者
Observer<String> reader=new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable=d;
Log.e(TAG,"onSubscribe");
}
@Override
public void onNext(String value) {
if ("2".equals(value)){
mDisposable.dispose();
return;
}
Log.e(TAG,"onNext:"+value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG,"onError="+e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete()");
}
};
novel.subscribe(reader);//一行代码搞定
}
}
运行结果:
这就是一个模板示例,需要注意的就是Disposable对象,它其实是用来控制订阅关系的,调用dispose()方法就可以停止订阅关系,就好像一部连载的小说,你不再接收推送更新的消息了。
比如:我修改被观察者订阅事件表
Observable novel=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//计划表
emitter.onNext("连载1");
emitter.onNext("连载2");
emitter.onNext("连载3");
emitter.onNext("2");
emitter.onNext("连载4");
emitter.onComplete();
}
});
新增了"连载4"和一条"2",我们在观察者的订阅表中做过判断,如果收到计划为"2"就调用dispose()方法,接下来我们看一下运行结果:
推送果然停止了,你的第4次连载我不再接收了。
4.小结:
我们观察运行结果可以发现,onSubscribe()方法总是第一个调用,然后调用onNext()方法,所有计划顺利完成之后会调用onComplete()方法。
RxJava学习-简书(强烈推荐)
RxJava2.0的异步和链式编程
异步:
何为异步,同步的反义词就是异步,RxJava的真正强大之处就在于异步处理。其他的不说了,先上一个示例:
package com.example.play;
import androidx.appcompat.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class MainActivity extends AppCompatActivity {
private Disposable mDisposable;
private static final String TAG="JavaTag";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//这里面可以做相关逻辑操作,处理,通过onNext返回到主线程,执行UI等
e.onNext("连载1");
e.onNext("连载2");
e.onNext("连载3");
e.onComplete();
}
})
.observeOn(AndroidSchedulers.mainThread()) //回调在主线程
.subscribeOn(Schedulers.io()) //执行在io线程
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"onSubscribe");
}
@Override
public void onNext(String value) {
//这里也可以添加相应操作,即回调的时候
Log.e(TAG,"onNext"+value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG,"onError"+e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG,"Complete");
}
});
}
}
运行结果:
RxJava是支持异步的,但是RxJava是如何做到的呢?这里就需要Scheduler。Scheduler,英文名调度器,它是RxJava用来控制线程。当我们没有设置的时候,RxJava遵循哪个线程产生就在哪个线程消费的原则,也就是说线程不会产生变化,始终在同一个。然后我们一般使用RxJava都是后台执行,前台调用,本着这个原则,我们需要调用observeOn(AndroidSchedulers.mainThread()),observeOn是事件回调的线程,AndroidSchedulers.mainThread()一看就知道是主线程,subscribeOn(Schedulers.io()),subscribeOn是事件执行的线程,Schedulers.io()是子线程,这里也可以用Schedulers.newThread(),只不过io线程可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。
异步实例
package com.example.play;
import androidx.appcompat.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.widget.TextView;
import java.io.IOException;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
public class MainActivity extends AppCompatActivity {
private Disposable mDisposable;
private static final String TAG="JavaTag";
private TextView show_text;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
show_text=(TextView)findViewById(R.id.show_text);
Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//这里面可以做相关逻辑操作,处理,通过onNext返回到主线程,执行UI等
e.onNext(sendInternetRequest());
e.onComplete();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"onSubscribe");
}
@Override
public void onNext(String value) {
Log.e(TAG,"onNext"+value);
show_text.setText(value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG,"onError"+e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG,"Complete");
}
});
}
private String sendInternetRequest() {
String responseData = null;
try {
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("https://www.baidu.com")
.build();
Response response = client.newCall(request).execute();
responseData=response.body().string();
} catch (IOException e) {
e.printStackTrace();
}
return responseData;
}
}
进行网络请求,回调时设置UI显示数据。