• 关于 Laravel Redis 多个进程同时取队列问题详解


    最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 Redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下。

    使用 Supervisor 监听 Laravel 队列任务,其中 Supervisor 的配置如下:

    1

    2

    3

    4

    5

    6

    7

    8

    [program:laravel-worker]

    process_name=%(program_name)s_%(process_num)02d

    command=php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon

    autostart=true

    autorestart=true

    numprocs=8

    redirect_stderr=true

    stdout_logfile=/var/www/xxx.cn/worker.log

    注意: numprocs = 8,代表开启 8 个进程来执行 command 中的命令。

    如下:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    PS C:\Users\tanteng\website\laradock> docker-compose exec php-worker sh

    /etc/supervisor/conf.d # ps -ef | grep php

     7 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon

     8 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon

     9 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon

     10 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon

     11 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon

     12 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon

     13 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon

     14 root  0:00 php /var/www/xxx.cn/artisan queue:work --queue=sendfile --tries=3 --daemon

     44 root  0:00 grep php

    Laravel 多进程读取队列内容是否会重复

    Laravel 的某个控制器方法,一次放入多个任务队列:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    public function index(Request $request)

    {

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

     $this->dispatch((new SendFile3())->onQueue('sendfile'));

    }

    在队列处理的方法打印日志,打印处理的队列的 ID:

    app/Jobs/SendFile3.php

    1

    2

    3

    4

    5

    6

    7

    8

    public function handle()

    {

     info('invoke SendFile3');

     dump('invoke handle');

     $rawbody = $this->job->getRawBody();

     $info = json_decode($rawbody, true);

     info('queue id:' . $info['id']);

    }

    Laravel 使用 Redis 的 list 作为队列的数据结构,并会为每个队列分配一个 ID,数据结构如下:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    {

     "job": "Illuminate\\Queue\\CallQueuedHandler@call",

     "data": {

     "commandName": "App\\Jobs\\SendFile3",

     "command": "O:18:\"App\\Jobs\\SendFile3\":4:{s:6:\"\u0000*\u0000job\";N;s:10:\"connection\";N;s:5:\"queue\";s:8:\"sendfile\";s:5:\"delay\";N;}"

     },

     "id": "hadBcy3IpNsnOofQQdHohsa451OkQs88",

     "attempts": 1

    }

    请求这个控制器路由(或者命令行方式),就可以看到 Redis 中多了很多队列任务了,如图:

    这个时候开启 Supervisor 处理队列任务,并查看日志:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:JaClJzhDEvntzLCRIz6uRQkCVLbE8Y9C

    [2017-12-23 19:01:01] local.INFO: queue id:ukHv0Li4P2VgPa55qU6yEOJM27Mo5YwJ

    [2017-12-23 19:01:01] local.INFO: queue id:ObMpwDTmnaveBUkU7aan5abt3Agyt90l

    [2017-12-23 19:01:01] local.INFO: queue id:fo2qZn2ftSdQtdnKOciMK7iJb4qlhRGE

    [2017-12-23 19:01:01] local.INFO: queue id:uLjFMoOU7Wk7bOAd4zpHb3ccRMJHBtR6

    [2017-12-23 19:01:01] local.INFO: queue id:87ULqPBObFmGr16nl5wxFVOi71zGCeRM

    [2017-12-23 19:01:01] local.INFO: queue id:9UVl0muQLzBqlRI99rChGW2ElXwVEMIE

    [2017-12-23 19:01:01] local.INFO: queue id:a0vgyZuz9HtmH7DGHEpXqesFTcQU3QAF

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:2cXuXxopPkgYiV4WO8gv9CJ6CwXeKtYL

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:9acTAYa8cxpJX6Q3Gb1sULokotP8reqZ

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:BPHQvBboChlv4gr2I0vyLVyw9bijtTYJ

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:Fm6tNajdxYKtdQbDMYDmwWJFLnNikRyg

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:nyAbcvSkBVPbaH3e2ItQkoLJlP1ficib

    [2017-12-23 19:01:01] local.INFO: queue id:WBHsSVZtP43569UoPXxfLLJcvYmPW7cP

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:bliPnKcRSDApwVmKLNxEhaKelhm0RDEY

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:eOAoQucEIwRz9uZ64xm6IDKgiqj9Xc3W

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:lzise9EiqQqINrhALbmAI4qNg7qylpb2

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:WXYKvcfOhS1pPnwOwUTsenoMv5l5EUXe

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:XtH5JiwLgnrwWzI02Oyi70pihAOkuJUD

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:9ehmE5HImlpNubpY0xWN8UVrOzxeMqws

    [2017-12-23 19:01:01] local.INFO: queue id:C1sK87cpZl47edLA0zhfo7PJ9MIEcoyx

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:2kwl51oH4lyyRrljCReGUCkNiJRDl7oe

    [2017-12-23 19:01:01] local.INFO: queue id:ObRpoqrYTPYiyv2delMlOXu3sAPpWJlN

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:6qgu6W3TapLjSrt688yv9HRXvDDLxntz

    [2017-12-23 19:01:01] local.INFO: queue id:wiTlERhwn7s9cQkfUF9lLlNADpXjKncI

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:ZSLW0VLFBDpL4wjTJzu3Yb3V45pNe807

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:qhZlXLGfGWRluIeNm7VbllmTJZYb2h5n

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:LUx1IByD3L2psNl9BZwHhk2knXyRPzW6

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:M2RESPjyo5hpAFxxL0EQbWwsUq4jpmWn

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:hUsGaiIAOO6ZfGQc5kGHGpsv5RpoRPYO

    [2017-12-23 19:01:01] local.INFO: queue id:cEHJsOy6bLeZ4NbncPziaHqlarMeyyEF

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:w4bkFiJKMU5saqG2xKN3ZRL5BYXGATMk

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:0zBuwbxlrEhhxKfYBkVyTY4z35f154sI

    [2017-12-23 19:01:01] local.INFO: queue id:mvoZvyDPvq4tcPjEy9G7PMtH3MwPkPik

    [2017-12-23 19:01:01] local.INFO: invoke SendFile3

    [2017-12-23 19:01:01] local.INFO: queue id:TLvF74eeidECWKtjZqWvW03UJTRPTL9r

    [2017-12-23 19:01:01] local.INFO: queue id:me8wyPfgcz0nf9xvcXz0hf2xVxqa1FFS

    这 8 个进程并发处理队列,但从打印的日志看,没有出现同样的 ID. 我们再看一下 Laravel 如何使用 Redis 处理队列的。

    分析一下 Laravel 队列的处理

    Laravel 中入队列方法

    1

    2

    3

    4

    5

    6

    public function pushRaw($payload, $queue = null, array $options = [])

    {

     $this->getConnection()->rpush($this->getQueue($queue), $payload);

      

     return Arr::get(json_decode($payload, true), 'id');

    }

    用的是 Redis 的 rpush 命令。

    Laravel 中取队列方法

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    public function pop($queue = null)

    {

     $original = $queue ?: $this->default;

     $queue = $this->getQueue($queue);

     $this->migrateExpiredJobs($queue.':delayed', $queue);

     if (! is_null($this->expire)) {

      $this->migrateExpiredJobs($queue.':reserved', $queue);

     }

     list($job, $reserved) = $this->getConnection()->eval(

      LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire

     );

     if ($reserved) {

      return new RedisJob($this->container, $this, $job, $reserved, $original);

     }

    }

    这里用的是 lua 脚本取队列,如下:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    public static function pop()

    {

     return <<<'LUA'

    local job = redis.call('lpop', KEYS[1])

    local reserved = false

    if(job ~= false) then

    reserved = cjson.decode(job)

    reserved['attempts'] = reserved['attempts'] + 1

    reserved = cjson.encode(reserved)

    redis.call('zadd', KEYS[2], ARGV[1], reserved)

    end

    return {job, reserved}

    LUA;

    }

    那么结论是:从 Laravel 的处理方式和打印的日志结果看,即使多个进程读取同一个队列,也不会读取到一样的数据。

     

  • 相关阅读:
    css网格布局
    Java成员方法的声明和调用
    麒麟-v10系统添加字体方法
    1、Spring简介
    Java基础35 面向对象三大特征之继承
    Android 序列化框架 Gson 原理分析,可以优化吗?
    MySQL 8 - 启用远程连接
    为啥小扎一直醉心于元宇宙的布局?
    调优C / C ++编译器以在多核应用程序中获得最佳并行性能:第一部分
    【电力运维】浅谈电力通信与泛在电力物联网技术的应用与发展
  • 原文地址:https://blog.csdn.net/sinat_40572875/article/details/128030134