POCO库 Foundation::Thread模块(二) 主动对象

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/WUDAIJUN/article/details/11038697
 

Poco为使用者提供了一套基于多线程的主动对象,它通过使用多线程来高效地完成任务,而对于调用者却屏蔽了多线程的种种顾虑。甚至都看不出它使用了多线程的痕迹。Poco中一共有三种主动对象:activity,acitivityMethod,

Activity对象

     Acitvity对象是主动对象中最简单的一种对象,它通过对象调用的方式实现通过多线程完成任务,并且要求入口函数是没有参数和返回值的。这样就免去了很多麻烦,如返回值的处理和同步机制。

    Poco对Activity的介绍如下:

	/// This template class helps to implement active objects.
	/// An active object uses threads to decouple method
	/// execution from method invocation, or to perform tasks
	/// autonomously, without intervention of a caller.
	///
	/// An activity is a (typically longer running) method
	/// that executes within its own task. Activities can
	/// be started automatically (upon object construction)
	/// or manually at a later time. Activities can also
	/// be stopped at any time. However, to make stopping
	/// an activity work, the method implementing the
	/// activity has to check periodically whether it
	/// has been requested to stop, and if so, return. 
	/// Activities are stopped before the object they belong to is
	/// destroyed. Methods implementing activities cannot have arguments
	/// or return values. 


我们先来看看Activity对象如何使用:

#include "stdafx.h"
#include<iostream>

#include "Poco/Activity.h"
using namespace std;
using namespace Poco;

class AcivityTest
{
public:
	AcivityTest():
	  _act(this, &AcivityTest::run){}

	  void start()
	  {
		  _act.start();
	  }

	  void stop()
	  {
		  _act.stop();
	  }

	void run()
	{
		while (!_act.isStopped())
		{
			cout<<"Runing"<<endl;
			Sleep(1000);
		}
	}
private:
	Activity<AcivityTest> _act;

};

int main(int argc, char** argv)
{
	
	ActivityTest ts;
	ts.start();
	Sleep(3000);
	ts.stop();
	system("pause");
	return 0;
}

上面的例子可能举得很不贴切,因为它完全没有充分利用到Activity带来的多线程便利,只是为了使用而使用。一个ActivityTest类通过包含一个Activity对象并定义一个void run()入口函数(可任意取名)来初始化该对象。这可以使run函数通过Activity对象以并发的方式高效运行,并且可以随时终止它,但是前提是你在提供的入口run函数中随时监测该对象是否已经被申请停止。上面的程序中,主函数main启动了Activity对象,并且休眠3秒(这三秒完全可以做其他事情),之后停止该对象。在Activity对象入口中,每一秒检测是否被申请停止,如果未停止,则输出一行。因此程序将输出三行Runing。run函数一般可以用于完成比较费时而不是必需或者并不即时的任务。它可以是顺序的,Activity还提供了一个wait接口用于调用方等待其任务完成,这些都是在ActivityTest::run()函数完成后由Activity类自动封装实现的。有了stop()和wait()机制,就能够对Activity的行为更好地利用和控制。

     在Foundation/Include/Poco/Activity.h中找到Activity的源代码:

#include "Poco/Foundation.h"
#include "Poco/RunnableAdapter.h"
#include "Poco/ThreadPool.h"
#include "Poco/Event.h"
#include "Poco/Mutex.h"


namespace Poco {


template <class C>
class Activity: public Runnable
{
public:
	typedef RunnableAdapter<C> RunnableAdapterType;
	typedef typename RunnableAdapterType::Callback Callback;

	Activity(C* pOwner, Callback method):
		_pOwner(pOwner),
		_runnable(*pOwner, method),
		_stopped(true),
		_running(false),
		_done(false)
		/// Creates the activity. Call start() to
		/// start it.
	{
		poco_check_ptr (pOwner);
	}
	
	~Activity()
		/// Stops and destroys the activity.
	{
		stop();
		wait();
	}
	
	void start()
		/// Starts the activity by acquiring a
		/// thread for it from the default thread pool.
	{
		FastMutex::ScopedLock lock(_mutex);
		
		if (!_running)
		{
			_done.reset();
			_stopped = false;
			_running = true;
			try
			{
				ThreadPool::defaultPool().start(*this);
			}
			catch (...)
			{
				_running = false;
				throw;
			}
		}
	}
	
	void stop()
		/// Requests to stop the activity.
	{
		FastMutex::ScopedLock lock(_mutex);

		_stopped = true;
	}
	
	void wait()
		/// Waits for the activity to complete.
	{
		if (_running)
		{
			_done.wait();
		}
	}

	void wait(long milliseconds)
		/// Waits the given interval for the activity to complete.
		/// An TimeoutException is thrown if the activity does not
		/// complete within the given interval.
	{
		if (_running)
		{
			_done.wait(milliseconds);
		}
	}
	
	bool isStopped() const
		/// Returns true if the activity has been requested to stop.
	{
		return _stopped;
	}
	
	bool isRunning() const
		/// Returns true if the activity is running.
	{
		return _running;
	}

protected:
	void run()
	{
		try
		{
			_runnable.run();
		}
		catch (...)
		{
			_running = false;
			_done.set();
			throw;
		}
		_running = false;
		_done.set();
	}
	
private:
	Activity();
	Activity(const Activity&);
	Activity& operator = (const Activity&);

	C*                  _pOwner;
	RunnableAdapterType _runnable;
	volatile bool       _stopped;
	volatile bool       _running;
	Event               _done;
	FastMutex           _mutex;
};


}

代码比较简单,它的模板参数是它所在类C,构造函数是一个C类对象地址和一个C类void fun()函数地址,有了这三件套之后,它就可以通过RunnableAdapter适配器将C::void fun()适配成一个标准的Thread入口RunnableAdapter::run(),然后在Activity::start()中启动RunnableAdapter<C>::run函数,至于wait机制,是通过一个任务完成信号量_done来标志的,stop机制通过一个bool变量_stopped来控制。对这些变量的修改都要通过一个FastMutex::ScopLock局部锁来控制访问。另外,Activity对象不允许默认构造,赋值或复制构造,因为只有有任务它才有意义。
  

 ActiveMethod

相对于Activity的无参数和返回值,有时候我们需要有参数有返回值,进行更有意义的任务。ActiveMethod提供了这样一个接口。首先看看它的使用方法:

#include "stdafx.h"
#include<iostream>

#include "Poco/ActiveMethod.h"
using namespace std;
using namespace Poco;


class ActiveObject
{
public:
	ActiveObject():
	  activeMethod(this, &ActiveObject::activeMethodImpl){}

    ActiveMethod<int, int, ActiveObject> activeMethod;

protected:
	int activeMethodImpl(const int& i)
	{
		return i+1;
	}

};

void main()
{
	ActiveObject obj;
	ActiveResult<int> result = obj.activeMethod(5);
        //这一句赋值立即返回 
	//此时obj中的activeMethodImpl以通过新线程运行,主线程此处可以继续执行其他任务

	result.wait();//等待线程分发任务完成
	cout<<result.data()<<endl;

	system("pause");
}

可以看到,与上面Activity例子一样,我们在使用类ActiveObject中加入了activeMethod成员并且在构造函数中,为它分配了任务(定义执行入口),在使用的时候,我们不是通过activeMethod.start()来启动它,因为我们需要参数和返回值,activeMethod重载了operator(),我们可以直接将该对象像函数一样使用:activeMethod(5),并且将返回值放入ActiveResult中,注意,此时ActiveResult并没有保存真正的结果,因为这一句赋值是立即完成的,而activeMethod的计算则可能花上很长时间,事实上ActiveResult得到的只是一个信号量和一个空结果,当activeMethod执行完成后设置信号量,并放入结果,因此我们在主线程需要得到结果时,要先通过result.wait()等待activeMethod运行结束,然后result.data()里面才是真正的结果。

下面是activeMethod的源码:

template <class ResultType, class ArgType, class OwnerType, class StarterType = ActiveStarter<OwnerType> >
class ActiveMethod
	/// An active method is a method that, when called, executes
	/// in its own thread. ActiveMethod's take exactly one
	/// argument and can return a value. To pass more than one
	/// argument to the method, use a struct.
	///
	/// The way an ActiveMethod is started can be changed by passing a StarterType
	/// template argument with a corresponding class. The default ActiveStarter
	/// starts the method in its own thread, obtained from a thread pool.
	///
	/// For an alternative implementation of StarterType, see ActiveDispatcher.
	///
	/// For methods that do not require an argument or a return value, the Void
	/// class can be used.
{
public:
	typedef ResultType (OwnerType::*Callback)(const ArgType&);
	typedef ActiveResult<ResultType> ActiveResultType;
	typedef ActiveRunnable<ResultType, ArgType, OwnerType> ActiveRunnableType;

	ActiveMethod(OwnerType* pOwner, Callback method):
		_pOwner(pOwner),
		_method(method)
		/// Creates an ActiveMethod object.
	{
		poco_check_ptr (pOwner);
	}
	
	ActiveResultType operator () (const ArgType& arg)
		/// Invokes the ActiveMethod.
	{
		ActiveResultType result(new ActiveResultHolder<ResultType>());
		ActiveRunnableBase::Ptr pRunnable(new ActiveRunnableType(_pOwner, _method, arg, result));
		StarterType::start(_pOwner, pRunnable);
		return result;
	}
		
	ActiveMethod(const ActiveMethod& other):
		_pOwner(other._pOwner),
		_method(other._method)
	{
	}

	ActiveMethod& operator = (const ActiveMethod& other)
	{
		ActiveMethod tmp(other);
		swap(tmp);
		return *this;
	}

	void swap(ActiveMethod& other)
	{
		std::swap(_pOwner, other._pOwner);
		std::swap(_method, other._method);
	}

private:
	ActiveMethod();

	OwnerType* _pOwner;
	Callback   _method;
};

这里面的关键就在于operator()的重载,函数中,先创建ActiveResult对象,然后用使用类对象指针_pOwner,入口_method,用户传入参数arg和刚创建的ActiveResult对象创建一个ActiveRunnableType对象,然后用ActiveStarter::start开始运行。

先看看ActiveRunnable:

class ActiveRunnableBase: public Runnable, public RefCountedObject
	/// The base class for all ActiveRunnable instantiations.
{
public:
	typedef AutoPtr<ActiveRunnableBase> Ptr;
};


template <class ResultType, class ArgType, class OwnerType>
class ActiveRunnable: public ActiveRunnableBase
	/// This class is used by ActiveMethod.
	/// See the ActiveMethod class for more information.
{
public:
	typedef ResultType (OwnerType::*Callback)(const ArgType&);
	typedef ActiveResult<ResultType> ActiveResultType;

	ActiveRunnable(OwnerType* pOwner, Callback method, const ArgType& arg, const ActiveResultType& result):
		_pOwner(pOwner),
		_method(method),
		_arg(arg),
		_result(result)
	{
		poco_check_ptr (pOwner);
	}

	void run()
	{
		ActiveRunnableBase::Ptr guard(this, false); // ensure automatic release when done
		try
		{
			_result.data(new ResultType((_pOwner->*_method)(_arg)));
		}
		catch (Exception& e)
		{
			_result.error(e);
		}
		catch (std::exception& e)
		{
			_result.error(e.what());
		}
		catch (...)
		{
			_result.error("unknown exception");
		}
		_result.notify();
	}

private:
	OwnerType* _pOwner;
	Callback   _method;
	ArgType    _arg;
	ActiveResultType _result;
};


它的作用和实现都很简单,类似于Activity中的RunnableAdapter,它负责适配线程的入口,从Runnable派生,在run()函数中,启动用户定义的入口,并带入参数,将执行结果放入传入的result.data中,并设置result的信号量,表示任务执行完成,使用方可通过result.data()获取执行结果。注意,这里用的result就是在ActiveMethod::operator()中new的ActiveResult。最后来看看ActiveMethod::operator()的下一句:

StarterType::start(_pOwner, pRunnable);

找到ActiveStarter源码:

template <class OwnerType>
class ActiveStarter
	/// The default implementation of the StarterType 
	/// policy for ActiveMethod. It starts the method
	/// in its own thread, obtained from the default
	/// thread pool.
{
public:
	static void start(OwnerType* pOwner, ActiveRunnableBase::Ptr pRunnable)
	{
		ThreadPool::defaultPool().start(*pRunnable);
		pRunnable->duplicate(); // The runnable will release itself.
	}
};

他通过默认线程池来接收这个任务,用ActiveRunnable作为参数,前面说了,ActiveRunnable从Runnable继承,因此默认线程池会分配一个线程执行pRunnable->run(),在前面看到的ActiveRunnable::run()中,会调用用户指定的函数,并传入指定的参数,将结果放入result.data中,并通过result中的信号量通知使用方函数执行完成,结果已准备好。

ActiveResult的实现比较简单,但是代码行数较多,就不贴出来了。它主要包含一个结果ResultType _result和一个信号量Event _event,用户在获取执行结果的时候,需要先调用ActiveResult::wait()等待结果准备好,再通过ActiveResult::data()获取数据。

下面再梳理一下ActiveMethod机制的整个流程,
     在构造ActiveMethod的时候,用户需要通过模板指定返回值类型ResultType(用于实例化ActiveResult和适配ActiveRunnable入口),参数类型ArgType(用于定义ActiveResult::operator()参数和适配ActiveRunnable入口),和目标函数所有者类型OwnerType(用于适配ActiveRunnable入口),并且指定目标函数地址func,和执行的对象指针pObject。ActiveMethod通过这些元素确认了任务入口原型:OwnerType::ResultType func(ArgType arg) 以及调用者:pObject。然后用户可以直接通过activeMethod(arg)的方式来传入参数并调用之前指定的入口,然后跳到了ActiveMethod::operator()中,它创建了ActiveResult,并构造一个ActiveRunnable适配器,在ActiveRunnable中分配线程执行任务,并且将结果放入刚创建的ActiveResult。至此,一次ActiveMethod的使用就完成了。看似麻烦,实际上它的最大优势就在于多线程,它屏蔽了创建线程,线程同步等机制。从用户的角度上来说,只需要构造一个ActiveMethod,然后就可以直接把它当成目标函数那样直接用ActiveResult result = activeMehod(arg)调用,而这一句赋值时立即返回的,然后主线程就做其他的事情,当需要获取执行结果时,再调用activeMethod.wait()等待结果准备好,最后通过activeMethod.data()获取结果。顺便提一点,由于ActiveRunnable实现机制的限制,ActiveMethod只支持一个参数,如果有多个自定义参数,需要使用结构体或类。

猜你喜欢

转载自blog.csdn.net/WUDAIJUN/article/details/11038697