目录
workerman中的定时器Timer是个非常好用的功能,今天来研究下怎么实现的。我发现其实在wokerman里面应该有2中类型的Timer,而官方文档中只提到了一种。
初始化
我们先看下Timer::init()在项目中用到的地方。主要区别是调用的地方和初始化的数据。
位置 |
主进程 |
子进程 |
参数 |
空 |
static::$globalEvent |
public static function init($event = null)
{
if ($event) {
self::$_event = $event;
return;
}
if (\function_exists('pcntl_signal')) {
\pcntl_signal(\SIGALRM, array('\Workerman\Lib\Timer', 'signalHandle'), false);
}
}
主进程的Timer
从源码可以看出,这个Timer 基于进程alarm闹钟信号的定时器。为SIGALRM信号安装了一个信号处理器\Workerman\Lib\Timer::signalHandle。
\Workerman\Lib\Timer::signalHandle
作为信号处理器,每次收到信号SIGALRM,都会执行一次。
public static function signalHandle()
{
if (!self::$_event) {
\pcntl_alarm(1);
self::tick();
}
}
\Workerman\Lib\Timer::tick();
public static function tick()
{
if (empty(self::$_tasks)) {
\pcntl_alarm(0);
return;
}
$time_now = \time();
foreach (self::$_tasks as $run_time => $task_data) {
if ($time_now >= $run_time) {
foreach ($task_data as $index => $one_task) {
$task_func = $one_task[0];
$task_args = $one_task[1];
$persistent = $one_task[2];
$time_interval = $one_task[3];
try {
\call_user_func_array($task_func, $task_args);
} catch (\Exception $e) {
Worker::safeEcho($e);
}
if($persistent && !empty(self::$_status[$index])) {
$new_run_time = \time() + $time_interval;
if(!isset(self::$_tasks[$new_run_time])) self::$_tasks[$new_run_time] = array();
self::$_tasks[$new_run_time][$index] = array($task_func, (array)$task_args, $persistent, $time_interval);
}
}
unset(self::$_tasks[$run_time]);
}
}
}
monitorWorkersForLinux
最后循环监听进程信号
protected static function monitorWorkersForLinux()
{
static::$_status = static::STATUS_RUNNING;
while (1) {
\pcntl_signal_dispatch();
$status = 0;
$pid = \pcntl_wait($status, \WUNTRACED);
\pcntl_signal_dispatch();
}
}
示例
<?php
use \Workerman\Worker;
require './vendor/autoload.php';
$worker = new Worker("tcp://0.0.0.0:2000");
$worker->count = 1;
\Workerman\Timer::add(1,function(){
echo time().PHP_EOL;
});
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
运行结果
1636793532
1636793533
1636793534
1636793535
总结
主进程定时器的运行流程如下:
- 初始化定时器,设定信号处理器
- Timer::add 时,设定闹钟并触发检查
- 没有任务时移除闹钟
- 有任务时每秒检查符合要求的任务并执行,完成后视情况添加重复任务
- Timer清除时,清除闹钟
注意的:
- 由于基于信号,时间精度只到秒
- 下次任务时间是基于上次任务的结束时间
- 耗时的定时任务会阻塞进程阻塞
- 该类型只在linux上的主进程上可以使用
尴尬
这个类型的Timer只适用于非守护进程模式,也就是只在debug模式下生效。原因是,主进程会被退出了,详情可见【读源码】workerman中如何实现守护进程
子进程的Timer
子进程的Timer机制时基于even类型,如:libeven,event,swoole或者react,默认的是:\Workerman\Events\Select。此处以它为例。### win没有多进程
在win中workerman是以单进程形式运行,在第二次初始化Timer时,其实时覆盖了主进程的Timer。也就是说,在win中,Timer只有一个类型。
我们以linux为例。
子进程中的Timer都是在被fork后进行初始化,并且会出入参数 static::$globalEvent
static::$globalEvent
if (!static::$globalEvent) {
$event_loop_class = static::getEventLoopName();
static::$globalEvent = new $event_loop_class;
$this->resumeAccept();
}
Timer::init(static::$globalEvent);
getEventLoopName()
protected static $_availableEventLoops = array(
'event' => '\Workerman\Events\Event',
'libevent' => '\Workerman\Events\Libevent'
);
protected static function getEventLoopName()
{
if (static::$eventLoopClass) {
return static::$eventLoopClass;
}
if (!\class_exists('\Swoole\Event', false)) {
unset(static::$_availableEventLoops['swoole']);
}
$loop_name = '';
foreach (static::$_availableEventLoops as $name=>$class) {
if (\extension_loaded($name)) {
$loop_name = $name;
break;
}
}
if ($loop_name) {
if (\interface_exists('\React\EventLoop\LoopInterface')) {
switch ($loop_name) {
case 'libevent':
static::$eventLoopClass = '\Workerman\Events\React\ExtLibEventLoop';
break;
case 'event':
static::$eventLoopClass = '\Workerman\Events\React\ExtEventLoop';
break;
default :
static::$eventLoopClass = '\Workerman\Events\React\StreamSelectLoop';
break;
}
} else {
static::$eventLoopClass = static::$_availableEventLoops[$loop_name];
}
} else {
static::$eventLoopClass = \interface_exists('\React\EventLoop\LoopInterface') ? '\Workerman\Events\React\StreamSelectLoop' : '\Workerman\Events\Select';
}
return static::$eventLoopClass;
}
可以看出static::$globalEvent的优先级如下:
- event
- \Workerman\Events\React\ExtEventLoop
- \Workerman\Events\Event
- libevent
- \Workerman\Events\React\ExtLibEventLoop
- \Workerman\Events\Libevent
- \Workerman\Events\React\StreamSelectLoop
- \Workerman\Events\Select
\Workerman\Events\Select
终于到我们今天的主角了,我们看看这里的Timer是怎么实现的
Timer转发
可以看出Timer中的命令都被转发到了 static::$globalEvent
<?php
if (self::$_event) {
return self::$_event->add($time_interval,
$persistent ? EventInterface::EV_TIMER : EventInterface::EV_TIMER_ONCE, $func, $args);
}
if (self::$_event) {
return self::$_event->del($timer_id, EventInterface::EV_TIMER);
}
if (self::$_event) {
self::$_event->clearAllTimer();
}
Select 之 SplPriorityQueue
在\Workerman\Events\Select初始化时,同时会初始化一个SplPriorityQueue,用于存放定时任务。
SplPriorityQueue说php中内置的优先级队列,以Heap数据结构实现,默认为MaxHeap模式,即priority越大越优先出队。
public function __construct()
{
$this->_scheduler = new \SplPriorityQueue();
$this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
}
add
public function add($fd, $flag, $func, $args = array())
{
switch ($flag) {
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
$timer_id = $this->_timerId++;
$run_time = \microtime(true) + $fd;
$this->_scheduler->insert($timer_id, -$run_time);
$this->_eventTimer[$timer_id] = array($func, (array)$args, $flag, $fd);
$select_timeout = ($run_time - \microtime(true)) * 1000000;
if( $this->_selectTimeout > $select_timeout ){
$this->_selectTimeout = $select_timeout;
}
return $timer_id;
}
return true;
}
del
public function del($fd, $flag)
{
$fd_key = (int)$fd;
switch ($flag) {
case self::EV_TIMER:
case self::EV_TIMER_ONCE;
unset($this->_eventTimer[$fd_key]);
return true;
}
return false;
}
delAll=>clearAllTimer
public function clearAllTimer()
{
//直接重新初始化的队列和存放定时器回调数组
$this->_scheduler = new \SplPriorityQueue();
$this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
$this->_eventTimer = array();
}
event::tick
protected function tick()
{
while (!$this->_scheduler->isEmpty()) {
$scheduler_data = $this->_scheduler->top();
$timer_id = $scheduler_data['data'];
$next_run_time = -$scheduler_data['priority'];
$time_now = \microtime(true);
$this->_selectTimeout = ($next_run_time - $time_now) * 1000000;
if ($this->_selectTimeout <= 0) {
$this->_scheduler->extract();
if (!isset($this->_eventTimer[$timer_id])) {
continue;
}
$task_data = $this->_eventTimer[$timer_id];
if ($task_data[2] === self::EV_TIMER) {
$next_run_time = $time_now + $task_data[3];
$this->_scheduler->insert($timer_id, -$next_run_time);
}
\call_user_func_array($task_data[0], $task_data[1]);
if (isset($this->_eventTimer[$timer_id]) && $task_data[2] === self::EV_TIMER_ONCE) {
$this->del($timer_id, self::EV_TIMER_ONCE);
}
continue;
}
return;
}
$this->_selectTimeout = 100000000;
}
event::loop
相比于主进程Timer的闹钟信号,子进程的通过event->loop循环进行处理。每个子进程最后都是进入一个循环,在这个循环中监听网络事件和定时任务。正如文档中的:如果任务很重(特别是涉及到网络IO的任务),可能会导致该进程阻塞,暂时无法处理其它业务。
public function loop()
{
while (1) {
if(\DIRECTORY_SEPARATOR === '/') {
\pcntl_signal_dispatch();
}
$read = $this->_readFds;
$write = $this->_writeFds;
$except = $this->_exceptFds;
if ($read || $write || $except) {
try {
$ret = @stream_select($read, $write, $except, 0, $this->_selectTimeout);
} catch (\Exception $e) {} catch (\Error $e) {}
} else {
usleep($this->_selectTimeout);
$ret = false;
}
if (!$this->_scheduler->isEmpty()) {
$this->tick();
}
if (!$ret) {
continue;
}
}
}
总结
- Timer在\Workerman\Events\Select中的实现主基于SplPriorityQueue优先队列
- Timer在循环中并无其他限制,所以可以精确到毫秒
- Timer没错执行都是在网络事件的回调之前,也许可以做点事情
其他的在文档中已经写得很详细了,可以移步Timer定时器类查看
函数参考
pcntl_signal
pcntl_signal(int $signo, callback $handler, bool $restart_syscalls = true): bool
函数pcntl_signal()为signo指定的信号安装一个新 的信号处理器。
- 参数
-
signo
信号编号。
-
handler
信号处理器可以是用户创建的函数或方法的名字,也可以是系统常量 SIG_IGN(译注:忽略信号处理程序)或SIG_DFL(默认信号处理程序).
注意:
注意当你使用一个对象方法的时候,该对象的引用计数回增加使得它在你改变为其他处理或脚本结束之前是持久存在的。
-
restart_syscalls
指定当信号到达时系统调用重启是否可用。(译注:经查资料,此参数意为系统调用被信号打断时,系统调用是否从 开始处重新开始,但根据http://bugs.php.net/bug.php?id=52121,此参数存在bug无效。)
- 返回值
成功时返回 true, 或者在失败时返回 false。
pcntl_alarm
pcntl_alarm(int $seconds): int
创建一个计时器,在指定的秒数后向进程发送一个SIGALRM信号。每次对 pcntl_alarm()的调用都会取消之前设置的alarm信号。
- 参数
- seconds
等待的秒数。如果seconds设置为0,将不会创建alarm信号。
- 返回
返回上次alarm调度(离alarm信号发送)剩余的秒数,或者之前没有alarm调度(译注:或者之前调度已完成) 时返回0。
参考
- phptimer
项目介绍:phptimer现在是一个定时器项目,参考了workerman里面的源码,剥离了网络和多进程,保留了毫秒定时器,基于stream_select和event的阻塞超时实现。