kafka java代码编写

上篇,我介绍了如何搭建kafka,但在如何使用kafka上,还没做过去但介绍,大多人都是写改main方法去发送和接受,但我们实际业务肯定不是这样但。我们公司是以一种方式,类似一直启动main然后接受消息的。这里我也还没去具体了解,之后我会再看看,了解之后可能会再写一篇文章出来。

kafka启动之后,需要创建topic和partitions,java代码只能帮你传递消息和接受消息,这创建topic和partitions的工作还是得有你自己亲自完成


topic相当于一个主题,broker相当于服务器。比如创建一个日志的topic,发送日志消息 ,创建一个订单的topic,发送订单信息。

当然topic里还有分区(partition)的概念,由于一个topic可能太大,利用分区更好的接受和发送消息,也可以用不同的consumer去消费一个topic下不同partition里的数据,分区规则可以自定义。




每个partition对应一个索引文件,每个索引文件对应一个日志文件,日志里有seq,稀疏索引的主键。。(万一kafka挂了,可以读这个文件,用二分法找到自己之前所读取的那条kafka数据)


像之前一样,进到kafka目录,cd /home/kafka

执行以下命令

bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181 --partition 3 --replication-factor 3 --topic 'test3' 

然后你跑到 cd /home/kafka/kafka-logs就会看到有3个文件生成了

replication-factor 是在几个broke上生产副本,partition是代表一个topic的分区文件



我这里创建了一个wmtest2的topic,副本数是1 --replication-factor1,就等于没有副本,就一个文件,partition是3,整句话运行完之后是zk会再每个broker上创建一个wmtest2的topic,但每个机器存的分区不一样(如我现在三台虚拟机h1,h2,h3 创建之后,wmtest2-0 0号partition放在h1里,wmtest2-1 1号partition放在h2里,wmtest2-2 2号partition放在h3里)

我这里又多建了几个topic,为了让读者能更深刻的理解


3个分区,2个备份


1个分区,2个备份


2个分区,2个备份


以下是h1服务器的topic情况,我这里是创建了3个log文件,所以topic会分配到上文件里的



以下是h2服务器的topic情况



以下是h3服务器topic情况 




也可以根据命令去查询

cd到kafka目录,然后输入以下命令

bin/kafka-topics.sh --describe --zookeeper h1:2181 --topic 'wmtest2'



Leader: 如果有多个brokerBroker保存同一个topic,那么同时只能有一个Broker负责该topic的读写,其它的Broker作为实时备份。负责读写的Broker称为Leader.

Replicas : 表示该topic的几号分区在哪几个broker中保存

Isr : 表示当前有效的broker, Isr是Replicas的子集


现在杀掉一个broker,操作如下

切到第三台机器,为这里是h3,jps一下

然后杀死进程,再看下kafka topic 'wmtest5'的情况


Isr:现在有效的broker就是2了,3已经没了

本来partition1的leader是3,也变成2了

再启动你就发现,Isr有效的broker中3回来了,但是leader还是2


http://blog.csdn.net/wangjia184/article/details/37921183这篇文章也详细介绍了,我所说的东西,还不错


-----------------------现在开始讲解项目工程


首先建一个maven工程

pom文件如下

   <dependency>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka-clients</artifactId>
                  <version>0.9.0.0</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka_2.10</artifactId>
                  <version>0.9.0.0</version>
              </dependency>

就引入这些就可以了

我这里一开始也按照网上流程,那样搭建,跑是能跑起来,但有些我需要模拟但就跑不起来了

一共要建5个类

package com.wm.utilm2;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by wangmiao on 2017/6/9.
 */
public class ConsumerGroup {
    private List<ConsumerRunnable> consumers;

      public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
         consumers = new ArrayList<ConsumerRunnable>(consumerNum);
         for (int i = 0; i < consumerNum; ++i) {
                 ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
                 consumers.add(consumerThread);
             }
     }

     public void execute() {
         for (ConsumerRunnable task : consumers) {
                 new Thread(task).start();
             }
     }
}
package com.wm.utilm2;

/**
 * Created by wangmiao on 2017/6/9.
 */
public class ConsumerMain {
    public static void main(String[] args) {
                 String brokerList = "h1:9092,h2:9092,h3:9092";
                 String groupId = "testGroup1";
                 String topic = "test3";
                 int consumerNum = 1;

                 ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
                 consumerGroup.execute();
             }
}

package com.wm.utilm2;

/**
 * Created by wangmiao on 2017/6/9.
 */
public class ConsumerMainTwo {
    public static void main(String[] args) {
                 String brokerList = "h1:9092,h2:9092,h3:9092";
                 String groupId = "testGroup2";
                 String topic = "test3";
                 int consumerNum = 1;

                 ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
                 consumerGroup.execute();
             }
}

package com.wm.utilm2;


import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.util.CollectionUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Created by wangmiao on 2017/6/8.
 */
public class ConsumerRunnable implements Runnable {
    // 每个线程维护私有的KafkaConsumer实例
      private final KafkaConsumer<String, String> consumer;

     public ConsumerRunnable(String brokerList, String groupId, String topic) {
                  Properties props = new Properties();
                  props.put("bootstrap.servers", brokerList);
                  props.put("group.id", groupId);
                  props.put("enable.auto.commit", "true");        //本例使用自动提交位移
                  props.put("auto.commit.interval.ms", "1000");
                  props.put("session.timeout.ms", "30000");
                  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                  this.consumer = new KafkaConsumer<String, String>(props);
                  consumer.subscribe(Arrays.asList(topic));   // 本例使用分区副本自动分配策略
              }


      public void run() {
          while (true) {
                  ConsumerRecords<String, String> records = consumer.poll(200);   // 本例使用200ms作为获取超时时间
                  for (ConsumerRecord<String, String> record : records) {
                          // 这里面写处理消息的逻辑,本例中只是简单地打印消息
                          System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
                                           " ------message is :" + new String(record.value()));
                      }
              }
      }
}


package com.wm.util;


import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

/**
 * Created by wangmiao on 2017/6/8.
 */
public class KafkaProducerTest extends Thread {
    private String topic;
    public KafkaProducerTest(String topic){
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        KafkaProducer<String,String> producer =createProducer();

        int i=0;
            while(true) {
                String msg = "test wm shuai"+i++;

                ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, msg);
                producer.send(producerRecord);
                System.out.println("send " + msg);
                try {
                    Thread.sleep(5000);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }


    }

    private KafkaProducer createProducer(){
        Properties props = new Properties();
//        props.setProperty("zookeeper.connect","h1:2181,h2:2181,h3:2181");
//        props.setProperty("serializer.class", StringEncoder.class.getName());
//        props.setProperty("metadata.broker.list","h1:9092,h2:9092,h3:9092");
//        props.setProperty("partitioner.class","com.wm.util.PersonalPartition");

        props.put("bootstrap.servers", "h1:9092,h2:9092,h3:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer(props);
    }

    public static  void main(String arg[]){
        new KafkaProducerTest("test1").start();
    }
}


ConsumerMainTwo 和 ConsumerMain 是模拟两个用户组,但每个用户组只有一个用户去消费信息(同时启动两个main方法看下消费消息的情况)

可以看到不同的用户组,可以消费同一个topic信息,

可以改变  consumerNum 来模拟一个用户组里的用户数,但这个用户数不能超过partition的数量,不然又得用户是收不到消息的,因为会被组里的其他用户给消费掉。

当之只有一个用户的时候,消费掉水partition1,2,3但是有两个用户组的时候消息会被分配一个用户消费两条,一个用户消费1条。

如何模拟呢:ConsumerMain的consumerNum设置为1,启动两个consumerNum,看到消费情况,然后再kill掉一个,再看看消费情况。

用户组的几种情况就完美模拟了

猜你喜欢

转载自blog.csdn.net/xiao__miao/article/details/72957638