• Unity Protobuf+RPC+UniTask


    什么是RPC协议

    远程过程调用(Remote Procedure Call,简称RPC)是一种网络通信协议,允许程序在不同的地址空间(通常在不同的物理计算机上)中调用彼此的方法,好像它们是在本地执行的一样。RPC隐藏了底层的网络通信细节,使开发人员能够像调用本地函数一样简单地调用远程服务。

    RPC的基本原理

    RPC的工作原理基于客户端-服务器模型,主要包括以下步骤:

    1.客户端调用:客户端程序发起对某个远程过程的调用请求。
    2.请求打包:调用参数被打包成消息,发送到服务器。
    3.服务器解包和执行:服务器接收到消息,解包获取调用参数,执行相应的远程过程。
    4.结果打包和返回:执行结果被打包成消息,发送回客户端。
    5.客户端接收结果:客户端解包消息,获取调用结果。

    RPC的关键组件

    1.客户端代理:负责将本地调用请求转换为远程调用请求,打包参数,并通过网络发送给服务器。
    2.服务器代理:负责接收客户端的请求,解包参数,调用相应的服务方法,并将结果打包返回给客户端。
    3.通信协议:定义客户端和服务器之间如何通信,常见的协议有HTTP、TCP等。
    4.编解码器:负责参数和结果的序列化和反序列化,常见的格式有JSON、XML、Protobuf等。

    RPC的优缺点

    优点
    1.简化远程调用:使得远程调用像本地调用一样简单,开发人员无需关心底层的网络通信细节。
    2.语言无关:大多数RPC框架支持多种编程语言,方便不同语言的系统互操作。

    缺点
    1.调试困难:由于涉及网络通信,调试远程调用的问题比本地调用更加复杂。
    2.可靠性要求高:需要处理网络延迟、丢包、超时等问题,增加了系统的复杂性。
    3.耦合性:客户端和服务器需要共同遵循同一套接口定义,一旦接口发生变化,可能需要同时更新多个系统。

    Protobuf

    protoc.exe 生成C#文件
    在这里插入图片描述

    Gen.bat

    @echo off
    
    rem Path variables. The values carry their own quotes, so every expansion
    rem below is already quoted.
    set PROTOC_PATH="protoc.exe"
    set PROTO_DIR="Protos"
    set OUTPUT_DIR="ProtocolCodes"
    
    rem Log header
    echo .......................proto2C#.......................
    echo.
    
    rem The .proto source directory must exist; report the directory that is
    rem actually checked and exit with a non-zero code so callers can detect it.
    if not exist %PROTO_DIR% (
        echo Error: %PROTO_DIR% directory does not exist.
        echo Please create the %PROTO_DIR% directory and place your .proto files in it.
        echo.
        pause
        exit /b 1
    )
    
    rem Create the output directory when it is missing
    if not exist %OUTPUT_DIR% mkdir %OUTPUT_DIR%
    
    rem Compile every .proto file; report "complete" only after protoc succeeded
    for %%f in (%PROTO_DIR%\*.proto) do (
        %PROTOC_PATH% --proto_path=%PROTO_DIR% --csharp_out=%OUTPUT_DIR% %%f
        if errorlevel 1 (
            echo %%f FAILED
        ) else (
            echo %%f complete
        )
    )
    
    echo code generation complete. Press any key to close.
    pause > nul
    
    

    函数绑定

    1.使用反射自动获取所有RPC函数, 对其进行Hash绑定

    函数的定义 RPCMsgHandles.cs

    /// <summary>
    /// Holds the private static RPC handler methods. Each handler only logs
    /// the values it receives (ReqMove is an empty stub).
    /// </summary>
    public sealed class RPCMsgHandles
    {
        /// <summary>Move request handler; currently does nothing.</summary>
        private static void ReqMove(int unitId, Move move)
        {
        }
    
        /// <summary>
        /// Logs the attack id, target id and item count.
        /// NOTE(review): <paramref name="skillid"/> is unused; the value logged as
        /// "skillid" is attack.Id - confirm this is intended.
        /// </summary>
        private static void RecvAttack(int skillid, Attack attack, ItemList itemList) =>
            LogHelper.Log($"Recv: skillid = {attack.Id}, targetId = {attack.TargetId}, itemList.Count = {itemList.Items.Count}");
    
        /// <summary>Logs the received integer as "state".</summary>
        private static void RecvDelete(int msg) =>
            LogHelper.Log($"Recv: state = {msg}");
    
        /// <summary>Logs the position, speed and direction carried by the move message.</summary>
        private static void RecvReflectMove(Move move) =>
            LogHelper.Log($"move reflect sync: x:{move.X}, y:{move.Y}, speed:{move.Speed}, dir:{move.Dir}");
    }
    

    使用反射进行函数绑定 RPCMoudle.cs

    /// <summary>
    /// RPC dispatch table: maps the hash of a handler name to the object that
    /// decodes and invokes that handler.
    /// </summary>
    public sealed class RPCMoudle
    {
        // Handlers keyed by Globals.Hash(method name).
        private static Dictionary<int, IRPC> _msg = new Dictionary<int, IRPC>();
    
        /// <summary>
        /// Reflects over the non-public static methods of <see cref="RPCMsgHandles"/>
        /// and registers one <see cref="RPC"/> wrapper per method.
        /// </summary>
        /// <exception cref="Exception">Two method names hash to the same id.</exception>
        public static void Init()
        {
            const BindingFlags handlerFlags = BindingFlags.Static | BindingFlags.NonPublic;
            foreach (MethodInfo handler in typeof(RPCMsgHandles).GetMethods(handlerFlags))
            {
                RPC rpc = new RPC(handler);
                ParameterInfo[] parameters = handler.GetParameters();
                for (int position = 0; position < parameters.Length; position++)
                {
                    System.Type parameterType = parameters[position].ParameterType;
                    if (!typeof(IMessage).IsAssignableFrom(parameterType))
                    {
                        // Non-message parameter: only its wire type is recorded.
                        rpc.AddParamType(GetDateType(parameterType));
                        continue;
                    }
    
                    // Message parameter: an instance is stored by parameter position.
                    IMessage prototype = Activator.CreateInstance(parameterType) as IMessage;
                    rpc.AddParamType(DateType.Message);
                    rpc.AddParam(position, prototype);
                }
    
                int hash = Globals.Hash(handler.Name);
                if (_msg.ContainsKey(hash))
                    throw new Exception("AddParamType rpc _method hash conflict: " + handler.Name);
    
                _msg.Add(hash, rpc);
            }
        }
    }
    

    2.使用泛型手动进行RPC函数绑定

    泛型类进行函数绑定 RPCMoudle.cs

    /// <summary>
    /// Registers <paramref name="action"/> as the handler for the RPC named
    /// <paramref name="methodName"/>. A duplicate id is logged as an error and
    /// the previous handler is replaced.
    /// </summary>
    /// <typeparam name="T">Protobuf message type the handler receives.</typeparam>
    public static void Register<T>(string methodName, Action<T> action) where T : class, IMessage, new()
    {
        int id = Globals.Hash(methodName);
    
        RPCStatic<T> handler = new RPCStatic<T>();
        T prototype = new T();
        handler.Register(action, prototype);
    
        bool alreadyRegistered = _msg.ContainsKey(id);
        if (alreadyRegistered)
            LogHelper.LogError($"repeat id, id = {id}");
    
        _msg[id] = handler;
    }
    
    /// <summary>
    /// Removes the handler registered for <paramref name="methodName"/>;
    /// logs an error when no handler is registered under that name's hash.
    /// </summary>
    public static void Unregister(string methodName)
    {
        int id = Globals.Hash(methodName);
    
        // Dictionary.Remove reports whether the key was present.
        bool removed = _msg.Remove(id);
        if (removed)
            return;
    
        LogHelper.LogError($"no find method, id = {id}");
    }
    

    Call

    Call的实现, encode数据到byte[],第一个参数必须为远程函数名字, 用于将函数名字的hashid写入数据头中, 这样远程服务器在解析数据的时候会先解析4字节的数据头表示函数的hashid

    Call中的Send函数 是Socket发送协议, Send函数中会在数据头中写入数据的长度, 在接收方根据数据的长度接收完整数据 防止粘包

    Call函数有多个方法重载, 根据业务需求使用
    1.public static void Call(string methodName, IMessage message) 类型安全, 类型固定
    2.public static void Call(string id, params object[] args) 类型不安全, 可以传入任何参数, 使用更加方便快捷

    具体调用例子
    Move move = new Move();
    move.X = 10;
    move.Y = 20;
    move.Speed = 100;
    move.Dir = 20;
    
    这里使用的是object[] args 类型不安全, 也会有装箱拆箱的开销, 使用这种方式需要前后端统一类型
    使用起来简单方便, 业务逻辑开发上使用较为方便
    比如请求领取奖励 RPCMoudle.Call("ReqAward", 传入表奖励id);
    比如请求保存勾选 RPCMoudle.Call("Save", true);
    RPCMoudle.Call("ReqMove", 10016, move);
    
    这样是类型安全的, 也不会存在装箱拆箱的开销
    更加高效, 战斗场景较为适合
    RPCMoudle.Call("ReqMove", move);
    

    Call的实现, 将数据进行Encode转换成二进制

    /// <summary>
    /// Encodes a single protobuf message call and hands it to the socket.
    /// Layout written here: 4-byte method hash, the local role id (as a tagged
    /// string), then the tagged message.
    /// </summary>
    /// <param name="methodName">Remote method name; its hash is written first.</param>
    /// <param name="message">Message payload; a null message is ignored.</param>
    public static void Call(string methodName, IMessage message)
    {
        if (message == null) 
            return;
    
        BuffMessage msg = null;
        bool handedToSocket = false;
        try
        {
            int id = Globals.Hash(methodName);
            int offset = 0;
            msg = GameFrame.message.GetBuffMessage();
            BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);
            offset += sizeof(int);
    
            BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);
            BitConverterHelper.WriteMessage(msg.bytes, ref offset, message);
    
            msg.length = offset;
            // From here on the socket layer may own the buffer, so it must not
            // be recycled by the error path below.
            handedToSocket = true;
            Main.Instance.Send(msg);
        }
        catch(Exception ex)
        {
            LogHelper.LogError(ex.ToString());
    
            // Encoding failed before the message reached the socket: give the
            // pooled buffer back instead of leaking it.
            if (msg != null && !handedToSocket)
                GameFrame.message.PutBuffMessage(msg);
        }
    }
    
    /// <summary>
    /// Encodes an RPC call with an arbitrary argument list and hands it to the
    /// socket. Any exception is logged and swallowed.
    /// </summary>
    /// <param name="id">Remote method name; its hash identifies the call.</param>
    /// <param name="args">Arguments, encoded in order by <c>Encode</c>.</param>
    public static void Call(string id, params object[] args)
    {
        Profiler.BeginSample("rpc call");
        try
        {
            int hash = Globals.Hash(id);
            BuffMessage msg = Encode(hash, args);
            Main.Instance.Send(msg);
        }
        catch(Exception ex)
        {
            LogHelper.LogError(ex.ToString());
        }
        finally
        {
            // Always close the sample, otherwise an exception leaves the
            // profiler with an unmatched BeginSample.
            Profiler.EndSample();
        }
    }
    

    Encode

    Encode函数的实现

    /// <summary>
    /// Serializes an RPC call into a pooled <c>BuffMessage</c>: 4-byte method
    /// hash, the local role id, then every argument as a tagged value.
    /// </summary>
    /// <param name="id">Hash of the remote method name.</param>
    /// <param name="args">Arguments to encode, in order. Null elements and
    /// unsupported types abort the encoding through the error path.</param>
    /// <returns>The encoded message. On an argument error the message is
    /// disposed before being returned, exactly as before.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="args"/> is null.</exception>
    private static BuffMessage Encode(int id, params object[] args)
    {
      // Reject a null array before a pooled buffer is taken, so nothing leaks.
      if (args == null)
          throw new ArgumentNullException(nameof(args));
    
      int offset = 0;
      BuffMessage msg = GameFrame.message.GetBuffMessage();
      BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);
      offset += sizeof(int);
      BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);
    
      try
      {
          foreach (object arg in args)
          {
              switch (arg)
              {
                  case IMessage m:
                      BitConverterHelper.WriteMessage(msg.bytes, ref offset, m);
                      break;
                  case Int16 i16:
                      BitConverterHelper.WriteInt16(msg.bytes, ref offset, i16);
                      break;
                  case Int32 i32:
                      BitConverterHelper.WriteInt32(msg.bytes, ref offset, i32);
                      break;
                  case Int64 i64:
                      BitConverterHelper.WriteInt64(msg.bytes, ref offset, i64);
                      break;
                  case UInt16 u16:
                      BitConverterHelper.WriteUInt16(msg.bytes, ref offset, u16);
                      break;
                  case UInt32 u32:
                      BitConverterHelper.WriteUInt32(msg.bytes, ref offset, u32);
                      break;
                  case UInt64 u64:
                      BitConverterHelper.WriteUInt64(msg.bytes, ref offset, u64);
                      break;
                  case bool b:
                      BitConverterHelper.WriteBool(msg.bytes, ref offset, b);
                      break;
                  case Byte u8:
                      BitConverterHelper.WriteByte(msg.bytes, ref offset, u8);
                      break;
                  case SByte s8:
                      // A boxed SByte cannot be unboxed as byte; convert the
                      // unboxed value instead (bit pattern is preserved).
                      BitConverterHelper.WriteByte(msg.bytes, ref offset, unchecked((byte)s8));
                      break;
                  case Char c:
                      BitConverterHelper.WriteChar(msg.bytes, ref offset, c);
                      break;
                  case Single f32:
                      BitConverterHelper.WriteSingle(msg.bytes, ref offset, f32);
                      break;
                  case Double f64:
                      BitConverterHelper.WriteDouble(msg.bytes, ref offset, f64);
                      break;
                  case string s:
                      BitConverterHelper.WriteString(msg.bytes, ref offset, s);
                      break;
                  case null:
                      throw new ArgumentNullException(nameof(args), "RPC argument is null");
                  default:
                      // Silently skipping an argument would shift every later
                      // parameter on the receiving side.
                      throw new NotSupportedException($"unsupported RPC argument type: {arg.GetType()}");
              }
          }
      }
      catch(Exception ex)
      {
          LogHelper.LogError($"id: {id}, " + ex.ToString());
          // NOTE(review): the disposed message is still returned and the caller
          // passes it to Send - confirm Send tolerates a disposed BuffMessage.
          msg.Dispose();
          return msg;
      }
    
      msg.length = offset;
      return msg;
    }
    

    0GC的TryWriteBytes方案

    namespace Game
    {
        /// <summary>
        /// Writes tagged values into a caller-supplied byte buffer. Every value is
        /// preceded by one <c>DateType</c> tag byte; strings and protobuf messages
        /// additionally carry a 4-byte length prefix. Multi-byte values are written
        /// with <see cref="BitConverter"/>.
        /// Protobuf messages are first serialized into a shared 1 MiB scratch
        /// buffer through a reusable <c>CodedOutputStream</c>.
        /// </summary>
        public static class BitConverterHelper
        {
            private static readonly int BUFFER_SIZE = 1024 * 1024;
            // Shared scratch area protobuf messages are serialized into.
            private static readonly byte[] buffer = new byte[BUFFER_SIZE];
            private static CodedOutputStream _stream;
            // Measures how long one scratch stream lasts before it is recreated.
            private static Stopwatch _watch;
    
            /// <summary>Creates the scratch stream and starts the interval stopwatch.</summary>
            public static void Init()
            {
                CreateStream();
                _watch = new Stopwatch();
                _watch.Start();
            }
    
            /// <summary>
            /// Replaces the scratch stream with a fresh one positioned at the start
            /// of the scratch buffer and logs the time since the previous creation.
            /// </summary>
            private static void CreateStream()
            {
                if (_stream != null)
                    _stream.Dispose();
    
                if (_watch != null)
                {
                    _watch.Stop();
                    LogHelper.LogWarning($"create stream interval time: {_watch.ElapsedMilliseconds / 1000.0f} s");
                    _watch.Restart();
                }
    
                _stream = new CodedOutputStream(buffer);
            }
    
            /// <summary>
            /// Serializes <paramref name="message"/> into the scratch buffer.
            /// </summary>
            /// <returns>A span over the serialized bytes inside the scratch buffer
            /// (empty for a null or zero-size message). The span is only valid
            /// until the scratch stream is recreated, so copy it right away.</returns>
            /// <exception cref="InvalidOperationException">The message does not fit
            /// into the scratch buffer.</exception>
            private static Span<byte> ToByteArray(IMessage message)
            {
                if (message == null)
                    return Span<byte>.Empty;
    
                int length = message.CalculateSize();
                if (length == 0)
                    return Span<byte>.Empty;
    
                if (length >= BUFFER_SIZE)
                {
                    throw new InvalidOperationException($"overflow: message length >= {BUFFER_SIZE}");
                }
    
                // Create the stream lazily (Init not called yet, or Dispose was
                // called) and start over when the remaining space is too small.
                if (_stream == null || _stream.Position + length >= BUFFER_SIZE)
                    CreateStream();
    
                int position = (int)_stream.Position;
                message.WriteTo(_stream);
                return buffer.AsSpan(position, length);
            }
    
            /// <summary>Writes the one-byte type tag and advances <paramref name="offset"/>.</summary>
            private static void WriteTag(byte[] buffer, ref int offset, DateType type)
            {
                Check(buffer, offset + 1);
                buffer[offset++] = (byte)type;
            }
    
            /// <summary>Writes an Int16 tag followed by the 2-byte value.</summary>
            public static void WriteInt16(byte[] buffer, ref int offset, Int16 arg)
            {
                WriteTag(buffer, ref offset, DateType.Int16);
                Check(buffer, offset + sizeof(Int16));
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                offset += sizeof(Int16);
            }
    
            /// <summary>Writes an Int32 tag followed by the 4-byte value.</summary>
            public static void WriteInt32(byte[] buffer, ref int offset, Int32 arg)
            {
                WriteTag(buffer, ref offset, DateType.Int32);
                Check(buffer, offset + sizeof(Int32));
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                offset += sizeof(Int32);
            }
    
            /// <summary>Writes an Int64 tag followed by the 8-byte value.</summary>
            public static void WriteInt64(byte[] buffer, ref int offset, Int64 arg)
            {
                WriteTag(buffer, ref offset, DateType.Int64);
                Check(buffer, offset + sizeof(Int64));
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                offset += sizeof(Int64);
            }
    
            /// <summary>Writes a UInt16 tag followed by the 2-byte value.</summary>
            public static void WriteUInt16(byte[] buffer, ref int offset, UInt16 arg)
            {
                WriteTag(buffer, ref offset, DateType.UInt16);
                Check(buffer, offset + sizeof(UInt16));
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                offset += sizeof(UInt16);
            }
    
            /// <summary>Writes a UInt32 tag followed by the 4-byte value.</summary>
            public static void WriteUInt32(byte[] buffer, ref int offset, UInt32 arg)
            {
                WriteTag(buffer, ref offset, DateType.UInt32);
                Check(buffer, offset + sizeof(UInt32));
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                offset += sizeof(UInt32);
            }
    
            /// <summary>Writes a UInt64 tag followed by the 8-byte value.</summary>
            public static void WriteUInt64(byte[] buffer, ref int offset, UInt64 arg)
            {
                WriteTag(buffer, ref offset, DateType.UInt64);
                Check(buffer, offset + sizeof(UInt64));
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                offset += sizeof(UInt64);
            }
    
            /// <summary>Writes a Boolean tag followed by the 1-byte value.</summary>
            public static void WriteBool(byte[] buffer, ref int offset, bool arg)
            {
                WriteTag(buffer, ref offset, DateType.Boolean);
                Check(buffer, offset + sizeof(bool));
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                offset += sizeof(bool);
            }
    
            /// <summary>Writes a Byte tag followed by the byte itself.</summary>
            public static void WriteByte(byte[] buffer, ref int offset, byte arg)
            {
                WriteTag(buffer, ref offset, DateType.Byte);
                Check(buffer, offset + 1);
                buffer[offset++] = arg;
            }
    
            /// <summary>Writes a Char tag followed by the 2-byte UTF-16 code unit.</summary>
            public static void WriteChar(byte[] buffer, ref int offset, Char arg)
            {
                WriteTag(buffer, ref offset, DateType.Char);
                Check(buffer, offset + sizeof(Char));
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                offset += sizeof(Char);
            }
    
            /// <summary>Writes a Single tag followed by the 4-byte value.</summary>
            public static void WriteSingle(byte[] buffer, ref int offset, Single arg)
            {
                WriteTag(buffer, ref offset, DateType.Single);
                Check(buffer, offset + sizeof(Single));
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                offset += sizeof(Single);
            }
    
            /// <summary>Writes a Double tag followed by the 8-byte value.</summary>
            public static void WriteDouble(byte[] buffer, ref int offset, Double arg)
            {
                WriteTag(buffer, ref offset, DateType.Double);
                Check(buffer, offset + sizeof(Double));
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);
                offset += sizeof(Double);
            }
    
            /// <summary>
            /// Writes a String tag, the UTF-8 byte count as a 4-byte integer, then
            /// the UTF-8 bytes. The string is encoded straight into
            /// <paramref name="buffer"/>, without a temporary array.
            /// </summary>
            /// <exception cref="ArgumentNullException"><paramref name="arg"/> is null.</exception>
            public static void WriteString(byte[] buffer, ref int offset, string arg)
            {
                WriteTag(buffer, ref offset, DateType.String);
                int byteCount = Encoding.UTF8.GetByteCount(arg);
                // The length prefix occupies space too, so include it in the check.
                Check(buffer, offset + sizeof(int) + byteCount);
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), byteCount);
                offset += sizeof(int);
                Encoding.UTF8.GetBytes(arg, 0, arg.Length, buffer, offset);
                offset += byteCount;
            }
    
            /// <summary>
            /// Writes a Message tag, the serialized size as a 4-byte integer, then
            /// the serialized protobuf bytes. A null message is written with length 0.
            /// </summary>
            public static void WriteMessage(byte[] buffer, ref int offset, IMessage arg)
            {
                Span<byte> bytes = ToByteArray(arg);
                WriteTag(buffer, ref offset, DateType.Message);
                // The length prefix occupies space too, so include it in the check.
                Check(buffer, offset + sizeof(int) + bytes.Length);
                BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);
                offset += sizeof(int);
                bytes.CopyTo(buffer.AsSpan(offset, bytes.Length));
                offset += bytes.Length;
            }
    
            /// <summary>
            /// Throws when <paramref name="offset"/> (the position after a pending
            /// write) reaches or passes the end of <paramref name="buffer"/>.
            /// </summary>
            /// <exception cref="InvalidOperationException">The write does not fit.</exception>
            private static void Check(byte[] buffer, int offset)
            {
                if (offset >= buffer.Length)
                    throw new InvalidOperationException($"data length: {offset} >= {buffer.Length}, Invalid data!!");
            }
    
            /// <summary>Disposes the scratch stream; it is recreated on the next message write.</summary>
            public static void Dispose()
            {
                _stream?.Dispose();
                _stream = null;
            }
        }
    }
    

    Recv

    Decode 数据解析,调用本地方法

    
    /// <summary>
    /// Entry point for a received message: decodes its bytes and dispatches
    /// the call. A null message is logged as a socket receive error.
    /// </summary>
    public static void OnRPC(BuffMessage msg)
    {
        if (msg != null)
        {
            Decode(msg.bytes);
            return;
        }
    
        LogHelper.LogError("socket recv error, msg == null");
    }
    

    Decode

    0GC的Decode方案

    /// <summary>
    /// Reads the 4-byte method hash at the start of <paramref name="buffer"/>,
    /// looks up the registered handler and lets it decode the remaining bytes
    /// from a pooled scratch message. Handler exceptions are logged, not rethrown.
    /// </summary>
    private static void Decode(byte[] buffer)
    {
        const int headerSize = sizeof(int);
    
        bool hasHeader = buffer != null && buffer.Length >= headerSize;
        if (!hasHeader)
        {
            LogHelper.LogError("Invalid buffer received");
            return;
        }
    
        int protoId = BitConverter.ToInt32(buffer, 0);
        IRPC method;
        bool known = _msg.TryGetValue(protoId, out method);
        if (!known)
        {
            LogHelper.LogError($"Method not found for protoId: {protoId}");
            return;
        }
    
        BuffMessage scratch = GameFrame.message.GetBuffMessage();
        try
        {
            // Strip the hash so the handler sees its parameters at offset 0.
            int payloadLength = buffer.Length - headerSize;
            Array.Copy(buffer, headerSize, scratch.bytes, 0, payloadLength);
            method.Decode(scratch.bytes);
        }
        catch (Exception ex)
        {
            LogHelper.LogError($"Error invoking method for protoId {protoId}: {ex.Message}");
        }
        finally
        {
            // The scratch message always goes back to the pool.
            GameFrame.message.PutBuffMessage(scratch);
        }
    }
    
    namespace Game
    {
        /// <summary>Decodes one received RPC payload and invokes its handler.</summary>
        public interface IRPC : IDisposable
        {
            /// <summary>Decodes <paramref name="buffer"/> and dispatches the call.</summary>
            public void Decode(byte[] buffer);
        }
    
        /// <summary>
        /// Shared readers for <see cref="IRPC"/> implementations. Each reader
        /// consumes one value from <see cref="buffer"/> starting at the given
        /// offset (the type tag must already have been consumed by the caller)
        /// and advances the offset past the value.
        /// </summary>
        public abstract class RPCBase : IRPC
        {
            // Payload currently being decoded; set by the derived Decode.
            protected byte[] buffer;
            public abstract void Decode(byte[] buffer);
    
            /// <summary>
            /// Returns the fixed-size bytes of a value of <paramref name="type"/>
            /// and advances <paramref name="offset"/>. Variable-length types
            /// (string, message) have no fixed size: an empty span is returned and
            /// the offset is left untouched.
            /// </summary>
            protected ReadOnlySpan<byte> ReadData(DateType type, ref int offset)
            {
                ReadOnlySpan<byte> data = default;
                int length = GetLength(type);
                if (length > 0)
                {
                    data = new ReadOnlySpan<byte>(buffer, offset, length);
                    offset += length;
                }
    
                return data;
            }
    
            protected bool ToBoolean(ref int offset)
            {
                ReadOnlySpan<byte> data = ReadData(DateType.Boolean, ref offset);
                return BitConverter.ToBoolean(data);
            }
    
            protected Byte ToByte(ref int offset)
            {
                // A byte occupies exactly one byte on the wire. Reading it with the
                // Char size (2) would skip one byte too many and misalign every
                // parameter that follows.
                ReadOnlySpan<byte> data = ReadData(DateType.Byte, ref offset);
                return data[0];
            }
    
            protected char ToChar(ref int offset)
            {
                ReadOnlySpan<byte> data = ReadData(DateType.Char, ref offset);
                return BitConverter.ToChar(data);
            }
    
            protected Int16 ToInt16(ref int offset)
            {
                ReadOnlySpan<byte> data = ReadData(DateType.Int16, ref offset);
                return BitConverter.ToInt16(data);
            }
    
            protected UInt16 ToUInt16(ref int offset)
            {
                ReadOnlySpan<byte> data = ReadData(DateType.UInt16, ref offset);
                return BitConverter.ToUInt16(data);
            }
    
            protected Int32 ToInt32(ref int offset)
            {
                ReadOnlySpan<byte> data = ReadData(DateType.Int32, ref offset);
                return BitConverter.ToInt32(data);
            }
    
            protected UInt32 ToUInt32(ref int offset)
            {
                ReadOnlySpan<byte> data = ReadData(DateType.UInt32, ref offset);
                return BitConverter.ToUInt32(data);
            }
    
            protected Int64 ToInt64(ref int offset)
            {
                ReadOnlySpan<byte> data = ReadData(DateType.Int64, ref offset);
                return BitConverter.ToInt64(data);
            }
    
            protected UInt64 ToUInt64(ref int offset)
            {
                ReadOnlySpan<byte> data = ReadData(DateType.UInt64, ref offset);
                return BitConverter.ToUInt64(data);
            }
    
            protected Single ToSingle(ref int offset)
            {
                ReadOnlySpan<byte> data = ReadData(DateType.Single, ref offset);
                return BitConverter.ToSingle(data);
            }
    
            protected Double ToDouble(ref int offset)
            {
                ReadOnlySpan<byte> data = ReadData(DateType.Double, ref offset);
                return BitConverter.ToDouble(data);
            }
    
            /// <summary>Reads a length-prefixed UTF-8 string.</summary>
            protected string ToString(ref int offset)
            {
                return DecodeString(ref offset);
            }
    
            /// <summary>
            /// Reads a length-prefixed protobuf message, parsed with the parser of
            /// <paramref name="message"/>'s descriptor.
            /// </summary>
            protected IMessage ToMessage(ref int offset, IMessage message)
            {
                return DecodeMessage(ref offset, message);
            }
    
            // Reads the 4-byte length, then parses that many bytes as a message.
            private IMessage DecodeMessage(ref int offset, IMessage message)
            {
                int length = BitConverter.ToInt32(buffer, offset);
                offset += sizeof(int);
    
                ReadOnlySpan<byte> messageData = new ReadOnlySpan<byte>(buffer, offset, length);
                offset += length;
    
                return message.Descriptor.Parser.ParseFrom(messageData)!;
            }
    
            // Reads the 4-byte length, then decodes that many bytes as UTF-8.
            private string DecodeString(ref int offset)
            {
                int length = BitConverter.ToInt32(buffer, offset);
                offset += sizeof(int);
    
                ReadOnlySpan<byte> messageData = new ReadOnlySpan<byte>(buffer, offset, length);
                offset += length;
    
                return Encoding.UTF8.GetString(messageData);
            }
    
            /// <summary>
            /// Size in bytes of a fixed-size wire type, or -1 for types without a
            /// fixed size (string, message, unknown).
            /// </summary>
            private static int GetLength(DateType type)
            {
                switch (type)
                {
                    case DateType.Boolean:
                        return sizeof(bool);
                    case DateType.Char:
                        return sizeof(char);
                    case DateType.SByte:
                    case DateType.Byte:
                        return sizeof(byte);
                    case DateType.Int16:
                        return sizeof(Int16);
                    case DateType.UInt16:
                        return sizeof(UInt16);
                    case DateType.Int32:
                        return sizeof(Int32);
                    case DateType.UInt32:
                        return sizeof(UInt32);
                    case DateType.Int64:
                        return sizeof(Int64);
                    case DateType.UInt64:
                        return sizeof(UInt64);
                    case DateType.Single:
                        return sizeof(Single);
                    case DateType.Double:
                        return sizeof(double);
                }
    
                return -1;
            }
    
            /// <summary>Drops the reference to the last decoded payload.</summary>
            public virtual void Dispose()
            {
                buffer = null;
            }
        }
    }
    

    Decode数据到对象列表, 然后Invoke

    namespace Game
    {
        /// <summary>
        /// Reflection-based handler: decodes the tagged parameters of a payload
        /// into an object list and invokes the wrapped static method with them.
        /// </summary>
        public class RPC : RPCBase
        {
            private MethodInfo _method;
            // Expected wire type of each parameter, in declaration order.
            private List<DateType> _types;
            // Decoded arguments of the call currently being processed.
            private List<object> _params;
            // Message instances keyed by parameter position; they supply the parser.
            private Dictionary<int, IMessage> _param;
            // Position of the parameter currently being decoded.
            private int _paramIndex;
    
            public RPC(MethodInfo method)
            {
                this._method = method;
                _types = new List<DateType>();
                _params = new List<object>();
                _param = new Dictionary<int, IMessage>();
            }
    
            /// <summary>Appends the expected wire type of the next parameter.</summary>
            public void AddParamType(DateType type)
            {
                _types?.Add(type!);
            }
    
            /// <summary>Stores the message instance used to parse the parameter at <paramref name="index"/>.</summary>
            public void AddParam(int index, IMessage message)
            {
                _param[index] = message;
            }
    
            /// <summary>
            /// Decodes one tagged value per registered parameter and invokes the
            /// method. A tag that differs from the registered type is logged; the
            /// value is then decoded according to the received tag.
            /// </summary>
            public override void Decode(byte[] buffer)
            {
                base.buffer = buffer;
                _paramIndex = 0;
                int offset = 0;
    
                _params.Clear();
                foreach (DateType type in _types)
                {
                    DateType dateType = (DateType)buffer[offset++];
                    if (dateType != type)
                    {
                        // dateType comes from the wire, type from the local signature.
                        LogHelper.LogError($"dateType not equals, recv: {Enum.GetName(typeof(DateType), dateType)} != local: {Enum.GetName(typeof(DateType), type)}");
                    }
    
                    object obj = ToObject(dateType, ref offset);
                    _params.Add(obj!);
                    _paramIndex++;
                }
    
                _method?.Invoke(null, _params.ToArray());
            }
    
            /// <summary>
            /// Reads one value of <paramref name="type"/>; returns null (after
            /// logging) for an unknown type or a message parameter without a
            /// registered instance.
            /// </summary>
            private object ToObject(DateType type, ref int offset)
            {
                switch (type)
                {
                    case DateType.Message:
                        IMessage message = null;
                        if (!_param!.TryGetValue(_paramIndex, out message))
                        {
                            LogHelper.LogError("no find message");
                            return null;
                        }
                            
                        return ToMessage(ref offset, message);
                    case DateType.Boolean:
                        return ToBoolean(ref offset);
                    case DateType.Char:
                        return ToChar(ref offset);
                    // NOTE(review): an SByte tag yields a boxed byte here - confirm
                    // handlers with sbyte parameters are not expected.
                    case DateType.SByte:
                    case DateType.Byte:
                        return ToByte(ref offset);
                    case DateType.Int16:
                        return ToInt16(ref offset);
                    case DateType.UInt16:
                        return ToUInt16(ref offset);
                    case DateType.Int32:
                        return ToInt32(ref offset);
                    case DateType.UInt32:
                        return ToUInt32(ref offset);
                    case DateType.Int64:
                        return ToInt64(ref offset);
                    case DateType.UInt64:
                        return ToUInt64(ref offset);
                    case DateType.Single:
                        return ToSingle(ref offset);
                    case DateType.Double:
                        return ToDouble(ref offset);
                    case DateType.String:
                        return ToString(ref offset);
                    default:
                        LogHelper.LogError("no find dateType: " + type);
                        break;
                }
    
                return null;
            }
    
            /// <summary>Releases the method, the type list and any cached arguments and message instances.</summary>
            public override void Dispose()
            {
                base.Dispose();
                _method = null;
                _types = null;
                // Do not keep the last call's arguments or the message instances alive.
                _params?.Clear();
                _param?.Clear();
            }
        }
    }
    

    泛型Decode,然后Invoke

    namespace Game
    {
        /// <summary>
        /// Strongly typed handler: decodes a single protobuf message and passes
        /// it to a registered <see cref="Action{T}"/>.
        /// </summary>
        public class RPCStatic<T> : RPCBase
        {
            private Action<T> _action;
            // Instance whose descriptor supplies the parser for incoming payloads.
            private IMessage _message;
    
            public RPCStatic()
            {
            }
    
            /// <summary>Stores the callback and the message instance used for parsing.</summary>
            public virtual void Register(Action<T> action, IMessage message)
            {
                this._message = message;
                this._action = action;
            }
    
            /// <summary>
            /// Expects a Message tag followed by one message; any other tag is
            /// logged as an error. Exceptions are logged and not rethrown.
            /// </summary>
            public override void Decode(byte[] buffer)
            {
                base.buffer = buffer;
                int offset = 0;
                DateType dateType = (DateType)buffer[offset++];
    
                try
                {
                    if (dateType == DateType.Message)
                    {
                        IMessage arg = ToMessage(ref offset, _message);
                        _action?.Invoke((T)arg);
                    }
                    else
                    {
                        LogHelper.LogError($"invoke error, type != DateType.Message, type = {dateType}");
                    }
                }
                catch (Exception ex)
                {
                    LogHelper.LogError(ex.ToString());
                }
            }
    
            /// <summary>Releases the payload reference, the callback and the message instance.</summary>
            public override void Dispose()
            {
                // Same pattern as RPC.Dispose: run the base cleanup and drop the
                // references so the delegate target can be collected.
                base.Dispose();
                _action = null;
                _message = null;
            }
        }
    }
    

    Socket.Send和Recv

    使用UniTask实现的多线程异步收发消息,处理了超时重发和异常处理,接收消息时的粘包处理

    namespace Game
    {
        /// <summary>Connection states exposed through <c>Tcp.State</c>.</summary>
        public enum SocketState
        {
            None = 0,
            // Set once the TcpClient reports a successful connection.
            Connected = 1,
            Disconnected = 2,
            // Set right before the asynchronous connect attempt starts.
            Connecting = 3,
            // Set when the client is not connected after the attempt, or when
            // the post-connect setup throws.
            ConnectFailed = 4,
            Close = 5,
            Dispose = 6,
        }
    
        public class Tcp
        {
            private ConcurrentQueue<BuffMessage> _sendMsgs;
            private ConcurrentQueue<BuffMessage> _receiveMsgs;
            private TcpClient _tcpClient;
    
            private SocketState _socketState;
            private byte[] _recvBuff;
            private int _recvOffset;
            private int _delay = 10;
            private CancellationTokenSource _recvCancelToken;
            private CancellationTokenSource _sendCancelToken;
    
            public SocketState State { get { return _socketState; } }
            public string IP { get; set; }
            public int Port { get; set; }
    
            /// <summary>Network stream of the current TcpClient.</summary>
            public NetworkStream Stream => _tcpClient.GetStream();
    
            /// <summary>Allocates the receive buffer and the send/receive queues.</summary>
            public Tcp()
            {
                this._sendMsgs = new ConcurrentQueue<BuffMessage>();
                this._receiveMsgs = new ConcurrentQueue<BuffMessage>();
                this._recvBuff = new byte[Globals.BUFFER_SIZE];
            }
    
            /// <summary>Creates a fresh TcpClient and fresh cancellation sources for the send and receive loops.</summary>
            private void InitTcpClient()
            {
                this._tcpClient = new TcpClient();
                this._recvCancelToken = new CancellationTokenSource();
                this._sendCancelToken = new CancellationTokenSource();
            }
    
            /// <summary>
            /// Dispatches at most one received message per call and returns its
            /// buffer to the pool.
            /// </summary>
            public void Update()
            {
                Profiler.BeginSample("on tcp rpc");
                try
                {
                    if (_receiveMsgs.TryDequeue(out BuffMessage msg))
                    {
                        try
                        {
                            RPCMoudle.OnRPC(msg);
                        }
                        finally
                        {
                            // Recycle the buffer even if the handler throws.
                            GameFrame.message.PutBuffMessage(msg);
                        }
                    }
                }
                finally
                {
                    // Keep BeginSample/EndSample balanced on every path.
                    Profiler.EndSample();
                }
            }
    
            /// <summary>Stores the endpoint in <see cref="IP"/> and <see cref="Port"/>, then starts connecting.</summary>
            public void Connect(string ip, int port)
            {
                this.IP = ip;
                this.Port = port;
                this.Connect();
            }
    
            /// <summary>
            /// Closes any existing connection, then connects asynchronously to
            /// <see cref="IP"/>:<see cref="Port"/>. Failures are logged and reported
            /// through the <c>ConnectFailed</c> state; no exception escapes.
            /// </summary>
            public async void Connect()
            {
                try
                {
                    Close();
                    InitTcpClient();
                    SetSocketState(SocketState.Connecting);
                    await _tcpClient.ConnectAsync(IP, Port);
                    OnConnect();
                }
                catch (Exception ex)
                {
                    LogHelper.LogError(ex.ToString());
                    // Without this the state would stay Connecting forever after a
                    // refused or unreachable endpoint; OnConnect reports failures
                    // the same way.
                    SetSocketState(SocketState.ConnectFailed);
                }
            }
    
            /// <summary>
            /// Runs after the connect attempt: on success switches to
            /// <c>Connected</c> and starts the send/receive loops, otherwise
            /// switches to <c>ConnectFailed</c>.
            /// </summary>
            private void OnConnect()
            {
                try
                {
                    if (_tcpClient.Connected)
                    {
                        LogHelper.Log("connected...");
                        SetSocketState(SocketState.Connected);
                        StartAsyncTasks();
                    }
                    else
                    {
                        SetSocketState(SocketState.ConnectFailed);
                    }
                }
                catch (Exception ex)
                {
                    // The old text concatenated a literal "{0}" placeholder that
                    // was never formatted.
                    LogHelper.LogError($"连接或通信发生错误:{ex.Message}");
                    SetSocketState(SocketState.ConnectFailed);
                }
            }
    
            /// <summary>
            /// Starts the send and receive loops as fire-and-forget tasks.
            /// </summary>
            private void StartAsyncTasks()
            {
                // Forget() marks the tasks as intentionally un-awaited so that a fault in either
                // loop is surfaced by UniTask instead of sitting in an unused local.
                SendThread().Forget();
                RecvThread().Forget();
            }
    
            /// <summary>
            /// Send loop: while the socket is connected, drains the outgoing queue and writes
            /// each message to the stream, then pauses for <c>_delay</c> milliseconds.
            /// </summary>
            /// <remarks>
            /// Each write is bounded by the message's own timeout. A timed-out message is put
            /// back at the end of the queue; any other failure stops the current drain pass.
            /// </remarks>
            private async UniTask SendThread()
            {
                await UniTask.SwitchToThreadPool();
                while (_socketState == SocketState.Connected)
                {
                    while (true)
                    {
                        // Check for shutdown BEFORE dequeuing so a pending message is not
                        // pulled off the queue only to be dropped.
                        if (_sendCancelToken.IsCancellationRequested)
                            break;

                        if (!_sendMsgs.TryDequeue(out BuffMessage msg))
                            break;

                        var timeoutToken = new CancellationTokenSource();
                        IDisposable timeoutTimer = null;
                        CancellationTokenSource linkedCts = null;

                        try
                        {
                            // Created inside the try so that a disposed _sendCancelToken is
                            // reported through the generic handler below instead of escaping.
                            timeoutTimer = timeoutToken.CancelAfterSlim(TimeSpan.FromMilliseconds(msg.TimeoutMillisecond));
                            linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_sendCancelToken.Token, timeoutToken.Token);

                            await Stream.WriteAsync(msg.bytes, 0, msg.length, linkedCts.Token);
                            LogHelper.Log($"发送完成: {msg.length} byte");
                            GameFrame.message.PutBuffMessage(msg);
                        }
                        catch (OperationCanceledException ex)
                        {
                            if (timeoutToken.IsCancellationRequested)
                            {
                                // NOTE(review): if the write was cancelled part-way, resending the
                                // whole frame may corrupt the stream framing — confirm.
                                _sendMsgs.Enqueue(msg);
                                LogHelper.LogWarning("消息发送超时, 添加到队列末尾, 等待发送...");
                                await UniTask.Delay(10);
                                continue;
                            }

                            LogHelper.LogWarning("发送操作被终止..." + ex.Message);
                            break;
                        }
                        catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.ConnectionAborted)
                        {
                            LogHelper.Log("发送操作被终止...");
                            break;
                        }
                        catch (Exception ex)
                        {
                            LogHelper.LogError("发送错误: " + ex.Message);
                            break;
                        }
                        finally
                        {
                            // Stop the pending timeout timer and detach the linked source from
                            // _sendCancelToken; otherwise one of each accumulates per message.
                            // timeoutToken itself is deliberately not disposed: the timer callback
                            // may race with this cleanup and call Cancel() on it, which throws on
                            // a disposed source.
                            timeoutTimer?.Dispose();
                            linkedCts?.Dispose();
                        }
                    }

                    await UniTask.Delay(_delay);
                }
            }
    
            /// <summary>
            /// Receive loop: while the socket is connected, reads bytes into <c>_recvBuff</c>,
            /// splits them into length-prefixed messages and queues each complete message for
            /// <c>Update</c>.
            /// </summary>
            /// <remarks>
            /// Wire format: a 4-byte length (decoded with <c>BitConverter.ToInt32</c>) followed by
            /// that many payload bytes. A length that is negative or cannot fit in the receive
            /// buffer is treated as a protocol error and ends the loop.
            /// </remarks>
            private async UniTask RecvThread()
            {
                await UniTask.SwitchToThreadPool();
                while (_socketState == SocketState.Connected)
                {
                    try
                    {
                        if (_recvCancelToken.IsCancellationRequested)
                            break;

                        int length = await Stream.ReadAsync(_recvBuff, _recvOffset, _recvBuff.Length - _recvOffset, _recvCancelToken.Token);
                        if (length == 0)
                        {
                            // A zero-byte read means the remote side closed the connection.
                            LogHelper.Log("connect failed...");
                            break;
                        }

                        _recvOffset += length;
                        int offset = 0;
                        while (true)
                        {
                            if (_recvOffset - offset < sizeof(int))
                                // Not enough data to read the next message's length prefix.
                                break;

                            int dataLength = BitConverter.ToInt32(_recvBuff, offset);

                            // The prefix comes from the network: reject values that could never
                            // be satisfied (or would overflow the arithmetic below) instead of
                            // stalling until the buffer is full or failing inside BlockCopy.
                            if (dataLength < 0 || dataLength > _recvBuff.Length - sizeof(int))
                                throw new InvalidDataException($"invalid message length {dataLength}; receive buffer holds {_recvBuff.Length} bytes");

                            if (_recvOffset - offset < dataLength + sizeof(int))
                                // Not enough data for the complete message yet.
                                break;

                            // Copy out one complete message.
                            // NOTE(review): msg.length is not set here — confirm the RPC handler
                            // does not rely on it for received messages.
                            BuffMessage msg = GameFrame.message.GetBuffMessage();
                            Buffer.BlockCopy(_recvBuff, offset + sizeof(int), msg.bytes, 0, dataLength);
                            _receiveMsgs.Enqueue(msg);

                            // Advance to the next message.
                            offset += sizeof(int) + dataLength;
                        }

                        // Move any unconsumed bytes to the start of the buffer.
                        if (_recvOffset - offset > 0)
                            Buffer.BlockCopy(_recvBuff, offset, _recvBuff, 0, _recvOffset - offset);

                        _recvOffset -= offset;
                    }
                    catch(OperationCanceledException ex)
                    {
                        LogHelper.Log("读取操作被终止: " + ex.Message);
                        break;
                    }
                    catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.OperationAborted)
                    {
                        LogHelper.Log("读取操作被终止...");
                        break;
                    }
                    catch (Exception ex)
                    {
                        LogHelper.LogError("读取错误: " + ex.ToString());
                        break;
                    }

                    // NOTE(review): UniTask.Delay is player-loop based, so the loop presumably
                    // resumes off the thread pool after this await — confirm this is intended.
                    await UniTask.Delay(_delay);
                }
            }
    
            /// <summary>Records <paramref name="state"/> as the current socket state.</summary>
            private void SetSocketState(SocketState state) => _socketState = state;
    
            /// <summary>
            /// Frames <paramref name="message"/> with a 4-byte length header and queues it for
            /// the send loop. A message with no payload is handed straight back to
            /// <c>GameFrame.message</c> instead.
            /// </summary>
            /// <param name="message">Message whose first <c>length</c> bytes are the payload.</param>
            public void Send(BuffMessage message)
            {
                if (message.length <= 0)
                {
                    GameFrame.message.PutBuffMessage(message);
                    return;
                }

                const int headLength = sizeof(int);
                int payloadLength = message.length;
                byte[] buffer = message.bytes;

                // Shift the payload right to make room for the header, then write the
                // payload size into the freed leading bytes.
                Buffer.BlockCopy(buffer, 0, buffer, headLength, payloadLength);
                BitConverter.TryWriteBytes(buffer.AsSpan(0), payloadLength);

                message.length = payloadLength + headLength;
                _sendMsgs.Enqueue(message);
            }
    
            /// <summary>
            /// Stops the send/receive loops and closes the current TCP client, if any.
            /// Safe to call repeatedly; errors are logged rather than thrown.
            /// </summary>
            /// <remarks>
            /// Teardown no longer depends on <c>TcpClient.Connected</c>: a client that failed to
            /// connect, or whose peer already dropped the link, is released as well, and the
            /// state is set to <c>Close</c> so the send loop stops.
            /// </remarks>
            public void Close()
            {
                if (_tcpClient == null)
                    return;

                try
                {
                    SetSocketState(SocketState.Close);

                    // Cancel before disposing so pending reads/writes are actually signalled;
                    // disposing alone never trips IsCancellationRequested.
                    CancelAndDispose(_recvCancelToken);
                    CancelAndDispose(_sendCancelToken);
                    _tcpClient.Close();
                }
                catch (Exception ex)
                {
                    LogHelper.LogError(ex.ToString());
                }
            }

            /// <summary>Cancels <paramref name="source"/> (if not already disposed) and disposes it.</summary>
            private static void CancelAndDispose(CancellationTokenSource source)
            {
                if (source == null)
                    return;

                try
                {
                    source.Cancel();
                }
                catch (ObjectDisposedException)
                {
                    // Already disposed by an earlier Close(); nothing left to cancel.
                }
                finally
                {
                    source.Dispose();
                }
            }
    
            /// <summary>
            /// Closes the connection, releases the TCP client, empties and drops both message
            /// queues, and finally marks the socket state as <c>Dispose</c>.
            /// </summary>
            public void Dispose()
            {
                Close();

                _tcpClient?.Dispose();
                _tcpClient = null;

                _sendMsgs?.Clear();
                _sendMsgs = null;

                _receiveMsgs?.Clear();
                _receiveMsgs = null;

                SetSocketState(SocketState.Dispose);
            }
        }
    }
    

    项目地址

    SimpleRPC

  • 相关阅读:
    泰山OFFICE技术讲座:关于文字方向的几种实现思路
    【RbMQ】RabbitMQ概念辨析
    Oracle数据库安装及配置
    linux下特定usb设备的权限设置
    论文阅读 (77):Abductive Learning with Ground Knowledge Base
    Rabbit加密算法:性能与安全的完美结合
    Linux常用命令——常用网络命令【二】
    使用cpolar远程连接群晖NAS(升级安全链接1)
    Vue 源码解读(10)—— 编译器 之 生成渲染函数
    试题二(15分)和试题三(15分) (软件设计师笔记)
  • 原文地址:https://blog.csdn.net/qq_25670983/article/details/139643155