【Socket客户端封装类】 及 【解决粘包和分包问题的Message封装类】

Socket通信中解决粘包和分包问题的Message封装类

发送时:
先发报头长度
再编码报头内容然后发送
最后发真实内容
接收时:
先收报头长度
根据取出的长度收取报头内容,然后解码,反序列化

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace MyChatServer.Server
{
    /// <summary>
    /// 消息发送封装类
    /// </summary>
    public class Message
    {
        //包含你所有需要传递的数据所转换的字节
        private byte[] buffer = null;
        private int _type = -1;

        //当前的字节位置
        private int _currentPos = 0;

        //开始读的字节位置
        private int _readPos = 0;

        //指定类型
        public Message(int __type)
        {
            this._type = __type;
            buffer = new byte[30];
            putInt(__type);
        }

        public Message()
        {
            buffer = new byte[1024 * 100];
        }

        //写入字节
        private void bytesWirteBytes(byte[] bs)
        {
            for (int i = 0; i < bs.Length; i++)
            {
                if (_currentPos >= buffer.Length)
                {
                    byte[] newBuffer = new byte[buffer.Length * 2];
                    Array.Copy(buffer, newBuffer, buffer.Length);
                    buffer = newBuffer;
                }
                buffer[_currentPos++] = bs[i];
            }
        }

        //Int型数据转化字节并写入
        public void putInt(int value)
        {
            this.bytesWirteBytes(System.BitConverter.GetBytes(value));
        }

        //Long型数据转化字节并写入
        public void putLong(long value)
        {
            this.bytesWirteBytes(System.BitConverter.GetBytes(value));
        }

        //string型数据转化字节并写入(string的长度和string)
        public void putString(string value)
        {
            byte[] b = Encoding.UTF8.GetBytes(value);
            this.putInt(b.Length);
            this.bytesWirteBytes(b);
        }

        //Double型数据转化字节并写入
        public void putDouble(double value)
        {
            this.bytesWirteBytes(System.BitConverter.GetBytes(value));
        }

        //Message型数据转化字节并写入
        public void putMessage(Message msg)
        {
            this.bytesWirteBytes(msg.toBytes);
        }

        //byte[]型数据转化字节并写入
        public void putBytes(byte[] bytes)
        {
            this.bytesWirteBytes(bytes);
        }

        //byte[]型数据转化字节并写入(指定开始位置长度)
        public void putBytes(byte[] bytes, int index, int len = 0)
        {
            if (index < 0 || index > bytes.Length || len < 0 || len > bytes.Length) return;
            int leng = (len == 0 ? bytes.Length : len);
            for (int i = index; i < leng + index; i++)
            {
                this.buffer[_currentPos++] = bytes[i];
            }
        }

        //设置开始读字节的位置
        public void setSeek(int _pos)
        {
            if (_pos < 0 || _pos >= buffer.Length) return;
            _readPos = _pos;
        }


        //获取Int型数据
        public int getInt()
        {
            int value = System.BitConverter.ToInt32(buffer, _readPos);
            _readPos += sizeof(int);
            return value;
        }

        //获取Long型数据
        public long getLong()
        {
            long value = System.BitConverter.ToInt64(buffer, _readPos);
            _readPos += sizeof(long);
            return value;
        }

        //获取Double型数据
        public double getDouble()
        {

            double value = System.BitConverter.ToDouble(buffer, _readPos);
            _readPos += sizeof(double);
            return value;
        }

        //获取UTF string型数据
        public string getUTFString()
        {
            int len = this.getInt();
            string str = Encoding.UTF8.GetString(buffer, _readPos, len);
            _readPos += len;

            return str;
        }

        //获取消息类型
        public int type
        {
            get
            {
                if (_type > 0) return _type;
                _readPos = 0;
                _type = this.getInt();
                return _type;
            }
        }

        //获取当前消息中的所有字节(size长度)
        public byte[] toBytes
        {
            get
            {
                byte[] arr = new byte[this.Size];
                Array.Copy(buffer, arr, this.Size);
                return arr;
            }
        }

        //获取字节数组的大小
        public int Size
        {
            get { return this._currentPos; }
        }

        //buffer存储当前需要发送的消息->int size->byte[]
        public byte[] byteSize
        {
            get
            {
                byte[] _size = System.BitConverter.GetBytes(this.Size);
                return _size;
            }
        }

        //重置
        public void reset()
        {
            Array.Clear(this.buffer, 0, this._currentPos);
            this._currentPos = 0;
            this._readPos = 0;
            this._type = -1;
        }
    }
}

CSocket:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Windows.Forms;
using System.Collections.Concurrent;


/*
 * 网络客户端连接组件
 * 
 * 
 */
namespace NET

{

    /*
     *网络连接类
     */
    class CSocket
    {
        //定义套接字对象
        private Socket client = null;
        //判断当前是否正在连接,一个客户端,只开一个连接
        private bool isConnection = false;
        public bool IsConnection { get { return isConnection; } }
        //连接对象
        private static CSocket _csocket = null;
        //消息处理列队
        private MessageQueue messageQueue = null;
        Thread thread1;
        private CSocket()
        {
            //连接TCP连接协议  --------------4IP网络 ------------流字节传送 ----TCP协议
            client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);   
        }
        /*
         * 开始连接到网络
         * _ip:服务器ip地址
         * _port:服务器开放连接端口
         */
        public void connectServer(string _ip, int _port)
        {
            try
            {
               // _ip = "192.168.3.206";
               // _port = 8888;
                //如果当前正在连接中,则不需要再次连接,如在不同的场景中,防止同一客户端,多次连接,造成数据不同步。
                if (this.isConnection==true) return;                
                //字符串IP解析
                IPAddress ip = IPAddress.Parse(_ip);
                //开始正真连接到网络指定的IP服务器中
                client.Connect(new IPEndPoint(ip, _port));//连接中....
                //设置连接标志,防止未断线情况下多次连接
                this.isConnection = true;
                //启动消息接受子线程
                this.start();
                MessageBox.Show("连接成功");
            }
            catch (SocketException e)
            {
                //连接出错时,关闭网络
                this.isConnection = false;
                Console.WriteLine("连接到服务器出错!");
                MessageBox.Show(e.ToString());
                this.Close();
            }
        }
        //创建连接单列对象,可以不同场景中自由调用
        public static CSocket shareCSocket()
        {
            if (_csocket == null) _csocket = new CSocket();
            return _csocket;
        }
        //启动线程
        public void start()
        {
            if (thread1!=null) return;
            //创建线程
            thread1= new Thread(new ThreadStart(reciveMessage));
            thread1.Start();
            thread1.IsBackground = true;
        }
        //发送消息到服务器
        //msg:要发送的消息对象
        //发送协议:消息长度(int) - 消息头(int) - 消息主体(bytes)
        public void sendMessage(Message1 msg)
        {
            if (this.isConnection)
            {
                //先发送消息长度
                this.client.Send(msg.byteSize,0,msg.byteSize.Length,0);
                //再发送消息主体
                this.client.Send(msg.toBytes,0,msg.Size,0);
            }
        }
        //消息接收
        public void reciveMessage()
        {
            try
            {
                //每次最多只能接受100K的内容,如果是文件或是视频传输,请更改字节大小
                byte[] bytes = new byte[1024 * 100];
                //每次从TCP协议中读取出来的消息标志,>0,成功,0,-1:失败或服务器异常
                int read = 0;
                //每次读取出来的消息写入到字节数组中的位置
                int readPos = 0;
                //保存消息头是否读完
                int headLeng = 0;
                //定义读取超时
                client.ReceiveTimeout = 1000 * 60 * 10;
                //创建消息线程
                messageQueue = MessageQueue.shareMessageQueue();
                while (this.isConnection)
                {
                    //如果消息头还没有读完,则一直读取消息头,或是下一条消息头
                    if (headLeng == 0)
                    {
                        //一个一个字节读,防止网络丢包和粘包问题
                        read = client.Receive(bytes, readPos, 1, 0);
                        //消息一个一个读,每次都往字节后写一个字节内容
                        readPos++;
                        if (read <= 0) break;//如果远程服务器中断
                        if (readPos < 4) continue;//如果协议头还没有读完 - 4个字节
                        //读协议头
                        headLeng = System.BitConverter.ToInt32(bytes, 0);
                    }
                    else
                    {
                        //如果得到一条完整的消息
                        if (readPos - 4 >= headLeng)
                        {
                            //创建一条消息
                            Message1 msg = new Message1();
                            //将一条完整的消息存入到Message对象中,去掉前面4个字节的长度信息。
                            msg.putBytes(bytes, 4, readPos);
                            //将消息加入到消息队列
                            messageQueue.putMessage(msg);
                        //    MessageBox.Show("消息");
                            //准备下一次读                       
                            readPos = 0;
                            //此处将头清0,便可读取下一条消息
                            headLeng = 0;
                            //将字节清0,此方法速度高于new byte[],且节约内存
                            Array.Clear(bytes, 0, bytes.Length);
                        }
                        else
                        {
                            //如果协议头还没有读完
                            headLeng = 0;
                        }
                    }
                }
                this.Close();
            }
            catch (Exception e){
                MessageBox.Show("服务器已断开!");
            }finally{
                this.Close();
            }
        }
        public void Close()
        {
            thread1.Abort();
            isConnection = false;
           // client.Close();
            //Console.WriteLine("连接关闭");
           // MethodDelegate.shareMethodDelegate().removeAllMethods();
            client.Dispose();
            client = null;
            _csocket = null;
        }
    }
    public delegate void callBack(Message1 msg);
    /*
     * 消息委托处理类
     */
    public class MethodDelegate
    {
        private static MethodDelegate md = null;
        private Dictionary<int, callBack> methods = null;
        private Dictionary<int, Form> forms = null;
        private Form formRun;
        private MethodDelegate()
        {
            methods = new Dictionary<int, callBack>();
            forms = new Dictionary<int, Form>();
        }
        public static MethodDelegate shareMethodDelegate()
        {
            if (md == null) md = new MethodDelegate();
            return md;
        }
        public void addMethod(int type, callBack callFun, Form form = null)
        {
            if (this.methods.ContainsKey(type))
            {
                this.methods.Remove(type);
            }
            this.methods.Add(type, callFun);
            //窗口线程 - 子网络接收子线程不能更新UI界面,所以必须要有窗体反射执行。
            if (this.forms.ContainsKey(type))
            {
                this.forms.Remove(type);
            }
            this.forms.Add(type, form);
        }
        public void removeMethod(int type)
        {
            if (this.methods.ContainsKey(type))
            {
                this.methods.Remove(type);
            }
            if (this.forms.ContainsKey(type))
            {
                this.forms.Remove(type);
            }
        }
        public void Run(int type, Message1 msg)
        {
           // MessageBox.Show(type.ToString());
            if (!this.methods.ContainsKey(type)) return;
            callBack fun = this.methods[type];

            Form f = this.forms[type];
            if (f != null)
            {
                f.Invoke(fun, msg);
                return;
            }
            fun(msg);
        }
        public bool isClose = false;
        public void removeAllMethods()
        {
            this.methods.Clear();
            formRun = null;
            isClose = true;
            MessageQueue.shareMessageQueue().th.Abort();
        }
    }

    /*
     * 消息处理队列
     */
    class MessageQueue
    {
        //private Queue<Message> messageQueue = null;
        private ConcurrentQueue<Message1> messageQueue = null;
        private MethodDelegate methodRun = null;
        private static bool isSet = true;
        private static MessageQueue mqeue = null;
        private static AutoResetEvent evt = new AutoResetEvent(false); 
        public static MessageQueue shareMessageQueue()
        {
            if (mqeue == null) mqeue = new MessageQueue();
            return mqeue;
        } public Thread th;
        private MessageQueue() {
            messageQueue = new ConcurrentQueue<Message1>();
            methodRun = MethodDelegate.shareMethodDelegate();
            //创建一条处理消息的线程
            th= new Thread(runMessage);
           th.Start();
          th.IsBackground = true;
        }

        public void putMessage(Message1 message)
        {
            messageQueue.Enqueue(message);
           // MessageBox.Show("新消息");
            if (isSet)
            {
                isSet = false;
                evt.Set();                
            }
        }

        private void runMessage()
        {
            bool f = false;
            Message1 message = null;
            while (true)
            {
                f = messageQueue.TryDequeue(out message);
                if (f)
                {
                    //执行方法
                 //   MessageBox.Show("执行代码");
                    methodRun.Run(message.type,message);
                    //把消息通知到U3D线程

                }
                if (messageQueue.Count == 0)
                {
                    isSet = true;
                    //队列没有消息了,就等待...,直以有消息进来,继续执行
                   evt.WaitOne();
                }
            }
        }
    }

    /*
     *  U3D 消息映射类 
     */
    class MessageCallBack 
    {
    }
}

猜你喜欢

转载自blog.csdn.net/qq_34937637/article/details/81270391