此源码分析基于 socket.io-client-java 0.9.0
先来个 socket.io-client-java 客户端实例:
import java.net.URISyntaxException;
import java.util.Scanner;
import com.itdreamer.util.DataUtil;
import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
public class Client {
public static void main(String[] args) throws URISyntaxException, InterruptedException {
IO.Options options = new IO.Options();
options.transports = new String[]{"websocket"};
options.reconnectionAttempts = 2;
//失败重连的时间间隔
options.reconnectionDelay = 1000;
//连接超时时间(ms)
options.timeout = 500;
final Socket socket = IO.socket("http://localhost:8090", options);
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
System.out.println(DataUtil.getCurrentDataTime() + ":client connect! ");
socket.send("hello server, my name is client");
}
});
socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
System.out.println(DataUtil.getCurrentDataTime() + ":client disconnect!");
}
});
socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
for (Object obj : args) {
System.out.println(DataUtil.getCurrentDataTime() + ":receive server message="+obj);
}
}
});
socket.connect();
System.out.println(DataUtil.getCurrentDataTime() + ":client console input......");
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
String message = sc.next();
System.out.println(DataUtil.getCurrentDataTime() + ":client console send data="+message);
socket.send(message);
}
}
}
socket.io-client-java 封装得很不错,只要简单几个方法,把自己的业务逻辑实现就搞定了。
本方从以下两个方面进行分析:
1、socket 连接是如何创建的
2、服务端响应的信息是如何接收并调用定义事件对应Listener的
socket 连接是如何创建的
连接门面对象
io.socket.client.Socket
入口方法
socket.connect();
/**
* Connects the socket.
*/
public Socket connect() {
return this.open();
}
/**
* Connects the socket.
*/
public Socket open() {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (Socket.this.connected) return;
Socket.this.subEvents();
Socket.this.io.open(); // ensure open
if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
Socket.this.emit(EVENT_CONNECTING);
}
});
return this;
}
设置底层共用事件监听器
io.socket.client.Manager
public Manager open(){
return open(null);
}
/**
* Connects the client.
*
* @param fn callback.
* @return a reference to this object.
*/
public Manager open(final OpenCallback fn) {
EventThread.exec(new Runnable() {
@Override
public void run() {
logger.fine(String.format("readyState %s", Manager.this.readyState));
if (Manager.this.readyState == ReadyState.OPEN || Manager.this.readyState == ReadyState.OPENING) return;
logger.fine(String.format("opening %s", Manager.this.uri));
Manager.this.engine = new Engine(Manager.this.uri, Manager.this.opts);
final io.socket.engineio.client.Socket socket = Manager.this.engine;
final Manager self = Manager.this;
Manager.this.readyState = ReadyState.OPENING;
Manager.this.skipReconnect = false;
// 添加一系列共用事件监听器
// propagate transport event.
socket.on(Engine.EVENT_TRANSPORT, new Listener() {
@Override
public void call(Object... args) {
self.emit(Manager.EVENT_TRANSPORT, args);
}
});
final On.Handle openSub = On.on(socket, Engine.EVENT_OPEN, new Listener() {
@Override
public void call(Object... objects) {
self.onopen();
if (fn != null) fn.call(null);
}
});
On.Handle errorSub = On.on(socket, Engine.EVENT_ERROR, new Listener() {
@Override
public void call(Object... objects) {
Object data = objects.length > 0 ? objects[0] : null;
logger.fine("connect_error");
self.cleanup();
self.readyState = ReadyState.CLOSED;
self.emitAll(EVENT_CONNECT_ERROR, data);
if (fn != null) {
Exception err = new SocketIOException("Connection error",
data instanceof Exception ? (Exception) data : null);
fn.call(err);
} else {
// Only do this if there is no fn to handle the error
self.maybeReconnectOnOpen();
}
}
});
if (Manager.this._timeout >= 0) {
final long timeout = Manager.this._timeout;
logger.fine(String.format("connection attempt will timeout after %d", timeout));
final Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
EventThread.exec(new Runnable() {
@Override
public void run() {
logger.fine(String.format("connect attempt timed out after %d", timeout));
openSub.destroy();
socket.close();
socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout"));
self.emitAll(EVENT_CONNECT_TIMEOUT, timeout);
}
});
}
}, timeout);
Manager.this.subs.add(new On.Handle() {
@Override
public void destroy() {
timer.cancel();
}
});
}
Manager.this.subs.add(openSub);
Manager.this.subs.add(errorSub);
//建立连接
Manager.this.engine.open();
}
});
return this;
}
根据transportName创建 Transport 传输对象
io.socket.engineio.client.Socket
/**
* Connects the client.
*
* @return a reference to to this object.
*/
public Socket open() {
EventThread.exec(new Runnable() {
@Override
public void run() {
String transportName;
if (Socket.this.rememberUpgrade && Socket.priorWebsocketSuccess && Socket.this.transports.contains(WebSocket.NAME)) {
transportName = WebSocket.NAME;
} else if (0 == Socket.this.transports.size()) {
// Emit error on next tick so it can be listened to
final Socket self = Socket.this;
EventThread.nextTick(new Runnable() {
@Override
public void run() {
self.emit(Socket.EVENT_ERROR, new EngineIOException("No transports available"));
}
});
return;
} else {
transportName = Socket.this.transports.get(0);
}
Socket.this.readyState = ReadyState.OPENING;
// 根据transportName创建具体Transport对象,这里是WebSocket
Transport transport = Socket.this.createTransport(transportName);
Socket.this.setTransport(transport);
//打开连接
transport.open();
}
});
return this;
}
private Transport createTransport(String name) {
logger.fine(String.format("creating transport '%s'", name));
Map<String, String> query = new HashMap<String, String>(this.query);
query.put("EIO", String.valueOf(Parser.PROTOCOL));
query.put("transport", name);
if (this.id != null) {
query.put("sid", this.id);
}
Transport.Options opts = new Transport.Options();
opts.hostname = this.hostname;
opts.port = this.port;
opts.secure = this.secure;
opts.path = this.path;
opts.query = query;
opts.timestampRequests = this.timestampRequests;
opts.timestampParam = this.timestampParam;
opts.policyPort = this.policyPort;
opts.socket = this;
opts.callFactory = this.callFactory;
opts.webSocketFactory = this.webSocketFactory;
// 这里 look ...........
Transport transport;
if (WebSocket.NAME.equals(name)) {
transport = new WebSocket(opts);
} else if (Polling.NAME.equals(name)) {
transport = new PollingXHR(opts);
} else {
throw new RuntimeException();
}
this.emit(EVENT_TRANSPORT, transport);
return transport;
}
调用Transport实现类WebSocket对象创建连接
io.socket.engineio.client.Transport
public Transport open() {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (Transport.this.readyState == ReadyState.CLOSED || Transport.this.readyState == null) {
Transport.this.readyState = ReadyState.OPENING;
Transport.this.doOpen();
}
}
});
return this;
}
调用okhttp3进行连接创建
io.socket.engineio.client.transports.WebSocket
protected void doOpen() {
Map<String, List<String>> headers = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
this.emit(EVENT_REQUEST_HEADERS, headers);
final WebSocket self = this;
okhttp3.WebSocket.Factory factory = webSocketFactory != null ? webSocketFactory : new OkHttpClient();
// 请求链接设置 uri()
Request.Builder builder = new Request.Builder().url(uri());
for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
for (String v : entry.getValue()) {
builder.addHeader(entry.getKey(), v);
}
}
final Request request = builder.build();
ws = factory.newWebSocket(request, new WebSocketListener() {
// 打开连接
@Override
public void onOpen(okhttp3.WebSocket webSocket, Response response) {
final Map<String, List<String>> headers = response.headers().toMultimap();
EventThread.exec(new Runnable() {
@Override
public void run() {
self.emit(EVENT_RESPONSE_HEADERS, headers);
self.onOpen();
}
});
}
// 接收字符消息
@Override
public void onMessage(okhttp3.WebSocket webSocket, final String text) {
if (text == null) {
return;
}
EventThread.exec(new Runnable() {
@Override
public void run() {
self.onData(text);
}
});
}
// 接收byte消息
@Override
public void onMessage(okhttp3.WebSocket webSocket, final ByteString bytes) {
if (bytes == null) {
return;
}
EventThread.exec(new Runnable() {
@Override
public void run() {
self.onData(bytes.toByteArray());
}
});
}
// 关闭连接
@Override
public void onClosed(okhttp3.WebSocket webSocket, int code, String reason) {
EventThread.exec(new Runnable() {
@Override
public void run() {
self.onClose();
}
});
}
// 异常处理
@Override
public void onFailure(okhttp3.WebSocket webSocket, final Throwable t, Response response) {
if (!(t instanceof Exception)) {
return;
}
EventThread.exec(new Runnable() {
@Override
public void run() {
self.onError("websocket error", (Exception) t);
}
});
}
});
}
// 组合请求链接
protected String uri() {
Map<String, String> query = this.query;
if (query == null) {
query = new HashMap<String, String>();
}
String schema = this.secure ? "wss" : "ws";
String port = "";
if (this.port > 0 && (("wss".equals(schema) && this.port != 443)
|| ("ws".equals(schema) && this.port != 80))) {
port = ":" + this.port;
}
if (this.timestampRequests) {
query.put(this.timestampParam, Yeast.yeast());
}
String derivedQuery = ParseQS.encode(query);
if (derivedQuery.length() > 0) {
derivedQuery = "?" + derivedQuery;
}
boolean ipv6 = this.hostname.contains(":");
return schema + "://" + (ipv6 ? "[" + this.hostname + "]" : this.hostname) + port + this.path + derivedQuery;
}
okhttp3连接创建[实际创建连接处]
请看 --》okhttp websocket源码篇
至此连接创建流程结束
服务端响应信息是如何接收并调用定义事件对应Listener
从上面连接的建立 okhttp3、WebSocket 部分可以看出,okhttp3在连接创建后会运行一个轮询读取websocket消息的方法
/** 开启轮询读取websocket的消息 */ loopReader();
而实际收到消息后,会调用 WebSocket 监控器类定义的 onMessage 方法执行响应消息回调,如果是字符信息会是json字符串
以下对象完成响应消息业务处理方法执行
io.socket.emitter.Emitter
protected void onData(String data) {
this.onPacket(Parser.decodePacket(data));
}
protected void onPacket(Packet packet) {
this.emit(EVENT_PACKET, packet);
}
/**
* 实际回调响应消息业务处理方法
*
* Executes each of listeners with the given args.
*
* @param event an event name.
* @param args
* @return a reference to this object.
*/
public Emitter emit(String event, Object... args) {
ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event);
if (callbacks != null) {
for (Listener fn : callbacks) {
fn.call(args);
}
}
return this;
}
总结
从整体来看,socketio-client-java是对okttp3进行封装,隐藏掉底层实现,让我们只需调用简单API就能完成业务实现