rdkafka 使用案例

main.cpp

#include <iostream>

#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <sys/time.h>
#include <errno.h>
 
#include "rdkafka.h"
 
const int PRODUCER_INIT_FAILED = -1;
const int PRODUCER_INIT_SUCCESS = 0;
const int PUSH_DATA_FAILED = -1;
const int PUSH_DATA_SUCCESS = 0;
 
 
static void logger( const rd_kafka_t *rk, int level, 
    const char *fac, const char *buf ) 
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
        (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
        level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
}
 
 
class ProducerKafka
{
public:
    ProducerKafka()
    {
    };

    ~ProducerKafka()
    {
    }
 
    int init_kafka(int partition, char *brokers, char *topic);
    int push_data_to_kafka(const char* buf, const int buf_len);
    void destroy();
 
private:
    int partition_;

    //rd
    rd_kafka_t* handler_;
    rd_kafka_conf_t *conf_;

    //topic
    rd_kafka_topic_t *topic_;
    rd_kafka_topic_conf_t *topic_conf_;
};
 
int ProducerKafka::init_kafka( int partition, 
    char *brokers, char *topic )
{
    char tmp[16]={0};
    char errstr[512]={0};
 
    partition_ = partition;
 
/* Kafka configuration */
    conf_ = rd_kafka_conf_new();

//set logger :register log function
    rd_kafka_conf_set_log_cb(conf_, logger);

/* Quick termination */
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    rd_kafka_conf_set(conf_, "internal.termination.signal", tmp, NULL, 0);

    rd_kafka_conf_set(conf_, "message.timeout.ms", "3000", NULL, 0);
    rd_kafka_conf_set(conf_, "socket.timeout.ms", "3000", NULL, 0);

    rd_kafka_conf_set(conf_, "socket.keepalive.enable", "true", NULL, 0);

/*topic configuration*/
    topic_conf_ = rd_kafka_topic_conf_new();

扫描二维码关注公众号,回复: 3148503 查看本文章

    if (!(handler_  = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr)))) 
    {
        fprintf(stderr, "*****Failed to create new producer: %s*******\n",errstr);
        return PRODUCER_INIT_FAILED;
    }
 
    rd_kafka_set_log_level(handler_, LOG_DEBUG);
 
    /* Add brokers */
    if (rd_kafka_brokers_add(handler_, brokers) == 0)
    {
        fprintf(stderr, "****** No valid brokers specified********\n");
        return PRODUCER_INIT_FAILED;       
    }


    /* Create topic */
    topic_ = rd_kafka_topic_new(handler_, topic, topic_conf_);

    return PRODUCER_INIT_SUCCESS;
}
 
void ProducerKafka::destroy()
{
    /* Destroy topic */
    rd_kafka_topic_destroy(topic_);
 
    /* Destroy the handle */
    rd_kafka_destroy(handler_);
}
 
int ProducerKafka::push_data_to_kafka( const char* buffer, 
    const int buf_len )
{
    int ret;
    char errstr[512]={0};

    if ( NULL == buffer )
    {
        return 0;
    }

    ret = rd_kafka_produce(topic_, partition_, RD_KAFKA_MSG_F_COPY, 
        (void*)buffer, (size_t)buf_len, NULL, 0, NULL);
 
    if ( ret == -1 )
    {
        fprintf(stderr,"****Failed to produce to topic %s partition %i: %s*****\n",
        rd_kafka_topic_name(topic_), partition_,
        rd_kafka_err2str(rd_kafka_errno2err(errno)));

        rd_kafka_poll(handler_, 0);
        return PUSH_DATA_FAILED;
    }

    fprintf(stderr, "***Sent %d bytes to topic:%s partition:%i*****\n",
        buf_len, rd_kafka_topic_name(topic_), partition_);
 
    rd_kafka_poll(handler_, 0);
 
    return PUSH_DATA_SUCCESS;
}

int main( int argc, char ** argv )
{
    int iRet = -1;

    char test_data[100];
    strcpy(test_data, "helloworld");
 
    ProducerKafka* producer = new ProducerKafka;
    if ( PRODUCER_INIT_SUCCESS == producer->init_kafka( 0, 
        "10.25.165.60:9092,10.25.166.11:9092", "166_11_test"))
    {
        printf("producer init success\n");
    }
    else
    {
        printf("producer init failed\n");
        return 0;
    }

    while (fgets(test_data, sizeof(test_data), stdin)) 
    {
        size_t len = strlen(test_data);
        if (test_data[len - 1] == '\n')
        {
            test_data[--len] = '\0';
        }

        if (strcmp(test_data, "end") == 0)
        {
            break;
        }

        if ( PUSH_DATA_SUCCESS == producer->push_data_to_kafka( 
                test_data, strlen(test_data) ) )
        {
            printf("push data success %s\n", test_data);
        }
        else
        {
            printf("push data failed %s\n", test_data);
        }
    }
 
    producer->destroy();
    iRet = 0;   
    return iRet;

}
 

make.sh

g++ main.cpp -I/usr/local/include/librdkafka -lrdkafka
新增了keeplive机制

src/rdkafka_transport.c 
 文件中
 
       if (rkb->rkb_rk->rk_conf.socket_keepalive)
        {
                if ( setsockopt( s, SOL_SOCKET, SO_KEEPALIVE,
                               (void *)&on, sizeof(on) ) == SOCKET_ERROR )
                {
                        rd_rkb_dbg(rkb, BROKER, "SOCKET",
                                   "Failed to set SO_KEEPALIVE: %s",
                                   socket_strerror(socket_errno));
                }

后面添加了几行

#ifdef __linux__
    /* Default settings are more or less garbage, with the keepalive time
     * set to 7200 by default on Linux. Modify settings to make the feature
     * actually useful. */

    /* Send first probe after interval. */
    int val = 6;
    if (setsockopt(s, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0)
    {
        rd_rkb_dbg(rkb, BROKER, "SOCKET",
            "Failed to set TCP_KEEPIDLE: %s",
            socket_strerror(socket_errno));
    }

    /* Send next probes after the specified interval. Note that we set the
     * delay as interval / 3, as we send three probes before detecting
     * an error (see the next setsockopt call). */
    val = 2;
    if (setsockopt(s, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val)) < 0)
    {
        rd_rkb_dbg(rkb, BROKER, "SOCKET",
            "Failed to set TCP_KEEPINTVL: %s",
            socket_strerror(socket_errno));

    }

    /* Consider the socket in error state after three we send three ACK
     * probes without getting a reply. */
    val = 3;
    if (setsockopt(s, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val)) < 0)
    {
        rd_rkb_dbg(rkb, BROKER, "SOCKET",
            "Failed to set TCP_KEEPCNT: %s",
            socket_strerror(socket_errno));

    }
#endif
 

多个broker,关闭一台时,不影响,仍然可以成功produce

猜你喜欢

转载自blog.csdn.net/hintonic/article/details/81283692