定时器是服务器中一个很重要的部件,比如定时对连接的进行健康检测,在设计协程的sleep API时也需要定时器的介入,因此有效地组织这些定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑,对于服务器的性能有着至关重要的影响。为此,定时器的设计值得我们去深入学习,本文将浅析定时器的设计及其在不同数据结构下的表现。
话不多说,直接上代码,从源码开始分析,这是我的一个服务器框架的定时器。
class TimeManager; // 定时器管理器
class Timer : public std::enable_shared_from_this{ // 定时器
friend class TimeManager;
public:
using ptr = std::shared_ptr;
bool cancel();
bool refresh();
bool reset(uint64_t ms, bool from_now);
private:
Timer(uint64_t ms, std::function cb, bool recurring, TimeManager* timeManager);
Timer(uint64_t next);
private:
uint64_t m_ms = 0; //执行周期
uint64_t m_next = 0; //精确的执行时间
std::function m_cb; //定时器回调
bool m_recurring = false; //是否循环
TimeManager* m_manager = nullptr;
private:
struct Compare{
bool operator()(const Timer::ptr& lhs, const Timer::ptr& rhs) const;
};
};
定时器其实并不只指定时器,其实还包括定时器容器(TimeManager
),定时器容器的设计正式定时器的精妙所在。
从上面的代码我们可以看到,定时器有五个私有变量,分别是:
m_ms
: 定时器的执行周期m_next
: 定时器事件还有多久时间就会执行m_cb
: 定时器回调函数m_manager
: 定时器到达时间执行事件后是否继续定时还有定义了私有类Compare
是供TimeManager
调用的,后文再说它的设计。
三个公有接口:
cancel()
: 取消定时器事件refresh()
: 重新计算时间reset()
: 重新设置定时器的执行周期,可以选择是否重新计时这里有一个细节,就是构造函数是私有的,切TimeManager
被设置成友元类,所以Timer
只能通过TimeManager
创建
下面是公有接口的实现代码:
bool Timer::cancel() {
TimeManager::MutexType::Lock lock(m_manager->m_mutex);
if(m_cb){
auto it = m_manager->m_timers.find(shared_from_this());
m_manager->m_timers.erase(it);
return true;
}
return false;
}
m_manager->mutex
是TimeManager
用来保护计时器容器的互斥量,保证线程安全。如果有回调函数的话,则从TimeManager的容器中删除掉本身。
bool Timer::refresh() {
TimeManager::MutexType::Lock lock(m_manager->m_mutex);
if(!m_cb){
return false;
}
auto it = m_manager->m_timers.find(shared_from_this());
if(it == m_manager->m_timers.end()){
return false;
}
m_manager->m_timers.erase(it);
m_next = GetCurrentMS() + m_ms;
m_manager->m_timers.insert(shared_from_this());
return true;
}
先加锁对TimeManager的容器进行保护,如果没有回调函数,直接返回,否则在容器中寻找本身,找到后先从容器中删除,更新m_ms后重新加入。
bool Timer::reset(uint64_t ms, bool now_time) {
if(ms == m_ms && now_time == false){
return true;
}
TimeManager::MutexType::Lock lock(m_manager->m_mutex);
if(!m_cb){
return false;
}
auto it = m_manager->m_timers.find(shared_from_this());
if(it == m_manager->m_timers.end()){
return false;
}
m_manager->m_timers.erase(it);
uint64_t start = 0;
if(now_time) {
start = acid::GetCurrentMS();
} else {
start = m_next - m_ms;
}
m_ms = ms;
m_next = start + m_ms;
m_manager->addTimer(shared_from_this(), lock);
return true;
}
class TimeManager{
friend class Timer;
public:
using MutexType = Mutex;
TimeManager();
virtual ~TimeManager();
Timer::ptr addTimer(uint64_t ms, std::function cb, bool recurring = false);
Timer::ptr addConditionTimer(uint64_t ms, std::function cb,
std::weak_ptr cond,
bool recurring = false);
uint64_t getNextTimer();
void getExpiredCallbacks(std::vector>& cbs);
bool hasTimer();
protected:
virtual void onInsertAtFront() = 0;
void addTimer(Timer::ptr timer, MutexType::Lock & lock);
private:
MutexType m_mutex;
std::set m_timers;
bool m_tickled = false;
};
TimeManager是定时器的核心部分,管理者所有的容器。
TimeManager有三个私有成员,分别是:
m_mutex
: 互斥量,用于保护m_timers
,使线程安全。m_timers
: 存储所有定时器的集合,底层是红黑树,比较函数是Timer::Compare
m_tickled
TimeManager有两个procted函数,这是因为在我的项目中TimeManager是IOManager的基类之一,这里可以把他看成公有接口。我们看下几个接口的实现
Timer::ptr TimeManager::addTimer(uint64_t ms, std::function cb, bool recurring) {
Timer::ptr timer(new Timer(ms, cb, recurring, this));
MutexType::Lock lock(m_mutex);
addTimer(timer, lock);
return timer;
}
void TimeManager::addTimer(Timer::ptr timer, MutexType::Lock& lock) {
auto it = m_timers.insert(timer).first;
bool at_front = (it == m_timers.begin()) && !m_tickled;
if(at_front) {
m_tickled = true;
}
lock.unlock();
if(at_front) {
onInsertAtFront();
}
}
第一个addTimer
是公有接口外部可以调用,封装完会调用内部的addTimer
,也就是第二个,如果插在第一个位置要调用IOManager
来检测下一次到时时间。
Timer::ptr
TimeManager::addConditionTimer(uint64_t ms, std::function cb, std::weak_ptr cond, bool recurring) {
return addTimer(ms,[cb, cond](){
std::shared_ptr tmp = cond.lock();
if(tmp){
cb();
}
}, recurring);
}
这里说下cond为什么是weak_ptr,以connect_with_timeout(addConditionTimer的一个调用,负责设置有时间限制的connect)为例
std::shared_ptr tinfo(1);
std::weak_ptr winfo(tinfo);
if(timeout_ms != (uint64_t)-1) {
timer = iom->addConditionTimer(timeout_ms, [winfo, fd, iom]() {
auto t = winfo.lock();
if(!t || t->cancelled) {
return;
}
t->cancelled = ETIMEDOUT;
iom->cancelEvent(fd, sylar::IOManager::WRITE);
}, winfo);
}
本地条件变量是tinfo(一个指向int的智能指针),添加条件计时器是新建了一个与tinfo关联的的weak_ptr,此时有两种情况
uint64_t TimeManager::getNextTimer() {
MutexType::Lock lock(m_mutex);
if(m_timers.empty()){
return ~0ull;
}
m_tickled = false;
Timer::ptr timer = *m_timers.begin();
uint64_t now = GetCurrentMS();
if(now >= timer->m_next){
return 0;
} else {
return timer->m_next - now;
}
}
获取第一个超时定时器,直接取得最近一个定时器即*m_timers.begin()
,然后判断是否到时即可。
void TimeManager::getExpiredCallbacks(std::vector>& cbs) {
uint64_t now = GetCurrentMS();
std::vector expired;
MutexType::Lock lock(m_mutex);
if(m_timers.empty() ) {
return;
}
Timer::ptr timer(new Timer(now));
auto it = m_timers.upper_bound(timer);
expired.insert(expired.begin(), m_timers.begin(), it);
m_timers.erase(m_timers.begin(), it);
//lock.unlock();
cbs.reserve(expired.size());
for(auto &i : expired){
cbs.push_back(i->m_cb);
if(i->m_recurring){
i->m_next = now + i->m_ms;
m_timers.insert(i);
} else {
i->m_cb = nullptr;
}
}
}
getExpiredCallbacks是获取所有到期定时器的回调函数,我们可以看到调用的是set的upper_bound,可以在O(logn)的时间内完成查找
到所有到时计时器。
前面介绍的定时器是基于C++STL中的set(红黑树)来存储的,Timer的存储还有很多方式,比如升序链表就是其中之一。
class Timer {
public:
using callback = function;
util_timer():prev(NULL),next(NULL){}
public:
uint64_t expired; /*任务的超时时间,这里使用绝对时间*/
callback m_cb; /*任务回调函数*/
Timer* prev; /*指向前一个定时器*/
Timer* next; /*指向下一个定时器*/
};
很正常的设计,因为是基于侵入式链表管理的,所以多加了两个Timer*指针。因为它是有序的,所以每次插入时都要寻找合适的位置,时间复杂度是O(n)。但链表也有链表的优势,就是插入(插入节点)和删除节点很快,比set(红黑树快很多)。
在一个时间轮内,(实线)指针指向轮子上的一个槽(slot)。它以恒定的速度顺时针转动,每转动一步就指向下一个槽(虚线指针指向的槽),每次转动称为一个滴答(tick)。一个滴答的时间称为时间轮的槽间隔si(slot interval),它实际上就是心搏时间。该时间轮共有N个槽,因此它运转一周的时间是Nsi。每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时时间相差Nsi的整数倍。时间轮正是利用这个关系将定时器散列到不同的链表中。假如现在指针指向槽cs,我们要添加一个定时时间为ti的定时器,则该定时器将被插入槽ts(timer slot)对应的链表中。
这其实就是在排序链表的基础上,将Timer散列化,通过消费额外的空间来记录更多的信息争取更少的时间。
class Timer{
public:
using callback = function;
Timer(int rot,int ts)
:next(nullptr), prev(nullptr), rotation(rot), time_slot(ts){}
public:
int rotation;/*记录定时器在时间轮转多少圈后生效*/
int time_slot;/*记录定时器属于时间轮上哪个槽(对应的链表,下同)*/
callback cb;
Timer* next;/*指向下一个定时器*/
Timer* prev;/*指向前一个定时器*/
};
对时间轮而言,添加一个定时器的时间复杂度是O(1),删除一个定时器的时间复杂度也是O(1),执行一个定时器的时间复杂度是O(n)。但实际上执行一个定时器任务的效率要比O(n)好得多,因为时间轮将所有的定时器散列到了不同的链表上。时间轮的槽越多,等价于散列表的入口(entry)越多,从而每条链表上的定时器数量越少。此外,我们的代码仅使用了一个时间轮。当使用多个轮子来实现时间轮时,执行一个定时器任务的时间复杂度将接近O(1)。
时间堆的基本原理是: 将所有定时器中超时时间最小的一个定时器的超时值作为心搏间隔。这样,一旦心搏函数tick被调用,超时时间最小
的定时器必然到期,我们就可以在tick函数中处理该定时器。然后,再次从剩余的定时器中找出超时时间最小的一个,并将这段最小时间设置为下一次心搏间隔。如此反复,就实现了较为精确的定时。
这和上文项目详解的计时器很像,都是以超时时间最小的一个定时器的超时值作为心搏间隔,但我项目里用的是红黑树存储,其实还可以用最小堆存储。
class Timer {
public:
using callback = function;
Timer(int delay) {
expire = time(NULL) + delay;
}
public:
time_t expire;/*定时器生效的绝对时间*/
void(*cb_func)(client_data*);/*定时器的回调函数*/
callback cb;/*用户数据*/
};
对时间堆而言,添加一个定时器的时间复杂度是O(lgn),删除一个定时器的时间复杂度是O(1),执行一个定时器的时间复杂度是O(1)。因此,时间堆的效率是很高的。
计时器的设计其实和排序算法很像,不同的数据结构带来不同的收益,空间和时间之间的权衡,这也正是数据结构的美妙所在。