客户端向服务器发出远程调用请求后立即返回,不会阻塞等待;
服务器接收到来自客户端的请求后,把任务放到工作队列中,由工作线程在空闲的时候处理,而不是让分派线程阻塞等待处理完成。
ICE(Slice)定义:
#ifndef HELLO_ICE
#define HELLO_ICE
// Slice definitions for the asynchronous hello demo.
module Demo
{
// User exception listed in sayHello's throws clause; it has no data members.
// The server code in this file raises it for requests that are still queued
// (or arrive) once the work queue has been destroyed.
exception RequestCanceledException
{
};
// Greeting service.
interface Hello
{
// The "ami" and "amd" metadata select the asynchronous mappings: the client
// in this file invokes it through sayHello_async with an AMI callback, and
// the server implements sayHello_async with an AMD callback.
// delay: the server in this file answers immediately when it is 0 and
// otherwise treats it as a number of milliseconds to wait before answering.
["ami", "amd"] void sayHello(int delay)
throws RequestCanceledException;
// Asks the server to shut down; declared idempotent.
idempotent void shutdown();
};
};
#endif
客户端源码:
#include <Ice/Application.h>
#include "Hello.h"
using namespace std;
using namespace Demo;
//*********************************************************************************************************
//异步调用接口的输出参数和返回值
//统统成为了回调对象的输入参数
class AMI_Hello_sayHelloI : public AMI_Hello_sayHello
{
public:
//异步方法调用成功
virtual void ice_response(){cout << "sayHello_async Successfully !" << endl;}
//异步方法调用异常
virtual void ice_exception(const Ice::Exception& ex){cerr << "sayHello_async Failed : " << ex << endl;}
};
//*********************************************************************************************************
// Interactive command-line client built on Ice::Application.
class AsyncClient : public Ice::Application
{
public:
    // Application body: reads single-character commands and invokes the server.
    virtual int run(int argc, char* argv[]);

    // Invoked on an interrupt signal once callbackOnInterrupt() is in effect.
    virtual void interruptCallback(int signal);

private:
    // Prints the list of supported commands.
    void menu();
};
int main(int argc, char* argv[])
{
AsyncClient app;
return app.main(argc, argv);
}
// Application body of the client.
//
// Resolves the "hello" proxy on port 10000, prints the menu, and then
// processes single-character commands read from standard input until 'x' is
// entered or the input stream ends or fails.
//
// Returns EXIT_FAILURE if the proxy cannot be narrowed to Demo::Hello,
// EXIT_SUCCESS otherwise. Ice exceptions raised by an individual command are
// printed and the loop continues.
int AsyncClient::run(int argc, char* argv[])
{
    callbackOnInterrupt();
    HelloPrx hello = HelloPrx::checkedCast(communicator()->stringToProxy("hello:default -p 10000"));
    if(!hello) {
        cerr << argv[0] << ": invalid proxy" << endl;
        return EXIT_FAILURE;
    }
    menu();
    // Initialised so that the loop condition never reads an indeterminate
    // value, even if the very first extraction fails.
    char c = '\0';
    do {
        try {
            cout << "==> ";
            if(!(cin >> c)) {
                // EOF or stream error: there is no command to interpret.
                break;
            }
            if(c == 'i') {
                // Synchronous invocation; a delay of 0 asks for an immediate reply.
                hello->sayHello(0);
            }
            else if(c == 'd') {
                // Asynchronous invocation: returns at once, the callback
                // object is notified when the reply (or an error) arrives.
                hello->sayHello_async(new AMI_Hello_sayHelloI, 5000);
            }
            else if(c == 's') {
                hello->shutdown();
            }
            else if(c == 'x') {
                // Nothing to do
            }
            else if(c == '?') {
                menu();
            }
            else {
                cout << "unknown command `" << c << "'" << endl;
                menu();
            }
        }
        catch(const Ice::Exception& ex) {
            cerr << ex << endl;
        }
    }
    while(cin.good() && c != 'x');
    return EXIT_SUCCESS;
}
void
AsyncClient::interruptCallback(int)
{
try {
communicator()->destroy();
}
catch(const IceUtil::Exception& ex){
cerr << appName() << ": " << ex << endl;
}
catch(...){
cerr << appName() << ": unknown exception" << endl;
}
exit(EXIT_SUCCESS);
}
// Writes the command summary to standard output.
void AsyncClient::menu()
{
    static const char* const usageText =
        "usage:\n"
        "i: send immediate greeting\n"
        "d: send delayed greeting\n"
        "s: shutdown server\n"
        "x: exit\n"
        "?: help\n";

    cout << usageText;
}
服务器端源码:
#include <Ice/Application.h>
#include <Ice/Ice.h>
#include "Hello.h"
//#include "CWorkQueue.h"
//*********************************************************************************************************
#include <IceUtil/Thread.h>
#include <IceUtil/Monitor.h>
#include <IceUtil/Mutex.h>
#include <Ice/LocalException.h>
#include <list>
using namespace std;
//主线程向队列添加任务
//工作线程完成队列中的任务
class CWorkQueue : public IceUtil::Thread
{
public:
CWorkQueue();
virtual void run();
void add(const Demo::AMD_Hello_sayHelloPtr&, int);
void destroy();
private:
struct CallbackEntry{//异步方法分派的回调由ICE自己实现
Demo::AMD_Hello_sayHelloPtr poCallback;
int iDelay;
};
IceUtil::Monitor<IceUtil::Mutex> m_oMonitor;
std::list<CallbackEntry> m_oCallbakList;
bool m_bDone;
};
typedef IceUtil::Handle<CWorkQueue> WorkQueuePtr;
// A new queue starts out active (not done).
CWorkQueue::CWorkQueue()
    : m_bDone(false)
{
}
// Worker thread body.
//
// Holds the monitor for its whole lifetime, releasing it only while waiting.
// Until destroy() sets the done flag it repeatedly takes the oldest queued
// request, waits for that request's delay to emulate a long-running task,
// prints the greeting and sends the response. Once the queue is done, every
// request still queued is completed with RequestCanceledException.
void CWorkQueue::run()
{
    IceUtil::Monitor<IceUtil::Mutex>::Lock lock(m_oMonitor);
    while(!m_bDone) {
        if(m_oCallbakList.empty()) {
            // Sleep until add() or destroy() notifies the monitor.
            m_oMonitor.wait();
        }
        if(!m_oCallbakList.empty()) {
            // Copy the next entry; it stays queued while we wait so that it
            // is cancelled below if the queue is destroyed in the meantime.
            CallbackEntry entry = m_oCallbakList.front();
            //
            // Wait for the amount of time indicated in iDelay to
            // emulate a process that takes a significant period of
            // time to complete.
            //
            // The delay is supplied by the remote client. A non-positive
            // value is handled without waiting, so that an invalid timeout
            // can never be handed to timedWait() and take down this thread.
            if(entry.iDelay > 0) {
                m_oMonitor.timedWait(IceUtil::Time::milliSeconds(entry.iDelay));
            }
            if(!m_bDone) {
                m_oCallbakList.pop_front();
                cout << "Belated Hello World!" << endl;
                entry.poCallback->ice_response();
            }
        }
    }
    // Throw exception for any outstanding requests.
    for(list<CallbackEntry>::const_iterator p = m_oCallbakList.begin(); p != m_oCallbakList.end(); ++p) {
        p->poCallback->ice_exception(Demo::RequestCanceledException());
    }
    // Every callback has been completed; drop the references to them.
    m_oCallbakList.clear();
}
// An AMD operation typically places the request data (the callback object
// and the operation arguments) on a queue so that an application thread
// (or thread pool) can process it later.
//
// If the queue has already been destroyed, the request is instead completed
// immediately with RequestCanceledException.
void
CWorkQueue::add(const Demo::AMD_Hello_sayHelloPtr& poCallback, int iDelay)
{
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(m_oMonitor);

    if(m_bDone)
    {
        // Destroyed, throw exception.
        poCallback->ice_exception(Demo::RequestCanceledException());
        return;
    }

    // Only a transition from empty to non-empty needs to signal the monitor.
    if(m_oCallbakList.size() == 0)
    {
        m_oMonitor.notify();
    }

    // Add work item.
    CallbackEntry item;
    item.poCallback = poCallback;
    item.iDelay = iDelay;
    m_oCallbakList.push_back(item);
}
// Sets the done flag under the monitor lock and notifies the monitor so that
// a waiting worker thread re-checks the flag.
void CWorkQueue::destroy()
{
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(m_oMonitor);

    // Set done flag and notify.
    m_bDone = true;
    m_oMonitor.notify();
}
//#include "HelloI.h"
//*********************************************************************************************************
class HelloI : virtual public Demo::Hello
{
public:
HelloI(const WorkQueuePtr&);//异步方法分配接口由ICE自己实现,直接使用即可
virtual void sayHello_async(const Demo::AMD_Hello_sayHelloPtr&, int, const Ice::Current&);
virtual void shutdown(const Ice::Current&);
private:
WorkQueuePtr m_poWorkQueue;
};
// Stores the handle to the work queue that delayed greetings are handed to.
HelloI::HelloI(const WorkQueuePtr& workQueue)
    : m_poWorkQueue(workQueue)
{
}
// AMD dispatch of sayHello.
// A delay of exactly 0 is answered on the spot; any other delay is handed to
// the work queue, which completes the callback later.
void HelloI::sayHello_async(const Demo::AMD_Hello_sayHelloPtr& poCallback, int iDelay, const Ice::Current&)
{
    if(iDelay != 0)
    {
        m_poWorkQueue->add(poCallback, iDelay);
        return;
    }

    cout << "Hello World!" << endl;
    poCallback->ice_response();
}
// Announces the shutdown, destroys the work queue, and then shuts down the
// communicator that owns the adapter this request was dispatched on.
void HelloI::shutdown(const Ice::Current& curr)
{
    cout << "Shutting down..." << endl;

    m_poWorkQueue->destroy();

    Ice::CommunicatorPtr owner = curr.adapter->getCommunicator();
    owner->shutdown();
}
//*********************************************************************************************************
// Server application built on Ice::Application; owns the work queue.
class AsyncServer : public Ice::Application
{
public:
    // Application body: sets up the adapter, servant and work queue.
    virtual int run(int argc, char* argv[]);

    // Invoked on an interrupt signal once callbackOnInterrupt() is in effect.
    virtual void interruptCallback(int signal);

private:
    WorkQueuePtr m_poWorkQueue;
};
int main(int argc, char* argv[])
{
AsyncServer app;
return app.main(argc, argv);
}
int AsyncServer::run(int argc, char* argv[])
{
callbackOnInterrupt();
Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapterWithEndpoints("Hello", "default -p 10000");
m_poWorkQueue = new CWorkQueue();
adapter->add(new HelloI(m_poWorkQueue), communicator()->stringToIdentity("hello"));
m_poWorkQueue->start();
adapter->activate();
communicator()->waitForShutdown();
m_poWorkQueue->getThreadControl().join();
return EXIT_SUCCESS;
}
// Interrupt handler for the server: stops the work queue (if it exists) and
// shuts down the communicator so that run() can return.
void AsyncServer::interruptCallback(int)
{
    // run() arms this callback before it creates the work queue, so an early
    // interrupt can arrive while the handle is still null.
    if(m_poWorkQueue)
    {
        m_poWorkQueue->destroy();
    }
    communicator()->shutdown();
}