// 4.将该任务推送到消息队列,等待对应的消费者去执行
i
s
P
u
s
h
e
d
<
/
s
p
a
n
>
=
Q
u
e
u
e
:
:
p
u
s
h
(
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
isPushed = Queue::push( isPushed</span>=Queue::push(<spanclass="hljs−variable">jobHandlerClassName ,
j
o
b
D
a
t
a
<
/
s
p
a
n
>
,
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
jobData , jobData</span>,<spanclass="hljs−variable">jobQueueName );
// database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false if( $isPushed !== false ){ echo date(‘Y-m-d H:i:s’) . " a new Hello Job is Pushed to the MQ".“ ”; }else{ echo‘Oops, something went wrong.’; } } }
listen命令所在的进程会循环地创建 单次执行模式的 work 进程,每次创建的 work 进程只消费一个消息就会结束, 然后 listen 进程再创建一个新的 work 进程,
listen 进程会定时检查当前的 work 进程执行时间是否超过了 —timeout 参数的值, 如果已超时, 则 listen 进程会 kill 掉 work 进程, 然后抛出异常
listen 进程会通过管道来监听当前的 work 进程的输出, 当 work 进程有输出时, listen 进程会将输出写入到 stdout / stderr
listen 进程会定时通过 proc_get_status() 来监控当前的 work 进程是否仍在运行, work 进程消费完一个任务之后, work 进程就结束了,其状态会变成 terminated, 此时 listen 进程就会重新创建一个新的 work 进程并对其计时, 新的 work 进程开始消费下一个任务
2.3.2 结束时机不同
work 命令的结束时机在上面的执行原理部分已叙述,此处不再重复
listen 命令中,listen 进程和 work 进程会在以下情况下结束:
listen 进程会定时检查当前的 work 进程的执行时间是否超过了 —timeout 参数的值,如果已超时, 此时 listen 进程会先 kill 掉当前的 work 进程, 然后抛出一个 ProcessTimeoutException 异常并结束 listen 进程
listen 进程会定时检查自身使用的内存是否超过了 --memory 参数的值,如果已超过, 此时 listen 进程会直接 die 掉, work 进程也会自动结束.
t
a
s
k
T
y
p
e
<
/
s
p
a
n
>
=
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
taskType = taskType</span>=<spanclass="hljs−variable">_GET[‘taskType’]; switch (KaTeX parse error: Expected '}', got 'EOF' at end of input: …hljs-variable">jobHandlerClassName = ‘application\index\job\MultiTask@taskA’; KaTeX parse error: Expected 'EOF', got '&' at position 63: …'a' =&̲gt; jobQueueName = “multiTaskJobQueue”; break; case‘taskB’: KaTeX parse error: Undefined control sequence: \index at position 69: …g">'application\̲i̲n̲d̲e̲x̲\job\MultiTask@…jobDataArr = [‘b’ => ‘2’]; $jobQueueName = “multiTaskJobQueue”; break; default: break; }
i
s
P
u
s
h
e
d
<
/
s
p
a
n
>
=
Q
u
e
u
e
:
:
p
u
s
h
(
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
isPushed = Queue::push(isPushed</span>=Queue::push(<spanclass="hljs−variable">jobHandlerClassName,
j
o
b
D
a
t
a
A
r
r
<
/
s
p
a
n
>
,
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
jobDataArr, jobDataArr</span>,<spanclass="hljs−variable">jobQueueName); if (KaTeX parse error: Expected '}', got 'EOF' at end of input: …s="hljs-subst">taskType of MultiTask Job has been Pushed to “.KaTeX parse error: Expected 'EOF', got '&' at position 49: …"hljs-string">"&̲lt;br>"taskType of MultiTask Job Failed!”); } }
// 延迟 2 秒执行
i
s
P
u
s
h
e
d
<
/
s
p
a
n
>
=
Q
u
e
u
e
:
:
l
a
t
e
r
(
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
n
u
m
b
e
r
"
>
2
<
/
s
p
a
n
>
,
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
isPushed = Queue::later( 2, isPushed</span>=Queue::later(<spanclass="hljs−number">2</span>,<spanclass="hljs−variable">jobHandlerClassName,
j
o
b
D
a
t
a
A
r
r
<
/
s
p
a
n
>
,
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
jobDataArr, jobDataArr</span>,<spanclass="hljs−variable">jobQueueName);
// 延迟到 2017-02-18 01:01:01 时刻执行
t
i
m
e
2
w
a
i
t
<
/
s
p
a
n
>
=
s
t
r
t
o
t
i
m
e
(
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
s
t
r
i
n
g
"
>
′
2017
−
02
−
1801
:
01
:
0
1
′
<
/
s
p
a
n
>
)
−
s
t
r
t
o
t
i
m
e
(
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
s
t
r
i
n
g
"
>
′
n
o
w
′
<
/
s
p
a
n
>
)
;
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now'); time2wait</span>=strtotime(<spanclass="hljs−string">′2017−02−1801:01:01′</span>)−strtotime(<spanclass="hljs−string">′now′</span>);<spanclass="hljs−variable">isPushed = Queue::later(
t
i
m
e
2
w
a
i
t
<
/
s
p
a
n
>
,
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
time2wait,time2wait</span>,<spanclass="hljs−variable">jobHandlerClassName,
j
o
b
D
a
t
a
A
r
r
<
/
s
p
a
n
>
,
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
jobDataArr, jobDataArr</span>,<spanclass="hljs−variable">jobQueueName);
在消费者类中:
复制// 重发,即时执行$job->release();
1
2
3
// 重发,延迟 2 秒执行 $job->release(2);
// 延迟到 2017-02-18 01:01:01 时刻执行
t
i
m
e
2
w
a
i
t
<
/
s
p
a
n
>
=
s
t
r
t
o
t
i
m
e
(
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
s
t
r
i
n
g
"
>
′
2017
−
02
−
1801
:
01
:
0
1
′
<
/
s
p
a
n
>
)
−
s
t
r
t
o
t
i
m
e
(
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
s
t
r
i
n
g
"
>
′
n
o
w
′
<
/
s
p
a
n
>
)
;
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now'); time2wait</span>=strtotime(<spanclass="hljs−string">′2017−02−1801:01:01′</span>)−strtotime(<spanclass="hljs−string">′now′</span>);<spanclass="hljs−variable">job->release($time2wait);
@param
j
o
b
D
a
t
a
s
t
r
i
n
g
∣
a
r
r
a
y
∣
.
.
.
/
/
发
布
任
务
时
传
递
的
j
o
b
D
a
t
a
数
据
∗
/
<
/
s
p
a
n
>
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
k
e
y
w
o
r
d
"
>
p
u
b
l
i
c
<
/
s
p
a
n
>
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
f
u
n
c
t
i
o
n
"
>
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
k
e
y
w
o
r
d
"
>
f
u
n
c
t
i
o
n
<
/
s
p
a
n
>
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
t
i
t
l
e
"
>
f
a
i
l
e
d
<
/
s
p
a
n
>
(
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
p
a
r
a
m
s
"
>
<
s
p
a
n
c
l
a
s
s
=
"
h
l
j
s
−
v
a
r
i
a
b
l
e
"
>
jobData string|array|... //发布任务时传递的 jobData 数据 */publicfunctionfailed(jobDatastring∣array∣...//发布任务时传递的jobData数据∗/</span><spanclass="hljs−keyword">public</span><spanclass="hljs−function"><spanclass="hljs−keyword">function</span><spanclass="hljs−title">failed</span>(<spanclass="hljs−params"><spanclass="hljs−variable">jobData){ send_mail_to_somebody() ;
print(“Warning: Job failed after max retries. job data is :”.var_export($data,true).“\n”; }