/*
 * 1. Configuration file: consumer.properties
 *
 *    # ZooKeeper connection string
 *    zookeeper.connect=master:2181,slave1:2181,slave2:2181
 *    # ZooKeeper connection timeout (ms)
 *    zookeeper.connectiontimeout.ms=1000000
 *    # Kafka consumer group id
 *    group.id=test-group
 *
 * 2. Bootstrap code (init.java)
 */
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * Entry point that wires up a legacy (ZooKeeper-based) Kafka high-level
 * consumer and hands each message stream of "test-topic" to a worker thread.
 *
 * <p>NOTE(review): this uses the old {@code kafka.consumer} API, deprecated in
 * Kafka 0.9 and removed in 2.x; new code should use
 * {@code org.apache.kafka.clients.consumer.KafkaConsumer} instead.
 */
public class init { // NOTE(review): Java convention is UpperCamelCase; name kept for compatibility

    public static void main(String[] args) throws FileNotFoundException, IOException {
        // 1. Load consumer.properties from the classpath. A classpath lookup is
        //    more robust than "./consumer.properties", which depends on the
        //    process working directory.
        Properties p = new Properties();
        InputStream cfg = init.class.getClassLoader().getResourceAsStream("consumer.properties");
        if (cfg == null) {
            // Fail fast with a clear message instead of an opaque NPE from load().
            throw new FileNotFoundException("consumer.properties not found on the classpath");
        }
        try {
            p.load(cfg);
        } finally {
            cfg.close(); // don't leak the stream, even if load() fails
        }

        // 2. Build the consumer configuration from the loaded properties.
        ConsumerConfig config = new ConsumerConfig(p);

        // 3. Create the consumer connector (owns the ZooKeeper session).
        final ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);

        // 4./5. Map of topic name -> number of streams (consuming threads) to open.
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("test-topic", 1);

        // 6. Open the message streams for the requested topics.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topics);

        // 7. One KafkaStream per stream requested above for this topic.
        List<KafkaStream<byte[], byte[]>> partitions = streams.get("test-topic");

        // 8. Size the pool to the stream count so every stream gets its own
        //    thread (the original hard-coded 1, which only matched by accident).
        final ExecutorService threadPool = Executors.newFixedThreadPool(partitions.size());

        // Shut down cleanly on JVM exit so the connector commits offsets and the
        // worker threads terminate (the original released neither resource).
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                consumer.shutdown();   // closes the streams, ending the iterators
                threadPool.shutdown(); // lets workers exit once their stream is closed
            }
        });

        // 9. Hand each stream to a worker for consumption.
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new TestConsumer(partition));
        }
    }
}

/*
 * 3. Worker that drains one stream (TestConsumer.java)
 */
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

/**
 * Worker that consumes a single {@link KafkaStream} and prints each message's
 * partition, offset, and UTF-8 decoded payload.
 *
 * <p>NOTE(review): extends Thread only to stay source-compatible with existing
 * callers; it is submitted to an executor as a plain Runnable above.
 */
public class TestConsumer extends Thread {

    private final KafkaStream<byte[], byte[]> partition;

    /**
     * @param partition the message stream this worker will drain
     */
    public TestConsumer(KafkaStream<byte[], byte[]> partition) {
        this.partition = partition;
    }

    @Override
    public void run() {
        // 1. hasNext() blocks until a message arrives and returns false only when
        //    the connector is shut down, so this loop runs for the consumer's lifetime.
        ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
        while (iterator.hasNext()) {
            // 2. Take the next message together with its metadata.
            MessageAndMetadata<byte[], byte[]> next = iterator.next();
            System.out.println("partition:" + next.partition()); // fixed "partiton" typo
            System.out.println("offset:" + next.offset());       // consumer offset
            // StandardCharsets.UTF_8 can never throw UnsupportedEncodingException,
            // so the original try/catch is no longer needed.
            System.out.println("message:" + new String(next.message(), StandardCharsets.UTF_8));
        }
    }
}
Kafka 消费者实例 (Kafka consumer example)
猜你喜欢
转载自houston123.iteye.com/blog/2317477
今日推荐
周排行