前情回顾
《Tcp网络通讯详解》快速跳转
《Tcp网络通讯详解二(解决分包粘包)》 快速跳转
发现的问题
在《Tcp网络通讯详解二(解决分包粘包)》中我们解决了分包和粘包的问题,解决问题的方式是 消息体前加上消息长度的标识,每次读取消息时先读取长度标识,然后再判断我们接收的消息是不是完整的,不完整的消息我们等待下一次消息信号再处理,从而解决了分包和粘包的问题。
但是我们在Tcp消息时用到了一个临时缓存空间readBuff,这个缓存空间需要我们定义一个大小,上篇文章中我们定义的缓存空间的大小BUFFER_SIZE为1024。
那这样就埋下了一个问题炸弹,当我们的单条消息传输大小大于BUFFER_SIZE,这个时候就会引爆这个炸弹,我们读取到这条消息的长度标识,假如说这个标识是2000,发现我们的readBuff中的缓存数据小于2000,这个时候我们默认这条消息不完整,然后等待下次一块处理,但是发现我们的缓存区已经用完了,就永远也无法再接收到余下的信息,造成的结果就是Socket彻底堵塞瘫痪。
那能不能把BUFFER_SIZE给调大调成特别特别大,首先肯定的告诉你,可以这样干,但是没人愿意这么干,因为绝大部分短消息的时候都用不了这么大的缓存空间,就会造成极大的内存浪费。
总结:临时缓存区,取了一个最大内存,其实绝大数时间内存是浪费的,而且后期一旦有更大的消息传输可能还需要扩容缓存区大小,造成更大的内存浪费。
改进
既然发现了问题,那么就要尝试着去修正,问题出在readBuff内存爆炸,那么我们的解决方案就在readBuff上下手,我们可以让readBuff空间不存储任何接到的缓存,接到缓存就丢给具体的消息处理对象,这样我们就算把BUFFER_SIZE定义为1每次只接收1字节的消息都不会造成拥堵,当然1是太夸张了,一条消息接收几百次也不是我们所希望的。但是定义一个256或512还是可以的。
实现
实现SocketClient 如下:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using UnityEngine;
namespace S
{
/// <summary>
/// Socket客户端
/// </summary>
public class SocketClient : ISocketClient
{
protected ReceiveDataHandler receiveDataHandler;
protected Socket socket;
protected SocketType socketType = SocketType.Stream;
protected AddressFamily addressFamily = AddressFamily.InterNetwork;
protected ProtocolType protocolType = ProtocolType.Tcp;
/// <summary>
/// 接收到的服务器消息队列
/// </summary>
protected Queue<byte[]> msgQueue;
/// <summary>
/// 开始进行连接
/// </summary>
/// <param name="ip">服务器ip地址</param>
/// <param name="port">服务器端口</param>
public void BeginConnect(string ip, int port)
{
if (IsConnected())
{
Debug.LogWarning("当前客户端已经连接服务器,禁止继续连接!");
return;
}
socket = new Socket(addressFamily, socketType, protocolType);
IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse(ip), port);
socket.BeginConnect(endPoint, OnConnect, socket);
}
/// <summary>
/// 服务器连接成功
/// </summary>
/// <param name="ar"></param>
public virtual void OnConnect(IAsyncResult ar)
{
try
{
socket.EndConnect(ar);
msgQueue = new Queue<byte[]>();
receiveDataHandler = new ReceiveDataHandler(this);
socket.BeginReceive(receiveDataHandler.Buffer, receiveDataHandler.CurIndex, receiveDataHandler.EmptySize,
SocketFlags.None,
OnReceive, socket);
}
catch (Exception e)
{
Debug.LogError($"Client 连接服务器失败,Error:{e}");
}
}
/// <summary>
/// 接收到服务器消息
/// </summary>
/// <param name="ar"></param>
public virtual void OnReceive(IAsyncResult ar)
{
int count = socket.EndReceive(ar);
receiveDataHandler.OnReceive(count);
socket.BeginReceive(receiveDataHandler.Buffer, receiveDataHandler.CurIndex,
receiveDataHandler.EmptySize, SocketFlags.None,
OnReceive, socket);
}
/// <summary>
/// 向服务器发送字符串
/// </summary>
/// <param name="msg">字符串消息</param>
public virtual void Send(string msg)
{
byte[] datas = Encoding.UTF8.GetBytes(msg);
Send(datas);
}
/// <summary>
/// 向服务器发送字符串(异步)
/// </summary>
/// <param name="msg">字符串消息</param>
/// <param name="callback">发送结束的回调</param>
public virtual void SendAsync(string msg,AsyncCallback callback)
{
byte[] datas = Encoding.UTF8.GetBytes(msg);
SendAsync(datas,callback);
}
/// <summary>
/// 向服务器发送字节流
/// </summary>
/// <param name="datas">字节流消息</param>
public virtual void Send(byte[] datas)
{
if (!IsConnected()) return;
int dataLen = datas.Length;
//消息头
byte[] headDatas = BitConverter.GetBytes(dataLen);
//组合消息头和消息体的完整消息
byte[] fullDatas = headDatas.Concat(datas).ToArray();
socket.Send(fullDatas);
}
/// <summary>
/// 向服务器发送字节流(异步)
/// </summary>
/// <param name="datas">字节流消息</param>
/// <param name="callback">发送结束的回调</param>
public virtual void SendAsync(byte[] datas,AsyncCallback callback)
{
if (!IsConnected()) return;
int dataLen = datas.Length;
//消息头
byte[] headDatas = BitConverter.GetBytes(dataLen);
//组合消息头和消息体的完整消息
byte[] fullDatas = headDatas.Concat(datas).ToArray();
socket.BeginSend(fullDatas, 0, fullDatas.Length, SocketFlags.None, callback, socket);
}
/// <summary>
/// 是否已经连接
/// </summary>
/// <returns></returns>
public bool IsConnected()
{
return socket!=null&&socket.Connected;
}
/// <summary>
/// 关闭连接
/// </summary>
public void CloseConnect()
{
if (!IsConnected()) return;
socket.Shutdown(SocketShutdown.Both);
socket.Close();
socket.Dispose();
socket = null;
if (receiveDataHandler!=null)
{
receiveDataHandler.Dispose();
receiveDataHandler = null;
}
}
/// <summary>
/// 压入一个消息
/// </summary>
/// <param name="datas">消息字节流</param>
public void EnqueueMsg(byte[] datas)
{
if (msgQueue == null) return;
msgQueue.Enqueue(datas);
}
/// <summary>
/// 取出一个消息
/// </summary>
/// <returns>消息字节流</returns>
public byte[] DequeueMsg()
{
if (msgQueue == null) return null;
return msgQueue.Dequeue();
}
/// <summary>
/// 未处理的消息数量
/// </summary>
/// <returns></returns>
public int GetMsgCount()
{
return msgQueue==null?0:msgQueue.Count;
}
/// <summary>
/// 是否存在未处理的消息
/// </summary>
/// <returns></returns>
public bool HasMsg()
{
return GetMsgCount() > 0;
}
}
}
可以看到我们所有的缓存数据的处理都是通过ReceiveDataHandler 对象来的,那么下面来实现ReceiveDataHandler
using System;
namespace S
{
/// <summary>
/// 接收到服务器数据的处理器
/// </summary>
public class ReceiveDataHandler
{
private SocketClient client;
/// <summary>
/// 接收数据的缓存区
/// </summary>
public byte[] Buffer { get; private set; }
/// <summary>
/// 缓存区总大小
/// </summary>
public const int TotalSize = 256;
/// <summary>
/// 缓存区中待处理数据的起始位置
/// </summary>
public int CurIndex { get; private set; }
/// <summary>
/// 缓存区中空置的大小
/// </summary>
public int EmptySize => TotalSize - CurIndex;
/// <summary>
/// 消息体缓存区
/// </summary>
private MsgBuffer m_MsgBuffer;
/// <summary>
/// 消息头缓存区
/// </summary>
private HeadBuffer m_HeadBuffer;
/// <summary>
/// 待处理数据大小
/// </summary>
private int WaitingSize = 0;
public ReceiveDataHandler(SocketClient client)
{
this.client = client;
Buffer = new byte[TotalSize];
m_HeadBuffer = new HeadBuffer(sizeof(Int32));
m_MsgBuffer = new MsgBuffer(0);
CurIndex = 0;
WaitingSize = 0;
}
/// <summary>
/// 接收到服务器数据
/// </summary>
/// <param name="Size">数据大小</param>
public void OnReceive(int Size)
{
WaitingSize += Size;
while (WaitingSize > 0)
{
if (!m_HeadBuffer.IsDone)
{
AddSize(m_HeadBuffer.AddBuffer(Buffer, CurIndex, WaitingSize));
}
if (m_HeadBuffer.IsDone)
{
if (m_MsgBuffer.IsEmpty) m_MsgBuffer.Reset(m_HeadBuffer.Value);
AddSize(m_MsgBuffer.AddBuffer(Buffer, CurIndex, WaitingSize));
if (m_MsgBuffer.IsDone) //接收到一条完整的消息
{
client.EnqueueMsg(m_MsgBuffer.Datas);
m_MsgBuffer.Clear();
m_HeadBuffer.Clear();
}
}
}
}
/// <summary>
/// 缓存区当前索引移位
/// </summary>
/// <param name="realSize">移位量级</param>
void AddSize(int realSize)
{
CurIndex += realSize;
WaitingSize -= realSize;
if (CurIndex == TotalSize) CurIndex = 0;
}
/// <summary>
/// 释放当前对象
/// </summary>
public void Dispose()
{
client = null;
Buffer = null;
CurIndex = 0;
WaitingSize = 0;
if (m_HeadBuffer!=null)
{
m_HeadBuffer.Dispose();
m_HeadBuffer = null;
}
if (m_MsgBuffer!=null)
{
m_MsgBuffer.Dispose();
m_MsgBuffer = null;
}
}
}
}
定义Buffer基础类,这个对象主要用来接收一条完整的消息,能判断消息是否完整,然后再定义消息头缓存区和消息体缓存区都分别继承自Buffer,实现如下
using System;
namespace S
{
/// <summary>
/// 数据缓存区
/// </summary>
public class Buffer
{
/// <summary>
/// 缓存区数据
/// </summary>
public byte[] Datas { get; protected set; }
/// <summary>
/// 缓存区总大小
/// </summary>
public int TotalSize { get; private set; }
/// <summary>
/// 缓存区已经写入的大小
/// </summary>
public int CurSize { get; private set; }
/// <summary>
/// 剩余未使用的缓存区大小
/// </summary>
public int EmptySize => TotalSize - CurSize;
/// <summary>
/// 缓存区是否使用完毕
/// </summary>
public bool IsDone => CurSize == TotalSize;
/// <summary>
/// 缓存区是不是还未使用
/// </summary>
public bool IsEmpty => CurSize == 0;
/// <summary>
/// 构造缓存区
/// </summary>
/// <param name="TotalSize">缓存区大小</param>
public Buffer(int TotalSize)
{
this.TotalSize = TotalSize;
CurSize = 0;
Datas = new byte[TotalSize];
}
/// <summary>
/// 增加缓存数据
/// </summary>
/// <param name="datas">缓存数据</param>
/// <param name="startIndex">缓存数据的起始位置</param>
/// <param name="len">缓存数据的长度</param>
/// <returns>返回实际写入数据的大小</returns>
public virtual int AddBuffer(byte[] datas, int startIndex, int len)
{
if (IsDone) return 0;
len = EmptySize >= len ? len : EmptySize;
Array.Copy(datas, startIndex, Datas, CurSize, len);
CurSize += len;
return len;
}
/// <summary>
/// 清除缓存区数据
/// </summary>
public virtual void Clear()
{
Datas = null;
Datas = new byte[TotalSize];
CurSize = 0;
}
/// <summary>
/// 重新设置缓存区
/// </summary>
/// <param name="TotalSize">缓存区大小</param>
public void Reset(int TotalSize)
{
this.TotalSize = TotalSize;
Clear();
}
/// <summary>
/// 释放当前缓存区
/// </summary>
public virtual void Dispose()
{
Datas = null;
CurSize = 0;
TotalSize = 0;
}
}
/// <summary>
/// 消息头缓存区
/// </summary>
public class HeadBuffer : Buffer
{
public int Value { get; private set; }
public HeadBuffer(int TotalSize) : base(TotalSize)
{
}
public override int AddBuffer(byte[] datas, int startIndex, int len)
{
int realLen = base.AddBuffer(datas, startIndex, len);
if (IsDone) Value = BitConverter.ToInt32(Datas, 0);
return realLen;
}
public override void Clear()
{
base.Clear();
Value = 0;
}
}
/// <summary>
/// 消息体缓存区
/// </summary>
public class MsgBuffer : Buffer
{
public MsgBuffer(int TotalSize) : base(TotalSize)
{
}
}
}
到这里新改进的客户端的Tcp就实现了。