• think-queue tp自带的消息队列类库


    简介

    其中主要介绍了关于使用think-queue来实现普通队列和延迟队列的相关内容,think-queue是thinkphp官方提供的一个消息队列服务

    基本特性:

    消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等
    队列的多队列, 内存限制 ,启动,停止,守护等
    消息队列可降级为同步执行

    消息队列实现过程
    1. 通过生产者推送消息到消息队列服务中
    2. 消息队列服务将收到的消息存入redis队列中(zset)
    3. 消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条
    4. 处理获取下来的消息调用业务类进行处理相关业务
    5. 业务处理后,需要从队列中删除消息

    安装

    安装完 think-queue 后会在 config 目录中生成 queue.php,这个文件是队列的配置文件。

    composer require topthink/think-queue
    
    • 1

    配置

    配置文件位于 config/queue.php

      return [
           'connector'  => 'Redis',		// Redis 驱动
           'expire'     => 60,		// 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 
           'default'    => 'default',		// 默认的队列名称
           'host'       => '127.0.0.1',	// redis 主机ip
           'port'       => 6379,		// redis 端口
           'password'   => '',		// redis 密码
           'select'     => 0,		// 使用哪一个 db,默认为 db0
           'timeout'    => 0,		// redis连接的超时时间
           'persistent' => false,		// 是否是长连接
         
       //    'connector' => 'Database',   // 数据库驱动
       //    'expire'    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
       //    'default'   => 'default',    // 默认的队列名称
       //    'table'     => 'jobs',       // 存储消息的表名,不带前缀
       //    'dsn'       => [],
     
       //    'connector'   => 'Topthink',	// ThinkPHP内部的队列通知服务平台 ,本文不作介绍
       //    'token'       => '', 
       //    'project_id'  => '',
       //    'protocol'    => 'https',
       //    'host'        => 'qns.topthink.com',
       //    'port'        => 443,
       //    'api_version' => 1,
       //    'max_retries' => 3,
       //    'default'     => 'default',
     
       //    'connector'   => 'Sync',		// Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
       ];
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    如果使用数据库的话

    CREATE TABLE `prefix_jobs` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `queue` varchar(255) NOT NULL,
      `payload` longtext NOT NULL,
      `attempts` tinyint(3) unsigned NOT NULL,
      `reserved` tinyint(3) unsigned NOT NULL,
      `reserved_at` int(10) unsigned DEFAULT NULL,
      `available_at` int(10) unsigned NOT NULL,
      `created_at` int(10) unsigned NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    创建任务类

    namespace app\job;
    
    use think\queue\Job;
    
    class Job1{
        
        public function fire(Job $job, $data){
        
                //....这里执行具体的任务 
                
                 if ($job->attempts() > 3) {
                      //通过这个方法可以检查这个任务已经重试了几次了
                 }
                
                
                //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
                $job->delete();
                
                // 也可以重新发布这个任务
                $job->release($delay); //$delay为延迟时间
              
        }
        
        public function failed($data){
        
            // ...任务达到最大重试次数后,失败了
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    执行

    1. think\facade\Queue::push($job任务名(类名) , $data = ‘参数’, $queue = 队列名)
    2. think\facade\Queue::later($delay延时时间, $job任务名(类名@+方法名), $data = ‘参数’, $queue = 队列名)
    use think\facade\Queue;
    // 普通队列生成调用方式
    Queue::push($job, $data, $queueName);
    // 例:
    Queue::push(Test::class, $data, $queueName);
    // 延时队列生成调用方式
    Queue::later($delay, $job, $data, $queueName);
    // 例如使用延时队列 10 秒后执行:
    Queue::later(10 , Test::class, $data, $queueName);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    前者是立即执行,后者是在$delay秒后执行

    监听任务并执行

    queue:work 默认只执行一次队列请求,当请求执行完成后就终止;这里可以看出,要是延迟队列,就不能使用该命令喽
    queue:work --daemon 同listen 一样,只要运行着,就能一直接受请求,不一样的地方是在这个运行模式下,当新的请求到来的时候,**不重新加载整个框架 **, 而是直接 fire 动作。所以若是更改的代码,需要停止,然后重新启动。(该命令是在laravel4.2以后才加入的)

    &> php think queue:work
    
    • 1

    queue:listen 监听队列请求,只要运行着,就能一直接受请求,除非手动终止

    &> php think queue:listen 
    
    • 1
  • 相关阅读:
    【pen200-lab】10.11.1.21(实际获得22权限)
    (附源码)计算机毕业设计Java办公自动化管理系统
    Mysql索引
    从nuxt开始的SEO之路
    josn模块和ujson模块性能对比
    free-mybatis-plugin插件下载
    保健用品行业B2B电子商务系统:供采交易全链路数字化,助推企业管理精细化
    【javascript】内部引入与外部引入javascript
    349. 两个数组的交集
    基于记忆与模型协同过滤的电影推荐系统研究与实践(文末送书)
  • 原文地址:https://blog.csdn.net/wangzhae/article/details/128194396