• Hyperf微服务——五、JsonRpc远程调用


    一、JsonRpc请求的实现、

    调用链从上到下

    namespace App\JsonRpc;
    //请求入口 通过容器获取接口对象
    class IndexController extends AbstractController
    {
        public function index()
        {
            $user_info = ApplicationContext::getContainer()->get(UserInfoInterface::class);
            return $user_info->getUserInfoById(1);
        }
    }
    
    
    namespace App\JsonRpc;
    //服务启动,自动生成服务提供者的接口代理文件
    class UserInfoInterface_7be0c5bab4e9d0b2671b5482e5fa29f4 extends \Hyperf\RpcClient\Proxy\AbstractProxyService implements UserInfoInterface
    {
        public function getUserInfoById($id)
        {
        	//通过抽象代理文件的__call方法进行调用
            return $this->client->__call(__FUNCTION__, func_get_args());
        }
    }
    
    
    namespace Hyperf\RpcClient;
    //rpc客户端的底层实现
    class ServiceClient extends AbstractServiceClient
    {
        /**
         * @var MethodDefinitionCollectorInterface
         */
        protected $methodDefinitionCollector;
    
        /**
         * @var string
         */
        protected $serviceInterface;
    
        /**
         * @var NormalizerInterface
         */
        private $normalizer;
    
        public function __construct(ContainerInterface $container, string $serviceName, string $protocol = 'jsonrpc-http', array $options = [])
        {
            $this->serviceName = $serviceName;
            $this->protocol = $protocol;
            $this->setOptions($options);
            parent::__construct($container);
            $this->normalizer = $container->get(NormalizerInterface::class);
            $this->methodDefinitionCollector = $container->get(MethodDefinitionCollectorInterface::class);
        }
    
    	//真正的rpc调用方法
        protected function __request(string $method, array $params, ?string $id = null)
        {
            if ($this->idGenerator instanceof IdGeneratorInterface && ! $id) {
                $id = $this->idGenerator->generate();
            }
            //生成调用地址并发送消息体到服务提供者
            $response = $this->client->send($this->__generateData($method, $params, $id));
            if (! is_array($response)) {
                throw new RequestException('Invalid response.');
            }
    
            $response = $this->checkRequestIdAndTryAgain($response, $id);
            if (array_key_exists('result', $response)) {
                $type = $this->methodDefinitionCollector->getReturnType($this->serviceInterface, $method);
                if ($type->allowsNull() && $response['result'] === null) {
                    return null;
                }
    
    			//消息体解码
                return $this->normalizer->denormalize($response['result'], $type->getName());
            }
    
            if ($code = $response['error']['code'] ?? null) {
                $error = $response['error'];
                // Denormalize exception.
                $class = Arr::get($error, 'data.class');
                $attributes = Arr::get($error, 'data.attributes', []);
                if (isset($class) && class_exists($class) && $e = $this->normalizer->denormalize($attributes, $class)) {
                    if ($e instanceof \Throwable) {
                        throw $e;
                    }
                }
    
                // Throw RequestException when denormalize exception failed.
                throw new RequestException($error['message'] ?? '', $code, $error['data'] ?? []);
            }
    
            throw new RequestException('Invalid response.');
        }
    
    
    	//对外部开放的调用方法
        public function __call(string $method, array $params)
        {
            return $this->__request($method, $params);
        }
    
        protected function setOptions(array $options): void
        {
            $this->serviceInterface = $options['service_interface'] ?? $this->serviceName;
    
            if (isset($options['load_balancer'])) {
                $this->loadBalancer = $options['load_balancer'];
            }
        }
    }
    
    
    namespace Hyperf\RpcClient;
    //打包消息为json格式并请求服务端
    class Client
    {
        /**
         * @var null|PackerInterface
         */
        private $packer;
    
        /**
         * @var null|TransporterInterface
         */
        private $transporter;
    
    	//消息体发送到服务提供者
        public function send($data)
        {
            if (! $this->packer) {
                throw new InvalidArgumentException('Packer missing.');
            }
            if (! $this->transporter) {
                throw new InvalidArgumentException('Transporter missing.');
            }
            //进行消息体编码并发送
            $packer = $this->getPacker();
            $packedData = $packer->pack($data);
            $response = $this->getTransporter()->send($packedData);
            return $packer->unpack((string) $response);
        }
    
    	//接收服务提供者返回的消息体
        public function recv()
        {
            $packer = $this->getPacker();
            $response = $this->getTransporter()->recv();
            return $packer->unpack((string) $response);
        }
    
        public function getPacker(): PackerInterface
        {
            return $this->packer;
        }
    
        public function setPacker(PackerInterface $packer): self
        {
            $this->packer = $packer;
            return $this;
        }
    
        public function getTransporter(): TransporterInterface
        {
            return $this->transporter;
        }
    
        public function setTransporter(TransporterInterface $transporter): self
        {
            $this->transporter = $transporter;
            return $this;
        }
    }
    
    
    namespace Hyperf\JsonRpc;
    //jsosrpc发送消息到服务提供者并接收返回
    class JsonRpcTransporter implements TransporterInterface
    {
        use RecvTrait;
    
        /**
         * @var null|LoadBalancerInterface
         */
        private $loadBalancer;
    
        /**
         * If $loadBalancer is null, will select a node in $nodes to request,
         * otherwise, use the nodes in $loadBalancer.
         *
         * @var Node[]
         */
        private $nodes = [];
    
        /**
         * @var float
         */
        private $connectTimeout = 5;
    
        /**
         * @var float
         */
        private $recvTimeout = 5;
    
        /**
         * @var array
         */
        private $config = [];
    
        public function __construct(array $config = [])
        {
            $this->config = array_replace_recursive($this->config, $config);
    
            $this->recvTimeout = $this->config['recv_timeout'] ?? 5.0;
            $this->connectTimeout = $this->config['connect_timeout'] ?? 5.0;
        }
    
    	/**
    	 * 基于swoole协程客户端发送方法实现消息体发送
    	 * swoole文档 https://wiki.swoole.com/#/coroutine_client/client?id=send
    	 **/
        public function send(string $data)
        {
            $client = retry(2, function () use ($data) {
                $client = $this->getClient();
                if ($client->send($data) === false) {
                    if ($client->errCode == 104) {
                        throw new RuntimeException('Connect to server failed.');
                    }
                }
                return $client;
            });
    
            return $this->recvAndCheck($client, $this->recvTimeout);
        }
    
    	/**
    	 * 基于swoole协程客户端接收方法实现消息体发送
    	 * swoole文档 https://wiki.swoole.com/#/coroutine_client/client?id=recv
    	 **/
        public function recv()
        {
            $client = $this->getClient();
    
            return $this->recvAndCheck($client, $this->recvTimeout);
        }
    
    	/**
    	 * 基于swoole协程客户端连接方法实现远程服务器连接
    	 * swoole文档 https://wiki.swoole.com/#/coroutine_client/client?id=connect
    	 **/
        public function getClient(): SwooleClient
        {
            $class = spl_object_hash($this) . '.Connection';
            if (Context::has($class)) {
                return Context::get($class);
            }
    
            return Context::set($class, retry(2, function () {
                $client = new SwooleClient(SWOOLE_SOCK_TCP);
                $client->set($this->config['settings'] ?? []);
                $node = $this->getNode();
                $result = $client->connect($node->host, $node->port, $this->connectTimeout);
                if ($result === false && ($client->errCode == 114 or $client->errCode == 115)) {
                    // Force close and reconnect to server.
                    $client->close();
                    throw new RuntimeException('Connect to server failed.');
                }
                return $client;
            }));
        }
    
        public function getLoadBalancer(): ?LoadBalancerInterface
        {
            return $this->loadBalancer;
        }
    
        public function setLoadBalancer(LoadBalancerInterface $loadBalancer): TransporterInterface
        {
            $this->loadBalancer = $loadBalancer;
            return $this;
        }
    
        /**
         * @param \Hyperf\LoadBalancer\Node[] $nodes
         */
        public function setNodes(array $nodes): self
        {
            $this->nodes = $nodes;
            return $this;
        }
    
        public function getNodes(): array
        {
            return $this->nodes;
        }
    
        /**
         * If the load balancer is exists, then the node will select by the load balancer,
         * otherwise will get a random node.
         */
        private function getNode(): Node
        {
            if ($this->loadBalancer instanceof LoadBalancerInterface) {
                return $this->loadBalancer->select();
            }
            return $this->nodes[array_rand($this->nodes)];
        }
    }
    
    
    //接收服务提供者返回的消息体并检查远程连接可用
    trait RecvTrait
    {
        /**
         * @param Client|RpcConnection $client
         * @param float $timeout
         */
        public function recvAndCheck($client, $timeout)
        {
            $data = $client->recv((float) $timeout);
            if ($data === '') {
                // RpcConnection: When the next time the connection is taken out of the connection pool, it will reconnecting to the target service.
                // Client: It will reconnecting to the target service in the next request.
                $client->close();
                throw new RecvException('Connection is closed. ' . $client->errMsg, $client->errCode);
            }
            if ($data === false) {
                $client->close();
                throw new RecvException('Error receiving data, errno=' . $client->errCode . ' errmsg=' . swoole_strerror($client->errCode), $client->errCode);
            }
    
            return $data;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334

    二、定义服务消费者

    如果我们的调用场景是 用户服务 调用 订单服务 ,那么首先需要按照上一篇文章 Hyperf微服务——四、第一个微服务的搭建 创建一个订单服务, 然后在用户服务按照如下步骤定义订单服务的服务消费者配置。

    1.配置服务消费者文件

    通过在 config/autoload/services.php配置文件内进行一些简单的配置,即可通过动态代理自动创建消费者类。

    <?php
    
    declare(strict_types=1);
    /**
     * This file is part of Hyperf.
     *
     * @link     https://www.hyperf.io
     * @document https://hyperf.wiki
     * @contact  group@hyperf.io
     * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
     */
    return [
        'enable' => [
            'discovery' => true,
            'register' => true,
        ],
        //通过闭包函数完成多个服务消费者的定义
        'consumers' => value(function () {
            $consumers = [];
            //统一包含定义服务消费者的关系文件
            $services = include __DIR__ . '/service_consumers.php';
            //循环处理服务消费者的配置信息
            foreach ($services as $name => $interface) {
                $consumers[] = [
                    'name' => $name,
                    'service' => $interface,
                    'protocol' => 'jsonrpc',
                    'load_balancer' => 'random',
                    'registry' => [
                        'protocol' => env('PROTOCOL', 'consul'),
                        'address' => env('CONSUL_URI', 'http://127.0.0.1:8500'),
                    ],
                    'options' => [
                        'connect_timeout' => 5.0,
                        'recv_timeout' => 5.0,
                        'settings' => [
                            // 根据协议不同,区分配置
                            'open_eof_split' => true,
                            'package_eof' => "\r\n",
                            // 'open_length_check' => true,
                            // 'package_length_type' => 'N',
                            // 'package_length_offset' => 0,
                            // 'package_body_offset' => 4,
                        ],
                        // 重试次数,默认值为 2,收包超时不进行重试。暂只支持 JsonRpcPoolTransporter
                        'retry_count' => 2,
                        // 重试间隔,毫秒
                        'retry_interval' => 100,
                        // 使用多路复用 RPC 时的心跳间隔,null 为不触发心跳
                        'heartbeat' => 30,
                        // 当使用 JsonRpcPoolTransporter 时会用到以下配置
                        'pool' => [
                            'min_connections' => 1,
                            'max_connections' => 32,
                            'connect_timeout' => 10.0,
                            'wait_timeout' => 3.0,
                            'heartbeat' => -1,
                            'max_idle_time' => 60.0,
                        ],
                    ],
                ];
            }
            return $consumers;
        }),
        'providers' => [],
        'drivers' => [
            'consul' => [
                'uri' => env('CONSUL_URI', 'http://127.0.0.1:8500'),
                'token' => '',
                'check' => [
                    'deregister_critical_service_after' => '90m',
                    'interval' => '1s',
                ],
            ],
        ],
    ];
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    2.配置服务消费者绑定关系

    创建配置文件 config/autoload/service_consumers.php定义服务消费者的关系,如果所有服务统一使用consul为注册中心,配置文件 config/autoload/services.php的其他项不需要修改。

    <?php
    //定义服务消费者配置文件,用来快速注册服务消费者
    return [
        //用户服务
        'OrderService' => App\JsonRpc\OrderInterface::class,
        //...
    ];
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3. 注入接口文件

    这样便可以通过注入 OrderInterface 接口来使用客户端了
    复制服务提供者订单服务的OrderInterface文件到用户服务的JsonRpc目录,如下如所示,启动用户服务即可进行RPC远程调用。
    在这里插入图片描述

    4.封装rpc调用方法

    可以通过封装简易调用方法,以方便业务开发者使用,不需要关注调用逻辑。

    	//封装简易rpc调用,
       public function rpc($interface, $function, $args)
        {
            $interface = (new \ReflectionClass($interface))->getName();
            $data = ApplicationContext::getContainer()->get($interface)->$function($args);
            return $data;
        }
    
    	//调用demo
    	$this->rpc('\App\JsonRpc\OrderInterface','listOrder', []);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    至此,一个简单的微服务RPC调用已经实现了。下面就可以根据实际的业务场景进行深度开发了。

  • 相关阅读:
    Mac版AndroidStudio常用快捷键(汇总)
    数字化时代,企业为什么要做数字化转型?
    期货开户客户出入金是什么时间?
    C++11特性-右值与右值引用
    自动控制原理2.3---控制系统的结构图与信号流图
    单点登录常用协议原理和流程
    自动化测试:selenium(完结篇)
    pcl--第五节 点云表面法线估算
    探秘 | 简说IP地址以及路由器的功能究竟是什么?
    linux下centos7升级python版本
  • 原文地址:https://blog.csdn.net/LuckyStar_D/article/details/125404054