• TP5的消息队列


    1.首先查看项目中是否已经有think-queue目录:/vendor/topthink/

    如果没有,则用composer安装(安装composer参考:http://www.runoob.com/w3cnote/composer-install-and-usage.html
    ),安装think-queue先进入到项目根目录,运行


     

    composer require topthink/think-queue

    安装完成后,有think-queue文件夹,再运行 

    php think queue:work -h

    查看是否安装成功,出现如下,则安装成功 

    2.配置队列,queue内置了四种驱动,推荐使用redis驱动。配置文件在application/extra/queue.php,如下 

    1. <?php
    2. use think\Env;
    3. return [
    4. 'connector' => 'Redis', // Redis 驱动
    5. 'expire' => Env::get('queue.expire',60), // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null (设置为null ,relase 方法将不生效)
    6. 'default' => 'default', // 默认的队列名称
    7. 'host' => Env::get('queue.host','127.0.0.1'), // redis 主机ip
    8. 'port' => Env::get('queue.port',6379), // redis 端口
    9. 'password' => Env::get('queue.password',''), // redis 密码
    10. 'select' => Env::get('queue.select',0), // 使用哪一个 db,默认为 db0
    11. 'timeout' => 0, // redis连接的超时时间
    12. 'persistent' => false, // 是否是长连接
    13. ];

     3.消息的创建和推送,首先要添加队列

    1. namespace app\common\controller;
    2. use think\Queue;
    3. class OrderQueue
    4. {
    5. static $className = 'app\common\job';
    6. public static function createOrderQueue($jobData)
    7. {
    8. // 1.当前任务将由哪个类来负责处理。
    9. // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
    10. $jobHandlerClassName = self::$className.'\Order';
    11. // 2.当前任务归属的队列名称,如果为新队列,会自动创建
    12. $jobQueueName = "createOrderJob";
    13. // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
    14. // ( jobData 为对象时,需要在此处手动序列化,否则只存储其public属性的键值对 )
    15. // 4.将该任务推送到消息队列,等待对应的消费者去执行
    16. $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
    17. // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
    18. if( $isPushed !== false ){
    19. return true;
    20. }else{
    21. return false;
    22. }
    23. }
    24. }

    4.消息队列的执行 

    1. <?php
    2. namespace app\common\job;
    3. use app\common\model\ErrorLog;
    4. use think\queue\Job;
    5. class Order
    6. {
    7. /**
    8. * fire方法是消息队列默认调用的方法
    9. * @param Job $job 当前的任务对象
    10. * @param array|mixed $data 发布任务时自定义的数据
    11. */
    12. public function fire(Job $job,$data)
    13. {
    14. //业务处理代码,具体不贴出来了
    15. $isJobDone = $this->create($data);
    16. if ($isJobDone) {
    17. // 如果任务执行成功, 记得删除任务
    18. $job->delete();
    19. }else{
    20. if ($job->attempts() > 3) {
    21. //通过这个方法可以检查这个任务已经重试了几次了
    22. $job->delete();
    23. // 也可以重新发布这个任务
    24. //print("Hello Job will be availabe again after 2s."."\n");
    25. //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
    26. }
    27. }
    28. }
    29. private function create($data)
    30. {
    31. ErrorLog::insertLog('执行队列',__METHOD__,json_encode($data));
    32. return true;
    33. }
    34. }

    到此,所有代码准备完毕,经历了一个消息的 创建 -> 推送 -> 消费 -> 删除 的基本流程,
    上面添加队列的代码只要放在任意一个控制器内就行,部分目录机构如下

    5.启用队列的监听模式,队列的监听模式有两种,配置参数如下 

    只需要到项目根目录执行

    php think queue:listen --queue createOrderJob

    即可,具体根据项目实际情况加上不同的参数。而且我们具体是不能要让命令以守护进程执行。所以我们可以换成下面这条命令 

    nohup php think queue:listen --queue createOrderJob >/dev/null 2>&1 &

    到此,整个消息队列就完成了

    6.测试:

    1. public function createOrder()
    2. {
    3. $data = [
    4. 'order_id' => 1,
    5. ];
    6. OrderQueue::createOrderQueue($data);
    7. }

    参考 https://www.jianshu.com/p/e68e23f01490

    注意 :

    1.3.1 配置文件中的 expire 参数说明

    expire 参数指的是任务的过期时间, 单位为秒。 过期的任务,其准确的定义是

    1. 任务的状态为执行中
    2. 任务的开始执行的时刻 + expire > 当前时刻

    expire 不为null 时 ,thinkphp-queue 会在每次获取下一个任务之前检查并重发过期(执行超时)的任务。

    expire 为null 时,thinkphp-queue 不会检查过期的任务,性能相对较高一点。但是需要注意:

    • 这些执行超时的任务会一直留在消息队列中,需要开发者另行处理(删除或者重发)!

    对expire 参数理解或者使用不当时,很容易产生一些bug,后面会举例提到。

     

  • 相关阅读:
    QTableView、QTableWidget通过setColumnWidth改变列宽无效的问题解决
    教师产假多少天
    解决网络协议服务器问题的关键:定位能力与抓包技术
    手写自定义springboot-starter,感受框架的魅力和原理
    Scatter plot with histograms
    scanpy 空转数据结构seurattoscanpy spatial
    运维知识点-MySQL从小白到入土
    操作系统复习:引论
    达美乐中国再闯港交所,能否IPO必达?
    线性规划的对偶理论
  • 原文地址:https://blog.csdn.net/zhangleia/article/details/127667006