C++高性能服务框架revolver:同时支持100万个的定时事件的定时器

文章来源:点击打开链接

https://github.com/yuanrongxi/revolver

在高性能的服务器程序当中,定时器是必不可少的部件,而且定时器的效率是直接影响到服务的性能。在众多的开源项目中,定时器设计都有各有各的方法,例如ACE和libEvent都采用了最小堆的算法实现,还有其他的开源项目采用平衡二叉树来做定时的器管理算法。不管是最小堆还是平衡二叉树,其定时器扫描都是O(1),但定时器插入和删除都是O(logN)的复杂度。在定时事件少的情况下,这种算法是足够的,如果超过上百万的定时事件,效率会成为瓶颈。所以revolver在定时器的实现上并没有使用通用的平衡二叉树和最小堆,而是采用了轮转HASH算法来做定时器管理。

什么是轮转HASH算法?轮转HASH是通过4个时间轮的转动来触发定时事件,就像时钟的秒针轮、分针轮、时针轮之间的关系一样。如图:


每个轮的有256个刻度,4个轮刚好是一个uint32_t整型数。最小轮的一刻度表示一个reactor的event loop时间(5ms)

1.定时器的扫描

1)获取当前的系统时间cur_ts,判断与上次扫描时刻prev_ts之间差距超过最小刻度,计算需要转动最小刻度的数量
scale = (cur_ts - prev_ts) / loop_delay;
2) 进行第一轮的轮转,转动scale个刻度,所有转动的刻度中的定时事件全部进行触发。如果转动后没有超过最大刻度256,轮转结束。
3)如果超过最大刻度256,那么从它的父轮(第二轮)上轮转1刻度。将父轮上轮转的刻度所有的定时事件全部按时间映射到子轮上。相当于重复2步。这是个迭代过程,如果父轮到了256,那么继续轮转父轮的父轮。直到到达第四轮转动为止。这和秒钟走一圈就是分针走一个刻度的原理是一样的。
代码如下:
[cpp]  view plain  copy
  1. template<class HANDLER, class FUNCTOR, class LOCK>  
  2. uint32_t CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::expire()  
  3. {  
  4.     BASE_GUARD_RETURN(LOCK, cf_mon, mutex_, 0);  
  5.     uint32_t ret = SELECT_DELAY;    //默认20MS  
  6.   
  7.     CBaseTimeValue cur_timer = CBaseTimeValue::get_time_value();  
  8.       
  9.     if(cur_timer > prev_time_)  
  10.     {  
  11.         uint32_t scale = static_cast<uint32_t>((cur_timer.msec() - prev_time_.msec()) / SELECT_DELAY);  
  12.         if(scale > 0)  
  13.         {  
  14.             ret = revolver(scale);  
  15.             prev_time_ = cur_timer;  
  16.         }  
  17.     }  
  18.   
  19.     return ret;  
  20. }  
  21.   
  22. template<class HANDLER, class FUNCTOR, class LOCK>  
  23. uint32_t CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::revolver(uint32_t scale)  
  24. {  
  25.     //std::cout << "pos, first = " << rings_[0].get_pos() << ", second = " << rings_[1].get_pos()  
  26.     //  << ", third = " << rings_[2].get_pos() << ", fourth = " << rings_[3].get_pos() <<std::endl;  
  27.   
  28.     uint32_t ret = SELECT_DELAY;  
  29.   
  30.     uint8_t index = 0;  
  31.     uint32_t rewind_scale = scale;  
  32.     while(rewind_scale > 0)  
  33.     {  
  34.         index = 0;  
  35.         if(rings_[index].cycle(rewind_scale, this)) //扫描第一轮  
  36.         {  
  37.             index ++;  
  38.             uint32_t sc = 1;  
  39.             while(rings_[index].cycle(sc, this))//扫描下一轮,刻度只往前推进1格  
  40.             {  
  41.                 sc = 1;  
  42.                 index ++;  
  43.                 if(index >= RINGS_SIZE)  
  44.                 {  
  45.                     start_time_ = CBaseTimeValue::get_time_value();  
  46.                     break;  
  47.                 }  
  48.             }  
  49.         }  
  50.     }  
  51.   
  52.     return ret;  
  53. }  

2.定时事件的插入

1)首先计算需触发的时刻与定时器轮当前时刻的距离
d = (cur_ts - start_ts + delay) / loop_delay;
2)分别计算在4个轮的位置
first = (uint8_t)(timeout_stamp_ / FIRST_ROUND);
second = (uint8_t)((timeout_stamp_ % FIRST_ROUND) / SECOND_ROUND);
third =  (uint8_t)((timeout_stamp_ % SECOND_ROUND) / THIRD_ROUND);
fourth = (uint8_t) (timeout_stamp_ % THIRD_ROUND);
3)通过计算得到的为止,保存到对应的轮刻度上,插入就完毕了。例子,如果计算的first = 0, second = 2, third = 30, fouth = 1,就会保存到第3轮的第2刻度上。

代码如下:
[cpp]  view plain  copy
  1. template<class HANDLER, class FUNCTOR, class LOCK>  
  2. uint32_t CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::schedule(HANDLER handler, const void *act, uint32_t delay, uint32_t interval)  
  3. {  
  4.     BASE_GUARD_RETURN(LOCK, cf_mon, mutex_, 0);  
  5.   
  6.     BaseTimerNode_T<HANDLER>* timer_obj = node_pool_.pop_obj();  
  7.     if(timer_obj != NULL)  
  8.     {  
  9.         uint32_t timer_id = get_free_node();  
  10.         CBaseTimeValue cur_timer = CBaseTimeValue::get_time_value();  
  11.         //计算距离  
  12.         uint64_t distance = delay / SELECT_DELAY; //直接以当前时间作为坐标,相差一个扫描间隔20MS  
  13.         if(cur_timer > start_time_)  
  14.             distance = (cur_timer.msec() - start_time_.msec() + delay) / SELECT_DELAY;  
  15.         distance = distance % (UNINT32_MAX);  
  16.         timer_obj->set(handler, act, (uint32_t)(core_max(distance, 1)), interval, timer_id);  
  17.         heap_[timer_id] = timer_obj;  
  18.   
  19.         used_num_ ++;  
  20.         //插入事件  
  21.         insert_node(timer_obj);  
  22.         upcall_functor().registration(timer_obj->get_handler(), timer_id);  
  23.         return timer_id;  
  24.     }  
  25.     return 0;  
  26. }  
  27.   
  28. template<class HANDLER, class FUNCTOR, class LOCK>  
  29. void CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::insert_node(BaseTimerNode_T<HANDLER>* node)  
  30. {  
  31.     uint32_t timer_id = node->get_timer_id();  
  32.   
  33.     uint8_t poss[RINGS_SIZE] = {0};  
  34.     //获取位置  
  35.     node->get_revolver_pos(poss[RINGS_SIZE - 1], poss[RINGS_SIZE - 2], poss[RINGS_SIZE - 3], poss[RINGS_SIZE - 4]);  
  36.     uint8_t index = RINGS_SIZE - 1;  
  37.     //进行插入  
  38.     while(!rings_[index].add_element(poss[index], timer_id))  
  39.     {  
  40.         if(index == 0)  
  41.             break ;  
  42.   
  43.         index --;  
  44.     }  
  45. }  

3.定时事件的删除

删除和插入计算差不多步骤
1)首先计算需要删除定时事件触发的时刻与定时器轮当前时刻的距离
d = (cur_ts - start_ts + delay) / loop_delay;

2)分别计算在4个轮的位置
first = (uint8_t)(timeout_stamp_ / FIRST_ROUND);
second = (uint8_t)((timeout_stamp_ % FIRST_ROUND) / SECOND_ROUND);
third =  (uint8_t)((timeout_stamp_ % SECOND_ROUND) / THIRD_ROUND);
fourth = (uint8_t) (timeout_stamp_ % THIRD_ROUND);
3)根据位置坐标找到对应的轮位置删除轮上的定时事件

代码:

[cpp]  view plain  copy
  1. template<class HANDLER, class FUNCTOR, class LOCK>  
  2. void CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::cancel_timer(uint32_t timer_id, const void **act)  
  3. {  
  4.     BASE_GUARD(LOCK, cf_mon, mutex_);  
  5.     if(timer_id < heap_size_ && heap_[timer_id] != NULL)  
  6.     {  
  7.         //查找对应的定时事件内容  
  8.         BaseTimerNode_T<HANDLER>* timer_obj = heap_[timer_id];  
  9.         //删除轮上的定时事件  
  10.         delete_node(timer_obj);  
  11.   
  12.         heap_[timer_id] = NULL;  
  13.         if(used_num_ > 0)  
  14.             used_num_ --;  
  15.   
  16.         freeTimers_.push_back(timer_id);  
  17.         *act = timer_obj->get_act();  
  18.         upcall_functor().cancel_timer(timer_obj->get_handler(), timer_id);  
  19.   
  20.         node_pool_.push_obj(timer_obj);  
  21.     }  
  22. }  
  23.   
  24. template<class HANDLER, class FUNCTOR, class LOCK>  
  25. void CTimerQueue_T<HANDLER, FUNCTOR, LOCK>::delete_node(BaseTimerNode_T<HANDLER>* node)  
  26. {  
  27.     uint32_t timer_id = node->get_timer_id();  
  28.     uint8_t poss[RINGS_SIZE] = {0};  
  29.     node->get_revolver_pos(poss[RINGS_SIZE - 1], poss[RINGS_SIZE - 2], poss[RINGS_SIZE - 3], poss[RINGS_SIZE - 4]);  
  30.     //删除掉对应的定时事件  
  31.     for(uint8_t index = 0; index < RINGS_SIZE; index ++) //在每个轮上进行删除  
  32.     {  
  33.         rings_[index].delete_element(poss[index], timer_id);  
  34.     }  
  35. }  


4.测试

在revovler的test工程中的main()函数中将test_timer_queue注释去掉,就可以进行测试。以下是test_timer_queue的代码:
[cpp]  view plain  copy
  1. void test_timer_queue()  
  2. {  
  3.     srand(time(NULL));  
  4.   
  5.     CTimerFunctor functor;  
  6.     TIMEQUEUE  timer_queue(&functor);  
  7.     CTest_Event_Handler handler;  
  8.   
  9.     handler.tq_ = &timer_queue;  
  10.   
  11.     CBaseTimeValue  begin_timer = CBaseTimeValue::get_time_value();  
  12.     for(int i = 0; i < 1000000; i ++)  
  13.     {  
  14.         insert_timer(&handler, (rand() % 240) * 1000, timer_queue);  
  15.     }  
  16.     CBaseTimeValue stop_timer = CBaseTimeValue::get_time_value();  
  17.     stop_timer = stop_timer - begin_timer;  
  18.     std::cout << "insert 1000000 timer, delay = " << stop_timer.msec() << " MS" << std::endl;  
  19.   
  20.     g_ts = stop_timer.get_time_value().msec();  
  21. #if _DEBUG  
  22.     //timer_queue.set_ring_id();  
  23. #endif  
  24.     std::cout << "exprie ......" << std::endl;  
  25.     while(1)  
  26.     {  
  27.         uint32_t ms = timer_queue.expire();  
  28.         usleep((1000));  
  29.     }  
  30. }  
这个函数可以测试插入100万个定时事件的耗时多少,在100个定时事件在定时器管理的时候,CPU和内存都可以进行相对应的监控和查看。我在window 7下面的release版本的信息如下:
从上图看,插入100万个定时事件耗时978MS,也就是说,当有几十万个定时事件在运行的时候,插入一个定时事件只需要0.97微秒。


以下是100万个定时事件在处理过程中的CPU和内存占用图。



5.总结

定时器的实现从效率和功能上实现都达到了最初设想的效率,但是内存使用上稍微过高(120M),如果用C的代码实现,对内存做严格的控制和内存管理,应该会好很多,以后优化的工作应该重点在这。如果需要完整的代码,请到: https://github.com/yuanrongxi/revolver 下载。

猜你喜欢

转载自blog.csdn.net/libaineu2004/article/details/80569493