kafka2.0-入门demo_03

上一篇文章简单的讲了搭建kafka的环境以及如何启动。

现在来讲讲kafka的简单使用,然后使用java写个小的测试程序。

首先假设你已经启动了kafka集群,我们来创建一个topic,使用如下命令:

bin/kafka-topics.sh --create --zookeeper 192.168.1.3:2181,192.168.1.128:2181,192.168.130:2181 --replication-factor 3 --partitions 1 --topic mr-yang

192.168.1.3:2181,192.168.1.128:2181,192.168.130:2181是我的zk集群IP,mr-yang为topic名称。

然后我们使用如下命令查看topic的一些信息:

bin/kafka-topics.sh --describe --zookeeper 192.168.1.3:2181,192.168.1.128:2181,192.168.130:2181 --topic mr-yang

结果如下:
这里写图片描述

以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。因为目前我们只有一个partition,因此关于partition的信息只有一行。

leader节点负责给定partition的所有读写请求。如果一个topic有多个partitions,那么每个节点都会其中一部分partition的leader。

replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。

isr 是replicas的一个子集,它只列出当前还存活着的,并且备份了该partition的节点。

现在开始使用java编写测试程序,首先引入maven依赖。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

程序分为两部分,一个是生产,一个是消费者,请先启动消费者程序,如下:

生产者代码:

/**
 * kafka生产者
 * @author yangyaming
 */
public class MyProducer {

    public static void main(String args[]){
         Properties props = new Properties();
         props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 1);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

         Producer<String, String> producer = new KafkaProducer<>(props);
         for (int i = 0; i < 100; i++){
             /**
              * ProducerRecord 参数解析
              * 第一个:mr-yang为生产者 topic名称,
              * 第二个:对于生产者kafka2.0需要你指定一个key,在企业应用中,我们一般会把他当做businessId来用,比如订单ID,用户ID等等。
              * 第三个:消息的主要信息
              */
             producer.send(new ProducerRecord<String, String>("mr-yang", Integer.toString(i), Integer.toString(i)));
         }

         producer.close();
    }
}

消费者代码如下,具体详细注释见代码注释

/**
 * kafka消费者
 */
public class MyConsumer {

    public  static void main(String args[]){
         Properties props = new Properties();
         /** kafka集群ip **/
         props.put("bootstrap.servers", "192.168.1.3:9092,192.168.1.128:9092,192.168.1.130:9092");

         /** 由于kafka有消费组的概念,每个消费者要制定一个group **/
         props.put("group.id", "test");

         /** 我们这里使用kafka high level模式,自动提交offset**/
         props.put("enable.auto.commit", "true");

         /** 自动提交的时间间隔 **/
         props.put("auto.commit.interval.ms", "1000");

         /** 制定key合value的序列化方式,这里制定的是kafka的字符串序列化 **/
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("mr-yang"));
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
         }
    }
}

程序运行结果:
这里写图片描述

具体的java文档如下:
kafka2.0 producer javadoc
http://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

kafka2.0 consumer javadoc
http://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

猜你喜欢

转载自blog.csdn.net/u014801432/article/details/82466897