• Laravel Octane 和 Swoole 协程的使用分析


    之前在工作中使用 Laravel Octane 的 concurrently 处理并发时,发现在队列和定时任务中不会触发并发效果。经过分析,作了如下猜测:队列和定时任务都属于一个独立的进程,与 Octane 服务无关,而 Octane concurrently 恰恰需要在 Octane 环境下才能运行。

    后来通过代码进行环境检测和查看 php 的进程,证明猜想成立。

    info('check env', [
        'served by octane' => isset($_SERVER['LARAVEL_OCTANE']) && ((int)$_SERVER['LARAVEL_OCTANE'] === 1),
        'on swoole server' => (extension_loaded('swoole') || extension_loaded('openswoole')) && app()->bound(Server::class)
    ]);
    
    • 1
    • 2
    • 3
    • 4

    为了能够在任意代码中实现并发,我们研究参考了 Hyperf 框架关于协程的代码,然后抽取了如下两个类:

    
    
    namespace App\Services;
    
    use App\Exceptions\ParallelExecutionException;
    use Laravel\Octane\Facades\Octane;
    use Throwable;
    use Swoole\Coroutine as Co;
    
    class Parallel
    {
        protected array $callbacks = [];
        protected array $results = [];
        /**
         * @var Throwable[]
         */
        protected array $throwables = [];
    
        public function add(callable $callable, $key = null): void
        {
            if (is_null($key)) {
                $this->callbacks[] = $callable;
            } else {
                $this->callbacks[$key] = $callable;
            }
        }
    
        public function wait(bool $throw = true): array
        {
            if (isset($_SERVER['LARAVEL_OCTANE']) && ((int)$_SERVER['LARAVEL_OCTANE'] === 1)) {
                return Octane::concurrently($this->callbacks, 300000);
            }
    
            app('log')->useLoggingLoopDetection(false);
            
            Co\run(function () {
                foreach ($this->callbacks as $key => $callback) {
                    Co::create(function () use ($callback, $key) {
                        try {
                            $this->results[$key] = $callback();
                        } catch (Throwable $throwable) {
                            $this->throwables[$key] = $throwable;
                            unset($this->results[$key]);
                        }
                    });
    
                }
            });
    
            if ($throw && ($throwableCount = count($this->throwables)) > 0) {
                $message = 'Detecting ' . $throwableCount . ' throwable occurred during parallel execution:' . PHP_EOL . $this->formatThrowAbles($this->throwables);
                $executionException = new ParallelExecutionException($message);
                $executionException->setResults($this->results);
                $executionException->setThrowAbles($this->throwables);
                unset($this->results, $this->throwables);
                throw $executionException;
            }
    
            app('log')->useLoggingLoopDetection(true);
            return $this->results;
        }
    
        private function formatThrowAbles(array $throwables): string
        {
            $output = '';
            foreach ($throwables as $key => $value) {
                $output .= sprintf('(%s) %s: %s' . PHP_EOL . '%s' . PHP_EOL, $key, get_class($value), $value->getMessage(), $value->getTraceAsString());
            }
            return $output;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    
    
    namespace App\Exceptions;
    
    use RuntimeException;
    
    class ParallelExecutionException extends RuntimeException
    {
        protected array $results = [];
    
        protected array $throwables = [];
    
        public function getResults(): array
        {
            return $this->results;
        }
    
        public function setResults(array $results): void
        {
            $this->results = $results;
        }
    
        public function getThrowAbles(): array
        {
            return $this->throwables;
        }
    
        public function setThrowAbles(array $throwables): array
        {
            return $this->throwables = $throwables;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    其中,第一个类的作用是检测系统是否运行在 Octane 环境下,是则调用Octane concurrently,否则就执行 Swoole 协程代码,使用起来也比较简单:

    $parallel = new Parallel();
    $parallel->add(fn() => $this->analysisStructure(), 'structure');
    $parallel->add(fn() => $this->analysisHabit(), 'habit');
    [
        'structure' => $structure,
        'habit' => $habit,
    ] = $parallel->wait();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    之所以没有完全使用 Swoole 协程,是因为相比之下,Octane 代码更加优雅,我们在期待着某天更新后,Octane concurrently 也能直接在队列中运行使用。

    第二个类的作用比较简单,就是对协程中异常的一个定义。

    另外在分析过程中,我们也发现了一个比较有意思的事情:
    在这里插入图片描述

    如图所示,当我在路由中运行检测代码时,Octane 和 Swoole Server 都为 true;在控制器中运行检测代码时,又只有 Octane 为true;为什么会有这样的区分?我个人猜测是 Octane 在将框架代码读进内存时,特意跳过了控制器中的代码,以避免数据更新不及时等问题。

    有知道具体原因的小伙伴,欢迎留言探讨。

  • 相关阅读:
    Mybatis如何批量插入数据?
    Selenium入门教程
    stm32f103c8t6学习笔记(学习B站up江科大自化协)-UNIX时间戳
    “中国版”马斯克被本尊翻牌:“如果是真的,我想见见他”
    2023年9月青少年软件编程(C 语言) 等级考试试卷(一级)
    企业架构LNMP学习笔记45
    CTFHUB - SSRF
    c++中的标准模板库
    什么是虚拟主播?虚拟数字人直播,不用出镜,不用露脸的直播方式
    HandlerMapping类是如何找到相应的controller呢?
  • 原文地址:https://blog.csdn.net/pptsv7/article/details/136355296