先上一下代码,再简单解说一下
package org.tio.core.task;
import java.nio.ByteBuffer;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Aio;
import org.tio.core.ChannelAction;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.PacketHandlerMode;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.AioListener;
import org.tio.core.intf.Packet;
import org.tio.core.stat.ChannelStat;
import org.tio.core.stat.IpStat;
import org.tio.core.utils.ByteBufferUtils;
import org.tio.utils.SystemTimer;
/**
* 解码任务对象,一个连接对应一个本对象
*
* @author 谭耀武
* 2012-08-09
*/
public class DecodeRunnable implements Runnable {
private static final Logger log = LoggerFactory.getLogger(DecodeRunnable.class);
/**
*
* @param channelContext
* @param packet
* @param byteCount
* @author tanyaowu
*/
public static void handler(ChannelContext channelContext, Packet packet, int byteCount) {
GroupContext groupContext = channelContext.getGroupContext();
PacketHandlerMode packetHandlerMode = groupContext.getPacketHandlerMode();
HandlerRunnable handlerRunnable = channelContext.getHandlerRunnable();
if (packetHandlerMode == PacketHandlerMode.QUEUE) {
handlerRunnable.addMsg(packet);
groupContext.getTioExecutor().execute(handlerRunnable);
} else {
handlerRunnable.handler(packet);
}
}
private ChannelContext channelContext = null;
/**
* 上一次解码剩下的数据
*/
private ByteBuffer lastByteBuffer = null;
/**
* 新收到的数据
*/
private ByteBuffer newByteBuffer = null;
/**
*
*/
public DecodeRunnable(ChannelContext channelContext) {
this.channelContext = channelContext;
}
/**
* 清空处理的队列消息
*/
public void clearMsgQueue() {
lastByteBuffer = null;
newByteBuffer = null;
}
/**
* @see java.lang.Runnable#run()
*
* @author tanyaowu
* 2017年3月21日 下午4:26:39
*
*/
@Override
public void run() {
ByteBuffer byteBuffer = newByteBuffer;
if (byteBuffer != null) {
if (lastByteBuffer != null) {
byteBuffer = ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
lastByteBuffer = null;
}
} else {
return;
}
label_2: while (true) {
try {
int initPosition = byteBuffer.position();
GroupContext groupContext = channelContext.getGroupContext();
Packet packet = null;
Integer packetNeededLength = channelContext.getPacketNeededLength();
if (packetNeededLength != null) {
log.info("{}, 解码所需长度:{}", channelContext, packetNeededLength);
int readableLength = byteBuffer.limit() - initPosition;
if (readableLength >= packetNeededLength) {
packet = groupContext.getAioHandler().decode(byteBuffer, channelContext);
}
} else {
packet = groupContext.getAioHandler().decode(byteBuffer, channelContext);
}
if (packet == null)// 数据不够,解不了码
{
lastByteBuffer = ByteBufferUtils.copy(byteBuffer, initPosition, byteBuffer.limit());
ChannelStat channelStat = channelContext.getStat();
int decodeFailCount = channelStat.getDecodeFailCount() + 1;
channelStat.setDecodeFailCount(decodeFailCount);
int len = byteBuffer.limit() - initPosition;
log.info("{} 解码失败, 本次共失败{}次,参与解码的数据长度共{}字节", channelContext, decodeFailCount, len);
if (decodeFailCount > 5) {
if (packetNeededLength == null) {
log.warn("{} 解码失败, 本次共失败{}次,参与解码的数据长度共{}字节,请考虑要不要拉黑这个ip", channelContext, decodeFailCount, len);
}
}
return;
} else //解码成功
{
channelContext.setPacketNeededLength(null);
channelContext.getStat().setLatestTimeOfReceivedPacket(SystemTimer.currentTimeMillis());
ChannelStat channelStat = channelContext.getStat();
channelStat.setDecodeFailCount(0);
int afterDecodePosition = byteBuffer.position();
int len = afterDecodePosition - initPosition;
channelContext.getGroupContext().getGroupStat().getReceivedPackets().incrementAndGet();
channelContext.getStat().getReceivedPackets().incrementAndGet();
List<Long> list = groupContext.ipStats.durationList;
for (Long v : list) {
IpStat ipStat = groupContext.ipStats.get(v, channelContext.getClientNode().getIp());
ipStat.getReceivedPackets().incrementAndGet();
}
channelContext.traceClient(ChannelAction.RECEIVED, packet, null);
packet.setByteCount(len);
AioListener aioListener = channelContext.getGroupContext().getAioListener();
try {
if (log.isDebugEnabled()) {
log.debug("{} 收到消息 {}", channelContext, packet.logstr());
}
aioListener.onAfterReceived(channelContext, packet, len);
} catch (Throwable e) {
log.error(e.toString(), e);
}
if (log.isDebugEnabled()) {
log.debug("{}, 解包获得一个packet:{}", channelContext, packet.logstr());
}
handler(channelContext, packet, len);
int remainingLength = byteBuffer.limit() - byteBuffer.position();
if (remainingLength > 0)//组包后,还剩有数据
{
if (log.isDebugEnabled()) {
log.debug("{},组包后,还剩有数据:{}", channelContext, remainingLength);
}
continue label_2;
} else//组包后,数据刚好用完
{
lastByteBuffer = null;
log.debug("{},组包后,数据刚好用完", channelContext);
return;
}
}
} catch (Throwable e) {
channelContext.setPacketNeededLength(null);
log.error(channelContext + ", " + byteBuffer + ", 解码异常:" + e.toString(), e);
Aio.close(channelContext, e, "解码异常:" + e.getMessage());
if (e instanceof AioDecodeException) {
GroupContext groupContext = channelContext.getGroupContext();
List<Long> list = groupContext.ipStats.durationList;
for (Long v : list) {
IpStat ipStat = groupContext.ipStats.get(v, channelContext.getClientNode().getIp());
ipStat.getDecodeErrorCount().incrementAndGet();
}
}
return;
}
}
}
/**
* @param newByteBuffer the newByteBuffer to set
*/
public void setNewByteBuffer(ByteBuffer newByteBuffer) {
this.newByteBuffer = newByteBuffer;
}
@Override
public String toString() {
return this.getClass().getSimpleName() + ":" + channelContext.toString();
}
}
对于半包
业务端需要在AioHandler.decode()里返回一个null对象给框架,框架拿到null后,就会认为这是个半包,进而把收到的数据暂存到DecodeRunnable.lastByteBuffer,当后面再收到数据时,把DecodeRunnable.lastByteBuffer和新收到的数据组成一个新的bytebuffer给业务端,如此循环,直到业务端能组成一个packet对象给框架层。
对于粘包
业务端在AioHandler.decode()方法中,解码一个packet对象返回给框架后,框架会判断是否有多余的byte没有被处理,如果有,则拿剩下的byte(bytebuffer)让业务端继续解码,直到业务端返回null或是返回packet但没有剩余byte为止。
小结
框架层已经做好半包和粘包的工作,业务层只需要按着业务协议解码即可,框架会处理好剩下的byte或是上次没处理完的byte的。
不好意思,又是一篇不太长的博客,希望对大家有帮助!