对 rabbitmq-c 进行封装, 支持多线程, 高并发访问
头文件地址:
https://github.com/MwlLj/cpp_component/tree/master/component/header/amqp
源文件地址:
https://github.com/MwlLj/cpp_component/tree/master/component/src/amqp/source
测试地址:
https://github.com/MwlLj/cpp_component/tree/master/component/src/amqp/amqp_test/source
完整工程:
https://github.com/MwlLj/cpp_component
支持功能:
1. 断线重连机制
2. 高并发处理
3. 处理类 (handler) 在独立的线程中运行, 因此可以直接在 handler 中执行耗时操作
example:
#include <iostream>
#include "amqp/amqp.h"
#include "amqp/rabbitmq.h"
#include <vector>
#include <string>
#include <sstream>
#include <thread>
/// Example message handler: writes each message's payload to stdout and then
/// blocks for one second (presumably to stand in for a slow operation).
class CHandler : public amqp::IHandler
{
public:
    /// Prints the payload of a message and then sleeps.
    /// @param msg message whose msg() value is written to std::cout.
    virtual void handle(const amqp::CMessage &msg)
    {
        const std::chrono::seconds pause(1);

        std::cout << msg.msg();
        std::cout << std::endl;

        // Block the calling thread for one second before returning.
        std::this_thread::sleep_for(pause);
    }
};
/// Dials the broker at localhost:5672, opens one channel and declares the
/// direct exchange "test-exchange". In the enabled (#else) branch it then
/// declares the queue "test-queue-<index>", binds it to the exchange with the
/// routing key "test-key-<index>", hands the channel to `amqp` and registers a
/// new CHandler for that routing key.
///
/// @param amqp  client used to dial and to register the channel and handler.
/// @param index suffix appended to the queue name and routing key.
///
/// If `amqp` is null, or dial()/channel() hand back a null pointer, an error
/// is written to std::cerr and the function returns without touching the
/// remaining steps (the original dereferenced those pointers unchecked).
void connect(amqp::IAmqp *amqp, int index)
{
    if (amqp == nullptr) {
        std::cerr << "connect: amqp client is null" << std::endl;
        return;
    }

    // NOTE(review): `result` is an out-parameter of dial()/channel(); its exact
    // meaning is not visible here, so it is not used to abort the setup.
    bool result = false;

    amqp::IAmqp::Dial info;
    info.host = "localhost";
    info.port = 5672;
    info.userName = "guest";
    info.userPwd = "guest";
    info.virtualUrl = "/";
    amqp::IConnect *conn = amqp->dial(info, result);
    if (conn == nullptr) {
        std::cerr << "connect: dial returned no connection (index " << index << ")" << std::endl;
        return;
    }

    amqp::IConnect::Channel ch;
    ch.channelCount = 1;
    amqp::IChannel *channel = conn->channel(ch, result);
    if (channel == nullptr) {
        std::cerr << "connect: no channel could be opened (index " << index << ")" << std::endl;
        return;
    }

    amqp::IChannel::Exchange exchange;
    exchange.exchangeName = "test-exchange";
    exchange.exchangeType = amqp::exchage_type_direct;
    channel->exchangeDeclare(exchange);

    // Only consumed by the disabled publisher branch below.
    // NOTE(review): it publishes with routing key "test-key", whereas the
    // consumer branch binds "test-key-<index>" -- confirm this is intended.
    amqp::IChannel::Publish pub;
    pub.exchangeName = "test-exchange";
    pub.exchangeType = amqp::exchage_type_direct;
    pub.message = "hello hello hello";
    pub.routingKey = "test-key";
#if 0
    // Publisher mode (disabled): every 100 ms start ten detached threads that
    // each publish `pub` after a staggered delay.
    while (1) {
        for (int i = 0; i < 10; ++i) {
            std::thread([channel, pub, i] {
                std::this_thread::sleep_for(std::chrono::milliseconds(10 * (10 - i)));
                channel->publish(pub);
            }).detach();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
#else
    // Consumer mode: one queue and one routing key per index.
    const std::string queueName = "test-queue-" + std::to_string(index);
    const std::string routingKey = "test-key-" + std::to_string(index);
    std::cout << routingKey << std::endl;

    amqp::IChannel::Queue que;
    que.queueName = queueName;
    que.isAutoDelete = true;
    que.isAutoDelWhenDisconnect = true;
    que.isNotAllowNotExist = false;
    que.isPersist = false;
    channel->queueDeclare(que);

    amqp::IChannel::QueueBind queBind;
    queBind.exchangeName = "test-exchange";
    queBind.routingKey = routingKey;
    queBind.queueName = queueName;
    channel->queueBind(queBind);

    amqp->pushChannel(channel);
    // NOTE(review): the handler is allocated with `new` and never deleted in
    // this file; presumably registerRouting() takes ownership -- confirm.
    amqp->registerRouting(routingKey, new CHandler());
#endif
    // conn->close();
}
/// Entry point of the example: creates the client, runs connect() for the
/// indices 0..9, then calls loop() on the client.
int main()
{
    // NOTE(review): the meaning of the two constructor arguments (200, 100)
    // is not visible in this file.
    amqp::IAmqp *amqp = new amqp::CRabbitmq(200, 100);

    const int connectionCount = 10;
    int index = 0;
    while (index < connectionCount) {
        // Each worker is joined right away, so the connect() calls run one
        // after another rather than in parallel.
        std::thread worker([amqp, index] {
            connect(amqp, index);
        });
        worker.join();
        ++index;
    }

    amqp->loop();

    // "pause" is a shell command (Windows); runs once loop() returns.
    std::system("pause");
    return 0;
}