优化程序结构:避免回调地狱
本文博客链接:http://blog.csdn.net/jdh99,作者:jdh,转载请注明.
环境:
主机:WIN10
开发环境:Android Studio 2.2 Preview 3
说明:
在编程中,经常需要处理回调函数。如果一个模块中回调函数过多,特别是回调中处理的业务又包含回调,就会形成回调嵌套,使代码结构混乱,俗称回调地狱。如下图:
来源:https://kongwsh.github.io/2016/09/11/callback/
经过思考,单片机项目中的编程经验可以应用在此处解决问题。
回调本质上是异步,类似单片机的中断。受限于单片机的主频较低,一般不在中断中处理复杂逻辑,而仅是产生信号,由状态机捕捉此信号,然后在主流程中进行处理。
单片机编程常用的语言是C,它是面向过程的语言;而Java等高级语言以面向对象的编程思路为主。但业务流程的处理,恰恰是面向过程的。
参照以上思路,解决方案如下:
在回调中并不处理业务,而仅是产生信号,保存参数,然后触发状态机处理业务,这样就能让程序结构足够清晰。
源代码:
更改之前的代码:
package com.bazhangkeji.classroom.session.create;
import com.bazhangkeji.classroom.common.Events;
import com.bazhangkeji.classroom.common.RxBus;
import com.bazhangkeji.classroom.structure.SessionInfo;
import com.bazhangkeji.classroom.login.LoginState;
import com.bazhangkeji.classroom.net.Protocol;
import com.bazhangkeji.classroom.net.TcpClient;
import com.bazhangkeji.classroom.repository.cache.SessionCache;
import java.util.ArrayList;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
/**
 * Creates a session in three chained asynchronous steps: fetch the session
 * server address, open the TCP connection to it, then send the create-session
 * request.
 *
 * <p>Only one creation can be in flight at a time; {@code create*} calls made
 * while a creation is running return without doing anything. The outcome is
 * reported to the {@link OnResultListener} supplied to the {@code create*} call.
 */
public class CreateSessionManager implements Protocol {
    private final GetSessionServerAddress getSessionServerAddress;
    private final CreateSession createSession;
    private final CreateSessionSuccess createSessionSuccess;
    private final SessionInfo sessionInfo;

    private OnResultListener observer;
    private boolean isStart = false;
    private String dstId;
    private Disposable disposableListen;

    public CreateSessionManager() {
        getSessionServerAddress = new GetSessionServerAddress();
        createSession = new CreateSession();
        createSessionSuccess = new CreateSessionSuccess();
        sessionInfo = new SessionInfo();
    }

    /**
     * Creates a 1V1 voice session.
     *
     * @param dstId    id of the peer
     * @param observer receives the result of the creation
     */
    public void create1V1Voice(String dstId, OnResultListener observer) {
        if (isStart) {
            return;
        }
        prepare1V1(dstId);
        sessionInfo.type = SESSION_TYPE_1V1_VOICE;
        start(observer);
    }

    /**
     * Creates a 1V1 video session.
     *
     * @param dstId    id of the peer
     * @param observer receives the result of the creation
     */
    public void create1V1Video(String dstId, OnResultListener observer) {
        if (isStart) {
            return;
        }
        prepare1V1(dstId);
        sessionInfo.type = SESSION_TYPE_1V1_VIDEO;
        start(observer);
    }

    /**
     * Creates a 1VN voice session.
     *
     * @param groupId      group id
     * @param audioChannel audio channel
     * @param members      member list
     * @param observer     receives the result of the creation
     */
    public void create1VN(String groupId, String audioChannel, ArrayList<String> members, OnResultListener observer) {
        if (isStart) {
            return;
        }
        sessionInfo.init();
        sessionInfo.owner = LoginState.getInstance().getUserId();
        sessionInfo.groupId = groupId;
        sessionInfo.audioChannel = audioChannel;
        sessionInfo.type = SESSION_TYPE_1VN;
        sessionInfo.members.addAll(members);
        start(observer);
    }

    /** Resets the session info and fills in the fields shared by both 1V1 session kinds. */
    private void prepare1V1(String dstId) {
        sessionInfo.init();
        sessionInfo.owner = LoginState.getInstance().getUserId();
        sessionInfo.members.add(LoginState.getInstance().getUserId());
        sessionInfo.members.add(dstId);
        this.dstId = dstId;
    }

    /**
     * Kicks off the creation chain by requesting the session server address.
     * On success the TCP connect result is awaited through {@link RxBus}; on
     * failure the observer is notified and the manager is stopped.
     */
    private void start(OnResultListener observer) {
        if (isStart) {
            return;
        }
        // Release whatever a previous run may have left behind before starting.
        stop();
        isStart = true;
        this.observer = observer;

        getSessionServerAddress.start(new GetSessionServerAddress.OnResultListener() {
            @Override
            public void notifySuccess(Object observable, String ip, int port) {
                sessionInfo.sessionServerIp = ip;
                sessionInfo.sessionServerPort = port;

                // Subscribe before connecting so the connect result cannot be missed.
                disposableListen = RxBus.getInstance().toObservable(Events.TcpMakeConnectResult.class)
                        .observeOn(Schedulers.io())
                        .subscribe(t -> dealTcpMakeConnectResult((Events.TcpMakeConnectResult) t));
                TcpClient.getInstance().makeConnect(ip, port);
            }

            @Override
            public void notifyFailure(Object observable) {
                // Inside this anonymous class a bare `this` is the listener itself;
                // report the manager as the observable, like every other notification.
                observer.notifyFailure(CreateSessionManager.this,
                        CreateSessionManager.OnResultListener.GET_SESSION_SERVER_ADDRESS_TIMEOUT);
                stop();
            }
        });
    }

    /**
     * Handles the TCP connect result: on success issues the create-session
     * request matching the session type, otherwise reports the failure and stops.
     */
    private void dealTcpMakeConnectResult(Events.TcpMakeConnectResult connectResult) {
        // Only the first connect result is of interest.
        disposableListen.dispose();

        if (!connectResult.result) {
            observer.notifyFailure(this, OnResultListener.WAIT_TCP_CONNECT_TIMEOUT);
            stop();
            return;
        }

        switch (sessionInfo.type) {
            case SESSION_TYPE_1V1_VOICE:
                createSession.create1V1Voice(dstId, new ObserverCreateSession());
                break;
            case SESSION_TYPE_1V1_VIDEO:
                createSession.create1V1Video(dstId, new ObserverCreateSession());
                break;
            case SESSION_TYPE_1VN:
                createSession.create1VN(sessionInfo.groupId, sessionInfo.audioChannel, sessionInfo.members, new ObserverCreateSession());
                break;
        }
    }

    /**
     * Stops the current creation: disposes the connect-result subscription,
     * releases the resources of both sub-tasks and marks the manager idle.
     */
    public void stop() {
        if (disposableListen != null && !disposableListen.isDisposed()) {
            disposableListen.dispose();
        }
        getSessionServerAddress.releaseResource();
        createSession.releaseResource();
        isStart = false;
    }

    /**
     * Tells whether a creation is in progress.
     *
     * @return {@code true} if busy, {@code false} if idle
     */
    public boolean isBusy() {
        return isStart;
    }

    /** Forwards the result of the create-session request to the observer, then stops. */
    class ObserverCreateSession implements CreateSession.OnResultListener {
        @Override
        public void notifySuccess(Object observable, SessionInfo sessionInfo) {
            // The manager's own session info is used, not the callback argument.
            SessionCache.getInstance().joinSession(CreateSessionManager.this.sessionInfo);
            createSessionSuccess.sendFrame();
            observer.notifySuccess(CreateSessionManager.this, CreateSessionManager.this.sessionInfo);
            stop();
        }

        @Override
        public void notifyFailure(Object observable, int result) {
            observer.notifyFailure(CreateSessionManager.this, result);
            stop();
        }
    }

    /** Callback through which the result of a session creation is delivered. */
    public interface OnResultListener {
        // Result codes
        int SUCCESS = 0;
        int GROUP_NOT_EXIST = 1;
        int NOT_OWNER = 2;
        int SESSION_MEMBERS_TOO_MANY = 3;
        int TCP_DISCONNECT = 4;
        int CREATE_SESSION_TIMEOUT = 5;
        /** Reported when fetching the session server address fails. */
        int GET_SESSION_SERVER_ADDRESS_TIMEOUT = 6;
        /** Reported when the TCP connect result is negative. */
        int WAIT_TCP_CONNECT_TIMEOUT = 7;

        /**
         * Session creation succeeded.
         *
         * @param observable  the observed object
         * @param sessionInfo session information
         */
        void notifySuccess(Object observable, SessionInfo sessionInfo);

        /**
         * Session creation failed.
         *
         * @param observable the observed object
         * @param result     result code
         */
        void notifyFailure(Object observable, int result);
    }
}
更改之后的代码:
package com.bazhangkeji.classroom.session.create;
import com.bazhangkeji.classroom.common.Events;
import com.bazhangkeji.classroom.common.RxBus;
import com.bazhangkeji.classroom.structure.SessionInfo;
import com.bazhangkeji.classroom.login.LoginState;
import com.bazhangkeji.classroom.net.Protocol;
import com.bazhangkeji.classroom.net.TcpClient;
import com.bazhangkeji.classroom.repository.cache.SessionCache;
import java.util.ArrayList;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
/**
 * Creates a session in three chained asynchronous steps, driven by a small
 * state machine: fetch the session server address, open the TCP connection to
 * it, then send the create-session request.
 *
 * <p>Callbacks do no business logic. They only raise a flag in {@link Signal},
 * store any parameters, and call {@link #stateMachineRun()}; the state handlers
 * consume the flags and carry out the work.
 *
 * <p>Only one creation can be in flight at a time; {@code create*} calls made
 * while the state machine is not in {@code S_FREE} return without doing anything.
 */
public class CreateSessionManager implements Protocol {
    // States
    private static final int S_FREE = 0;
    private static final int S_GET_SESSION_SERVER_ADDRESS = 1;
    private static final int S_TCP_MAKE_CONNECT = 2;
    private static final int S_CREATE_SESSION = 3;

    // State machine
    private int stateMachine;
    /** True until the current state's handler has performed its entry action. */
    private boolean stateFirstRun;

    // Signals and parameters saved by the callbacks
    private final Signal signal;
    private OnResultListener observer;
    private final SessionInfo sessionInfo;
    private String dstId;
    private int createSessionFailureResult;

    // Business sub-tasks
    private final GetSessionServerAddress getSessionServerAddress;
    private final CreateSession createSession;
    private final CreateSessionSuccess createSessionSuccess;
    private Disposable disposableListen;

    public CreateSessionManager() {
        stateMachine = S_FREE;
        stateFirstRun = true;
        signal = new Signal();
        getSessionServerAddress = new GetSessionServerAddress();
        createSession = new CreateSession();
        createSessionSuccess = new CreateSessionSuccess();
        sessionInfo = new SessionInfo();
    }

    /**
     * Runs the handler of the current state twice. The first pass lets the
     * current state consume the pending signal and possibly move to the next
     * state; the second pass lets the newly entered state perform its entry
     * action (guarded by {@code stateFirstRun}).
     */
    private void stateMachineRun() {
        for (int pass = 0; pass < 2; pass++) {
            switch (stateMachine) {
                case S_FREE:
                    s_free();
                    break;
                case S_GET_SESSION_SERVER_ADDRESS:
                    s_get_session_server_address();
                    break;
                case S_TCP_MAKE_CONNECT:
                    s_tcp_make_connect();
                    break;
                case S_CREATE_SESSION:
                    s_create_session();
                    break;
            }
        }
    }

    /** Idle state: waits for the start signal, then moves on to fetching the server address. */
    private void s_free() {
        if (signal.sigStart) {
            enterState(S_GET_SESSION_SERVER_ADDRESS);
        }
    }

    /** Requests the session server address on entry; then waits for its success/failure signal. */
    private void s_get_session_server_address() {
        if (stateFirstRun) {
            stateFirstRun = false;
            getSessionServerAddress.start(new ListenGetSessionServerAddress());
        }

        if (signal.sigGetSessionServerAddressFailure) {
            observer.notifyFailure(this, OnResultListener.GET_SESSION_SERVER_ADDRESS_TIMEOUT);
            stop();
        } else if (signal.sigGetSessionServerAddressSuccess) {
            enterState(S_TCP_MAKE_CONNECT);
        }
    }

    /** Starts the TCP connection on entry; then waits for the connect success/failure signal. */
    private void s_tcp_make_connect() {
        if (stateFirstRun) {
            stateFirstRun = false;
            // Subscribe before connecting so the connect result cannot be missed.
            disposableListen = RxBus.getInstance().toObservable(Events.TcpMakeConnectResult.class)
                    .observeOn(Schedulers.io())
                    .subscribe(t -> dealTcpMakeConnectResult((Events.TcpMakeConnectResult) t));
            TcpClient.getInstance().makeConnect(sessionInfo.sessionServerIp, sessionInfo.sessionServerPort);
        }

        if (signal.sigTcpMakeConnectFailure) {
            observer.notifyFailure(this, OnResultListener.WAIT_TCP_CONNECT_TIMEOUT);
            stop();
        } else if (signal.sigTcpMakeConnectSuccess) {
            enterState(S_CREATE_SESSION);
        }
    }

    /**
     * Sends the create-session request matching the session type on entry;
     * then waits for its result, notifies the observer and stops.
     */
    private void s_create_session() {
        if (stateFirstRun) {
            stateFirstRun = false;
            switch (sessionInfo.type) {
                case SESSION_TYPE_1V1_VOICE:
                    createSession.create1V1Voice(dstId, new ListenCreateSession());
                    break;
                case SESSION_TYPE_1V1_VIDEO:
                    createSession.create1V1Video(dstId, new ListenCreateSession());
                    break;
                case SESSION_TYPE_1VN:
                    createSession.create1VN(sessionInfo.groupId, sessionInfo.audioChannel, sessionInfo.members, new ListenCreateSession());
                    break;
            }
        }

        if (signal.sigCreateSessionFailure) {
            observer.notifyFailure(this, createSessionFailureResult);
            stop();
        } else if (signal.sigCreateSessionSuccess) {
            SessionCache.getInstance().joinSession(sessionInfo);
            createSessionSuccess.sendFrame();
            observer.notifySuccess(this, sessionInfo);
            stop();
        }
    }

    /** Clears all signals and switches to {@code next}, arming its entry action. */
    private void enterState(int next) {
        signal.init();
        stateFirstRun = true;
        stateMachine = next;
    }

    /**
     * Stops the current creation: clears all signals, releases the resources
     * of both sub-tasks, disposes the connect-result subscription and returns
     * the state machine to {@code S_FREE}.
     */
    public void stop() {
        signal.init();
        stateFirstRun = true;
        getSessionServerAddress.releaseResource();
        createSession.releaseResource();
        if (disposableListen != null && !disposableListen.isDisposed()) {
            disposableListen.dispose();
        }
        stateMachine = S_FREE;
    }

    /**
     * Turns the TCP connect result into a signal and runs the state machine.
     * The subscription delivers this call via {@code Schedulers.io()}.
     * NOTE(review): the threads of the other callbacks are not visible here —
     * confirm that state machine access from different threads is safe.
     */
    private void dealTcpMakeConnectResult(Events.TcpMakeConnectResult connectResult) {
        // Only the first connect result is of interest.
        disposableListen.dispose();
        if (connectResult.result) {
            signal.sigTcpMakeConnectSuccess = true;
        } else {
            signal.sigTcpMakeConnectFailure = true;
        }
        stateMachineRun();
    }

    /** Saves the fetched server address and signals the state machine. */
    class ListenGetSessionServerAddress implements GetSessionServerAddress.OnResultListener {
        @Override
        public void notifySuccess(Object observable, String ip, int port) {
            sessionInfo.sessionServerIp = ip;
            sessionInfo.sessionServerPort = port;
            signal.sigGetSessionServerAddressSuccess = true;
            stateMachineRun();
        }

        @Override
        public void notifyFailure(Object observable) {
            signal.sigGetSessionServerAddressFailure = true;
            stateMachineRun();
        }
    }

    /** Saves the create-session result and signals the state machine. */
    class ListenCreateSession implements CreateSession.OnResultListener {
        @Override
        public void notifySuccess(Object observable, SessionInfo sessionInfo) {
            signal.sigCreateSessionSuccess = true;
            stateMachineRun();
        }

        @Override
        public void notifyFailure(Object observable, int result) {
            createSessionFailureResult = result;
            signal.sigCreateSessionFailure = true;
            stateMachineRun();
        }
    }

    /**
     * Creates a 1V1 voice session.
     *
     * @param dstId    id of the peer
     * @param observer receives the result of the creation
     */
    public void create1V1Voice(String dstId, OnResultListener observer) {
        if (stateMachine != S_FREE) {
            return;
        }
        this.observer = observer;
        sessionInfo.init();
        sessionInfo.owner = LoginState.getInstance().getUserId();
        sessionInfo.type = SESSION_TYPE_1V1_VOICE;
        sessionInfo.members.add(LoginState.getInstance().getUserId());
        sessionInfo.members.add(dstId);
        this.dstId = dstId;
        signal.sigStart = true;
        stateMachineRun();
    }

    /**
     * Creates a 1V1 video session.
     *
     * @param dstId    id of the peer
     * @param observer receives the result of the creation
     */
    public void create1V1Video(String dstId, OnResultListener observer) {
        if (stateMachine != S_FREE) {
            return;
        }
        this.observer = observer;
        sessionInfo.init();
        sessionInfo.owner = LoginState.getInstance().getUserId();
        sessionInfo.type = SESSION_TYPE_1V1_VIDEO;
        sessionInfo.members.add(LoginState.getInstance().getUserId());
        sessionInfo.members.add(dstId);
        this.dstId = dstId;
        signal.sigStart = true;
        stateMachineRun();
    }

    /**
     * Creates a 1VN voice session.
     *
     * @param groupId      group id
     * @param audioChannel audio channel
     * @param members      member list
     * @param observer     receives the result of the creation
     */
    public void create1VN(String groupId, String audioChannel, ArrayList<String> members, OnResultListener observer) {
        if (stateMachine != S_FREE) {
            return;
        }
        this.observer = observer;
        sessionInfo.init();
        sessionInfo.owner = LoginState.getInstance().getUserId();
        sessionInfo.groupId = groupId;
        sessionInfo.audioChannel = audioChannel;
        sessionInfo.type = SESSION_TYPE_1VN;
        sessionInfo.members.addAll(members);
        signal.sigStart = true;
        stateMachineRun();
    }

    /**
     * Tells whether a creation is in progress.
     *
     * @return {@code true} if busy (the state machine has left {@code S_FREE}),
     *         {@code false} if idle
     */
    public boolean isBusy() {
        // Busy means "not idle"; comparing with == would report the opposite.
        return stateMachine != S_FREE;
    }

    /** Flags raised by the callbacks and consumed by the state handlers. */
    class Signal {
        boolean sigStart = false;
        boolean sigGetSessionServerAddressSuccess = false;
        boolean sigGetSessionServerAddressFailure = false;
        boolean sigTcpMakeConnectSuccess = false;
        boolean sigTcpMakeConnectFailure = false;
        boolean sigCreateSessionSuccess = false;
        boolean sigCreateSessionFailure = false;

        /** Clears every flag. */
        void init() {
            sigStart = false;
            sigGetSessionServerAddressSuccess = false;
            sigGetSessionServerAddressFailure = false;
            sigTcpMakeConnectSuccess = false;
            sigTcpMakeConnectFailure = false;
            sigCreateSessionSuccess = false;
            sigCreateSessionFailure = false;
        }
    }

    /** Callback through which the result of a session creation is delivered. */
    public interface OnResultListener {
        // Result codes
        int SUCCESS = 0;
        int GROUP_NOT_EXIST = 1;
        int NOT_OWNER = 2;
        int SESSION_MEMBERS_TOO_MANY = 3;
        int TCP_DISCONNECT = 4;
        int CREATE_SESSION_TIMEOUT = 5;
        /** Reported when fetching the session server address fails. */
        int GET_SESSION_SERVER_ADDRESS_TIMEOUT = 6;
        /** Reported when the TCP connect result is negative. */
        int WAIT_TCP_CONNECT_TIMEOUT = 7;

        /**
         * Session creation succeeded.
         *
         * @param observable  the observed object
         * @param sessionInfo session information
         */
        void notifySuccess(Object observable, SessionInfo sessionInfo);

        /**
         * Session creation failed.
         *
         * @param observable the observed object
         * @param result     result code
         */
        void notifyFailure(Object observable, int result);
    }
}