package com.paile.kafka.service.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.log4j.Logger;

import com.paile.command.MessageConsumerCommand;
import com.paile.command.receiver.message.MessageConsumerReceiver;
import com.paile.kafka.bean.MessageBean;
import com.paile.utils.others.ObjectStreamManager;

/**
 * Consumes messages from a Kafka topic as a member of a consumer group, using the
 * old (0.8.x) high-level consumer API. Each received payload is deserialized; if
 * it is a {@link MessageBean} it is wrapped in a {@link MessageConsumerCommand}
 * and executed.
 *
 * <p>Call {@link #run(int)} to start one worker thread per stream, then
 * {@link #shutdown()} to stop consuming and release resources.
 */
public class GroupConsumerManager {

    private static final Logger LOGGER = Logger.getLogger(GroupConsumerManager.class);

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    /**
     * Connects to ZooKeeper and joins the given consumer group.
     *
     * @param a_zookeeper ZooKeeper connect string, e.g. {@code "host:2181"}
     * @param a_groupId   consumer group id
     * @param a_topic     topic to consume from
     */
    public GroupConsumerManager(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    /**
     * Stops the Kafka consumer (unblocking any worker waiting on a stream) and
     * shuts down the worker pool, waiting briefly for in-flight work to finish.
     */
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    LOGGER.warn("Timed out waiting for consumer worker threads to stop");
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Opens {@code a_numThreads} streams on the topic and starts one worker
     * thread per stream. Returns immediately; consumption continues in the
     * background until {@link #shutdown()} is called.
     *
     * @param a_numThreads number of streams/threads to consume with
     * @throws Exception if stream creation fails
     */
    public void run(int a_numThreads) throws Exception {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        LOGGER.info("Opened " + streams.size() + " stream(s) for topic " + topic);

        // One worker per stream; previously the executor/for-loop was commented
        // out, which left the streams unconsumed and the code uncompilable.
        executor = Executors.newFixedThreadPool(a_numThreads);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    consumeStream(stream);
                }
            });
        }
    }

    /**
     * Blocks on the stream's iterator and dispatches each deserialized
     * {@link MessageBean} until the consumer is shut down. A failure on one
     * message is logged and must not terminate the worker.
     */
    private void consumeStream(KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            byte[] payload = it.next().message();
            try {
                Object object = ObjectStreamManager.getInstance().toObject(payload);
                if (object instanceof MessageBean) {
                    // Message received: wrap it in a MessageConsumerCommand for
                    // downstream execution.
                    MessageBean bean = (MessageBean) object;
                    MessageConsumerReceiver receiver = new MessageConsumerReceiver(bean);
                    MessageConsumerCommand command = new MessageConsumerCommand(receiver);
                    command.execute();
                }
            } catch (Exception e) {
                LOGGER.error("Failed to process message from topic " + topic, e);
            }
        }
    }

    /**
     * Builds the high-level consumer configuration for the given ZooKeeper
     * ensemble and group id. Timeouts/intervals match the original settings.
     */
    private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "15000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "10000");
        return new ConsumerConfig(props);
    }

    /** Smoke-test entry point: consume for ~10 seconds and then shut down. */
    public static void main(String[] args) {
        String zooKeeper = "192.168.1.101:2181";
        String groupId = "pailegroup";
        String topic = "paile01";
        int threads = Integer.parseInt("1");

        GroupConsumerManager example = new GroupConsumerManager(zooKeeper, groupId, topic);
        try {
            example.run(threads);
        } catch (Exception e) {
            LOGGER.error("Failed to start consumer", e);
        }
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ignored) {
            // Shutting down anyway; restore the interrupt flag.
            Thread.currentThread().interrupt();
        }
        example.shutdown();
    }
}
Kafka message receiving example (consumer-group, old high-level consumer API).
Adapted from littie1987.iteye.com/blog/2193277