• laravel8-rabbitmq消息队列-实时监听跨服务器消息


    使用场景介绍:
    1)用于实时监听远程服务器发出的消息(json格式消息),接受并更新消息状态,存储到本地服务器
    2)环境:lNMP(laravel8)
    3)服务器需要开启rabbitmq驱动队列

    1、composer安装rabbitmq扩展包

    vladimir-yuldashev/laravel-queue-rabbitmq
    参考文档:[https://blog.csdn.net/u012321434/article/details/126246141]

    2、安装配置文件

    1. 打开app/config/queue.php中connections数组中添加以下代码,根据实际情况填写相关配置信息
     'rabbitmq' => [
                'driver' => 'rabbitmq',
                'queue' => env('RABBITMQ_QUEUE', 'default'),
                'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class,
                'hosts' => [
                    [
                        'host' => env('RABBITMQ_HOST', '127.0.0.1'),
                        'port' => env('RABBITMQ_PORT', 5672),
                        'user' => env('RABBITMQ_USER', 'guest'),
                        'password' => env('RABBITMQ_PASSWORD', 'guest'),
                        'vhost' => env('RABBITMQ_VHOST', '/'),
                    ],
                ],
                'options' => [
                    'ssl_options' => [
                        'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', false),
                    ],
                    'queue' => [
                    	//此处直接添加到自定义的job任务中
                        'job' => App\Jobs\Rabbitmq\RabbitMQJob::class,
    
                        //以下配置是rabbitmq 广播模式(direct)
                        'exchange' => 'amq',
                        'exchange_type' => 'direct',
                        'exchange_routing_key' => '',
                    ],
                ],
    
                /*
                 * Set to "horizon" if you wish to use Laravel Horizon.
                 */
                'worker' => env('RABBITMQ_WORKER', 'default'),
            ],
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    1. .env文件中配置相关参数信息
    RABBITMQ_HOST=127.0.0.1
    RABBITMQ_PORT=5672
    RABBITMQ_USER=testuser
    RABBITMQ_PASSWORD=test
    RABBITMQ_VHOST=/project
    RABBITMQ_QUEUE=que_project
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. 在app/config/logging.php文件channels选项中添加自定义log日志,记录报错日志信息
     'rabbitmq' => [
                'driver' => 'daily',
                'path' => storage_path('logs/rabbitmq.log'),
                'level' => env('LOG_LEVEL', 'debug'),
                'days' => 14,
            ],
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. RabbitMQJob.php
    namespace App\Jobs\Rabbitmq;
    
    use Illuminate\Support\Str;
    use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob as BaseJob;
    use App\Services\Rabbitmq\RabbitmqService;
    
    class RabbitMQJob extends BaseJob
    {
        public $tries = 1;
        public $timeout = 3600;
        public $maxExceptions = 3;
    
        public function fire()
        {
            $payload = $this->payload();
            (new RabbitmqService())->handle($payload['data']);
            $this->delete();
        }
    
        /**
         * Get the decoded body of the job.
         * 接收消息体并自定义处理
         * @return array
         */
        public function payload()
        {
            return [
                'uuid' => (string) Str::uuid(),
                'job'  => '\App\Services\Rabbitmq\RabbitmqService@handle',
                'maxTries' => $this->tries,
                'maxExceptions' => $this->maxExceptions,
                'timeout' => $this->timeout,
                'data' => json_decode($this->getRawBody(), true)
            ];
        }
    
        /**
         * Process an exception that caused the job to fail.
         *
         * @param  \Throwable|null  $e
         * @return void
         */
        protected function failed($e)
        {
            (new RabbitmqService())->failed($e);
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    1. RabbitmqService.php
    
    namespace App\Services\Rabbitmq;
    
    use Illuminate\Support\Facades\Log;
    
    class RabbitmqService
    {
        protected $logName = 'rabbitmq';
        protected $connection;
        protected $channel;
        public $messageService;
    
        /**
         * 处理消息状态
         * @param $message .接收到的消息
         * @return bool
         */
        public function handle($message='')
        {
            //1.判断接收的消息情况
            Log::channel($this->logName)->info('接收的消息体:'.json_encode($message));
    		//接收到的消息
            $message = json_decode(json_encode($message), true);
            //2.消息自定义处理
        		
        }
    
        /**
         * 异常扑获
         * @param \Exception $exception
         */
        public function failed(\Exception $exception)
        {
            Log::channel($this->logName)->info('异常:'.json_encode($exception->getMessage()));
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    1. 服务器开启rabbitmq队列驱动,开始监听消息
    php artisan queue:work rabbitmq
    
    • 1
  • 相关阅读:
    结构设计模式 - 桥接设计模式 - JAVA
    每日OJ题_DFS回溯剪枝①_力扣46. 全排列(回溯算法简介)
    Spring Cloud Gateway夺命连环10问,带你彻底了解gateway
    【服务注册框架1】Eureka&nacos 两者的区别
    ORA-01005 vs ORA-28040
    【无标题】
    MySQL 数据库基础知识(系统化一篇入门)
    【软件测试】接手一个测试任务,怎么快速开始测试快速找出测试点?(总结)
    堆 Heap & 栈 Stack(.Net)【概念解析系列_3】【C# 基础】
    车载SOA测试利器——Parasoft SOA自动化测试方案
  • 原文地址:https://blog.csdn.net/kirsten_z/article/details/134361643