协程应用实例:计时器过期事件响应
序
早期我曾摆弄过 War3 的 WE 编辑器,算是我编程的启蒙教育了。其事件响应系统在我心中一直印象深刻,特别是每个事件都可以用等待函数延迟执行。昨天我看到了协程,心血来潮便写了个简陋的计时器响应机制。
1. 计时调度中心
计时调度中心采用linux时间轮式设计,网上资源很全面,不做过多叙述。
详见: 基于Linux内核的时间轮算法设计实现【附代码】(https://cloud.tencent.com/developer/article/1553274)
2.基于协程的事件处理
通常,在这种同时处理多个事件的情况下,一般是用多线程执行事件响应函数(回调函数)的。
考察实际应用方面,更多的是简短的事件响应,这样一来大部分的时间开销都用在创建/销毁线程对象上面了。
当然为了解决这个问题,一种线程池技术被提出来。但仍然存在线程超过一定阈值时,大量时间被浪费在线程切换上所带来的问题。
基于以上几点考虑,我决定用协程执行事件响应处理,同时也方便了等待函数的实现(不需要阻塞线程,仅仅只需要在一段时间后再度调度)。
当然,如果一个线程同时处理成千上万的协程压力过大,这时候平分到几个线程去执行就可以了。
协程返回值类型 co_call 及其内嵌的 promise_type,用于和协程交互
// Move-only owner of a coroutine handle, used as the coroutine return type.
//
// Protocol between the coroutine body and its driver:
//   * `co_yield n`  - stores n in the promise and suspends the coroutine.
//   * `co_return n` - stores n in the promise; the coroutine then parks at
//                     its final suspend point.
//   * resume()      - resumes the coroutine once and returns the stored value.
//                     A value of 0, or a coroutine that has run to completion,
//                     is treated as "finished": the frame is destroyed and 0
//                     is returned.
class co_call
{
public:
    class promise_type
    {
    public:
        using value_type = size_t;

    public:
        promise_type &get_return_object()
        {
            return *this;
        }
        // The coroutine starts suspended; nothing runs until the first resume().
        auto initial_suspend()
        {
            return std::experimental::suspend_always{};
        }
        // Park at the end so the owner can still query done() and destroy the
        // frame itself instead of having it freed automatically.
        auto final_suspend() noexcept
        {
            return std::experimental::suspend_always{};
        }
        // `co_yield v` awaits the object returned here, so an awaiter has to be
        // returned for the coroutine to suspend portably (a void return only
        // works with the legacy MSVC coroutine dialect).
        auto yield_value(value_type _Value)
        {
            _CurrentValue = _Value;
            return std::experimental::suspend_always{};
        }
        // `co_return v` only records the value; suspension is handled by
        // final_suspend(), so nothing is returned from here.
        void return_value(value_type _Value)
        {
            _CurrentValue = _Value;
        }
        // Let an exception escaping the coroutine body propagate to whoever
        // called resume().
        void unhandled_exception()
        {
            throw;
        }
        value_type operator *(void) const noexcept
        {
            return _CurrentValue;
        }

    public:
        // Zero-initialised because co_call's constructor reads it before the
        // coroutine body has had a chance to store anything.
        value_type _CurrentValue = 0;
    };
    using value_type = promise_type::value_type;

public:
    // Takes ownership of the coroutine frame that owns `_Prom`.
    explicit co_call(promise_type &_Prom)
        : _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),
          _Value(*_Prom)
    {
    }
    co_call() = default;
    co_call(co_call const &) = delete;
    co_call(co_call &&_Right) noexcept
        : _Coro(_Right._Coro), _Value(_Right._Value)
    {
        _Right._Coro = nullptr;
        _Right._Value = 0;
    }
    ~co_call()
    {
        if (_Coro) {
            _Coro.destroy();
        }
    }

public:
    // Resumes the coroutine once and returns the value it yielded/returned.
    // Returns 0 (and releases the frame) when the coroutine finished or
    // reported 0; returns the cached value unchanged when no coroutine is held.
    [[nodiscard]] value_type resume()
    {
        if (_Coro) {
            _Coro.resume();
            _Value = *_Coro.promise();
            if (_Coro.done() || (_Value == 0))
            {
                _Coro.destroy();
                _Coro = nullptr;
                _Value = 0;
            }
        }
        return _Value;
    }

public:
    co_call &operator=(co_call const &) = delete;
    co_call &operator=(co_call &&_Right) noexcept
    {
        if (this != &_Right) {
            // Release the frame we currently own; overwriting the handle
            // without destroying it would leak the coroutine.
            if (_Coro) {
                _Coro.destroy();
            }
            _Coro = _Right._Coro;
            _Value = _Right._Value;
            _Right._Coro = nullptr;
            _Right._Value = 0;
        }
        return *this;
    }
    // True while a live coroutine frame is owned.
    operator bool(void) const noexcept
    {
        return static_cast<bool>(_Coro);
    }

private:
    ::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;
    value_type _Value = 0;
};
计时器对象,保存有协程句柄。如果事件存在等待函数,则先行挂起并返回一个等待时间,在一段时间后再次被调度;否则直接返回0
// One schedulable event.
//
// `expires` is the tick at which the timer is due, `task` is the number of
// ticks on_event() waits before finishing, and `handle` keeps the suspended
// coroutine between ticks.
//
// The coroutine created by on_event() refers to this object, so the timer is
// deliberately neither copyable nor movable.
class _XTimer
{
public:
    _XTimer()
        : task(0),
          expires(0),
          id(0),
          handle()
    {
    }
    _XTimer(int c, int i)
        : task(c),
          expires(0),   // previously left uninitialised
          id(i),
          handle()
    {
    }
    _XTimer(_XTimer const &) = delete;
    _XTimer &operator=(_XTimer const &) = delete;
    ~_XTimer() = default;

public:
    // Event handler coroutine.
    //
    // Prints a start message, suspends for `task` ticks by yielding that
    // count, then prints a completion message and returns 0.
    // `now` is the tick passed in by the caller; it is captured at start, so
    // the completion message prints that same start tick.
    co_call on_event(size_t now)
    {
        std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;
        // Suspend for a while and let other coroutines run. A non-positive
        // wait is skipped: yielding 0 is read by co_call::resume() as
        // "finished" and would destroy the coroutine before the code below
        // runs, and a negative value would wrap to a huge unsigned delay.
        if (task > 0)
        {
            _COT_WAIT(task);
        }
        std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " << std::endl;
        _COT_NORET();
    }

public:
    int task;       // ticks to wait inside on_event()
    int expires;    // tick at which the timer is due
    int id;         // identifier printed in the log lines
    co_call handle; // suspended coroutine, empty when none is pending
};
下面是全部代码
#include <iostream>
#include <list>
#include <cassert>
#include <algorithm>
#include <mutex>
#include <experimental/coroutine>
#include <experimental/resumable>
#include <experimental/generator>
#include <Windows.h>
#include <thread>
#include <future>
using namespace std;
#define _COT_WAIT(x) (co_yield (x))
#define _COT_NORET() co_return (0)
// Move-only owner of a coroutine handle, used as the coroutine return type.
//
// Protocol between the coroutine body and its driver:
//   * `co_yield n`  - stores n in the promise and suspends the coroutine.
//   * `co_return n` - stores n in the promise; the coroutine then parks at
//                     its final suspend point.
//   * resume()      - resumes the coroutine once and returns the stored value.
//                     A value of 0, or a coroutine that has run to completion,
//                     is treated as "finished": the frame is destroyed and 0
//                     is returned.
class co_call
{
public:
    class promise_type
    {
    public:
        using value_type = size_t;

    public:
        promise_type &get_return_object()
        {
            return *this;
        }
        // The coroutine starts suspended; nothing runs until the first resume().
        auto initial_suspend()
        {
            return std::experimental::suspend_always{};
        }
        // Park at the end so the owner can still query done() and destroy the
        // frame itself instead of having it freed automatically.
        auto final_suspend() noexcept
        {
            return std::experimental::suspend_always{};
        }
        // `co_yield v` awaits the object returned here, so an awaiter has to be
        // returned for the coroutine to suspend portably (a void return only
        // works with the legacy MSVC coroutine dialect).
        auto yield_value(value_type _Value)
        {
            _CurrentValue = _Value;
            return std::experimental::suspend_always{};
        }
        // `co_return v` only records the value; suspension is handled by
        // final_suspend(), so nothing is returned from here.
        void return_value(value_type _Value)
        {
            _CurrentValue = _Value;
        }
        // Let an exception escaping the coroutine body propagate to whoever
        // called resume().
        void unhandled_exception()
        {
            throw;
        }
        value_type operator *(void) const noexcept
        {
            return _CurrentValue;
        }

    public:
        // Zero-initialised because co_call's constructor reads it before the
        // coroutine body has had a chance to store anything.
        value_type _CurrentValue = 0;
    };
    using value_type = promise_type::value_type;

public:
    // Takes ownership of the coroutine frame that owns `_Prom`.
    explicit co_call(promise_type &_Prom)
        : _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),
          _Value(*_Prom)
    {
    }
    co_call() = default;
    co_call(co_call const &) = delete;
    co_call(co_call &&_Right) noexcept
        : _Coro(_Right._Coro), _Value(_Right._Value)
    {
        _Right._Coro = nullptr;
        _Right._Value = 0;
    }
    ~co_call()
    {
        if (_Coro) {
            _Coro.destroy();
        }
    }

public:
    // Resumes the coroutine once and returns the value it yielded/returned.
    // Returns 0 (and releases the frame) when the coroutine finished or
    // reported 0; returns the cached value unchanged when no coroutine is held.
    [[nodiscard]] value_type resume()
    {
        if (_Coro) {
            _Coro.resume();
            _Value = *_Coro.promise();
            if (_Coro.done() || (_Value == 0))
            {
                _Coro.destroy();
                _Coro = nullptr;
                _Value = 0;
            }
        }
        return _Value;
    }

public:
    co_call &operator=(co_call const &) = delete;
    co_call &operator=(co_call &&_Right) noexcept
    {
        if (this != &_Right) {
            // Release the frame we currently own; overwriting the handle
            // without destroying it would leak the coroutine.
            if (_Coro) {
                _Coro.destroy();
            }
            _Coro = _Right._Coro;
            _Value = _Right._Value;
            _Right._Coro = nullptr;
            _Right._Value = 0;
        }
        return *this;
    }
    // True while a live coroutine frame is owned.
    operator bool(void) const noexcept
    {
        return static_cast<bool>(_Coro);
    }

private:
    ::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;
    value_type _Value = 0;
};
// One schedulable event.
//
// `expires` is the tick at which the timer is due, `task` is the number of
// ticks on_event() waits before finishing, and `handle` keeps the suspended
// coroutine between ticks.
//
// The coroutine created by on_event() refers to this object, so the timer is
// deliberately neither copyable nor movable.
class _XTimer
{
public:
    _XTimer()
        : task(0),
          expires(0),
          id(0),
          handle()
    {
    }
    _XTimer(int c, int i)
        : task(c),
          expires(0),   // previously left uninitialised
          id(i),
          handle()
    {
    }
    _XTimer(_XTimer const &) = delete;
    _XTimer &operator=(_XTimer const &) = delete;
    ~_XTimer() = default;

public:
    // Event handler coroutine.
    //
    // Prints a start message, suspends for `task` ticks by yielding that
    // count, then prints a completion message and returns 0.
    // `now` is the tick passed in by the caller; it is captured at start, so
    // the completion message prints that same start tick.
    co_call on_event(size_t now)
    {
        std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;
        // Suspend for a while and let other coroutines run. A non-positive
        // wait is skipped: yielding 0 is read by co_call::resume() as
        // "finished" and would destroy the coroutine before the code below
        // runs, and a negative value would wrap to a huge unsigned delay.
        if (task > 0)
        {
            _COT_WAIT(task);
        }
        std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " << std::endl;
        _COT_NORET();
    }

public:
    int task;       // ticks to wait inside on_event()
    int expires;    // tick at which the timer is due
    int id;         // identifier printed in the log lines
    co_call handle; // suspended coroutine, empty when none is pending
};
// Timer dispatcher built on a three-level timing wheel.
//
// Level 1 (tvec_1) has _TVR_SIZE slots, one per tick. Levels 2 and 3
// (tvec_2, tvec_3) have _TVN_SIZE slots each and hold timers that are further
// away; cascade() re-files them into lower levels as the cursor advances.
//
// Timers are referenced by raw, non-owning pointers: every _XTimer passed to
// add() must outlive its stay in the wheel.
//
// Threading: the wheel and `_Check_time` are guarded by `_tq_mutex`
// (recursive, because tick() -> cascade()/add() re-enter it). The worker
// state (`_thread_state`, `_latest_now`) is guarded by `_sign_mutex`.
// Timer callbacks run on the worker thread while `_tq_mutex` is held.
class _XTimeDisp
{
public:
    using _Cont_ty = ::std::list<class _XTimer*>;
    static constexpr int _TVN_BITS = 4;
    static constexpr int _TVR_BITS = 6;
    static constexpr int _TVN_SIZE = 1 << _TVN_BITS;
    static constexpr int _TVR_SIZE = 1 << _TVR_BITS;
    static constexpr int _TVN_MASK = _TVN_SIZE - 1;
    static constexpr int _TVR_MASK = _TVR_SIZE - 1;

public:
    _XTimeDisp() = default;
    _XTimeDisp(_XTimeDisp const &) = delete;
    _XTimeDisp &operator=(_XTimeDisp const &) = delete;
    // Stops and joins the worker; destroying a joinable std::thread would
    // otherwise terminate the program.
    ~_XTimeDisp()
    {
        stop();
    }

public:
    // Slot index of `expires` inside upper level `n` (0 -> tvec_2, 1 -> tvec_3).
    static constexpr int _INDEX(int expires, int n)
    {
        return ((expires >> (_TVR_BITS + n * _TVN_BITS)) & _TVN_MASK);
    }
    // Offset of upper level `n` inside the flat `tvec` array.
    static constexpr int _OFFSET(int n)
    {
        return (_TVR_SIZE + n*_TVN_SIZE);
    }

public:
    // Files `timer` into the slot matching `timer->expires`, measured from
    // the current cursor `_Check_time`. A null pointer is ignored.
    void add(_XTimer* timer)
    {
        if (timer == nullptr)
        {
            return;
        }
        ::std::lock_guard<::std::recursive_mutex> guard(_tq_mutex);

        // Work in a signed type so an already expired timer gives a negative
        // distance instead of wrapping around to a huge unsigned one.
        const long long cursor = static_cast<long long>(_Check_time);
        const long long expires = timer->expires;
        const long long distance = expires - cursor;
        // Largest distance the three levels together can represent.
        constexpr long long max_distance =
            (1LL << (_TVR_BITS + 2 * _TVN_BITS)) - 1;

        if (distance <= 0) // due or overdue: fire at the next processed slot
        {
            tvec_1[_Check_time & _TVR_MASK].push_back(timer);
        }
        else if (distance < _TVR_SIZE) // tvec_1
        {
            tvec_1[expires & _TVR_MASK].push_back(timer);
        }
        else if (distance < (1LL << (_TVR_BITS + _TVN_BITS))) // tvec_2
        {
            tvec_2[_INDEX(static_cast<int>(expires), 0)].push_back(timer);
        }
        else // tvec_3, clamped to the range of the wheel
        {
            const long long filed =
                (distance > max_distance) ? (cursor + max_distance) : expires;
            tvec_3[_INDEX(static_cast<int>(filed), 1)].push_back(timer);
        }
    }
    // Empties slot `index` of the level starting at `offset` and re-files its
    // timers through add(). Returns `index` so the caller can tell whether
    // that level wrapped around to slot 0.
    int cascade(int offset, int index)
    {
        ::std::lock_guard<::std::recursive_mutex> guard(_tq_mutex);
        _Cont_ty pending;
        pending.swap(tvec[offset + index]);
        for (_XTimer* timer : pending)
        {
            this->add(timer);
        }
        return index;
    }
    // Advances the cursor up to and including `_Now`, running every timer in
    // each level-1 slot that is passed.
    void tick(size_t _Now)
    {
        ::std::lock_guard<::std::recursive_mutex> guard(_tq_mutex);
        while (_Check_time <= _Now)
        {
            const int cursor = static_cast<int>(_Check_time);
            const int index = cursor & _TVR_MASK;
            // Level 1 wrapped: pull the next level-2 slot down, and if
            // level 2 wrapped as well, pull from level 3 too.
            if (!index &&                                        // tv1
                !cascade(_OFFSET(0), _INDEX(cursor, 0)))         // tv2
            {
                cascade(_OFFSET(1), _INDEX(cursor, 1));          // tv3
            }
            // Advance first so timers re-added below are measured from the
            // next slot to be processed.
            ++_Check_time;

            _Cont_ty due;
            due.swap(tvec[index]);
            for (_XTimer* timer : due)
            {
                size_t wait = 0;
                if (timer->handle)
                {
                    // A stored handle means the coroutine is suspended
                    // mid-event: continue it.
                    wait = timer->handle.resume();
                }
                else
                {
                    // No handle: treat this as the first run of the event.
                    auto started = timer->on_event(_Now);
                    wait = started.resume();
                    if (wait != 0)
                    {
                        timer->handle = ::std::move(started);
                    }
                }
                // A non-zero result is the number of ticks to wait before
                // the coroutine is resumed again.
                if (wait != 0)
                {
                    timer->expires += static_cast<int>(wait);
                    add(timer);
                }
            }
        }
    }
    // Worker loop: processes the wheel up to the latest reported time, then
    // sleeps until nofity() reports a different time or stop() is called.
    void run()
    {
        ::std::unique_lock<::std::mutex> lock(_sign_mutex);
        while (_thread_state == 1)
        {
            const size_t target = _latest_now;
            // Callbacks must not run under _sign_mutex, otherwise nofity()
            // and stop() would block behind them.
            lock.unlock();
            tick(target);
            lock.lock();
            // Predicate form: a notification sent while tick() was running is
            // still seen here, so no wake-up is lost.
            _sign.wait(lock, [this, target] {
                return (_thread_state != 1) || (_latest_now != target);
            });
        }
    }
    // Reports the current time to the worker. The wheel cursor itself is only
    // advanced by tick(), so a late worker catches up on skipped ticks
    // instead of losing them.
    void nofity(size_t now)
    {
        {
            ::std::lock_guard<::std::mutex> guard(_sign_mutex);
            _latest_now = now;
        }
        _sign.notify_one();
    }
    // Starts the worker thread with the wheel cursor at `time`.
    // Does nothing when the worker is already running.
    void start(size_t time)
    {
        {
            ::std::lock_guard<::std::mutex> guard(_sign_mutex);
            if (_thread_state)
            {
                return;
            }
            _thread_state = 1;
            _latest_now = time;
        }
        {
            ::std::lock_guard<::std::recursive_mutex> guard(_tq_mutex);
            _Check_time = time;
        }
        _run_thread = ::std::thread(&_XTimeDisp::run, this);
    }
    // Asks the worker to exit and waits for it. Safe to call repeatedly and
    // when the worker was never started. Must not be called from a timer
    // callback, since the worker would then try to join itself.
    void stop()
    {
        {
            ::std::lock_guard<::std::mutex> guard(_sign_mutex);
            _thread_state = 0;
        }
        _sign.notify_one();
        if (_run_thread.joinable())
        {
            _run_thread.join();
        }
    }

public:
    size_t _Check_time = 0; // wheel cursor: next tick to be processed
    // Flat storage for all three levels. tvec_1/2/3 are views into it; they
    // replace the former anonymous union of std::list arrays, which standard
    // C++ does not allow.
    _Cont_ty tvec[_TVR_SIZE + 2 * _TVN_SIZE];
    _Cont_ty* const tvec_1 = tvec;
    _Cont_ty* const tvec_2 = tvec + _OFFSET(0);
    _Cont_ty* const tvec_3 = tvec + _OFFSET(1);
    ::std::recursive_mutex _tq_mutex;
    ::std::mutex _sign_mutex;
    ::std::condition_variable _sign;
    ::std::thread _run_thread;
    int _thread_state = 0; // 1 while the worker should keep running

private:
    size_t _latest_now = 0; // last time reported through start()/nofity()
};
using timer = _XTimer;
using tdc = _XTimeDisp;
void main()
{
timer t1, t2,t3,t4;
t1.id = 1;
t1.expires = 0;
t1.task = 300;
t2.id = 2;
t2.task = 100;
t2.expires = 3;
t3.id = 3;
t3.task = 50;
t3.expires = 3;
t4.id = 4;
t4.task = 30;
t4.expires = 88;
tdc tm;
tm._Check_time = 0;
tm.add(&t1);
tm.add(&t2);
tm.add(&t3);
tm.add(&t4);
tm.start(0);
int tbegin = 0;
while (1)
{
::std::this_thread::sleep_for(::std::chrono::milliseconds(10));
tm.nofity(tbegin);
++tbegin;
}
system("pause");
}
#undef _COT_WAIT
#undef _COT_NORET
运行截图
由于没有采用多线程 Sleep 阻塞,CPU 利用率自然是比较高的