一,利用NNG pair模式,实现异步通信。
二,manager端 绑定地址,回调函数里 接收 异步消息:
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <nng/supplemental/util/platform.h>
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <signal.h>
#include <sys/wait.h>
using namespace std;
using namespace std::chrono;
static bool exit_flag = false;
void recv_data_callback(void *arg);
static void sig_handler(int sig)
{
exit_flag = true;
std::cout << "sig_handler " << exit_flag << endl;
}
void fatal(const char *func, int rv)
{
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
exit(1);
}
class Manager
{
public:
//初始化
bool init()
{
//创建io 并绑定回调函数
rv = nng_aio_alloc(&aio, recv_data_callback, this);
if (rv < 0)
{
fatal("cannot allocate aio", rv);
}
//打开
rv = nng_pair0_open(&sock);
if (rv != 0)
{
fatal("nng_pair0_open", rv);
}
//设置缓冲区大小
nng_socket_set_int(sock, NNG_OPT_SENDBUF, 2048);
nng_socket_set_int(sock, NNG_OPT_RECVBUF, 2048);
//开始监听
if ((rv = nng_listen(sock, url.c_str(), NULL, 0)) != 0)
{
fatal("nng_listen", rv);
}
nng_recv_aio(sock, aio);
isInit = true;
return isInit;
}
//发送数据
void send(const std::string &msgStr)
{
if (!isInit)
return;
if (!isInit)
return;
nng_msg *msg = NULL;
nng_msg_alloc(&msg, sizeof(msgStr));
memcpy(nng_msg_body(msg), msgStr.c_str(), sizeof(msgStr));
nng_sendmsg(sock, msg, 0);
}
public:
nng_socket sock;
nng_aio *aio{nullptr};
private:
int rv;
std::string url{"ipc:///tmp/pair"};
bool isInit{false};
};
void recv_data_callback(void *arg)
{
int rv = 0;
Manager *manager = static_cast<Manager*>(arg);
nng_msg *msg = NULL;
size_t json_len = 0;
char * json_str = NULL;
rv = nng_aio_result(manager->aio);
if (0 != rv) {
fatal("nng_recv error ", rv);
}
msg = nng_aio_get_msg(manager->aio);
json_str = static_cast<char*>(nng_msg_body(msg));
json_len = nng_msg_len(msg);
std::cout<<"recv_data_callback "<<json_str<<std::endl;
nng_msg_free(msg);
nng_recv_aio(manager->sock, manager->aio);
}
int main(int argc, char *grgv[])
{
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
signal(SIGABRT, sig_handler);
Manager manager;
if (manager.init())
{
cout << "init success" << endl;
}
else
{
cout << "init failed" << endl;
}
while (!exit_flag)
{
manager.send("Not bad");
this_thread::sleep_for(seconds(1));
}
return 0;
}
三,adapter 端,同步发送数据,单开一个线程 进行数据的轮询接收。
#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <nng/supplemental/util/platform.h>
#include <iostream>
#include <thread>
#include <chrono>
#include <atomic>
#include <signal.h>
#include <sys/wait.h>
#include <string.h>
using namespace std;
using namespace std::chrono;
static bool exit_flag = false;
static void sig_handler(int sig)
{
exit_flag = true;
std::cout << "sig_handler " << exit_flag << endl;
}
void fatal(const char *func, int rv)
{
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
exit(1);
}
void recv_data_callback(void *arg)
{
}
class Adapter
{
public:
//初始化
bool init()
{
//打开
rv = nng_pair0_open(&sock);
if (rv != 0)
{
fatal("nng_pair0_open", rv);
}
//设置缓冲区大小
nng_socket_set_int(sock, NNG_OPT_SENDBUF, 2048);
nng_socket_set_int(sock, NNG_OPT_RECVBUF, 2048);
rv = nng_dial(sock, url.c_str(), &dialer, 0);
if (rv != 0)
{
fatal("nng_dial", rv);
}
isInit = true;
return isInit;
}
//开始接收
void start()
{
if (!isInit)
return;
std::thread t([&]()
{
while (!isStop)
{
nng_msg * msg = NULL;
char * json_str = NULL;
nng_recvmsg(sock, &msg, 0);
json_str = static_cast<char*>(nng_msg_body(msg));
std::cout<<"nng_recvmsg "<<json_str<<std::endl;
} });
t.detach();
}
void stop()
{
isStop = true;
cout << "stop " << isStop << endl;
}
void send(const std::string &msgStr)
{
if (!isInit)
return;
nng_msg *msg = NULL;
nng_msg_alloc(&msg, sizeof(msgStr));
memcpy(nng_msg_body(msg), msgStr.c_str(), sizeof(msgStr));
nng_sendmsg(sock, msg, 0);
}
public:
nng_socket sock;
nng_dialer dialer;
std::atomic<bool> isStop{false};
private:
std::string url{"ipc:///tmp/pair"};
int rv;
bool isInit{false};
};
int main(int argc, char *grgv[])
{
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
signal(SIGABRT, sig_handler);
Adapter adapter;
if (adapter.init())
{
cout << "init success" << endl;
}
else
{
cout << "init failed" << endl;
}
adapter.start();
while (!exit_flag)
{
adapter.send("How are you?");
this_thread::sleep_for(seconds(1));
}
adapter.stop();
return 0;
}
3,CMakeLists.txt 两端 基本一致
cmake_minimum_required (VERSION 2.8.12)
project(adapter)
set(TARGET_NAME adapter)
find_package(nng CONFIG REQUIRED)
find_package(Threads)
add_executable(${TARGET_NAME} adapter.cpp)
target_link_libraries(${TARGET_NAME} nng::nng)
target_compile_definitions(${TARGET_NAME} PRIVATE NNG_ELIDE_DEPRECATED)