RdKafka高级用法及例子

初识 RdKafka

RdKafka 是一个高性能、跨平台、开源的消息队列库,由 C++ 实现。它既可以作为生产者,也可以作为消费者,支持多种可扩展的消息路由策略,并提供了丰富的配置选项和 API 接口。

在实际应用场景中,RdKafka 可以用来解决大规模数据处理、日志收集、事件推送等问题。相比于其他消息队列库,例如 Apache Kafka、RabbitMQ 等,RdKafka 的最大优势在于它的高吞吐量和低延迟。

高级用法

1. 消费者组管理

RdKafka 支持通过消费者组的方式共享消息接收负载。消费者组是多个消费者实例共同处理相同的一批消息的逻辑概念,每个消费者实例只负责处理一部分消息。这种方式可以提高消息处理的并发度和容错性。

使用 RdKafka 创建消费者组十分简单,只需要在配置项中设置 group.id 参数即可:

conf->set("group.id", "consumer-group");

注意,如果多个消费者实例的 group.id 参数相同,即它们属于同一个消费者组。

2. 消息回溯

在某些情况下,消费者需要重新读取历史消息。RdKafka 提供了两种方式来支持消费者进行消息回溯:

  • 从指定偏移量开始消费。 消费者可以通过设置 offset 参数来指定从哪个偏移量开始消费。例如,以下代码将创建一个新的消费者,从主题 test 的第 10 条消息开始消费:

    RdKafka::TopicPartition *tp = RdKafka::TopicPartition::create("test", 0, 10);
    consumer->assign({tp});
    
  • 从最早或最新的消息开始消费。 消费者可以通过设置 auto.offset.reset 参数来指定从哪里开始消费,可以选择 earliestlatest,分别表示从最早或最新的消息开始消费。例如,以下代码将创建一个新的消费者,从主题 test 的最新消息开始消费:

    conf->set("auto.offset.reset", "latest");
    

3. 消息分区

在 Kafka 中,一个主题可以被分成多个分区,每个分区存储的消息是有序的。RdKafka 支持根据业务需求自定义消息分区策略。

具体来说,RdKafka 提供了一个抽象类 RdKafka::PartitionerCb,通过继承该类并实现 partitioner_cb 方法来自定义分区策略。例如,以下代码将创建一个新的生产者,并使用 CustomPartitioner 自定义分区策略:

class CustomPartitioner : public RdKafka::PartitionerCb {
public:
    int32_t partitioner_cb(const RdKafka::Topic *topic, const std::string *key,
                           int32_t partition_count, void *msg_opaque) override {
        // 自定义分区策略
        return 0;
    }
};

conf->set("partitioner_cb", new CustomPartitioner(), errstr);
producer = RdKafka::Producer::create(conf, errstr);

4. 消息筛选

在某些情况下,消费者只对感兴趣的消息进行处理,而忽略其他消息。RdKafka 提供了两种机制来支持消息筛选:

  • 主题过滤器。 消费者可以设置主题过滤器参数,只接收符合规则的消息。例如,以下代码将创建一个新的消费者,只接收主题名以 test- 开头的消息:

    conf->set("topic.whitelist", "test-*");
    consumer = RdKafka::Consumer::create(conf, errstr);
    
  • 消息过滤器。 消费者可以实现 ConsumeCb 接口,并在 consume_cb 方法中对消息进行筛选。例如,以下代码将创建一个新的消费者,通过 CustomConsumeCb 对消息进行筛选:

    class CustomConsumeCb : public RdKafka::ConsumeCb {
    public:
        void consume_cb (RdKafka::Message &msg, void *opaque) override {
            // 筛选消息
        }
    };
    
    consumer->set_consume_callback(&CustomConsumeCb());
    

示例代码

下面是一个完整的 RdKafka 生产者和消费者实现,支持自定义分区和消息回溯。生产者从标准输入中读取用户输入的消息,然后发送给 Kafka 集群;消费者从指定偏移量开始消费指定主题的消息,并打印到标准输出中。

生产者代码

#include <iostream>
#include <sstream>
#include <string>
#include <signal.h>
#include <librdkafka/rdkafkacpp.h>

const std::string BROKER_LIST = "localhost:9092";
const std::string TOPIC_NAME = "test";
const int PARTITION = RdKafka::Topic::PARTITION_UA;

volatile sig_atomic_t stop = 0;

void sigterm (int sig) {
    stop = 1;
}

class MsgDeliveryRep : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) override {
        if (message.err()) {
            std::cerr << "Failed to deliver message: " << message.errstr() << std::endl;
        } else {
            std::cout << "Message delivered: " << message.key() << std::endl;
        }
    }
};

int main() {
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    std::string errstr;
    conf->set("bootstrap.servers", BROKER_LIST, errstr);
    tconf->set("request.required.acks", "all", errstr);
    tconf->set("partitioner_cb", static_cast<RdKafka::PartitionerCb*>(nullptr), errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }

    signal(SIGTERM, sigterm);

    MsgDeliveryRep delivery_report;
    producer->set_delivery_report_callback(&delivery_report);

    while (!stop) {
        std::string line;
        if (std::getline(std::cin, line)) {
            std::stringstream ss(line);

            std::string key, value;
            std::getline(ss, key, ',');
            std::getline(ss, value);

            RdKafka::ErrorCode result = producer->produce(
                    RdKafka::Topic::create(producer, TOPIC_NAME, tconf),
                    PARTITION,
                    RdKafka::Producer::RK_MSG_COPY,
                    const_cast<char *>(value.c_str()),
                    value.size(),
                    const_cast<char *>(key.c_str()),
                    key.size(),
                    static_cast<void *>(nullptr)
            );

            if (result != RdKafka::ERR_NO_ERROR) {
                std::cerr << "Failed to produce message: " << RdKafka::err2str(result) << std::endl;
            }
        }
    }

    delete producer;
    delete conf;
    delete tconf;

    return 0;
}

消费者代码

#include <iostream>
#include <string>
#include <signal.h>
#include <librdkafka/rdkafkacpp.h>

const std::string BROKER_LIST = "localhost:9092";
const std::string TOPIC_NAME = "test";
const int PARTITION = RdKafka::Topic::PARTITION_UA;
const int32_t OFFSET = RdKafka::Topic::OFFSET_BEGINNING;

volatile sig_atomic_t stop = 0;

void sigterm (int sig) {
    stop = 1;
}

class MsgRebalanceCb : public RdKafka::RebalanceCb {
public:
    void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                       std::vector<RdKafka::TopicPartition*> &partitions) override {
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
            consumer->assign(partitions);
        } else {
            consumer->unassign();
        }
    }
};

void msg_consume(RdKafka::Message* message, void* opaque) {
    switch (message->err()) {
        case RdKafka::ERR__TIMED_OUT:
            break;

        case RdKafka::ERR_NO_ERROR:
            std::cout << "Message received: " << std::string(static_cast<const char*>(message->payload()), message->len()) << std::endl;
            break;

        case RdKafka::ERR__PARTITION_EOF:
            std::cerr << "Reached end of partition." << std::endl;
            break;

        case RdKafka::ERR__UNKNOWN_TOPIC:
        case RdKafka::ERR__UNKNOWN_PARTITION:
            std::cerr << "Consume error: " << message->errstr() << std::endl;
            stop = 1;
            break;

        default:
            std::cerr << "Consume error: " << message->errstr() << std::endl;
            stop = 1;
    }
}

int main() {
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    std::string errstr;
    conf->set("bootstrap.servers", BROKER_LIST, errstr);
    tconf->set("auto.offset.reset", "smallest", errstr);

    signal(SIGTERM, sigterm);

    RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        exit(1);
    }

    MsgRebalanceCb rebalance_cb;
    consumer->set_rebalance_callback(&rebalance_cb);

    consumer->subscribe({TOPIC_NAME});

    while (!stop) {
        RdKafka::Message *message = consumer->consume(1000);
        msg_consume(message, nullptr);
        delete message;
    }

    consumer->close();
    delete consumer;
    delete conf;
    delete tconf;

    return 0;
}

总结

本文介绍了 RdKafka 的高级用法,包括消费者组管理、消息回溯、消息分区和消息筛选,并给出了一个完整的生产者和消费者实现。希望能够对大家学习和使用 RdKafka 带来一些帮助。

猜你喜欢

转载自blog.csdn.net/CarryMee/article/details/130728018