1.首先查看项目中是否已经有think-queue目录:/vendor/topthink/

如果没有,则用composer安装(安装composer参考:http://www.runoob.com/w3cnote/composer-install-and-usage.html
),安装think-queue先进入到项目根目录,运行
composer require topthink/think-queue
安装完成后,有think-queue文件夹,再运行
php think queue:work -h
查看是否安装成功,出现如下,则安装成功

2.配置队列,queue内置了四种驱动,推荐使用redis驱动。配置文件在application/extra/queue.php,如下
- <?php
- use think\Env;
- return [
- 'connector' => 'Redis', // Redis 驱动
- 'expire' => Env::get('queue.expire',60), // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null (设置为null ,relase 方法将不生效)
- 'default' => 'default', // 默认的队列名称
- 'host' => Env::get('queue.host','127.0.0.1'), // redis 主机ip
- 'port' => Env::get('queue.port',6379), // redis 端口
- 'password' => Env::get('queue.password',''), // redis 密码
- 'select' => Env::get('queue.select',0), // 使用哪一个 db,默认为 db0
- 'timeout' => 0, // redis连接的超时时间
- 'persistent' => false, // 是否是长连接
- ];
3.消息的创建和推送,首先要添加队列
- namespace app\common\controller;
- use think\Queue;
- class OrderQueue
- {
- static $className = 'app\common\job';
- public static function createOrderQueue($jobData)
- {
- // 1.当前任务将由哪个类来负责处理。
- // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
- $jobHandlerClassName = self::$className.'\Order';
- // 2.当前任务归属的队列名称,如果为新队列,会自动创建
- $jobQueueName = "createOrderJob";
- // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
- // ( jobData 为对象时,需要在此处手动序列化,否则只存储其public属性的键值对 )
- // 4.将该任务推送到消息队列,等待对应的消费者去执行
- $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
- // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
- if( $isPushed !== false ){
- return true;
- }else{
- return false;
- }
- }
- }
4.消息队列的执行
- <?php
- namespace app\common\job;
-
- use app\common\model\ErrorLog;
- use think\queue\Job;
-
- class Order
- {
- /**
- * fire方法是消息队列默认调用的方法
- * @param Job $job 当前的任务对象
- * @param array|mixed $data 发布任务时自定义的数据
- */
- public function fire(Job $job,$data)
- {
- //业务处理代码,具体不贴出来了
- $isJobDone = $this->create($data);
-
- if ($isJobDone) {
- // 如果任务执行成功, 记得删除任务
- $job->delete();
- }else{
- if ($job->attempts() > 3) {
- //通过这个方法可以检查这个任务已经重试了几次了
-
- $job->delete();
-
- // 也可以重新发布这个任务
- //print("
Hello Job will be availabe again after 2s." ."\n"); - //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
- }
- }
- }
-
- private function create($data)
- {
- ErrorLog::insertLog('执行队列',__METHOD__,json_encode($data));
-
- return true;
- }
- }
到此,所有代码准备完毕,经历了一个消息的 创建 -> 推送 -> 消费 -> 删除 的基本流程,
上面添加队列的代码只要放在任意一个控制器内就行,部分目录机构如下
5.启用队列的监听模式,队列的监听模式有两种,配置参数如下

只需要到项目根目录执行
php think queue:listen --queue createOrderJob
即可,具体根据项目实际情况加上不同的参数。而且我们具体是不能要让命令以守护进程执行。所以我们可以换成下面这条命令
nohup php think queue:listen --queue createOrderJob >/dev/null 2>&1 &
到此,整个消息队列就完成了
6.测试:
- public function createOrder()
- {
- $data = [
- 'order_id' => 1,
- ];
-
- OrderQueue::createOrderQueue($data);
- }
参考 https://www.jianshu.com/p/e68e23f01490
注意 :
1.3.1 配置文件中的 expire 参数说明
expire 参数指的是任务的过期时间, 单位为秒。 过期的任务,其准确的定义是
expire 不为null 时 ,thinkphp-queue 会在每次获取下一个任务之前检查并重发过期(执行超时)的任务。
expire 为null 时,thinkphp-queue 不会检查过期的任务,性能相对较高一点。但是需要注意:
对expire 参数理解或者使用不当时,很容易产生一些bug,后面会举例提到。