简单的Kafka 0.8消费者程序

简单的Kafka0.8消费者程序

1 建立连接

建立和kafka的连接,需要以下几个基本信息:

  • brokers IP 和 端口号
  • topic名称

2 基本功能

读取数据需要以下信息

  • partition id
  • 开始的offset

下面的代码来自于example,做了简化,去掉了参数功能,只演示以下功能

  • 仅读取本地一个broker的数据
  • partition只有一个,id为0
  • 当接收到外界kill信号或者读取到最后一个offset的时候,会退出程序
  • 当读取数据时出现错误会退出
  • 可以接收rdkafka的事件,虽然现在还不知道有多大用
#include <getopt.h>

#include <atomic>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <list>
#include <string>

#include "librdkafka/rdkafkacpp.h"

using std::string;
using std::list;
using std::cout;
using std::endl;

// Loop-control flag: cleared by the SIGINT/SIGTERM handler and by the
// consume callbacks.  It is written from a signal handler and read in the
// main consume loop, so it must be std::atomic (a plain bool gives a data
// race / unspecified behavior here) -- see section 3.1 of the notes.
static std::atomic<bool> run{true};
// When true, reaching ERR__PARTITION_EOF (last message) also stops the loop.
static bool exit_eof = true;


// Receives out-of-band events from librdkafka (errors, statistics, log
// lines).  Errors and stats go to stderr; if every broker is down the
// main consume loop is asked to stop via the global `run` flag.
class MyEventCb : public RdKafka::EventCb {
public:
  void event_cb (RdKafka::Event &event) {
    const RdKafka::Event::Type type = event.type();

    if (type == RdKafka::Event::EVENT_ERROR) {
      std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): "
                << event.str() << std::endl;
      // No broker reachable at all: nothing more can be consumed.
      if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
        run = false;
    } else if (type == RdKafka::Event::EVENT_STATS) {
      std::cerr << "\"STATS\": " << event.str() << std::endl;
    } else if (type == RdKafka::Event::EVENT_LOG) {
      // Library log line: severity, facility, message.
      fprintf(stderr, "LOG-%i-%s: %s\n",
              event.severity(), event.fac().c_str(), event.str().c_str());
    } else {
      // Any event type we do not handle explicitly.
      std::cerr << "EVENT " << type
                << " (" << RdKafka::err2str(event.err()) << "): "
                << event.str() << std::endl;
    }
  }
};


/*
 * Handle one result from consume(): print the message, or react to the
 * per-message error code.  Clears the global `run` flag on fatal errors
 * and (if exit_eof is set) when the end of the partition is reached.
 */
void msg_consume(RdKafka::Message* message, void* opaque) {
  switch (message->err()) {
  case RdKafka::ERR__TIMED_OUT:
    /* No message arrived within the timeout; not an error. */
    break;

  case RdKafka::ERR_NO_ERROR:
    /* Real message */
    std::cout << "Read msg at offset " << message->offset() << std::endl;
    if (message->key()) {
      std::cout << "Key: " << *message->key() << std::endl;
    }
    /*
     * BUGFIX: a Kafka payload is raw bytes and is NOT guaranteed to be
     * NUL-terminated, so streaming it as a C string could read past the
     * buffer.  Print exactly len() bytes instead.
     */
    std::cout.write(static_cast<const char *>(message->payload()),
                    static_cast<std::streamsize>(message->len()));
    std::cout << std::endl;
    break;

  case RdKafka::ERR__PARTITION_EOF:
    cout << "reach last message" << endl;
    /* Last message */
    if (exit_eof) {
      run = false;
    }
    break;

  case RdKafka::ERR__UNKNOWN_TOPIC:
  case RdKafka::ERR__UNKNOWN_PARTITION:
    std::cerr << "Consume failed: " << message->errstr() << std::endl;
    run = false;
    break;

  default:
    /* Errors */
    std::cerr << "Consume failed: " << message->errstr() << std::endl;
    run = false;
  }
}


// Adapter used by consume_callback(): forwards each delivered message
// (and the caller-supplied opaque pointer) to the free function
// msg_consume(), so both consume paths share one handler.
class MyConsumeCb : public RdKafka::ConsumeCb {
public:
  void consume_cb (RdKafka::Message &msg, void *opaque) {
    RdKafka::Message *delivered = &msg;
    msg_consume(delivered, opaque);
  }
};

// SIGINT/SIGTERM handler: ask the main consume loop to exit.
static void sigterm (int sig) {
  (void)sig;  // signal number not needed
  run = false;
}

int main (int argc, char **argv) {
  /*
   * Process kill signal, quit from the loop
   */
  signal(SIGINT, sigterm);
  signal(SIGTERM, sigterm);

  /*
   * Create configuration objects
   */
  RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

  /*
   * Set configuration properties
   */
  string brokers = "localhost";
  string errstr;
  global_conf->set("metadata.broker.list", brokers, errstr);

  /*
   * Accept event from RdKafka
   */
  MyEventCb ex_event_cb;
  global_conf->set("event_cb", &ex_event_cb, errstr);

  /*
   * Create consumer using accumulated global configuration.
   */
  RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr);
  if (!consumer) {
    std::cerr << "Failed to create consumer: " << errstr << std::endl;
    exit(1);
  }

  std::cout << "% Created consumer " << consumer->name() << std::endl;

  /*
   * Create topic handle.
   */
  string topic_name = "test";
  RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);
  if (!topic) {
    std::cerr << "Failed to create topic: " << errstr << std::endl;
    exit(1);
  }

  /*
   * Start consumer for topic+partition at start offset
   */
  int32_t partition = 0;
  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
  RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
  if (resp != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Failed to start consumer: " <<
      RdKafka::err2str(resp) << std::endl;
    exit(1);
  }

  /*
   * Consume messages
   */
  MyConsumeCb ex_consume_cb;
  int use_ccb = 0;
  while (run) {
    if (use_ccb) {
      consumer->consume_callback(topic, partition, 1000,
                                 &ex_consume_cb, &use_ccb);
    } else {
      RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
      msg_consume(msg, NULL);
      delete msg;
    }
    consumer->poll(0);
  }  

  /*
   * Stop consumer
   */
  consumer->stop(topic, partition);
  consumer->poll(1000);

  delete topic;
  delete consumer;

  /*
   * Wait for RdKafka to decommission.
   * This is not strictly needed (when check outq_len() above), but
   * allows RdKafka to clean up all its resources before the application
   * exits so that memory profilers such as valgrind wont complain about
   * memory leaks.
   */
  RdKafka::wait_destroyed(5000);

  return 0;
}

3 后续解决

 

3.1 run变量

会被signal处理函数、MyConsumeCb回调函数设置,是否都在一个线程中需要确定?如果在不同的线程中,需要使用atomic进行同步,否则可能会出现脏读、脏写的问题

3.2 读取多个broker

3.3 断点续读

3.4 和zookeeper交互读取元数据

Created: 2016-05-01 Sun 17:39

Validate

再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!https://blog.csdn.net/jiangjunshow

猜你喜欢

转载自www.cnblogs.com/skiwnywh/p/10321640.html