ICE异步调用:客户端AMI,服务器AMD

客户端向服务器发出远程调用请求后立即返回,不会阻塞等待;

服务器接收到来自客户端的请求后,把任务放到线程队列中,让工作线程的空闲的时候处理,而不是等待处理。

ICE定义:

#ifndef HELLO_ICE
#define HELLO_ICE

module Demo
{

exception RequestCanceledException
{
};

interface Hello
{
    ["ami", "amd"] void sayHello(int delay)
        throws RequestCanceledException;

    idempotent void shutdown();
};

};

#endif

客户端源码:

#include <Ice/Application.h>
#include "Hello.h"

using namespace std;
using namespace Demo;


//*********************************************************************************************************
//异步调用接口的输出参数和返回值
//统统成为了回调对象的输入参数
class AMI_Hello_sayHelloI : public AMI_Hello_sayHello
{
public:
	//异步方法调用成功
	virtual void ice_response(){cout << "sayHello_async Successfully !" << endl;}
	//异步方法调用异常
	virtual void ice_exception(const Ice::Exception& ex){cerr << "sayHello_async Failed : " << ex << endl;}
};



//*********************************************************************************************************
class AsyncClient : public Ice::Application
{
public:
    virtual int run(int, char*[]);
    virtual void interruptCallback(int);
private:
    void menu();
};

int main(int argc, char* argv[])
{
    AsyncClient app;
    return app.main(argc, argv);
}

int AsyncClient::run(int argc, char* argv[])
{
    callbackOnInterrupt();

	HelloPrx hello = HelloPrx::checkedCast(communicator()->stringToProxy("hello:default -p 10000"));
    if(!hello) {
        cerr << argv[0] << ": invalid proxy" << endl;
        return EXIT_FAILURE;
    }

    menu();

    char c;
    do {
        try{
            cout << "==> ";
            cin >> c;
            if(c == 'i'){//同步方法调用
                hello->sayHello(0);
            }
            else if(c == 'd'){//异步方法调用
                hello->sayHello_async(new AMI_Hello_sayHelloI, 5000);
            }
            else if(c == 's'){
                hello->shutdown();
            }
            else if(c == 'x'){
                // Nothing to do
            }
            else if(c == '?'){
                menu();
            }
            else{
                cout << "unknown command `" << c << "'" << endl;
                menu();
            }
        }
        catch(const Ice::Exception& ex){
            cerr << ex << endl;
        }
    }
    while(cin.good() && c != 'x');

    return EXIT_SUCCESS;
}

void
AsyncClient::interruptCallback(int)
{
    try {
        communicator()->destroy();
    }
    catch(const IceUtil::Exception& ex){
        cerr << appName() << ": " << ex << endl;
    }
    catch(...){
        cerr << appName() << ": unknown exception" << endl;
    }
    exit(EXIT_SUCCESS);
}

void AsyncClient::menu()
{
    cout <<
        "usage:\n"
        "i: send immediate greeting\n"
        "d: send delayed greeting\n"
        "s: shutdown server\n"
        "x: exit\n"
        "?: help\n";
}

服务器端源码:


#include <Ice/Application.h>


#include <Ice/Ice.h>
#include "Hello.h"
//#include "CWorkQueue.h"
//*********************************************************************************************************
#include <IceUtil/Thread.h>
#include <IceUtil/Monitor.h>
#include <IceUtil/Mutex.h>
#include <Ice/LocalException.h>

#include <list>
using namespace std;


//主线程向队列添加任务
//工作线程完成队列中的任务
class CWorkQueue : public IceUtil::Thread
{
public:
    CWorkQueue();

    virtual void run();
    void add(const Demo::AMD_Hello_sayHelloPtr&, int);
    void destroy();
private:
    struct CallbackEntry{//异步方法分派的回调由ICE自己实现
        Demo::AMD_Hello_sayHelloPtr poCallback;
        int iDelay;
    };

    IceUtil::Monitor<IceUtil::Mutex> m_oMonitor;
    std::list<CallbackEntry> m_oCallbakList;
    bool m_bDone;
};
typedef IceUtil::Handle<CWorkQueue> WorkQueuePtr;



CWorkQueue::CWorkQueue() :m_bDone(false){}

void CWorkQueue::run()
{
    IceUtil::Monitor<IceUtil::Mutex>::Lock lock(m_oMonitor);
    while(!m_bDone) {
        if(m_oCallbakList.size() == 0) {
            m_oMonitor.wait();
        }

        if(m_oCallbakList.size() != 0){
            CallbackEntry entry = m_oCallbakList.front();//获取下一个回调函数

            //
            // Wait for the amount of time indicated in iDelay to 
            // emulate a process that takes a significant period of
            // time to complete.
            //
            m_oMonitor.timedWait(IceUtil::Time::milliSeconds(entry.iDelay));

            if(!m_bDone){
                m_oCallbakList.pop_front();
                cout << "Belated Hello World!" << endl;//打印问候语句
                entry.poCallback->ice_response();//发送响应
            }
        }
    }

	//Throw exception for any outstanding requests.
    for(list<CallbackEntry>::const_iterator p = m_oCallbakList.begin(); p != m_oCallbakList.end(); ++p) {
        (*p).poCallback->ice_exception(Demo::RequestCanceledException());
    }
}


//AMD 操作通常会把请求数据 (也就是,回调对象和操作参数)放入队列 ,
//供应用的某个线程 (或线程池)随后处理用
void 
CWorkQueue::add(const Demo::AMD_Hello_sayHelloPtr& poCallback, int iDelay)
{
    IceUtil::Monitor<IceUtil::Mutex>::Lock lock(m_oMonitor);

    if(!m_bDone){
        CallbackEntry entry;
        entry.poCallback = poCallback;
        entry.iDelay = iDelay;

        if(m_oCallbakList.size() == 0){
            m_oMonitor.notify();
        }
        m_oCallbakList.push_back(entry);// Add work item.
    }
    else{
        poCallback->ice_exception(Demo::RequestCanceledException());//Destroyed, throw exception.
    }
}

void CWorkQueue::destroy()
{
    IceUtil::Monitor<IceUtil::Mutex>::Lock lock(m_oMonitor);
    m_bDone = true;
    m_oMonitor.notify();//Set done flag and notify.
}


//#include "HelloI.h"
//*********************************************************************************************************
class HelloI : virtual public Demo::Hello
{
public:
    HelloI(const WorkQueuePtr&);//异步方法分配接口由ICE自己实现,直接使用即可
    virtual void sayHello_async(const Demo::AMD_Hello_sayHelloPtr&, int, const Ice::Current&);
    virtual void shutdown(const Ice::Current&);
private:
    WorkQueuePtr m_poWorkQueue;
};


HelloI::HelloI(const WorkQueuePtr& workQueue) : m_poWorkQueue(workQueue){}

void HelloI::sayHello_async(const Demo::AMD_Hello_sayHelloPtr& poCallback, int iDelay, const Ice::Current&)
{
    if(iDelay == 0){
        cout << "Hello World!" << endl;
        poCallback->ice_response();
    }
    else{
        m_poWorkQueue->add(poCallback, iDelay);
    }
}

void HelloI::shutdown(const Ice::Current& curr)
{
    cout << "Shutting down..." << endl;
    m_poWorkQueue->destroy();
    curr.adapter->getCommunicator()->shutdown();
}


//*********************************************************************************************************
class AsyncServer : public Ice::Application
{
public:
    virtual int run(int, char*[]);
    virtual void interruptCallback(int);
private:
    WorkQueuePtr m_poWorkQueue;
};

int main(int argc, char* argv[])
{
    AsyncServer app;
    return app.main(argc, argv);
}

int AsyncServer::run(int argc, char* argv[])
{
    callbackOnInterrupt();

	Ice::ObjectAdapterPtr adapter = communicator()->createObjectAdapterWithEndpoints("Hello", "default -p 10000");
    m_poWorkQueue = new CWorkQueue();
    adapter->add(new HelloI(m_poWorkQueue), communicator()->stringToIdentity("hello"));

    m_poWorkQueue->start();
    adapter->activate();

    communicator()->waitForShutdown();
    m_poWorkQueue->getThreadControl().join();
    return EXIT_SUCCESS;
}

void AsyncServer::interruptCallback(int)
{
    m_poWorkQueue->destroy();
    communicator()->shutdown();
}

猜你喜欢

转载自blog.csdn.net/CherishPrecious/article/details/81637453