• php cli 多进程编程


    前言

    php cli 命令模式我想在日常开发中,大家用的都比较少。其实,在某些场景,cli命令真的很有作用,

    我举个例子

    mysql数据库的某个表tab1中数据量有3000W+条数据,现在需要对这张表中的每一条数据做计算处理。将处理的结果放置在新表tab2上,因为数据量很大,我想大家不会采用php-fpm模式去处理这个吧,肯定还是要cli,并将执行时间设置为长时执行(set_time_limit(0))。经测试,单条数据处理耗时3s左右。那么3000W+ * 3 = 9000W 多秒的时间才能计算完全部的数据,这个时间是运营无法接受的。那么我们的想法肯定是单进程处理改为多进程处理。将计算量分摊到多个进程上处理达到优化的目的。 那么我们怎么来设计呢。

    程序设计

    封装 - php - swoole_proccess 多进程模块

    
    
    class ProcessPool{
        private $process;
    
        /**
         * Worker 进程数组
         * @var array
         */
        private $process_list = [];
    
        /**
         * 正在被使用的进程
         * @var array
         */
        private $process_use = [];
    
        /**
         * 最少进程数量
         * @var int
         */
        private $min_worker_num = 3;
    
        /**
         * 最多进程数量
         * @var int
         */
        private $max_worker_num = 6;
    
        /**
         * 当前进程数量
         * @var int
         */
        private $current_num;
    
        /**
         * @var callable 闭包待执行函数
         */
        public $callable;
    
        public function __construct($callable,$max_worker_num = 6)
        {
            $this->callable = $callable;
            $this->max_worker_num = $max_worker_num;
    
            $this->process = new swoole_process(array($this, 'run'), false, 2);
            $this->process->start();
            swoole_process::wait();
        }
    
        public function run()
        {
            $this->current_num = $this->min_worker_num;
            //建立全部的worker进程
            for($i = 0; $i < $this->current_num; $i++){
                $process = new swoole_process(array($this, 'task_run'), false, 2);
                $pid = $process->start();
                $this->process_list[$pid] = $process;
                $this->process_use[$pid] = 0;
            }
    
            foreach($this->process_list as $process){
                swoole_event_add($process->pipe, function ($pipe) use ($process){
                    $data = $process->read();
    //                var_dump($data . '空闲');
                    //接收子进程处理完成的信息,而且重置为空闲
                    $this->process_use[$data] = 0;
                });
            }
    
            //每秒定时向worker管道投递任务
            swoole_timer_tick(1000 ,function ($timer_id){
                static $index = 0;
                $index = $index + 1;
                $flag = true; //是否新建worker
                foreach ($this->process_use as $pid => $used){
                    if($used == 0){
                        $flag = false;
                        //标记为正在使用
                        $this->process_use[$pid] = 1;
                        // 在父进程内调用write,子进程能够调用read接收此数据
                        $this->process_list[$pid]->write($index);
                        break;
                    }
                }
    
                if($flag && $this->current_num < $this->max_worker_num){
                    //没有闲置worker,新建worker来处理
                    $process = new swoole_process(array($this, 'task_run'), false, 2);
                    $pid = $process->start();
                    $this->process_list[$pid] = $process;
                    $this->process_use[$pid] = 1;
                    $this->process_list[$pid]->write($index);
                    $this->current_num++;
                }
    //            var_dump('第' .$index. '个任务');
                if($index >= ($this->max_worker_num + 1)){
                    foreach($this->process_list as $process){
                        $process->write("exit");
                    }
                    swoole_timer_clear($timer_id);
                    $this->process->exit();
                }
    
            });
        }
    
        /**
         * 子进程处理
         * @param $worker
         */
        public function task_run($worker)
        {
            swoole_event_add($worker->pipe, function($pipe) use($worker){
                $data = $worker->read();
    //            var_dump($worker->pid . ':' . $data);
                if($data == 'exit'){
                    $worker->exit();
                    exit;
                }
                //模拟耗时任务
    
    //            sleep(5);
    
                call_user_func($this->callable,$worker,$data);
    
                //告诉主进程处理完成
                //在子进程内调用write,父进程能够调用read接收此数据
                $worker->write($worker->pid);
            });
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133

    对数据量进行分摊,交给不同的进程处理。比如进程1处理0-200W的数据,进程2处理200W-300W的数据,依次类推。并做好日志记录,记录哪些处理失败了,原因是什么,方便后期针对性处理。

    
    	$count = 30000000;//3000W   数据总量
    	$secNum = 2000000;  //200W  每个进程要处理的数据量
        $maxWorkNum = intval(ceil($count / $secNum));
        $processPool = new ProcessPool(function(\Swoole\Process $worker,$index) use($maxWorkNum,$secNum){
            //index 为进程编号从1开始计数;work为进程对象
            $start = (($index-1) * $secNum) + 1;
            $end   = $index > $maxWorkNum ? 0 : $index * $secNum;
    
    		//开始任务
            $companyJob = new CompanyRelationshipJob($start,$end,$index);
            $companyJob->run();
        },$maxWorkNum);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    总结

    当遇到数据量大的业务场景,我们需要考虑怎么将数据量均摊给多个进程处理,利用多核cpu的优势加快数据的处理满足业务需求。

  • 相关阅读:
    【分布式事务】
    机器学习之数据清洗和预处理
    世界各国GDP相关面板数据(1960-2019年)
    AI生成PPT:如何轻松制作专业的答辩PPT?
    数据类型【MySQL】
    Timeline 时间线自定义节点为Checkbox
    Java 8中的map和flatMap方法的区别
    LabVIEW通信-CAN
    春秋云镜 CVE-2017-5638
    黑客嫌100万美元太少,上市企业敏感数据遭泄露
  • 原文地址:https://blog.csdn.net/u014559227/article/details/136391494