• go实现自定义rpc框架 (核心:服务端&客户端、自定义io流、编解码、服务发现、负载均衡、支持多语言网关等)


    git地址:GitHub - radmin-zhangjian/zrpc: zrpc go实现自定义rpc框架

    go开发脚手架 动态/静态路由 (根据gin框架)https://mp.csdn.net/mp_blog/creation/editor/126445356

    目录

    zrpc

    服务端

    客户端

    php客户端 (socket方式)

    网关 支持多语言

    php 连接网关 (curl方式)

    快速开始 - 服务端

    快速开始 - 客户端

    快速开始 - php客户端

    快速开始 - 网关


    zrpc


    说明: zrpc实现了以下基本功能
    1、自定义协议字节流
    2、定义编解码,支持多种方法
    3、注册服务方法
    4、反射调用方法及参数,根据struct方式和map方式传参
    5、支持同步和异步调用
    6、服务发现:redis实现(根据路径查询和redis-scan方式)、其他方式待实现
    7、负载均衡:随机和轮询、其他方式待实现
    8、捕获业务程序异常防止崩溃
    9、支持多语言 网关(http)
    10、php通过socket调用zrpc

    服务端

    go run test/server/main.go -addr=127.0.0.1:8091
    go run test/server/main.go -addr=127.0.0.1:8092
    go run test/server/main.go -addr=127.0.0.1:8093

    客户端

    go run test/client/main.go

    php客户端 (socket方式)

    test/client/client.php

    网关 支持多语言

    go run test/gateway/gateway.go

    php 连接网关 (curl方式)

    test/gateway/gateway.php

    快速开始 - 服务端

    1. package main
    2. import (
    3. "encoding/gob"
    4. "flag"
    5. "log"
    6. "zrpc/example/service"
    7. v1 "zrpc/example/v1"
    8. v2 "zrpc/example/v2"
    9. "zrpc/rpc"
    10. )
    11. // go run main.go -addr=127.0.0.1:8092
    12. var (
    13. addr = flag.String("addr", ":8092", "server address")
    14. registry = flag.String("registry", "redis://127.0.0.1:6379", "registry address")
    15. basePath = flag.String("basepath", "/zrpc_center", "")
    16. )
    17. // 自己定义数据格式的读写
    18. func main() {
    19. // 解析参数
    20. if !flag.Parsed() {
    21. flag.Parse()
    22. }
    23. // gob 编解码时需要注册
    24. gob.Register(map[string]interface{}{})
    25. gob.Register(service.User{})
    26. gob.Register(v1.User{})
    27. gob.Register(v2.User{})
    28. // 创建服务发现
    29. sd, err := rpc.CreateServiceDiscovery(*basePath, *registry, "", 0, 100)
    30. if err != nil {
    31. log.Fatal(err)
    32. }
    33. // 创建服务端
    34. srv := rpc.NewServer(*addr, sd)
    35. // 将服务端方法,注册一下
    36. //srv.Register(new(service.Test))
    37. srv.RegisterName(new(service.Test), "service")
    38. srv.RegisterName(new(v1.Test), "v1")
    39. srv.RegisterName(new(v2.Test), "v2")
    40. // 启动服务
    41. srv.Serve()
    42. }

    快速开始 - 客户端

    1. package main
    2. import (
    3. "encoding/gob"
    4. "flag"
    5. "fmt"
    6. "log"
    7. "sync"
    8. "time"
    9. "zrpc/example/service"
    10. v1 "zrpc/example/v1"
    11. v2 "zrpc/example/v2"
    12. "zrpc/rpc"
    13. "zrpc/rpc/center"
    14. )
    15. var (
    16. registry = flag.String("registry", "redis://127.0.0.1:6379", "registry address")
    17. basePath = flag.String("basepath", "/zrpc_center", "")
    18. cli *rpc.Client
    19. )
    20. // Args 参数
    21. type Args struct {
    22. Id int64
    23. X int64
    24. Y int64
    25. Z string
    26. }
    27. // 自己定义数据格式的读写
    28. func main() {
    29. // 解析参数
    30. if !flag.Parsed() {
    31. flag.Parse()
    32. }
    33. // 发现服务
    34. sd, err := rpc.ServiceDiscovery(*basePath, *registry, "", 0, 100)
    35. if err != nil {
    36. log.Fatal(err)
    37. }
    38. // 创建客户端
    39. if cli == nil {
    40. cli = rpc.NewClient(sd, center.SelectMode(center.Random), true)
    41. defer closeCli()
    42. }
    43. // 同步rpc
    44. var reply any
    45. // 参数 struct 格式
    46. str := "我是rpc测试参数!!!"
    47. args := Args{
    48. Id: 2,
    49. X: 20,
    50. Z: str,
    51. }
    52. errC := cli.Call("service.QueryUser", args, &reply)
    53. if errC != nil {
    54. fmt.Println("main.call.errC", errC)
    55. }
    56. reply1 := reply.(map[string]any)
    57. fmt.Println("main.call.reply", reply1["Age"])
    58. fmt.Println("==========================================")
    59. // 异步rpc
    60. var reply2 any
    61. call2 := cli.Go("v1.QueryInt", map[string]any{"Id": 10000, "msg": str}, &reply2, nil)
    62. <-call2.Done
    63. if call2.Error != nil {
    64. fmt.Printf("main.go.reply2.error: %v \n", call2.Error)
    65. }
    66. fmt.Printf("main.go.reply2: %v \n", reply2)
    67. time.Sleep(2 * time.Second)
    68. }

    快速开始 - php客户端

    1. (new TestController())->actionTest()
    2. class TestController extends Controller
    3. {
    4. const HEAD_MSG = "@**@";
    5. public $socket;
    6. // 析构函数
    7. public function __destruct() {
    8. socket_close($this->socket);
    9. }
    10. public function newRpc($host, $port) {
    11. $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP) or die("Unable to create socket");
    12. @socket_connect($this->socket, $host, $port) or die("Connect error.");
    13. if ($err = socket_last_error($this->socket)) {
    14. socket_close($this->socket);
    15. die(socket_strerror($err));
    16. }
    17. }
    18. public function build($method, $data) {
    19. $response = [
    20. "ServiceMethod" => $method,
    21. "Args" => $data,
    22. "Reply" => null,
    23. ];
    24. return $response;
    25. }
    26. public function write($response) {
    27. $response = msgpack_pack($response);
    28. $len = strlen($response);
    29. $buf = pack("a4", self::HEAD_MSG);
    30. $buf .= pack("N", $len);
    31. $buf .= pack("a".$len, $response);
    32. return socket_write ($this->socket , $buf, strlen($buf));
    33. }
    34. public function read() {
    35. // 读取数据
    36. $hm = socket_read($this->socket, 4, PHP_BINARY_READ);
    37. if ($hm != self::HEAD_MSG) {
    38. print_r("HEAD_MSG ERROR: " . $hm);
    39. }
    40. $hLen = socket_read($this->socket, 4, PHP_BINARY_READ);
    41. $hLen = unpack("N", $hLen);
    42. $hLen = $hLen[1];
    43. $buffer = socket_read($this->socket, $hLen, PHP_BINARY_READ);
    44. $buffer = msgpack_unpack($buffer);
    45. return $buffer;
    46. }
    47. public function call($api, $data) {
    48. // 打包数据
    49. $buf = $this->build($api, $data);
    50. // 发送数据
    51. $this->write($buf);
    52. // 读取数据
    53. $buffer = $this->read();
    54. return $buffer;
    55. }
    56. // PHP 调用 zrpc 服务
    57. public function actionTest()
    58. {
    59. // 连接服务
    60. $this->newRpc("127.0.0.1", "8092");
    61. // 数据
    62. $data["Id"] = 1;
    63. $data["X"] = 20;
    64. $data["Z"] = "aaasssdddfffggghhh";
    65. $data["msg"] = "msg000";
    66. // 接口
    67. $api[] = "service.QueryUser";
    68. $api[] = "v1.QueryInt";
    69. // 调用
    70. for ($i=0; $i<10000; $i++) {
    71. $rand = mt_rand(0, 1);
    72. $buffer = $this->call($api[$rand], $data);
    73. var_dump(json_encode($buffer));
    74. }
    75. }
    76. }

    快速开始 - 网关

    1、启动网关

    1. package main
    2. import (
    3. "flag"
    4. "log"
    5. "zrpc/rpc"
    6. )
    7. var (
    8. addr = flag.String("addr", "127.0.0.1:8060", "addr server")
    9. registry = flag.String("registry", "redis://127.0.0.1:6379", "registry address")
    10. basePath = flag.String("basepath", "/zrpc_center", "")
    11. )
    12. func main() {
    13. // 解析参数
    14. if !flag.Parsed() {
    15. flag.Parse()
    16. }
    17. // http new
    18. http := rpc.NewHttp(*addr)
    19. // 发现服务
    20. sd, err := rpc.ServiceDiscovery(*basePath, *registry, "", 0, 100)
    21. if err != nil {
    22. log.Fatal(err)
    23. }
    24. // 注册服务
    25. router := http.RegServe(sd)
    26. // 启动http服务
    27. http.HttpServer(router)
    28. }

    2、客户端连接网关 php例子

    1. $url = 'http://localhost:8060/';
    2. $data['servicePath'] = 'v1';
    3. $data['serviceMethod'] = 'queryInt';
    4. $data['content'] = '{"Id": 10000, "msg": "测试api"}';
    5. $this->curl = new CurlRequest($url);
    6. $this->curl->setPostFeilds($data);
    7. $data = $this->curl->post();
    8. $data = json_decode($data, true);
    9. var_dump($data);
    10. // curl
    11. class CurlRequest
    12. {
    13. public $version = "1.0";
    14. public $handler;
    15. public $timeOut = 30;
    16. public $header = ['Expect:'];
    17. public $referer;
    18. public $postFields;
    19. public $url;
    20. public $ssl = false;
    21. public $type = "get";
    22. public $agent = "see curl/1.0";
    23. public $returnData;
    24. public function __construct($url)
    25. {
    26. $this->url = $url = trim($url);
    27. $this->handler = curl_init();
    28. curl_setopt($this->handler, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
    29. curl_setopt($this->handler, CURLOPT_RETURNTRANSFER, true);
    30. curl_setopt($this->handler, CURLOPT_URL, $url);
    31. //支持https
    32. $this->ssl = stripos($url, 'https://') === 0 ? true : false;
    33. if ($this->ssl) {
    34. curl_setopt($this->handler, CURLOPT_SSL_VERIFYPEER, false);
    35. curl_setopt($this->handler, CURLOPT_SSL_VERIFYHOST, false);
    36. }
    37. }
    38. public function setTimeOut($timeOut){
    39. $this->timeOut = $timeOut;
    40. }
    41. public function setOpt($opt, $value)
    42. {
    43. curl_setopt($this->handler, $opt, $value);
    44. }
    45. public function setHeader($header)
    46. {
    47. $this->header = array_merge($this->header, $header);
    48. }
    49. public function setCookie($cookie)
    50. {
    51. curl_setopt($this->handler, CURLOPT_COOKIE, $cookie);
    52. }
    53. public function setReferer($referer)
    54. {
    55. $this->referer = $referer;
    56. }
    57. public function setPostFeilds($postFields)
    58. {
    59. $this->postFields = $postFields;
    60. }
    61. public function setAgent($agent){
    62. $this->agent = $agent;
    63. }
    64. public function exec()
    65. {
    66. if (!empty($this->referer)) {
    67. curl_setopt($this->handler, CURLOPT_REFERER, $this->referer);
    68. }else{
    69. curl_setopt($this->handler, CURLOPT_AUTOREFERER, true);
    70. }
    71. //set header
    72. $this->setHeader(['PHP-SEE-TID:' . 1]);
    73. $this->setHeader(['PHP-SEE-SEQ:' . 2]);
    74. curl_setopt($this->handler,CURLOPT_HTTPHEADER,$this->header);
    75. curl_setopt($this->handler, CURLOPT_USERAGENT,$this->agent);
    76. curl_setopt($this->handler, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
    77. curl_setopt($this->handler, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
    78. curl_setopt($this->handler, CURLOPT_CONNECTTIMEOUT, $this->timeOut);
    79. curl_setopt($this->handler, CURLOPT_TIMEOUT, $this->timeOut);
    80. if($this->type=='post'){
    81. curl_setopt($this->handler, CURLOPT_POST, true);
    82. curl_setopt($this->handler, CURLOPT_POSTFIELDS, $this->postFields);
    83. }
    84. $this->returnData = curl_exec($this->handler);
    85. $httpcode = curl_getinfo($this->handler, CURLINFO_HTTP_CODE);
    86. if ($errorNo = curl_errno($this->handler) || $httpcode != 200) {
    87. //error message
    88. $errorMsg = curl_error($this->handler);
    89. \See::$log->warning("curl error, url:%s, type:%s, postData:%s, errorNo:%s, errorMsg:%s, httpcode:%s, return:%s",$this->url,$this->type,$this->postFields,$errorNo,$errorMsg,$httpcode,$this->returnData);
    90. }else{
    91. \See::$log->trace("curl success, url:%s, type:%s, postData:%s, httpcode:%s, return:%s",$this->url,$this->type,$this->postFields,$httpcode,$this->returnData);
    92. }
    93. curl_close($this->handler);
    94. return $this->returnData;
    95. }
    96. public function get(){
    97. $this->type = 'get';
    98. return $this->exec();
    99. }
    100. public function post(){
    101. $this->type ='post';
    102. return $this->exec();
    103. }
    104. }
  • 相关阅读:
    如何设计金融机构多场景关键应用下的存储架构
    Hikari连接池2--获取和归还连接
    IO模型5-进阶-Netty模型
    1832javaERP管理系统之车间计划管理Myeclipse开发mysql数据库servlet结构java编程计算机网页项目
    linux espidf vscode
    nodejs+vue菜谱美食食谱网站系统
    前端基础建设与架构18 对比 Koa 和 Redux:分析前端中的中间件理念
    Redis - 高级
    苍穹外卖——项目搭建
    springboot+vue+elementUI 健身房私人健身与教练预约管理系统 #毕业设计
  • 原文地址:https://blog.csdn.net/qq_37200336/article/details/126564093