Android通过Okhttp3实现socket长连接
由于项目中用到了实时刷新服务器数据的功能,通常的方式有两种:
1.客户端定时请求服务器,然后刷新本地数据(这种方式客户端请求量较大,对服务端带宽也有要求)。
2.通过客户端与服务器端建立socket长连接的方式,客户端获取到服务器数据变化,进行数据的刷新操作(需要客户端和服务器配合,建立socket长连接)。
两种方式在之前的工作过程中都有遇到,其优劣网上的介绍也比较多,在此不做过多分析,本文主要介绍第二种方式,即通过客户端与服务端建立socket长连接方式。
通过客户端与服务器端建立socket长连接,网上也有很多的方式,之前曾用过github上的一个开源工具——android-websocket,具使用方法,github上有详细介绍。接下来开始正式介绍通过Okhttp3实现socket长连接。
1.封装一个WebSocket状态的类WebSStatus
主要用于记录当前Socket连接状态(此处封装方法参考WebSocket封装)
public class WsStatus {
public final static int CONNECTED = 1;
public final static int CONNECTING = 0;
public final static int RECONNECT = 2;
public final static int DISCONNECTED = -1;
class CODE {
public final static int NORMAL_CLOSE = 1000;
public final static int ABNORMAL_CLOSE = 1001;
}
class TIP {
public final static String NORMAL_CLOSE = "normal close";
public final static String ABNORMAL_CLOSE = "abnormal close";
}
}
2.创建一个socket接口IWsManager
主要用于封装长连接所需要的方法
interface IWsManager {
WebSocket getWebSocket();
void startConnect();
void stopConnect();
boolean isWsConnected();
int getCurrentStatus();
void setCurrentStatus(int currentStatus);
boolean sendMessage(String msg);
boolean sendMessage(ByteString byteString);
}
3.创建长连接状态监听类WsStatusListener
创建一个抽象类,定义不同状态的方法
public abstract class WsStatusListener {
public void onOpen(Response response) {
}
public void onMessage(String text) {
}
public void onMessage(ByteString bytes) {
}
public void onReconnect() {
}
public void onClosing(int code, String reason) {
}
public void onClosed(int code, String reason) {
}
public void onFailure(Throwable t, Response response) {
}
}
4.创建socket长连接管理类
主要用于实现IWsManager接口
public class WsManager implements IWsManager {
private final static int RECONNECT_INTERVAL = 10 * 1000; //重连自增步长
private final static long RECONNECT_MAX_TIME = 120 * 1000; //最大重连间隔
private Context mContext;
private String wsUrl;
private WebSocket mWebSocket;
private OkHttpClient mOkHttpClient;
private Request mRequest;
private int mCurrentStatus = WsStatus.DISCONNECTED; //websocket连接状态
private boolean isNeedReconnect; //是否需要断线自动重连
private boolean isManualClose = false; //是否为手动关闭websocket连接
private WsStatusListener wsStatusListener;
private Lock mLock;
private Handler wsMainHandler = new Handler(Looper.getMainLooper());
private int reconnectCount = 3; //重连次数
private Runnable reconnectRunnable = new Runnable() {
@Override
public void run() {
if (wsStatusListener != null) {
wsStatusListener.onReconnect();
}
buildConnect();
}
};
private WebSocketListener mWebSocketListener = new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, final Response response) {
mWebSocket = webSocket;
setCurrentStatus(WsStatus.CONNECTED);
connected();
if (wsStatusListener != null) {
if (Looper.myLooper() != Looper.getMainLooper()) {
wsMainHandler.post(new Runnable() {
@Override
public void run() {
wsStatusListener.onOpen(response);
}
});
} else {
wsStatusListener.onOpen(response);
}
}
}
@Override
public void onMessage(WebSocket webSocket, final ByteString bytes) {
if (wsStatusListener != null) {
if (Looper.myLooper() != Looper.getMainLooper()) {
wsMainHandler.post(new Runnable() {
@Override
public void run() {
wsStatusListener.onMessage(bytes);
}
});
} else {
wsStatusListener.onMessage(bytes);
}
}
}
@Override
public void onMessage(WebSocket webSocket, final String text) {
if (wsStatusListener != null) {
if (Looper.myLooper() != Looper.getMainLooper()) {
wsMainHandler.post(new Runnable() {
@Override
public void run() {
wsStatusListener.onMessage(text);
}
});
} else {
wsStatusListener.onMessage(text);
}
}
}
@Override
public void onClosing(WebSocket webSocket, final int code, final String reason) {
if (wsStatusListener != null) {
if (Looper.myLooper() != Looper.getMainLooper()) {
wsMainHandler.post(new Runnable() {
@Override
public void run() {
wsStatusListener.onClosing(code, reason);
}
});
} else {
wsStatusListener.onClosing(code, reason);
}
}
}
@Override
public void onClosed(WebSocket webSocket, final int code, final String reason) {
if (wsStatusListener != null) {
if (Looper.myLooper() != Looper.getMainLooper()) {
wsMainHandler.post(new Runnable() {
@Override
public void run() {
wsStatusListener.onClosed(code, reason);
}
});
} else {
wsStatusListener.onClosed(code, reason);
}
}
}
@Override
public void onFailure(WebSocket webSocket, final Throwable t, final Response response) {
tryReconnect();
if (wsStatusListener != null) {
if (Looper.myLooper() != Looper.getMainLooper()) {
wsMainHandler.post(new Runnable() {
@Override
public void run() {
wsStatusListener.onFailure(t, response);
}
});
} else {
wsStatusListener.onFailure(t, response);
}
}
}
};
public WsManager(Builder builder) {
mContext = builder.mContext;
wsUrl = builder.wsUrl;
isNeedReconnect = builder.needReconnect;
mOkHttpClient = builder.mOkHttpClient;
this.mLock = new ReentrantLock();
}
private void initWebSocket() {
if (mOkHttpClient == null) {
mOkHttpClient = new OkHttpClient.Builder()
.retryOnConnectionFailure(true)
.build();
}
if (mRequest == null) {
mRequest = new Request.Builder()
.url(wsUrl)
.build();
}
mOkHttpClient.dispatcher().cancelAll();
try {
mLock.lockInterruptibly();
try {
mOkHttpClient.newWebSocket(mRequest, mWebSocketListener);
} finally {
mLock.unlock();
}
} catch (InterruptedException e) {
}
}
@Override
public WebSocket getWebSocket() {
return mWebSocket;
}
public void setWsStatusListener(WsStatusListener wsStatusListener) {
this.wsStatusListener = wsStatusListener;
}
@Override
public synchronized boolean isWsConnected() {
return mCurrentStatus == WsStatus.CONNECTED;
}
@Override
public synchronized int getCurrentStatus() {
return mCurrentStatus;
}
@Override
public synchronized void setCurrentStatus(int currentStatus) {
this.mCurrentStatus = currentStatus;
}
@Override
public void startConnect() {
isManualClose = false;
buildConnect();
}
@Override
public void stopConnect() {
isManualClose = true;
disconnect();
}
private void tryReconnect() {
if (!isNeedReconnect | isManualClose) {
return;
}
if (!isNetworkConnected(mContext)) {
setCurrentStatus(WsStatus.DISCONNECTED);
return;
}
setCurrentStatus(WsStatus.RECONNECT);
long delay = reconnectCount * RECONNECT_INTERVAL;
wsMainHandler
.postDelayed(reconnectRunnable, delay > RECONNECT_MAX_TIME ? RECONNECT_MAX_TIME : delay);
reconnectCount++;
}
private void cancelReconnect() {
wsMainHandler.removeCallbacks(reconnectRunnable);
reconnectCount = 0;
}
private void connected() {
cancelReconnect();
}
private void disconnect() {
if (mCurrentStatus == WsStatus.DISCONNECTED) {
return;
}
cancelReconnect();
if (mOkHttpClient != null) {
mOkHttpClient.dispatcher().cancelAll();
}
if (mWebSocket != null) {
boolean isClosed = mWebSocket.close(WsStatus.CODE.NORMAL_CLOSE, WsStatus.TIP.NORMAL_CLOSE);
//非正常关闭连接
if (!isClosed) {
if (wsStatusListener != null) {
wsStatusListener.onClosed(WsStatus.CODE.ABNORMAL_CLOSE, WsStatus.TIP.ABNORMAL_CLOSE);
}
}
}
setCurrentStatus(WsStatus.DISCONNECTED);
}
private synchronized void buildConnect() {
if (!isNetworkConnected(mContext)) {
setCurrentStatus(WsStatus.DISCONNECTED);
return;
}
switch (getCurrentStatus()) {
case WsStatus.CONNECTED:
case WsStatus.CONNECTING:
break;
default:
setCurrentStatus(WsStatus.CONNECTING);
initWebSocket();
}
}
//发送消息
@Override
public boolean sendMessage(String msg) {
return send(msg);
}
@Override
public boolean sendMessage(ByteString byteString) {
return send(byteString);
}
private boolean send(Object msg) {
boolean isSend = false;
if (mWebSocket != null && mCurrentStatus == WsStatus.CONNECTED) {
if (msg instanceof String) {
isSend = mWebSocket.send((String) msg);
} else if (msg instanceof ByteString) {
isSend = mWebSocket.send((ByteString) msg);
}
//发送消息失败,尝试重连
if (!isSend) {
tryReconnect();
}
}
return isSend;
}
public static final class Builder {
private Context mContext;
private String wsUrl;
private boolean needReconnect = true;
private OkHttpClient mOkHttpClient;
public Builder(Context val) {
mContext = val;
}
public Builder wsUrl(String val) {
wsUrl = val;
return this;
}
public Builder client(OkHttpClient val) {
mOkHttpClient = val;
return this;
}
public Builder needReconnect(boolean val) {
needReconnect = val;
return this;
}
public WsManager build() {
return new WsManager(this);
}
}
此处代码主要功能点有,实现Okhttp3中WebSocketListener的监听方法,用于建立长连接后,服务端消息的获取。其中,
mOkHttpClient = new OkHttpClient.Builder()
.retryOnConnectionFailure(true)
.build();
即Okhttp3中创建client方法,retryOnConnectionFailure(true)表示返回连接失败时重试。
Request request = new Request.Builder()
.url(wsUrl)
.build();
即Okhttp3中传入长连接地址wsUrl,创建长连接请求对象。
send(Object msg)方法,则是在建立长连接后,通过onOpen()方法获取到的WebSocket,进行消息的发送,支持的消息格式除了字符类型的文本内容,还可以将如图像,声音,视频等内容转为ByteString发送。WsStatusListener中的onOpen()方法主要用于,完成长连接的初始化后,进行长连接的初始化操作,比如连接初始化、心跳等操作;onMessage()方法主要用于服务端消息的接收,可根据具体连接协议对返回信息做相应的处理;onClosed()方法主要用于长连接断开后的处理,如心跳的处理等;onFailure()方法主要用于长连接异常的处理。
5.长连接的初始化
wsBaseManager = new WsManager.Builder(getBaseContext())
.client(new OkHttpClient().newBuilder()
.pingInterval(15, TimeUnit.SECONDS)
.retryOnConnectionFailure(true)
.build())
.needReconnect(true)
.wsUrl(WEBSOCKET_URL)
.build();
wsBaseManager.setWsStatusListener(wsBaseStatusListener);
wsBaseManager.startConnect();
其中WEBSOCKET_URL即为长连接地址。
创建监听类wsBaseStatusListener
WsStatusListener wsBaseStatusListener = new WsStatusListener() {
@Override
public void onOpen(Response response) {
super.onOpen(response);
//协议初始化 心跳等
}
@Override
public void onMessage(String text) {
super.onMessage(text);
//消息处理
}
@Override
public void onMessage(ByteString bytes) {
super.onMessage(bytes);
//消息处理
}
@Override
public void onClosing(int code, String reason) {
super.onClosing(code, reason);
}
@Override
public void onClosed(int code, String reason) {
super.onClosed(code, reason);
}
@Override
public void onFailure(Throwable t, Response response) {
super.onFailure(t, response);
}
};
至此,通过Okhttp3建立socket长连接的方法就基本介绍完毕了,如有问题或者不同见解欢迎留言。