• PHP + Laravel + RabbitMQ + Redis 实现消息队列 (五) 异常处理(死信)、延时队列与优先队列


    异常处理

    当我们遇到消息没有被正常消费的时候,需要对这条异常的消息进行处理,这里就会遇到一个问题,如果是直接丢弃消息,那么这条消息就会丢失。如果是直接重试,那么如果还是存在异常,这里就会陷入死循环。

    RabbitMQ中的异常处理

    死信队列

    死信队列,其实就是在满足一定规则的前提下,将消息发送到指定的一个交换机队列中。这些规则包括:

    • 消息被拒绝:消费者拒绝处理消息(使用 basic_nack 或 basic_reject),并且设置了 requeue=false。

      • basic_nack
        • 用法: 用于拒绝处理单个消息。
        • 参数:
          delivery_tag:消息的标识符。
          requeue:布尔值,决定是否将消息重新排入队列。
          行为:
          如果 requeue 设置为 true,消息会被重新放回队列。
          如果 requeue 设置为 false,消息将会被丢弃,或者发送到死信队列(如果已设置)。
      • basic_reject
        • 用法: 用于拒绝处理一组消息(批量)。
        • 参数:
          delivery_tag:消息的标识符。
          multiple:布尔值,是否批量拒绝多个消息。
          requeue:布尔值,决定是否将消息重新排入队列。
        • 行为:
          如果 requeue 设置为 true,所有拒绝的消息会被重新放回队列。
          如果 requeue 设置为 false,消息将会被丢弃,或者发送到死信队列(如果已设置)。
    • 消息过期:消息在队列中存活时间超过了 TTL(Time-To-Live)。

    • 队列满:消息无法进入队列,因为队列达到其最大长度限制。

    • 消息不可路由:消息无法路由到任何匹配的队列。

    创建一个死信队列
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel(); 
    
    // 死信队列及交换机
    $channel->exchange_declare('dead_letter', 'direct', false,true,false); // 定义交换机
    $channel->queue_declare('dead_letter_queue', false, true); // 定义队列
    $channel->queue_bind('dead_letter_queue', 'dead_letter'); // 队列绑定交换机
    
    
    echo "等待死信队列消息,或者使用 Ctrl+C 退出程序。";
    
    // 定义接收数据的回调函数
    $callback = function ($msg) {
        echo '死信队列接收到数据: ', $msg->body, PHP_EOL;
    };
    
    // 消费队列,获取到数据将调用 callback 回调函数
    $channel->basic_consume('dead_letter_queue', '', false, true, false, false, $callback);
    
    while ($channel->is_open()) {
        $channel->wait();
    }
    
    死信队列消费
    $channel->queue_declare('hello', false, true, false, false, false, new AMQPTable([
        'x-message-ttl'=>10000, // 10秒过期
        'x-dead-letter-exchange'=>'dead_letter', // 死信到某个交换机
        'x-dead-letter-routing-key'=>'', // 死信路由
    ]));
    
    $callback = function ($msg) {
        echo '接收到数据: ', $msg->body, PHP_EOL;
        $msg->nack();
    };
    

    Redis 中的异常处理

    因为在Redis中并没有这样的死信处理机制,所以这里是用Laravel 的queue + redis的逻辑来进行的处理。因为异常的队列 larveal会保存到一个异常处理表中所以首先要执行一个表迁移;

    php artisan queue:failed-table
    php artisan migrate
    

    然后进行创建一个异常的队列 这里不需要使用 –tries 这个参数,避免重试;

    // 查看所有错误的队列
    php artisan queue:failed
    // 通过uid可以把这条失败的数据又塞回之前的队列里了,消费者又会开始对它进行消费
    php artisan queue:retry uid
    

    Laravel 中任务失败的回调

    只需要在任务类中实现 failed() 方法。

    // /app/Jobs/Queue.php
    public function failed($exception = null)
    {
      echo '如果发生错误就进入到这里了,错误信息是:'.$exception->getMessage(), PHP_EOL;
    }
    

    延时队列

    RabbitMQ中的延时队列

    在RabbitMQ中其实是没有单独的延时队列的,但是这里我们可以提到上文中的死信队列的 x-message-ttl 参数亦或者是在消息中加入进行设置

    	$msg = new AMQPMessage('Hello World!' . time(),[
            'expiration'=> 3000,  // 消息 3 秒过期
        ]);
    

    Laravel框架中使用 Redis 实现

    在 Laravel 中,只需要在任务分发,也就是入队的时候,使用一个 delay() 方法就可以了。
    这个 delay() 方法接收一个 now() 助手函数返回的 Carbon 类型的时间对象。

    public function handle()
    {
      Queue::dispatch('任务发送时间:' . date('Y-m-d H:i:s'))
        ->delay(now()->addSeconds(random_int(0,10)));
      return 0;
    }
    

    具体的实现逻辑

    Redis中也没有延时队列的实现,那这里他是怎么实现的呢?
    首先它使用的数据类型是 Sorted Set (ZSet 有序集合)
    有序集合除了数据本身外,还有一个 score 分数字段可以用于排序,这里直接将时间戳当做 score 就可以实现按指定时间排序的功能了。
    我们使用 ZREMRANGEBYRANK 或者 ZPOPMIN 命令都可以拿到最新的数据,但是,Laravel 里面的更复杂一些。它是先把延时队列的迁移到 laravel_database_queues:default 队列,然后再进行普通队列的 POP 处理。在 /vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php 中,pop() 方法第一行就是调用一下 migrate() 方法。这个方法内部会继续调用 migrateExpiredJobs() 方法,传递的参数为 $queue.‘:delayed’ 和 $queue 参数名称为 $from 和 $to 。最后调用 /vendor/laravel/framework/src/Illuminate/Queue/LuaScripts.php 中的 migrateExpiredJobs() 方法,这个方法里面是一个 Lua 脚本,脚本中就是使用 zremrangebyrank 命令根据 score 顺序获取数据,接着再 rpush 到 default 队列中。

    public static function migrateExpiredJobs()
    {
      return <<<'LUA'
    -- Get all of the jobs with an expired "score"...
    local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
    
    -- If we have values in the array, we will remove them from the first queue
    -- and add them onto the destination queue in chunks of 100, which moves
    -- all of the appropriate jobs onto the destination queue very safely.
    if(next(val) ~= nil) then
        redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
    
        for i = 1, #val, 100 do
            redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
            -- Push a notification for every job that was migrated...
            for j = i, math.min(i+99, #val) do
                redis.call('rpush', KEYS[3], 1)
            end
        end
    end
    
    return val
    LUA;
    }
    

    优先队列

    优先级队列的实现,一般是通过大顶堆或者小顶堆的方式来实现。在PHP 的 SPL 扩展中也有通过大顶堆实现的优先级队列对象 SplPriorityQueue ;

    $queue = new SplPriorityQueue();
    
    // 插入元素
    $queue->insert('Low priority task', 1);
    $queue->insert('High priority task', 10);
    $queue->insert('Medium priority task', 5);
    
    // 提取元素
    while (!$queue->isEmpty()) {
        echo $queue->extract() . "\n";
    }
    
    

    RabbitMQ中的实现

    我们需要先设置一个队列的优先级容量 x-max-priority ,也就是在这个队列中,最大的优先级就到 10 。这个值可以设置到更大,但是官方推荐就到 10 就可以了。

    // 建立连接
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel(); // 获取频道
    
    $channel->queue_declare('queue', false, true, false, false, false, new AMQPTable([
        'x-max-priority'=>10, // 设置最大优先级
    ]));
    
    // 创建消息 随机生成10个随机优先级为 0-2的消息
    for ($i = 10; $i > 0; $i--) {
        $priority = random_int(0, 2);
        $body = '优先消息测试,当前优先级为:' . $priority;
        $msg = new AMQPMessage($body,
            ['priority' => $priority]
        );
        $channel->basic_publish($msg, '', 'queue'); // 将消息放入队列中
    
        echo "生产者向消息队列中发送信息:" . $body, PHP_EOL;
    }
    

    Laravel 中的优先队列

    在 laravel中 其实并不是一个完全的优先级队列实现,因为它针对的其实是不同的队列,而不是同一个队列中给不同的消息赋予不同的优先级。

    public function handle()
    {
      //
      for ($i = 10; $i > 0; $i--) {
        $queue = 'default';
        if ($i%3 == 1) {
          $queue = 'A';
        } else if ($i%3 == 2) {
          $queue = 'B';
        }
        sleep(random_int(0, 2));
        Queue6::dispatch('测试优先级,当前优先队列为:' . $queue . ',入队时间:' . date("Y-m-d H:i:s"))->onQueue($queue);
      }
    }
    

    在消费队列的时候需要指定队列的名称 来进行顺序控制

    php artisan queue:work --queue=B,A,default
    
  • 相关阅读:
    java计算机毕业设计社区人员管理系统源码+mysql数据库+系统+lw文档+部署
    linux rsyslog综合实战1
    Chromium 开发指南2024 Mac篇-开始编译Chromium(五)
    线程池shutdown引发TimeoutException
    hive中判断一个字符串是否包含另一个子串的四种方法,sql中也可用
    【空间统计入门】笔记—空间关系和空间权重矩阵
    哪些自动化工具赋能电商运营效率翻倍?
    位于同一子网下的ip在子网掩码配置错误的情况下如何进行通信(wireshrak抓包分析)
    Nginx学习笔记12——Nginx高可用和keepalived
    Java开发之框架(spring、springmvc、springboot、mybatis)【面试篇 完结版】
  • 原文地址:https://blog.csdn.net/weixin_43193813/article/details/141093782