• 使用workerman/redis-queue做队列(订阅)


    前言

    手上有一个短信群发通知功能,考虑到日后单个时间通知的手机号会有点多,故打算用workerman里的redis队列功能来实现(thinkphp6框架下)。

    部署

    1. 本地安装redis服务(如在本地调试的话),并在.env写入配置文件

    [REDIS]
    #AUTH为密码,本地为空
    AUTH=
    HOST=127.0.0.1
    PORT=6379
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. compser安装workerman/redis-queue

     composer require workerman/redis-queue:1.0.10
    
    • 1

    3. 使用make命令创建command(命令类)

     php think make:command Redis redis
    
    • 1

    4. 将workman启动参数复制到command文件夹下的Redis里

    <?php
    declare (strict_types = 1);
    
    namespace app\command;
    
    use common\Service\ImportFileService;
    use think\console\Command;
    use think\console\Input;
    use think\console\input\Argument;
    use think\console\input\Option;
    use think\console\Output;
    use Workerman\RedisQueue\Client;
    use Workerman\Worker;
    
    class Redis extends Command
    {
        //修改命令文件
        protected function configure()
        {
            // 指令配置
            $this->setName('redis')
                ->addArgument('status', Argument::REQUIRED, 'start/stop/reload/status')
                ->addOption('d', null, Option::VALUE_NONE, 'daemon(守护进程)方式启动')
                ->setDescription('start/stop/restart loadItem');
        }
    
        protected function init(Input $input, Output $output)
        {
            global $argv;
    
            $argv[1] = $input->getArgument('status') ?: 'start';
            if ($input->hasOption('d')) {
                $argv[2] = '-d';
            } else {
                unset($argv[2]);
            }
        }
    
        protected function execute(Input $input, Output $output)
        {
            $this->init($input, $output);
            $worker = new Worker();
            $worker->count = 1;
            $worker::$pidFile = rtrim(root_path(), '/') . '/runtime/load.pid';
            $worker::$logFile = rtrim(root_path(), '/') . '/runtime/load.log';
            $worker->name = 'loadItem';
            $worker->onWorkerStart = [$this, 'start'];
            $worker->runAll();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    5. 在config/console.php声明

        'commands' => [
            "redis" => app\command\Redis::class,
        ],
    
    • 1
    • 2
    • 3

    使用

    1. 将待处理的数据写入redis队列(生产),在需要的地方引入

    function redisQueueSend($redis, $queue, $data, $delay = 0)
    {
        $queue_waiting = '{redis-queue}-waiting';
        $queue_delay = '{redis-queue}-delayed';
        $now = time();
        $package_str = json_encode([
            'id' => rand(),
            'time' => $now,
            'delay' => 0,
            'attempts' => 0,
            'queue' => $queue,
            'data' => $data
        ]);
        if ($delay) {
            return $redis->zAdd($queue_delay, $now + $delay, $package_str);
        }
        return $redis->lPush($queue_waiting . $queue, $package_str);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    public function insertData(){
    	//连接redis
    	$redis = new \Redis();
    	$redis->connect(config('app.redis_host'), intval(config('app.redis_port')));
    	$redis->select(15);
    	//写入队列
    	$mobileList=['15267081542','15267081543'];
    	foreach ($mobileList as &$v) {
    		$v["mobile"] = $r->id;
    		//数据推入redis中
    		redisQueueSend($redis, "redis", $v);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2. 将数据从redis取出后处理(消费),写在command下的redis文件夹里

        public function start($work)
        {
            $options = [
                'db' => 15,
    //            "auth" => config('app.redis_auth')
            ];
            $client = new Client('redis://' . config('app.redis_host') . ":" . config('app.redis_port'), $options);
            $client->subscribe('load', function ($data) {
                ImportFileService::do($data);//data格式为['mobile'=>1]
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3. 打开redis客户端

    这里的redis就是上面configure里的name(名),start就是Argument(动作),启动worker用的

     php think redis start
    
    • 1

    参考资料:
    https://www.kancloud.cn/manual/thinkphp6_0/1037651
    https://www.workerman.net/doc/workerman/components/workerman-redis-queue.html

  • 相关阅读:
    运筹说 第66期|贝尔曼也有“演讲恐惧症”?
    SpringCloud 注册中心(Nacos)快速入门
    【数据结构(邓俊辉)学习笔记】向量01——接口与实现
    从mysql 5.7 升级到 8.0 的一些注意事项
    【线性代数】MIT Linear Algebra Lecture 6: Column space and nullspace
    WPF 入门笔记 - 03 - 样式基础及模板
    c#ushort转十六进制
    用于数据增强的十个Python库
    java基础
    新能源车“乱战时代”,车企们在争什么?
  • 原文地址:https://blog.csdn.net/xiantianga6883/article/details/127970449