// 新的socket连接回调
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr& conn)
{
if (!conn->connected())
{
// 和rpc client的连接断开了
conn->shutdown();
}
}
对于在网络上接收的字符流,在框架内部,RpcProvider和RpcConsumer协商好之间通信用的protobuf数据类型,定义proto的message类型,进行数据头的序列化和反序列化。
比如需要得知:service_name, method_name, args。不定义类型,传过来的字符流,是没办法识别的。
另外,为了防止粘包,可以在头部的message类型里添加 args_size,即参数的长度。
所以需要设计的部分有:
header_size(4个字节)(服务名字、方法名字) + header_str + args_str
最后设计出该类型
rpcHeader.proto
syntax = "proto3";
package mprpc;
message RpcHeader
{
bytes service_name = 1;
bytes method_name = 2;
uint32 args_size = 3;
}

该方法表示已建立连接用户的读写事件操作,如果有一个远程RPC服务的调用请求,那么OnMessage方法就会响应。
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer* buffer,
muduo::Timestamp);
std::string recv_buf = buffer->retrieveAllAsString();
// 从字符流中读取前4个字节的内容
uint32_t header_size = 0;
recv_buf.copy((char*)&header_size, 4, 0);
这里用到了std::string的copy方法:

std::string rpc_header_str = recv_buf.substr(4, header_size);
mprpc::RpcHeader rpcHeader;
std::string service_name;
std::string method_name;
uint32_t args_size;
if (rpcHeader.ParseFromString(rpc_header_str))
{
// 数据头反序列化成功
service_name = rpcHeader.service_name();
method_name = rpcHeader.method_name();
args_size = rpcHeader.args_size();
}
else
{
// 数据头反序列化失败
std::cout << "rpc_header_str:" << rpc_header_str << " parse error!" << std::endl;
return;
}
// 获取rpc方法参数的字符流数据
std::string args_str = recv_buf.substr(4 + header_size, args_size);
// 打印调试信息
std::cout << "============================================" << std::endl;
std::cout << "header_size: " << header_size << std::endl;
std::cout << "rpc_header_str: " << rpc_header_str << std::endl;
std::cout << "service_name: " << service_name << std::endl;
std::cout << "method_name: " << method_name << std::endl;
std::cout << "args_str: " << args_str.c_str() << std::endl;
std::cout << "============================================" << std::endl;
// 存储注册成功的服务对象和其服务方法的所有信息
std::unordered_map<std::string, ServiceInfo> m_serviceMap;
// service服务类型信息
struct ServiceInfo
{
google::protobuf::Service* m_service; // 保存服务对象
std::unordered_map<std::string, const google::protobuf::MethodDescriptor*> m_methodMap; // 保存服务方法
};
根据上面表的对应关系,先对其进行判空,然后获取service对象和method对象
auto it = m_serviceMap.find(service_name);
if (it == m_serviceMap.end())
{
std::cout << service_name << " is not exist!" << std::endl;
return;
}
auto mit = it->second.m_methodMap.find(method_name);
if (mit == it->second.m_methodMap.end())
{
std::cout << service_name << ":" << method_name << " is not exist!" << std::endl;
return;
}
google::protobuf::Service *service = it->second.m_service; // 获取service对象 new UserService
const google::protobuf::MethodDescriptor *method = mit->second; // 获取method对象 Login
google::protobuf::Message *request = service->GetRequestPrototype(method).New();
if (!request->ParseFromString(args_str))
{
std::cout << "request parse error, content:" << args_str << std::endl;
return;
}
google::protobuf::Message *response = service->GetResponsePrototype(method).New();
// Closure的回调操作,用于序列化RPC的响应和网络发送
void SendRpcResponse(const muduo::net::TcpConnectionPtr&, google::protobuf::Message*);
// Closure的回调操作,用于序列化rpc的响应和网络发送
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn, google::protobuf::Message* response)
{
std::string response_str;
if (response->SerializeToString(&response_str)) // response进行序列化
{
// 序列化成功后,通过网络把rpc方法执行的结果发送会rpc的调用方
conn->send(response_str);
}
else
{
std::cout << "serialize response_str error!" << std::endl;
}
conn->shutdown(); // 模拟http的短链接服务,由rpcprovider主动断开连接
}

// 给下面的method方法的调用,绑定一个Closure的回调函数
google::protobuf::Closure *done = google::protobuf::NewCallback<RpcProvider,
const muduo::net::TcpConnectionPtr&,
google::protobuf::Message*>
(this,
&RpcProvider::SendRpcResponse,
conn, response);
// 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
service->CallMethod(method, nullptr, request, response, done);