think-queue是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。think-queue消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。think-queue消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。
think-queue支持消息队列的基本特性
- 消息的发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等
- 队列的多队列、内存限制、启动、停止、守护等
- 消息队列可降级位同步执行
think-queue各主版本对应适用的ThinkPHP版本
| think-queue版本号 | 适用的ThinkPHP版本 |
|---|---|
| 1.x | ThinkPHP5.0 |
| 2.x | ThinkPHP5.1 |
| 3.x | ThinkPHP6.0 |
composer require topthink/think-queue
使用 Redis [推荐],详见 《Redis安装与使用》
使用 数据库 [不推荐]
config/queue.php
默认的驱动类型sync更改为redis,使用前者的发布任务的只会同步执行,推荐使用redis。使用redis驱动类型,需要事先安装好Redis服务,详见 《Redis安装与使用》

公共配置
[
'default'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名
]
驱动类型
| 驱动类型 | 对应的类型值 |
|---|---|
| sync | 同步执行, 默认值 |
| database | 数据库驱动 |
| redis | Redis驱动 【推荐】 |
| 其他自定义的完整的类名 | ··· |
单模块项目推荐使用
app\job作为任务类的命名空间,多模块项目可用使用app\module\job作为任务类的命名空间,也可以放在任意可以自动加载到的地方
任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个fire方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别
每个方法会传入两个参数think\queue\Job $job(当前的任务对象)和$data(发布任务时自定义的数据)
还有个可选的任务失败执行的方法failed传入的参数为$data(发布任务时自定义的数据)
单任务的类
namespace app\job;
use think\queue\Job;
class Job1
{
public function fire(Job $job, $data)
{
//....这里执行具体的任务
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
}
//如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
$job->delete();
// 也可以重新发布这个任务
$job->release($delay); //$delay为延迟时间
}
public function failed($data)
{
// ...任务达到最大重试次数后,失败了
}
}
多任务的类
namespace app\lib\job;
use think\queue\Job;
class Job2
{
public function task1(Job $job, $data)
{
}
public function task2(Job $job, $data)
{
}
public function failed($data)
{
}
}
think\facade\Queue::push($job, $data = '', $queue = null)和think\facade\Queue::later($delay, $job, $data = '', $queue = null)两个方法,前者是立即执行,后者是在$delay秒后执行
$job是任务名
app\job的,比如上面的例子一,写Job1类名即可app\module\job的,写model/Job1即可app\lib\job\Job2app\lib\job\Job2@task1、app\lib\job\Job2@task2$data是你要传到任务里的参数
$queue队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填
> php think queue:listen
> php think queue:work
两种,具体的可选参数可以输入命令加--help查看
使用Supervisor,保证进程常驻
php think queue:listen --queue testQueue

参考资料: