• think\queue 消息队列


    简介

    TP 中使用 think-queue 可以实现普通队列和延迟队列

    think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

    • 消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
    • 队列的多队列, 内存限制 ,启动,停止,守护等
    • 消息队列可降级为同步执行

    消息队列实现过程

    1. 通过生产者推送消息到消息队列服务中
    2. 消息队列服务将收到的消息存入redis队列中(zset)
    3. 消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
    4. 处理获取下来的消息调用业务类进行处理相关业务
    5. 业务处理后,需要从队列中删除消息

    安装queue

    composer 安装 think-queue

    composer require topthink/think-queue
    
    • 1

    配置文件

    在 \application\extra 新建queue.php 为 queue 的配置文件
    声明启动redis (服务器redis需要先开启安装redis扩展)

    
    return [
        'connector'  => 'Redis',          // Redis 驱动
        'expire'     => null,             // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
        'default'    => 'default',        // 默认的队列名称
        'host'       => '127.0.0.1',      // redis 主机ip
        'port'       => 6379,             // redis 端口
        'password'   => '',               // redis 密码
        'select'     => 0,                // 使用哪一个 db,默认为 db0
        'timeout'    => 0,                // redis连接的超时时间
        'persistent' => false,            // 是否是长连接
    ];
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    queue服务基类

    首先要写一个声明一个服务基类 命名空间及文件夹名 自己定义好 到直接位置
    我这里是在 \application\common\library\task\OrderClose.php 这个路径

    namespace app\common\library\task;
    
    use app\common\controller\Frontend;
    use think\Lang;
    use think\Response;
    use think\queue\Job;
    
    /**
     * 未支付订单到期自动关闭任务
     */
    class OrderClose
    {
        
        /**
         * fire方法是消息队列默认调用的方法
         * @param Job            $job      当前的任务对象
         * @param array|mixed    $data     发布任务时自定义的数据
         */
        public function fire(Job $job, $data)
        {
            // 此处做一些 check,提前判断是否需要执行
            // $isJobStillNeedToBeDone = $this->checkJob($data);
            // if(! $isJobStillNeedToBeDone){
            //     $job->delete();
            //     return;
            // }
            // if ($job->attempts() > 6) {
            //     $job->delete();
            //     // 也可以重新发布这个任务
            //     //$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行
            // }
            // 执行逻辑处理(即:你需要该消息队列做什么)
            $isJobDone = $this->doJob($data);
            if ($isJobDone) {
                // 如果任务执行成功,记得删除任务
                $job->delete();
                // $job->release(10);
            } else {
                //删除任务
                $job->delete();
                // 通过这个方法可以检查这个任务已经重试了几次了
                
            }
        }
        
        /**
         * 有些消息在到达消费者时,可能已经不再需要执行了
         * @param $data 发布任务时自定义的数据
         * @return bool 任务执行的结果
         */
        private function checkJob($data){
            // $order_id = !empty($data['order_id']) ? $data['order_id'] : '';
            // if(!$order_id) return false;
            // $orderModel = new \app\api\model\order\Order;
            // $orderInfo = $orderModel->where(['id'=>$order_id])->find();
            // if(!$orderInfo) return false;
            // if($orderInfo->status != '1') return false;
            // //更新订单数据
            // $updateData = [
            //     'status'=>'10',
            //     'close_time'=>time(),
            //     'remarks'=>'未支付订单超时自动关闭'
            // ];
            // $ret = $orderModel->save($updateData);
            // if($ret){
            //     return true;
            // }else{
            //     return false;    
            // }
        }
        
        /**
         * 根据消息中的数据进行实际的业务处理...
         * @param $data
         * @return bool
         */
        private function doJob($data)
        {
            $order_id = !empty($data['order_id']) ? $data['order_id'] : '';
            if(!$order_id) return false;
            $orderInfo = \app\api\model\order\Order::where(['id'=>$order_id])->find();
            // \think\Log::write(['msg'=>'测试队列7878','data'=>$orderInfo,'order_id'=>$order_id,'activity_id'=>$orderInfo['activity_id']],'error');
            if(!$orderInfo) return false;
            if($orderInfo->status != '1') return false;
            //更新订单数据
            $updateData = [
                'status'=>'10',
                'close_time'=>time(),
                'remarks'=>'未支付订单超时自动关闭'
            ];
            $ret = \app\api\model\order\Order::where(['id'=>$order_id])->update($updateData);
            //归还库存
            if($orderInfo['activity_id']){
                // \think\Log::write(['msg'=>'测试队列返还库存','data'=>$orderInfo,'order_id'=>$order_id,'activity_id'=>$orderInfo['activity_id']],'error');
                $orderProductInfo = \app\api\model\order\OrderProduct::where('order_id',$order_id)->find();
                $backStock = \app\api\model\activity\Activity::where('id',$orderInfo['activity_id'])->setInc('stock',$orderProductInfo['number']);
            }
            if($ret){
                return true;
            }else{
                return false;    
            }
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105

    $job->release(2); 重新调用任务 2是延迟2秒执行
    $job->delete(); 删除任务

    调用

    在实际应用中调用该基类

    //创建订单自动关闭任务
    $jobHandlerClassName = 'app\common\library\task\OrderClose';
    $jobData = ['order_id'=>$orderProductDatas['order_id']];
    $jobQueueName = 'order_close';
    //延迟执行
    $isPushed = \think\Queue::later(30 * 60,$jobHandlerClassName, $jobData,$jobQueueName);
    //立即执行
    $isPushed = \think\Queue::push($jobHandlerClassName, $jobData,$jobQueueName);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    两个方法,前者是(30 * 60)秒后执行,后者立即执行
    注意的是
    later 延迟执行 无反参 我执行成功 返回了null
    push 有反参 返回随机的字符串

    开启守护进程

    我服务器是用宝塔的守护进程
    在这里插入图片描述
    在这里安装Supervisor管理器
    安装完需要配置一下守护进程的命令行

    在这里插入图片描述
    名称必须是英文
    运行目录要选择项目根目录
    命令行如下 注意order_close 是你项目基类名称

    php think queue:listen --queue order_close
    
    • 1

    填写完成之后 保存就可以了

    如果不是宝塔环境 请看一下 supervisor—进程管理神器
    给我大哥点点关注

    结束

    这里的配置就基本结束了 基类中的 注释 表明了一下需要参数和方法 需要自己多揣摩一下
    还有自己的业务逻辑也要修改 这里只作为参考 大家自行删除就可以

  • 相关阅读:
    230. 二叉搜索树中第K小的元素
    Linux常用命令——bye命令
    150. 逆波兰表达式求值
    Docker-compose update db password
    代码随想录算法训练营第三十九天【动态规划part02】 | 62.不同路径、63. 不同路径 II
    Ubuntu 创建本地 Git 并与 Github(私有库) 交互(上传与下载)| 记录 | 踩坑
    C# 面试题及答案整理,最新面试题
    前端工程化知识系列(10)
    【RtpSeqNumOnlyRefFinder】webrtc m98: ManageFrameInternal 的帧决策过程分析
    深入理解JVM虚拟机第四篇:一些常用的JVM虚拟机
  • 原文地址:https://blog.csdn.net/weixin_43866089/article/details/126436143