初识 RdKafka
RdKafka 是一个高性能、跨平台、开源的消息队列库,由 C++ 实现。它既可以作为生产者,也可以作为消费者,支持多种可扩展的消息路由策略,并提供了丰富的配置选项和 API 接口。
在实际应用场景中,RdKafka 可以用来解决大规模数据处理、日志收集、事件推送等问题。相比于其他消息队列库,例如 Apache Kafka、RabbitMQ 等,RdKafka 的最大优势在于它的高吞吐量和低延迟。
高级用法
1. 消费者组管理
RdKafka 支持通过消费者组的方式共享消息接收负载。消费者组是多个消费者实例共同处理相同的一批消息的逻辑概念,每个消费者实例只负责处理一部分消息。这种方式可以提高消息处理的并发度和容错性。
使用 RdKafka 创建消费者组十分简单,只需要在配置项中设置 group.id
参数即可:
conf->set("group.id", "consumer-group");
注意,如果多个消费者实例的 group.id
参数相同,即它们属于同一个消费者组。
2. 消息回溯
在某些情况下,消费者需要重新读取历史消息。RdKafka 提供了两种方式来支持消费者进行消息回溯:
-
从指定偏移量开始消费。 消费者可以通过设置
offset
参数来指定从哪个偏移量开始消费。例如,以下代码将创建一个新的消费者,从主题test
的第 10 条消息开始消费:RdKafka::TopicPartition *tp = RdKafka::TopicPartition::create("test", 0, 10); consumer->assign({tp});
-
从最早或最新的消息开始消费。 消费者可以通过设置
auto.offset.reset
参数来指定从哪里开始消费,可以选择earliest
或latest
,分别表示从最早或最新的消息开始消费。例如,以下代码将创建一个新的消费者,从主题test
的最新消息开始消费:conf->set("auto.offset.reset", "latest");
3. 消息分区
在 Kafka 中,一个主题可以被分成多个分区,每个分区存储的消息是有序的。RdKafka 支持根据业务需求自定义消息分区策略。
具体来说,RdKafka 提供了一个抽象类 RdKafka::PartitionerCb
,通过继承该类并实现 partitioner_cb
方法来自定义分区策略。例如,以下代码将创建一个新的生产者,并使用 CustomPartitioner
自定义分区策略:
class CustomPartitioner : public RdKafka::PartitionerCb {
public:
int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
int32_t partition_count, void *msg_opaque) override {
// 自定义分区策略
return 0;
}
};
conf->set("partitioner_cb", new CustomPartitioner(), errstr);
producer = RdKafka::Producer::create(conf, errstr);
4. 消息筛选
在某些情况下,消费者只对感兴趣的消息进行处理,而忽略其他消息。RdKafka 提供了两种机制来支持消息筛选:
-
主题过滤器。 消费者可以设置主题过滤器参数,只接收符合规则的消息。例如,以下代码将创建一个新的消费者,只接收主题名以
test-
开头的消息:conf->set("topic.whitelist", "test-*"); consumer = RdKafka::Consumer::create(conf, errstr);
-
消息过滤器。 消费者可以实现
ConsumeCb
接口,并在consume_cb
方法中对消息进行筛选。例如,以下代码将创建一个新的消费者,通过CustomConsumeCb
对消息进行筛选:class CustomConsumeCb : public RdKafka::ConsumeCb { public: void consume_cb (RdKafka::Message &msg, void *opaque) override { // 筛选消息 } }; consumer->set_consume_callback(&CustomConsumeCb());
示例代码
下面是一个完整的 RdKafka 生产者和消费者实现,支持自定义分区和消息回溯。生产者从标准输入中读取用户输入的消息,然后发送给 Kafka 集群;消费者从指定偏移量开始消费指定主题的消息,并打印到标准输出中。
生产者代码
#include <iostream>
#include <sstream>
#include <string>
#include <signal.h>
#include <librdkafka/rdkafkacpp.h>
const std::string BROKER_LIST = "localhost:9092";
const std::string TOPIC_NAME = "test";
const int PARTITION = RdKafka::Topic::PARTITION_UA;
volatile sig_atomic_t stop = 0;
void sigterm (int sig) {
stop = 1;
}
class MsgDeliveryRep : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) override {
if (message.err()) {
std::cerr << "Failed to deliver message: " << message.errstr() << std::endl;
} else {
std::cout << "Message delivered: " << message.key() << std::endl;
}
}
};
int main() {
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
std::string errstr;
conf->set("bootstrap.servers", BROKER_LIST, errstr);
tconf->set("request.required.acks", "all", errstr);
tconf->set("partitioner_cb", static_cast<RdKafka::PartitionerCb*>(nullptr), errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
signal(SIGTERM, sigterm);
MsgDeliveryRep delivery_report;
producer->set_delivery_report_callback(&delivery_report);
while (!stop) {
std::string line;
if (std::getline(std::cin, line)) {
std::stringstream ss(line);
std::string key, value;
std::getline(ss, key, ',');
std::getline(ss, value);
RdKafka::ErrorCode result = producer->produce(
RdKafka::Topic::create(producer, TOPIC_NAME, tconf),
PARTITION,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(value.c_str()),
value.size(),
const_cast<char *>(key.c_str()),
key.size(),
static_cast<void *>(nullptr)
);
if (result != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to produce message: " << RdKafka::err2str(result) << std::endl;
}
}
}
delete producer;
delete conf;
delete tconf;
return 0;
}
消费者代码
#include <iostream>
#include <string>
#include <signal.h>
#include <librdkafka/rdkafkacpp.h>
const std::string BROKER_LIST = "localhost:9092";
const std::string TOPIC_NAME = "test";
const int PARTITION = RdKafka::Topic::PARTITION_UA;
const int32_t OFFSET = RdKafka::Topic::OFFSET_BEGINNING;
volatile sig_atomic_t stop = 0;
void sigterm (int sig) {
stop = 1;
}
class MsgRebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions) override {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
consumer->assign(partitions);
} else {
consumer->unassign();
}
}
};
void msg_consume(RdKafka::Message* message, void* opaque) {
switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
std::cout << "Message received: " << std::string(static_cast<const char*>(message->payload()), message->len()) << std::endl;
break;
case RdKafka::ERR__PARTITION_EOF:
std::cerr << "Reached end of partition." << std::endl;
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
std::cerr << "Consume error: " << message->errstr() << std::endl;
stop = 1;
break;
default:
std::cerr << "Consume error: " << message->errstr() << std::endl;
stop = 1;
}
}
int main() {
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
std::string errstr;
conf->set("bootstrap.servers", BROKER_LIST, errstr);
tconf->set("auto.offset.reset", "smallest", errstr);
signal(SIGTERM, sigterm);
RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
if (!consumer) {
std::cerr << "Failed to create consumer: " << errstr << std::endl;
exit(1);
}
MsgRebalanceCb rebalance_cb;
consumer->set_rebalance_callback(&rebalance_cb);
consumer->subscribe({TOPIC_NAME});
while (!stop) {
RdKafka::Message *message = consumer->consume(1000);
msg_consume(message, nullptr);
delete message;
}
consumer->close();
delete consumer;
delete conf;
delete tconf;
return 0;
}
总结
本文介绍了 RdKafka 的高级用法,包括消费者组管理、消息回溯、消息分区和消息筛选,并给出了一个完整的生产者和消费者实现。希望能够对大家学习和使用 RdKafka 带来一些帮助。