协程(coroutine)应用实例:计时器过期事件响应

协程应用实例:计时器过期事件响应

  早期我曾把弄过War3 的WE编辑器,算是我编程的启蒙教育了。其事件响应系统在我心中一直印象深刻,特别是每个事件都可以用等待函数延迟执行,昨天我看到了协程,心血来潮便写了个简陋的计时器响应机制。

1. 计时调度中心

  计时调度中心采用linux时间轮式设计,网上资源很全面,不做过多叙述。
  详见: 基于Linux内核的时间轮算法设计实现【附代码】(https://cloud.tencent.com/developer/article/1553274)

2.基于协程的事件处理

  通常,在这种同时处理多个事件的情况下,一般是用 多线程? 执行事件响应函数(回调函数)的。
  考察实际应用方面,更多的是简短的事件响应,这样一来大部分的时间开销都用在创建/销毁线程对象上面了。
  当然为了解决这个问题,一种线程池技术被提出来。但仍然存在线程超过一定阈值时,大量时间被浪费在线程切换上所带来的问题。
  基于以上几点考虑,我决定用协程执行事件响应处理,同时也方面了等待函数的实现(不需要阻塞线程,仅仅只需要在一段时间后再度调度)。
  当然如果一个线程同时处理成千上万的协程压力过大,这时候平分到几个线程区执行就可以了。

协程返回值类型 promise_type,用于和协程交互

class co_call
{
public:
	class promise_type
	{
	public:
		using value_type = size_t;
	public:
		promise_type &get_return_object()
		{
			return *this;
		}

		auto initial_suspend()
		{
			return std::experimental::suspend_always{};
		}

		auto final_suspend()
		{
			// suspend it to save the coroutine handle 
			return std::experimental::suspend_always{};
		}

		void yield_value(value_type _Value)
		{
			_CurrentValue = _Value;
		}

		auto return_value(value_type _Value)
		{
			_CurrentValue = _Value;
			return std::experimental::suspend_always{}; // dont suspend it
		}

		value_type operator *(void) const noexcept
		{
			return _CurrentValue;
		}

	public:
		value_type _CurrentValue;
	};

	using value_type = promise_type::value_type;
public:

	explicit co_call(promise_type &_Prom)
		: _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),
		_Value(*_Prom)
	{

	}

	co_call() = default;

	co_call(co_call const &) = delete;


	co_call(co_call &&_Right) : _Coro(_Right._Coro), _Value(_Right._Value)
	{
		_Right._Coro = nullptr;
		_Right._Value = 0;
	}

	~co_call()
	{
		if (_Coro) {
			_Coro.destroy();
		}
	}
public:
	_NODISCARD value_type resume()
	{
		if (_Coro) {
			_Coro.resume();
			_Value = *_Coro.promise();
			if (_Coro.done() || (_Value == 0))
			{
				_Coro.destroy();
				_Coro = 0;
				_Value = 0;
				return _Value;
			}
		}
		return _Value;
	}

public:
	co_call &operator=(co_call const &) = delete;

	co_call &operator=(co_call &&_Right)
	{
		if (this != _STD addressof(_Right)) {
			_Coro = _Right._Coro;
			_Right._Coro = nullptr;
			_Value = _Right._Value;
			_Right._Value = 0;
		}
		return *this;
	}

	operator bool(void)
	{
		return (_Coro != 0);
	}
private:
	::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;
	value_type _Value = 0;
};

计时器对象,保存有协程句柄,如果事件存在等待函数则先行挂起返回一个等待时间,在一段时候后再次被调度,否则直接返回0

class _XTimer
{
public:

public:
	_XTimer()
		:task(0),
		id(0),
		handle()
	{

	}
	_XTimer(int c,int i)
		:task(c),
		id(i),
		handle()
	{

	}
	~_XTimer()
	{

	}
	
public:

	co_call on_event(size_t now)
	{
		std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;
		
		// 挂起一段时间,让出时间片,执行其他COROUTINE
		_COT_WAIT(task);
		
		std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " <<  std::endl;

		_COT_NORET();
	}

public:
	int task;
	int expires;
	int id;
	co_call handle;
};

下面是全部代码


#include <iostream>
#include <list>
#include <cassert>
#include <algorithm>
#include <mutex>
#include <experimental/coroutine>
#include <experimental/resumable>
#include <experimental/generator>
#include <Windows.h>
#include <thread>
#include <future>

using namespace std;

#define _COT_WAIT(x) (co_yield (x))
#define _COT_NORET() co_return (0)


class co_call
{
public:
	class promise_type
	{
	public:
		using value_type = size_t;
	public:
		promise_type &get_return_object()
		{
			return *this;
		}

		auto initial_suspend()
		{
			return std::experimental::suspend_always{};
		}

		auto final_suspend()
		{
			// suspend it to save the coroutine handle 
			return std::experimental::suspend_always{};
		}

		void yield_value(value_type _Value)
		{
			_CurrentValue = _Value;
		}

		auto return_value(value_type _Value)
		{
			_CurrentValue = _Value;
			return std::experimental::suspend_always{}; // dont suspend it
		}

		value_type operator *(void) const noexcept
		{
			return _CurrentValue;
		}

	public:
		value_type _CurrentValue;
	};

	using value_type = promise_type::value_type;
public:

	explicit co_call(promise_type &_Prom)
		: _Coro(::std::experimental::coroutine_handle<promise_type>::from_promise(_Prom)),
		_Value(*_Prom)
	{

	}

	co_call() = default;

	co_call(co_call const &) = delete;


	co_call(co_call &&_Right) : _Coro(_Right._Coro), _Value(_Right._Value)
	{
		_Right._Coro = nullptr;
		_Right._Value = 0;
	}

	~co_call()
	{
		if (_Coro) {
			_Coro.destroy();
		}
	}
public:
	_NODISCARD value_type resume()
	{
		if (_Coro) {
			_Coro.resume();
			_Value = *_Coro.promise();
			if (_Coro.done() || (_Value == 0))
			{
				_Coro.destroy();
				_Coro = 0;
				_Value = 0;
				return _Value;
			}
		}
		return _Value;
	}

public:
	co_call &operator=(co_call const &) = delete;

	co_call &operator=(co_call &&_Right)
	{
		if (this != _STD addressof(_Right)) {
			_Coro = _Right._Coro;
			_Right._Coro = nullptr;
			_Value = _Right._Value;
			_Right._Value = 0;
		}
		return *this;
	}

	operator bool(void)
	{
		return (_Coro != 0);
	}
private:
	::std::experimental::coroutine_handle<promise_type> _Coro = nullptr;
	value_type _Value = 0;
};

class _XTimer
{
public:

public:
	_XTimer()
		:task(0),
		id(0),
		handle()
	{

	}
	_XTimer(int c,int i)
		:task(c),
		id(i),
		handle()
	{

	}
	~_XTimer()
	{

	}
	
public:

	co_call on_event(size_t now)
	{
		std::cout << "timer ID: " << id << ", BEGIN AT: " << now << ", WAIT: " << task << std::endl;
		
		// 挂起一段时间,让出时间片,执行其他COROUTINE
		_COT_WAIT(task);
		
		std::cout << "timer ID: " << id << ", TASK DONE! (AT:" << now << ") " <<  std::endl;

		_COT_NORET();
	}

public:
	int task;
	int expires;
	int id;
	co_call handle;
};

class _XTimeDisp
{
public:
	using _Cont_ty = ::std::list<class _XTimer*>;
	static constexpr int _TVN_BITS = 4;
	static constexpr int _TVR_BITS = 6;
	static constexpr int _TVN_SIZE = 1 << _TVN_BITS;
	static constexpr int _TVR_SIZE = 1 << _TVR_BITS;
	static constexpr int _TVN_MASK = _TVN_SIZE - 1;
	static constexpr int _TVR_MASK = _TVR_SIZE - 1;

public:
	_XTimeDisp()
		:_tq_mutex(),
		tvec(),
		_sign_mutex(),
		_run_thread(),
		_sign(),
		_thread_state(0)
	{

	}

	~_XTimeDisp()
	{

	}
public:
	static constexpr int _INDEX(int expires, int n)
	{
		return ((expires >> (_TVR_BITS + n * _TVN_BITS)) & _TVN_MASK);
	}

	static constexpr int _OFFSET(int n)
	{
		return (_TVR_SIZE + n*_TVN_SIZE);
	}
public:
	void add(_XTimer* timer)
	{
		unsigned long long expires = timer->expires;
		unsigned long long index = expires - _Check_time;
		
		unsigned int _VecIndex = 0;
		if (index < _TVR_SIZE)		// tvec_1
		{
			tvec_1[expires & _TVR_MASK].push_back(timer);
		}
		else if (index < (1 << (_TVR_BITS + 1 * _TVN_BITS)))	// tvec_2
		{
			tvec_2[_INDEX(expires, 0)].push_back(timer);
		}
		else if (index <= 0)	// 异常处理,视为即将到期的计时器
		{
			tvec_1[_Check_time & _TVR_MASK].push_back(timer);
		}
		else 
		{
			if (index > 0xFFFFFFFFUL)
			{
				index = 0xFFFFFFFFUL;
				expires = index + _Check_time;
			}
			tvec_1[_INDEX(expires, 1)].push_back(timer);
		}
	}

	int cascade(int offset, int index)
	{
		::std::unique_lock<::std::recursive_mutex>  _lock(_tq_mutex);
		_Cont_ty& list = tvec[offset + index];
		_Cont_ty empty;
		::std::swap(empty, list);

		for (auto it = empty.begin(); it != empty.end(); ++it)
		{
			this->add(*it);
		}
		return index;
	}

	void tick(size_t _Now)
	{
		::std::unique_lock<::std::recursive_mutex> _lock(_tq_mutex);
		
		while (_Check_time <= _Now)
		{
			int index = _Check_time & _TVR_MASK;
			if (!index &&		// tv1
				!cascade(_OFFSET(0), _INDEX(_Check_time, 0)))	// tv2
			{
				cascade(_OFFSET(1), _INDEX(_Check_time, 1));	// tv3
			}

			++_Check_time;
			_Cont_ty& list = tvec[index];
			_Cont_ty empty;
			::std::swap(empty, list);

			for (auto it = empty.begin(); it != empty.end(); ++it)
			{ 
				// 如果有句柄说明处于挂起状态,继续执行
				if ((*it)->handle)
				{
					auto ret = (*it)->handle.resume();

					if (ret != 0)
					{
						(*it)->expires += ret;
						add(*it);
					}
				}
				else 
				{
					// 视为第一次执行

					auto res = (*it)->on_event(_Now);
					
					auto ret = res.resume();
					
					if (ret != 0)
					{
						(*it)->expires += ret;
						(*it)->handle = ::std::move(res);
						add(*it);
					}
				}

			}
		}
		
	}

	void run()
	{
		while (_thread_state == 1)
		{
			tick(_Check_time);
			::std::unique_lock<::std::mutex> _nofity(_sign_mutex);
			_sign.wait(_nofity);
		}
	}

	void nofity(size_t now)
	{
		_Check_time = now;
		_sign.notify_one();
		
	}

	void start(size_t time)
	{
		if (_thread_state)
		{
			return;
		}
		_Check_time = time;
		_thread_state = 1;
		_run_thread = ::std::thread(&_XTimeDisp::run, this);
	}

	void stop()
	{
		_thread_state = 0;
		_sign.notify_one();

		if (_run_thread.joinable()) 
		{
			_run_thread.join();
		}
	}
public:
	size_t _Check_time;
	union
	{
		class
		{
		public:
			_Cont_ty tvec_1[_TVR_SIZE];
			_Cont_ty tvec_2[_TVN_SIZE];
			_Cont_ty tvec_3[_TVN_SIZE];
		};
		_Cont_ty tvec[_TVR_SIZE + 2 * _TVN_SIZE];
	};

	::std::recursive_mutex _tq_mutex;
	::std::mutex _sign_mutex;
	::std::condition_variable _sign;
	::std::thread _run_thread;
	int _thread_state;
};


using timer = _XTimer;
using tdc = _XTimeDisp;


void main()
{

	timer t1, t2,t3,t4;

	t1.id = 1;
	t1.expires = 0;
	t1.task = 300;
	t2.id = 2;
	t2.task = 100;
	t2.expires = 3;
	t3.id = 3;
	t3.task = 50;
	t3.expires = 3;
	t4.id = 4;
	t4.task = 30;
	t4.expires = 88;

	tdc tm;
	 
	tm._Check_time = 0;
	tm.add(&t1);
	tm.add(&t2);
	tm.add(&t3);
	tm.add(&t4);

	tm.start(0);
	int tbegin = 0;
	while (1)
	{

		::std::this_thread::sleep_for(::std::chrono::milliseconds(10));
		tm.nofity(tbegin);
		++tbegin;
	}

	system("pause");
}

#undef _COT_WAIT
#undef _COT_NORET

运行截图
在这里插入图片描述
由于没有采取多线程Sleep阻塞,CPU利用自然是比较高的

发布了30 篇原创文章 · 获赞 15 · 访问量 1万+

猜你喜欢

转载自blog.csdn.net/u011760195/article/details/104918614
今日推荐